mapreduce
mapreduce官网,mapreduce是什么,原理,编程,说明手册
mapreduce是什么?
MapReduce是一种用于处理和分析大规模数据集的编程模型和计算框架。它最初由Google提出,并在Apache Hadoop项目中得到广泛应用。MapReduce的核心思想是将计算任务分成两个阶段:Map阶段和Reduce阶段。在Map阶段,输入数据被拆分成多个独立的数据块,并由多个Mapper并行处理。每个Mapper将输入数据转换成键值对的形式,生成中间结果。在Reduce阶段,中间结果被合并和处理,生成最终的输出结果。
mapreduce官网: https://hadoop.apache.org/
MapReduce主要功能
MapReduce框架提供了自动处理任务并行化、数据划分、跨节点通信和故障恢复等功能。它可以在大规模集群上运行,利用多台计算机的计算能力和存储空间进行高效的分布式计算。
MapReduce对于处理大规模数据集、并行计算和分布式存储非常有效。它已经成为处理大数据的重要工具之一,被广泛应用于数据分析、搜索引擎、日志处理等领域。
Hadoop核心组件之一:分布式计算的方案MapReduce,是一种编程模型,用于大规模数据集的并行运算,其中Map(映射)和Reduce(归约)。
MapReduce既是一个编程模型,也是一个计算组件,处理的过程分为两个阶段,Map阶段:负责把任务分解为多个小任务,Reduce负责把多个小任务的处理结果进行汇总。其中Map阶段主要输入是一对Key-Value,经过map计算后输出一对Key-Value值;然后将相同Key合并,形成Key-Value集合;再将这个Key-Value集合转入Reduce阶段,经过计算输出最终Key-Value结果集。
MapReduce可以实现基于上千台服务器并发工作,提供很强大的数据处理能力,如果其中单台服务挂掉,计算任务会自动转义到另外节点执行,保证高容错性;但是MapReduce不适应于实时计算与流式计算,计算的数据是静态的。
MapReduce官方手册
综述
Hadoop MapReduce是一个软件框架。它能够很容易的创建以一种可靠,容错的方式在商用机器上的大集群上并行的处理大量的数据。
一个MapReduce job通常将输入的数据集拆分成独立的块。Map任务以完全并行的方式处理这些块。框架对map的输出进行排序,进而作为输入提供给reduce任务。通常来说,job的输入和输出都保存在一个文件系统中。框架负责调度任务,监控任务并重新执行失败了的任务。
通常来说,计算节点和存储节点是相同的,也就是说,MapReduce框架和HDFS运行在相同的节点集上。这样的配置能够保证框架在已经存在数据的节点上有效的调度任务,进而在不同集群间获得一个非常高的总带宽。
MapReduce框架由一个单一的主ResourceManager,每个集群节点上的一个从NodeManager以及每个应用上一个MRAppMaster组成。
应用至少会指定输入/输出位置以及通过实现合适的接口和抽象类来提供map和reduce功能。这些,以及其他job参数,组成job配置(configuration)。
然后,Hadoop job客户端提交job(jar/可执行的文件等等)以及配置ResourceManger。ResoureManger 负责给从节点分发软件/配置,调度和监督任务,反馈状态和诊断信息给job客户端。
虽然Hadoop框架是由java实现的,但是MapReduce应用并不需要使用java编写。
输入和输出
MapReduce框架完全以<键,值>形式操作,也就是说,框架将输入给job的数据视为<键,值>对,并且产生一个<键,值>对集作为job的输出。
键和值类必须通过框架序列化,因此需要实现Writable接口。除此之外,key类必须实现WritableComparable接口以辅助框架的排序。
一个MapReducejob的输入输出类型如下所示:
(input) <k1, v1>-> map -> <k2,v2> -> combine -> <k2,v2> -> reduce -> <k3,v3> (output)
实例:WordCount v1.0
在详细介绍细节前,让我们看一个MapReduce 应用,这对于帮助我们认识它是如何工作的非常有用。
WordCount是一个简单应用,它计算一个输入集中每个单词的出现次数。
这个例子适用于本地单机、伪分布和全分布hadoop安装
源代码:
-
import java.io.IOException;
-
import java.util.StringTokenizer;
-
-
import org.apache.hadoop.conf.Configuration;
-
import org.apache.hadoop.fs.Path;
-
import org.apache.hadoop.io.IntWritable;
-
import org.apache.hadoop.io.Text;
-
import org.apache.hadoop.mapreduce.Job;
-
import org.apache.hadoop.mapreduce.Mapper;
-
import org.apache.hadoop.mapreduce.Reducer;
-
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
-
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-
-
public class WordCount {
-
-
public static class TokenizerMapper
-
extends Mapper<Object, Text, Text, IntWritable>{
-
-
private final static IntWritable one = new IntWritable(1);
-
private Text word = new Text();
-
-
public void map(Object key, Text value, Context context
-
) throws IOException, InterruptedException {
-
StringTokenizer itr = new StringTokenizer(value.toString());
-
while (itr.hasMoreTokens()) {
-
word.set(itr.nextToken());
-
context.write(word, one);
-
}
-
}
-
}
-
-
public static class IntSumReducer
-
extends Reducer<Text,IntWritable,Text,IntWritable> {
-
private IntWritable result = new IntWritable();
-
-
public void reduce(Text key, Iterable<IntWritable> values,
-
Context context
-
) throws IOException, InterruptedException {
-
int sum = 0;
-
for (IntWritable val : values) {
-
sum += val.get();
-
}
-
result.set(sum);
-
context.write(key, result);
-
}
-
}
-
-
public static void main(String[] args) throws Exception {
-
Configuration conf = new Configuration();
-
Job job = Job.getInstance(conf, “word count”);
-
job.setJarByClass(WordCount.class);
-
job.setMapperClass(TokenizerMapper.class);
-
job.setCombinerClass(IntSumReducer.class);
-
job.setReducerClass(IntSumReducer.class);
-
job.setOutputKeyClass(Text.class);
-
job.setOutputValueClass(IntWritable.class);
-
FileInputFormat.addInputPath(job, new Path(args[0]));
-
FileOutputFormat.setOutputPath(job, new Path(args[1]));
-
System.exit(job.waitForCompletion(true) ? 0 : 1);
-
}
-
}
运行:
export JAVA_HOME=/usr/java/default
export PATH=${JAVA_HOME}/bin:${PATH}
export HADOOP_CLASSPATH=${JAVA_HOME}/lib/tools.jar
编译WordCount.java且创建一个jar:
$ bin/hadoop com.sun.tools.javac.Main WordCount.java
$ jar cf wc.jar WordCount*.class
假设:
/user/joe/wordcount/input –HDFS中的输入目录
/user/joe/wordcount/output –HDFS中的输出目录
作为输入的样例文本文件:
$ bin/hadoop fs -ls /user/joe/wordcount/input/ /user/joe/wordcount/input/file01 /user/joe/wordcount/input/file02
$ bin/hadoop fs -cat /user/joe/wordcount/input/file01
Hello World Bye World
$ bin/hadoop fs -cat /user/joe/wordcount/input/file02
Hello Hadoop Goodbye Hadoop
运行应用:
$ bin/hadoop jar wc.jar WordCount /user/joe/wordcount/input /user/joe/wordcount/output
输出:
$ bin/hadoop fs -cat /user/joe/wordcount/output/part-r-00000`
Bye 1
Goodbye 1
Hadoop 2
Hello 2
World 2
应用能够通过使用选项 –files来在该任务当前工作目录下指定多个以逗号分开的路径列表。-libjars选项允许应用给map和reduce的类路径增加jar。选项 –archives 允许传递以逗号分开的备份作为参数。
运行带有-libjars,-files和-archives的wordcount例子:
bin/hadoop jar hadoop-mapreduce-examples-<ver>.jar wordcount -files cachefile.txt -libjars mylib.jar -archives myarchive.zip input output
其中,myarchive.zip会解压放在已“myarchive.zip”的名称的路径下。
用户能够使用#号给通过-files和-archives传递的文件和备份指定一个不同的名称。
比如:
bin/hadoop jar hadoop-mapreduce-examples-<ver>.jar wordcount -files dir1/dict.txt#dict1,dir2/dict.txt#dict2 -archives mytar.tgz#tgzdir input output
解析(walk-through)
WordCount应用是非常简单的。
-
public void map(Object key, Text value, Context context
-
) throws IOException, InterruptedException {
-
StringTokenizer itr = new StringTokenizer(value.toString());
-
while (itr.hasMoreTokens()) {
-
word.set(itr.nextToken());
-
context.write(word, one);
-
}
-
}
Mapper通过map方法实现,在指定的textInputFormat的辅助下,一次处理一行。然后,通过StringTokenizer,将行按空格拆分成字符,且输出类似<<word>,1>的键值对。
对于给定的例子,第一个map的输出:
< Hello, 1>
< World, 1>
< Bye, 1>
< World, 1>
第二个map输出:
< Hello, 1>
< Hadoop, 1>
< Goodbye, 1>
< Hadoop, 1>
我们已经了解到了为一个给定job而衍生的很多map.至于如何以一种细粒度的方式控制它们,将会在手册的后面介绍。
job.setCombinerClass(IntSumReducer.class);
WordCount 同样指定了一个合并器。因此,在基于key值排序后,每一个map的输出都会传递给本地的合并器以实现本地聚合。
第一个map的输出:
< Bye, 1>
< Hello, 1>
< World, 2>`
第二个map的输出:
< Goodbye, 1>
< Hadoop, 2>
< Hello, 1>`
-
public void reduce(Text key, Iterable<IntWritable> values,
-
Context context
-
) throws IOException, InterruptedException {
-
int sum = 0;
-
for (IntWritable val : values) {
-
sum += val.get();
-
}
-
result.set(sum);
-
context.write(key, result);
-
}
Reducer通过reduce方法实现,目的是对值进行求和,也就是是计算每个键出现的次数(比如,在这个例子中就是单词)
因此,job的输出为:
< Bye, 1>
< Goodbye, 1>
< Hadoop, 2>
< Hello, 2>
< World, 2>`
main 方法指定job的不同方面,比如输入/输出路径(通过命令行传递),键/值类型,输入/输出格式等等。
在手册的后续我们将会学习更多关于Job,InputFormat,OutoutFormat和其他接口和类。
MapReduce-用户接口
这部分介绍MapReduce框架用户方面的一些细节。这能够帮助用户更细致的实施、配置和调节它们的job.但是,每个类/接口的javadoc依然是最详细可用的文档;这个仅仅是作为一个手册。
让我们首先来看看Mapper和Reducer接口。应用通常通过提供map和reduce方法来实现它们。
然后我们将会讨论其他核心接口,包括Job,Partitioner,InputFormat,OutputFormat和其他等。
最后,我们会讨论框架的一些用户特性,比如DistributedCache,IsolationRunner等。
应用通常实现Mapper和Reducer接口来提供map和reduce方法。这些组成了job的核心。
Mapper
Mapper 将输入的键/值对映射成一个中间键值对集。
映射是将输入记录转换成中间记录的单个行为。转换后的中间记录并不需要和输入记录类型相同。一个给定的输入对可能会映射成0个或更多的输出对。
Hadoop MapReduce框架给通过InputFormat产生的每个InputSplit衍生一个map任务。
总的来说,Mapper实现通过Job.setMapperClass(Class)方法传递给Job。进而,框架为InputSplit中的每个键/值对调用map(WritableComparable,Writable,Context)。应用能够重载cleanup(Context)方法来操作任何需要的清理。
输出(键值)对的类型不需要和输入对的类型相同。一个给定的输入对可能会映射成0个或更多的输出对。通过调用context.write(WritableComparable,Writable)来收集输出对。
应用能够使用Counter来报告它们的统计。
所有关联一个给定输出键的中间值随后会被框架分组,进而传递给Reducer来决定最后的输出。用户能够通过Job.setGroupingComparatorClass(Class)指定一个Comparator来控制分组。
Mapper输出会被排序进而分发给Reducer。分块的数量和job中reduce任务一样。用户能够通过实现一个订制的Partitioner来控制那些键(也就是记录)分发给哪些Reducer。
用户能够选择通过Job.setCombinerClass(Class)指定一个combiner来对中间结果进行本地聚合,以帮助减少从Mapper传递给Reducer的数据的数量。
中间以及排序后的输出总是以一种简单的(键长,值长,值)格式存储。应用能够控制是否,以及如何对输出进行压缩。且通过Configuration使用CompressionCodec。
有多少map?
Map的数量通常决定于输入的总尺寸,也就是输入文件的块的总数。
Map并行性的合适层级一般设置在每个节点大概10-100个maps,尽管对于很多不是很耗CPU资源的map任务可以设置高达300个map.任务启动需要花点时间,因此map花几分钟后再执行的话是最好的。
因此,如果有10TB的输入数据,且block(块)的尺寸为128MB的话,你将获得配备82000个map.除非你通过Configuration.set(MRJobConfig.NUM_MAPS, int)来将它设置的更高。
Reducer
Reducer 减少共享一个键的中间值,得到一个更小的值集。
用户通过Job.setNumReduceTasks(int)来设置job的reduce的数量。
总的来说,实施的Reducer通过Job.setReducerClass(Class)传递给Job。然后,框架给分组后的输入中的每一<key,(list of values)>对调用reduce(WritableCoparable,Iterable<Writable>,Context)方法。应用能够重载cleanup(Context)方法来操作任何所需的清理。
Reducer有3个主要阶段:清洗、排序和简化。
清洗
Reducer的输入是mapper排序后的输出。在这一阶段,框架通过HTTP从所有的mapper中抽取相关的分区。
排序
在这一阶段,框架按键对Reducer的输入进行分组(因为不同的mapper可能输出相同的键)。
清洗和排序阶段是同时发生的;map输出抽取后,就会进行合并。
二次排序
如果对中间值进行分组的等量规则需要和reduction前的键值分组的规则不同的话,可以通过Job.setSortComparatorClass(Class)方法指定一个Comparator。因为Job.setGroupingComparatorClass(Class)能够用来控制中间键如何分组,因此能够用来一起模拟对值进行二次排序。
简化
在这阶段,对分完组的输入的每一<key,(list of values)>对调用reduce(WritableComparable,Iterable<Writable>,Context)
Reduce任务的输出通常通过Context.write(WritableComparable,Writable)写入到文件系统中。
应用能够使用Counter报告它的统计。
Reducer的输出是没经过排序。
多少Reduce?
如果使用的是0.95,map一完成,所有的reduce就会立马发布并启动传输map的输出。如果使用1.75,最快的节点会完成第一轮的reduce并且发起第二波reduce来做更多更好的负载均衡工作。
增加reduce会增大框架的负载,但是能够负载均衡,且降低失败的代价。
无Reducer
如果无需规约(reduction),设置reduce的任务数为0是合理的。
这种情况map任务的输出会通过输出路径直接存储到文件系统中,其中输出路径通过FileOutputFormat.setOutputPath(Job,Path)设置。框架在将map的输出写到文件系统中去之前不会对输出进行排序。
分区器
分区器拆分键空间。
分区器控制对中间map输出中的键进行分区。键(或者键的子集)通常用来通过哈希函数来衍生分区,分区的总数和job中的reduce任务相同。因此,这控制中间键会发送到哪些reduce任务来进行规约(reduction).
HashPartitioner是默认的分区器。
Counter
Counter辅助MapReduce应用报告它的统计结果。
Mapper和Reducer实现能够使用Counter来报告统计。
Hadoop MapReduce捆绑一个有用的mapper,reducer和分区器库。
Job 配置
Job是主要接口,提供给用户为Hadoop框架去刻画一个供执行的MapReducejob。框架尝试照Job描述的方式如实的执行job。但是:
l 一些参数可能会被管理者标识为final,因此不能修改。
l 有一些job参数可以直接设置(比如,Job.setNumReduceTasks(int)),而其他一些参数与框架的其他部分或者job配置精密交互,这是很难设置的(比如,Configuration.set(JobContext.NUN_MAPS,int))
Job通常用来指定Mapper,合并器(如果有)、分区器、Reducer、InputFormat、OutputFormat实现。FileInputFormat指定输入文件集(FileInputFormat.setInputPaths(Job,Path…)/FileInputFormat.addInputPath(Job,Path))以及(FileInputFormat.setInputPaths(Job,String…)/FileInputFormat.addInputPaths(Job,String)),同时指定输出文件应该写入的地址(FileOutFormat.setOutputPath(Path)).
可选地,Job能够用来指定job更高级的一些方面,比如使用的比较器(Comparator),用来存入DistributedCache的文档,是否要对中间或job输出进行压缩以及如何压缩;是否job任务能够以一种投机的(speculative)方式执行(setMapSpeculativeExecution(boolean)/setReduceSpeculativeExcution(Boolean));以及每个人物的尝试次数(setMaxMapAttempts(int)/setMaxReduceAttempts(int))等等。
当然,用户能够使用Configuration.set(String,String)/Configuration.get(String)来设置和获取应用所需的专用参数。但是,对于大规模(只读)数据,需要使用DistributedCache.
任务的执行和环境
MRAppMaster 将Mapper/Reducer任务作为一个子进程在一个独立的jvm上执行。
子任务继承父MRAppMaster的环境。用户能够通过mapreduce.{map|reduce}.java.opts给子-JVM指定额外选项.
内存管理
用户/管理者统一能够通过使用mapreduce.{map|reduce}.memory.mb指定发布的子任务,以及任何迭代发布的子进程的最大虚拟内存。
Job提交和监控
Job是用户-job与ResourceManager交互的主要接口。
Job提供提交job的基本所需,跟踪job进程,访问模块-任务的报告和日志,获取MapReduce集群的状态信息等等。
job的提交涉及:
1. 检查job特定的输入和输出。
2. 为job计算InputSplit值。
3. 如果需要,为job的DistributedCache提供必须的计算信息。
4. 复制job的jar和配置到文件系统上的MapReduce系统目录。
5. 提交job到Resourcemanager且选择性的监控它的状态。
通常,用户使用Job去创建应用,描述job的不同方面,提交job,并监控它的过程。
Job控制
用户通常需要链接MapReducejob来完成那些通过单一MapReducejob 不能完成的复杂任务。这相对还是比较容易的,因为job的输出通常会进入到分布式文件系统,而输出能轮流作为下一个job的输入。
但是,这同样以为着保存job完整(成功/失败)的责任就完全由客户端来承担。在这种情况下,不同的job控制选择有:
Job.submit():将job提交到集群并立即返回。
Job.waitForCompetion(boolean):提交job到集群并等待完成。
Job输入
InputFormat为一个MapReduce job描述输入指定
MapReduce框架依赖job的InputFormat:
1. 验证job的输入特性。
2. 拆分输入的文本成逻辑InputSplit实体,每个实体之后分配个一个单独的Mapper.
3. 提供RecordReader实现用来从逻辑InputSplit搜集输入记录给Mapper处理。
基于文件的InputFormat实现,通常是FileInputFormat的子类的默认行为是基于总尺寸按比特将输入拆分逻辑InputSplit实体。但是,输入文件的FileSystem块尺寸会当作输入分区的一个上限。分区尺寸的下限可以通过mapreduce.input.fileinputformat.split.minsize设置。
显然,因为记录边界必须考虑,所以基于输入尺寸的逻辑分区对于很多的应用是不足够的。在这样的例子中,应用必须实现一个RecordReader。RecordReader负责考虑记录边界且为个体任务呈现逻辑InputSplit的一个面向记录的视图。
TextInputFormat 是默认InputFormat.
如果TextInputFormat是一个给定job的InputFormat,框架会探测扩展名为.gz的输入文件,并使用合适的CompressionCodec自动解压它们。但是,具有以上扩展名的压缩文件不能够分区,每一个压缩文件会被一个单一的mapper当作整体处理。
InputSplit
InputSplit 代表单个Mapper要处理的数据。
通常,InputSplit代表输入的一个面向比特视图,Recorder负责处理和呈现一个面向记录的视图。
FileSplit是默认的InputSplit。通过设置mapreduce.map.input.file为进行逻辑分区的输入文件。
RecordReader
RecordReader从一个InputSplit读取<key,value>对。
通常,RecordReader转换InputSplit提供的输入的面向比特视图,并且呈现给Mapper一个面向记录的视图进行处理。总之,RecordReader的责任就是处理记录边界和以键值的形式呈现任务。
Job输出
OutputFormat为一个MapReduce job描述一个特定的输出。
MapReduce框架依赖job的outputFormat:
1. 验证job的输出特性;比如,检查输出路径不存在。
2. 提供RecordWriter实现用来写job的输出文件。输出文件保存在FileSystem中。
TextOutputFormat是默认的OutputFormat.
OutputCommitter
OutputCommitter为MapReduce job描述任务的提交。
MapReduce 框架依赖job的OutputCommitter:
1. 初始化阶段启动job.比如,在job的初始化阶段为job建立一个临时的输出路径。当job处于PREP状态且在初始化任务后,job是通过一个单独的任务启动的。一旦开启任务完成,job就会进入到运行状态。
2. 当job完成的时候清除job。比如,当job完成后移除临时输出路径。Job清楚是在job结束时由一个独立的任务完成的。当清除工作完成后,job会声明为SUCCEDED/FAILED/KILLED。
3. 启动任务临时输出。任务启动在任务初始化时发生,是相同任务的一部分。
4. 检查是否一个任务需要一个提交。这样能够避免不需要提交的任务提交
5. 任务输出的提交。一旦任务完成,如果需要任务会提交它的输出。
6. 放弃任务提交。如果任务失败后者被杀,输出就会被清除。如果不能清除,会发布一个具有相同尝试id的单独任务来执行清除。
FileOutputCommitter 是默认OutCommitter.Job启动/清理任务占用map或者reduce容器,不管哪个在节点管理上都可行。JobCleanup任务、TaskCleanup 任务和JobSetup任务按从高到低,具有最高的优先级。
Task Side-EffectFiles
在一些应用,组合任务需要建立和写边文件(side-files)。边文件和job输出文件(job-output)文件有区别。
RecordWriter
RedordWriter将输出<key,value>对写入输出文件。
RecordWriter实现将job输出写入到文件系统(FileSystem)
其他有用的特性
提交job到队列
用户提交job到队列(Queues).队列,作为job的容器,允许系统提供特定功能。比如,队列使用ACLs来控制哪些用户能够将job提交给他们。队列主要被Hadoop调度器使用。
Hadoop配置一个单一强制性队列,称之为“默认”。队列的名字通过mapreduce.job.queuename——Hadoop位置配置的属性来定义。一些job调度器,比如容器调度器(Capacity Scheduler),支持多队列。
定义队列的job需要通过mapreduce.job.queuename属性或者通过Configuration.set(MRJobConfig.QUEUE_NAME,String)API来提交。设置queue名称是可选的。如果一个job没有设置一个相关的队列名称,那就回设置为‘默认’队列。
Counters
Counters代表全局计算器,通过MapReduce框架或者应用定义。每一个计算器可以是任何枚举类型。特定Enum的计算器会捆绑到计算机组中——Counters.Group
应用能够定义特定Counters,并且通过map和reduce方法的Counters.incrCounter(Enum,long)或Counters.incrCounter(String,String,long)更新。进而框架会汇集这些计算器。
DistributedCache
DistributedCache有效率的分发应用特定、大且只读的文件。
DistributedCache是MapReduce框架提供的一个工具,用来缓存应用需要缓存的文件(文本,压缩文件,jar等待)。
应用在Job中通过urls(hdfs://)指定要缓存的文件。DistributedCache假定通过hdfs://指定的文件已经存在在文件系统中(FileSystem)。
在job的任意任务在节点上执行时,框架都会将所需的文件复制到该隶属节点。这个过程的效率源于每个job只会复制文件一次,并且有能力缓存那些在隶属节点上不解压的压缩文件。
DistributedCache跟踪缓存文件的修改时间戳( modification timestamps)。显然,当job执行的时候,缓存文件不会被应用或者外部修改。
DistributedCache能够用来分发简单、只读数据/文本文件或者更复杂类型,比如压缩文件后jar文件。压缩文件(zip,tar,tgz和tar.gz文件)是在隶属节点不解压的。文件有执行许可设定。
文件/压缩文件能够通过设置mapreduce.job.cache.{files|archives}来分发.如果超过一个文件/压缩文件分发,它们必须以逗号分开路径来添加。同样可以通过API——Job.addCacheFile(URI)/Job.addCacheArchive(URI)和[Job.setCacheFiles[URI]] (../../api/org/apache/hadoop/mapreduce/Job.html)/ [Job.setCacheArchives(URI[])](../../api/org/apache/hadoop/mapreduce/Job.html) where URI is of the form hdfs://host:port/absolute-path\#link-name.在流中,文件能够通过命令行选项-cacheFile/-cacheArchive来分发。
DistributedCache能够作为初步的软件分发机制用在map或(和)reduce任务重。DistributedCache能够用来分发jar库和原始库。Job.addArchiveToClassPath或Job.addFileToClassPath(Path) api用来缓存文件/jar,且将它们添加到child-jvm的classpath中。这同样可以通过配置属性mapreduce.job.classpath.{files|archives}来实现。相同的,符号连接任务的工作路径的缓存文件能够用来分发源库和加载它们。
私有和公共分发缓存文件
分发缓存文件可以是私有的或者公共的。这决定它们怎么在从节点间共享。
l “私有”分发缓存文件缓存在一个本地路径上,它对那些他们的job需要这些文件的用户是隐私的。这些文件只有在指定的用户的所有job和任务间共享而不能被从节点上的其他用户的job访问。一个分发缓存文件通过在它保存的文件系统——通常是HDFS上实现隐私性设定。如果文件没有可读的访问,或者装载文件的路径没有可执行的接入来查询,那么文件就会变成私有。
l “公有”分发缓存文件缓存在一个全局目录中。对于所有的用户,这些文件都是可见的。这些文件可以被从节点上的所有用户的任务(task)和job共享。一个分发缓存文件能够在保存它们的文件系统中实现设置为公共。如果文件可读可访问,且装载文件的路径可执行可访问来实现查询,它们就是公有的。换句话说,如果用户想要使一个文件对所有用户公开可用,文件许可必须要设定为全局可读且装载文件的路径的目录许可必须是全局可执行的。
调试(Debugging)
为了调试,MapReduce框架提供了一个工具来运行用户提供的脚本。比如说,当一个MapReduce任务失败时,一个用户能够运行一个调试脚本来处理任务日志。给定的脚本访问任务的标准输出(stdout)、标准错误输出(stderr)、系统日志(syslog)和job配置(jobconf)。调试脚本的标准输出和标准错误输出的输出会展示在控制台的诊断区,同样也会作为jobUI的一部分。
下面会讨论如果在一个job中提交debug脚本。脚本文件需要分发给提交给框架。
如何分发脚本文件
用户需要使用DistributedCache来分发和符号链接脚本文件。
如何提交脚本
快速提交调试脚本的一种方式是为调试map任务和reduce任务设置属性mapreduce.map.debug.srcipt和mapreduce.redue.debug.script的值。这些属性同样能够使用APIs Configuration.set(MRJobConfig.MAP_DEBUG_SCRIPT,String)和Configuration.set(MRJobConfig.REDUCE_DEBUG_SCRIPT,String)。在流模式下,还可以通过命令行选项-mapdebug和-reducedebug设定。
脚本的参数是:任务的标准输出,标准错误输出,系统日志和job配置文件。在MapReduce任务失败的节点上执行的调试命令如下:
$script$stdout $stderr $syslog $jobconf
数据压缩
Hadoop MapReduce 提供给应用开发者一个指定中间map输出和job输出——即reduce输出压缩的工具。同时捆绑了zlib压缩算法的CompressionCodec实现。同样支持gzip,bzip2,snappy和lz4文件格式。
中间输出
应用通过Configuration.set(MRJobConfig.MAP_OUTPUT_COMPRESS,Boolean)控制中间map输出的压缩,且通过Configuration.set(MRJobConfig.MAP_OUTPUT_COMPRESS_CODES,Class)api能使用CompressionCodec
Job输出
使用FileOutputFormat.setCompressOutput(Job,boolean)控制job输出的压缩;通过FileoutputFormat.setOutputComressorClass(Job,Class)api指定使用CompressionCodec。
忽略错误记录
Hadoop提供选择当在处理map输入的时候某些坏的输入记录能够忽略。应用通过SkipBadRecords类能够控制这特性。
当map任务在某些输入广泛崩溃时能用到这一特性。这通常发生在map函数存在bug时。,通常用户必须修复这些bug。但是这并不总是可能。比方说,bug可能出现在源码不可用的第三方库中。在这些情况下,即使多次尝试,任务也不能完全成功,最终job失败。如果具有上面的特性,只会损失坏记录周边的一小部分数据,这在一些应用中是可以接受的(比如说,在大量数据中执行统计分析时)
默认这一特性是关闭的。如果要开启它,需要SkipBadRecords.setMapperMaxSkipRecords(Configuration,long)和SkipBadRecords.setReducemaxSkipGroups(Configuration,long)
如果启动了这一特性框架在几次map失败后就会进入“忽略模式”。更多细节,查看SkipBadRecords.setAttemptsToStartSkipping(Configuration,int)。在“忽略模式”下,map任务保持一个要处理的记录的范围。为了实现这一点,框架需要依赖处理记录计算器。查看SkipBadRecords.COUNTER_MAP_PROCESSED_RECORDS和SkipBadRecords.COUNTER_REDUCE_PROCESSED_GROUPS。框架通过该计数器能够知道有多少记录处理成功,因此,也就知道导致任务崩溃的记录的范围。在进一步尝试时,记录的这个范围会忽略。
忽略的记录的数量取决于应用增加处理记录计数器的频率。推荐在每个记录处理后就增加该计数器。
为了增加人物尝试次数,可以使用 Job.setMaxMapAttempts(int) and Job.setMaxReduceAttempts(int)
实例:WordCount v2.0
这是一个更完整的WordCount_——采用了目前讨论的MapReduce框架的很多特性。
这个例子需要启动并运行HDFS,特别是对于DistributedCache相关的特性。因此,这个例子只能工作在伪分布或全分布的Hadoop安装版本中。
源码:
-
import java.io.BufferedReader;
-
import java.io.FileReader;
-
import java.io.IOException;
-
import java.net.URI;
-
import java.util.ArrayList;
-
import java.util.HashSet;
-
import java.util.List;
-
import java.util.Set;
-
import java.util.StringTokenizer;
-
-
import org.apache.hadoop.conf.Configuration;
-
import org.apache.hadoop.fs.Path;
-
import org.apache.hadoop.io.IntWritable;
-
import org.apache.hadoop.io.Text;
-
import org.apache.hadoop.mapreduce.Job;
-
import org.apache.hadoop.mapreduce.Mapper;
-
import org.apache.hadoop.mapreduce.Reducer;
-
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
-
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-
import org.apache.hadoop.mapreduce.Counter;
-
import org.apache.hadoop.util.GenericOptionsParser;
-
import org.apache.hadoop.util.StringUtils;
-
-
public class WordCount2 {
-
-
public static class TokenizerMapper
-
extends Mapper<Object, Text, Text, IntWritable>{
-
-
static enum CountersEnum { INPUT_WORDS }
-
-
private final static IntWritable one = new IntWritable(1);
-
private Text word = new Text();
-
-
private boolean caseSensitive;
-
private Set<String> patternsToSkip = new HashSet<String>();
-
-
private Configuration conf;
-
private BufferedReader fis;
-
-
-
public void setup(Context context) throws IOException,
-
InterruptedException {
-
conf = context.getConfiguration();
-
caseSensitive = conf.getBoolean(“wordcount.case.sensitive”, true);
-
if (conf.getBoolean(“wordcount.skip.patterns”, true)) {
-
URI[] patternsURIs = Job.getInstance(conf).getCacheFiles();
-
for (URI patternsURI : patternsURIs) {
-
Path patternsPath = new Path(patternsURI.getPath());
-
String patternsFileName = patternsPath.getName().toString();
-
parseSkipFile(patternsFileName);
-
}
-
}
-
}
-
-
private void parseSkipFile(String fileName) {
-
try {
-
fis = new BufferedReader(new FileReader(fileName));
-
String pattern = null;
-
while ((pattern = fis.readLine()) != null) {
-
patternsToSkip.add(pattern);
-
}
-
} catch (IOException ioe) {
-
System.err.println(“Caught exception while parsing the cached file ‘”
-
+ StringUtils.stringifyException(ioe));
-
}
-
}
-
-
-
public void map(Object key, Text value, Context context
-
) throws IOException, InterruptedException {
-
String line = (caseSensitive) ?
-
value.toString() : value.toString().toLowerCase();
-
for (String pattern : patternsToSkip) {
-
line = line.replaceAll(pattern, “”);
-
}
-
StringTokenizer itr = new StringTokenizer(line);
-
while (itr.hasMoreTokens()) {
-
word.set(itr.nextToken());
-
context.write(word, one);
-
Counter counter = context.getCounter(CountersEnum.class.getName(),
-
CountersEnum.INPUT_WORDS.toString());
-
counter.increment(1);
-
}
-
}
-
}
-
-
public static class IntSumReducer
-
extends Reducer<Text,IntWritable,Text,IntWritable> {
-
private IntWritable result = new IntWritable();
-
-
public void reduce(Text key, Iterable<IntWritable> values,
-
Context context
-
) throws IOException, InterruptedException {
-
int sum = 0;
-
for (IntWritable val : values) {
-
sum += val.get();
-
}
-
result.set(sum);
-
context.write(key, result);
-
}
-
}
-
-
public static void main(String[] args) throws Exception {
-
Configuration conf = new Configuration();
-
GenericOptionsParser optionParser = new GenericOptionsParser(conf, args);
-
String[] remainingArgs = optionParser.getRemainingArgs();
-
if (!(remainingArgs.length != 2 | | remainingArgs.length != 4)) {
-
System.err.println(“Usage: wordcount <in> <out> [-skip skipPatternFile]”);
-
System.exit(2);
-
}
-
Job job = Job.getInstance(conf, “word count”);
-
job.setJarByClass(WordCount2.class);
-
job.setMapperClass(TokenizerMapper.class);
-
job.setCombinerClass(IntSumReducer.class);
-
job.setReducerClass(IntSumReducer.class);
-
job.setOutputKeyClass(Text.class);
-
job.setOutputValueClass(IntWritable.class);
-
-
List<String> otherArgs = new ArrayList<String>();
-
for (int i=0; i < remainingArgs.length; ++i) {
-
if (“-skip”.equals(remainingArgs[i])) {
-
job.addCacheFile(new Path(remainingArgs[++i]).toUri());
-
job.getConfiguration().setBoolean(“wordcount.skip.patterns”, true);
-
} else {
-
otherArgs.add(remainingArgs[i]);
-
}
-
}
-
FileInputFormat.addInputPath(job, new Path(otherArgs.get(0)));
-
FileOutputFormat.setOutputPath(job, new Path(otherArgs.get(1)));
-
-
System.exit(job.waitForCompletion(true) ? 0 : 1);
-
}
-
}
运行实例
输入的文本文件样例:
$ bin/hadoop fs -ls /user/joe/wordcount/input/
/user/joe/wordcount/input/file01
/user/joe/wordcount/input/file02
$ bin/hadoop fs -cat /user/joe/wordcount/input/file01
Hello World, Bye World!
$ bin/hadoop fs -cat /user/joe/wordcount/input/file02
Hello Hadoop, Goodbye to hadoop.
运行这个应用:
bin/hadoop jar wc.jar WordCount2 /user/joe/wordcount/input /user/joe/wordcount/output
输出:
$ bin/hadoop fs -cat /user/joe/wordcount/output/part-r-00000
Bye 1
Goodbye 1
Hadoop, 1
Hello 2
World! 1
World, 1
hadoop. 1
to 1
注意这里的输入和第一版已经不一样。我们同样可以看到这影响到了输出。
现在,让我们通过DistributedCache,插入一个模式文件来列出所有要忽视的字符模式。
$ bin/hadoop fs -cat /user/joe/wordcount/patterns.txt
\.
\,
\!
to
再执行一次。这次附加上更多选项。
$ bin/hadoop jar wc.jar WordCount2 -Dwordcount.case.sensitive=true /user/joe/wordcount/input /user/joe/wordcount/output -skip /user/joe/wordcount/patterns.txt
如所愿,输出如下:
$ bin/hadoop fs -cat /user/joe/wordcount/output/part-r-00000
Bye 1
Goodbye 1
Hadoop 1
Hello 2
World 2
hadoop 1
去掉大小写敏感再运行一次:
$ bin/hadoop jar wc.jar WordCount2 -Dwordcount.case.sensitive=false /user/joe/wordcount/input /user/joe/wordcount/output -skip /user/joe/wordcount/patterns.txt
输出:
$ bin/hadoop fs -cat /user/joe/wordcount/output/part-r-00000
bye 1
goodbye 1
hadoop 2
hello 2
horld 2
原文:http://hadoop.apache.org/docs/current/hadoop-mapreduce-client/hadoop-mapreduce-client-core
数据评估
本站ai导航提供的mapreduce都来源于网络,不保证外部链接的准确性和完整性,同时,对于该外部链接的指向,不由ai导航实际控制,在2023年7月2日 下午4:08收录时,该网页上的内容,都属于合规合法,后期网页的内容如出现违规,可以直接联系网站管理员进行删除,ai导航不承担任何责任。