本文作者:qiaoqingyi

小象编程(小象编程训练营下载)

qiaoqingyi 01-25 102

我 相 信 这 么 优秀 的 你

已 经 置 顶了 我

  专栏作者|蒋涉权

  转载请联系后台

  作者简介

  蒋涉权

  毕业于北京理工大学,大数据工程师,研究领域分布式计算,数据挖掘,算法,其中主要研究HDFS和Spark,目前就职于某大型设备商公司。

◇◆◇◆◇

概述

  DISTCP(分布式拷贝)是用于大规模集群内部和集群之间拷贝的工具,是Hadoop用户常用的命令之一。它使用Map/Reduce实现大量文件拷贝分发,错误处理和恢复,以及报告生成。它把文件和目录的列表作为map任务的输入,每个任务会完成源列表中部分文件的拷贝。由于使用了Map/Reduce方法,这个工具在语义和执行上都会有特殊的地方。 DISTCP的源码可以在Hadoop工程源码的hadoop-tools工程中找到,代码量不算大,主流程代码在org.apache.hadoop.tools和org.apache.hadoop.tools.mapred两个包中,笔者由于在工作中有需求要通过MR实现备份海量HDFS数据,因此研究了一波DISTCP源码并仿照它实现了自己的代码,实测在千兆网络中可以3~4小时内拷贝几TB的数据,这篇文档会介绍DISTCP的工作主流程,通过分析工具的代码也有助于我们更好的理解MR编程框架,并可以对HDFS文件系统有一个初步的了解。

  注:笔者当前使用的hadoop版本是2.7.2

◇◆◇◆◇

用法

  DISTCP最常用在集群之间的拷贝:

  Bash# hadoop distcphdfs://nn1:8020/foo/bar hdfs://nn2:8020/bar/foo

  这条命令会把nn1集群的/foo/bar目录下的所有文件或目录名展开并存储到一个临时文件中,这些文件内容的拷贝工作被分配给多个map任务,然后每个MapTask分别执行从nn1到nn2的拷贝操作。

  DISTCP在支持跨集群拷贝的主功能上还支持很多额外的功能选项,丰富了用户的选择,主要的选项如下索引,但是我们这里分析源码的时候,会忽略这些额外的功能选项的源码,只看主体的数据拷贝功能实现,只要读懂了主体功能,其它的额外分支只需在这基础上研读就很方便了。

  标识

  描述

  备注

  -p[rbugp]

  Preserve

  r: replication number

  b: block size

  u: user

  g: group

  p: permission

  修改次数不会被保留。并且当指定 -update 时,更新的状态不会被同步,除非文件大小不同(比如文件被重新创建)

  -i

忽略失败

这个选项会比默认情况提供关于拷贝的更精确的统计,同时它还将保留失败拷贝操作的日志,这些日志信息可以用于调试。最后,如果一个map失败了,但并没完成所有分块任务的尝试,这不会导致整个作业的失败

  -log logdir

  记录日志到 logdir

  DistCp为每个文件的每次尝试拷贝操作都记录日志,并把日志作为map的输出。如果一个map失败了,当重新执行时这个日志不会被保留

  -m num_maps

同时拷贝的最大数目

指定了拷贝数据时map的数目。请注意并不是map数越多吞吐量越大

  -overwrite

  覆盖目标

如果一个map失败并且没有使用-i选项,不仅仅那些拷贝失败的文件,这个分块任务中的所有文件都会被重新拷贝

  -update

  如果源和目标的大小不一样则进行覆盖

  这不是"同步"操作。执行覆盖的唯一标准是源文件和目标文件大小是否相同;如果不同,则源文件替换目标文件

  -f urilist_uri

  使用urilist_uri 作为源文件列表

  这等价于把所有文件名列在命令行中。 urilist_uri 列表应该是完整合法的URI

◇◆◇◆◇

源代码与过程分析

  阅读源码可以从工程中org.apache.hadoop.tools包里的DistCp类开始入手,DistCp的构造方法先处理了一下输入参数(即上面说到的额外功能选项)、读入配置项以及做了hadoop初始化工作,这里的东西不用过多关注,它不会影响我们读懂源码。

  public DistCp(Configuration configuration, DistCpOptions inputOptions) throws Exception {

  Configuration config = new Configuration(configuration);

  config.addResource(DISTCP_DEFAULT_XML);

  setConf(config);

  this.inputOptions = inputOptions;

  this.metaFolder = createMetaFolderPath();

  }

  在DistCp类中,有一个有用的方法接口public int run(String[] argv),实际上当我们运行工具进行HDFS数据跨集群拷贝时,就是通过这个方法调度运行的,这个方法也很简单,只是对路径等做一些检查以及对execute()方法做异常处理,而execute方法则是任务执行方法。

  在execute()方法中,会调用createAndSubmitJob()创建MR任务,准备数据,设定数据输入格式,并把任务提交到hadoop集群运行,最后等待任务执行完毕。于是我们可以看到,主体功能实现就在createAndSubmitJob()这个函数体里,工程中其它的各个类无非都是为这个函数接口服务的。下面就是createAndSubmitJob()的代码,这里删除了一些不影响阅读的源码,只留下主体功能流程。

  public Job createAndSubmitJob() throws Exception {

  Job job = null;

  try {

  synchronized(this) {

  metaFolder = createMetaFolderPath();

  jobFS = metaFolder.getFileSystem(getConf());

  job = createJob();

  }

  if (!inputOptions.shouldUseDiff()) {

  createInputFileListing(job);

  }

  job.submit();

  submitted = true;

  } finally {

  if (!submitted) {

  cleanup();}

  return job;

  }

  接下来我们就可以分析一下createAndSubmitJob()中各行分别实现了什么功能,下面我们一个部分一个部分的分析。

  metaFolder:

  metafolder是DISTCP工具准备元数据的地方,在createMetaFolderPath()中会结合一个随机数生成一个工作目录,在这个目录中迟点会通过getFileListingPath()生成fileList.seq文件,然后往这个文件中写入数据,这是一个SequenceFile文件,即Key/Value结构的序列化文件,这个文件里将存放所有需要拷贝的源目录/文件信息列表。其中Key是源文件的Text格式的相对路径,即relPath;而Value则记录源文件的FileStatus格式的org.apache.hadoop.fs.FileStatus信息,这里FileStatus是hadoop已经封装好了的描述HDFS文件信息的类,但是DISTCP为了更好的处理数据,重新继承并封装了CopyListingFileStatus类,其描述如下图1,不过我们其实可以认为这里的Value就是FileStatus即可。metafolder目录中的fileList.seq最终会作为参数传递给MR任务中的Mapper。

  

  private Path createMetaFolderPath() throws Exception {

  Configuration configuration = getConf();

  Path stagingDir = JobSubmissionFiles.getStagingDir(

  new Cluster(configuration), configuration);

  Path metaFolderPath = new Path(stagingDir, PREFIX + String.valueOf(rand.nextInt()));

  if (LOG.isDebugEnabled())

  LOG.debug("Meta folder location: " + metaFolderPath);

  configuration.set(DistCpConstants.CONF_LABEL_META_FOLDER, metaFolderPath.toString());

  return metaFolderPath;

  }

  Job:

  这个Job定义的就是我们的MapReduce任务了,在hadoop中每个任务都要定义一个Job,在Job中设定任务的输入输出、任务名字和运行逻辑等东西,DISTCP工具的Job在createAndSubmitJob()方法中的job = createJob()实现。实现源码如下:

  private Job createJob() throws IOException {

  String jobName = "distcp";

  String userChosenName = getConf().get(JobContext.JOB_NAME);

  if (userChosenName != null)

  jobName += ": " + userChosenName;

  Job job = Job.getInstance(getConf());

  job.setJobName(jobName);

  job.setInputFormatClass(DistCpUtils.getStrategy(getConf(), inputOptions));

  job.setJarByClass(CopyMapper.class);

  configureOutputFormat(job);

  job.setMapperClass(CopyMapper.class);

  job.setNumReduceTasks(0);

  job.setMapOutputKeyClass(Text.class);

  job.setMapOutputValueClass(Text.class);

  job.setOutputFormatClass(CopyOutputFormat.class);

  job.getConfiguration().set(JobContext.MAP_SPECULATIVE, "false");

  job.getConfiguration().set(JobContext.NUM_MAPS,

  String.valueOf(inputOptions.getMaxMaps()));

  if (inputOptions.getSslConfigurationFile() != null) {

  setupSSLConfig(job);

  }

  inputOptions.appendToConf(job.getConfiguration());

  return job;

  }

  这里面的源码已经是很清晰的,跟常规MR任务的设定差不多,我们主要关注以下两行。

  ● job.setInputFormatClass(DistCpUtils.getStrategy(getConf(),inputOptions));

  ● job.setMapperClass(CopyMapper.class);

  我们在设置MapReduce输入格式的时候,会调用上面第一行这样一条语句,这条语句保证了输入文件会按照我们预设的格式被读取。setInputFormatClass里设定了Mapper的数据读取格式,也就是由getStrategy(getConf(), inputOptions)得到,进到这个函数里面,可以看到最终Mapper数据输入格式由UniformSizeInputFormat.class这个类定义的,而这个类继承自InputFormat.class,MR中所有的输入格式类都继承自InputFormat,这是一个抽象类。

  InputFormat抽象类仅有两个抽象方法:

  ● ListInputSplitgetSplits(),获取由输入文件计算出输入分片(InputSplit),解决数据或文件分割成片问题。

  ● RecordReaderK,VcreateRecordReader(),创建RecordReader,从InputSplit中读取数据,解决读取分片中数据问题。

  通过InputFormat,Mapreduce框架可以做到:验证作业输入的正确性;将输入文件切割成逻辑分片(InputSplit),一个InputSplit将会被分配给一个独立的MapTask;提供RecordReader实现,读取InputSplit中的“K-V对”供Mapper使用。

  在DISTCP中,UniformSizeInputFormat继承了InputFormat并实现了数据读入格式,它会读取metafolder中fileList.seq序列化文件的内容,并根据用户设定的map数和拷贝总数据量进行分片,计算出分多少片,最终提供“K-V”对供Mapper使用。这个类的源码实现并不复杂,加上注释一共也才100多行,很容易就能读懂。

  CopyMapper.class中则定义了每个map的工作逻辑,也就是拷贝的核心逻辑,任务提交到hadoop集群中运行时每个map就是根据这个逻辑进行工作的,通过setMapperClass设定。这里要注意的是DISTCP任务只有map没有reduce,因为只需要map就可以完成拷贝数据的工作。CopyMapper的源码实现在org.apache.hadoop.tools.mapred这个包中,CopyMapper里最核心实现是setup()和map()这两个方法,这两个方法其实也是MR中Mapper的固有通用方法,setup()中完成map方法的一些初始化工作,在DISTCP中,这个方法里会设定对端的目标路径,并做一些参数设置和判断工作,源码(删掉了参数设置部分)如下:

  public void setup(Context context) throws IOException, InterruptedException {

  conf = context.getConfiguration();

  targetWorkPath = new Path(conf.get(DistCpConstants.CONF_LABEL_TARGET_WORK_PATH));

  Path targetFinalPath = new Path(conf.get(

  DistCpConstants.CONF_LABEL_TARGET_FINAL_PATH));

  targetFS = targetFinalPath.getFileSystem(conf);

  if (targetFS.exists(targetFinalPath) targetFS.isFile(targetFinalPath)) {

  overWrite = true; // When target is an existing file, overwrite it.

  }

  }

  而map()方法,内容较多,我们可以看其接口如下:

  ● public void map(Text relPath,CopyListingFileStatus sourceFileStatus, Context context) throws IOException,InterruptedException

  从输入参数可以看出来,这其实就是对上面分析过的UniformSizeInputFormat类里分片后的数据里的每一行进行处理,每行里存放的就是“K-V”对,也就是fileList.seq文件每行的内容。Map方法体前半部分有一大堆代码内容,其实都是一些准备和判断工作,为后面的拷贝服务,最终的拷贝动作发生在copyFileWithRetry(deion,sourceCurrStatus, target, context, action, fileAttributes)这个函数中,进入这个函数一直往里面读,就能看到数据最终通过常用的Java输入输出流的方式完成点对点拷贝,最后拷贝方法源码如下:

  long copyBytes(FileStatus sourceFileStatus, long sourceOffset,

  OutputStream outStream, int bufferSize, Mapper.Context context)

  throws IOException {

  Path source = sourceFileStatus.getPath();

  byte buf[] = new byte[bufferSize];

  ThrottledInputStream inStream = null;

  long totalBytesRead = 0;

  try {

  inStream = getInputStream(source, context.getConfiguration());

  int bytesRead = readBytes(inStream, buf, sourceOffset);

  while (bytesRead = 0) {

  totalBytesRead += bytesRead;

  if (action == FileAction.APPEND) {

  sourceOffset += bytesRead;

  }

  outStream.write(buf, 0, bytesRead);

  updateContextStatus(totalBytesRead, context, sourceFileStatus);

  bytesRead = readBytes(inStream, buf, sourceOffset);

  }

  outStream.close();

  outStream = null;

  } finally {

  IOUtils.cleanup(LOG, outStream, inStream);

  }

  return totalBytesRead;

  }

  元数据生成(createInputFileListing):

  前面提到在metafolder目录中会生成fileList.seq文件,而这个文件是怎么生成以及文件里面保存些什么内容呢?这个逻辑就在createInputFileListing(job)中完成的,源码如下。首先由getFileListingPath()方法创建一个空的seq文件,然后由buildListing()方法往这个seq文件写入数据,数据写入的整体逻辑就是遍历源路径下所有目录和文件,把每个文件的相对路径和它的CopyListingFileStatus以“K-V”对的形式写入fileList.seq每行中,最终就得到Mapper的输入了。

  protected Path createInputFileListing(Job job) throws IOException {

  Path fileListingPath = getFileListingPath();

小象编程(小象编程训练营下载)

  CopyListing copyListing = CopyListing.getCopyListing(job.getConfiguration(),

  job.getCredentials(), inputOptions);

  copyListing.buildListing(fileListingPath, inputOptions);

  return fileListingPath;

  }

◇◆◇◆◇

Finally

  最终完成任务以后,在finally里,要做一些清理工作,也就是删除在前面创建出来的metafolder,这里非常简单,也就是一个删除文件夹的逻辑,我们就不贴源码了。

阅读
分享