用户登录
用户注册

分享至

mapreduce 文件操作

  • 作者: 达?矢抾哆拉?
  • 来源: 51数据库
  • 2020-09-30
你看日志的第三行,你的job的jobid是job_local_0001说明你的job是在本地运行的,并不是在分布式环境下,但是你的url是hdfs://master:9000/说明你是在hdfs上创建文件。这个问题说明当job运行在local环境下时不能操作hdfs。
解决方法:把你的项目打包成jar文件,提交集群运行即可,或者把uri的scheme改成file:///也可以,但是此时文件是创建在本地



  众所周知,从hadoop 0.20.x之后,hadoop引入了新版的mapreduce
api,目前hadoop已经到了1.0版本,但是网上所有mapreduce教程还是使用的旧版mapreduce api,因此决定研究一下新版api。
首先是准备一下用于mapreduce的源文件,如下所示:

1900,35.3

1900,33.2

....

1905,38.2

1905,37.1

如上所示,记录的是每个年份的温度值,现在要求出每个年份最高的温度值,这是一个典型的mapreduce可以很好处理的问题,在map阶段,得出[1900,(35.3,
333.2,...)],....[1905, (38.2, 37.1, ....)],然后再通过reduce阶段求出每个年份最高温度值。

首先是写出mapreduce类,这和旧版api比较类似,但是需要注意的是,这里引用的新包:org.apache.hadoop.mapreduce.*而不是原来的org.apache.hadoop.mapred.*,具体程序如下所示:

package com.bjcic.hadoop.guide;

import java.io.bufferedreader;

import java.io.file;

import java.io.fileinputstream;

import java.io.ioexception;

import java.io.inputstreamreader;

import org.apache.hadoop.fs.path;

import org.apache.hadoop.io.doublewritable;

import org.apache.hadoop.io.longwritable;

import org.apache.hadoop.io.text;

import org.apache.hadoop.mapreduce.job;

import org.apache.hadoop.mapreduce.mapper;

import org.apache.hadoop.mapreduce.reducer;

import org.apache.hadoop.mapreduce.reducer.context;

import org.apache.hadoop.mapreduce.lib.input.fileinputformat;

import org.apache.hadoop.mapreduce.lib.output.fileoutputformat;

public class maxtptr {

public static class maxtptrmapper extends mapper {

@override

public void map(longwritable key, text value, context context)

throws ioexception, interruptedexception {

string[] items = value.tostring().split(",");

context.write(new text(items[0]), new
doublewritable(double.parsedouble(items[1])));

}

}

public static class maxtptrreducer extends reducer {

@override

public void reduce(text key, iterable values, context
context)

throws ioexception, interruptedexception {

double maxtptr = double.min_value;

for (doublewritable val : values) {

maxtptr = math.max(val.get(), maxtptr);

}

context.write(key, new doublewritable(maxtptr));

}

}

public static void main(string[] argv) {

//jobconf conf = new jobconf(maxtptr.class);

job job = null;

try {

job = new job();

job.setjarbyclass(maxtptr.class);

fileinputformat.addinputpath(job, new path("input"));

fileoutputformat.setoutputpath(job, new path("output"));

job.setmapperclass(maxtptrmapper.class);

job.setreducerclass(maxtptrreducer.class);

job.setoutputkeyclass(text.class);

job.setoutputvalueclass(doublewritable.class);

system.exit( job.waitforcompletion(true) ? 0 : 1 );

} catch (ioexception e) {

// todo auto-generated catch block

e.printstacktrace();

} catch (interruptedexception e) {

// todo auto-generated catch block

e.printstacktrace();

} catch (classnotfoundexception e) {

// todo auto-generated catch block

e.printstacktrace();

}

}

}

对上面的代码有以下点需要说明:

扩展基类发生了变化,分别为mapper和reducer
引入了重要的context类
提交并运行任务采用的是新类job而不是旧版本的jobconf

将该源文件连同src目录上传到伪分布式部署的hadoop机器上,首先需要编译并打包hadoop任务:

javac -classpath $hadoop_home/hadoop-core-1.0.1.jar -d classes
src/com/bjcic/hadoop/guide/maxtptr.java

jar -cvf ./maxtptr.jar -c classes/ .

将在当前目录生成maxtptr.jar。

转到$hadoop_home目录,首先确保hadoop已经运行在伪分布式模式下,如果没有则启动hadoop(bin/start-all.sh,注意如果启动不成功,可以先运行bin/hadoop
namenode -format)。

上传温度文件到hadoop的hdfs文件系统中:

$>bin/hadoop dfs -put /data_dir/tempreture.txt input

运行hadoop任务:

$>bin/hadoop jar maxtptr.jar com.bjcic.hadoop.guide.maxtptr input
output

从hdfs中取下输出结果:

$>bin/hadoop dfs -get output /data_dir/

此时会在data_dir目录下建立output目录,其中就有生成的结果文件。
软件
前端设计
程序设计
Java相关