从WordCount看hadoop执行流程
< 返回列表时间: 2019-07-04来源:OSCHINA
【围观】麒麟芯片遭打压成绝版,华为亿元投入又砸向了哪里?>>>
准备
要执行Map reduce程序,首先得安装hadoop,hadoop安装可以参考 hadoop安装
启动hdfs和yarn start-dfs.cmd start-yarn.cmd
创建待处理数据目录: hadoop fs -mkdir /wc hadoop fs -mkdir /wc/in # 查看目录 hadoop fs -ls -R /
上传待处理数据文件: hadoop fs -put file:///G:\note\bigdata\hadoop\wordcount\word1.txt /wc/in hadoop fs -put file:///G:\note\bigdata\hadoop\wordcount\word2.txt /wc/in
其中数据文件内容如下: word1.txt hello world hello hadoop
word2.txt hadoop world hadoop learn

WordCount与Map Reduce流程
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;
import java.util.StringTokenizer;

/**
 * Word-count MapReduce job that also prints every map and reduce invocation
 * to stdout so the execution flow can be observed.
 */
public class WordCount {

    /**
     * Splits each input value into whitespace-separated tokens and emits
     * {@code (token, 1)} for every token.
     */
    public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> {

        private static final IntWritable one = new IntWritable(1);
        // Reused for every emitted token to avoid allocating one Text per word.
        private final Text word = new Text();

        /**
         * Logs the incoming key/value pair, then writes {@code (token, 1)} for each token.
         *
         * @param key     key supplied by the input format (only printed here)
         * @param value   text to tokenize
         * @param context sink for the intermediate pairs
         */
        @Override
        public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            System.out.println("[map-key]:" + key + " [map-value]:" + value);
            StringTokenizer tokens = new StringTokenizer(value.toString());
            while (tokens.hasMoreTokens()) {
                word.set(tokens.nextToken());
                context.write(word, one);
            }
        }
    }

    /**
     * Sums all counts received for a key and emits {@code (key, sum)}.
     * Also used as the combiner when the corresponding line in {@code main} is enabled.
     */
    public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

        // Reused output value holder.
        private final IntWritable result = new IntWritable();

        /**
         * Adds up {@code values}, writes the total, and logs the individual
         * values joined by "、".
         *
         * @param key     the word being reduced
         * @param values  counts grouped under {@code key}
         * @param context sink for the final pair
         */
        @Override
        public void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            int sum = 0;
            StringBuilder seen = new StringBuilder();
            for (IntWritable num : values) {
                sum += num.get();
                // Put the separator between elements rather than trimming a trailing
                // one afterwards, so an empty iterable cannot cause an index error.
                if (seen.length() > 0) {
                    seen.append("、");
                }
                seen.append(num.get());
            }
            result.set(sum);
            context.write(key, result);
            System.out.println("[reduce-key]:" + key + " [reduce-values]:" + seen);
        }
    }

    // job:  http://localhost:8088/
    // hdfs: http://localhost:9870/

    /**
     * Configures and submits the job, reading from {@code /wc/in/} and writing to
     * {@code /wc/out/}; exits with 0 on success and 1 on failure.
     *
     * @param args unused
     */
    public static void main(String[] args)
            throws IOException, ClassNotFoundException, InterruptedException {
        Configuration configuration = new Configuration();
        // "fs.default.name" is deprecated; "fs.defaultFS" is its replacement key.
        configuration.set("fs.defaultFS", "hdfs://localhost:9000");

        Job job = Job.getInstance(configuration, "WC");
        job.setJarByClass(WordCount.class);

        job.setMapperClass(TokenizerMapper.class);
        // Uncomment to run IntSumReducer as a combiner as well:
        // job.setCombinerClass(IntSumReducer.class);
        job.setReducerClass(IntSumReducer.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        Path inPath = new Path("/wc/in/");
        Path outPath = new Path("/wc/out/");
        FileInputFormat.addInputPath(job, inPath);
        FileOutputFormat.setOutputPath(job, outPath);

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
如果输出目录已经存在,可以使用下面的命令删除: hadoop fs -rm -r /wc/out
我们先来看一下程序的输出: [map-key]:0 [map-value]:hadoop world [map-key]:14 [map-value]:hadoop learn [map-key]:0 [map-value]:hello world [map-key]:13 [map-value]:hello hadoop [reduce-key]:hadoop [reduce-values]:1、1、1 [reduce-key]:hello [reduce-values]:1、1 [reduce-key]:learn [reduce-values]:1 [reduce-key]:world [reduce-values]:1、1
从输出我们可以推测hadoop的map过程是:hadoop把待处理的文件 按行 拆分,每一行调用一次map函数,map函数的key就是这一行在文件中的起始字节偏移量,值就是这一行的内容。
map处理之后,再按key-value的形式写中间值。
reduce函数就是处理这些中间结果,参数的key就是map写入的key,values就是map之后按key分组得到的value集合。
Combine过程
在Map和Reduce中间还可以加入Combine过程,用于处理中间结果,减少网络间传输的数据量。
Map->Combine->Reduce
我们把上面程序中job.setCombinerClass(IntSumReducer.class);注释去掉就可以获取到有Combiner的输出: [map-key]:0 [map-value]:hadoop world [map-key]:14 [map-value]:hadoop learn [reduce-key]:hadoop [reduce-values]:1、1 [reduce-key]:learn [reduce-values]:1 [reduce-key]:world [reduce-values]:1 [map-key]:0 [map-value]:hello world [map-key]:13 [map-value]:hello hadoop [reduce-key]:hadoop [reduce-values]:1 [reduce-key]:hello [reduce-values]:1、1 [reduce-key]:world [reduce-values]:1 [reduce-key]:hadoop [reduce-values]:2、1 [reduce-key]:hello [reduce-values]:2 [reduce-key]:learn [reduce-values]:1 [reduce-key]:world [reduce-values]:1、1
从上面的输出我们可以看到,map之后有一个reduce输出,其实是combine操作。combine和reduce的区别是:combine是在单节点内部执行的,目的是减小中间数据。
注意:combine操作必须满足结合律(和交换律),例如: 加法,求总和:a+b+c+d = (a+b) + (c+d) 最大值:max(a,b,c,d) = max(max(a,b),max(c,d))
均值就不能直接使用combine操作:当各组元素个数不同时,分组求均值再求均值的结果与整体均值不同,例如 (a+b+c)/3 一般不等于 ((a+b)/2 + c)/2
pom文件
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.curitis</groupId>
    <artifactId>hadoop-learn</artifactId>
    <version>1.0.0</version>

    <properties>
        <!-- The Java sources contain non-ASCII string literals; pin the source
             encoding so the build does not depend on the platform default. -->
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <spring.version>5.1.3.RELEASE</spring.version>
        <junit.version>4.11</junit.version>
        <hadoop.version>3.0.2</hadoop.version>
        <parquet.version>1.10.1</parquet.version>
    </properties>

    <dependencies>
        <!-- hadoop -->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>${hadoop.version}</version>
            <exclusions>
                <exclusion>
                    <groupId>com.fasterxml.jackson.core</groupId>
                    <artifactId>*</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>${hadoop.version}</version>
            <exclusions>
                <exclusion>
                    <groupId>com.fasterxml.jackson.core</groupId>
                    <artifactId>*</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-mapreduce-client-core</artifactId>
            <version>${hadoop.version}</version>
            <exclusions>
                <exclusion>
                    <groupId>com.fasterxml.jackson.core</groupId>
                    <artifactId>*</artifactId>
                </exclusion>
            </exclusions>
        </dependency>

        <!-- parquet -->
        <dependency>
            <groupId>org.apache.parquet</groupId>
            <artifactId>parquet-hadoop</artifactId>
            <version>${parquet.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.parquet</groupId>
            <artifactId>parquet-column</artifactId>
            <version>${parquet.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.parquet</groupId>
            <artifactId>parquet-common</artifactId>
            <version>${parquet.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.parquet</groupId>
            <artifactId>parquet-encoding</artifactId>
            <version>${parquet.version}</version>
        </dependency>

        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.56</version>
        </dependency>

        <!--test-->
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-test</artifactId>
            <version>${spring.version}</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>${junit.version}</version>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>io.netty</groupId>
                <artifactId>netty-all</artifactId>
                <version>4.1.25.Final</version>
            </dependency>
        </dependencies>
    </dependencyManagement>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>8</source>
                    <target>8</target>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>
热门排行