用户登录
用户注册

分享至

mapreduce 源码

  • 作者: andy53
  • 来源: 51数据库
  • 2020-09-23
在这个实例中,我将会向大家介绍如何使用Python 为 Hadoop编写一个简单的MapReduce
程序。
尽管Hadoop 框架是使用Java编写的但是我们仍然需要使用像C++、Python等语言来实现Hadoop程序。尽管Hadoop官方网站给的示例程序是使用Jython编写并打包成Jar文件,这样显然造成了不便,其实,不一定非要这样来实现,我们可以使用Python与Hadoop 关联进行编程,看看位于/src/examples/python/WordCount.py 的例子,你将了解到我在说什么。

我们想要做什么?

我们将编写一个简单的 MapReduce 程序,使用的是C-Python,而不是Jython编写后打包成jar包的程序。
我们的这个例子将模仿 WordCount 并使用Python来实现,例子通过读取文本文件来统计出单词的出现次数。结果也以文本形式输出,每一行包含一个单词和单词出现的次数,两者中间使用制表符来想间隔。

先决条件

编写这个程序之前,你学要架设好Hadoop 集群,这样才能不会在后期工作抓瞎。如果你没有架设好,那么在后面有个简明教程来教你在Ubuntu Linux 上搭建(同样适用于其他发行版linux、unix)

如何使用Hadoop Distributed File System (HDFS)在Ubuntu Linux 建立单节点的 Hadoop 集群

如何使用Hadoop Distributed File System (HDFS)在Ubuntu Linux 建立多节点的 Hadoop 集群

Python的MapReduce代码

使用Python编写MapReduce代码的技巧就在于我们使用了 HadoopStreaming 来帮助我们在Map 和 Reduce间传递数据通过STDIN (标准输入)和STDOUT (标准输出).我们仅仅使用Python的sys.stdin来输入数据,使用sys.stdout输出数据,这样做是因为HadoopStreaming会帮我们办好其他事。这是真的,别不相信!

Map: mapper.py

将下列的代码保存在/home/hadoop/mapper.py中,他将从STDIN读取数据并将单词成行分隔开,生成一个列表映射单词与发生次数的关系:
注意:要确保这个脚本有足够权限(chmod +x /home/hadoop/mapper.py)。

#!/usr/bin/env python

import sys

# input comes from STDIN (standard input)
for line in sys.stdin:
# remove leading and trailing whitespace
line = line.strip()
# split the line into words
words = line.split()
# increase counters
for word in words:
# write the results to STDOUT (standard output);
# what we output here will be the input for the
# Reduce step, i.e. the input for reducer.py
#
# tab-delimited; the trivial word count is 1
print '%s\\t%s' % (word, 1)在这个脚本中,并不计算出单词出现的总数,它将输出 " 1" 迅速地,尽管可能会在输入中出现多次,计算是留给后来的Reduce步骤(或叫做程序)来实现。当然你可以改变下编码风格,完全尊重你的习惯。

Reduce: reducer.py

将代码存储在/home/hadoop/reducer.py 中,这个脚本的作用是从mapper.py 的STDIN中读取结果,然后计算每个单词出现次数的总和,并输出结果到STDOUT。
同样,要注意脚本权限:chmod +x /home/hadoop/reducer.py

#!/usr/bin/env python

from operator import itemgetter
import sys

# maps words to their counts
word2count = {}

# input comes from STDIN
for line in sys.stdin:
# remove leading and trailing whitespace
line = line.strip()

# parse the input we got from mapper.py
word, count = line.split('\\t', 1)
# convert count (currently a string) to int
try:
count = int(count)
word2count[word] = word2count.get(word, 0) + count
except ValueError:
# count was not a number, so silently
# ignore/discard this line
pass

# sort the words lexigraphically;
#
# this step is NOT required, we just do it so that our
# final output will look more like the official Hadoop
# word count examples
sorted_word2count = sorted(word2count.items(), key=itemgetter(0))

# write the results to STDOUT (standard output)
for word, count in sorted_word2count:
print '%s\\t%s'% (word, count)
测试你的代码(cat data | map | sort | reduce)

我建议你在运行MapReduce job测试前尝试手工测试你的mapper.py 和 reducer.py脚本,以免得不到任何返回结果
这里有一些建议,关于如何测试你的Map和Reduce的功能:
——————————————————————————————————————————————
\r\n
# very basic test
hadoop@ubuntu:~$ echo "foo foo quux labs foo bar quux" | /home/hadoop/mapper.py
foo 1
foo 1
quux 1
labs 1
foo 1
bar 1
——————————————————————————————————————————————
hadoop@ubuntu:~$ echo "foo foo quux labs foo bar quux" | /home/hadoop/mapper.py | sort | /home/hadoop/reducer.py
bar 1
foo 3
labs 1
——————————————————————————————————————————————

# using one of the ebooks as example input
# (see below on where to get the ebooks)
hadoop@ubuntu:~$ cat /tmp/gutenberg/20417-8.txt | /home/hadoop/mapper.py
The 1
Project 1
Gutenberg 1
EBook 1
of 1
[...]
(you get the idea)

quux 2

quux 1

——————————————————————————————————————————————

在Hadoop平台上运行Python脚本

为了这个例子,我们将需要三种电子书:

The Outline of Science, Vol. 1 (of 4) by J. Arthur Thomson\r\n
The Notebooks of Leonardo Da Vinci\r\n
Ulysses by James Joyce
下载他们,并使用us-ascii编码存储 解压后的文件,保存在临时目录,比如/tmp/gutenberg.

hadoop@ubuntu:~$ ls -l /tmp/gutenberg/
total 3592
-rw-r--r-- 1 hadoop hadoop 674425 2007-01-22 12:56 20417-8.txt
-rw-r--r-- 1 hadoop hadoop 1423808 2006-08-03 16:36 7ldvc10.txt
-rw-r--r-- 1 hadoop hadoop 1561677 2004-11-26 09:48 ulyss12.txt
hadoop@ubuntu:~$

复制本地数据到HDFS

在我们运行MapReduce job 前,我们需要将本地的文件复制到HDFS中:

hadoop@ubuntu:/usr/local/hadoop$ bin/hadoop dfs -copyFromLocal /tmp/gutenberg gutenberg
hadoop@ubuntu:/usr/local/hadoop$ bin/hadoop dfs -ls
Found 1 items
/user/hadoop/gutenberg
hadoop@ubuntu:/usr/local/hadoop$ bin/hadoop dfs -ls gutenberg
Found 3 items
/user/hadoop/gutenberg/20417-8.txt 674425
/user/hadoop/gutenberg/7ldvc10.txt 1423808
/user/hadoop/gutenberg/ulyss12.txt 1561677

执行 MapReduce job

现在,一切准备就绪,我们将在运行Python MapReduce job 在Hadoop集群上。像我上面所说的,我们使用的是
HadoopStreaming 帮助我们传递数据在Map和Reduce间并通过STDIN和STDOUT,进行标准化输入输出。

hadoop@ubuntu:/usr/local/hadoop$ bin/hadoop jar contrib/streaming/hadoop-0.19.1-streaming.jar
-mapper /home/hadoop/mapper.py -reducer /home/hadoop/reducer.py -input gutenberg/*
-output gutenberg-output
在运行中,如果你想更改Hadoop的一些设置,如增加Reduce任务的数量,你可以使用“-jobconf”选项:

hadoop@ubuntu:/usr/local/hadoop$ bin/hadoop jar contrib/streaming/hadoop-0.19.1-streaming.jar
-jobconf mapred.reduce.tasks=16 -mapper ...

一个重要的备忘是关于Hadoop does not honor mapred.map.tasks
这个任务将会读取HDFS目录下的gutenberg并处理他们,将结果存储在独立的结果文件中,并存储在HDFS目录下的
gutenberg-output目录。
之前执行的结果如下:

hadoop@ubuntu:/usr/local/hadoop$ bin/hadoop jar contrib/streaming/hadoop-0.19.1-streaming.jar
-mapper /home/hadoop/mapper.py -reducer /home/hadoop/reducer.py -input gutenberg/*
-output gutenberg-output

additionalConfSpec_:null
null=@@@userJobConfProps_.get(stream.shipped.hadoopstreaming
packageJobJar: [/usr/local/hadoop-datastore/hadoop-hadoop/hadoop-unjar54543/]
[] /tmp/streamjob54544.jar tmpDir=null
[...] INFO mapred.FileInputFormat: Total input paths to process : 7
[...] INFO streaming.StreamJob: getLocalDirs(): [/usr/local/hadoop-datastore/hadoop-hadoop/mapred/local]
[...] INFO streaming.StreamJob: Running job: job_200803031615_0021
[...]
[...] INFO streaming.StreamJob: map 0% reduce 0%
[...] INFO streaming.StreamJob: map 43% reduce 0%
[...] INFO streaming.StreamJob: map 86% reduce 0%
[...] INFO streaming.StreamJob: map 100% reduce 0%
[...] INFO streaming.StreamJob: map 100% reduce 33%
[...] INFO streaming.StreamJob: map 100% reduce 70%
[...] INFO streaming.StreamJob: map 100% reduce 77%
[...] INFO streaming.StreamJob: map 100% reduce 100%
[...] INFO streaming.StreamJob: Job complete: job_200803031615_0021

[...] INFO streaming.StreamJob: Output: gutenberg-output hadoop@ubuntu:/usr/local/hadoop$

正如你所见到的上面的输出结果,Hadoop 同时还提供了一个基本的WEB接口显示统计结果和信息。
当Hadoop集群在执行时,你可以使用浏览器访问 http://localhost:50030/ ,如图:

检查结果是否输出并存储在HDFS目录下的gutenberg-output中:

hadoop@ubuntu:/usr/local/hadoop$ bin/hadoop dfs -ls gutenberg-output
Found 1 items
/user/hadoop/gutenberg-output/part-00000 903193 2007-09-21 13:00
hadoop@ubuntu:/usr/local/hadoop$

可以使用dfs -cat 命令检查文件目录

hadoop@ubuntu:/usr/local/hadoop$ bin/hadoop dfs -cat gutenberg-output/part-00000
"(Lo)cra" 1
"1490 1
"1498," 1
"35" 1
"40," 1
"A 2
"AS-IS". 2
"A_ 1
"Absoluti 1
[...]
hadoop@ubuntu:/usr/local/hadoop$

注意比输出,上面结果的(")符号不是Hadoop插入的。

转载仅供参考,版权属于原作者。祝你愉快,满意请采纳哦




  michaelg.noll在他的blog中提到如何在hadoop中用python编写mapreduce程序,韩国的gogamza在其bolg中也提到如何用c编写mapreduce程序(我稍微修改了一下原程序,因为他的map对单词切分使用tab键)。我合并他们两人的文章,也让国内的hadoop用户能够使用别的语言来编写mapreduce程序。  首先您得配好您的hadoop集群,这方面的介绍网上比较多,这儿给个链接(hadoop学习笔记二安装部署)。hadoopstreaming帮助我们用非java的编程语言使用mapreduce,streaming用stdin(标准输入)和stdout(标准输出)来和我们编写的map和reduce进行数据的交换数据。任何能够使用stdin和stdout都可以用来编写mapreduce程序,比如我们用python的sys.stdin和sys.stdout,或者是c中的stdin和stdout。  我们还是使用hadoop的例子wordcount来做示范如何编写mapreduce,在wordcount的例子中我们要解决计算在一批文档中每一个单词的出现频率。首先我们在map程序中会接受到这批文档每一行的数据,然后我们编写的map程序把这一行按空格切开成一个数组。并对这个数组遍历按"1"用标准的输出输出来,代表这个单词出现了一次。在reduce中我们来统计单词的出现频率。    pythoncode  map:mapper.py  #!/usr/bin/envpythonimportsys#mapswordstotheircountsword2count={}#inputcomesfromstdin(standardinput)forlineinsys.stdin:#removeleadingandtrailingwhitespaceline=line.strip()#splitthelineintowordswhileremovinganyemptystringswords=filter(lambdaword:word,line.split())#increasecountersforwordinwords:#writetheresultstostdout(standardoutput);#whatweoutputherewillbetheinputforthe#reducestep,i.e.theinputforreducer.py##tab-delimited;thetrivialwordcountis1print'%s\t%s'%(word,1)  reduce:reducer.py  #!/usr/bin/envpythonfromoperatorimportitemgetterimportsys#mapswordstotheircountsword2count={}#inputcomesfromstdinforlineinsys.stdin:#removeleadingandtrailingwhitespaceline=line.strip()#parsetheinputwegotfrommapper.pyword,count=line.split()#convertcount(currentlyastring)tointtry:count=int(count)word2count[word]=word2count.get(word,0)+countexceptvalueerror:#countwasnotanumber,sosilently#ignore/discardthislinepass#sortthewordslexigraphically;##thisstepisnotrequired,wejustdoitsothatour#finaloutputwilllookmoreliketheofficialhadoop#wordcountexamplessorted_word2count=sorted(word2count.items(),key=itemgetter(0))#writetheresultstostdout(standardoutput)forword,countinsorted_word2count:print'%s\t%s'%(word,count)  ccode  map:mapper.c  #include#include#include#include#definebuf_size2048#definedelim"\n"intmain(intargc,char*argv[]){charbuffer[buf_size];while(fgets(buffer,buf_size-1,stdin)){intlen=strlen(buffer);if(buffer[len-1]=='\n')buffer[len-1]=0;char*querys=index(buffer,'');char*query=null;if(querys==null)continue;querys+=1;/*nottoinclude'\t'*/query=strtok(buffer,"");while(query){printf("%s\t1\n",query);query=strtok(null,"");}}return0;}h>h>h>h>  reduce:reducer.c  #include#include#include#include#definebuffer_size1024#definedelim"\t"intmain(intargc,char*argv[]){charstrlastkey[buffer_size];charstrline[buffer_size];intcount=0;*strlastkey='\0';*strline='\0';while(fgets(strline,buffer_size-1,stdin)){char*strcurrkey=null;char*strcurrnum=null;strcurrkey=strtok(strline,delim);strcurrnum=strtok(null,delim);/*necessarytocheckerrorbut.*/if(strlastkey[0]=='\0'){strcpy(strlastkey,strcurrkey);}if(strcmp(strcurrkey,strlastkey)){printf("%s\t%d\n",strlastkey,count);count=atoi(strcurrnum);}else{count+=atoi(strcurrnum);}strcpy(strlastkey,strcurrkey);}printf("%s\t%d\n",strlastkey,count);/*flushthecount*/return0;}h>h>h>h>  首先我们调试一下源码:  chmod+xmapper.pychmod+xreducer.pyecho"foofooquuxlabsfoobarquux"|./mapper.py|./reducer.pybar1foo3labs1quux2g++mapper.c-omapperg++reducer.c-oreducerchmod+xmapperchmod+xreducerecho"foofooquuxlabsfoobarquux"|./mapper|./reducerbar1foo2labs1quux1foo1quux1  你可能看到c的输出和python的不一样,因为python是把他放在词典里了.我们在hadoop时,会对这进行排序,然后相同的单词会连续在标准输出中输出.  在hadoop中运行程序  首先我们要下载我们的测试文档wget页面中摘下的用php编写的mapreduce程序,供php程序员参考:map:mapper.php  #!/usr/bin/php$word2count=array();//inputcomesfromstdin(standardinput)while(($line=fgets(stdin))!==false){//removeleadingandtrailingwhitespaceandlowercase$line=strtolower(trim($line));//splitthelineintowordswhileremovinganyemptystring$words=preg_split('/\w/',$line,0,preg_split_no_empty);//increasecountersforeach($wordsas$word){$word2count[$word]+=1;}}//writetheresultstostdout(standardoutput)//whatweoutputherewillbetheinputforthe//reducestep,i.e.theinputforreducer.pyforeach($word2countas$word=>$count){//tab-delimitedecho$word,chr(9),$count,php_eol;}?>  reduce:mapper.php  #!/usr/bin/php$word2count=array();//inputcomesfromstdinwhile(($line=fgets(stdin))!==false){//removeleadingandtrailingwhitespace$line=trim($line);//parsetheinputwegotfrommapper.phplist($word,$count)=explode(chr(9),$line);//convertcount(currentlyastring)toint$count=intval($count);//sumcountsif($count>0)$word2count[$word]+=$count;}//sortthewordslexigraphically////thissetisnotrequired,wejustdoitsothatour//finaloutputwilllookmoreliketheofficialhadoop//wordcountexamplesksort($word2count);//writetheresultstostdout(standardoutput)foreach($word2countas$word=>$count){echo$word,chr(9),$count,php_eol;}?>  作者:马士华发表于:2008-03-05
软件
前端设计
程序设计
Java相关