Monday, September 21, 2015

CombineFileInputFormat in 10 lines

CombineFileInputFormat is a special Hadoop input format that is useful for combining smaller files into larger splits. The built-in file input formats that come with Hadoop will split input files larger than the HDFS block size, but they won't combine files that are smaller than it, so every tiny file still becomes its own map task. Hadoop is notorious for scaling poorly as the number of input files increases, even when the total byte count stays constant. The problem is especially pronounced in Hadoop 2, which lost the ability to share JVM processes between successive map or reduce tasks, so each of those tiny tasks now pays the full JVM start-up cost.

When you run into a stupid scaling problem because some inconsiderate data provider did not consolidate their thousands of small inputs, it would be nice to throw a small wrapper at the input and move on. But the top Google-indexed blog posts I've found about CombineFileInputFormat give pages and pages of code, which is discouraging. Unless you need fairly sophisticated logic for deciding how files get combined, most of that extra code is unnecessary.

Here are ten lines that will build you a combining flavor of any input format, assuming your key, value, and (uncombined) input format classes are MyKey, MyValue, and MyFormat, respectively:

import java.io.IOException;

import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.CombineFileRecordReader;
import org.apache.hadoop.mapreduce.lib.input.CombineFileRecordReaderWrapper;
import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;

public class CombinedMyFormat
extends CombineFileInputFormat< MyKey, MyValue > {
    // exists merely to fix the key/value types and
    // inject the delegate format to the superclass
    // (the wrapper expects a FileInputFormat, which MyFormat is assumed to be);
    // if MyFormat does not use state, consider a constant instead
    private static class CombineMyKeyMyValueReaderWrapper
    extends CombineFileRecordReaderWrapper< MyKey, MyValue > {
        protected CombineMyKeyMyValueReaderWrapper(
            CombineFileSplit split, TaskAttemptContext ctx, Integer idx
        ) throws IOException, InterruptedException {
            super( new MyFormat(), split, ctx, idx );
        }
    }

    @Override
    public RecordReader< MyKey, MyValue > createRecordReader(
        InputSplit split, TaskAttemptContext ctx
    ) throws IOException {
        return new CombineFileRecordReader< MyKey, MyValue >(
            ( CombineFileSplit )split, ctx, CombineMyKeyMyValueReaderWrapper.class
        );
    }
}
Just drop CombinedMyFormat in place of MyFormat in your job driver and set the maximum split size, either to the size of an HDFS block or to whatever figure keeps the number of splits down to roughly 1,000 to 10,000. (The ideal number depends on how often you expect a task to fail and be retried: the more often that happens, the less data each task should cover, and the more tasks there should be.) Then you're done.
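
For concreteness, here's a minimal sketch of what that driver might look like. The job name, paths, and the 128 MB cap are placeholders, and the usual mapper/reducer wiring is omitted; the split cap is set through FileInputFormat.setMaxInputSplitSize, which CombineFileInputFormat honors when no programmatic max split size has been set:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class MyJobDriver {
    public static void main( String[] args ) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance( conf, "my combined job" );
        job.setJarByClass( MyJobDriver.class );

        // swap in the combining wrapper where MyFormat used to go
        job.setInputFormatClass( CombinedMyFormat.class );
        FileInputFormat.addInputPath( job, new Path( args[ 0 ] ) );
        // cap each combined split at roughly one 128 MB HDFS block
        FileInputFormat.setMaxInputSplitSize( job, 128L * 1024 * 1024 );

        // set your mapper, reducer, and output key/value classes as usual (omitted)
        FileOutputFormat.setOutputPath( job, new Path( args[ 1 ] ) );
        System.exit( job.waitForCompletion( true ) ? 0 : 1 );
    }
}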
