
FileInputFormat Explained in Detail


Reposted from: http://blog.csdn.net/hellozpc/article/details/45771933 and https://my.oschina.net/leejun2005/blog/133424

1. Overview

When setting the input format of a MapReduce job, we call a statement like this:

job.setInputFormatClass(KeyValueTextInputFormat.class);

This statement ensures that the input files are read in the format we specify; here KeyValueTextInputFormat is the chosen read format.
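
To put that call in context, here is a minimal driver sketch (the identity Mapper is used purely for illustration; input and output paths come from the command line):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class KeyValueDemoDriver {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "kv-input-format-demo");
        job.setJarByClass(KeyValueDemoDriver.class);

        // the input format decides how files are split and how records are read
        job.setInputFormatClass(KeyValueTextInputFormat.class);

        job.setMapperClass(Mapper.class); // identity mapper, just echoes the K-V pairs
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        job.setNumReduceTasks(0); // map-only job

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}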

All input format classes inherit from InputFormat, an abstract class. Its subclasses include FileInputFormat, which is dedicated to reading ordinary files, DBInputFormat, which reads from databases, and so on.

2. InputFormat

Looking at InputFormat itself, the abstract class has only two abstract methods:

    List<InputSplit> getSplits(): computes the input splits (InputSplit) from the input files, i.e. it solves the problem of cutting the data or files into splits.
    RecordReader<K,V> createRecordReader(): creates the RecordReader that reads the data inside an InputSplit, i.e. it solves the problem of reading the records of a split.

When we get to InputSplit later, we will see that getSplits() has to consider factors such as whether the input file can be split, the block size the file was stored with, and the file size. Overall, through InputFormat the MapReduce framework can:

    validate that the job's input is correct;
    cut the input files into logical splits (InputSplit), each of which is assigned to a separate map task;
    provide a RecordReader implementation that reads the "K-V pairs" out of an InputSplit for the Mapper to use.

The source of the InputFormat abstract class is also quite short; it is shown below for reference (some of the original comments were removed for space and short annotations added):

public abstract class InputFormat<K, V> {

    /**
     * Splits are only logical, not physical; each split is roughly a tuple <input-file-path, start, length>
     */
    public abstract List<InputSplit> getSplits(JobContext context)
            throws IOException, InterruptedException;

    /**
     * Create a record reader for a given split.
     */
    public abstract RecordReader<K, V> createRecordReader(InputSplit split,
            TaskAttemptContext context) throws IOException,
            InterruptedException;

}

Different InputFormats implement their own way of reading files and computing splits, and each input split becomes the data source of one map task. The next section looks at what an input split (InputSplit) actually is.

3. InputSplit

The input of a Mapper is a sequence of input splits, called InputSplits. As the source shows, InputSplit is also an abstract class; logically it contains all the K-V pairs handed to the Mapper that processes this split.

public abstract class InputSplit {
      /**
       * Get the size of the split so that InputSplits can be sorted by size.
       */
      public abstract long getLength() throws IOException, InterruptedException;

      /**
       * Get the nodes where the data of this split is stored.
       */
      public abstract String[] getLocations() throws IOException, InterruptedException;
}

Let's dig into one subclass of InputSplit: the FileSplit class.

public class FileSplit extends InputSplit implements Writable {
    private Path file;
    private long start;
    private long length;
    private String[] hosts;

    /**
     * Constructs a split with host information
     * 
     * @param file
     *            the file name
     * @param start
     *            the position of the first byte in the file to process
     * @param length
     *            the number of bytes in the file to process
     * @param hosts
     *            the list of hosts containing the block, possibly null
     */
    public FileSplit(Path file, long start, long length, String[] hosts) {
        this.file = file;
        this.start = start;
        this.length = length;
        this.hosts = hosts;
    }

    /** The number of bytes in the file to process. */
    @Override
    public long getLength() {
        return length;
    }

    @Override
    public String[] getLocations() throws IOException {
        if (this.hosts == null) {
            return new String[] {};
        } else {
            return this.hosts;
        }
    }

    // some methods omitted
}

As the source shows, FileSplit has four fields: the file path, the start offset of the split, the length of the split, and the hosts that store the split's data. From these four pieces of data, the data served to each Mapper can be determined. The splits are constructed in InputFormat's getSplits() method, which passes the four values to FileSplit's constructor.
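
For intuition, here is a small sketch (the file path, sizes, and host names are made up) of the two FileSplits that getSplits() would typically build for a 200 MB file stored with a 128 MB block size:

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

public class FileSplitIllustration {
    public static void main(String[] args) {
        Path file = new Path("/logs/access.log");         // hypothetical 200 MB file
        long blockSize = 128L * 1024 * 1024;
        // split 1: the first block, located where that block's replicas live
        FileSplit first = new FileSplit(file, 0, blockSize,
                new String[] { "dn1", "dn2", "dn3" });
        // split 2: the remaining 72 MB
        FileSplit second = new FileSplit(file, blockSize, 72L * 1024 * 1024,
                new String[] { "dn2", "dn3", "dn4" });
        System.out.println(first + " | " + second);
    }
}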

Here is another subclass of InputSplit: CombineFileSplit. Its source:

public class CombineFileSplit extends InputSplit implements Writable {

    private Path[] paths;
    private long[] startoffset;
    private long[] lengths;
    private String[] locations;
    private long totLength;

    public CombineFileSplit(Path[] files, long[] start, long[] lengths,
            String[] locations) {
        initSplit(files, start, lengths, locations);
    }

    private void initSplit(Path[] files, long[] start, long[] lengths,
            String[] locations) {
        this.startoffset = start;
        this.lengths = lengths;
        this.paths = files;
        this.totLength = 0;
        this.locations = locations;
        for (long length : lengths) {
            totLength += length;
        }
    }

    public long getLength() {
        return totLength;
    }

    /** Returns all the Paths where this input-split resides */
    public String[] getLocations() throws IOException {
        return locations;
    }

    // some constructors and methods omitted; read the source file for the rest
}

Why introduce this class? Because the follow-up article, "Hadoop Learning (5) – Handling Small Files", builds on it, and understanding it now will make that part easier.

The FileSplit introduced above corresponds to a single input file. In other words, with FileInputFormat (which produces FileSplits) as the input format, even a tiny file is handled as its own InputSplit, and every InputSplit is processed by a separate map task. When the input consists of a large number of small files, you end up with an equally large number of InputSplits and therefore an equally large number of Mappers, and the cost of creating and destroying that many map tasks is huge, even disastrous for the cluster!

CombineFileSplit is a split aimed at small files: it packs a series of small files into one InputSplit, so a single Mapper can process several small files, which effectively reduces the process overhead. Like FileSplit, CombineFileSplit carries the file paths, split start offsets, split lengths and the host lists of the split data, except that each of these is now an array rather than a single value.

One thing to note: CombineFileSplit's getLength() returns the total length of all the data it packs together.
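
In practice you rarely build CombineFileSplits by hand. A minimal sketch (assuming Hadoop 2.x, where CombineTextInputFormat ships with the framework) of switching a job over to it and capping the combined split size:

import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.CombineTextInputFormat;

public class SmallFileJobConfig {
    // Packs many small files into a few splits so one Mapper handles several files.
    public static void configure(Job job) {
        job.setInputFormatClass(CombineTextInputFormat.class);
        // upper bound for one combined split: files are packed together up to ~128 MB
        CombineTextInputFormat.setMaxInputSplitSize(job, 128L * 1024 * 1024);
    }
}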

We now have a fairly deep picture of InputSplit: the concept, the source, the fields. Since the splitting itself is done by InputFormat, the next step is to go into one of its subclasses, FileInputFormat, and see how the splits are computed.

4. FileInputFormat

FileInputFormat's split-computing method is shown below with detailed comments, so no extra explanation is added here:

public List<InputSplit> getSplits(JobContext job) throws IOException {
    // First compute the minimum and maximum split sizes; both are used to compute the split size.
    // As the source shows, they can be set via mapred.min.split.size and mapred.max.split.size
    long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
    long maxSize = getMaxSplitSize(job);

    // the splits list stores the computed input splits
    List<InputSplit> splits = new ArrayList<InputSplit>();
    // files holds the input file list obtained from listStatus(); listStatus() is a bit special and is studied in detail below
    List<FileStatus> files = listStatus(job);
    for (FileStatus file : files) {
        Path path = file.getPath();
        FileSystem fs = path.getFileSystem(job.getConfiguration());
        long length = file.getLen();
        // get the list of block locations of this file: [hostname, offset, length]
        BlockLocation[] blkLocations = fs.getFileBlockLocations(file, 0,
                length);
        // check whether the file is splittable; usually it is, but a compressed file may not be
        // this can be controlled by overriding FileInputFormat's isSplitable()
        if ((length != 0) && isSplitable(job, path)) {
            long blockSize = file.getBlockSize();
            // compute the split size,
            // i.e. Math.max(minSize, Math.min(maxSize, blockSize)):
            // clamped between minSize and maxSize, and equal to blockSize when minSize <= blockSize <= maxSize
            long splitSize = computeSplitSize(blockSize, minSize, maxSize);

            long bytesRemaining = length;
            // split in a loop:
            // keep splitting while remaining/splitSize > SPLIT_SLOP; stop once the ratio is <= SPLIT_SLOP
            while (((double) bytesRemaining) / splitSize > SPLIT_SLOP) {
                int blkIndex = getBlockIndex(blkLocations, length
                        - bytesRemaining);
                splits.add(new FileSplit(path, length - bytesRemaining,
                        splitSize, blkLocations[blkIndex].getHosts()));
                bytesRemaining -= splitSize;
            }
            // handle the remaining bytes
            if (bytesRemaining != 0) {
                splits.add(new FileSplit(path, length - bytesRemaining,
                        bytesRemaining,
                        blkLocations[blkLocations.length - 1].getHosts()));
            }
        } else if (length != 0) {
            // not splittable: return the whole file as a single split
            splits.add(new FileSplit(path, 0, length, blkLocations[0]
                    .getHosts()));
        } else {
            // for a zero-length file, return a split with an empty host list
            splits.add(new FileSplit(path, 0, length, new String[0]));
        }
    }

    // record the number of input files
    job.getConfiguration().setLong(NUM_INPUT_FILES, files.size());
    return splits;
}
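
The computeSplitSize() logic above can be steered from the driver. Here is a small sketch (assuming the new mapreduce API; these helpers write the same settings the comments above refer to, under whatever property name your Hadoop version uses):

import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

public class SplitSizeTuning {
    public static void tune(Job job) {
        // splitSize = max(minSize, min(maxSize, blockSize))
        // with blockSize = 128 MB, minSize = 1 and maxSize = 64 MB:
        //   min(64 MB, 128 MB) = 64 MB, max(1, 64 MB) = 64 MB  -> two splits per block
        FileInputFormat.setMinInputSplitSize(job, 1L);
        FileInputFormat.setMaxInputSplitSize(job, 64L * 1024 * 1024);
    }
}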

In getSplits() we came across another method, listStatus(); let's look at it first:

protected List<FileStatus> listStatus(JobContext job) throws IOException {

    // some code omitted...

    List<PathFilter> filters = new ArrayList<PathFilter>();
    filters.add(hiddenFileFilter);
    PathFilter jobFilter = getInputPathFilter(job);
    if (jobFilter != null) {
        filters.add(jobFilter);
    }
    // build a MultiPathFilter wrapping two PathFilters:
    // one that filters out hidden files, and the user-defined filter (if one was specified)
    PathFilter inputFilter = new MultiPathFilter(filters);

    for (int i = 0; i < dirs.length; ++i) {
        Path p = dirs[i];
        FileSystem fs = p.getFileSystem(job.getConfiguration());
        FileStatus[] matches = fs.globStatus(p, inputFilter);
        if (matches == null) {
            errors.add(new IOException("Input path does not exist: " + p));
        } else if (matches.length == 0) {
            errors.add(new IOException("Input Pattern " + p
                    + " matches 0 files"));
        } else {
            for (FileStatus globStat : matches) {
                if (globStat.isDir()) {
                    for (FileStatus stat : fs.listStatus(
                            globStat.getPath(), inputFilter)) {
                        result.add(stat);
                    }
                } else {
                    result.add(globStat);
                }
            }
        }
    }

    // some code omitted
}

NLineInputFormat is an interesting subclass of FileInputFormat that is worth a look when you have time.

5. PathFilter

PathFilter is the file-filtering interface: with it we control which files become input and which do not. PathFilter has a single method, accept(Path), which returns true if the given Path should be included and false otherwise. A user-defined PathFilter can be configured via mapred.input.pathFilter.class.

public interface PathFilter {
  /**
   * Tests whether or not the specified abstract pathname should be
   * included in a pathname list.
   *
   * @param  path  The abstract pathname to be tested
   * @return  <code>true</code> if and only if <code>pathname</code>
   *          should be included
   */
  boolean accept(Path path);
}

The FileInputFormat class has a hiddenFileFilter field:

private static final PathFilter hiddenFileFilter = new PathFilter() {
    public boolean accept(Path p) {
        String name = p.getName();
        return !name.startsWith("_") && !name.startsWith(".");
    }
};

hiddenFileFilter filters out hidden files (names starting with "_" or ".").

FileInputFormat also contains an inner class:

private static class MultiPathFilter implements PathFilter {
    private List<PathFilter> filters;

    public MultiPathFilter(List<PathFilter> filters) {
        this.filters = filters;
    }

    public boolean accept(Path path) {
        for (PathFilter filter : filters) {
            if (!filter.accept(path)) {
                return false;
            }
        }
        return true;
    }
}

MultiPathFilter behaves like a PathFilter proxy: it holds a list of PathFilters, and only paths accepted by every filter in the list are used as input. Inside FileInputFormat it is used by listStatus(), which in turn is called by getSplits() to collect the input files; in other words, file filtering happens before the input splits are computed.
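
As an illustration (a sketch, not part of the original post), here is a user-defined filter that excludes in-flight ".tmp" files:

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;

public class NoTmpFilter implements PathFilter {
    @Override
    public boolean accept(Path path) {
        // reject files that some upstream process is still writing
        return !path.getName().endsWith(".tmp");
    }
}

Registering it in the driver with FileInputFormat.setInputPathFilter(job, NoTmpFilter.class) makes getInputPathFilter() in listStatus() return it, so it joins hiddenFileFilter inside the MultiPathFilter.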

At this point we have filtered the input files with PathFilter and computed the map-side InputSplits with FileInputFormat's getSplits(). The job's input splits are ready, but how are these splits actually read by the map side?

That is handled by the other InputFormat method, createRecordReader(). FileInputFormat does not implement it; it leaves the implementation to its subclasses.
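
For example, a line-oriented subclass can be as small as the sketch below (written in the spirit of TextInputFormat, not copied from it):

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.LineRecordReader;

public class SimpleTextInputFormat extends FileInputFormat<LongWritable, Text> {
    @Override
    public RecordReader<LongWritable, Text> createRecordReader(
            InputSplit split, TaskAttemptContext context) {
        // getSplits() is inherited from FileInputFormat; only record reading is defined here
        return new LineRecordReader();
    }
}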

6. RecordReader

A RecordReader splits the data fed into the Map into <key, value> pairs. Its abstract class looks like this:

public abstract class RecordReader<KEYIN, VALUEIN> implements Closeable {

    /**
     * Initialize from the given InputSplit.
     */
    public abstract void initialize(InputSplit split, TaskAttemptContext context)
            throws IOException, InterruptedException;

    /**
     * As the name says, read the next <key, value> pair of the split.
     */
    public abstract boolean nextKeyValue() throws IOException,
            InterruptedException;

    /**
     * Get the current key
     */
    public abstract KEYIN getCurrentKey() throws IOException,
            InterruptedException;

    /**
     * Get the current value.
     */
    public abstract VALUEIN getCurrentValue() throws IOException,
            InterruptedException;

    /**
     * Track the progress of reading the split.
     */
    public abstract float getProgress() throws IOException,
            InterruptedException;

    /**
     * Close the record reader.
     */
    public abstract void close() throws IOException;
}

As the source shows, these are the main responsibilities of a RecordReader. Next, a concrete implementation, LineRecordReader, shows how each of them is done.

public class LineRecordReader extends RecordReader<LongWritable, Text> {
    private CompressionCodecFactory compressionCodecs = null;
    private long start;
    private long pos;
    private long end;
    private LineReader in;
    private int maxLineLength;
    private LongWritable key = null;
    private Text value = null;

    // initialize() sets up the LineRecordReader:
    // it computes the start and end of the split, opens the input stream for reading K-V pairs, handles compressed splits, etc.
    public void initialize(InputSplit genericSplit, TaskAttemptContext context)
            throws IOException {
        FileSplit split = (FileSplit) genericSplit;
        Configuration job = context.getConfiguration();
        this.maxLineLength = job.getInt("mapred.linerecordreader.maxlength",
                Integer.MAX_VALUE);
        start = split.getStart();
        end = start + split.getLength();
        final Path file = split.getPath();
        compressionCodecs = new CompressionCodecFactory(job);
        final CompressionCodec codec = compressionCodecs.getCodec(file);

        // open the file and seek to the start position of the split
        FileSystem fs = file.getFileSystem(job);
        FSDataInputStream fileIn = fs.open(split.getPath());
        boolean skipFirstLine = false;
        if (codec != null) {
            // the file is compressed: open it directly through the codec
            in = new LineReader(codec.createInputStream(fileIn), job);
            end = Long.MAX_VALUE;
        } else {
            // uncompressed file
            if (start != 0) {
                skipFirstLine = true;
                --start;
                // seek to the offset; the next read will start from there
                fileIn.seek(start);
            }
            in = new LineReader(fileIn, job);
        }
        if (skipFirstLine) { // skip first line and re-establish "start".
            start += in.readLine(new Text(), 0,
                    (int) Math.min((long) Integer.MAX_VALUE, end - start));
        }
        this.pos = start;
    }

    public boolean nextKeyValue() throws IOException {
        if (key == null) {
            key = new LongWritable();
        }
        key.set(pos); // the key is the byte offset within the file
        if (value == null) {
            value = new Text();
        }
        int newSize = 0;
        while (pos < end) {
            newSize = in.readLine(value, maxLineLength,
                    Math.max((int) Math.min(Integer.MAX_VALUE, end - pos),
                            maxLineLength));
            // a read length of 0 means the split has been fully read
            if (newSize == 0) {
                break;
            }
            pos += newSize;
            // a line shorter than maxLineLength means a complete line was read
            if (newSize < maxLineLength) {
                break;
            }
            // reaching this point means the line was not fully read; keep reading
        }
        if (newSize == 0) {
            key = null;
            value = null;
            return false;
        } else {
            return true;
        }
    }
    // some methods omitted
}

Reading the data out of an InputSplit is now solved, but how does the MapReduce framework actually use the RecordReader? Let's first look at the Mapper class.

7. Mapper

public class Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {

    public class Context extends MapContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
        public Context(Configuration conf, TaskAttemptID taskid,
                RecordReader<KEYIN, VALUEIN> reader,
                RecordWriter<KEYOUT, VALUEOUT> writer,
                OutputCommitter committer, StatusReporter reporter,
                InputSplit split) throws IOException, InterruptedException {
            super(conf, taskid, reader, writer, committer, reporter, split);
        }
    }

    /**
     * Pre-processing; runs exactly once when the map task starts.
     */
    protected void setup(Context context) throws IOException,
            InterruptedException {
    }

    /**
     * Called once for every <key, value> pair in the InputSplit.
     */
    @SuppressWarnings("unchecked")
    protected void map(KEYIN key, VALUEIN value, Context context)
            throws IOException, InterruptedException {
        context.write((KEYOUT) key, (VALUEOUT) value);
    }

    /**
     * Clean-up work, such as closing streams.
     */
    protected void cleanup(Context context) throws IOException,
            InterruptedException {
    }

    /**
     * The driver of the map task.
     */
    public void run(Context context) throws IOException, InterruptedException {
        setup(context);
        while (context.nextKeyValue()) {
            map(context.getCurrentKey(), context.getCurrentValue(), context);
        }
        cleanup(context);
    }
}

Pay special attention to the run() method in Mapper.class; it is effectively the driver of the map task.

run() first calls setup() for the initial work,

then loops over every "K-V pair" obtained from context.nextKeyValue() and processes it with map(),

and finally calls cleanup() for the closing work.
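
To make this lifecycle concrete, here is a small hand-written Mapper (a sketch, not from the original post) that uses all three hooks:

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// Counts words; setup() runs once, map() once per K-V pair, cleanup() once at the end.
public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    private static final IntWritable ONE = new IntWritable(1);
    private Text word;

    @Override
    protected void setup(Context context) {
        word = new Text(); // create reusable objects once per map task
    }

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        for (String token : value.toString().split("\\s+")) {
            if (!token.isEmpty()) {
                word.set(token);
                context.write(word, ONE); // once per input "K-V pair"
            }
        }
    }

    @Override
    protected void cleanup(Context context) {
        // nothing to release here; shown only to complete the lifecycle
    }
}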

In fact, context.nextKeyValue() simply delegates to the underlying RecordReader to fetch the "K-V pair". The Context class inside Mapper extends MapContext, which is constructed with a RecordReader. Let's look at MapContext next.

public class MapContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT> extends
        TaskInputOutputContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
    private RecordReader<KEYIN, VALUEIN> reader;
    private InputSplit split;

    public MapContext(Configuration conf, TaskAttemptID taskid,
            RecordReader<KEYIN, VALUEIN> reader,
            RecordWriter<KEYOUT, VALUEOUT> writer, OutputCommitter committer,
            StatusReporter reporter, InputSplit split) {
        super(conf, taskid, writer, committer, reporter);
        this.reader = reader;
        this.split = split;
    }

    /**
     * Get the input split for this map.
     */
    public InputSplit getInputSplit() {
        return split;
    }

    @Override
    public KEYIN getCurrentKey() throws IOException, InterruptedException {
        return reader.getCurrentKey();
    }

    @Override
    public VALUEIN getCurrentValue() throws IOException, InterruptedException {
        return reader.getCurrentValue();
    }

    @Override
    public boolean nextKeyValue() throws IOException, InterruptedException {
        return reader.nextKeyValue();
    }

}

As the MapContext methods show, context.getCurrentKey(), context.getCurrentValue() and nextKeyValue() are all thin wrappers around RecordReader methods: MapContext reads the "K-V pairs" of the InputSplit directly through the RecordReader passed to it.

At this point we know exactly how MapReduce input files are filtered, listed, split, read out as "K-V pairs", and handed to the Mapper class.

Custom Hadoop MapReduce InputFormat: splitting the input files yourself

In the previous post we implemented a secondary sort on cookieId and time. Now a new requirement appears: what if the analysis has to group by cookieId and by the cookieId & time combination? The cleanest approach is a custom InputFormat that lets MapReduce read all records of one cookieId in one go and then cut them into sessions by time. The pseudocode logic is:

for OneSplit in MyInputFormat.getSplit() // OneSplit holds all records of one cookieId

    for session in OneSplit // a session is OneSplit cut a second time, by time

        for line in session // a line is one record of the session, i.e. one record of the raw log

1. Principle:

InputFormat is a very common concept in MapReduce; what role does it actually play when a program runs?

In the old mapred API, InputFormat is an interface with two methods:


public interface InputFormat<K, V> {
  InputSplit[] getSplits(JobConf job, int numSplits) throws IOException;

  RecordReader<K, V> getRecordReader(InputSplit split, JobConf job,
                                     Reporter reporter) throws IOException;
}

These two methods do the following: getSplits() cuts the input data into splits; the number of splits equals the number of map tasks, and the split size defaults to the block size (64 MB at the time). getRecordReader() parses each split into records and then each record into a K,V pair.

In other words, InputFormat does the following work:

InputFile -> splits -> K,V

Which InputFormats does the framework commonly provide?

[Figure: the commonly used built-in InputFormat classes]

Among them, TextInputFormat is the most commonly used. The relationships between InputFormat (interface), FileInputFormat (abstract class), TextInputFormat (class), RecordReader (interface), and LineRecordReader (class) are:

      FileInputFormat implements InputFormat
      TextInputFormat extends FileInputFormat
      TextInputFormat.getRecordReader calls LineRecordReader
      LineRecordReader implements RecordReader

The InputFormat interface has already been described in detail above.

Now look at FileInputFormat: it implements getSplits() from the InputFormat interface and leaves getRecordReader() and isSplitable() to concrete classes such as TextInputFormat. isSplitable() rarely needs changing, so a custom InputFormat usually only has to implement getRecordReader(), whose core is a RecordReader (LineRecordReader in the built-in case) that parses each split into records and each record into a K,V pair:

public interface RecordReader<K, V> {
  boolean next(K key, V value) throws IOException;
  K createKey();
  V createValue();
  long getPos() throws IOException;
  void close() throws IOException;
  float getProgress() throws IOException;
}

So the core of a custom InputFormat is a class of your own, similar to LineRecordReader, that implements the RecordReader interface; and the core of that class is implementing the main methods of RecordReader shown above.

2. Code:

package MyInputFormat;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

public class TrackInputFormat extends FileInputFormat<LongWritable, Text> {

    @SuppressWarnings("deprecation")
    @Override
    public RecordReader<LongWritable, Text> createRecordReader(
            InputSplit split, TaskAttemptContext context) {
        return new TrackRecordReader();
    }

    @Override
    protected boolean isSplitable(JobContext context, Path file) {
        CompressionCodec codec = new CompressionCodecFactory(
                context.getConfiguration()).getCodec(file);
        return codec == null;
    }

}
package MyInputFormat;

import java.io.IOException;
import java.io.InputStream;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

/**
 * Treats keys as offset in file and value as line.
 * 
 * @deprecated Use
 *             {@link org.apache.hadoop.mapreduce.lib.input.LineRecordReader}
 *             instead.
 */
public class TrackRecordReader extends RecordReader<LongWritable, Text> {
    private static final Log LOG = LogFactory.getLog(TrackRecordReader.class);

    private CompressionCodecFactory compressionCodecs = null;
    private long start;
    private long pos;
    private long end;
    private NewLineReader in;
    private int maxLineLength;
    private LongWritable key = null;
    private Text value = null;
    // ----------------------
    // record separator, i.e. the delimiter between two records
    private byte[] separator = "END\n".getBytes();

    // --------------------

    public void initialize(InputSplit genericSplit, TaskAttemptContext context)
            throws IOException {
        FileSplit split = (FileSplit) genericSplit;
        Configuration job = context.getConfiguration();
        this.maxLineLength = job.getInt("mapred.linerecordreader.maxlength",
                Integer.MAX_VALUE);
        start = split.getStart();
        end = start + split.getLength();
        final Path file = split.getPath();
        compressionCodecs = new CompressionCodecFactory(job);
        final CompressionCodec codec = compressionCodecs.getCodec(file);

        FileSystem fs = file.getFileSystem(job);
        FSDataInputStream fileIn = fs.open(split.getPath());
        boolean skipFirstLine = false;
        if (codec != null) {
            in = new NewLineReader(codec.createInputStream(fileIn), job);
            end = Long.MAX_VALUE;
        } else {
            if (start != 0) {
                skipFirstLine = true;
                this.start -= separator.length;//
                // --start;
                fileIn.seek(start);
            }
            in = new NewLineReader(fileIn, job);
        }
        if (skipFirstLine) { // skip first line and re-establish "start".
            start += in.readLine(new Text(), 0,
                    (int) Math.min((long) Integer.MAX_VALUE, end - start));
        }
        this.pos = start;
    }

    public boolean nextKeyValue() throws IOException {
        if (key == null) {
            key = new LongWritable();
        }
        key.set(pos);
        if (value == null) {
            value = new Text();
        }
        int newSize = 0;
        while (pos < end) {
            newSize = in.readLine(value, maxLineLength,
                    Math.max((int) Math.min(Integer.MAX_VALUE, end - pos),
                            maxLineLength));
            if (newSize == 0) {
                break;
            }
            pos += newSize;
            if (newSize < maxLineLength) {
                break;
            }

            LOG.info("Skipped line of size " + newSize + " at pos "
                    + (pos - newSize));
        }
        if (newSize == 0) {
            key = null;
            value = null;
            return false;
        } else {
            return true;
        }
    }

    @Override
    public LongWritable getCurrentKey() {
        return key;
    }

    @Override
    public Text getCurrentValue() {
        return value;
    }

    /**
     * Get the progress within the split
     */
    public float getProgress() {
        if (start == end) {
            return 0.0f;
        } else {
            return Math.min(1.0f, (pos - start) / (float) (end - start));
        }
    }

    public synchronized void close() throws IOException {
        if (in != null) {
            in.close();
        }
    }

    public class NewLineReader {
        private static final int DEFAULT_BUFFER_SIZE = 64 * 1024;
        private int bufferSize = DEFAULT_BUFFER_SIZE;
        private InputStream in;
        private byte[] buffer;
        private int bufferLength = 0;
        private int bufferPosn = 0;

        public NewLineReader(InputStream in) {
            this(in, DEFAULT_BUFFER_SIZE);
        }

        public NewLineReader(InputStream in, int bufferSize) {
            this.in = in;
            this.bufferSize = bufferSize;
            this.buffer = new byte[this.bufferSize];
        }

        public NewLineReader(InputStream in, Configuration conf)
                throws IOException {
            this(in, conf.getInt("io.file.buffer.size", DEFAULT_BUFFER_SIZE));
        }

        public void close() throws IOException {
            in.close();
        }

        public int readLine(Text str, int maxLineLength, int maxBytesToConsume)
                throws IOException {
            str.clear();
            Text record = new Text();
            int txtLength = 0;
            long bytesConsumed = 0L;
            boolean newline = false;
            int sepPosn = 0;
            do {
                // the buffer is exhausted; read the next one
                if (this.bufferPosn >= this.bufferLength) {
                    bufferPosn = 0;
                    bufferLength = in.read(buffer);
                    // end of the file reached: break out so the next file can be read
                    if (bufferLength <= 0) {
                        break;
                    }
                }
                int startPosn = this.bufferPosn;
                for (; bufferPosn < bufferLength; bufferPosn++) {
                    // handle a separator whose tail was cut in half at the end of the previous buffer (repeated characters inside the separator can cause trouble here)
                    if (sepPosn > 0 && buffer[bufferPosn] != separator[sepPosn]) {
                        sepPosn = 0;
                    }
                    // hit the first character of the record separator
                    if (buffer[bufferPosn] == separator[sepPosn]) {
                        bufferPosn++;
                        int i = 0;
                        // check whether the following characters also belong to the separator
                        for (++sepPosn; sepPosn < separator.length; i++, sepPosn++) {
                            // the buffer ends right here and the separator is unluckily cut in half
                            if (bufferPosn + i >= bufferLength) {
                                bufferPosn += i - 1;
                                break;
                            }
                            // as soon as one character differs, this is not the separator
                            if (this.buffer[this.bufferPosn + i] != separator[sepPosn]) {
                                sepPosn = 0;
                                break;
                            }
                        }
                        // a complete record separator was indeed found
                        if (sepPosn == separator.length) {
                            bufferPosn += i;
                            newline = true;
                            sepPosn = 0;
                            break;
                        }
                    }
                }
                int readLength = this.bufferPosn - startPosn;
                bytesConsumed += readLength;
                // the record separator itself is not stored in the record
                if (readLength > maxLineLength - txtLength) {
                    readLength = maxLineLength - txtLength;
                }
                if (readLength > 0) {
                    record.append(this.buffer, startPosn, readLength);
                    txtLength += readLength;
                    // strip the separator from the record
                    if (newline) {
                        str.set(record.getBytes(), 0, record.getLength()
                                - separator.length);
                    }
                }
            } while (!newline && (bytesConsumed < maxBytesToConsume));
            if (bytesConsumed > (long) Integer.MAX_VALUE) {
                throw new IOException("Too many bytes before newline: "
                        + bytesConsumed);
            }

            return (int) bytesConsumed;
        }

        public int readLine(Text str, int maxLineLength) throws IOException {
            return readLine(str, maxLineLength, Integer.MAX_VALUE);
        }

        public int readLine(Text str) throws IOException {
            return readLine(str, Integer.MAX_VALUE, Integer.MAX_VALUE);
        }
    }
}
package MyInputFormat;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

public class TestMyInputFormat {

    public static class MapperClass extends Mapper<LongWritable, Text, Text, Text> {

        public void map(LongWritable key, Text value, Context context) throws IOException,
                InterruptedException {
            System.out.println("key:\t " + key);
            System.out.println("value:\t " + value);
            System.out.println("-------------------------");
        }
    }

    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
        Configuration conf = new Configuration();
         Path outPath = new Path("/hive/11");
         FileSystem.get(conf).delete(outPath, true);
        Job job = new Job(conf, "TestMyInputFormat");
        job.setInputFormatClass(TrackInputFormat.class);
        job.setJarByClass(TestMyInputFormat.class);
        job.setMapperClass(TestMyInputFormat.MapperClass.class);
        job.setNumReduceTasks(0);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);

        FileInputFormat.addInputPath(job, new Path(args[0]));
        org.apache.hadoop.mapreduce.lib.output.FileOutputFormat.setOutputPath(job, outPath);

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

3. Test data:

cookieId   time   url    cookieOverFlag

1       a   1_hao123

1       a   1_baidu

1       b   1_Google   2END

2       c   2_google

2       c   2_hao123

2       c   2_google   1END

3       a   3_baidu

3       a   3_sougou

3       b   3_soso     2END

4. Results:

key:     0
value:   1  a   1_hao123    
1   a    1_baidu    
1   b    1_google   2
-------------------------
key:     47
value:   2  c    2_google   
2   c    2_hao123   
2   c    2_google   1
-------------------------
key:     96
value:   3  a    3_baidu    
3   a    3_sougou   
3   b    3_soso 2
-------------------------
