Thursday, November 11, 2010

Random notes on Hadoop

I am talking about Hadoop 0.20 using a custom jar, not streaming or Hive or Pig.
  1. Make sure your distribution has the MAPREDUCE-1182 patch.
  2. Make sure you change the default setting of dfs.datanode.max.xcievers to something very large, like 4096. And yes, the property name is misspelled. In 0.22/0.23 the property will be called dfs.datanode.max.transfer.threads.
  3. If you've declared your key type to be T, you can't write an S, even if S is a subclass of T.
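    A minimal sketch of point 3, under the 0.20 new API: the map output collector compares the written key's class to the declared key class for equality, not assignability, so a subclass is rejected at runtime. The mapper and the MyText subclass below are hypothetical.

      import java.io.IOException;

      import org.apache.hadoop.io.LongWritable;
      import org.apache.hadoop.io.Text;
      import org.apache.hadoop.mapreduce.Mapper;

      public class SubclassKeyMapper
          extends Mapper<LongWritable, Text, Text, LongWritable> {

        // Hypothetical subclass of the declared key type.
        public static class MyText extends Text {}

        @Override
        protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
          // The job declares Text via job.setMapOutputKeyClass(Text.class).
          // Writing a MyText fails at runtime with an IOException about a
          // type mismatch in the key from the map:
          context.write(new MyText(), new LongWritable(1));

          // Writing the declared class itself is fine:
          context.write(value, new LongWritable(1));
        }
      }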

  4. There are several ways to get your dependencies visible to Hadoop tasks and tools, and they are all clunky.
    • You can bundle them all into the job jar in a folder called lib, although doing so does not make it possible to have custom input formats, output formats, mappers, and reducers in separate jars.
    • You can use the -libjars argument, but if you ever have to load a class via reflection (i.e., using Class.forName), you have to go through Thread.currentThread().getContextClassLoader() rather than the default class loader; see the sketch after this list. You might also run into HADOOP-6103.
    • You can use DistributedCache.addFileToClassPath, but you have to be sure to put the file on HDFS and refer to it by its absolute pathname without a scheme or authority, and these files are only available to the tasks, not the tool/job.
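    A minimal sketch of the -libjars caveat in the second bullet above: -libjars is a generic option handled by GenericOptionsParser, so the driver has to go through ToolRunner, and classes shipped that way should be resolved via the thread context class loader. The class and jar names here are hypothetical.

      import org.apache.hadoop.conf.Configuration;
      import org.apache.hadoop.conf.Configured;
      import org.apache.hadoop.util.Tool;
      import org.apache.hadoop.util.ToolRunner;

      public class LibJarsDriver extends Configured implements Tool {

        @Override
        public int run(String[] args) throws Exception {
          // Classes that arrive via -libjars live on the context class loader,
          // not necessarily on the loader that defined this driver, so a bare
          // Class.forName(name) can miss them.
          ClassLoader loader = Thread.currentThread().getContextClassLoader();
          Class<?> format =
              Class.forName("com.example.MyCustomInputFormat", true, loader);
          System.out.println("Loaded " + format.getName());
          // ... configure and submit the job here ...
          return 0;
        }

        public static void main(String[] args) throws Exception {
          // ToolRunner runs the args through GenericOptionsParser, which is
          // what actually understands -libjars.
          System.exit(ToolRunner.run(new Configuration(), new LibJarsDriver(), args));
        }
      }

    Invocation would look something like: hadoop jar myjob.jar LibJarsDriver -libjars my-deps.jar <job args>.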

  5. DistributedCache is just plain wonky. You must
    1. put your file on HDFS somehow
    2. call DistributedCache.addCacheFile(), using the full URI with "hdfs:///"
    3. in the mapper/reducer, use java.io.* APIs to access the files represented by the paths in DistributedCache.getLocalCacheFiles(). Incredibly, the "typical usage" example in the javadocs for DistributedCache just completely elides this crucial bit. If you try to use FileSystem.get().open(), you'll get a cryptic error message with a filename that looks like it's been mangled.
      I can't find a programmatic mapping between the files added via addCacheFile() and the paths returned by getLocalCacheFiles(), although there may be some support for matching on the shared file name or for specifying one with the "hdfs://source#target" syntax. None of this API is well documented; a sketch of the whole dance follows below.
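    A minimal sketch of the whole dance in point 5, under the 0.20 new API. The HDFS path /user/me/lookup.txt and the class name are hypothetical, and a single cache file is assumed so there is nothing to match up.

      import java.io.BufferedReader;
      import java.io.FileReader;
      import java.io.IOException;
      import java.net.URI;

      import org.apache.hadoop.conf.Configuration;
      import org.apache.hadoop.filecache.DistributedCache;
      import org.apache.hadoop.fs.Path;
      import org.apache.hadoop.io.LongWritable;
      import org.apache.hadoop.io.Text;
      import org.apache.hadoop.mapreduce.Mapper;

      public class CacheFileMapper
          extends Mapper<LongWritable, Text, Text, LongWritable> {

        // Driver side: the file is already on HDFS; add it with a full
        // hdfs:/// URI, passing the job's Configuration before submitting.
        public static void addLookupFile(Configuration conf) {
          DistributedCache.addCacheFile(
              URI.create("hdfs:///user/me/lookup.txt"), conf);
        }

        @Override
        protected void setup(Context context) throws IOException {
          // Task side: getLocalCacheFiles() returns local paths; open them
          // with plain java.io, not FileSystem.get().open().
          Path[] localFiles =
              DistributedCache.getLocalCacheFiles(context.getConfiguration());
          BufferedReader in =
              new BufferedReader(new FileReader(localFiles[0].toString()));
          try {
            for (String line = in.readLine(); line != null; line = in.readLine()) {
              // ... build an in-memory lookup table from each line ...
            }
          } finally {
            in.close();
          }
        }

        @Override
        protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
          // ... use the lookup table ...
          context.write(value, new LongWritable(1));
        }
      }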
  6. You can't substitute Hadoop counters for actual aggregation in your reducer, tempting as that might be. Counter values will differ from run to run, even against identical inputs, because of things like speculative execution and task failures.
  7. If you configure your cluster to have 0 reduce slots (perhaps because your jobs are all map-only), and you accidentally submit a job that does require a reduce phase, that job will run all mappers to completion and then hang forever.
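    A minimal defensive sketch for point 7: declare map-only jobs explicitly with setNumReduceTasks(0) so no reduce phase is ever scheduled. The driver class and job name are hypothetical.

      import org.apache.hadoop.conf.Configuration;
      import org.apache.hadoop.mapreduce.Job;

      public class MapOnlyDriver {
        public static Job createMapOnlyJob(Configuration conf) throws Exception {
          Job job = new Job(conf, "map-only example");  // 0.20-era constructor
          job.setJarByClass(MapOnlyDriver.class);
          // job.setMapperClass(...), input/output formats, etc. go here.

          // With zero reduce tasks, map output goes straight to the output
          // format and the job never waits on reduce slots the cluster
          // doesn't have.
          job.setNumReduceTasks(0);
          return job;
        }
      }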
