Thursday, December 14, 2017

Partial Dependency Injection with Guice

Little Guicers

I hate Guice. I hate all the DI frameworks, in fact. It's a lot of magic for very little benefit. Java 8 and beyond are perfectly capable of letting you model your system plumbing in pure Java, using a pattern like Cake for Scala, or simply using (gasp) the new keyword in a straight-up wiring method that looks a lot like Spring Java config, only more understandable and IDE/debugger friendly.

But never mind that. I was recently called upon to do something like "partial" dependency injection using Guice, because I had the unique desire to

  • produce multiple instances of a class C
  • which had a constructor that required several collaborators
  • some of which were known at compile time, and some of which would vary for each instance
  • and these instances needed to be AOP-Alliance-enabled proxies, so that interceptors would work on them

That last bit ruled out writing my own factories by hand which used new, since they would never have the necessary enhancement by Guice.


The Google-verse sort of answered the question by mentioning FactoryModuleBuilder. What the hell is that? Well, the easiest way for me to think about it is in terms of currying in functional languages.

> val f = ( x : Int, y : Int, z : Int ) => x * y + z
f : (Int, Int, Int) => Int

> val curryF = ( x : Int ) => ( y : Int ) => ( z : Int ) => f( x, y, z )
curryF : Int => Int => Int => Int

> val someF = curryF( 2 )
someF : Int => Int => Int

> val moreF = someF( 3 )
moreF : Int => Int

> val answer = moreF( 4 )
answer : Int = 10

Factories in Guice

Factories in OO languages are basically curried, partially evaluated constructors. (Builders are like extreme versions of factories where the parameters are named.) In normal instantiation you do new Thing( x, y, z ) and with a factory you do new ThingFactory( x ).make( y, z ). My requirements above meant that I needed Guice to give me a little Guice factory for instances which was like a partially-evaluated injector: I knew x at compile-time and had bound it up with Guice already, but y and z would be unique to each Thing.
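For the Java-inclined, the same shape can be sketched in plain Java 8 with java.util.function.Function, with a hand-rolled factory alongside it for comparison. CurryDemo and ThingFactory are illustrative names of mine, not part of any framework:

```java
import java.util.function.Function;

public class CurryDemo {
    // f(x, y, z) = x * y + z, curried one argument at a time
    static final Function<Integer, Function<Integer, Function<Integer, Integer>>> CURRY_F =
        x -> y -> z -> x * y + z;

    // a factory is the OO analogue: the constructor argument known up
    // front is captured, and the rest are supplied per instance
    static final class ThingFactory {
        private final int x;
        ThingFactory( int x ) { this.x = x; }
        int make( int y, int z ) { return x * y + z; }
    }

    public static void main( String[] args ) {
        Function<Integer, Function<Integer, Integer>> someF = CURRY_F.apply( 2 );
        Function<Integer, Integer> moreF = someF.apply( 3 );
        System.out.println( moreF.apply( 4 ) );                    // 10
        System.out.println( new ThingFactory( 2 ).make( 3, 4 ) );  // 10
    }
}
```

Both lines compute the same 2 * 3 + 4; the factory is just the curried function with the partial application frozen into a field.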

The code

Binding a static Thing would look like this:

public class Thing {
    @Inject
    public Thing( X x, Y y, Z z ) { /* ... */ }
}

public class ThingModule extends AbstractModule {
    @Override
    public void configure() {
        bind( X.class ).toInstance( x ); // x, y, z are pre-built instances in scope
        bind( Y.class ).toInstance( y );
        bind( Z.class ).toInstance( z );
        bind( Thing.class );
    }
}

// Guice-bound Thing
Guice.createInjector( new ThingModule() ).getInstance( Thing.class );

The factory way looks like this (notice the @Assisted annotations):

public interface ThingFactory {
    Thing make( Y y, Z z );
}

public class Thing {
    @Inject
    public Thing( X x, @Assisted Y y, @Assisted Z z ) { /* ... */ }
}

public class ThingModule extends AbstractModule {
    @Override
    public void configure() {
        bind( X.class ).toInstance( x );
        install( new FactoryModuleBuilder().build( ThingFactory.class ) );
    }
}

// Guice-bound ThingFactory creates parameterized, yet Guice-bound, Things
// I guess we're "assisting" Guice by providing y and z on our own, hence the name
Guice.createInjector( new ThingModule() ).getInstance( ThingFactory.class ).make( y, z );

Of course the main way to use this construction is to inject the ThingFactory itself into some other object that needs dynamic Things.

The amazing thing is that if any method of Thing happens to match an interceptor, the resulting instance will be an AOP-Alliance proxy and the interceptors will work. A hand-built factory using new, by contrast, would produce plain Thing instances that would be oblivious to interceptors.

Everything old is new again

This problem turns out to be very similar to something I posted about on this very blog almost a decade ago. In that case we needed a method of a Spring-decorated thing to produce another Spring-decorated thing, and used an interceptor on that method to wrap the result with a blindly delegating prototype bean produced by Spring. That technique probably works here too but takes more code.

Monday, September 21, 2015

CombineFileInputFormat in 10 lines

CombineFileInputFormat is a special Hadoop input format that is useful for combining smaller files into larger splits. The built-in file input formats that come with Hadoop will normally split input files larger than the HDFS block size, but they won't combine files that are smaller than it. Hadoop is notorious for scaling poorly as the number of its input files increases, even while holding the total byte count constant. The problem is especially pronounced in Hadoop 2, where we lost the ability to share JVM processes between successive map or reduce tasks.

When you run into a stupid scaling problem because some inconsiderate data provider did not consolidate his thousands of small inputs, it would be nice to be able to throw a small wrapper at the input and move on. But the top Google-indexed blog posts I've found about CombineFileInputFormat give pages and pages of code, which is discouraging. Unless you have a need for fairly sophisticated logic for how to combine things, most of this extra code is not necessary.

Here's ten lines that will build you a combining flavor of any input format, assuming your key, value, and (uncombined) input format are MyKey, MyValue, and MyFormat, respectively:

public class CombinedMyFormat
extends CombineFileInputFormat< MyKey, MyValue > {
    // exists merely to fix the key/value types and
    // inject the delegate format into the superclass;
    // if MyFormat does not use state, consider a constant instead
    private static class CombineMyKeyMyValueReaderWrapper
    extends CombineFileRecordReaderWrapper< MyKey, MyValue > {
        protected CombineMyKeyMyValueReaderWrapper(
            CombineFileSplit split, TaskAttemptContext ctx, Integer idx
        ) throws IOException, InterruptedException {
            super( new MyFormat(), split, ctx, idx );
        }
    }

    @Override
    public RecordReader< MyKey, MyValue > createRecordReader(
        InputSplit split, TaskAttemptContext ctx
    ) throws IOException {
        return new CombineFileRecordReader< MyKey, MyValue >(
            ( CombineFileSplit )split, ctx, CombineMyKeyMyValueReaderWrapper.class );
    }
}

Just drop CombinedMyFormat in place of MyFormat in your job driver, and set the maximum split size to the size of an HDFS block, or to whatever number keeps the count of splits down to roughly 1,000 to 10,000. (The ideal count depends on how often you expect a task to fail and be retried; the more often, the less data each task should cover, and the more of them there should be.) And you're done.

Thursday, May 16, 2013

Hadoop + S3 = data loss

Amazon's Simple Storage Service is pretty awesome; you can think of it as a limitless disk drive. It is tempting to use it as a durable storage and publication mechanism for big data projects involving Apache's Hadoop, and in fact Hadoop comes pre-packaged with I/O format libraries for working with S3 as a backing store. Unfortunately, those libraries are rife with assumptions about file-system consistency, a property that S3 quite consciously does not exhibit in the US Standard region. After successfully writing an object into S3, subsequent attempts to read that object are only guaranteed to succeed after some amount of time x passes; not only is there no actual bound on x, but you might get a hit at time y << x followed again by no hit between times y and x. It's like a fluorescent light bulb turning on, except that sometimes (once every ten thousand object writes or so) it takes about three hours. Or a few weeks. So far, I have encountered the following data-loss scenarios using Hadoop directly against S3 (on Elastic MapReduce, whose S3 I/O libraries are improved over the stock ones):
  1. Closed sockets on reads. S3 closes your read sockets whenever it wants to. As long as your input file is not splittable, you will get MD5 checksum protection, but if it is splittable, and not compressed, as a reader you may not be able to detect that your data has been truncated.
  2. Map files or multiple outputs may not be correctly committed. Map files consist of "folders" each containing a file named "data" and another named "index". On S3, it is possible that one of the files does not appear to the committer in its temporary output location during the task commit operation. This case is less dangerous than the analogous Multiple Outputs case because consumers of a map file with only an index file will fail rather than truncate.
  3. Multi-part uploads. On very rare occasions I have observed multi-part upload (which is enabled by default on recent EMR AMIs) to somehow "miss" one of the parts, leading to data truncation. I have disabled all multi-part uploads for all my jobs using the boolean-valued property fs.s3n.multipart.uploads.enabled for this and other reasons (for example, I use some customized S3 client code for my implementation of the NativeFileSystemStore interface, but multi-part uploads are performed by a private S3 client instance over which I have no control). By doing so I also have to make sure all my objects are significantly smaller than 5GB, and the transfers (in and out) have to report progress to avoid being killed for dead air by the jobtracker, but that is material for another post.
  4. No attempt to commit. Possibly the most insidious bug of all is the needsTaskCommit() implementation in FileOutputCommitter. If, at the time this method is called, the temporary output "folder" in S3 does not appear to the committer, no attempt is made to commit anything. It assumes there was no output, the task succeeds, and no error occurs. Because the US Standard region of S3 makes almost no guarantees at all about LIST after PUT consistency, as I mentioned above, this situation actually arises quite often.

Tuesday, January 03, 2012

Uses of wildcard types in Java

Many of the Java coding questions I get from my colleagues come down to confusion about wildcard types in Java generics. I'll be editing this post on an ongoing basis to catalog all the ways I have productively used these beasts.

Uses of wildcard types

  1. Compile-time prevention of most incremental modifications to a collection. Because the put() and add() methods of most collection abstractions receive an argument of the element type, the compiler will prevent you from even attempting to call these methods on a variable of type Collection< ? extends T > for any type or type parameter T. Unfortunately, you can still call Collection.clear(), and Collection.add( null ), without a compiler error, so the collection is not completely statically protected from modification.
  2. Compile-time enforcement of write-only collections. The converse of the above is that the get() methods don't work on a variable of type Collection< ? super T > unless the receiving type is Object, which is usually the wrong thing to do anyway. However you can happily put() and add() Ts to such a collection (and of course you can call clear()).
  3. Broader applicability of parametric library methods. If you have, for example, a utility forAll() which iterates over a collection of elements of type T (and does not use that collection in any other way), the argument type should be Iterable< ? extends T >, and not, say, List< T >, and you should probably have an Iterator< ? extends T > overload as well. If T were CharSequence, for example, you could call forAll( Arrays.asList( "a", "b", "c" ) ) or forAll( new HashSet< StringBuilder >() ) without needing extra copies of the utility logic, casts, or pointless reference copies. If your method requires a more complex nested type, you should use as many wildcards as are allowed, as in Map< ? extends K, ? extends Iterable< ? extends V > >.
  4. Broader utility of parametric library methods. Continuing the above example: because Java has no easy way to do a higher-order method like map() or filter() on collections, you will likely be writing loops in utility methods. Say we are writing a method that iterates a collection, looking for some property of the elements, and produces a new list with only the elements of the original list that had the property in question. It is better to declare the interface like
    public static < T > List< T > filter( Iterable< ? extends T > it );
    than like
    public static < T > List< T > filter( Iterable< T > it );
    Why? Say we have an s of type Set< ? extends Person >. In the latter case, Java will infer the return type of filter( s ) to be List< ? extends Person >, which can't be used directly by some other method needing a List< Person >; not all APIs are considerate enough to follow the protocol in use number 3, above, which leaves them subject to quirk number 3, below. Perhaps surprisingly, in the former case, Java infers a return type of List< Person > for the call filter( s ).
  5. More flexible class implementations. Generic collaborators (such as Comparators) to whom you only supply Ts should be typed using ? super T, and collaborators from whom you only get Ts should be typed using ? extends T. Note that these collaborators might very well be invariant in their type parameters (that is, there may be methods for consuming and producing Ts available); but if you only use one sort or the other, you should still use wildcards to allow covariance or contravariance, whichever is compatible.
  6. Making up for Java's lame type inference. The following innocuous-looking code will not compile as is:
    List< List< String > > llstr = Arrays.asList( new ArrayList< String >() );
    The reason is that the compiler infers the type List< ArrayList< String > > for the right-hand side of the assignment, which is not assignment-compatible with List< List< String > > (see quirk number 3, below). Any of the following edits will compile, however.
    // eschew type inference
    List< List< String > > llstr = Arrays.< List< String > >asList( new ArrayList< String >() );
    // use a cast (horrors!)
    List< List< String > > llstr = Arrays.asList( ( List< String > )new ArrayList< String >() );
    // use a wildcard in the variable type
    List< ? extends List< String > > llstr = Arrays.asList( new ArrayList< String >() );
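Uses 1 through 5 combine nicely in one compilable sketch. The copyInto utility below is hypothetical (modeled loosely on Collections.copy): the source is producer-typed with ? extends, so it can only be read, and the destination is consumer-typed with ? super, so it can only be written:

```java
import java.util.ArrayList;
import java.util.List;

public class Pecs {
    // src is read-only here (? extends), dst is write-only (? super);
    // the compiler enforces both directions at the call site
    static <T> void copyInto( List<? super T> dst, List<? extends T> src ) {
        for ( T t : src ) {
            dst.add( t );
        }
    }

    public static void main( String[] args ) {
        List<Integer> ints = List.of( 1, 2, 3 );
        List<Number> nums = new ArrayList<>();
        // T is inferred as Integer: Integers are produced, Numbers consumed
        Pecs.<Integer>copyInto( nums, ints );
        System.out.println( nums ); // [1, 2, 3]
    }
}
```

Declared with plain List<T> parameters instead, this call would not compile at all, since List<Integer> and List<Number> cannot unify to a single T.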

Potential use of wildcard types, if only the damn language would allow it

  1. Anonymous intersection types. Suppose you want to act on a collection of objects that implement the two interfaces I and J (the technique could work for any finite number of interfaces and optionally 1 class). Your method signature could look like
    public void doSomething( List< ? extends I & J > lij ); // won't compile
    except that constraints are only allowed where type parameters are declared (i.e., class C< T extends I & J > {}, interface E< T extends I & J > {}, or < T extends I & J > void f() {}). You are thus reduced to giving a name to the intersection type, as in
    public < IJ extends I & J > void doSomething( List< IJ > lij );
    An advantage of using a name instead of a wildcard is that this technique works directly with a single argument of both interfaces:
    public < IJ extends I & J > void doSomething( IJ ij ) {
        // ...
        iConsumer( ij );
        jConsumer( ij );
        // ...
    }
    The same restriction applies to attempting to return an anonymous or private intersection type. There is no way to return something like ? extends I & J without giving that intersection type a name like IJ.
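To make the named-intersection workaround concrete, here is a minimal compilable sketch; the interfaces I and J and their members are made up for illustration:

```java
import java.util.List;

public class Intersection {
    interface I { String iName(); }
    interface J { int jSize(); }

    // no wildcard can express "? extends I & J", but a named
    // type parameter can carry both bounds
    static <IJ extends I & J> String describeAll( List<IJ> lij ) {
        StringBuilder sb = new StringBuilder();
        for ( IJ ij : lij ) {
            // both interfaces' methods are available on ij
            sb.append( ij.iName() ).append( ':' ).append( ij.jSize() ).append( ' ' );
        }
        return sb.toString().trim();
    }

    static final class Both implements I, J {
        public String iName() { return "both"; }
        public int jSize() { return 42; }
    }

    public static void main( String[] args ) {
        System.out.println( describeAll( List.of( new Both() ) ) ); // both:42
    }
}
```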

Quirks of wildcard types

  1. You can't construct an instance of a wildcard type using new. You can, however, use a factory method to accomplish the same thing, which makes me wonder why the designers of Java didn't just make new work. Given
    class C< T > {
        public C() {}
        public static < T > C< T > make() {
            return new C< T >();
        }
    }
    the following lines (using type inference) are ok:
    C< ? extends T > ct = C.make();
    C< ? super T > ct = C.make();
    but the following won't compile ("wildcard not allowed at this location"):
    C< ? extends T > ct = new C< ? extends T >();
    C< ? super T > ct = new C< ? super T >();
    C< ? super T > ct = C.< ? super T >make();
  2. Well, you can actually use new for nested wildcard types. As long as the wildcard is not at the top level, it works.
    C< C< ? extends T > > cct = new C< C< ? extends T > >();
  3. Generic types are invariant in their type parameters. In simpler terms, a C< Employee > is not a C< Person >, even if it should be by all rights (for example, if there are no methods consuming the parameter type T in C< T >). The opposite relationship also does not hold, even if it should by all rights. Instead, you get to make either relationship hold at the point where such objects are used, usually by using wildcard types. A C< Employee > is a C< ? extends Person >, and a C< Person > is a C< ? super Employee >.
  4. Wildcard types are supertypes. A C< Person > is a C< ? extends Person > and also a C< ? super Person >. No other relationships hold between the three types. It is for this reason that I advocate interfaces like uses 3 and 4, above; a method declared to expect a List< Person > is just not going to accept a List< ? extends Person > or a List< Employee >, even if it only consumed elements from the input as Persons.
  5. Type inference will not unify separate expressions of wildcard type. Given any two separate expressions, if one or both are of declared wildcard type, they will never be treated by the compiler as having the same type. This property holds even if the two expressions are identical references to a variable, or if one is created from the other by an endo-function. To "unify" the wildcard in two expressions of wildcard type, those expressions have to be declared in a generic method (a "capture-helper") which binds an actual type variable in place of the wildcard. Check out these examples:
    public class WildcardTest1 {
        private < T > void two( Class< T > t1, Class< T > t2 ) {}
        private < T > void one( Class< T > t1 ) {
            two( t1, t1 ); // compiles; no wildcards involved
        }
        private void blah() {
            two( WildcardTest1.class, WildcardTest1.class ); // compiles
            one( WildcardTest1.class );                      // compiles
            Class< ? extends WildcardTest1 > wc = this.getClass();
            two( wc, wc ); // won't compile! (capture#2 and capture#3)
            one( wc );     // compiles
        }
    }

    public class WildcardTest2 {
        interface Thing< T > {
            void consume( T t );
        }
        private < T > Thing< T > make( Class< T > c ) {
            return new Thing< T >() {
                @Override public void consume( T t ) {}
            };
        }
        private < T > void makeAndConsume( Object t, Class< T > c ) {
            make( c ).consume( c.cast( t ) );
        }
        private void blah() {
            Class< ? extends WildcardTest2 > wc = this.getClass();
            make( wc ).consume( wc.cast( this ) ); // won't compile! (capture#2 and capture#3)
            makeAndConsume( this, wc );            // compiles
        }
    }
    Incidentally, the makeAndConsume method does not compile (for the same reason) if you declare its signature as private < T > void makeAndConsume( Object t, Class< ? extends T > c ), which is a counter-example to use #4, above. Capture-helper methods must not have wildcard types in their parameters.
  6. The implicit existential is on the nearest class. What I mean here is that anytime there is a wildcard buried deep inside a parameterized type, such as F< G< H< ? extends T > > >, the existential binds the wildcard as closely as possible; something like F< G< (∃ S extends T) H< S > > >, as opposed to, say, (∃ S extends T) F< G< H< S > > >. In practice, this means that G is heterogeneous; if G were a collection type, for example, its elements could be a mixture of H< S1 > and H< S2 > for two different subtypes S1, S2 of T, and still be type-correct.
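Quirk 6 in compilable form, with a made-up Box class standing in for H: because the inner wildcard is bound per element, two different instantiations legally coexist in one list.

```java
import java.util.ArrayList;
import java.util.List;

public class NearestExistential {
    static class Box<T> {
        final T value;
        Box( T value ) { this.value = value; }
    }

    // the wildcard inside the element type binds per element,
    // so the list is heterogeneous in the Box parameter
    static double sumAll( List<Box<? extends Number>> boxes ) {
        double sum = 0;
        for ( Box<? extends Number> b : boxes ) {
            sum += b.value.doubleValue();
        }
        return sum;
    }

    public static void main( String[] args ) {
        List<Box<? extends Number>> boxes = new ArrayList<>();
        boxes.add( new Box<Integer>( 1 ) );  // a Box<Integer>...
        boxes.add( new Box<Double>( 2.5 ) ); // ...and a Box<Double>, in the same list
        System.out.println( sumAll( boxes ) ); // 3.5
    }
}
```

Had the existential bound at the top level instead, every element would have to share one subtype of Number, and this mixture would be rejected.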

Friday, August 05, 2011

Requirements for using a Hadoop combiner

One way to control I/O for large Hadoop jobs, especially those whose mappers produce many records with relatively fewer distinct keys, is to introduce a combiner phase between the mapper and reducer. I have not seen a simple diagram explaining what the types should be for the combiner and what properties the algorithm must exhibit, so here goes. If your mapper extends Mapper< K1, V1, K2, V2 > and your reducer extends Reducer< K2, V2, K3, V3 >, then the combiner must be an extension of Reducer< K2, V2, K2, V2 >, and the following diagram must commute:

The triangle in the center of the diagram represents distributivity of the combiner function (i.e., reduce( k, combine( k, M ) ) = reduce( k, combine( k, ⋃_{i ∈ I} combine( k, σ_i( M ) ) ) ) for any partition σ = { σ_i | i ∈ I } of M), because Hadoop does not guarantee how many times it will combine intermediate outputs, if at all.

A common pattern is to use the same function for combiner and reducer, but for this pattern to work, we must have K2 = K3 and V2 = V3 (and of course, the reducer itself must be distributive).
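To see the distributivity requirement without any Hadoop machinery, here is a dependency-free sketch (class and method names are mine) checking that a summing combiner gives the same result whether it runs once over the whole bag of values for a key or piecewise over an arbitrary partition of it:

```java
import java.util.Arrays;
import java.util.List;

public class CombinerLaw {
    // the combiner for a word-count-style job: sum the values for one key
    static long combine( List<Long> values ) {
        long sum = 0;
        for ( long v : values ) sum += v;
        return sum;
    }

    public static void main( String[] args ) {
        List<Long> all = Arrays.asList( 1L, 2L, 3L, 4L, 5L );

        // combine the whole bag at once...
        long direct = combine( all );

        // ...or combine each piece of a partition, then combine the partials,
        // as Hadoop is free to do any number of times
        long partitioned = combine( Arrays.asList(
            combine( all.subList( 0, 2 ) ),     // {1, 2}
            combine( all.subList( 2, 5 ) ) ) ); // {3, 4, 5}

        System.out.println( direct == partitioned ); // true: 15 == 15
    }
}
```

A combiner that, say, counted its inputs instead of summing them would fail this check, and its job would produce different answers depending on how often Hadoop chose to combine.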

I should also mention that if you use a grouping comparator for your Reducer that is different from your sorting comparator, the above diagram is not correct. Unfortunately, and I'm pretty sure this is an outright bug in Hadoop, the sorting comparator is always used to group inputs for the Combiner's reduce() calls (see MAPREDUCE-3310).

Thursday, June 16, 2011

Escaping property placeholders in Spring XML config

The problem

You might have encountered the awkward situation in which you are
  1. using Spring and XML config
  2. substituting properties into that config via some PropertyPlaceholderConfigurator
  3. needing to set some value in the config to a literal string of the form "${identifier}"

By default, any string of the form in 3. above is a placeholder, and if you have no value for that placeholder, you get an exception. Spring JIRA issue SPR-4953, which recognizes the fact that there is no simple escaping syntax for placeholders, is still open as of this writing.

A snippet such as the following will cause the exception if there is no value available to substitute for the variable customerName, or actually substitute a value for it if it is available. Neither result is desirable in our scenario; we want the "${customerName}" to remain intact when it is injected into our bean.
<bean id="aTrickyBean" class="org.anic.veggies.AreGoodForYou">
    <constructor-arg name="expression" value="Hello, ${customerName}!"/>
</bean>

Most workarounds I have seen are unsatisfactory. You can use a customized placeholder configurator that sets its delimiter characters to something other than the default, for example, which would mean you would have to change the look of all the actual (unescaped) placeholders just to support the ones you want escaped.

The workaround

However, in Spring 3.x, you can work around this issue much more simply, using the following trick with SpEL:
<bean id="aTrickyBean" class="org.anic.veggies.AreGoodForYou">
    <constructor-arg name="expression" value="#{ 'Hello, $' + '{customerName}!' }"/>
</bean>
Note that in order for this trick to work it is vital that the '$' and the '{' be physically separated (in this case, on either side of a string concatenation).

Thursday, November 11, 2010

Random notes on Hadoop

I am talking about Hadoop 0.20 using a custom jar, not streaming or Hive or Pig.
  1. Make sure your distribution has the MAPREDUCE-1182 patch.
  2. Make sure you change the default setting of dfs.datanode.max.xcievers to something very large, like 4096. And yes, the property name is misspelled. In 0.22/0.23 the property will be called dfs.datanode.max.transfer.threads.
  3. If you've declared your key type to be T, you can't write an S, even if S is a subclass of T.

  4. There are several ways to get your dependencies visible to Hadoop tasks and tools, and they are all clunky.
    • You can bundle them all into the job jar in a folder called lib, although doing so does not make it possible to have custom input formats, output formats, mappers, and reducers in separate jars.
    • You can use the -libjars argument, but if you ever have to load a class via reflection (i.e., using Class.forName), you have to use Thread.currentThread().getContextClassLoader() rather than the default class loader. Also you might run into HADOOP-6103.
    • You can use DistributedCache.addFileToClassPath, but you have to be sure to put the file on HDFS and refer to it by its absolute pathname without a scheme or authority, and these files are only available to the tasks, not the tool/job.

  5. DistributedCache is just plain wonky. You must
    1. put your file on hdfs somehow
    2. call DistributedCache.addCacheFile(), using the full URI with "hdfs:///"
    3. in the mapper/reducer, use local file I/O APIs to access the files represented by the paths in DistributedCache.getLocalCacheFiles(). Incredibly, the "typical usage" example in the javadocs for DistributedCache just completely elides this crucial bit. If you try to use FileSystem.get().open(), you'll get a cryptic error message with a filename that looks like it's been mangled.
      I can't find a programmatic mapping between files added via addCacheFile() and paths retrieved by getLocalCacheFiles(), although there may be some support for sharing the name or specifying it with the "hdfs://source#target" syntax. None of this API is well-documented.
  6. You can't substitute Hadoop counters for actual aggregation in your reducer, tempting as that might be. Counters will differ from run to run, even against identical inputs, because of things that vary like speculative execution and task failures.
  7. If you configure your cluster to have 0 reduce slots (perhaps because your jobs are all map-only), and you accidentally submit a job that does require a reduce phase, that job will run all mappers to completion and then hang forever.