用户登录
用户注册

分享至

java mapreduce api

  • 作者: 竹楼一夜听风雨
  • 来源: 51数据库
  • 2020-09-28
众所周知,从Hadoop 0.20.x之后,Hadoop引入了新版的MapReduce
API,目前Hadoop已经到了1.0版本,但是网上所有MapReduce教程还是使用的旧版MapReduce API,因此决定研究一下新版API。
首先是准备一下用于MapReduce的源文件,如下所示:

1900,35.3

1900,33.2

....

1905,38.2

1905,37.1

如上所示,记录的是每个年份的温度值,现在要求出每个年份最高的温度值,这是一个典型的MapReduce可以很好处理的问题,在Map阶段,得出[1900,(35.3,
333.2,...)],....[1905, (38.2, 37.1, ....)],然后再通过Reduce阶段求出每个年份最高温度值。

首先是写出MapReduce类,这和旧版API比较类似,但是需要注意的是,这里引用的新包:org.apache.hadoop.mapreduce.*而不是原来的org.apache.hadoop.mapred.*,具体程序如下所示:

package com.bjcic.hadoop.guide;

import java.io.BufferedReader;

import java.io.File;

import java.io.FileInputStream;

import java.io.IOException;

import java.io.InputStreamReader;

import org.apache.hadoop.fs.Path;

import org.apache.hadoop.io.DoubleWritable;

import org.apache.hadoop.io.LongWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Job;

import org.apache.hadoop.mapreduce.Mapper;

import org.apache.hadoop.mapreduce.Reducer;

import org.apache.hadoop.mapreduce.Reducer.Context;

import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class MaxTptr {

public static class MaxTptrMapper extends Mapper {

@Override

public void map(LongWritable key, Text value, Context context)

throws IOException, InterruptedException {

String[] items = value.toString().split(",");

context.write(new Text(items[0]), new
DoubleWritable(Double.parseDouble(items[1])));

}

}

public static class MaxTptrReducer extends Reducer {

@Override

public void reduce(Text key, Iterable values, Context
context)

throws IOException, InterruptedException {

double maxTptr = Double.MIN_VALUE;

for (DoubleWritable val : values) {

maxTptr = Math.max(val.get(), maxTptr);

}

context.write(key, new DoubleWritable(maxTptr));

}

}

public static void main(String[] argv) {

//JobConf conf = new JobConf(MaxTptr.class);

Job job = null;

try {

job = new Job();

job.setJarByClass(MaxTptr.class);

FileInputFormat.addInputPath(job, new Path("input"));

FileOutputFormat.setOutputPath(job, new Path("output"));

job.setMapperClass(MaxTptrMapper.class);

job.setReducerClass(MaxTptrReducer.class);

job.setOutputKeyClass(Text.class);

job.setOutputValueClass(DoubleWritable.class);

System.exit( job.waitForCompletion(true) ? 0 : 1 );

} catch (IOException e) {

// TODO Auto-generated catch block

e.printStackTrace();

} catch (InterruptedException e) {

// TODO Auto-generated catch block

e.printStackTrace();

} catch (ClassNotFoundException e) {

// TODO Auto-generated catch block

e.printStackTrace();

}

}

}

对上面的代码有以下点需要说明:

扩展基类发生了变化,分别为Mapper和Reducer
引入了重要的Context类
提交并运行任务采用的是新类Job而不是旧版本的JobConf

将该源文件连同src目录上传到伪分布式部署的Hadoop机器上,首先需要编译并打包Hadoop任务:

javac -classpath $HADOOP_HOME/hadoop-core-1.0.1.jar -d classes
src/com/bjcic/hadoop/guide/MaxTptr.java

jar -cvf ./MaxTptr.jar -C classes/ .

将在当前目录生成MaxTptr.jar。

转到$HADOOP_HOME目录,首先确保Hadoop已经运行在伪分布式模式下,如果没有则启动Hadoop(bin/start-all.sh,注意如果启动不成功,可以先运行bin/hadoop
namenode -format)。

上传温度文件到Hadoop的HDFS文件系统中:

$>bin/hadoop dfs -put /data_dir/tempreture.txt input

运行Hadoop任务:

$>bin/hadoop jar MaxTptr.jar com.bjcic.hadoop.guide.MaxTptr input
output

从HDFS中取下输出结果:

$>bin/hadoop dfs -get output /data_dir/

此时会在data_dir目录下建立output目录,其中就有生成的结果文件。




  你看日志的第三行,你的job的jobid是job_local_0001说明你的job是在本地运行的,并不是在分布式环境下,但是你的url是hdfs://master:9000/说明你是在hdfs上创建文件。这个问题说明当job运行在local环境下时不能操作hdfs。
解决方法:把你的项目打包成jar文件,提交集群运行即可,或者把uri的scheme改成file:///也可以,但是此时文件是创建在本地
软件
前端设计
程序设计
Java相关