viernes, mayo 09, 2014

Escribiendo un Hadoop MapReduce en Java: Un Wordcount mejorado

En este tutorial, vamos a crear nuestro propio MapReduce contador de palabras y luego lo vamos a usar en nuestro cluster pseudo-distribuido de Hadoop.

Un trabajo Map Reduce se divide en 2 clases (Mapper y Reducer) mas el método de entrada "main". En este caso, el mapper va a crear un listado del tipo y el reducer va a "reducir" el listado de valores a un único valor que, en nuestro caso, será el número de ocurrencias de la clave.

/**
 *  Mapper.
 *  Recibirá, línea tras línea (el objeto Text), los contenidos del fichero que
 *  le pasemos por los argumentos.
 *
 *  Vamos a mejorar el Mapper inicial quitando todos los caracteres no alfabéticos
 *  de la cadena de entrada para evitar que, por ejemplo, "casa" y "casa." se
 *  contabilicen como palabras distintas
 */
public static class WordCountMapper extends Mapper {
  private Text word = new Text();

  @Override
  public void map(LongWritable key, Text value, Context context)
      throws IOException, InterruptedException 
  {
    //Escribimos una expresión regular para eliminar todo caracter no alfabético
    String cleanString = value.toString().replaceAll("[^A-Za-z\\s]", "");

    //Ahora cogemos el texto limpio y lo separamos por espacios
    StringTokenizer token = new StringTokenizer(cleanString);
    
    //Recorremos el tokenizer para recoger todas las palabras hasta que no haya mas
    while(token.hasMoreTokens())
    {
      //Palabra actual
      String tok = token.nextToken();

      //Asignar la palabra al objeto que se va a pasar al reducer
      word.set(tok);

      /*
       * Guardar la palabra con un valor número de 1 de tal manera que, si tenemos
       * la palabra "casa" estamos guardando  Si la palabra casa volviera
       * a aparecer, como ya la hemos establecido una vez, el resultado sería 
       *  y así por cada ocurrencia
       */
      context.write(word, new IntWritable(1));
    }
  }
}
Vamos ahora con el Reducer, ojo al atributo estático de la clase
/*
 * El reducer va a contar el número de ítems en la lista de palabras 
 * que pasamos desde el Mapper
 */
public static class WordCountReducer extends Reducer {

  @Override
  public void reduce(Text word, Iterable list, Context context)
      throws IOException, InterruptedException
  {
    //Ponemos el contador de palabras a 0
    int total = 0;
    
    //Recorremos el objeto Iterable list y sumamos uno al contador
    for(IntWritable count : list)
    {
      total++;
    }
    
    /* Escribimos el resultado del reducer, para el ejemplo de casa, escribiría
     * 
     */
    context.write(word, new IntWritable(total));
  }
  
}
Ahora solo falta la clase main de entrada a la aplicación donde se configura todas las opciones del MapReduce
// Entrada de la aplicación
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException 
{
  //Creamos un fichero de configuracion y le damos un nombre al Job
  Configuration conf = new Configuration();
  Job job = new Job(conf, "word count");

  //Le tenemos que indicar la clase que hay que usar para llamar a los mappers y reducers
  job.setJarByClass(Wordcount.class);

  //Le indicamos el nombre de la clase Mapper y de la clase Reducer
  job.setMapperClass(WordCountMapper.class);
  job.setReducerClass(WordCountReducer.class);

  /* 
   * Le tenemos que indicar el formato que va a tener el resultado, en nuestro caso vamos
   * a recuperar un resultado del tipo  por lo que, usando
   * los tipos primitivos de Hadoop, esto equivaldría a Text.class y IntWritable.class
   */
  job.setOutputKeyClass(Text.class);
  job.setOutputValueClass(IntWritable.class);

  /* Le indicamos el fichero de entrada y de salida que, por lo general, los vamos a recoger
   * de los parámetros que le pasemos a la clase JAR
   */
  FileInputFormat.addInputPath(job, new Path(args[0]));
  FileOutputFormat.setOutputPath(job, new Path(args[1]));

  //Esperamos a que el trabajo termine
  System.exit(job.waitForCompletion(true) ? 0 : 1);
}
Nos queda exportar el JAR y ejecutarlo con algún fichero de prueba para contar sus palabras. Para exportarlo vamos a usar Eclipse, por su amplia uso actual y su simplicidad. Tendremos que exportarlo como un JAR ejecutable asegurándonos de tener las siguientes opciones seleccionadas. Es importante seleccionar "Extract required libraries into generated JAR" en el cuadro de diálogo de exportación que nos aparece:

Ahora sólo falta probarlo. Para ello podemos usar cualquiera de los 3 modos de Hadoop (local, pseudo-distribuido o distribuido). Por simplicidad vamos a usar el modo Local:

cd $HADOOP_INSTALL
bin/hadoop jar mariocaster.blogspot.com-wordcount.jar example-files/input example-files/output
Recordar que la carpeta example-files/input debe contener los ficheros de texto a contar y la carpeta example-files/output NO debe existir cuando ejecutemos el script porque fallará (Hadoop se encargará de crear la carpeta). El resultado es similar al siguiente:

INFO util.NativeCodeLoader: Loaded the native-hadoop library
WARN mapred.JobClient: Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same.
WARN mapred.JobClient: No job jar file set.  User classes may not be found. See JobConf(Class) or JobConf#setJar(String).
INFO input.FileInputFormat: Total input paths to process : 2
WARN snappy.LoadSnappy: Snappy native library not loaded
INFO mapred.JobClient: Running job: job_local1360868529_0001
INFO mapred.LocalJobRunner: Waiting for map tasks
INFO mapred.LocalJobRunner: Starting task: attempt_local1360868529_0001_m_000000_0
INFO util.ProcessTree: setsid exited with exit code 0
INFO mapred.Task:  Using ResourceCalculatorPlugin : org.apache.hadoop.util.LinuxResourceCalculatorPlugin@4f69385e
INFO mapred.MapTask: Processing split: file:/var/hadoop/examples-files/input/text2:0+118
INFO mapred.MapTask: io.sort.mb = 100
INFO mapred.MapTask: data buffer = 79691776/99614720
INFO mapred.MapTask: record buffer = 262144/327680
INFO mapred.MapTask: Starting flush of map output
INFO mapred.MapTask: Finished spill 0
INFO mapred.Task: Task:attempt_local1360868529_0001_m_000000_0 is done. And is in the process of commiting
INFO mapred.LocalJobRunner: 
INFO mapred.Task: Task 'attempt_local1360868529_0001_m_000000_0' done.
INFO mapred.LocalJobRunner: Finishing task: attempt_local1360868529_0001_m_000000_0
INFO mapred.LocalJobRunner: Starting task: attempt_local1360868529_0001_m_000001_0
INFO mapred.Task:  Using ResourceCalculatorPlugin : org.apache.hadoop.util.LinuxResourceCalculatorPlugin@4e3600d
INFO mapred.MapTask: Processing split: file:/var/hadoop/examples-files/input/texto:0+72
INFO mapred.MapTask: io.sort.mb = 100
INFO mapred.MapTask: data buffer = 79691776/99614720
INFO mapred.MapTask: record buffer = 262144/327680
INFO mapred.MapTask: Starting flush of map output
INFO mapred.MapTask: Finished spill 0
INFO mapred.Task: Task:attempt_local1360868529_0001_m_000001_0 is done. And is in the process of commiting
INFO mapred.LocalJobRunner: 
INFO mapred.Task: Task 'attempt_local1360868529_0001_m_000001_0' done.
INFO mapred.LocalJobRunner: Finishing task: attempt_local1360868529_0001_m_000001_0
INFO mapred.LocalJobRunner: Map task executor complete.
INFO mapred.Task:  Using ResourceCalculatorPlugin : org.apache.hadoop.util.LinuxResourceCalculatorPlugin@58017a75
INFO mapred.LocalJobRunner: 
INFO mapred.Merger: Merging 2 sorted segments
INFO mapred.Merger: Down to the last merge-pass, with 2 segments left of total size: 428 bytes
INFO mapred.LocalJobRunner: 
INFO mapred.Task: Task:attempt_local1360868529_0001_r_000000_0 is done. And is in the process of commiting
INFO mapred.LocalJobRunner: 
INFO mapred.Task: Task attempt_local1360868529_0001_r_000000_0 is allowed to commit now
INFO output.FileOutputCommitter: Saved output of task 'attempt_local1360868529_0001_r_000000_0' to examples-files/ouput
INFO mapred.LocalJobRunner: reduce > reduce
INFO mapred.Task: Task 'attempt_local1360868529_0001_r_000000_0' done.
INFO mapred.JobClient:  map 100% reduce 100%
INFO mapred.JobClient: Job complete: job_local1360868529_0001
INFO mapred.JobClient: Counters: 20
INFO mapred.JobClient:   File Output Format Counters 
INFO mapred.JobClient:     Bytes Written=249
INFO mapred.JobClient:   FileSystemCounters
INFO mapred.JobClient:     FILE_BYTES_READ=2253
INFO mapred.JobClient:     FILE_BYTES_WRITTEN=153667
INFO mapred.JobClient:   File Input Format Counters 
INFO mapred.JobClient:     Bytes Read=190
INFO mapred.JobClient:   Map-Reduce Framework
INFO mapred.JobClient:     Reduce input groups=35
INFO mapred.JobClient:     Map output materialized bytes=436
INFO mapred.JobClient:     Combine output records=0
INFO mapred.JobClient:     Map input records=16
INFO mapred.JobClient:     Reduce shuffle bytes=0
INFO mapred.JobClient:     Physical memory (bytes) snapshot=0
INFO mapred.JobClient:     Reduce output records=35
INFO mapred.JobClient:     Spilled Records=78
INFO mapred.JobClient:     Map output bytes=346
INFO mapred.JobClient:     CPU time spent (ms)=0
INFO mapred.JobClient:     Total committed heap usage (bytes)=728760320
INFO mapred.JobClient:     Virtual memory (bytes) snapshot=0
INFO mapred.JobClient:     Combine input records=0
INFO mapred.JobClient:     Map output records=39
INFO mapred.JobClient:     SPLIT_RAW_BYTES=216
INFO mapred.JobClient:     Reduce input records=39

Finalmente, este es el resultado del MapReduce:
Este 1
This 1
a 2
an 1
are 1
contar 1
count 1
cuantas 1
de 1
del 2
dentro 1
ejemplo 1
es 1
estan 1
example 1
fichero 2
going 1
hay 1
in 1
is 1
it 1
of 1
palabras 2
que 1
sus 1
text 1
that 1
the 1
to 1
un 1
vamos 1
ver 1
we 1
words 1
y 1