Hadoop 随笔
包含模块:
- Hadoop Common:支持 Hadoop 其他模块的公共实用工具
- Hadoop Distributed File System (HDFS™):能够为应用数据提供高吞吐量的访问的分布式文件系统
- Hadoop YARN:作业调度和集群资源管理框架
- Hadoop MapReduce:一个基于 YARN 的大型数据集并行处理系统
- Hadoop Ozone:用于 Hadoop 的对象存储
名词解释
Server:可以理解为单个机器,提供服务。包括 CPU、DRAM、Disks
Rack:Multi Servers,40 ~ 80 servers 组成 Rack(机架、机柜),servers 之间通过 Ethernet(以太网)连接
Clusters:Multi Racks,多个 Rack 组成 Cluster,Rack 之间通过 Cluster Switch 连接,
Datacenter:更偏向于物理、地域的划分,一个 Datacenter 可能有多个集群,一个集群也可能跨多个数据中心。
关于网络带宽:单个 server 内不需要网络带宽;多个 server 交互时,同一 Rack 中的网络带宽最为充足,其次为同一 Cluster,不同地域的 Datacenter 网络带宽限制最大。
Hadoop Common
Rack
Hadoop 组件支持 Rack。HDFS 中就是用 Rack ,其通过将 Block 放置在不同的 Rack 上来使用 Rack 的容错机制。Rack 能保证在网络交换机故障时或者在集群中的分区提供数据的可用性。
HDFS
分布式文件系统 HDFS 被设计用于普通硬件上的,其和现有的分布式文件系统很相似,当然也有不同点,这些不同点正正是 HDFS 的优越性的体现。
与其他分布式文件系统的不同点、特点:
- 支持高容错
- 部署在低成本的硬件上
- 提供应用的高吞吐量访问
- 适应与拥有大型数据集的应用
- 放宽了对 POSIX 的一些要求,允许对文件系统进行流访问
HDFS 设计时的目标:
-
硬件的错误处理
HDFS 中包含数百台、数千台服务器,其中服务器挂掉是常态,因此 HDFS 能够识别错误,并且迅速的、自动的恢复是其设计结构的目标
-
流式数据访问
这里的流式数据访问是针对特殊应用的,而且此处”流式”的概念更像是批处理,而不是用于交互(interactive)。HDFS 更注重高吞吐量而不是低延迟。
-
大型数据集
文件大小通常在 GB 到 TB 之间,因此他需要提供高聚合数据带宽,并且在单个集群中扩展到数百个节点,并且在单个实例中支持千万个文件。
-
简单的一致性模型
HDFS 需要对文件的”单次写多次读”模型,一个文件一旦被创建、写入、关闭之后,除了 append 和 truncate 操作之外,是不希望被更改的(update)。这种模型假设简化了数据一致性问题,并且支持了高吞吐量访问。
-
移动计算比移动数据更加简单
当计算在数据附近时会变得更高效,对于大型数据集更是如此,因为这样会最小化网络拥塞,并且最大化系统的总吞吐量。HDFS 为此提供了应用程序接口,将计算移动到数据旁边。
-
异构硬件和软件平台的可移植性
将 HDFS 从一个平台移植到另一个平台是很容易的。
NameNode 和 DataNodes
HDFS 使用主从接结构。一个 HDFS 集群包含一个 NameNode,它是一个主服务器,用来管理文件系统命名空间和控制客户端对文件的访问。另外,包含大量的 DataNodes,通常集群中每个节点都有一个,它用来管理其所在节点的存储。
HDFS 暴露一个文件系统命名空间并且允许用户存储数据到文件中。在存储时,每个文件被分成一个或多个 blocks,并且这些 blocks 被存储在 DataNodes 集合中。NameNode 执行文件系统命名空间操作,例如打开、关闭、重命名文件和目录。同时 NameNode 决定 blocks 和 DataNodes 的对应关系。DataNodes 用来响应来自文件系统客户端的读或者写请求。DataNodes 同时执行 NameNode 的指令,如创建、删除、复制等。
与 HDFS 的整体设计目标一致,NameNode 与 dataNodes 同样也是被设计基于普通机器的。具体做法为:
- 运行于 Lnux 机器上
- 使用 Java 语言,移植性好
- 一般来说每个机器运行一个 DataNode
系统中仅存在单个 nameNode 简化了体系结构。metadata 被 namenode 决策和存储,而用户数据永远不会流经 NameNode。
文件系统命名空间
HDFS 支持传统的层次化文件组织。用户或者应用程序可以创建和存储数据到目录中。文件系统命名空间可以创建、移动、重命名文件,其支持用户配额和访问权限,目前 HDFS 并不支持硬链接和软链接。
除了文件系统命名空间信息,NameNode 同时存储复制因子(replication factor )等信息。
数据复制
HDFS 通过存储 blocks 序列的方式存储文件,为了达到高容错的目的,HDFS 采用复制的方式,每个文件的 block 大小和复制因子都是可配置的。HDFS 中除了最后一个 block,其余 block 的大小相同,尽管可以添加可变长度 block 的支持。HDFS 的复制因子可以被指定,其可以在文件创建时、或者创建之后被指定。
NameNode 负责关于 block 复制工作的所有决策。它定期从集群中的所有节点收集 Heartbeat 和 Blockreport,收到 Heartbeat 代表 DataNodes 正常工作,Blockreport 中包含所有 block 的列表。
HDFS 中复制位置和 Rack 息息相关,其可以保证可靠性、可用性和带宽利用率。集群中两个节点的交流需要网络交换机,其中不同 Rack 上的机器的网络带宽比相同 Rack 上机器的网络交带宽差。
复制放置位置、策略:
从 Rack 的角度:
NameNode 决定每个 DataNodes 所属的 Rack id。其中有集中策略:
-
均匀的将副本放置在不同的机架上
优点:当整个机架失败时不会损失数据;在读取数据时可以使用多个机架的带宽;可以很轻松的达到负载均衡;
缺点:提高了写操作的成本,因为按照这样方式写操作需要将块转移到多个机架上,之前提高过不同机架上机器传输对应的网络带宽差。
-
复制因子为 3 时,其中一个副本在当前写机器上的机架上,其余的副本在相同的其余机架上但是不同节点;复制因子为 4 时,前三个按照上述策略,其余的副本随机放置,并且每个机架上的副本数量少于上限。
上限计算公式:$\frac{replicas - 1}{racks + 2}$
优点:减少了机架间的写流量,以提高写性能;由于机架失败的概率远远小于节点失败的概率,因此这个策略同样可以保证可靠性和可用性;在读取文件时减少了使用的聚合网络带宽,因此所有的节点并不是分布在不同的机架上。
从 DataNodes 的角度:
每个 DataNode 不允许拥有同一 block 的多个副本,因此副本的最大数量是当时 DataNodes 的总数。
除上述考虑基于机架感知的存储策略,还需要考虑存储类型。即确定候选节点后,还需要检查候选节点是否拥有满足要求的存储空间,如果不满足将查找另一个节点,如果在当前路径中找不到符合要求的足够节点,将会在第二个路径中寻找具有回退类型的节点。
副本选择
为达到最小化网络带宽消耗和读取延迟,HDFS 选择副本的原则是:就近原则。具体为如果读取节点机架存在副本,那么当前副本将会响应请求,不然考虑同一数据中心中,是否存在副本可以使用。
安全模式
启动时,NameNode 会进入安全模式,在安全模式中,不会发生数据块的复制。在这个模式中,NameNode 接收 Heartbeat 和 Nodereport,通过 Nodereport 的信息,被 Namenode 检查后的复制快被认为是安全复制的,在安全模式退出后,如果块的副本数量少于复制因子,NameNode 将会执行复制操作。
文件系统元数据的持久性
HDFS 命名空间被存储在 NameNode 中。HDFS使用一个事务日志 Editlog 存储每一次文件系统元数据的改变记录,NameNode 使用本地系统中的一个文件存储 EditLog;HDFS 中的命名空间、blocks 映射关系、文件系统属性被存储在 FsImage 文件中,NameNode 使用本地系统中的文件去存储。
NameNode 在内存中保存命名空间和文件块映射的镜像。当 NameNode 启动或被触发时,其从硬盘中读取 EditLog 和 FsImage,并将 EditLog 中的事务应用到内存中的 FsImage 中,并且将最新的镜像存储到磁盘中的 FsImage 中,之后会删除旧的 EditLog,因为其所存储的事务都被持久化到 FsImage 中,这个过程叫做 checkpoint。checkpoint 的作用是通过获取文件系统元数据的快照并保存到 FsImage 中,以确保 HDFS 具有文件系统元数据的一致性视图。即使从 FsImage 是有效地,但是直接对 FsImage 编辑是无效的,通过这种方式,不需要对每次编辑修改 FsImage,而是将编辑存储到 EditLog 中,在 checkpoint 过程中,EditLog 被应用到 FsImage 中。checkpoint 的触发方式有两种,一种是以秒为单位的给定时间内出发;另一种在积累给定数量的文件系统事务后出发。
DataNodes 在本地文件系统中存储数据。其不知道有关 HDFS 文件的任何信息,它将每个 block 存储在本地文件系统的单个文件中,DataNodes 不会在同一目录中创建所有文件,而是使用一种启发式方法来确定每个目录中的最优文件数量,并适当的创建子目录。当 DataNodes 启动时,它会扫描本地文件系统,生成所有数据 blocks 的列表,这些数据 block 对应于文件系统中的每个文件,并将这个报告发送给 NameNode,这就是 Blockreport。
通信协议
所有的 HDFS 通信协议都位于 TCP/IP 通信协议之上。Client 会与 NameNode 机器上配置好的 TCP 端口上建立连接,使用 NameNode 与 Client 协议进行通信,DataNodes 使用 DataNode 协议与 NameNode 进行通信。RPC 抽象封装了 Client 协议和 DataNdoes 协议,根据设计,NameNode 从不启动任何的 RPC,它仅响应由 DataNodes 或者 Client 发送的请求。
可靠性保障(高容错)
HDFS 的主要目标是在发生故障时,也要可靠的存储数据,即高容错的实现。三种常见的故障类型分比为:NameNode 故障、DataNodes 故障、网络 partition。
数据磁盘故障,heartbeat 和 重新复制
每个 DataNode 周期性的向 NameNode 发送 Heartbeat 信息,网络 partitions 会使 DataNodes 的子集失去与 NameNode 的连接,NameNode 通过缺少的 Heartbeat 信息识别这种情况的发生。NameNode 标记缺少 Heartbeat 的 DataNodes 为“死亡”,并不会向这些 DataNodes发送新的 IO 请求,存储在死亡 DataNode 中的数据将不再可用,DataNodes 死亡将会导致一些 blocks 的复制因子少于指定值,NameNode 会跟踪需要复制的 blocks,并在需要时复制他们。
重新复制的触发条件有多种:其中一个 DataNode 不可用;副本可能损坏;DataNode 上的硬件磁盘可能损坏;复制因子增加等。
集群再平衡
HDFS 的架构与数据再平衡方案兼容。当某个 DataNode 上的空闲空间低于阈值时,将数据从一个 DataNode 移动到另一个 DataNode 是很有可能的,当对于某个文件突然出现高需求时,一个方案是动态的创建其余的副本并在集群中平衡其他数据,这种数据再平衡的方案还没有实现。
数据完整性
从 DataNodes 获取的数据 blcok 损失是有可能发生的,这种发生可能由于存储设备、网络故障、软件错误等,HDFS client 实现了对 HDFS 文件的校验和检查,当 Client 创建文件时,会计算文件的每个 block 的 checksum,并将这些 checksum 存储在同一个 HDFS 命名空间的隐藏文件中,当 Client 检索文件内容时,会将文件与相关联的 checksum 做匹配,如果匹配不成功,Client 会从其他含有 block 的 DataNode 中获取 block。
元数据磁盘故障
EditLog 和 FsImage 是 HDFS 的中心数据结构。这些文件的故障可能导致 HDFS 实例失去作用,因此,可以配置为将 Namenode 支持维护 EditLog 和 FsImage 的多个副本,对两个文件的任何更新都会同步到所有副本上,这种同步会降低每秒钟 NameNode 支持事务的速率,这是可以接收的,因为 HDFS 应用的数据敏感性是针对数据本身而言,而不是针对元数据。当 NameNode 重启时,他会选择最新的一致的 EditLog 和 FsImage。
提高对故障恢复力的另一个选项是使用多个 NameNode。
快照
快照支持在特定的时刻存储数据的副本,其中一种使用方式就是通过快照将损坏的 HDFS 实例回滚到以前所知的正确时间点。
数据组织
数据 blocks
HDFS 被设计用来支持非常大的数据集,这类应用一般只写一次但是读多次,并且要求读满足 stream 速度。HDFS 中默认块大小为 128M,如果可能,每个块存放在不同的 NameNode 上。
复制管道
当 Client 向 HDFS 文件写入数据时,如果复制因子为 3,NameNode 将会通过复制目标选择算法检索 DataNodes 列表,选择之后 client 将会向第一个 DataNode 写入,第一个 DataNode 将会分批接收数据,将每批数据写入其本地存储库,并且将该部分传输到列表中的第二个 DataNode 上。第二个 DataNode 类似,接收数据后,存储到本地存储库的同时,向第三个 DataNode 传输数据,最后第三个 DataNode 将每批数据写入其本地存储库。因此,数据是从一个 DataNode 到下一个 DataNode 的流水线。
可访问性
访问 HDFS 的不同方式:
- Java API
- REST API
- HTTP browser
MapReduce
Hadoop MapReduce 是一个软件框架,在普通的硬件上,可以轻松的编写应用程序,用于以可靠的、容错的方式在大型集群(数千个节点)上并行处理大量数据(TB 级别的数据集)。
MapReduce 通常将输入数据集分割成独立的块,每个块以完全并行的方式被 map 任务处理,框架对 map 过程的输出进行排序,然后将其输入到 reduce 任务中。通常 job 的输入和输出都存储在文件系统中,框架负责调度任务,并且监控并重新执行失败的任务。
通常,计算 node 和存储 node 是相同的,MapReduce 和 HDFS 运行在同一组节点上,这种配置允许框架有效的在数据已经存在的 node 上有效的调度任务,从而产生跨集群的非常高的聚合带宽。
MapReduce 结构:每个集群拥有单个 master ResourceManager、每个节点拥有单个 worker NodeManager,每个应用对应一个 MPAppMaster。
应用通过实现适当的接口或者抽象类,定义输入输出位置并提供 map 与 reduce 函数,再加上一些作业参数,就完成了一个 job 的配置。Hadoop job client 提交 job(通常为 jar 包或者可执行文件)和参数配置到 ResourceManager,它会承担分发软件包/配置到每个 workers 的责任,调度任务并监视他们,向 job client 提供状态和诊断信息。
Hadoop 与 HDFS 一样,使用 Java 实现。
输入输出
MapReduce 仅对 <key, value> 进行操作,其将输入作为 <key, value> 集合,并且产生 <key, value> 集合作为输出。key 与 value 类必须能够被框架序列化,因此需要实现 Writable 接口,另外 key 必须实现WritableComparable 以便框架对其进行排序。
(input) <k1, v1> -> map -> <k2, v2> -> combine -> <k2, v2> -> reduce -> <k3, v3> (output)
MapReduce Application:WordCount
WordCount:计算输入集合中每个单词出现的次数
import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class WordCount {
public static class TokenizerMapper
extends Mapper<Object, Text, Text, IntWritable>{
private final static IntWritable one = new IntWritable(1);
private Text word = new Text();
public void map(Object key, Text value, Context context
) throws IOException, InterruptedException {
// 处理一行,进行分词
StringTokenizer itr = new StringTokenizer(value.toString());
// 判断是否有下个词
while (itr.hasMoreTokens()) {
// 设置对应单词的 key
word.set(itr.nextToken());
// 每个单词生成一个 <key, value>
context.write(word, one);
}
}
}
public static class IntSumReducer
extends Reducer<Text,IntWritable,Text,IntWritable> {
private IntWritable result = new IntWritable();
public void reduce(Text key, Iterable<IntWritable> values,
Context context
) throws IOException, InterruptedException {
int sum = 0;
for (IntWritable val : values) {
sum += val.get();
}
result.set(sum);
context.write(key, result);
}
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
// 获取实例
Job job = Job.getInstance(conf, "word count");
// 进行配置
job.setJarByClass(WordCount.class);
job.setMapperClass(TokenizerMapper.class);
// 指定 combine,每个 map 的输出都经过本地的 combine
job.setCombinerClass(IntSumReducer.class);
// 对于所有 combine 的输出做 reduce 操作
job.setReducerClass(IntSumReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
MapReduce 用户接口
Mapper and Reducer
应用程序通常通过实现 Mapper、Reducer 接口实现 map、reduce 方法。
Mapper
Mapper 将输入的 <key, value> 映射成“中间”的一组键值对,类似于 <k1, v1> -> map -> <k2, v2> 过程。Map 过程是将输入转换成中间值的独立任务,转换成的中间值不需要与输入具有相同的类型,一个输入记录有可能被转换为 0 或多个中间值。
MapReduce 框架通过 job 的 Inputformat 生成 InputSplit ,并且每个 InputSplit 对应一个 map 任务。
Mapper 接口的实现通过 job.setMapperClass(class) 方法传递给 job,然后框架为 InputSplit 中每个<key, value> 调用 map 方法,应用程序也可以从写 cleanup 方法,来做清理工作。
应用可以使用 Counter
报告统计数据。
框架对与给定输出相关联的中间值进行分组,并经过 reduce 阶段,得到最终结果。用户可以指定比较器 Job.setGroupingComparatorClass(Class)
来控制分组(group)。
Mapper 的输出被排序然后根据 Reducer 进行分组,partition 的数量与 job 的 reduce 任务数量一致,用户可以实现自定义 partitoner 控制分组规则。
map 数量
map 的数量由输入文件的大小决定,即文件对应的 block 数量(HDFS)
每个节点大概 10~100 个 map,并且任务的启动需要时间,因
此 map 至少需要一分钟的时间执行。
假如有 10 TB 的输入数据,block 大小为 128M,那么将会有 82000 数量的 map,当然 map 的数量可以手动设置。
Reducer
Reducer 中的 reduce 过程会处理一组中间值,这些中间值共享同一个 key。
job 中 reduce 的数量通过用户设置,具体方法为:Job.serNumReduceTasks(int)
。
Reduce 过程包含三个关键阶段:shuffle、sort、reduce
shuffle
Mapper 的输出排序后输入到 Reducer,在这个阶段,框架通过 http 获取所有 mapper 输出的相关 partition。
sort
框架对 Reducer 过程的输入通过 Key 进行分组,因为不同的 Mapper 可能拥有相同 Key 的输出。
shuffle 和 sort 阶段是同时发生的,当获取 map 输出时,map 输出被 merge。
Secondary Sort
在 reduce 之前,如果对于中间值中 key 与 map 输出中 key 的等价规则不同,则可以通过Job.setSortComparatorClass(Class)
指定比较器。Job.setGroupingComparatorClass(Class)
可用来控制中间值中 key 的分组方式,这两种方法可以结合使用来对 value 进行二次排序。
Reduce
在这个阶段,对于分组后输入的每个 <key, value list> 调用 reduce 方法。
reduce 的输出通常通过 context 写入文件系统。
Reducer 的输出是未经排序的。
reducer 的数量
0.95(or 1.75) * <no. of nodes> * <no. of maximum containers per node>
-
0.95
在 map 过程完成时,所有的 reduce 可以快速启动并且开始转换 map 的输出。
-
1.75
做的快的节点在完成第一轮 reduce 后,将启动第二轮 reduce,有利于 job 的负载均衡
reduce 数量的增加对增大框架的开销,但是会提高负载均衡并降低故障恢复成本
上面的比例因子略小于整数,是为了给推测任务和失败任务预留 reduce 位置。
reduce is None
如果不希望执行 reduce 过程,将 reduce 的数量设置为 0 是合法的。
这种情况下 map 的输出直接写入文件系统,并且不对 map 的结果进行排序。
Partitioner
partitioner 将 map 过程的输出,即中间值中的 key 进行分区,分区方法一般使用 hash 方法,分区的总数和 job 中 reduce 任务的总数一致。
Partitioner 默认使用 HashPartitioner。
Counter
Counter 是 MapReduce 应用程序报告其统计数据的工具。Mapper 和 Reducer 可以使用 Counter 进行数据统计。
Job 配置
Job 表示 MapReduce 中任务的配置。Job 是 MapReduce 框架描述任务执行的关键接口。
Job 通常用来指定 Mapper、combine、Reducer、Partitioner、InputFormat、OutputFormat。其中InputFormat、OutputFormat 是用来设置文件的路径等信息。
除此之外,Job 还可以指定其他信息:Comparator(用来 group)、放入 DistributedCache 中的文件、压缩相关、任务是否以 speculative 方式进行、每个 task 最大尝试次数。
Task 执行与环境
MRAppMaster 在独立的 jvm 中以子进程的方式执行 Mapper/Reducer task。
子任务继承父 MRAppMaster 中的环境,当然子 jvm 中的参数也可以另外指定。
内存管理
用户还可以指定子任务的最大虚拟内存,以及它递归启动的子进程。通过 mapreduce.{map | reduce}.memory.mb. 指定的是每个进程的限制,这个值必须大于等于 jvm 指定的 -Xmx,否则 jvm 无法启动。 |
Map 参数
map 过程编辑过的 record,将会序列化到 buffer 中,metadata 将会存储到 accounting buffers 中。当序列化内容或者元数据超出 buffer 限制时,buffer 中的内容将被排序并写入磁盘。如果 buffers 被完全填满,map 将会阻塞。当 map 过程结束后,所有的数据都被写入到磁盘中,并且将磁盘上所有的 segments 写入到一个文件中。最小化向磁盘的溢出可以减少 map 的时间,但是更大的 buffer 会减少应用程序的内存。
包含两个参数:mapreduce.task.io.sort.mb
、mapreduce.map.sort.spill.percent
当 mapreduce.map.sort.spill.percent
满足后,后台线程会将溢出的内容转移到磁盘上
当 mapreduce.task.io.sort.mb
满足后,map 将会停止并将溢出的内容写入到磁盘上。
Shuffle/Reduce 参数
每个 Reduce 通过 HTTP 获取 Partitioner 分配好的输出,提取到内存中,并且将内存中的数据周期性的 merge 到磁盘中。
-
mapreduce.task.io.soft.factor
指定同一时间 merge 磁盘 segment 的数量,如果文件数量超过了这个限制,将会分多个回合进行 merge;当然这个参数同样适用于 map 过程,但是相比于 Reduce,大多数作业在 map 阶段不应该达到这个限制值。
-
mapreduce.reduce.merge.inmem.thresholds
被 merge 到磁盘之前,已排序好的数据在内存中的数量。这与
mapreduce.map.sort.spill.percent
类似,更像是一个触发器。在实践中,由于在内存中 merge segment 比在磁盘 merge segment 要便宜,一般设置为极大(1000)或极小(0)。 -
mapreduce.reduce.shuffle.merge.percent
在内存 merge segment 之前,获取的 map 输出阈值,表示在内存中存储 map 输出所占被分配内存的百分比。
-
mapreduce.reduce.shuffle.input.buffer.percent
内存的百分比,用于在 shuffle 期间分配存储 map 输出。
-
mapreduce.reduce.input.buffer.percent
reduce 期间存储 map 输出的内存百分比。
具体的参数需要根据代码、注释进一步了解。
任务日志
任务的标准输出(stdout)、标准错误输出(stderr)、日志(syslog)被 NodeManager 读取,打印到${HADOOP_LOG_DIR}/userlogs
中。
job 提交和监控
Job 是用户与 ResourceManager 交互的重要接口,可以理解为用户通过 Job 的配置影响 ResourceManager 。
Job 的功能:
- 提交任务
- 跟踪进度
- 访问组件任务的报告和日志
- 获取 MapReduce 集群的状态信息
job 提交过程包括:
- 检查 job 的输入输出规范
- 计算 job 的 InputSplit 值
- 为 job 的 DistributedCache 值设置必要的信息
- 在文件系统上复制 job 的 jar 包和配置到 MapReduce系统目录上
- 提交任务到 ResourceManager,选择性的监控状态
对于复杂的任务,也可以将多个 MapReduce 串联起来执行,因为 MapReduce 的输出再 HDFS 上,可以作为下一个 MapReduce 的输入。但是这种情况下,整体 job 的成功和失败需要由用户控制,可能涉及到以下两个命令:
-
Job.summit()
提交任务到集群并且立即返回
-
Job.waitForCompletion(boolean)
提交任务到集群,等待任务完成后返回
Job 输入
InputFormat 描述了 MapReduce job 的输入规范。作用为:
- 验证 job的输入规范
- 将输入文件拆分成符合规则的 InputSplit 实例,每个实例用于单个 Mapper。
- 提供 RecordReader 的实现,用于从股和规则的 InputSplit 实例中收集 record,用于 Mapper 的处理。
基于文件的 InputFormat 实现根据输入文件的大小分隔符合规则的 InputSplit 实例,其中输入文件的 block size 是分隔上限,下限可以通过 mapreduce.input.fileinputformat.split.minsize
指定。
InputSplit
InputSplit 表示单个 Mapper 处理的数据。
InputSplit 提供输入的面向 byte 的视图;RecordReader 负责处理和呈现一个面向 record 的视图。
RecordReader
RecordReader 从一个 InputSplit 中读取 <key, value> 对。
RecordReader 将 InputSplit 提供的 byte 视图转换成 record 视图,用于 Mapper 的处理
Job 输出
OutputFormat 描述了 MapReduce job 的输出规范。作用为:
- 验证 Job 的输出规范,例如输出文件夹是否存在
- 提供 RecordWriter 的实现,用来写入 job 的输出到文件。
OutputCommitter
OutputCommitter 描述 MapReduce job 的任务输出提交。作用为:
- 在初始化期间设置 job。
- 当 job 完成后,清楚 job。
- 设置任务的临时输出
- 提交任务的输出
- 舍弃任务的提交
任务的 side-effect files
在某些应用中,组建任务需要创建、写入 side-files,其余 job 的输出文件不同。
RecordWriter
RecordWriter 将 <key, value> 输出写入到文件中,即实现了将 job 的输出写入到文件功能。
其他有用的特性(未拓展)
- 提交 job 到队列
- Counters
- 分布式缓存
- Profiling
- Debugging
- 数据压缩
- 跳过 bad records
WordCount 2.0
import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.StringUtils;
public class WordCount2 {
public static class TokenizerMapper
extends Mapper<Object, Text, Text, IntWritable>{
static enum CountersEnum { INPUT_WORDS }
private final static IntWritable one = new IntWritable(1);
private Text word = new Text();
private boolean caseSensitive;
private Set<String> patternsToSkip = new HashSet<String>();
private Configuration conf;
private BufferedReader fis;
@Override
public void setup(Context context) throws IOException,
InterruptedException {
conf = context.getConfiguration();
caseSensitive = conf.getBoolean("wordcount.case.sensitive", true);
if (conf.getBoolean("wordcount.skip.patterns", false)) {
URI[] patternsURIs = Job.getInstance(conf).getCacheFiles();
for (URI patternsURI : patternsURIs) {
Path patternsPath = new Path(patternsURI.getPath());
String patternsFileName = patternsPath.getName().toString();
parseSkipFile(patternsFileName);
}
}
}
private void parseSkipFile(String fileName) {
try {
fis = new BufferedReader(new FileReader(fileName));
String pattern = null;
while ((pattern = fis.readLine()) != null) {
patternsToSkip.add(pattern);
}
} catch (IOException ioe) {
System.err.println("Caught exception while parsing the cached file '"
+ StringUtils.stringifyException(ioe));
}
}
@Override
public void map(Object key, Text value, Context context
) throws IOException, InterruptedException {
String line = (caseSensitive) ?
value.toString() : value.toString().toLowerCase();
for (String pattern : patternsToSkip) {
line = line.replaceAll(pattern, "");
}
StringTokenizer itr = new StringTokenizer(line);
while (itr.hasMoreTokens()) {
word.set(itr.nextToken());
context.write(word, one);
Counter counter = context.getCounter(CountersEnum.class.getName(),
CountersEnum.INPUT_WORDS.toString());
counter.increment(1);
}
}
}
public static class IntSumReducer
extends Reducer<Text,IntWritable,Text,IntWritable> {
private IntWritable result = new IntWritable();
public void reduce(Text key, Iterable<IntWritable> values,
Context context
) throws IOException, InterruptedException {
int sum = 0;
for (IntWritable val : values) {
sum += val.get();
}
result.set(sum);
context.write(key, result);
}
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
GenericOptionsParser optionParser = new GenericOptionsParser(conf, args);
String[] remainingArgs = optionParser.getRemainingArgs();
if ((remainingArgs.length != 2) && (remainingArgs.length != 4)) {
System.err.println("Usage: wordcount <in> <out> [-skip skipPatternFile]");
System.exit(2);
}
Job job = Job.getInstance(conf, "word count");
job.setJarByClass(WordCount2.class);
job.setMapperClass(TokenizerMapper.class);
job.setCombinerClass(IntSumReducer.class);
job.setReducerClass(IntSumReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
List<String> otherArgs = new ArrayList<String>();
for (int i=0; i < remainingArgs.length; ++i) {
if ("-skip".equals(remainingArgs[i])) {
job.addCacheFile(new Path(remainingArgs[++i]).toUri());
job.getConfiguration().setBoolean("wordcount.skip.patterns", true);
} else {
otherArgs.add(remainingArgs[i]);
}
}
FileInputFormat.addInputPath(job, new Path(otherArgs.get(0)));
FileOutputFormat.setOutputPath(job, new Path(otherArgs.get(1)));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
与 1.0 版本的不同:
- 在 Mapper 或者 Reducer 实现中,描述了 setup 方法怎样访问配置参数
- 描述了 jobs 中 DistributedCache 怎样被用于分布式只读数据,并且用户可以指定在计数时需要跳过的单词。
- 描述 GenericOptionsParser 处理通用 Hadoop 命令行选项的实用程序。
- 演示应用程序如何使用 Counter,以及如何将状态信息传递给 map 方法或者 reduce 方法。
YARN
YARN 的基础思想是将资源管理(Resource Manager)和 job 的调度/监视功能划分到单独的守护进程中。即拥有一个全局的 ResourceManager(RM)以及每个应用程序的 ApplicationMaster(AM),应用程序可以仅包含一个作业,或者多个作业。
ResourceManager 和 NodeManager 来自于数据计算框架,如 MapReduce,其中 ResourceManager 是用来在系统中协调资源的最终权威,NodeManager 是每个机器上的框架 agent,负责容器、监事并报告资源使用情况(CPU、MEN、DISK、NETWORK)。
ApplicationMaster 是特定于框架的库,用于与 ResourceManager 协商资源,并与 NodeManager 一起执行、监视任务。
ResourceManager 拥有两个重要组件:
-
调度器(Scheduler)
Scheduler 负责将资源分配给正在运行的应用程序,这些应用程序受到容量、队列的约束。它仅仅作为一个 Scheduler,不监视或者跟踪应用的执行,同样也不能保证由于软件或硬件故障而重新启动的 job。他仅在应用有资源需求时才会表现其资源调度的功能,它基于容易资源概念的抽象实现,中包含了诸如内存、cpu、磁盘、网络等元素。
Scheduler 有一个可插拔的策略,负责在队列、应用程序之间对集群资源进行 partitioning。
-
应用程序管理器(ApplicationsManager)
-
负责接收提交的 job
-
协商执行任务的第一个 ApplicationMaster 容器。
-
提供服务,用于重启 ApplicationMaster 容器当发生故障时
ApplicationMaster 作用:
- 从 Scheduler 处协商适当的资源
- 跟踪状态
- 监视进度
-
YARN 支持通过 ReservationSystem 保留资源的概念,该组件允许用户指定资源超时和时间限制的配置文件,并且保留资源以确保重要工作的可预测执行。
为了在几千台 node 上运行 YARN,通过 Federation 支持联合的概念。Federation 允许将多个 YARN 子群连接在一起,作为单个大型集群出现。这种机制用于实现更大的规模,允许多个独立集群用于非常大的 job。