jueves, mayo 29, 2014

Real Time Analytics con Apache Storm: Un contador de palabras en tiempo real

Introducción:

La manera en la que va a funcionar nuestro contador de palabras en Storm es ligeramente diferente a como funcionaba en Hadoop MapReduce pero esencialmente similar.

La idea de los Spouts y Bolts es la de dividir el trabajo de manera similar a como lo hacen los mappers y reducers de Hadoop MapReduce. En este caso, un spout se va a encargar de recopilar la información y pasársela a una cadena de Bolts que se van a encargar de procesarla de la manera que le indiquemos. Cabe destacar que, para hacer este tutorial no es necesario tener un cluster de Storm instalado ya que vamos a ejecutar la topología en modo local

Arquitectura

La manera en que vamos a organizar los Spouts y Bolts es la siguiente:
  1. Spout: LectorLineasSpout: se va a encargar de leer un fichero que le pasemos como argumento y de ir cada linea del archivo de texto a un Bolt
  2. Bolt 1: SeparadorPalabras: A este bolt le van a llegar las líneas que emita el spout anterior y se va a encargar de extraer las palabras de cada línea y emitirlas a un segundo bolt
  3. Bolt 2: ContadorPalabras: Este último Bolt va a recibir la emisión de palabras del SeparadorPalabras y las irá agregando a un HashMap que usaremos como contador.
  4. Topologia.java: Esta última clase es la que junta y da coherencia a todas las anteriores. Aquí definiremos el orden de ejecución (Spout, Bolt1, Bolt2), el modo de ejecución (local o cluster), etc. etc.

Creando el proyecto en Maven

Para simplificar un poco la creación del proyecto y la gestión de las dependencias, vamos a usar maven. Vamos a ejecutar los siguientes comandos en orden para crear un directorio para el proyecto, y creamos un "pom" de Maven.:


cd $HOME
mkdir contador-storm
cd contador-storm
touch pom.xml

En el POM, vamos a poner lo siguiente:


<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>mariocaster.blogspot.com.hadoop</groupId>
  <artifactId>storm-word-count</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  
 <build>
  <plugins>
   <plugin>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-compiler-plugin</artifactId>
    <version>2.3.2</version>
    <configuration>
     <source>1.6</source>
     <target>1.6</target>
     <compilerVersion>1.6</compilerVersion>
    </configuration>
   </plugin>
  </plugins>
 </build>
 
 <dependencies>
  <!-- Storm Dependency -->
  <dependency>
   <groupId>org.apache.storm</groupId>
   <artifactId>storm-core</artifactId>
   <version>0.9.1-incubating</version>
      <scope>compile</scope>
  </dependency>
 </dependencies>
 
 <repositories>
     
     <repository>
        <id>clojars.org</id>
        <url>http://clojars.org/repo</url>
     </repository>
     
  </repositories>
</project>

Escribiendo el Spout

Escribir el Spout es bastante sencillo, tenemos que hacerle leer un fichero línea por línea y hacer que emita cada una de esas líneas. Además tenemos que indicarle un nombre para el dato que estamos emitiendo que, en este caso, va a ser "linea".


public class LectorLineasSpout extends BaseRichSpout
{
 /**
  * 
  */
 private static final long serialVersionUID = 6785329165603525275L;
 private boolean completado = false;
 private FileReader lectorFichero;
 private SpoutOutputCollector colector;
 private TopologyContext ctx;

 /**
  * Este es el primer método que se llama en el spout
  * 
  * En este caso se encarga de abrir el archivo del que
  * vamos a leer
  */
 @Override
 public void open(@SuppressWarnings("rawtypes") Map map, TopologyContext ctx, 
   SpoutOutputCollector colector) 
 {
  try 
  {
   this.setCtx(ctx);
   
   //Creamos un lector de ficheros
   this.lectorFichero = new FileReader(
     map.get("archivoPalabras").toString());
  } catch (FileNotFoundException e) {
   //Bloque de ejecución en caso de que el archivo
   //no exista o haya algún error en la apertura
   throw new RuntimeException(
     "Error reading file [" + map.get("wordFile")+"]"
   );
  }
  this.colector = colector;
 }

 /**
  * Este método es el que lee cada línea del fichero
  * de entrada y se encarga de emitirlas para ser
  * procesadas por los bolt
  */
 @Override
 public void nextTuple() 
 {
  //Comprueba si el fichero se ha terminado de leer
  if(this.completado)
  {
   //Esperar antes de intentar leer el fichero de nuevo
   try { Thread.sleep(1000);}
   catch (InterruptedException e) {}
  }
  
  //Creamos un lector de ficheros para leer linea por linea
  String linea;
  BufferedReader lector = new BufferedReader(this.lectorFichero);
  try 
  {
   while((linea = lector.readLine()) != null)
   {
    //Emitimos cada linea por el colector para
    //que los bolts las procesen
    this.colector.emit(new Values(linea), linea);
   }
  } catch (Exception e){
   throw new RuntimeException("Error leyendo", e);
  } finally {
   //Indicamos que el fichero se ha terminado de leer
   completado = true;
  }
 }

 @Override
 public void declareOutputFields(OutputFieldsDeclarer declarador)
 {
  declarador.declare(new Fields("linea"));
 }

 public TopologyContext getCtx() {
  return ctx;
 }

 public void setCtx(TopologyContext ctx) {
  this.ctx = ctx;
 }

}

Escribiendo el Bolt 1: SeparadorPalabras

Pasamos a escribir el primer Bolt, el SeparadorPalabras. Este Bolt recibe líneas con palabras que se tiene encargar de separar y limpiar de espacios en blanco, caracteres de puntuación y números:


public class SeparadorPalabrasBolt extends BaseRichBolt 
{
 /**
  * 
  */
 private static final long serialVersionUID = -505615409788951751L;
 private OutputCollector colector;

 /**
  * El bolt recibe una linea de texto por llamada
  * 
  * Lo que hace es separar las palabras, quitarles
   * todo lo que no sean letras de la "a" a la "z"
   * y pasarlo todo a minúsculas
  * 
  */
 @Override
 public void execute(Tuple lineaEntrada)
 {
  //Creamos un array de palabras con la linea de entrada
  String linea = lineaEntrada.getString(0);
  String[] palabras = linea.split(" ");
  
  //Recorremos todas las palabras 
  //para pasarlas al siguiente bolt
  for(String palabra: palabras)
  {
   //"Limpiamos" la palabra de espacios, caracteres y números
   palabra = palabra.toString().replaceAll("[^A-Za-z\\s]", "");
   
   if(!palabra.isEmpty())
   {
    palabra = palabra.toLowerCase();
    
    //Emitir la palabra al siguiente bolt
    this.colector.emit(new Values(palabra));
   }
  }
  
  //Ack a la tupla
  colector.ack(lineaEntrada);
 }

 /**
  * Este método se llama el primero dentro de la clase
  * Prepara el bolt para los datos que va a tener que usar
  */
 @Override
 public void prepare(@SuppressWarnings("rawtypes") Map map, TopologyContext ctx, OutputCollector colector)
 {
  this.colector = colector;
 }

 /**
  * 
  */
 @Override
 public void declareOutputFields(OutputFieldsDeclarer declarador)
 {
  declarador.declare(new Fields("palabra"));
 }

}

Escribirendo el Bolt 2: ContadorPalabras:

Nuestro último Bolt, el contador de palabras, se va a encargar de recoger las palabras emitidas por el primer bolt y agregarlas a un diccionario clave/valor que vamos a tener a tal efecto. Un trabajo sencillo:


public class ContadorPalabrasBolt extends BaseRichBolt 
{
 /**
  * 
  */
 private static final long serialVersionUID = -9211454481547491256L;
 private String nombre;
 private Integer id;
 private OutputCollector colector;
 private Map cuenta;
 
 @Override
 public void execute(Tuple palabraEntrada) 
 {
  String palabra = palabraEntrada.getString(0);
  
  /**
  * Comprueba si existe ya la palabra en el Map
  * y la crea si no existe todavia
  */
  if(!this.cuenta.containsKey(palabra))
  {
   this.cuenta.put(palabra, 1);
  } else {
   Integer c = this.cuenta.get(palabra) + 1;
   this.cuenta.put(palabra, c);
  }
  
  //Ack a la tupla
  colector.ack(palabraEntrada);
 }

 @Override
 public void prepare(@SuppressWarnings("rawtypes") Map map, TopologyContext ctx, OutputCollector colector)
 {
  this.cuenta = new HashMap();
  this.colector = colector;
  
  this.nombre = ctx.getThisComponentId();
  this.id = ctx.getThisTaskId();
 }

 /**
  * Al finalizar el Spout, mostrar por consola la
  * cuenta de palabras
  */
 @Override
 public void cleanup() {
  super.cleanup();
  System.out.println(
    "-- Contador palabras [" + nombre + "-" + id + "] --");
  
  for(Map.Entry resultado: cuenta.entrySet())
  {
   System.out.println(
     resultado.getKey() + ", " + resultado.getValue());
  }
 }

 @Override
 public void declareOutputFields(OutputFieldsDeclarer declarador)
 {
  //Ultimo bolt que no llega a emitir nada mas
 }

}

Juntándolo todo: Escribiendo la topología

Por último, nos quedaría escribir la clase con la topología con toda la configuración. Esta clase se divide en tres fases:
  1. Definir la topología
    1. Aquí le indicamos el spout y el orden de ejecución de los bolts.
    2. Además le vamos a indicar el tipo de agrupación. Si os fijáis hay dos tipos de agrupación:
      1. shuffleGrouping: Donde cada emisión es lanzada a un bolt aleatorio
      2. fieldsGrouping: Donde emisiones iguales son enviadas al mismo bolt. De esta agrupamos los resultados de palabras de manera parecida a cuando hacemos un "GROUP BY" en SQL (en realidad, el problema radica en la naturaleza de la aplicación distribuida que, en caso de que la misma palabra no llegue al mismo Bolt, se haría una cuenta por separado de esa palabra en ese Bolt y en el resultado final nos aparecería dicha palabra varias veces, una por cada nodo Bolt que le haya llegado la palabra).
  2. Creamos un objeto de configuración clave/valor donde le asignamos a la clave "archivoPalabras" el argumento que le hayamos pasado al ejecutar la aplicación
  3. Arrancamos la topología indicando que vamos a hacer una prueba en local y le damos un nombre a la topología.

public class Topologia
{
 public static void main(String[] args) throws InterruptedException
 {
  //Definicion de la topología
  TopologyBuilder constructor = new TopologyBuilder();
  constructor.setSpout("lector-lineas", new LectorLineasSpout());
  constructor.setBolt("separador-palabras", new SeparadorPalabrasBolt())
   .shuffleGrouping("lector-lineas");
  constructor.setBolt("contador-palabras", new ContadorPalabrasBolt())
   .fieldsGrouping("separador-palabras", new Fields("palabra"));
  
  //Configuracion
  Config conf = new Config();
  conf.put("archivoPalabras", args[0]);
  conf.setDebug(false);
  conf.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 1);
  
  //Arrancar la topología
  LocalCluster cluster = new LocalCluster();
  cluster.submitTopology(
    "Mi-primera-topología", 
    conf, 
    constructor.createTopology()
   );
  Thread.sleep(1000);
  cluster.shutdown();
 }
}
Para ejecutar nuestra topología, sólo tenemos que ejecutar en la linea de comandos:

mvn compile
mvn exec:java -Dexec.mainClass="topology.Topologia" -Dexec.args="res/texto"
El resultado es un montón de información escrita con log4j donde, en algún punto, podréis ver lo siguiente:

-- Contador palabras [contador-palabras-2] --
palabras, 2
a, 1
contar, 1
ver, 1
un, 1
cuantas, 1
dentro, 1
de, 1
sus, 1
ejemplo, 1
hay, 1
fichero, 2
estan, 1
del, 2
que, 1
este, 1
vamos, 1
es, 1
y, 1

En el siguiente tutorial sobre Storm, haremos una instalación de un cluster para poder lanzar el trabajo en él. Un saludo!

0 comentarios:

Publicar un comentario