Un trabajo Map Reduce se divide en 2 clases (Mapper y Reducer) mas el método de entrada "main". En este caso, el mapper va a crear un listado del tipo
/** * Mapper. * Recibirá, línea tras línea (el objeto Text), los contenidos del fichero que * le pasemos por los argumentos. * * Vamos a mejorar el Mapper inicial quitando todos los caracteres no alfabéticos * de la cadena de entrada para evitar que, por ejemplo, "casa" y "casa." se * contabilicen como palabras distintas */ public static class WordCountMapper extends Mapper{ private Text word = new Text(); @Override public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { //Escribimos una expresión regular para eliminar todo caracter no alfabético String cleanString = value.toString().replaceAll("[^A-Za-z \\s]", "");
//Ahora cogemos el texto limpio y lo separamos por espacios StringTokenizer token = new StringTokenizer(cleanString); //Recorremos el tokenizer para recoger todas las palabras hasta que no haya mas while(token.hasMoreTokens()) { //Palabra actual String tok = token.nextToken(); //Asignar la palabra al objeto que se va a pasar al reducer word.set(tok); /* * Guardar la palabra con un valor número de 1 de tal manera que, si tenemos * la palabra "casa" estamos guardando Si la palabra casa volviera * a aparecer, como ya la hemos establecido una vez, el resultado sería * y así por cada ocurrencia */ context.write(word, new IntWritable(1)); } } }
Vamos ahora con el Reducer, ojo al atributo estático de la clase
/* * El reducer va a contar el número de ítems en la lista de palabras * que pasamos desde el Mapper */ public static class WordCountReducer extends Reducer{ @Override public void reduce(Text word, Iterable list, Context context) throws IOException, InterruptedException { //Ponemos el contador de palabras a 0 int total = 0; //Recorremos el objeto Iterable list y sumamos uno al contador for(IntWritable count : list) { total++; } /* Escribimos el resultado del reducer, para el ejemplo de casa, escribiría * */ context.write(word, new IntWritable(total)); } }
Ahora solo falta la clase main de entrada a la aplicación donde se configura todas las opciones del MapReduce
// Entrada de la aplicación public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { //Creamos un fichero de configuracion y le damos un nombre al Job Configuration conf = new Configuration(); Job job = new Job(conf, "word count"); //Le tenemos que indicar la clase que hay que usar para llamar a los mappers y reducers job.setJarByClass(Wordcount.class); //Le indicamos el nombre de la clase Mapper y de la clase Reducer job.setMapperClass(WordCountMapper.class); job.setReducerClass(WordCountReducer.class); /* * Le tenemos que indicar el formato que va a tener el resultado, en nuestro caso vamos * a recuperar un resultado del tipopor lo que, usando * los tipos primitivos de Hadoop, esto equivaldría a Text.class y IntWritable.class */ job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); /* Le indicamos el fichero de entrada y de salida que, por lo general, los vamos a recoger * de los parámetros que le pasemos a la clase JAR */ FileInputFormat.addInputPath(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, new Path(args[1])); //Esperamos a que el trabajo termine System.exit(job.waitForCompletion(true) ? 0 : 1); }
Nos queda exportar el JAR y ejecutarlo con algún fichero de prueba para contar sus palabras. Para exportarlo vamos a usar Eclipse, por su amplia uso actual y su simplicidad. Tendremos que exportarlo como un JAR ejecutable asegurándonos de tener las siguientes opciones seleccionadas. Es importante seleccionar "Extract required libraries into generated JAR" en el cuadro de diálogo de exportación que nos aparece:
Ahora sólo falta probarlo. Para ello podemos usar cualquiera de los 3 modos de Hadoop (local, pseudo-distribuido o distribuido). Por simplicidad vamos a usar el modo Local:
cd $HADOOP_INSTALL bin/hadoop jar mariocaster.blogspot.com-wordcount.jar example-files/input example-files/outputRecordar que la carpeta example-files/input debe contener los ficheros de texto a contar y la carpeta example-files/output NO debe existir cuando ejecutemos el script porque fallará (Hadoop se encargará de crear la carpeta). El resultado es similar al siguiente:
INFO util.NativeCodeLoader: Loaded the native-hadoop library WARN mapred.JobClient: Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same. WARN mapred.JobClient: No job jar file set. User classes may not be found. See JobConf(Class) or JobConf#setJar(String). INFO input.FileInputFormat: Total input paths to process : 2 WARN snappy.LoadSnappy: Snappy native library not loaded INFO mapred.JobClient: Running job: job_local1360868529_0001 INFO mapred.LocalJobRunner: Waiting for map tasks INFO mapred.LocalJobRunner: Starting task: attempt_local1360868529_0001_m_000000_0 INFO util.ProcessTree: setsid exited with exit code 0 INFO mapred.Task: Using ResourceCalculatorPlugin : org.apache.hadoop.util.LinuxResourceCalculatorPlugin@4f69385e INFO mapred.MapTask: Processing split: file:/var/hadoop/examples-files/input/text2:0+118 INFO mapred.MapTask: io.sort.mb = 100 INFO mapred.MapTask: data buffer = 79691776/99614720 INFO mapred.MapTask: record buffer = 262144/327680 INFO mapred.MapTask: Starting flush of map output INFO mapred.MapTask: Finished spill 0 INFO mapred.Task: Task:attempt_local1360868529_0001_m_000000_0 is done. And is in the process of commiting INFO mapred.LocalJobRunner: INFO mapred.Task: Task 'attempt_local1360868529_0001_m_000000_0' done. INFO mapred.LocalJobRunner: Finishing task: attempt_local1360868529_0001_m_000000_0 INFO mapred.LocalJobRunner: Starting task: attempt_local1360868529_0001_m_000001_0 INFO mapred.Task: Using ResourceCalculatorPlugin : org.apache.hadoop.util.LinuxResourceCalculatorPlugin@4e3600d INFO mapred.MapTask: Processing split: file:/var/hadoop/examples-files/input/texto:0+72 INFO mapred.MapTask: io.sort.mb = 100 INFO mapred.MapTask: data buffer = 79691776/99614720 INFO mapred.MapTask: record buffer = 262144/327680 INFO mapred.MapTask: Starting flush of map output INFO mapred.MapTask: Finished spill 0 INFO mapred.Task: Task:attempt_local1360868529_0001_m_000001_0 is done. And is in the process of commiting INFO mapred.LocalJobRunner: INFO mapred.Task: Task 'attempt_local1360868529_0001_m_000001_0' done. INFO mapred.LocalJobRunner: Finishing task: attempt_local1360868529_0001_m_000001_0 INFO mapred.LocalJobRunner: Map task executor complete. INFO mapred.Task: Using ResourceCalculatorPlugin : org.apache.hadoop.util.LinuxResourceCalculatorPlugin@58017a75 INFO mapred.LocalJobRunner: INFO mapred.Merger: Merging 2 sorted segments INFO mapred.Merger: Down to the last merge-pass, with 2 segments left of total size: 428 bytes INFO mapred.LocalJobRunner: INFO mapred.Task: Task:attempt_local1360868529_0001_r_000000_0 is done. And is in the process of commiting INFO mapred.LocalJobRunner: INFO mapred.Task: Task attempt_local1360868529_0001_r_000000_0 is allowed to commit now INFO output.FileOutputCommitter: Saved output of task 'attempt_local1360868529_0001_r_000000_0' to examples-files/ouput INFO mapred.LocalJobRunner: reduce > reduce INFO mapred.Task: Task 'attempt_local1360868529_0001_r_000000_0' done. INFO mapred.JobClient: map 100% reduce 100% INFO mapred.JobClient: Job complete: job_local1360868529_0001 INFO mapred.JobClient: Counters: 20 INFO mapred.JobClient: File Output Format Counters INFO mapred.JobClient: Bytes Written=249 INFO mapred.JobClient: FileSystemCounters INFO mapred.JobClient: FILE_BYTES_READ=2253 INFO mapred.JobClient: FILE_BYTES_WRITTEN=153667 INFO mapred.JobClient: File Input Format Counters INFO mapred.JobClient: Bytes Read=190 INFO mapred.JobClient: Map-Reduce Framework INFO mapred.JobClient: Reduce input groups=35 INFO mapred.JobClient: Map output materialized bytes=436 INFO mapred.JobClient: Combine output records=0 INFO mapred.JobClient: Map input records=16 INFO mapred.JobClient: Reduce shuffle bytes=0 INFO mapred.JobClient: Physical memory (bytes) snapshot=0 INFO mapred.JobClient: Reduce output records=35 INFO mapred.JobClient: Spilled Records=78 INFO mapred.JobClient: Map output bytes=346 INFO mapred.JobClient: CPU time spent (ms)=0 INFO mapred.JobClient: Total committed heap usage (bytes)=728760320 INFO mapred.JobClient: Virtual memory (bytes) snapshot=0 INFO mapred.JobClient: Combine input records=0 INFO mapred.JobClient: Map output records=39 INFO mapred.JobClient: SPLIT_RAW_BYTES=216 INFO mapred.JobClient: Reduce input records=39
Finalmente, este es el resultado del MapReduce:
Este 1
This 1
a 2
an 1
are 1
contar 1
count 1
cuantas 1
de 1
del 2
dentro 1
ejemplo 1
es 1
estan 1
example 1
fichero 2
going 1
hay 1
in 1
is 1
it 1
of 1
palabras 2
que 1
sus 1
text 1
that 1
the 1
to 1
un 1
vamos 1
ver 1
we 1
words 1
y 1