Hadoop 自定义InputFormat实现自定义Split

上一篇文章中提到了如何进行RecordReader的重写(见 http://www.linuxidc.com/Linux/2012-04/57831.htm ),本篇文章就是来实现如何实现自定义split的大小

更多Hadoop相关信息见Hadoop 专题页面 http://www.linuxidc.com/topicnews.aspx?tid=13

要解决的需求:

(1)一个文本中每一行都记录了一个文件的路径,

(2)要求处理路径对应的文件,但是因为文件量比较大,所以想进行分布式处理

(3)所以就对输入的文档进行预处理,读取前N行做为一个splits,但是没有实现,因为重写FileSplit不是太容易实现,就偷懒直接定义一个split的大小是1000个字节,这样就可以将输入的文档进行分片了。

直接贴代码:

InputFormat

  1. /** 
  2. * @file LineInputFormat.java 
  3. * @brief自定义InputFormat 实现split大小的控制 
  4. * @author anbo, anbo724@gmail.com 
  5. * @version 1.0 
  6. * @date 2011-10-18 
  7. */  
  8. /* Copyright(C) 
  9. * For free 
  10. * All right reserved 
  11. * 
  12. */   
  13.   
  14.   
  15. package an.hadoop.test;  
  16.   
  17.   
  18. import java.io.IOException;  
  19. import java.util.ArrayList;  
  20. import java.util.List;  
  21.   
  22. import org.apache.commons.logging.Log;   
  23. import org.apache.commons.logging.LogFactory;  
  24. import org.apache.hadoop.fs.BlockLocation;  
  25. import org.apache.hadoop.fs.FileStatus;  
  26. import org.apache.hadoop.fs.FileSystem;  
  27. import org.apache.hadoop.fs.Path;  
  28. import org.apache.hadoop.io.LongWritable;  
  29. import org.apache.hadoop.io.Text;  
  30. import org.apache.hadoop.io.compress.CompressionCodec;  
  31. import org.apache.hadoop.io.compress.CompressionCodecFactory;  
  32. import org.apache.hadoop.mapreduce.InputFormat;  
  33. import org.apache.hadoop.mapreduce.InputSplit;  
  34. import org.apache.hadoop.mapreduce.JobContext;  
  35. import org.apache.hadoop.mapreduce.RecordReader;  
  36. import org.apache.hadoop.mapreduce.TaskAttemptContext;  
  37. import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;  
  38. import org.apache.hadoop.mapreduce.lib.input.FileSplit;  
  39. import org.apache.hadoop.mapreduce.lib.input.LineRecordReader;  
  40.   
  41. public class LineInputFormat extends FileInputFormat<LongWritable , Text> {  
  42.       
  43.     public long mySplitSize = 1000;  
  44.       
  45.      private static final Log LOG = LogFactory.getLog(FileInputFormat.class);  
  46.   
  47.       private static final double SPLIT_SLOP = 1.1;   // 10% slop   
  48.   
  49.      @Override  
  50.       public RecordReader<LongWritable, Text>   
  51.         createRecordReader(InputSplit split,  
  52.                            TaskAttemptContext context) {  
  53.         return new LineRecordReader(); //为什么不行呢    
  54.       }  
  55.       
  56.     @Override  
  57.     protected boolean isSplitable(JobContext context, Path file) {  
  58.         CompressionCodec codec =  
  59.         new CompressionCodecFactory(context.getConfiguration()).getCodec(file);  
  60.         //return codec == null;   
  61.         return true;//要求分片   
  62.     }  
  63.       
  64.      /**  
  65.        * Generate the list of files and make them into FileSplits. 
  66.        */   
  67.     @Override  
  68.       public List<InputSplit> getSplits(JobContext job) throws IOException {  
  69.         long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));  
  70.         long maxSize = getMaxSplitSize(job);  
  71.   
  72.         // generate splits   
  73.         List<InputSplit> splits = new ArrayList<InputSplit>(); //用以存放生成的split的     
  74.         for (FileStatus file: listStatus(job)) {//filestatues是文件对应的信息,具体看对应的类   
  75.           Path path = file.getPath();  
  76.           FileSystem fs = path.getFileSystem(job.getConfiguration());  
  77.           long length = file.getLen(); //得到文本的长度   
  78.           BlockLocation[] blkLocations = fs.getFileBlockLocations(file, 0, length); //取得文件所在块的位置   
  79.           if ((length != 0) && isSplitable(job, path)) { //如果文件不为空,并且可以分片的话就进行下列操作,   
  80.             long blockSize = file.getBlockSize();//   
  81.             //long splitSize = computeSplitSize(blockSize, minSize, maxSize); //split的大小Math.max(minSize, Math.min(maxSize, blockSize));   
  82.             //可以通过调整splitSize的大小来控制对应的文件块的大小,比如设置splitSize=100,那么就可以控制成每个split的大小   
  83.             //但是问题是,我是要求按行进行处理的,虽然这样应该也可以按行进行切分了,不过却不能保证每个split对应的行数都是相等的   
  84.             //一般情况是如果文件大于64M(32M)就会使用块大小来作为split   
  85.             long splitSize = mySplitSize;  
  86.             long bytesRemaining = length; //文本的长度   
  87.               
  88.             while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {//剩下的文本长度大于split大小的SPLIT_SLOP倍数   
  89.               int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);//找到对应block块中对应的第0个字符开始,   
  90.               splits.add(new FileSplit(path, length-bytesRemaining, splitSize,    
  91.                                        blkLocations[blkIndex].getHosts()));   
  92.             //这个是形成split的代码FileSplit(文件路径,0,split大小,host)   
  93.               //原始函数为 FileSplit(Path file, long start, long length, String[] hosts) {   
  94.               //但是应该可以通过重写FileSplit来实现对应的要求   
  95.               bytesRemaining -= splitSize;  
  96.             }  
  97.               
  98.             if (bytesRemaining != 0) {  
  99.               splits.add(new FileSplit(path, length-bytesRemaining, bytesRemaining,   
  100.                          blkLocations[blkLocations.length-1].getHosts()));  
  101.             }  
  102.           } else if (length != 0) {  
  103.             splits.add(new FileSplit(path, 0, length, blkLocations[0].getHosts()));  
  104.           } else {   
  105.             //Create empty hosts array for zero length files   
  106.             splits.add(new FileSplit(path, 0, length, new String[0]));  
  107.           }  
  108.         }  
  109.         LOG.debug("Total # of splits: " + splits.size());  
  110.         return splits;  
  111.       }  
  112.   
  113.       
  114.   
  115.       
  116.       
  117.       
  118.   
  119. }  
2019-03-28 13:52

知识点

相关教程

更多

在Hadoop的streaming中使用自定义的inputformat和outputformat

在Hadoop的streaming中有一个选项是指定输入输出格式化的:          -inputformatTextInputFormat(default)|SequenceFileAsTextInputFormat|JavaClassNameOptional.  -outputformatTextOutputFormat(default)|JavaClassNameOptional.

MapReduce高级编程之自定义InputFormat

InputFormat是MapReduce中一个很常用的概念,它在程序的运行中到底起到了什么作用呢?        InputFormat其实是一个接口,包含了两个方法:        public interface InputFormat<K, V> { InputSplit[]getSplits(JobConf job, int numSplits) throws IOExcep

Hadoop streaming中指定自定义的inputformat java类

解决了Hadoop streaming中指定自定义的inputformat java类 想在streaming中用自己的输入类: 看到网上说: How do I provide my own input/output format with streaming? At least as late as version 0.14, Hadoop does not support multiple j

Hadoop : 新版API 自定义InputFormat 把整个文件作为一条记录处理

自定义InputFormat 新版API 把真个文件当成一条输入  主要参考 源代码LineRecordReader里面的内容 有些细节还没有理解        WholeFileInputFormat importjava.io.IOException; importorg.apache.Hadoop.fs.Path; importorg.apache.hadoop.io.BytesWritab

自定义实现Hadoop Key-Value

自定义实现Value: 如果需要自定义一个一个Value类型,那么需要实现Hadoop预定义接口org.apache.hadoop.io.Writable。Writable包含两个重要的方法:readFields和write,主要用于数据的序列化和反序列化。对于Writable的子类的成员变量必须是Java的基本类型或者是其他实现了Writable接口的类型。如果是Java的基本类型则可以使用Da

自定义Hadoop Writable

Hadoop中已经定义了很多Writable的实现,基本上可以符合我们日常使用,但是在一些特殊的场景我们可能还是需要自己去实现Writable,下面主要说明如何去实现自己的Writeable,及使用自定义的Writable作为map/reduce中的key值时遇到的一些问题。 首先需要实现org.apache.hadoop.io.Writable这个接口,该接口有write和readFields这

自定义Hadoop Map/Reduce输入文件切割InputFormat

Hadoop会对原始输入文件进行文件切割,然后把每个split传入mapper程序中进行处理,FileInputFormat是所有以文件作 为数据源的InputFormat实现的基类,FileInputFormat保存作为job输入的所有文件,并实现了对输入文件计算splits的方 法。至于获得记录的方法是有不同的子类进行实现的。 那么,FileInputFormat是怎样将他们划分成splits

Hadoop自定义RecordReader

系统默认的LineRecordReader是按照每行的偏移量做为map输出时的key值,每行的内容作为map的value值,默认的分隔符是回车和换行。 现在要更改map对应的输入的<key,value>值,key对应的文件的路径(或者是文件名),value对应的是文件的内容(content)。 那么我们需要重写InputFormat和RecordReader,因为RecordReade

Hadoop自定义SdfTextInputFormat用在streaming中

本人在(Hadoop streaming中指定自定义的inputformat java类 http://www.linuxidc.com/Linux/2012-04/57830.htm)中写了自定义InputFormat的制作。 可是直接加入后一直得不到想要的结果。 查看源码发现:           PipeMapper.java if(!this.ignoreKey){ write(key);

自定义Hadoop的可序列化类

Java原生语言中要想一个自定义类可序列化,很简单,只要让这个类实现java.io.Serializable接口就可以了,但是在Hadoop框架中,要想让自定义类可以被序列化,我们必须手动让其实现WritableCompable接口并且实现write(),readFields(),compareTo()方法。 下面就是一个我们自定义的可序列化的类:     /*  */  packagecom.c

实现MapReduce多文件自定义输出

普通maprduce中通常是有map和reduce两个阶段,在不做设置的情况下,计算结果会以part-000*输出成多个文件,并且输出的文件数量和reduce数量一样,文件内容格式也不能随心所欲。这样不利于后续结果处理。 在Hadoop中,reduce支持多个输出,输出的文件名也是可控的,就是继承MultipleTextOutputFormat类,重写generateFileNameForKey方

荐 Twitter Storm Stream Grouping编写自定义分组实现

自定义Grouping测试  Storm是支持自定义分组的,本篇文章就是探究Storm如何编写一个自定义分组器,以及对Storm分组器如何分组数据的理解。 这是我写的一个自定义分组,总是把数据分到第一个Task: public class MyFirstStreamGrouping implements CustomStreamGrouping {    private static Logger

Hadoop Oozie学习笔记 自定义安装和启动

cloudera里面有基于Oozie的安装,但是通过sudo apt-get install oozie的方式.其中你要将cloudera Hadoop的source放入sources.list.由于我升级了最新的Ubuntu.但对应的cloudera hadoop还没有相应的版本.所以添加sources.list无法成功.网上也没什么手动安装的资料,于是我就写点吧.参考来源:http://inc

[置顶] 【Apache Solr系列】使用IKAnalyzer中文分词以及自定义分词字典

之前写的Apache Solr只介绍了简单的搭建以及导入数据等功能,最近由于项目要求,添加索引分词和搜索分词功能;分词的项目有包括好几个:smartcn、ictclas4j、IK、jeasy、庖丁、mmseg4j; 以上几种分词器各有优缺点,根据不同场景可分可定制和不可定制两种: 一种是基于中科院ICTCLAS的隐式马尔科夫HMM算法的中文分词器,如smartcn,ictclas4j,优点是分词准

页面自定义布局

就好象 GOOGLE 自定义主页一样。如果我只有2栏目,左右可以轻松的用 float:left/right 来控制,可是上下的顺序要如何控制好呢????  问题补充:  推荐使用jQuery来动态生成dom,after是在某个dom之后,before是在之前,append是在之中,三个够你随便玩转dom的排列了  ------------------------------------------

最新教程

更多

java线程状态详解(6种)

java线程类为:java.lang.Thread,其实现java.lang.Runnable接口。 线程在运行过程中有6种状态,分别如下: NEW:初始状态,线程被构建,但是还没有调用start()方法 RUNNABLE:运行状态,Java线程将操作系统中的就绪和运行两种状态统称为“运行状态” BLOCK:阻塞状态,表示线程阻塞

redis从库只读设置-redis集群管理

默认情况下redis数据库充当slave角色时是只读的不能进行写操作,如果写入,会提示以下错误:READONLY You can't write against a read only slave.  127.0.0.1:6382> set k3 111  (error) READONLY You can't write against a read only slave. 如果你要开启从库

Netty环境配置

netty是一个java事件驱动的网络通信框架,也就是一个jar包,只要在项目里引用即可。

Netty基于流的传输处理

​在TCP/IP的基于流的传输中,接收的数据被存储到套接字接收缓冲器中。不幸的是,基于流的传输的缓冲器不是分组的队列,而是字节的队列。 这意味着,即使将两个消息作为两个独立的数据包发送,操作系统也不会将它们视为两个消息,而只是一组字节(有点悲剧)。 因此,不能保证读的是您在远程定入的行数据

Netty入门实例-使用POJO代替ByteBuf

使用TIME协议的客户端和服务器示例,让它们使用POJO来代替原来的ByteBuf。

Netty入门实例-时间服务器

Netty中服务器和客户端之间最大的和唯一的区别是使用了不同的Bootstrap和Channel实现

Netty入门实例-编写服务器端程序

channelRead()处理程序方法实现如下

Netty开发环境配置

最新版本的Netty 4.x和JDK 1.6及更高版本

电商平台数据库设计

电商平台数据库表设计:商品分类表、商品信息表、品牌表、商品属性表、商品属性扩展表、规格表、规格扩展表

HttpClient 上传文件

我们使用MultipartEntityBuilder创建一个HttpEntity。 当创建构建器时,添加一个二进制体 - 包含将要上传的文件以及一个文本正文。 接下来,使用RequestBuilder创建一个HTTP请求,并分配先前创建的HttpEntity。

MongoDB常用命令

查看当前使用的数据库    > db    test  切换数据库   > use foobar    switched to db foobar  插入文档    > post={"title":"领悟书生","content":"这是一个分享教程的网站","date":new

快速了解MongoDB【基本概念与体系结构】

什么是MongoDB MongoDB is a general purpose, document-based, distributed database built for modern application developers and for the cloud era. MongoDB是一个基于分布式文件存储的数据库。由C++语言编写。旨在为WEB应用提供可扩展的高性能数据存储解决方案。

windows系统安装MongoDB

安装 下载MongoDB的安装包:mongodb-win32-x86_64-2008plus-ssl-3.2.10-signed.msi,按照提示步骤安装即可。 安装完成后,软件会安装在C:\Program Files\MongoDB 目录中 我们要启动的服务程序就是C:\Program Files\MongoDB\Server\3.2\bin目录下的mongod.exe,为了方便我们每次启动,我

Spring boot整合MyBatis-Plus 之二:增删改查

基于上一篇springboot整合MyBatis-Plus之后,实现简单的增删改查 创建实体类 添加表注解TableName和主键注解TableId import com.baomidou.mybatisplus.annotations.TableId;
import com.baomidou.mybatisplus.annotations.TableName;
import com.baom

分布式ID生成器【snowflake雪花算法】

基于snowflake雪花算法分布式ID生成器 snowflake雪花算法分布式ID生成器几大特点: 41bit的时间戳可以支持该算法使用到2082年 10bit的工作机器id可以支持1024台机器 序列号支持1毫秒产生4096个自增序列id 整体上按照时间自增排序 整个分布式系统内不会产生ID碰撞 每秒能够产生26万ID左右 Twitter的 Snowflake分布式ID生成器的JAVA实现方案