Hadoop 自定义InputFormat实现自定义Split
上一篇文章中提到了如何进行RecordReader的重写(见 http://www.linuxidc.com/Linux/2012-04/57831.htm ),本篇文章就是来实现如何实现自定义split的大小
更多Hadoop相关信息见Hadoop 专题页面 http://www.linuxidc.com/topicnews.aspx?tid=13
要解决的需求:
(1)一个文本中每一行都记录了一个文件的路径,
(2)要求处理路径对应的文件,但是因为文件量比较大,所以想进行分布式处理
(3)所以就对输入的文档进行预处理,读取前N行做为一个splits,但是没有实现,因为重写FileSplit不是太容易实现,就偷懒直接定义一个split的大小是1000个字节,这样就可以将输入的文档进行分片了。
直接贴代码:
InputFormat
- /**
- * @file LineInputFormat.java
- * @brief自定义InputFormat 实现split大小的控制
- * @author anbo, anbo724@gmail.com
- * @version 1.0
- * @date 2011-10-18
- */
- /* Copyright(C)
- * For free
- * All right reserved
- *
- */
- package an.hadoop.test;
- import java.io.IOException;
- import java.util.ArrayList;
- import java.util.List;
- import org.apache.commons.logging.Log;
- import org.apache.commons.logging.LogFactory;
- import org.apache.hadoop.fs.BlockLocation;
- import org.apache.hadoop.fs.FileStatus;
- import org.apache.hadoop.fs.FileSystem;
- import org.apache.hadoop.fs.Path;
- import org.apache.hadoop.io.LongWritable;
- import org.apache.hadoop.io.Text;
- import org.apache.hadoop.io.compress.CompressionCodec;
- import org.apache.hadoop.io.compress.CompressionCodecFactory;
- import org.apache.hadoop.mapreduce.InputFormat;
- import org.apache.hadoop.mapreduce.InputSplit;
- import org.apache.hadoop.mapreduce.JobContext;
- import org.apache.hadoop.mapreduce.RecordReader;
- import org.apache.hadoop.mapreduce.TaskAttemptContext;
- import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
- import org.apache.hadoop.mapreduce.lib.input.FileSplit;
- import org.apache.hadoop.mapreduce.lib.input.LineRecordReader;
- public class LineInputFormat extends FileInputFormat<LongWritable , Text> {
- public long mySplitSize = 1000;
- private static final Log LOG = LogFactory.getLog(FileInputFormat.class);
- private static final double SPLIT_SLOP = 1.1; // 10% slop
- @Override
- public RecordReader<LongWritable, Text>
- createRecordReader(InputSplit split,
- TaskAttemptContext context) {
- return new LineRecordReader(); //为什么不行呢
- }
- @Override
- protected boolean isSplitable(JobContext context, Path file) {
- CompressionCodec codec =
- new CompressionCodecFactory(context.getConfiguration()).getCodec(file);
- //return codec == null;
- return true;//要求分片
- }
- /**
- * Generate the list of files and make them into FileSplits.
- */
- @Override
- public List<InputSplit> getSplits(JobContext job) throws IOException {
- long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
- long maxSize = getMaxSplitSize(job);
- // generate splits
- List<InputSplit> splits = new ArrayList<InputSplit>(); //用以存放生成的split的
- for (FileStatus file: listStatus(job)) {//filestatues是文件对应的信息,具体看对应的类
- Path path = file.getPath();
- FileSystem fs = path.getFileSystem(job.getConfiguration());
- long length = file.getLen(); //得到文本的长度
- BlockLocation[] blkLocations = fs.getFileBlockLocations(file, 0, length); //取得文件所在块的位置
- if ((length != 0) && isSplitable(job, path)) { //如果文件不为空,并且可以分片的话就进行下列操作,
- long blockSize = file.getBlockSize();//
- //long splitSize = computeSplitSize(blockSize, minSize, maxSize); //split的大小Math.max(minSize, Math.min(maxSize, blockSize));
- //可以通过调整splitSize的大小来控制对应的文件块的大小,比如设置splitSize=100,那么就可以控制成每个split的大小
- //但是问题是,我是要求按行进行处理的,虽然这样应该也可以按行进行切分了,不过却不能保证每个split对应的行数都是相等的
- //一般情况是如果文件大于64M(32M)就会使用块大小来作为split
- long splitSize = mySplitSize;
- long bytesRemaining = length; //文本的长度
- while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {//剩下的文本长度大于split大小的SPLIT_SLOP倍数
- int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);//找到对应block块中对应的第0个字符开始,
- splits.add(new FileSplit(path, length-bytesRemaining, splitSize,
- blkLocations[blkIndex].getHosts()));
- //这个是形成split的代码FileSplit(文件路径,0,split大小,host)
- //原始函数为 FileSplit(Path file, long start, long length, String[] hosts) {
- //但是应该可以通过重写FileSplit来实现对应的要求
- bytesRemaining -= splitSize;
- }
- if (bytesRemaining != 0) {
- splits.add(new FileSplit(path, length-bytesRemaining, bytesRemaining,
- blkLocations[blkLocations.length-1].getHosts()));
- }
- } else if (length != 0) {
- splits.add(new FileSplit(path, 0, length, blkLocations[0].getHosts()));
- } else {
- //Create empty hosts array for zero length files
- splits.add(new FileSplit(path, 0, length, new String[0]));
- }
- }
- LOG.debug("Total # of splits: " + splits.size());
- return splits;
- }
- }
2019-03-28 13:52
知识点
相关教程
更多在Hadoop的streaming中使用自定义的inputformat和outputformat
在Hadoop的streaming中有一个选项是指定输入输出格式化的: -inputformatTextInputFormat(default)|SequenceFileAsTextInputFormat|JavaClassNameOptional. -outputformatTextOutputFormat(default)|JavaClassNameOptional.
MapReduce高级编程之自定义InputFormat
InputFormat是MapReduce中一个很常用的概念,它在程序的运行中到底起到了什么作用呢? InputFormat其实是一个接口,包含了两个方法: public interface InputFormat<K, V> { InputSplit[]getSplits(JobConf job, int numSplits) throws IOExcep
Hadoop streaming中指定自定义的inputformat java类
解决了Hadoop streaming中指定自定义的inputformat java类 想在streaming中用自己的输入类: 看到网上说: How do I provide my own input/output format with streaming? At least as late as version 0.14, Hadoop does not support multiple j
Hadoop : 新版API 自定义InputFormat 把整个文件作为一条记录处理
自定义InputFormat 新版API 把真个文件当成一条输入 主要参考 源代码LineRecordReader里面的内容 有些细节还没有理解 WholeFileInputFormat importjava.io.IOException; importorg.apache.Hadoop.fs.Path; importorg.apache.hadoop.io.BytesWritab
自定义实现Hadoop Key-Value
自定义实现Value: 如果需要自定义一个一个Value类型,那么需要实现Hadoop预定义接口org.apache.hadoop.io.Writable。Writable包含两个重要的方法:readFields和write,主要用于数据的序列化和反序列化。对于Writable的子类的成员变量必须是Java的基本类型或者是其他实现了Writable接口的类型。如果是Java的基本类型则可以使用Da
自定义Hadoop Writable
Hadoop中已经定义了很多Writable的实现,基本上可以符合我们日常使用,但是在一些特殊的场景我们可能还是需要自己去实现Writable,下面主要说明如何去实现自己的Writeable,及使用自定义的Writable作为map/reduce中的key值时遇到的一些问题。 首先需要实现org.apache.hadoop.io.Writable这个接口,该接口有write和readFields这
自定义Hadoop Map/Reduce输入文件切割InputFormat
Hadoop会对原始输入文件进行文件切割,然后把每个split传入mapper程序中进行处理,FileInputFormat是所有以文件作 为数据源的InputFormat实现的基类,FileInputFormat保存作为job输入的所有文件,并实现了对输入文件计算splits的方 法。至于获得记录的方法是有不同的子类进行实现的。 那么,FileInputFormat是怎样将他们划分成splits
Hadoop自定义RecordReader
系统默认的LineRecordReader是按照每行的偏移量做为map输出时的key值,每行的内容作为map的value值,默认的分隔符是回车和换行。 现在要更改map对应的输入的<key,value>值,key对应的文件的路径(或者是文件名),value对应的是文件的内容(content)。 那么我们需要重写InputFormat和RecordReader,因为RecordReade
Hadoop自定义SdfTextInputFormat用在streaming中
本人在(Hadoop streaming中指定自定义的inputformat java类 http://www.linuxidc.com/Linux/2012-04/57830.htm)中写了自定义InputFormat的制作。 可是直接加入后一直得不到想要的结果。 查看源码发现: PipeMapper.java if(!this.ignoreKey){ write(key);
自定义Hadoop的可序列化类
Java原生语言中要想一个自定义类可序列化,很简单,只要让这个类实现java.io.Serializable接口就可以了,但是在Hadoop框架中,要想让自定义类可以被序列化,我们必须手动让其实现WritableCompable接口并且实现write(),readFields(),compareTo()方法。 下面就是一个我们自定义的可序列化的类: /* */ packagecom.c
实现MapReduce多文件自定义输出
普通maprduce中通常是有map和reduce两个阶段,在不做设置的情况下,计算结果会以part-000*输出成多个文件,并且输出的文件数量和reduce数量一样,文件内容格式也不能随心所欲。这样不利于后续结果处理。 在Hadoop中,reduce支持多个输出,输出的文件名也是可控的,就是继承MultipleTextOutputFormat类,重写generateFileNameForKey方
荐 Twitter Storm Stream Grouping编写自定义分组实现
自定义Grouping测试 Storm是支持自定义分组的,本篇文章就是探究Storm如何编写一个自定义分组器,以及对Storm分组器如何分组数据的理解。 这是我写的一个自定义分组,总是把数据分到第一个Task: public class MyFirstStreamGrouping implements CustomStreamGrouping { private static Logger
Hadoop Oozie学习笔记 自定义安装和启动
cloudera里面有基于Oozie的安装,但是通过sudo apt-get install oozie的方式.其中你要将cloudera Hadoop的source放入sources.list.由于我升级了最新的Ubuntu.但对应的cloudera hadoop还没有相应的版本.所以添加sources.list无法成功.网上也没什么手动安装的资料,于是我就写点吧.参考来源:http://inc
[置顶] 【Apache Solr系列】使用IKAnalyzer中文分词以及自定义分词字典
之前写的Apache Solr只介绍了简单的搭建以及导入数据等功能,最近由于项目要求,添加索引分词和搜索分词功能;分词的项目有包括好几个:smartcn、ictclas4j、IK、jeasy、庖丁、mmseg4j; 以上几种分词器各有优缺点,根据不同场景可分可定制和不可定制两种: 一种是基于中科院ICTCLAS的隐式马尔科夫HMM算法的中文分词器,如smartcn,ictclas4j,优点是分词准
页面自定义布局
就好象 GOOGLE 自定义主页一样。如果我只有2栏目,左右可以轻松的用 float:left/right 来控制,可是上下的顺序要如何控制好呢???? 问题补充: 推荐使用jQuery来动态生成dom,after是在某个dom之后,before是在之前,append是在之中,三个够你随便玩转dom的排列了 ------------------------------------------
最新教程
更多java线程状态详解(6种)
java线程类为:java.lang.Thread,其实现java.lang.Runnable接口。 线程在运行过程中有6种状态,分别如下: NEW:初始状态,线程被构建,但是还没有调用start()方法 RUNNABLE:运行状态,Java线程将操作系统中的就绪和运行两种状态统称为“运行状态” BLOCK:阻塞状态,表示线程阻塞
redis从库只读设置-redis集群管理
默认情况下redis数据库充当slave角色时是只读的不能进行写操作,如果写入,会提示以下错误:READONLY You can't write against a read only slave. 127.0.0.1:6382> set k3 111 (error) READONLY You can't write against a read only slave. 如果你要开启从库
Netty环境配置
netty是一个java事件驱动的网络通信框架,也就是一个jar包,只要在项目里引用即可。
Netty基于流的传输处理
在TCP/IP的基于流的传输中,接收的数据被存储到套接字接收缓冲器中。不幸的是,基于流的传输的缓冲器不是分组的队列,而是字节的队列。 这意味着,即使将两个消息作为两个独立的数据包发送,操作系统也不会将它们视为两个消息,而只是一组字节(有点悲剧)。 因此,不能保证读的是您在远程定入的行数据
Netty入门实例-使用POJO代替ByteBuf
使用TIME协议的客户端和服务器示例,让它们使用POJO来代替原来的ByteBuf。
Netty入门实例-时间服务器
Netty中服务器和客户端之间最大的和唯一的区别是使用了不同的Bootstrap和Channel实现
Netty入门实例-编写服务器端程序
channelRead()处理程序方法实现如下
Netty开发环境配置
最新版本的Netty 4.x和JDK 1.6及更高版本
电商平台数据库设计
电商平台数据库表设计:商品分类表、商品信息表、品牌表、商品属性表、商品属性扩展表、规格表、规格扩展表
HttpClient 上传文件
我们使用MultipartEntityBuilder创建一个HttpEntity。 当创建构建器时,添加一个二进制体 - 包含将要上传的文件以及一个文本正文。 接下来,使用RequestBuilder创建一个HTTP请求,并分配先前创建的HttpEntity。
MongoDB常用命令
查看当前使用的数据库 > db test 切换数据库 > use foobar switched to db foobar 插入文档 > post={"title":"领悟书生","content":"这是一个分享教程的网站","date":new
快速了解MongoDB【基本概念与体系结构】
什么是MongoDB MongoDB is a general purpose, document-based, distributed database built for modern application developers and for the cloud era. MongoDB是一个基于分布式文件存储的数据库。由C++语言编写。旨在为WEB应用提供可扩展的高性能数据存储解决方案。
windows系统安装MongoDB
安装 下载MongoDB的安装包:mongodb-win32-x86_64-2008plus-ssl-3.2.10-signed.msi,按照提示步骤安装即可。 安装完成后,软件会安装在C:\Program Files\MongoDB 目录中 我们要启动的服务程序就是C:\Program Files\MongoDB\Server\3.2\bin目录下的mongod.exe,为了方便我们每次启动,我
Spring boot整合MyBatis-Plus 之二:增删改查
基于上一篇springboot整合MyBatis-Plus之后,实现简单的增删改查 创建实体类 添加表注解TableName和主键注解TableId import com.baomidou.mybatisplus.annotations.TableId; import com.baomidou.mybatisplus.annotations.TableName; import com.baom
分布式ID生成器【snowflake雪花算法】
基于snowflake雪花算法分布式ID生成器 snowflake雪花算法分布式ID生成器几大特点: 41bit的时间戳可以支持该算法使用到2082年 10bit的工作机器id可以支持1024台机器 序列号支持1毫秒产生4096个自增序列id 整体上按照时间自增排序 整个分布式系统内不会产生ID碰撞 每秒能够产生26万ID左右 Twitter的 Snowflake分布式ID生成器的JAVA实现方案