Hadoop 随笔

Hadoop 随笔

Posted by Jinliang on May 9, 2020

Hadoop 随笔

包含模块:

  • Hadoop Common:支持 Hadoop 其他模块的公共实用工具
  • Hadoop Distributed File System (HDFS™):能够为应用数据提供高吞吐量的访问的分布式文件系统
  • Hadoop YARN:作业调度和集群资源管理框架
  • Hadoop MapReduce:一个基于 YARN 的大型数据集并行处理系统
  • Hadoop Ozone:用于 Hadoop 的对象存储

名词解释

Server:可以理解为单个机器,提供服务。包括 CPU、DRAM、Disks

Rack:Multi Servers,40 ~ 80 servers 组成 Rack(机架、机柜),servers 之间通过 Ethernet(以太网)连接

Clusters:Multi Racks,多个 Rack 组成 Cluster,Rack 之间通过 Cluster Switch 连接,

Datacenter:更偏向于物理、地域的划分,一个 Datacenter 可能有多个集群,一个集群也可能跨多个数据中心。

关于网络带宽:单个 server 内不需要网络带宽;多个 server 交互时,同一 Rack 中的网络带宽最为充足,其次为同一 Cluster,不同地域的 Datacenter 网络带宽限制最大。

Hadoop Common

Rack

Hadoop 组件支持 Rack。HDFS 中就是用 Rack ,其通过将 Block 放置在不同的 Rack 上来使用 Rack 的容错机制。Rack 能保证在网络交换机故障时或者在集群中的分区提供数据的可用性。

HDFS

分布式文件系统 HDFS 被设计用于普通硬件上的,其和现有的分布式文件系统很相似,当然也有不同点,这些不同点正正是 HDFS 的优越性的体现。

与其他分布式文件系统的不同点、特点

  • 支持高容错
  • 部署在低成本的硬件上
  • 提供应用的高吞吐量访问
  • 适应与拥有大型数据集的应用
  • 放宽了对 POSIX 的一些要求,允许对文件系统进行流访问

HDFS 设计时的目标

  • 硬件的错误处理

    HDFS 中包含数百台、数千台服务器,其中服务器挂掉是常态,因此 HDFS 能够识别错误,并且迅速的、自动的恢复是其设计结构的目标

  • 流式数据访问

    这里的流式数据访问是针对特殊应用的,而且此处”流式”的概念更像是批处理,而不是用于交互(interactive)。HDFS 更注重高吞吐量而不是低延迟。

  • 大型数据集

    文件大小通常在 GB 到 TB 之间,因此他需要提供高聚合数据带宽,并且在单个集群中扩展到数百个节点,并且在单个实例中支持千万个文件。

  • 简单的一致性模型

    HDFS 需要对文件的”单次写多次读”模型,一个文件一旦被创建、写入、关闭之后,除了 append 和 truncate 操作之外,是不希望被更改的(update)。这种模型假设简化了数据一致性问题,并且支持了高吞吐量访问。

  • 移动计算比移动数据更加简单

    当计算在数据附近时会变得更高效,对于大型数据集更是如此,因为这样会最小化网络拥塞,并且最大化系统的总吞吐量。HDFS 为此提供了应用程序接口,将计算移动到数据旁边。

  • 异构硬件和软件平台的可移植性

    将 HDFS 从一个平台移植到另一个平台是很容易的。

NameNode 和 DataNodes

HDFS 使用主从接结构。一个 HDFS 集群包含一个 NameNode,它是一个主服务器,用来管理文件系统命名空间控制客户端对文件的访问。另外,包含大量的 DataNodes,通常集群中每个节点都有一个,它用来管理其所在节点的存储

HDFS 暴露一个文件系统命名空间并且允许用户存储数据到文件中。在存储时,每个文件被分成一个或多个 blocks,并且这些 blocks 被存储在 DataNodes 集合中。NameNode 执行文件系统命名空间操作,例如打开、关闭、重命名文件和目录。同时 NameNode 决定 blocks 和 DataNodes 的对应关系。DataNodes 用来响应来自文件系统客户端的读或者写请求。DataNodes 同时执行 NameNode 的指令,如创建、删除、复制等。

与 HDFS 的整体设计目标一致,NameNode 与 dataNodes 同样也是被设计基于普通机器的。具体做法为:

  • 运行于 Lnux 机器上
  • 使用 Java 语言,移植性好
  • 一般来说每个机器运行一个 DataNode

系统中仅存在单个 nameNode 简化了体系结构。metadata 被 namenode 决策和存储,而用户数据永远不会流经 NameNode。

文件系统命名空间

HDFS 支持传统的层次化文件组织。用户或者应用程序可以创建和存储数据到目录中。文件系统命名空间可以创建、移动、重命名文件,其支持用户配额和访问权限,目前 HDFS 并不支持硬链接和软链接。

除了文件系统命名空间信息,NameNode 同时存储复制因子(replication factor )等信息。

数据复制

HDFS 通过存储 blocks 序列的方式存储文件,为了达到高容错的目的,HDFS 采用复制的方式,每个文件的 block 大小复制因子都是可配置的。HDFS 中除了最后一个 block,其余 block 的大小相同,尽管可以添加可变长度 block 的支持。HDFS 的复制因子可以被指定,其可以在文件创建时、或者创建之后被指定。

NameNode 负责关于 block 复制工作的所有决策。它定期从集群中的所有节点收集 Heartbeat 和 Blockreport,收到 Heartbeat 代表 DataNodes 正常工作,Blockreport 中包含所有 block 的列表。

HDFS DataNodes

HDFS 中复制位置和 Rack 息息相关,其可以保证可靠性、可用性和带宽利用率。集群中两个节点的交流需要网络交换机,其中不同 Rack 上的机器的网络带宽比相同 Rack 上机器的网络交带宽差。

复制放置位置、策略:

从 Rack 的角度:

NameNode 决定每个 DataNodes 所属的 Rack id。其中有集中策略:

  • 均匀的将副本放置在不同的机架上

    优点:当整个机架失败时不会损失数据;在读取数据时可以使用多个机架的带宽;可以很轻松的达到负载均衡;

    缺点:提高了写操作的成本,因为按照这样方式写操作需要将块转移到多个机架上,之前提高过不同机架上机器传输对应的网络带宽差。

  • 复制因子为 3 时,其中一个副本在当前写机器上的机架上,其余的副本在相同的其余机架上但是不同节点;复制因子为 4 时,前三个按照上述策略,其余的副本随机放置,并且每个机架上的副本数量少于上限。

    上限计算公式:$\frac{replicas - 1}{racks + 2}$

    优点:减少了机架间的写流量,以提高写性能;由于机架失败的概率远远小于节点失败的概率,因此这个策略同样可以保证可靠性和可用性;在读取文件时减少了使用的聚合网络带宽,因此所有的节点并不是分布在不同的机架上。

从 DataNodes 的角度:

每个 DataNode 不允许拥有同一 block 的多个副本,因此副本的最大数量是当时 DataNodes 的总数。


除上述考虑基于机架感知的存储策略,还需要考虑存储类型。即确定候选节点后,还需要检查候选节点是否拥有满足要求的存储空间,如果不满足将查找另一个节点,如果在当前路径中找不到符合要求的足够节点,将会在第二个路径中寻找具有回退类型的节点。

副本选择

为达到最小化网络带宽消耗和读取延迟,HDFS 选择副本的原则是:就近原则。具体为如果读取节点机架存在副本,那么当前副本将会响应请求,不然考虑同一数据中心中,是否存在副本可以使用。

安全模式

启动时,NameNode 会进入安全模式,在安全模式中,不会发生数据块的复制。在这个模式中,NameNode 接收 Heartbeat 和 Nodereport,通过 Nodereport 的信息,被 Namenode 检查后的复制快被认为是安全复制的,在安全模式退出后,如果块的副本数量少于复制因子,NameNode 将会执行复制操作。

文件系统元数据的持久性

HDFS 命名空间被存储在 NameNode 中。HDFS使用一个事务日志 Editlog 存储每一次文件系统元数据的改变记录,NameNode 使用本地系统中的一个文件存储 EditLog;HDFS 中的命名空间、blocks 映射关系、文件系统属性被存储在 FsImage 文件中,NameNode 使用本地系统中的文件去存储。

NameNode 在内存中保存命名空间文件块映射的镜像。当 NameNode 启动或被触发时,其从硬盘中读取 EditLog 和 FsImage,并将 EditLog 中的事务应用到内存中的 FsImage 中,并且将最新的镜像存储到磁盘中的 FsImage 中,之后会删除旧的 EditLog,因为其所存储的事务都被持久化到 FsImage 中,这个过程叫做 checkpoint。checkpoint 的作用是通过获取文件系统元数据的快照并保存到 FsImage 中,以确保 HDFS 具有文件系统元数据的一致性视图。即使从 FsImage 是有效地,但是直接对 FsImage 编辑是无效的,通过这种方式,不需要对每次编辑修改 FsImage,而是将编辑存储到 EditLog 中,在 checkpoint 过程中,EditLog 被应用到 FsImage 中。checkpoint 的触发方式有两种,一种是以秒为单位的给定时间内出发;另一种在积累给定数量的文件系统事务后出发。

DataNodes 在本地文件系统中存储数据。其不知道有关 HDFS 文件的任何信息,它将每个 block 存储在本地文件系统的单个文件中,DataNodes 不会在同一目录中创建所有文件,而是使用一种启发式方法来确定每个目录中的最优文件数量,并适当的创建子目录。当 DataNodes 启动时,它会扫描本地文件系统,生成所有数据 blocks 的列表,这些数据 block 对应于文件系统中的每个文件,并将这个报告发送给 NameNode,这就是 Blockreport

通信协议

所有的 HDFS 通信协议都位于 TCP/IP 通信协议之上。Client 会与 NameNode 机器上配置好的 TCP 端口上建立连接,使用 NameNode 与 Client 协议进行通信,DataNodes 使用 DataNode 协议与 NameNode 进行通信。RPC 抽象封装了 Client 协议和 DataNdoes 协议,根据设计,NameNode 从不启动任何的 RPC,它仅响应由 DataNodes 或者 Client 发送的请求。

可靠性保障(高容错)

HDFS 的主要目标是在发生故障时,也要可靠的存储数据,即高容错的实现。三种常见的故障类型分比为:NameNode 故障、DataNodes 故障、网络 partition

数据磁盘故障,heartbeat 和 重新复制

每个 DataNode 周期性的向 NameNode 发送 Heartbeat 信息,网络 partitions 会使 DataNodes 的子集失去与 NameNode 的连接,NameNode 通过缺少的 Heartbeat 信息识别这种情况的发生。NameNode 标记缺少 Heartbeat 的 DataNodes 为“死亡”,并不会向这些 DataNodes发送新的 IO 请求,存储在死亡 DataNode 中的数据将不再可用,DataNodes 死亡将会导致一些 blocks 的复制因子少于指定值,NameNode 会跟踪需要复制的 blocks,并在需要时复制他们。

重新复制的触发条件有多种:其中一个 DataNode 不可用;副本可能损坏;DataNode 上的硬件磁盘可能损坏;复制因子增加等。

集群再平衡

HDFS 的架构与数据再平衡方案兼容。当某个 DataNode 上的空闲空间低于阈值时,将数据从一个 DataNode 移动到另一个 DataNode 是很有可能的,当对于某个文件突然出现高需求时,一个方案是动态的创建其余的副本并在集群中平衡其他数据,这种数据再平衡的方案还没有实现。

数据完整性

从 DataNodes 获取的数据 blcok 损失是有可能发生的,这种发生可能由于存储设备、网络故障、软件错误等,HDFS client 实现了对 HDFS 文件的校验和检查,当 Client 创建文件时,会计算文件的每个 block 的 checksum,并将这些 checksum 存储在同一个 HDFS 命名空间的隐藏文件中,当 Client 检索文件内容时,会将文件与相关联的 checksum 做匹配,如果匹配不成功,Client 会从其他含有 block 的 DataNode 中获取 block。

元数据磁盘故障

EditLog 和 FsImage 是 HDFS 的中心数据结构。这些文件的故障可能导致 HDFS 实例失去作用,因此,可以配置为将 Namenode 支持维护 EditLog 和 FsImage 的多个副本,对两个文件的任何更新都会同步到所有副本上,这种同步会降低每秒钟 NameNode 支持事务的速率,这是可以接收的,因为 HDFS 应用的数据敏感性是针对数据本身而言,而不是针对元数据。当 NameNode 重启时,他会选择最新的一致的 EditLog 和 FsImage。

提高对故障恢复力的另一个选项是使用多个 NameNode。

快照

快照支持在特定的时刻存储数据的副本,其中一种使用方式就是通过快照将损坏的 HDFS 实例回滚到以前所知的正确时间点。

数据组织

数据 blocks

HDFS 被设计用来支持非常大的数据集,这类应用一般只写一次但是读多次,并且要求读满足 stream 速度。HDFS 中默认块大小为 128M,如果可能,每个块存放在不同的 NameNode 上。

复制管道

当 Client 向 HDFS 文件写入数据时,如果复制因子为 3,NameNode 将会通过复制目标选择算法检索 DataNodes 列表,选择之后 client 将会向第一个 DataNode 写入,第一个 DataNode 将会分批接收数据,将每批数据写入其本地存储库,并且将该部分传输到列表中的第二个 DataNode 上。第二个 DataNode 类似,接收数据后,存储到本地存储库的同时,向第三个 DataNode 传输数据,最后第三个 DataNode 将每批数据写入其本地存储库。因此,数据是从一个 DataNode 到下一个 DataNode 的流水线。

可访问性

访问 HDFS 的不同方式:

  • Java API
  • REST API
  • HTTP browser

MapReduce

Hadoop MapReduce 是一个软件框架,在普通的硬件上,可以轻松的编写应用程序,用于以可靠的、容错的方式在大型集群(数千个节点)上并行处理大量数据(TB 级别的数据集)。

MapReduce 通常将输入数据集分割成独立的块,每个块以完全并行的方式被 map 任务处理,框架对 map 过程的输出进行排序,然后将其输入到 reduce 任务中。通常 job 的输入和输出都存储在文件系统中,框架负责调度任务,并且监控并重新执行失败的任务。

通常,计算 node 和存储 node 是相同的,MapReduce 和 HDFS 运行在同一组节点上,这种配置允许框架有效的在数据已经存在的 node 上有效的调度任务,从而产生跨集群的非常高的聚合带宽。

MapReduce 结构:每个集群拥有单个 master ResourceManager、每个节点拥有单个 worker NodeManager,每个应用对应一个 MPAppMaster。

应用通过实现适当的接口或者抽象类,定义输入输出位置并提供 map 与 reduce 函数,再加上一些作业参数,就完成了一个 job 的配置。Hadoop job client 提交 job(通常为 jar 包或者可执行文件)和参数配置到 ResourceManager,它会承担分发软件包/配置到每个 workers 的责任,调度任务并监视他们,向 job client 提供状态和诊断信息。

Hadoop 与 HDFS 一样,使用 Java 实现。

输入输出

MapReduce 仅对 <key, value> 进行操作,其将输入作为 <key, value> 集合,并且产生 <key, value> 集合作为输出。key 与 value 类必须能够被框架序列化,因此需要实现 Writable 接口,另外 key 必须实现WritableComparable 以便框架对其进行排序。

(input) <k1, v1> -> map -> <k2, v2> -> combine -> <k2, v2> -> reduce -> <k3, v3> (output)

MapReduce Application:WordCount

WordCount:计算输入集合中每个单词出现的次数

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCount {

  public static class TokenizerMapper
       extends Mapper<Object, Text, Text, IntWritable>{

    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();

    public void map(Object key, Text value, Context context
                    ) throws IOException, InterruptedException {
      // 处理一行,进行分词
      StringTokenizer itr = new StringTokenizer(value.toString());
      // 判断是否有下个词
      while (itr.hasMoreTokens()) {
        // 设置对应单词的 key
        word.set(itr.nextToken());
        // 每个单词生成一个 <key, value>
        context.write(word, one);
      }
    }
  }

  public static class IntSumReducer
       extends Reducer<Text,IntWritable,Text,IntWritable> {
    private IntWritable result = new IntWritable();

    public void reduce(Text key, Iterable<IntWritable> values,
                       Context context
                       ) throws IOException, InterruptedException {
      int sum = 0;
      for (IntWritable val : values) {
        sum += val.get();
      }
      result.set(sum);
      context.write(key, result);
    }
  }

  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // 获取实例
    Job job = Job.getInstance(conf, "word count");
    // 进行配置
    job.setJarByClass(WordCount.class);
    job.setMapperClass(TokenizerMapper.class);
    // 指定 combine,每个 map 的输出都经过本地的 combine
    job.setCombinerClass(IntSumReducer.class);
    // 对于所有 combine 的输出做 reduce 操作
    job.setReducerClass(IntSumReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}

MapReduce 用户接口

Mapper and Reducer

应用程序通常通过实现 Mapper、Reducer 接口实现 map、reduce 方法。

Mapper

Mapper 将输入的 <key, value> 映射成“中间”的一组键值对,类似于 <k1, v1> -> map -> <k2, v2> 过程。Map 过程是将输入转换成中间值的独立任务,转换成的中间值不需要与输入具有相同的类型,一个输入记录有可能被转换为 0 或多个中间值。

MapReduce 框架通过 job 的 Inputformat 生成 InputSplit ,并且每个 InputSplit 对应一个 map 任务。

Mapper 接口的实现通过 job.setMapperClass(class) 方法传递给 job,然后框架为 InputSplit 中每个<key, value> 调用 map 方法,应用程序也可以从写 cleanup 方法,来做清理工作。

应用可以使用 Counter 报告统计数据。

框架对与给定输出相关联的中间值进行分组,并经过 reduce 阶段,得到最终结果。用户可以指定比较器 Job.setGroupingComparatorClass(Class)来控制分组(group)。

Mapper 的输出被排序然后根据 Reducer 进行分组,partition 的数量与 job 的 reduce 任务数量一致,用户可以实现自定义 partitoner 控制分组规则。

map 数量

map 的数量由输入文件的大小决定,即文件对应的 block 数量(HDFS)

每个节点大概 10~100 个 map,并且任务的启动需要时间,因

此 map 至少需要一分钟的时间执行。

假如有 10 TB 的输入数据,block 大小为 128M,那么将会有 82000 数量的 map,当然 map 的数量可以手动设置。

Reducer

Reducer 中的 reduce 过程会处理一组中间值,这些中间值共享同一个 key。

job 中 reduce 的数量通过用户设置,具体方法为:Job.serNumReduceTasks(int)

Reduce 过程包含三个关键阶段:shuffle、sort、reduce

shuffle

Mapper 的输出排序后输入到 Reducer,在这个阶段,框架通过 http 获取所有 mapper 输出的相关 partition。

sort

框架对 Reducer 过程的输入通过 Key 进行分组,因为不同的 Mapper 可能拥有相同 Key 的输出。

shuffle 和 sort 阶段是同时发生的,当获取 map 输出时,map 输出被 merge。

Secondary Sort

在 reduce 之前,如果对于中间值中 key 与 map 输出中 key 的等价规则不同,则可以通过Job.setSortComparatorClass(Class)指定比较器。Job.setGroupingComparatorClass(Class)可用来控制中间值中 key 的分组方式,这两种方法可以结合使用来对 value 进行二次排序。

Reduce

在这个阶段,对于分组后输入的每个 <key, value list> 调用 reduce 方法。

reduce 的输出通常通过 context 写入文件系统。

Reducer 的输出是未经排序的。

reducer 的数量

0.95(or 1.75) * <no. of nodes> * <no. of maximum containers per node>

  • 0.95

    在 map 过程完成时,所有的 reduce 可以快速启动并且开始转换 map 的输出。

  • 1.75

    做的快的节点在完成第一轮 reduce 后,将启动第二轮 reduce,有利于 job 的负载均衡

reduce 数量的增加对增大框架的开销,但是会提高负载均衡并降低故障恢复成本

上面的比例因子略小于整数,是为了给推测任务和失败任务预留 reduce 位置。

reduce is None

如果不希望执行 reduce 过程,将 reduce 的数量设置为 0 是合法的。

这种情况下 map 的输出直接写入文件系统,并且不对 map 的结果进行排序。

Partitioner

partitioner 将 map 过程的输出,即中间值中的 key 进行分区,分区方法一般使用 hash 方法,分区的总数和 job 中 reduce 任务的总数一致。

Partitioner 默认使用 HashPartitioner。

Counter

Counter 是 MapReduce 应用程序报告其统计数据的工具。Mapper 和 Reducer 可以使用 Counter 进行数据统计。

Job 配置

Job 表示 MapReduce 中任务的配置。Job 是 MapReduce 框架描述任务执行的关键接口。

Job 通常用来指定 Mapper、combine、Reducer、Partitioner、InputFormat、OutputFormat。其中InputFormat、OutputFormat 是用来设置文件的路径等信息。

除此之外,Job 还可以指定其他信息:Comparator(用来 group)、放入 DistributedCache 中的文件、压缩相关、任务是否以 speculative 方式进行、每个 task 最大尝试次数。

Task 执行与环境

MRAppMaster 在独立的 jvm 中以子进程的方式执行 Mapper/Reducer task。

子任务继承父 MRAppMaster 中的环境,当然子 jvm 中的参数也可以另外指定。

内存管理
用户还可以指定子任务的最大虚拟内存,以及它递归启动的子进程。通过 mapreduce.{map reduce}.memory.mb. 指定的是每个进程的限制,这个值必须大于等于 jvm 指定的 -Xmx,否则 jvm 无法启动。
Map 参数

map 过程编辑过的 record,将会序列化到 buffer 中,metadata 将会存储到 accounting buffers 中。当序列化内容或者元数据超出 buffer 限制时,buffer 中的内容将被排序并写入磁盘。如果 buffers 被完全填满,map 将会阻塞。当 map 过程结束后,所有的数据都被写入到磁盘中,并且将磁盘上所有的 segments 写入到一个文件中。最小化向磁盘的溢出可以减少 map 的时间,但是更大的 buffer 会减少应用程序的内存。

包含两个参数:mapreduce.task.io.sort.mbmapreduce.map.sort.spill.percent

mapreduce.map.sort.spill.percent 满足后,后台线程会将溢出的内容转移到磁盘上

mapreduce.task.io.sort.mb 满足后,map 将会停止并将溢出的内容写入到磁盘上。

Shuffle/Reduce 参数

每个 Reduce 通过 HTTP 获取 Partitioner 分配好的输出,提取到内存中,并且将内存中的数据周期性的 merge 到磁盘中。

  • mapreduce.task.io.soft.factor

    指定同一时间 merge 磁盘 segment 的数量,如果文件数量超过了这个限制,将会分多个回合进行 merge;当然这个参数同样适用于 map 过程,但是相比于 Reduce,大多数作业在 map 阶段不应该达到这个限制值。

  • mapreduce.reduce.merge.inmem.thresholds

    被 merge 到磁盘之前,已排序好的数据在内存中的数量。这与mapreduce.map.sort.spill.percent类似,更像是一个触发器。在实践中,由于在内存中 merge segment 比在磁盘 merge segment 要便宜,一般设置为极大(1000)或极小(0)。

  • mapreduce.reduce.shuffle.merge.percent

    在内存 merge segment 之前,获取的 map 输出阈值,表示在内存中存储 map 输出所占被分配内存的百分比。

  • mapreduce.reduce.shuffle.input.buffer.percent

    内存的百分比,用于在 shuffle 期间分配存储 map 输出。

  • mapreduce.reduce.input.buffer.percent

    reduce 期间存储 map 输出的内存百分比。

具体的参数需要根据代码、注释进一步了解。

任务日志

任务的标准输出(stdout)、标准错误输出(stderr)、日志(syslog)被 NodeManager 读取,打印到${HADOOP_LOG_DIR}/userlogs中。

job 提交和监控

Job 是用户与 ResourceManager 交互的重要接口,可以理解为用户通过 Job 的配置影响 ResourceManager 。

Job 的功能:

  • 提交任务
  • 跟踪进度
  • 访问组件任务的报告和日志
  • 获取 MapReduce 集群的状态信息

job 提交过程包括:

  1. 检查 job 的输入输出规范
  2. 计算 job 的 InputSplit 值
  3. 为 job 的 DistributedCache 值设置必要的信息
  4. 在文件系统上复制 job 的 jar 包和配置到 MapReduce系统目录上
  5. 提交任务到 ResourceManager,选择性的监控状态

对于复杂的任务,也可以将多个 MapReduce 串联起来执行,因为 MapReduce 的输出再 HDFS 上,可以作为下一个 MapReduce 的输入。但是这种情况下,整体 job 的成功和失败需要由用户控制,可能涉及到以下两个命令:

  1. Job.summit()

    提交任务到集群并且立即返回

  2. Job.waitForCompletion(boolean)

    提交任务到集群,等待任务完成后返回

Job 输入

InputFormat 描述了 MapReduce job 的输入规范。作用为:

  1. 验证 job的输入规范
  2. 将输入文件拆分成符合规则的 InputSplit 实例,每个实例用于单个 Mapper。
  3. 提供 RecordReader 的实现,用于从股和规则的 InputSplit 实例中收集 record,用于 Mapper 的处理。

基于文件的 InputFormat 实现根据输入文件的大小分隔符合规则的 InputSplit 实例,其中输入文件的 block size 是分隔上限,下限可以通过 mapreduce.input.fileinputformat.split.minsize 指定。

InputSplit

InputSplit 表示单个 Mapper 处理的数据。

InputSplit 提供输入的面向 byte 的视图;RecordReader 负责处理和呈现一个面向 record 的视图。

RecordReader

RecordReader 从一个 InputSplit 中读取 <key, value> 对。

RecordReader 将 InputSplit 提供的 byte 视图转换成 record 视图,用于 Mapper 的处理

Job 输出

OutputFormat 描述了 MapReduce job 的输出规范。作用为:

  1. 验证 Job 的输出规范,例如输出文件夹是否存在
  2. 提供 RecordWriter 的实现,用来写入 job 的输出到文件。
OutputCommitter

OutputCommitter 描述 MapReduce job 的任务输出提交。作用为:

  1. 在初始化期间设置 job。
  2. 当 job 完成后,清楚 job。
  3. 设置任务的临时输出
  4. 提交任务的输出
  5. 舍弃任务的提交
任务的 side-effect files

在某些应用中,组建任务需要创建、写入 side-files,其余 job 的输出文件不同。

RecordWriter

RecordWriter 将 <key, value> 输出写入到文件中,即实现了将 job 的输出写入到文件功能。

其他有用的特性(未拓展)

  • 提交 job 到队列
  • Counters
  • 分布式缓存
  • Profiling
  • Debugging
  • 数据压缩
  • 跳过 bad records

WordCount 2.0

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.StringUtils;

public class WordCount2 {

  public static class TokenizerMapper
       extends Mapper<Object, Text, Text, IntWritable>{

    static enum CountersEnum { INPUT_WORDS }

    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();

    private boolean caseSensitive;
    private Set<String> patternsToSkip = new HashSet<String>();

    private Configuration conf;
    private BufferedReader fis;

    @Override
    public void setup(Context context) throws IOException,
        InterruptedException {
      conf = context.getConfiguration();
      caseSensitive = conf.getBoolean("wordcount.case.sensitive", true);
      if (conf.getBoolean("wordcount.skip.patterns", false)) {
        URI[] patternsURIs = Job.getInstance(conf).getCacheFiles();
        for (URI patternsURI : patternsURIs) {
          Path patternsPath = new Path(patternsURI.getPath());
          String patternsFileName = patternsPath.getName().toString();
          parseSkipFile(patternsFileName);
        }
      }
    }

    private void parseSkipFile(String fileName) {
      try {
        fis = new BufferedReader(new FileReader(fileName));
        String pattern = null;
        while ((pattern = fis.readLine()) != null) {
          patternsToSkip.add(pattern);
        }
      } catch (IOException ioe) {
        System.err.println("Caught exception while parsing the cached file '"
            + StringUtils.stringifyException(ioe));
      }
    }

    @Override
    public void map(Object key, Text value, Context context
                    ) throws IOException, InterruptedException {
      String line = (caseSensitive) ?
          value.toString() : value.toString().toLowerCase();
      for (String pattern : patternsToSkip) {
        line = line.replaceAll(pattern, "");
      }
      StringTokenizer itr = new StringTokenizer(line);
      while (itr.hasMoreTokens()) {
        word.set(itr.nextToken());
        context.write(word, one);
        Counter counter = context.getCounter(CountersEnum.class.getName(),
            CountersEnum.INPUT_WORDS.toString());
        counter.increment(1);
      }
    }
  }

  public static class IntSumReducer
       extends Reducer<Text,IntWritable,Text,IntWritable> {
    private IntWritable result = new IntWritable();

    public void reduce(Text key, Iterable<IntWritable> values,
                       Context context
                       ) throws IOException, InterruptedException {
      int sum = 0;
      for (IntWritable val : values) {
        sum += val.get();
      }
      result.set(sum);
      context.write(key, result);
    }
  }

  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    GenericOptionsParser optionParser = new GenericOptionsParser(conf, args);
    String[] remainingArgs = optionParser.getRemainingArgs();
    if ((remainingArgs.length != 2) && (remainingArgs.length != 4)) {
      System.err.println("Usage: wordcount <in> <out> [-skip skipPatternFile]");
      System.exit(2);
    }
    Job job = Job.getInstance(conf, "word count");
    job.setJarByClass(WordCount2.class);
    job.setMapperClass(TokenizerMapper.class);
    job.setCombinerClass(IntSumReducer.class);
    job.setReducerClass(IntSumReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);

    List<String> otherArgs = new ArrayList<String>();
    for (int i=0; i < remainingArgs.length; ++i) {
      if ("-skip".equals(remainingArgs[i])) {
        job.addCacheFile(new Path(remainingArgs[++i]).toUri());
        job.getConfiguration().setBoolean("wordcount.skip.patterns", true);
      } else {
        otherArgs.add(remainingArgs[i]);
      }
    }
    FileInputFormat.addInputPath(job, new Path(otherArgs.get(0)));
    FileOutputFormat.setOutputPath(job, new Path(otherArgs.get(1)));

    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}

与 1.0 版本的不同:

  • 在 Mapper 或者 Reducer 实现中,描述了 setup 方法怎样访问配置参数
  • 描述了 jobs 中 DistributedCache 怎样被用于分布式只读数据,并且用户可以指定在计数时需要跳过的单词。
  • 描述 GenericOptionsParser 处理通用 Hadoop 命令行选项的实用程序。
  • 演示应用程序如何使用 Counter,以及如何将状态信息传递给 map 方法或者 reduce 方法。

YARN

YARN 的基础思想是将资源管理(Resource Manager)和 job 的调度/监视功能划分到单独的守护进程中。即拥有一个全局的 ResourceManager(RM)以及每个应用程序的 ApplicationMaster(AM),应用程序可以仅包含一个作业,或者多个作业。

ResourceManager 和 NodeManager 来自于数据计算框架,如 MapReduce,其中 ResourceManager 是用来在系统中协调资源的最终权威,NodeManager 是每个机器上的框架 agent,负责容器、监事并报告资源使用情况(CPU、MEN、DISK、NETWORK)。

ApplicationMaster 是特定于框架的库,用于与 ResourceManager 协商资源,并与 NodeManager 一起执行、监视任务。

MapReduce NextGen Architecture

ResourceManager 拥有两个重要组件:

  • 调度器(Scheduler)

    Scheduler 负责将资源分配给正在运行的应用程序,这些应用程序受到容量、队列的约束。它仅仅作为一个 Scheduler,不监视或者跟踪应用的执行,同样也不能保证由于软件或硬件故障而重新启动的 job。他仅在应用有资源需求时才会表现其资源调度的功能,它基于容易资源概念的抽象实现,中包含了诸如内存、cpu、磁盘、网络等元素。

    Scheduler 有一个可插拔的策略,负责在队列、应用程序之间对集群资源进行 partitioning。

  • 应用程序管理器(ApplicationsManager)

    • 负责接收提交的 job

    • 协商执行任务的第一个 ApplicationMaster 容器。

    • 提供服务,用于重启 ApplicationMaster 容器当发生故障时

      ApplicationMaster 作用:

      • 从 Scheduler 处协商适当的资源
      • 跟踪状态
      • 监视进度

YARN 支持通过 ReservationSystem 保留资源的概念,该组件允许用户指定资源超时和时间限制的配置文件,并且保留资源以确保重要工作的可预测执行。

为了在几千台 node 上运行 YARN,通过 Federation 支持联合的概念。Federation 允许将多个 YARN 子群连接在一起,作为单个大型集群出现。这种机制用于实现更大的规模,允许多个独立集群用于非常大的 job。