博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
hadoop mapreduce中对splite的处理
阅读量:6273 次
发布时间:2019-06-22

本文共 2242 字,大约阅读时间需要 7 分钟。

分片:
1. 在job.submit() 提交job之后  会调用 submitter.submitJobInternal(Job.this, cluster);
 
2. 在submitJobInternal()函数中 会给job创建分片 int maps = writeSplits(job, submitJobDir); 在该函数中会调用writeNewSplits()
 
3. 在writeNewSplits()方法 中,通过反射获得InputFormat对象,会调用该对象中的getSplits()方法,来进行分片,从而得到InputSplit[] 数组. 然后会通过 JobSplitWriter.createSplitFiles方法将数组内容写出. writeNewSplits方法返回的是分片数目,决定了会创建多少个 map task.
 
4.  在JobSplitWriter.createSplitFiles方法中, 会打开一个输出流out,输出文件名是(${jobSubmitDir}/job.split)
    其中调用writeNewSplits()来完成写出操作,.与此同时,该函数会返回一个SplitMetaInfo的数组.
    在该数据结构中主要包括三个属性:
    long   startOffset :  该分片在 job.split 中的偏移量
    long inputDataLength: 分片数据长度
    String[] locations: hosts on which this split is local
 
5.最后调用writeJobSplitMetaInfo()方法  将第5步中的SplitMetaInfo数组写入到另一个文件中,文件名是${jobSubmitDir}/job.splitmetainfo.
 
6. 上述步骤中最终会输出两个文件 job.split 和 job.splitmetainfo.
    在 job.split内容:   split的类名, split的序列化信息,在FileSplit类中会写入文件名,偏移量,和长度
    在 job.splitmetainfo中内容: META_SPLIT_FILE_HEADER , version , 分片数量, splitMetaInfo序列化(包括locations的数目, 以此写入所有的locations, startOffset,inputDataLength)
 
到此为止,将分片信息记录完成,写入到HDFS中相应的文件中.
 

读取分片:
1. 首先在JobImpl类中的InitTransition中会读取相应的split信息, 并启动相应的Task
 
2. TaskSplitMetaInfo[] taskSplitMetaInfo = createSplits(job, job.jobId)
    在该函数中会通过SplitMetaInfoReader.readSplitMetaInfo函数从job.splitmetainfo文件中读取出相应信息,首先会先验证META_SPLIT_VERSION和numSplits,然后会依次读取出每个splitMetaInfo,根据splitMetaInfo再从 job.split 中读取相应数据,构建出TaskSplitIndex对象,然后得到TaskSplitMetaInfo对象,最后返回 TaskSplitMetaInfo[]数组.
 TaskSplitIndex有两个属性: String splitLocation 和  long startOffset;
 TaskSplitMetaInfo有三个属性: TaskSplitIndex splitIndex , long inputDataLength,  String[] locations
    
 
3. 在创建Map任务的时候会将该数组传入 , createMapTasks(job, inputLength, taskSplitMetaInfo);
    对应每个taskSplitMetaInfo 会创建一个TaskImpl,并传入对应taskSplitMetaInfo
     TaskImpl task =new MapTaskImpl(....);
 
4. 在MapTaskImpl中会创建MapTaskAttemptImpl对象,该对象中存在createRemoteTask方法,在改方法中创建了实际的MapTask对象
MapTask mapTask =new MapTask("", TypeConverter.fromYarn(getID()), partition,splitInfo.getSplitIndex(), 1);
splitInfo.getSplitIndex()会返回一个TaskSplitIndex对象,
 
5. 在MapTask执行runNewMapper方法时,会通过
    split = getSplitDetails(new Path(splitIndex.getSplitLocation()),splitIndex.getStartOffset());
    读取到实际的文件,最后通过InputFormat接口的createRecordReader方法得到需要的RecordReader.

转载于:https://www.cnblogs.com/thstone/p/4143435.html

你可能感兴趣的文章
走红日本 阿里云如何能够赢得海外荣耀
查看>>
磁盘空间满引起的mysql启动失败:ERROR! MySQL server PID file could not be found!
查看>>
点播转码相关常见问题及排查方式
查看>>
[arm驱动]linux设备地址映射到用户空间
查看>>
弗洛伊德算法
查看>>
【算法之美】求解两个有序数组的中位数 — leetcode 4. Median of Two Sorted Arrays
查看>>
精度 Precision
查看>>
Android——4.2 - 3G移植之路之 APN (五)
查看>>
Linux_DHCP服务搭建
查看>>
[SilverLight]DataGrid实现批量输入(like Excel)(补充)
查看>>
秋式广告杀手:广告拦截原理与杀手组织
查看>>
翻译 | 摆脱浏览器限制的JavaScript
查看>>
闲扯下午引爆乌云社区“盗窃”乌云币事件
查看>>
02@在类的头文件中尽量少引入其他头文件
查看>>
JAVA IO BIO NIO AIO
查看>>
input checkbox 复选框大小修改
查看>>
网吧维护工具
查看>>
BOOT.INI文件参数
查看>>
vmstat详解
查看>>
新年第一镖
查看>>