Thursday, May 16, 2013

Hadoop + S3 = data loss

Amazon's Simple Storage Service is pretty awesome; you can think of it as a limitless disk drive. It is tempting to use it as a durable storage and publication mechanism for big data projects involving Apache's Hadoop, and in fact Hadoop comes pre-packaged with I/O format libraries for working with S3 as a backing store. Unfortunately, those libraries are rife with assumptions about file-system consistency, a property that S3 quite consciously does not exhibit in the US Standard region. After successfully writing an object into S3, subsequent attempts to read that object are only guaranteed to succeed after some amount of time x passes; not only is there no actual bound on x, but you might get a hit after some y << x followed again by no hit between times y and x. It's like a fluorescent light bulb turning on, only sometimes (once every ten thousand object writes or so) it takes about three hours. Or a few weeks. So far, I have encountered the following data-loss scenarios using Hadoop directly against S3 (on Elastic MapReduce, whose S3 I/O libraries are already improved over the stock ones):
  1. Closed sockets on reads. S3 closes your read sockets whenever it wants to. As long as your input file is not splittable, you will get MD5 checksum protection, but if it is splittable, and not compressed, as a reader you may not be able to detect that your data has been truncated.
  2. Map files or multiple outputs may not be correctly committed. Map files consist of "folders" each containing a file named "data" and another named "index". On S3, it is possible that one of the files does not appear to the committer in its temporary output location during the task commit operation. This case is less dangerous than the analogous Multiple Outputs case because consumers of a map file with only an index file will fail rather than truncate.
  3. Multi-part uploads. On very rare occasions I have observed multi-part upload (which is enabled by default on recent EMR AMIs) to somehow "miss" one of the parts, leading to data truncation. I have disabled all multi-part uploads for all my jobs using the boolean-valued property fs.s3n.multipart.uploads.enabled for this and other reasons (for example, I use some customized S3 client code for my implementation of the NativeFileSystemStore interface, but multi-part uploads are performed by a private S3 client instance over which I have no control). By doing so I also have to make sure all my objects are significantly smaller than 5GB, and the transfers (in and out) have to report progress to avoid being killed for dead air by the jobtracker, but that is material for another post.
  4. No attempt to commit. Possibly the most insidious bug of all is the needsTaskCommit() implementation in FileOutputCommitter. If, at the time this method is called, the temporary output "folder" in S3 does not appear to the committer, no attempt is made to commit anything: it assumes there was no output, the task succeeds, and no error is reported. Because the US Standard region of S3 makes almost no guarantees at all about LIST-after-PUT consistency, as I mentioned above, this situation actually arises quite often. A paraphrased sketch of the offending logic follows this list.
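
To make scenario 4 concrete, here is a paraphrased sketch of the kind of commit decision involved. This is not the actual Hadoop source, and getTempTaskOutputPath() is a made-up helper standing in for however the committer locates the task's temporary output.

public boolean needsTaskCommit( TaskAttemptContext context ) throws IOException {
    // The temporary "folder" the task wrote its output into.
    Path taskOutputPath = getTempTaskOutputPath( context ); // hypothetical helper
    FileSystem fs = taskOutputPath.getFileSystem( context.getConfiguration() );
    // Under S3's eventually consistent LIST/GET, exists() can return false even though
    // the task really did write output; the framework then skips the commit entirely,
    // reports the task as successful, and the output is silently lost.
    return fs.exists( taskOutputPath );
}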

Tuesday, January 03, 2012

Uses of wildcard types in Java

Many of the Java coding questions I get from my colleagues come down to confusion about wildcard types in Java generics. I'll be editing this post on an ongoing basis to catalog all the ways I have productively used these beasts.

Uses of wildcard types

  1. Compile-time prevention of most incremental modifications to a collection. Because the put() and add() methods of most collection abstractions receive an argument of the element type, the compiler will prevent you from even attempting to call these methods on a variable of type Collection< ? extends T > for any type or type parameter T. Unfortunately, you can still call Collection.clear(), and Collection.add( null ), without a compiler error, so the collection is not completely statically protected from modification.
  2. Compile-time enforcement of write-only collections. The converse of the above is that the get() methods don't work on a variable of type Collection< ? super T > unless the receiving type is Object, which is usually the wrong thing to do anyway. However you can happily put() and add() Ts to such a collection (and of course you can call clear()).
  3. Broader applicability of parametric library methods. If you have, for example, a utility forAll() which iterates over a collection of elements of type T (and does not use that collection in any other way), the argument type should be Iterable< ? extends T >, and not, say, List< T >, and you should probably have an Iterator< ? extends T > overload as well. If T were CharSequence, for example, you could call forAll( Arrays.asList( "a", "b", "c" ) ) or forAll( new HashSet< StringBuilder >() ) without needing extra copies of the utility logic, casts, or pointless reference copies. If your method requires a more complex nested type, you should use as many wildcards as are allowed, as in Map< ? extends K, ? extends Iterable< ? extends V > >.
  4. Broader utility of parametric library methods. Continuing the above example: because Java has no easy way to do a higher-order method like map() or filter() on collections, you will likely be writing loops in utility methods. Say we are writing a method that iterates a collection, looking for some property of the elements, and produces a new list with only the elements of the original list that had the property in question. It is better to declare the interface like
    public static < T > List< T > filter( Iterable< ? extends T > it );
    than
    public static < T > List< T > filter( Iterable< T > it );
    Why? Say we have s of type Set< ? extends Person >. In the latter case, Java will infer the return type of filter( s ) to be List< ? extends Person >, which can't be used directly by some other method needing a List< Person >; not all APIs are considerate enough to follow the protocol in use number 3, above, which leaves them subject to quirk number 3, below. Perhaps surprisingly, in the former case, Java infers a return type of List< Person > for the call filter( s ). (A fuller sketch of the wildcard version appears after this list.)
  5. More flexible class implementations. Generic collaborators (such as Comparators) to whom you only supply Ts should be typed using ? super T, and collaborators from whom you only get Ts should be typed using ? extends T. Note that these collaborators might very well be invariant in their type parameters (that is, there may be methods for consuming and producing Ts available); but if you only use one sort or the other, you should still use wildcards to allow covariance or contravariance, whichever is compatible.
  6. Making up for Java's lame type inference. The following innocuous-looking code will not compile as is:
    List< List< String > > llstr = Arrays.asList( new ArrayList< String >() );
    
    The reason is that the compiler infers the type List< ArrayList< String > > for the right-hand side of the assignment, which is not assignment-compatible with List< List< String > > (see quirk number 2, below). Any of the following edits will compile, however.
    // eschew type inference
    List< List< String > > llstr = Arrays.< List< String > >asList( new ArrayList< String >() );
    // use a cast (horrors!)
    List< List< String > > llstr = Arrays.asList( ( List< String > )new ArrayList< String >() );
    // use a wildcard in the variable type
    List< ? extends List< String > > llstr = Arrays.asList( new ArrayList< String >() );
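
To flesh out use number 4, here is a minimal sketch of the wildcard version of filter(). Person, Employee, and the Predicate interface are stand-ins defined for illustration only, and the explicit predicate parameter is my own addition (it is not part of the signature shown above); note that it too uses a wildcard, per use number 5.

interface Predicate< T > {
    boolean accept( T t );
}

class CollectionUtil {
    public static < T > List< T > filter( Iterable< ? extends T > it, Predicate< ? super T > p ) {
        List< T > result = new ArrayList< T >();
        for ( T t : it ) {
            if ( p.accept( t ) ) {
                result.add( t );
            }
        }
        return result;
    }
}

// Because the parameter type is Iterable< ? extends T > rather than List< T >, the same
// method accepts a List< Person >, a Set< Employee >, or a Set< ? extends Person >:
Set< Employee > employees = new HashSet< Employee >();
Predicate< Person > isAdult = new Predicate< Person >() {
    public boolean accept( Person p ) { return p.getAge() >= 18; }
};
List< Person > adults = CollectionUtil.< Person >filter( employees, isAdult );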
    

Potential use of wildcard types, if only the damn language would allow it

  1. Anonymous intersection types. Suppose you want to act on a collection of objects that implement the two interfaces I and J (the technique could work for any finite number of interfaces and optionally 1 class). Your method signature could look like
    public void doSomething( List< ? extends I & J > lij ); // won't compile
    
    except that constraints are only allowed where type parameters are declared (i.e., class C< T extends I & J > {}, interface E< T extends I & J > {}, < T extends I & J > void f() {}). You are thus reduced to giving a name to the intersection type, as in
    public < IJ extends I & J > void doSomething( List< IJ > lij );
    
    An advantage of using a name instead of a wildcard is that this technique works directly with a single argument of both interfaces:
    public < IJ extends I & J > void doSomething( IJ ij ) {
        // ...
        ij.iMethod();
        ij.jMethod();
        iConsumer( ij );
        jConsumer( ij );
        // ...
    }
    
    The same restriction applies to attempting to return an anonymous or private intersection type. There is no way to return something like ? extends I & J without giving that intersection type a name like IJ.

Quirks of wildcard types

  1. You can't construct an instance of a wildcard type using new. You can, however, use a factory method to accomplish the same thing, which makes me wonder why the designers of Java didn't just make new work. Given
    class C< T > {
        public C() {}
        public static < T > C< T > make() {
            return new C< T >();
        }
    }
    
    the following lines (using type inference) are ok:
    C< ? extends T > ct = C.make();
    C< ? super T > ct = C.make();
    but the following won't compile ("wildcard not allowed at this location"):
    C< ? extends T > ct = new C< ? extends T >();
    C< ? super T > ct = new C< ? super T >();
    C< ? super T > ct = C.< ? super T >make();
  2. Well, you can actually use new for nested wildcard types. As long as the wildcard is not at the top level, it works.
    C< C< ? extends T > > cct = new C< C< ? extends T > >();
  3. Generic types are invariant in their type parameters. In simpler terms, a C< Employee > is not a C< Person >, even if it should be by all rights (for example, if there are no methods consuming the parameter type T in C< T >). The opposite relationship also does not hold, even if it should by all rights. Instead, you get to make either relationship hold at the point where such objects are used, usually by using wildcard types. A C< Employee > is a C< ? extends Person >, and a C< Person > is a C< ? super Employee >.
  4. Wildcard types are supertypes. A C< Person > is a C< ? extends Person > and also a C< ? super Person >. No other relationships hold between the three types. It is for this reason that I advocate interfaces like uses 3 and 4, above; a method declared to expect a List< Person > is just not going to accept a List< ? extends Person > or a List< Employee >, even if it only consumed elements from the input as Persons.
  5. Type inference will not unify separate expressions of wildcard type. Given any two separate expressions, if one or both are of declared wildcard type, they will never be treated by the compiler as having the same type. This property holds even if the two expressions are identical references to a variable, or if one is created from the other by an endo-function. To "unify" the wildcard in two expressions of wildcard type, those expressions have to be declared in a generic method (a "capture-helper") which binds an actual type variable in place of the wildcard. Check out these examples:
    public class WildcardTest1 {
        private < T > void two( Class< T > t1, Class< T > t2 ) {}
        private < T > void one( Class< T > t1 ) {
            two( t1, t1 ); // compiles; no wildcards involved
        }
        private void blah() {
            two( WildcardTest1.class, WildcardTest1.class ); // compiles
            one( WildcardTest1.class );                     // compiles
    
            Class< ? extends WildcardTest1 > wc = this.getClass();
            two( wc, wc ); // won't compile! (capture#2 and capture#3)
            one( wc );     // compiles
        }
    }
    public class WildcardTest2 {
        interface Thing< T > {
            void consume( T t );
        }
        private < T > Thing< T > make( Class< T > c ) {
            return new Thing< T >() {
                @Override public void consume( T t ) {}
            };
        }
        private < T > void makeAndConsume( Object t, Class< T > c ) {
            make( c ).consume( c.cast( t ) );
        }
    
        private void blah() {
            Class< ? extends WildcardTest2 > wc = this.getClass();
            make( wc ).consume( wc.cast( this ) ); // won't compile! (capture#2 and capture#3)
            makeAndConsume( this, wc );            // compiles
        }
    }
    Incidentally, the makeAndConsume method does not compile (for the same reason) if you declare its signature as private < T > void makeAndConsume( Object t, Class< ? extends T > c ), which is a counter-example to use #4, above. Capture-helper methods must not have wildcard types in their parameters.
  6. The implicit existential is on the nearest class. What I mean here is that anytime there is a wildcard buried deep inside a parameterized type, such as F< G< H< ? extends T > > >, the existential binds the wildcard as closely as possible; something like F< G< (∃ S extends T) H< S > > >, as opposed to, say, (∃ S extends T) F< G< H< S > > >. In practice, this means that G is heterogeneous; if G were a collection type, for example, its elements could be a mixture of H< S1 > and H< S2 > for two different subtypes S1, S2 of T, and still be type-correct. (A short concrete example follows this list.)
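
A concrete version of quirk 6, with List standing in for both G and H, and Number for T:

// The wildcard is bound per element, so the outer list is heterogeneous:
List< List< ? extends Number > > mixed = new ArrayList< List< ? extends Number > >();
mixed.add( new ArrayList< Integer >() ); // a List< Integer > is a List< ? extends Number >
mixed.add( new ArrayList< Double >() );  // and so is a List< Double >
// Contrast with List< ? extends List< Number > >, where the captured element type is some
// single unknown subtype of List< Number >, and neither add() call above would compile.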

Friday, August 05, 2011

Requirements for using a Hadoop combiner

One way to control I/O for large Hadoop jobs, especially those whose mappers produce many records with relatively fewer distinct keys, is to introduce a combiner phase between the mapper and reducer. I have not seen a simple diagram explaining what the types should be for the combiner and what properties the algorithm must exhibit, so here goes. If your mapper extends Mapper< K1, V1, K2, V2 > and your reducer extends Reducer< K2, V2, K3, V3 >, then the combiner must be an extension of Reducer< K2, V2, K2, V2 >, and the following diagram must commute:

The triangle in the center of the diagram represents distributivity of the combiner function (i.e., reduce(k, combine(k, M)) = reduce(k, combine(k, ∪_i combine(k, σ_i(M)))) for any partition σ = {σ_i | i ∈ I} of M), because Hadoop does not guarantee how many times it will combine intermediate outputs, if at all.

A common pattern is to use the same function for combiner and reducer, but for this pattern to work, we must have K2 = K3 and V2 = V3 (and of course, the reducer itself must be distributive).
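
For instance, here is a minimal sketch of that pattern, assuming the mapper emits Text keys and LongWritable counts (so K2 = K3 = Text and V2 = V3 = LongWritable); summation is distributive in the sense above, so the same class can be registered as both combiner and reducer.

public static class SumReducer extends Reducer< Text, LongWritable, Text, LongWritable > {
    @Override
    protected void reduce( Text key, Iterable< LongWritable > values, Context context )
            throws IOException, InterruptedException {
        long sum = 0;
        for ( LongWritable value : values ) {
            sum += value.get();
        }
        // The result does not depend on how Hadoop partitions and re-combines the values,
        // which is exactly the distributivity property the diagram demands.
        context.write( key, new LongWritable( sum ) );
    }
}

// In the driver:
job.setCombinerClass( SumReducer.class );
job.setReducerClass( SumReducer.class );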

I should also mention that if you use a grouping comparator for your Reducer that is different from your sorting comparator, the above diagram is not correct. Unfortunately, and I'm pretty sure this is an outright bug in Hadoop, the sorting comparator is always used to group inputs for the Combiner's reduce() calls (see MAPREDUCE-3310).

Thursday, June 16, 2011

Escaping property placeholders in Spring XML config

The problem

You might have encountered the awkward situation in which you are
  1. using Spring and XML config
  2. substituting properties into that config via some PropertyPlaceholderConfigurator
  3. needing to set some value in the config to a literal string of the form "${identifier}"

By default, any string of the form in 3. above is a placeholder, and if you have no value for that placeholder, you get an exception. Spring JIRA issue SPR-4953, which recognizes the fact that there is no simple escaping syntax for placeholders, is still open as of this writing.

A snippet such as the following will cause the exception if there is no value available to substitute for the variable customerName, or will actually substitute a value if one is available. Neither result is desirable in our scenario; we want the "${customerName}" to remain intact when it is injected into our bean.
<bean id="aTrickyBean" class="org.anic.veggies.AreGoodForYou">
    <constructor-arg name="expression" value="Hello, ${customerName}!"/>
</bean>

Most workarounds I have seen are unsatisfactory. You can use a customized placeholder configurator that sets its delimiter characters to something other than the default, for example, which would mean you would have to change the look of all the actual (unescaped) placeholders just to support the ones you want escaped.

The workaround

However, in Spring 3.x, you can work around this issue in a much simpler way using the following trick with SpEL:
<bean id="aTrickyBean" class="org.anic.veggies.AreGoodForYou">
    <constructor-arg name="expression" value="#{ 'Hello, $' + '{customerName}!' }"/>
</bean>
Note that in order for this trick to work it is vital that the '$' and the '{' be physically separated (in this case, on either side of a string concatenation).

Thursday, November 11, 2010

Random notes on Hadoop

I am talking about Hadoop 0.20 using a custom jar, not streaming or Hive or Pig.
  1. Make sure your distribution has the MAPREDUCE-1182 patch.
  2. Make sure you change the default setting of dfs.datanode.max.xcievers to something very large, like 4096. And yes, the property name is misspelled. In 0.22/0.23 the property will be called dfs.datanode.max.transfer.threads.
  3. If you've declared your key type to be T, you can't write an S, even if S is a subclass of T.

  4. There are several ways to get your dependencies visible to Hadoop tasks and tools, and they are all clunky.
    • You can bundle them all into the job jar in a folder called lib, although doing so does not make it possible to have custom input formats, output formats, mappers, and reducers in separate jars.
    • You can use the -libjars argument, but if you ever have to load a class via reflection (i.e., using Class.forName), you have to use Thread.currentThread().getContextClassLoader() rather than the default class loader. Also you might run into HADOOP-6103.
    • You can use DistributedCache.addFileToClassPath, but you have to be sure to put the file on HDFS and refer to it by its absolute pathname without a scheme or authority, and these files are only available to the tasks, not the tool/job.

  5. DistributedCache is just plain wonky. You must
    1. put your file on hdfs somehow
    2. call DistributedCache.addCacheFile(), using the full URI with "hdfs:///"
    3. in the mapper/reducer, use java.io.* APIs to access the files represented by the paths in DistributedCache.getLocalCacheFiles(). Incredibly, the "typical usage" example in the javadocs for DistributedCache just completely elides this crucial bit. If you try to use FileSystem.get().open(), you'll get a cryptic error message with a filename that looks like it's been mangled.
      I can't find a programmatic mapping between files added via addCacheFile() and paths retrieved by getLocalCacheFiles(), although there may be some support for sharing the name or specifying it with the "hdfs://source#target" syntax. None of this API is well-documented. A sketch of the whole round trip appears after this list.
  6. You can't substitute Hadoop counters for actual aggregation in your reducer, tempting as that might be. Counters will differ from run to run, even against identical inputs, because of things that vary like speculative execution and task failures.
  7. If you configure your cluster to have 0 reduce slots (perhaps because your jobs are all map-only), and you accidentally submit a job that does require a reduce phase, that job will run all mappers to completion and then hang forever.
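
To tie together the DistributedCache steps in item 5, here is a hedged sketch of the round trip. The HDFS path and file name are made up, and I am assuming a single cached file so that index 0 of getLocalCacheFiles() is the right one (see the caveat about the missing mapping in item 5).

// In the driver, after the file is already on HDFS (step 1):
DistributedCache.addCacheFile( new URI( "hdfs:///user/me/lookup.txt" ), job.getConfiguration() );

// In the mapper/reducer (e.g. in setup()), use java.io against the *local* copies (step 3);
// trying FileSystem.get( conf ).open( ... ) on these paths is what produces the mangled-looking error.
Path[] localFiles = DistributedCache.getLocalCacheFiles( context.getConfiguration() );
BufferedReader reader = new BufferedReader( new FileReader( localFiles[ 0 ].toString() ) );
String line;
while ( ( line = reader.readLine() ) != null ) {
    // ... build the lookup table ...
}
reader.close();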

Sunday, October 24, 2010

10 Things I Hate About Java (or, Scala is the Way and the Light)

I've been working with Java quite extensively for about 4 years now, and it has been enjoyable for the most part. Garbage collection, the JVM, generics, anonymous classes, and superb IDE support have made my life much easier.

But a few things make me gnash my teeth on a daily basis, and it's funny, but none of them are issues in another JVM language in which I have been dabbling, Scala. It seems the developers of that language felt my pain as well.

  1. Miserable type inference. Apparently some of the problems with it are being addressed in Project Coin for Java 7. The redundant type arguments in the following code are, to any sane programmer, maddeningly superfluous, but nevertheless strictly required in Java until at least mid-2011:

    List< Integer > li1 = new ArrayList< Integer >();
    List< Integer > li2 = Arrays.asList( 1, 2, 3 );
    o.processListOfInteger( Arrays.< Integer >asList() );

    Needless to say, equivalent initializations in Scala require no such redundant information.
  2. Generic invariance. An example from last week: I'm working on an implementation FrazzleExecutorService of java.util.concurrent.ScheduledExecutorService and a refinement FrazzleFuture< T > of java.util.concurrent.ScheduledFuture< T >. Covariant subtyping lets me get away with returning a FrazzleFuture< T > from a method like FrazzleExecutorService.submit() without violating the contract. But I can't return List< FrazzleFuture< T > > from FrazzleExecutorService.invokeAll() because (a) ScheduledExecutorService would have had to declare the return type to be List< ? extends ScheduledFuture< T > >; (b) the returned list should have been immutable anyway; (c) generic types like List< T > are invariant in their parameters. In Scala, there are separate mutable and immutable collection hierarchies, and at least in the immutable one, S <: T implies List[ S ] <: List[ T ], because List is declared covariant in its parameter.
  3. Collections can't be tersely initialized. Part of the blame is the Collections framework; part of it is the goddamn language. The following code illustrates the typical verbosity:

    List< Integer > li1 = new ArrayList< Integer >( Arrays.asList( 1, 2, 3 ) );

    @SuppressWarnings( "serial" )
    Map< String, String > mss1 = new HashMap< String, String >() { {
        put( "foo", "bar" );
        put( "this", "sucks" );
    } };

    Turns out collection literals are also not supported in Scala; you have to type List( 1, 2, 3 ) or Map( "foo" -> "bar", "this" -> "rocks" ). Excuse me if I mock Java incessantly at this point. Collection improvements have been postponed until Java 8, scheduled for mid-2012.
  4. Higher-order programming is absurdly verbose. Rather than give code samples, I'll just refer you to these guys and let you see for yourself how even a library can't save you from massive boilerplate for the simplest things. And Scala? Lambda expressions are built-in as syntactic sugar for functional objects, making higher-order code simple, readable, and terse.
  5. Modeling variant types is awkward. You have to choose from among many bad options:
    Representation | Interrogation
    one sparsely-populated class (S + T modeled as S × T) | if-ladders based on comparing to null (see 8)
    S × T and an enum of type labels | switch + casting
    a hierarchy | a bunch of isS() and isT() methods and casting
    a hierarchy | various casting attempts wrapped with ClassCastException catch blocks (ok, that's not really an option, but I get that as an answer in interviews sometimes)
    a hierarchy | if-ladders based on instanceof and casting
    a hierarchy | polymorphic decomposition and the inevitable bloated APIs at the base class that result
    a hierarchy that includes Visitor | painfully verbose visitors (see 4 and 10)
    a hierarchy of Throwables | throw and various catch blocks, which I suspect compiles to the same thing as the instanceof approach, only more expensive (but actually requires the least code!)
    Scala has case classes and pattern matching built in.
  6. No tuples. One ends up either creating Pair< S, T > or dozens of throwaway classes with "And" in the name, like CountAndElapsed. Scala has tuples, although I feel like they kind of screwed up by not going the full ML and making multi-argument functions/methods be the same as single-argument functions/methods over tuples. So to call a 2-arg function f with a pair p = ( p1, p2 ), you can either call f( p1, p2 ) or f.tupled( p ). There must be some deep reason for making the distinction.
  7. No mixins. If you need stuff from 2 abstract classes, you will be copying, or aggregating (with loads of monkey delegation boilerplate) at least one of the two.
  8. Null. The following code should illustrate:

    private static Doohickey getDoohickey( Thingamajigger t ) {
        Whatsit w;
        Foobar f;
        if ( null == t )
            return null;
        else if ( null == ( w = t.getWhatsit() ) )
            return null;
        else if ( null == ( f = w.getFoobar() ) )
            return null;
        else
            return f.getDoohickey();
    }

    I believe the "Elvis" operator was developed to solve this annoyance (return t.?getWhatsit().?getFoobar().?getDoohickey();) but it did not make the cut for Java 7 or even Java 8, from what I understand. Scala's solution to this issue is to recommend that operations which might not have a value for you return Option[ T ] instead of T. You can then map your method call to the Option and get back another Option without ever seeing a null pointer exception. Option is a variant type, easily modeled in Scala but not in Java (see 5).
  9. Iterators. They can't throw checked exceptions. They have to implement remove(), often by throwing (unchecked) UnsupportedOperationExceptions. For-each syntax can't work with iterators directly. None of these problems arise with the superb collections framework in Scala which is designed hand-in-hand with its clean higher-order programming (see 4).
  10. Void. This is a holdover from C, and is obviously not anything Java can get rid of, but it's stupid. Because of void, e.g., I can never do a visitor pattern with just one kind of visitor; there has to be one whose methods return a generic type T, and another whose methods return void. And don't try to sell me on the pseudo-type Void, because you still have to accept or return null somewhere (see the sketch below). Scala has a type Unit with a single trivial value (), and unitary methods/functions can explicitly return that value or not return anything; the semantics are the same. Thus all expressions have some meaningful type, and classes with generic types can be fully general.
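
A minimal sketch of the Void complaint in item 10, using a single generic visitor; Shape, Circle, and Square are illustrative stand-ins of my own.

interface ShapeVisitor< R > {
    R visitCircle( Circle c );
    R visitSquare( Square s );
}

// A value-producing visitor is pleasant enough:
ShapeVisitor< Double > area = new ShapeVisitor< Double >() {
    public Double visitCircle( Circle c ) { return Math.PI * c.radius * c.radius; }
    public Double visitSquare( Square s ) { return s.side * s.side; }
};

// But a side-effect-only visitor is stuck with the pseudo-type Void and its noise:
ShapeVisitor< Void > printer = new ShapeVisitor< Void >() {
    public Void visitCircle( Circle c ) { System.out.println( "circle" ); return null; }
    public Void visitSquare( Square s ) { System.out.println( "square" ); return null; }
};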

Wednesday, April 07, 2010

Why tabs are better

I'm tired of this stupid "tabs vs. spaces" code style debate. Tabs win hands down on just about every measure. Anyone still laboring under the misapprehension that it makes sense to indent one's source file using spaces should consider the following:
  1. Line-based comments (‘//’, ‘#’) at the head of the line don’t screw up the indentation (unless tab depth <= 2).
  2. You can change the indentation depth without editing the file. This is a huge feature, folks. If I like shallow indentation on all my source, I can make it so, and people who prefer the other extreme are not affected. The counter-argument (put forth by Checkstyle, among others) that one should not be required to set tab depth in order to read source is absurd; tab depth is always set to something, whether you like it or not (see 11), and code indented using tabs is readable regardless of the setting, unless tab depth is ridiculously high. The only code that actually does require a fixed tab depth to be legible is code that mixes tabs and spaces, which I encounter all too often. See 10.
  3. Spaces-based indentation will inevitably become inconsistent because no one can agree on his/her favorite indentation depth (see 2).
  4. Indentation mistakes are more obvious using tabs (unless tab depth = 1, which is just stupid).
  5. Tab indentation characters, when used properly, are more semantically relevant than spaces.
  6. Files are smaller (relevant especially for Javascript, CSS, HTML).
  7. Fewer keystrokes are needed to navigate within source files. Sorry, but “Ctrl+Right arrow” is two keystrokes, plus you have to hold one of them down.
  8. Making tabbed whitespace visible in an IDE is useful for eyeballing how things line up; making spaces visible is useful for “magic eye”.
  9. Tabs are unable to support the unreadable, but nevertheless default, function-call line-break style of making parameters line up with the opening ‘(’. Remember, it is a feature that this abomination is not supportable. Unfortunately it is still possible to put just the first parameter on the same line as the ‘(’, but no indentation choice can prevent that bad decision.
  10. If you have to edit a production config file using terminal-based default emacs, should you really be checking that in? I should add that the indentation used by default in Emacs (and pervasive in high-profile source such as the JDK) is a horrific hybrid of spaces and tabs which actually does force you to set your tab depth to a fixed value of 8 in order to read code thus indented. See 2.
  11. Some well-known tools (e.g., ReviewBoard) typically display tabs with a depth of 8, which is kind of high. I claim that this tab discrimination is also a feature, because it discourages deeply-nested code, which is a good thing.

The only moderately sane argument in favor of spaces is that the code "always looks the same". Isn't that nice. You can write comments that use little "^^^^" to point to something on the line above. Wow. I guess that's worth throwing out points 1-11.

I'm not going to wade into the quagmire of my other personal code style choices. But it's time this debate, which rages again and again every time I join a new team, be permanently put to bed.