viernes, mayo 30, 2014

Logística de datos con Apache Sqoop: Importando bases de datos al HDFS

Exportando BBDD SQL con Apache Sqoop


cd $HOME
wget http://apache.rediris.es/sqoop/1.4.4/sqoop-1.4.4.bin__hadoop-0.23.tar.gz
tar -zxvf sqoop-1.4.4.bin__hadoop-0.23.tar.gz
Vamos a crear una tabla en MySQL muy básica que se llame "sqoop-table" que contenga ID y un nombre de usuario simplemente dentro de la DDBB "sqoop-test"

CREATE TABLE `sqoop-table` ( id int NOT NULL, name varchar(255) NOT NULL, PRIMARY KEY (id) );
INSERT INTO `sqoop-table` VALUES (1, "mario");
INSERT INTO `sqoop-table` VALUES (2, "caster");
+------+-------+
| id   | name  |
+------+-------+
| 1    | mario |
| 2    | caster|
+------+-------+


Un problema que nos podemos encontrar hoy dia es que la instalación de Sqoop no incluya el driver jdbc de mysql ya que este fue sacado de la carpeta "lib" para poder mantener el proyecto con licencia Apache. El driver, se encuentra en "/usr/share/java/mysql-connector-java" así que simplemente vamos a hacer un link a dicha libreria con el comando "ln" desde la carpeta de Sqoop:

ln -s /usr/share/java/mysql-connector-java.jar lib/


Ya estamos listos, lo que vamos a hacer es importar la tabla con valores por defecto de Sqoop, esto nos va a generar un archivo por cada mapper con una linea por fila y con los valores separados por comas. Para simplificarlo vamos a usar un único mapper:

bin/sqoop import --connect jdbc:mysql://localhost/sqoop-test --table sqoop-table -m 1 --username root --password mypass


Vamos a explicar paso a paso la siguiente ejecución:

  1. Llamamos a la orden import ya que es la acción que queremos realizar
  2. --connect jdbc:mysql://localhost/sqoop-test Le decimos cómo encontrar nuestra base de datos: usando el driver JDBC de MySQL para Java y le pasamos la URI de la BBDD
  3. Le indicamos que tabla queremos usar de la BBDD: --table sqoop-table
  4. -m 1: Se le indica el número de Mappers a usar. Con un único mapper tendremos un único output y, ya que el script no va a usar reducers, también nos generará un único archivo.
  5. --username root: Lo normal es tener acceso restringido a la BBDD así que le pasamos las credenciales de acceso, en nuestro caso es root.
  6. --password mypass: Y le pasamos la pass para acceder a la BBDD.
Tras ejecutar la orden, un jar de MapReduce se generará y ejecutará de la manera habitual en la consola. El chorreo de logs debería ser conocido ya a estas alturas:
INFO mapred.JobClient: Running job: job_201405291246_0003
INFO mapred.JobClient:  map 0% reduce 0%
INFO mapred.JobClient:  map 100% reduce 0%
INFO mapred.JobClient: Job complete: job_201405291246_0003


Bien, nuestra tabla ya se encuentra en el HDFS en un único fichero en formato csv. Vamos a ver su contenido:

hadoop fs -ls /user/mariocaster
drwxr-xr-x   - mariocaster supergroup          0 2014-05-29 13:31 /user/mariocaster/sqoop-table

hadoop fs -ls /user/mariocaster/sqoop-table
-rw-r--r--   2 mariocaster supergroup          0 2014-05-29 15:23 /user/MaC/sqoop-table/_SUCCESS
drwxr-xr-x   - mariocaster supergroup          0 2014-05-29 15:22 /user/MaC/sqoop-table/_logs
-rw-r--r--   2 mariocaster supergroup         17 2014-05-29 15:22 /user/MaC/sqoop-table/part-m-00000

hadoop fs -cat /user/mariocaster/sqoop-table/part-m-00000
1,mario
2,caster


Como podéis apreciar, ya tenemos el contenido de nuestra base de datos como un fichero dentro del HDSF. Esperemos que hayáis disfrutado del tutorial. El siguiente tutorial sobre Sqoop será cómo exportar datos desde el HDFS a una MySQL. ¡Saludos!

jueves, mayo 29, 2014

Real Time Analytics con Apache Storm: Un contador de palabras en tiempo real

Introducción:

La manera en la que va a funcionar nuestro contador de palabras en Storm es ligeramente diferente a como funcionaba en Hadoop MapReduce pero esencialmente similar.

La idea de los Spouts y Bolts es la de dividir el trabajo de manera similar a como lo hacen los mappers y reducers de Hadoop MapReduce. En este caso, un spout se va a encargar de recopilar la información y pasársela a una cadena de Bolts que se van a encargar de procesarla de la manera que le indiquemos. Cabe destacar que, para hacer este tutorial no es necesario tener un cluster de Storm instalado ya que vamos a ejecutar la topología en modo local

Arquitectura

La manera en que vamos a organizar los Spouts y Bolts es la siguiente:
  1. Spout: LectorLineasSpout: se va a encargar de leer un fichero que le pasemos como argumento y de ir cada linea del archivo de texto a un Bolt
  2. Bolt 1: SeparadorPalabras: A este bolt le van a llegar las líneas que emita el spout anterior y se va a encargar de extraer las palabras de cada línea y emitirlas a un segundo bolt
  3. Bolt 2: ContadorPalabras: Este último Bolt va a recibir la emisión de palabras del SeparadorPalabras y las irá agregando a un HashMap que usaremos como contador.
  4. Topologia.java: Esta última clase es la que junta y da coherencia a todas las anteriores. Aquí definiremos el orden de ejecución (Spout, Bolt1, Bolt2), el modo de ejecución (local o cluster), etc. etc.

Creando el proyecto en Maven

Para simplificar un poco la creación del proyecto y la gestión de las dependencias, vamos a usar maven. Vamos a ejecutar los siguientes comandos en orden para crear un directorio para el proyecto, y creamos un "pom" de Maven.:


cd $HOME
mkdir contador-storm
cd contador-storm
touch pom.xml

En el POM, vamos a poner lo siguiente:


<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>mariocaster.blogspot.com.hadoop</groupId>
  <artifactId>storm-word-count</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  
 <build>
  <plugins>
   <plugin>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-compiler-plugin</artifactId>
    <version>2.3.2</version>
    <configuration>
     <source>1.6</source>
     <target>1.6</target>
     <compilerVersion>1.6</compilerVersion>
    </configuration>
   </plugin>
  </plugins>
 </build>
 
 <dependencies>
  <!-- Storm Dependency -->
  <dependency>
   <groupId>org.apache.storm</groupId>
   <artifactId>storm-core</artifactId>
   <version>0.9.1-incubating</version>
      <scope>compile</scope>
  </dependency>
 </dependencies>
 
 <repositories>
     
     <repository>
        <id>clojars.org</id>
        <url>http://clojars.org/repo</url>
     </repository>
     
  </repositories>
</project>

Escribiendo el Spout

Escribir el Spout es bastante sencillo, tenemos que hacerle leer un fichero línea por línea y hacer que emita cada una de esas líneas. Además tenemos que indicarle un nombre para el dato que estamos emitiendo que, en este caso, va a ser "linea".


public class LectorLineasSpout extends BaseRichSpout
{
 /**
  * 
  */
 private static final long serialVersionUID = 6785329165603525275L;
 private boolean completado = false;
 private FileReader lectorFichero;
 private SpoutOutputCollector colector;
 private TopologyContext ctx;

 /**
  * Este es el primer método que se llama en el spout
  * 
  * En este caso se encarga de abrir el archivo del que
  * vamos a leer
  */
 @Override
 public void open(@SuppressWarnings("rawtypes") Map map, TopologyContext ctx, 
   SpoutOutputCollector colector) 
 {
  try 
  {
   this.setCtx(ctx);
   
   //Creamos un lector de ficheros
   this.lectorFichero = new FileReader(
     map.get("archivoPalabras").toString());
  } catch (FileNotFoundException e) {
   //Bloque de ejecución en caso de que el archivo
   //no exista o haya algún error en la apertura
   throw new RuntimeException(
     "Error reading file [" + map.get("wordFile")+"]"
   );
  }
  this.colector = colector;
 }

 /**
  * Este método es el que lee cada línea del fichero
  * de entrada y se encarga de emitirlas para ser
  * procesadas por los bolt
  */
 @Override
 public void nextTuple() 
 {
  //Comprueba si el fichero se ha terminado de leer
  if(this.completado)
  {
   //Esperar antes de intentar leer el fichero de nuevo
   try { Thread.sleep(1000);}
   catch (InterruptedException e) {}
  }
  
  //Creamos un lector de ficheros para leer linea por linea
  String linea;
  BufferedReader lector = new BufferedReader(this.lectorFichero);
  try 
  {
   while((linea = lector.readLine()) != null)
   {
    //Emitimos cada linea por el colector para
    //que los bolts las procesen
    this.colector.emit(new Values(linea), linea);
   }
  } catch (Exception e){
   throw new RuntimeException("Error leyendo", e);
  } finally {
   //Indicamos que el fichero se ha terminado de leer
   completado = true;
  }
 }

 @Override
 public void declareOutputFields(OutputFieldsDeclarer declarador)
 {
  declarador.declare(new Fields("linea"));
 }

 public TopologyContext getCtx() {
  return ctx;
 }

 public void setCtx(TopologyContext ctx) {
  this.ctx = ctx;
 }

}

Escribiendo el Bolt 1: SeparadorPalabras

Pasamos a escribir el primer Bolt, el SeparadorPalabras. Este Bolt recibe líneas con palabras que se tiene encargar de separar y limpiar de espacios en blanco, caracteres de puntuación y números:


public class SeparadorPalabrasBolt extends BaseRichBolt 
{
 /**
  * 
  */
 private static final long serialVersionUID = -505615409788951751L;
 private OutputCollector colector;

 /**
  * El bolt recibe una linea de texto por llamada
  * 
  * Lo que hace es separar las palabras, quitarles
   * todo lo que no sean letras de la "a" a la "z"
   * y pasarlo todo a minúsculas
  * 
  */
 @Override
 public void execute(Tuple lineaEntrada)
 {
  //Creamos un array de palabras con la linea de entrada
  String linea = lineaEntrada.getString(0);
  String[] palabras = linea.split(" ");
  
  //Recorremos todas las palabras 
  //para pasarlas al siguiente bolt
  for(String palabra: palabras)
  {
   //"Limpiamos" la palabra de espacios, caracteres y números
   palabra = palabra.toString().replaceAll("[^A-Za-z\\s]", "");
   
   if(!palabra.isEmpty())
   {
    palabra = palabra.toLowerCase();
    
    //Emitir la palabra al siguiente bolt
    this.colector.emit(new Values(palabra));
   }
  }
  
  //Ack a la tupla
  colector.ack(lineaEntrada);
 }

 /**
  * Este método se llama el primero dentro de la clase
  * Prepara el bolt para los datos que va a tener que usar
  */
 @Override
 public void prepare(@SuppressWarnings("rawtypes") Map map, TopologyContext ctx, OutputCollector colector)
 {
  this.colector = colector;
 }

 /**
  * 
  */
 @Override
 public void declareOutputFields(OutputFieldsDeclarer declarador)
 {
  declarador.declare(new Fields("palabra"));
 }

}

Escribirendo el Bolt 2: ContadorPalabras:

Nuestro último Bolt, el contador de palabras, se va a encargar de recoger las palabras emitidas por el primer bolt y agregarlas a un diccionario clave/valor que vamos a tener a tal efecto. Un trabajo sencillo:


public class ContadorPalabrasBolt extends BaseRichBolt 
{
 /**
  * 
  */
 private static final long serialVersionUID = -9211454481547491256L;
 private String nombre;
 private Integer id;
 private OutputCollector colector;
 private Map cuenta;
 
 @Override
 public void execute(Tuple palabraEntrada) 
 {
  String palabra = palabraEntrada.getString(0);
  
  /**
  * Comprueba si existe ya la palabra en el Map
  * y la crea si no existe todavia
  */
  if(!this.cuenta.containsKey(palabra))
  {
   this.cuenta.put(palabra, 1);
  } else {
   Integer c = this.cuenta.get(palabra) + 1;
   this.cuenta.put(palabra, c);
  }
  
  //Ack a la tupla
  colector.ack(palabraEntrada);
 }

 @Override
 public void prepare(@SuppressWarnings("rawtypes") Map map, TopologyContext ctx, OutputCollector colector)
 {
  this.cuenta = new HashMap();
  this.colector = colector;
  
  this.nombre = ctx.getThisComponentId();
  this.id = ctx.getThisTaskId();
 }

 /**
  * Al finalizar el Spout, mostrar por consola la
  * cuenta de palabras
  */
 @Override
 public void cleanup() {
  super.cleanup();
  System.out.println(
    "-- Contador palabras [" + nombre + "-" + id + "] --");
  
  for(Map.Entry resultado: cuenta.entrySet())
  {
   System.out.println(
     resultado.getKey() + ", " + resultado.getValue());
  }
 }

 @Override
 public void declareOutputFields(OutputFieldsDeclarer declarador)
 {
  //Ultimo bolt que no llega a emitir nada mas
 }

}

Juntándolo todo: Escribiendo la topología

Por último, nos quedaría escribir la clase con la topología con toda la configuración. Esta clase se divide en tres fases:
  1. Definir la topología
    1. Aquí le indicamos el spout y el orden de ejecución de los bolts.
    2. Además le vamos a indicar el tipo de agrupación. Si os fijáis hay dos tipos de agrupación:
      1. shuffleGrouping: Donde cada emisión es lanzada a un bolt aleatorio
      2. fieldsGrouping: Donde emisiones iguales son enviadas al mismo bolt. De esta agrupamos los resultados de palabras de manera parecida a cuando hacemos un "GROUP BY" en SQL (en realidad, el problema radica en la naturaleza de la aplicación distribuida que, en caso de que la misma palabra no llegue al mismo Bolt, se haría una cuenta por separado de esa palabra en ese Bolt y en el resultado final nos aparecería dicha palabra varias veces, una por cada nodo Bolt que le haya llegado la palabra).
  2. Creamos un objeto de configuración clave/valor donde le asignamos a la clave "archivoPalabras" el argumento que le hayamos pasado al ejecutar la aplicación
  3. Arrancamos la topología indicando que vamos a hacer una prueba en local y le damos un nombre a la topología.

public class Topologia
{
 public static void main(String[] args) throws InterruptedException
 {
  //Definicion de la topología
  TopologyBuilder constructor = new TopologyBuilder();
  constructor.setSpout("lector-lineas", new LectorLineasSpout());
  constructor.setBolt("separador-palabras", new SeparadorPalabrasBolt())
   .shuffleGrouping("lector-lineas");
  constructor.setBolt("contador-palabras", new ContadorPalabrasBolt())
   .fieldsGrouping("separador-palabras", new Fields("palabra"));
  
  //Configuracion
  Config conf = new Config();
  conf.put("archivoPalabras", args[0]);
  conf.setDebug(false);
  conf.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 1);
  
  //Arrancar la topología
  LocalCluster cluster = new LocalCluster();
  cluster.submitTopology(
    "Mi-primera-topología", 
    conf, 
    constructor.createTopology()
   );
  Thread.sleep(1000);
  cluster.shutdown();
 }
}
Para ejecutar nuestra topología, sólo tenemos que ejecutar en la linea de comandos:

mvn compile
mvn exec:java -Dexec.mainClass="topology.Topologia" -Dexec.args="res/texto"
El resultado es un montón de información escrita con log4j donde, en algún punto, podréis ver lo siguiente:

-- Contador palabras [contador-palabras-2] --
palabras, 2
a, 1
contar, 1
ver, 1
un, 1
cuantas, 1
dentro, 1
de, 1
sus, 1
ejemplo, 1
hay, 1
fichero, 2
estan, 1
del, 2
que, 1
este, 1
vamos, 1
es, 1
y, 1

En el siguiente tutorial sobre Storm, haremos una instalación de un cluster para poder lanzar el trabajo en él. Un saludo!

miércoles, mayo 28, 2014

Instalando un servidor de Zookeeper

Descargando e instalando Zookeeper

Para descargar y descomprimir la release, hay que usar los comandos a continuación.

cd $HOME
wget 'http://apache.rediris.es/zookeeper/zookeeper-3.4.6/zookeeper-3.4.6.tar.gz'
tar -zxvf zookeeper-3.4.6.tar.gz
mv zookeeper-3.4.6.tar.gz zookeeper
cd zookeeper
export ZOOKEEPER_HOME=$HOME/zookeeper

Para los propósitos de uso de Storm, con un Zookeeper en modo single sería suficiente, no obstante es recomendable usar al menos tres. Para mantener el tutorial simple, vamos a hacer la configuración para usar un único Zookeeper.

El siguiente paso a realizar, es crear un fichero de configuración de Zookeeper. El propio Zookeeper viene con un fichero de ejemplo que con renombrarlo ya lo tendremos todo hecho para el modo single node:


mv conf/zoo_sample.conf conf/zoo.conf

Lo último que tenemos que hacer para arrancar el servidor es ejecutar el fichero "zkServer.sh start" dentro de la carpeta bin que iniciará el servidor en segundo plano:


$ZOOKEEPER_HOME/bin/zkServer.sh start

¡Hecho! Ya tenemos el servidor Zookeeper corriendo ahora vamos a hacer alguna prueba para comprobar que esté funcionando sin problemas. Para ello usaremos el cliente Zookeeper que viene con la release descargada y le indicaremos el servidor:puerto donde se encuentra el servidor de Zookeeper (NOTA: si no hemos tocado nada de la configuración del servidor de Zookeeper, este viene predeterminado en el puerto 2182):

$ZOOKEEPER_HOME/bin/zkCli.sh -server 127.0.0.1:2182

Nos saldrán unos mensajes de log y un prompt. Si escribimos help podemos ver una serie de comandos que podemos ejecutar en el terminal.
Welcome to ZooKeeper!
2014-04-27 22:56:38,225 [myid:] - INFO  [main-SendThread(172.17.0.2:2181):ClientCnxn$SendThread@975] - Opening socket connection to server 172.17.0.2/172.17.0.2:2181. Will not attempt to authenticate using SASL (unknown error)
JLine support is enabled
2014-04-27 22:56:38,234 [myid:] - INFO  [main-SendThread(172.17.0.2:2181):ClientCnxn$SendThread@852] - Socket connection established to 172.17.0.2/172.17.0.2:2181, initiating session
[zk: 172.17.0.2:2181(CONNECTING) 0] 2014-04-27 22:56:38,655 [myid:] - INFO  [main-SendThread(172.17.0.2:2181):ClientCnxn$SendThread@1235] - Session establishment complete on server 172.17.0.2/172.17.0.2:2181, sessionid = 0x145a4f801ef0000, negotiated timeout = 30000

WATCHER::

WatchedEvent state:SyncConnected type:None path:null

[zk: 172.17.0.2:2181(CONNECTED) 0] help
ZooKeeper -server host:port cmd args
 connect host:port
 get path [watch]
 ls path [watch]
 set path data [version]
 rmr path
 delquota [-n|-b] path
 quit 
 printwatches on|off
 create [-s] [-e] path data acl
 stat path [watch]
 close 
 ls2 path [watch]
 history 
 listquota path
 setAcl path acl
 getAcl path
 sync path
 redo cmdno
 addauth scheme auth
 delete path [version]
 setquota -n|-b val path

Ya tenemos nuestro cliente y prompt abierto. Vamos a crear un nuevo znode llamado maricaster y asignarle el nombre de este blog (mariocaster.blogspot.com)

[zk: 172.17.0.2:2181(CONNECTED) 1] create /mariocaster mariocaster.blogspot.com 
Created /mariocaster
[zk: 172.17.0.2:2181(CONNECTED) 2] ls /
[mariocaster, zookeeper]

Si hacemos "ls /", uno de los comandos básicos para listar znodes, podemos ver que tenemos un nuevo znode llamado "mariocaster". Ahora vamos a ver el contenido del znode con la orden "get":


[zk: 172.17.0.2:2181(CONNECTED) 3] get /mariocaster
mariocaster.blogspot.com
cZxid = 0x2
ctime = Sun Apr 27 22:59:32 CEST 2014
mZxid = 0x2
mtime = Sun Apr 27 22:59:32 CEST 2014
pZxid = 0x2
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 24
numChildren = 0
[zk: 172.17.0.2:2181(CONNECTED) 4]

Como podemos ver en el resultado, aparece el nombre del blog. Ahora vamos a cambiar el valor para agregarle el http:// que hemos olvidado con la orden "set"

[zk: 172.17.0.2:2181(CONNECTED) 4] set /mariocaster http://mariocaster.blogspot.com
cZxid = 0x2
ctime = Sun Apr 27 22:59:32 CEST 2014
mZxid = 0x3
mtime = Sun Apr 27 23:03:41 CEST 2014
pZxid = 0x2
cversion = 0
dataVersion = 1
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 31
numChildren = 0
[zk: 172.17.0.2:2181(CONNECTED) 5] get /mariocaster
http://mariocaster.blogspot.com
cZxid = 0x2
ctime = Sun Apr 27 22:59:32 CEST 2014
mZxid = 0x3
mtime = Sun Apr 27 23:03:41 CEST 2014
pZxid = 0x2
cversion = 0
dataVersion = 1
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 31
numChildren = 0
[zk: 172.17.0.2:2181(CONNECTED) 6]

Hemos hecho un get al final para comprobar el valor actual del znode que, como podemos comprobar, efectivamente ha cambiado. Por último vamos a borrar este znode recién creado:

[zk: 172.17.0.2:2181(CONNECTED) 6] delete /mariocaster
[zk: 172.17.0.2:2181(CONNECTED) 7] ls /      
[zookeeper]
[zk: 172.17.0.2:2181(CONNECTED) 8]

Y con esto termina el tutorial introductorio a Zookeeper, herramienta que vamos a necesitar para la instalación de Apache Storm, motor para aplicaciones de tiempo real distribuidas que veremos en otro tutorial

lunes, mayo 26, 2014

Primeros pasos con machine learning usando Mahout... Un recomendador de productos

Recomendador de productos

Lo que vamos a hacer con Apache Mahout es el "Hello World" del machine learning: un recomendador de productos. En esencia, un recomendador de productos compara los productos que le gustan a un usuario con usuarios que gustos similares para recomendar al primero un producto similar basándose en los gustos del segundo.

Dicho con un ejemplo: Pongamos que:

A Jaimito le gustan los Refrescos de Cola y no le gustan los Batidos de chocolate.
A Pepito le gustan los Refrescos de Naranja y no le gustan los Batidos de Vainilla.
Entonces nuestro programa de machine learning podría decir que a Jaimito le podrían gustar los refrescos de Naranja, ya que le gustan los refrescos, por lo que le recomendaríamos la Fanta Naranja.

Esto, aunque sea una explicación un poco arcaica e inexacta, es una manera fácil y rápida de entender cómo funciona un recomendador de productos a través de machine learning. Ya habrá tiempo de ponerse purista :D

Descargando e instalando Mahout

  1. Web oficial: http://mahout.apache.org/
  2. Release a utilizar 0.9

cd $HOME
tar -zxvf mahout-distribution-0.9.tar.gz

Creando la clase del recomendador


public class MlMain {

 public static void main(String[] args) throws IOException, TasteException {
  //String pathToCsvFile = "/var/hadoop/examples-files/input/ml-dataset";
  String pathToCsvFile = "/var/hadoop/examples-files/input/ml-node-gen";
  
  SampleRecommender sampleRecommender = new SampleRecommender(pathToCsvFile);
  
  for(int i=1;i<=10;i++){
   List recommendationsList = sampleRecommender.getRecommendations(i,2);
   
   for (RecommendedItem recommendation : recommendationsList) {
    System.out.println(
      "User:" + i + 
      ", recommended item: " + recommendation.getItemID() + 
      " (value: " + recommendation.getValue() + ")");
   } 
  }
 }
 
 static class SampleRecommender {
  private DataModel model = null;
  private UserBasedRecommender recommender = null;
  
  public SampleRecommender(String path) throws IOException, TasteException
  {
   model = new FileDataModel(new File(path));
   UserSimilarity similarity = new PearsonCorrelationSimilarity(model);
   UserNeighborhood neighborhood = new ThresholdUserNeighborhood(0.1, similarity, model);
   recommender = new GenericUserBasedRecommender(model, neighborhood, similarity);
  }
  
  public List getRecommendations(int user, int itemNumber) throws TasteException
  {
   if(this.model!=null && this.recommender!=null)
    return recommender.recommend(user, itemNumber);
   else
    return null;
  }
 }
}

Si nos fijamos un poco, podemos ver claramente que el "core" del recomendador se encuentra dentro del constructor llamado SampleRecommender. Tiene 4 lineas:

  1. Primero cargamos el dataset y creamos el modelo del mismo
  2. Le indicamos a Mahout qué algoritmo queremos usar para hacer comparaciones de las interacciones que tienen los usuarios con los distintos productos. El método que usamos aquí es el de coeficiente de correlación de Pearson con el cual le indicaremos el umbral de correlación que queremos utilizar. Mirando el link de wikipedia podemos ver que:
    1. Si r = 1, existe una correlación positiva perfecta. 
    2. Si 0 < r < 1, existe una correlación positiva.
    3. Si r = 0, no existe relación lineal pero podría ser no lineal
  3. Vamos a usar un umbral de correlación de 0.1 de tal manera que con que la correlación se levemente positiva, consideraremos que el producto se puede recomendar
  4. Por último, pasamos todos las variables previamente generadas para recoger nuestro generador.
Cuando ya tenemos el generador, lo único que tenemos que hacer es preguntarle sobre el usuario del que queramos sacar una recomendación y cuantas recomendaciones queremos obtener con el método recommender.recommend(user, itemNumber);

El resultado que podríais obtener es como el siguiente:

User:4, recommended item: 2 (value: 14.750008)
User:5, recommended item: 2 (value: 12.130403)
User:7, recommended item: 4 (value: 17.72526)
User:8, recommended item: 3 (value: 11.70087)
User:9, recommended item: 4 (value: 15.584593)
User:9, recommended item: 1 (value: 13.836835)

Los resultados obtenidos, al haber usado un dataset generado aleatoriamente, serán distintos cada vez que generemos un dataset nuevo. Además, dada la aleatoriedad del "algoritmo" de generación de datasets, es incluso posible que los resultados no guarden la relación necesaria para poder usarlos con el recomendador pero, con un par de intentos debería generar un dataset válido.

En el siguiente tutorial sobre Mahout, vamos a distribuir el recomendador en el cluster de Hadoop.

martes, mayo 20, 2014

Usando Amazon EMR para ejecutar un Hadoop MapReduce

AWS Elastic MapReduce (EMR)

Lo que vamos a hacer es usar las capacidad de Amazon EMR (Elastic Map Reduce) para ejecutar nuestro contador de palabras sin tener que preocuparnos de instalaciones de clusters.

NOTA IMPORTANTE:

Amazon EMR NO es gratuito y no entra dentro de las posibilidades de la AWS Free Tier por lo que realizar este tutorial, aunque cuesta menos de 1€, no es gratuito. Al final del todo pondré una captura de pantalla del coste total que fueron 0.21€

Creando un bucket S3 y subiendo los archivos

Lo primero que tendremos que hacer es crear un bucket S3 para subir archivos dentro. S3 es un sistema de almacenamiento en la nube que, con la AWS Free tier nos otorga 5gb de almacenamiento gratuito. Habrá que meterse en la cuenta de AWS y seleccionar, dentro de los servicios, el S3:


Una vez en la ventana de S3, hay que hacer click en "Create bucket":

Y darle un nombre al bucket:

Subir los archivos al bucket

Ahora que ya tenemos un bucket de almacenamiento, hay que seleccionar "Actions-Upload" para seleccionar los dos archivos que queremos subir, uno de ellos el fichero del que queremos contar las palabras y otro el JAR que contiene el Wordcount que escribimos en el tutorial de 



Para este caso, he subido una versión del Quijote en texto plano que encontré por la red y el JAR con el Wordcount . El bucket debe presentar este aspecto:

Creando el cluster

Paso 1: Cluster configuration

Ahora, seleccionamos el servicio de Amazon EMR de la selección de servicios. En la ventana que nos aparece hay que seleccionar "Create cluster":

Primer paso para la creación de un cluster



Atención al botón de "Configure sample application". Como se puede intuir, este botón nos abre otra ventana para poder ejecutar el cluster con algunos trabajos de ejemplo (entre ellos el propio contador de palabras). Como lo que queremos es aprender como se hace, vamos a evitar usar el botón.

Paso 2: Software configuration

Una vez le hemos dado un nombre al cluster y hemos seleccionado una ubicación en nuestro S3 para el almacenamiento de los logs, pasamos a seleccionar la AMI y versión de Hadoop a usar:
Vamos a usar la version 2.4.2 de la AMI con hadoop 1.0.3. Como podeis ver, también nos da la opción de instalar Hive o Pig. Para los propósitos de este tutorial no será necesario pero no pasa nada si se instalan.

Paso 3: Hardware configuration

Pasamos al siguiente paso que va a ser la configuración de acceso. Para los propósitos de este tutorial vamos a usar una instancia small para el contenedor del Namenode, el SecondaryNamenode y el JobTracker y 2 instancias small con un DataNode y un TaskTracker cada una. Por último, aunque no es estrictamente necesario, hay que seleccionar una clave PEM para el acceso a las máquinas. A nosotros no nos va a hacer falta ya que sólo vamos a arrancar las máquinas para hacer el conteo y se van a parar al finalizar.

Paso 4: Steps

El último de los pasos para la creación del cluster son los "Steps", que son los trabajos a realizar. Nuestro step va a ser ejecutar el Wordcount que vendrá configurado automáticamente (lo seleccionamos en un paso anterior cuando abrimos el cuadro de diálogo de "Configure Sample Application").


Debemos editar el paso para indicarle el fichero del que queremos contar el número de palabras. En la nueva ventana que aparece, sólo tendremos que abrir el cuadro de diálogo de "Input S3 Location" y seleccionar el fichero de "quijote.txt" como se puede apreciar a continuación:
Asegurarse de tener elegido "Terminate cluster" en "Action on Failure"

Navegar y seleccionar el archivo a contar palabras

Una vez tengamos todo, seleccionamos "Select" dentro de la ventana de "Select S3 Folder" y "Save" en la ventana de "Add Step". Ya está el trabajo y el cluster configurado para ejecutar.

Finalmente hacemos click en "Create cluster" y nuestro MapReduce ya estará corriendo.

Algunos aspectos básicos de AWS EMR: Si por alguna circunstancia hay un error de configuración dentro del cluster, no se cargará ningún gasto a vuestra cuenta. Sólo se carga cuando el MapReduce consigue iniciarse, en cuyo caso, si hay error; sí que habrá un pequeño cargo.

Resultado


Los resultados de la ejecución los podremos ver en nuestro S3, dentro de la carpeta que venía configurada  en el "Output S3 folder" dentro de la ventana de "Add Step". En la siguiente imagen podéis ver en el resultado del Wordcount dentro del bucket S3:


Notas finales

Como se puede apreciar, la ejecución de una tarea EMR en AWS es realmente sencilla, mucho mas que crear nuestro propio cluster, nuestro propio MapReduce, etc. Esto demuestra las tremendas posibilidades de usar AWS EMR para la ejecución de docenas o cientos de "Workers" a sólo unos clicks de ratón. A mi, personalmente, me dejó impresionado, sobre todo teniendo en cuenta que también se pueden ejecutar scripts Pig y Hive con la misma facilidad.

lunes, mayo 19, 2014

Instalando Hadoop en modo distribuido

Hadoop "Fully Distributed"

Aunque pueda asustar, la instalación de un sistema distribuido de Hadoop no es tan complicado como pueda parecer. En realidad, siguiendo el tutorial sobre como instalarlo en modo pseudo-distribuido ya tenemos la mayor parte del trabajo hecho: Instalando Hadoop en modo pseudo-distribuido

Las diferencias  mas simples entre el modo distribuido y el modo pseudo distribuido son las siguientes:
  1. El nodo NameNode debe poder acceder por SSH sin contraseña a todos los nodos esclavos y a si mismo. (no es necesario que los nodos esclavos puedan acceder al nodo maestro).
  2. Todos los nodos deben compartir la misma configuración de Hadoop (el contenido de "conf", "etc/conf", o "/etc/hadoop" dependiendo de la versión).
  3. La reglas de Firewall deben permitir la comunicación via TCP entre todos los tipos de nodos en modo bidireccional

Comenzando

Lo primero, vamos a usar dos portátiles que vamos a llamar E1 y E2. Tenemos que instalar Hadoop en los dos equipos como si fuera modo pseudo-distribuido usando el tutorial del link anterior. Ahora lo importante es cambiar las direcciones IP's por direcciones reales dentro de nuestra LAN por lo que, si nuestro localhost es el 192.168.1.10 dentro de nuestra LAN, tendremos que usar esta IP.

Una vez tengamos los dos equipos con Hadoop en modo pseudo-distribuido vamos a ver como dividimos las tareas entre los dos equipos:
  1. E1: El equipo 1 va a ser el nodo "Maestro", por tanto vamos a tener en ejecución los siguientes hilos:
    1. Namenode
    2. SecondaryNamenode
    3. JobTracker
  2. E3: El Equipo 2 va a ser el nodo "Worker" por lo que las tareas en ejecución serán las siguientes:
    1. TaskTracker
    2. DataNode
Entraremos a la consola del E1, nuestra IP es 192.168.1.10. Vamos a editar, dentro de la carpeta de Hadoop, el fichero de configuración de esclavos donde vamos a indicar la IP del E2 (192.168.1.20). Dicho fichero lo encontraremos en conf/slaves:

192.168.1.20
Además, también hay que indicar la máquina que va a tener el SecondaryNamenode, para ello tenermos que editar el fichero conf/masters para agregrar la nuestra IP (la IP del E1):

192.168.1.10

NOTA: Aunque el fichero se llame "masters", no indica las máquinas donde se van a alojar el NameNode ni el Jobtracker, quedando estas configuraciones a disposición del fichero mapred-site.xml y del nodo que arranque el NameNode (en nuestro caso será el E1)

Configurar el acceso SSH

El E1 debe tener acceso por SSH sin contraseña al E2 y a sí mismo. Si aún no sabes como conseguir esto, está todo explicado en el tutorial para instalar Hadoop en modo pseudo-distribuido.

Configurar el mapred-site y el hdfs-site

Ahora, tendremos que tener comunes los ficheros hdfs-site y mapred-site a lo largo de nuestro cluster. En mi experiencia he encontrado bastante sencilla la sincronización a través de un repositorio git como puede ser Github pero cualquier solución es igualmente válida:







	
		dfs.replication
		2
	







    mapred.job.tracker
    192.168.1.10:54311




En el primero, el hdfs, estamos diciéndole que cada bloque de información lo duplique a lo largo del cluster (dfs-replication=2). En el segundo le estamos pasando la IP del E1 que es el que va a ejecutar el JobTracker.

Arrancando el cluster

Por último, hay que arrancar el cluster, desde el E1 y el directorio de instalación de Hadoop ejecutaremos:

NameNode
JobTracker
SecondaryNameNode

Y, si nos vamos al E2, podremos ver los siguientes:
DataNode
TaskTracker

Si resulta que te encuentras el Datanode y el Tasktracker dentro de la E1 es probablemente porque todavía conservas la IP de1 dentro del fichero conf/slaves.

Siguientes pasos, ¡instalar Hadoop en todas las máquinas que tenga por casa!

viernes, mayo 09, 2014

Escribiendo un Hadoop MapReduce en Java: Un Wordcount mejorado

En este tutorial, vamos a crear nuestro propio MapReduce contador de palabras y luego lo vamos a usar en nuestro cluster pseudo-distribuido de Hadoop.

Un trabajo Map Reduce se divide en 2 clases (Mapper y Reducer) mas el método de entrada "main". En este caso, el mapper va a crear un listado del tipo y el reducer va a "reducir" el listado de valores a un único valor que, en nuestro caso, será el número de ocurrencias de la clave.

/**
 *  Mapper.
 *  Recibirá, línea tras línea (el objeto Text), los contenidos del fichero que
 *  le pasemos por los argumentos.
 *
 *  Vamos a mejorar el Mapper inicial quitando todos los caracteres no alfabéticos
 *  de la cadena de entrada para evitar que, por ejemplo, "casa" y "casa." se
 *  contabilicen como palabras distintas
 */
public static class WordCountMapper extends Mapper {
  private Text word = new Text();

  @Override
  public void map(LongWritable key, Text value, Context context)
      throws IOException, InterruptedException 
  {
    //Escribimos una expresión regular para eliminar todo caracter no alfabético
    String cleanString = value.toString().replaceAll("[^A-Za-z\\s]", "");

    //Ahora cogemos el texto limpio y lo separamos por espacios
    StringTokenizer token = new StringTokenizer(cleanString);
    
    //Recorremos el tokenizer para recoger todas las palabras hasta que no haya mas
    while(token.hasMoreTokens())
    {
      //Palabra actual
      String tok = token.nextToken();

      //Asignar la palabra al objeto que se va a pasar al reducer
      word.set(tok);

      /*
       * Guardar la palabra con un valor número de 1 de tal manera que, si tenemos
       * la palabra "casa" estamos guardando  Si la palabra casa volviera
       * a aparecer, como ya la hemos establecido una vez, el resultado sería 
       *  y así por cada ocurrencia
       */
      context.write(word, new IntWritable(1));
    }
  }
}
Vamos ahora con el Reducer, ojo al atributo estático de la clase
/*
 * El reducer va a contar el número de ítems en la lista de palabras 
 * que pasamos desde el Mapper
 */
public static class WordCountReducer extends Reducer {

  @Override
  public void reduce(Text word, Iterable list, Context context)
      throws IOException, InterruptedException
  {
    //Ponemos el contador de palabras a 0
    int total = 0;
    
    //Recorremos el objeto Iterable list y sumamos uno al contador
    for(IntWritable count : list)
    {
      total++;
    }
    
    /* Escribimos el resultado del reducer, para el ejemplo de casa, escribiría
     * 
     */
    context.write(word, new IntWritable(total));
  }
  
}
Ahora solo falta la clase main de entrada a la aplicación donde se configura todas las opciones del MapReduce
// Entrada de la aplicación
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException 
{
  //Creamos un fichero de configuracion y le damos un nombre al Job
  Configuration conf = new Configuration();
  Job job = new Job(conf, "word count");

  //Le tenemos que indicar la clase que hay que usar para llamar a los mappers y reducers
  job.setJarByClass(Wordcount.class);

  //Le indicamos el nombre de la clase Mapper y de la clase Reducer
  job.setMapperClass(WordCountMapper.class);
  job.setReducerClass(WordCountReducer.class);

  /* 
   * Le tenemos que indicar el formato que va a tener el resultado, en nuestro caso vamos
   * a recuperar un resultado del tipo  por lo que, usando
   * los tipos primitivos de Hadoop, esto equivaldría a Text.class y IntWritable.class
   */
  job.setOutputKeyClass(Text.class);
  job.setOutputValueClass(IntWritable.class);

  /* Le indicamos el fichero de entrada y de salida que, por lo general, los vamos a recoger
   * de los parámetros que le pasemos a la clase JAR
   */
  FileInputFormat.addInputPath(job, new Path(args[0]));
  FileOutputFormat.setOutputPath(job, new Path(args[1]));

  //Esperamos a que el trabajo termine
  System.exit(job.waitForCompletion(true) ? 0 : 1);
}
Nos queda exportar el JAR y ejecutarlo con algún fichero de prueba para contar sus palabras. Para exportarlo vamos a usar Eclipse, por su amplia uso actual y su simplicidad. Tendremos que exportarlo como un JAR ejecutable asegurándonos de tener las siguientes opciones seleccionadas. Es importante seleccionar "Extract required libraries into generated JAR" en el cuadro de diálogo de exportación que nos aparece:

Ahora sólo falta probarlo. Para ello podemos usar cualquiera de los 3 modos de Hadoop (local, pseudo-distribuido o distribuido). Por simplicidad vamos a usar el modo Local:

cd $HADOOP_INSTALL
bin/hadoop jar mariocaster.blogspot.com-wordcount.jar example-files/input example-files/output
Recordar que la carpeta example-files/input debe contener los ficheros de texto a contar y la carpeta example-files/output NO debe existir cuando ejecutemos el script porque fallará (Hadoop se encargará de crear la carpeta). El resultado es similar al siguiente:

INFO util.NativeCodeLoader: Loaded the native-hadoop library
WARN mapred.JobClient: Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same.
WARN mapred.JobClient: No job jar file set.  User classes may not be found. See JobConf(Class) or JobConf#setJar(String).
INFO input.FileInputFormat: Total input paths to process : 2
WARN snappy.LoadSnappy: Snappy native library not loaded
INFO mapred.JobClient: Running job: job_local1360868529_0001
INFO mapred.LocalJobRunner: Waiting for map tasks
INFO mapred.LocalJobRunner: Starting task: attempt_local1360868529_0001_m_000000_0
INFO util.ProcessTree: setsid exited with exit code 0
INFO mapred.Task:  Using ResourceCalculatorPlugin : org.apache.hadoop.util.LinuxResourceCalculatorPlugin@4f69385e
INFO mapred.MapTask: Processing split: file:/var/hadoop/examples-files/input/text2:0+118
INFO mapred.MapTask: io.sort.mb = 100
INFO mapred.MapTask: data buffer = 79691776/99614720
INFO mapred.MapTask: record buffer = 262144/327680
INFO mapred.MapTask: Starting flush of map output
INFO mapred.MapTask: Finished spill 0
INFO mapred.Task: Task:attempt_local1360868529_0001_m_000000_0 is done. And is in the process of commiting
INFO mapred.LocalJobRunner: 
INFO mapred.Task: Task 'attempt_local1360868529_0001_m_000000_0' done.
INFO mapred.LocalJobRunner: Finishing task: attempt_local1360868529_0001_m_000000_0
INFO mapred.LocalJobRunner: Starting task: attempt_local1360868529_0001_m_000001_0
INFO mapred.Task:  Using ResourceCalculatorPlugin : org.apache.hadoop.util.LinuxResourceCalculatorPlugin@4e3600d
INFO mapred.MapTask: Processing split: file:/var/hadoop/examples-files/input/texto:0+72
INFO mapred.MapTask: io.sort.mb = 100
INFO mapred.MapTask: data buffer = 79691776/99614720
INFO mapred.MapTask: record buffer = 262144/327680
INFO mapred.MapTask: Starting flush of map output
INFO mapred.MapTask: Finished spill 0
INFO mapred.Task: Task:attempt_local1360868529_0001_m_000001_0 is done. And is in the process of commiting
INFO mapred.LocalJobRunner: 
INFO mapred.Task: Task 'attempt_local1360868529_0001_m_000001_0' done.
INFO mapred.LocalJobRunner: Finishing task: attempt_local1360868529_0001_m_000001_0
INFO mapred.LocalJobRunner: Map task executor complete.
INFO mapred.Task:  Using ResourceCalculatorPlugin : org.apache.hadoop.util.LinuxResourceCalculatorPlugin@58017a75
INFO mapred.LocalJobRunner: 
INFO mapred.Merger: Merging 2 sorted segments
INFO mapred.Merger: Down to the last merge-pass, with 2 segments left of total size: 428 bytes
INFO mapred.LocalJobRunner: 
INFO mapred.Task: Task:attempt_local1360868529_0001_r_000000_0 is done. And is in the process of commiting
INFO mapred.LocalJobRunner: 
INFO mapred.Task: Task attempt_local1360868529_0001_r_000000_0 is allowed to commit now
INFO output.FileOutputCommitter: Saved output of task 'attempt_local1360868529_0001_r_000000_0' to examples-files/ouput
INFO mapred.LocalJobRunner: reduce > reduce
INFO mapred.Task: Task 'attempt_local1360868529_0001_r_000000_0' done.
INFO mapred.JobClient:  map 100% reduce 100%
INFO mapred.JobClient: Job complete: job_local1360868529_0001
INFO mapred.JobClient: Counters: 20
INFO mapred.JobClient:   File Output Format Counters 
INFO mapred.JobClient:     Bytes Written=249
INFO mapred.JobClient:   FileSystemCounters
INFO mapred.JobClient:     FILE_BYTES_READ=2253
INFO mapred.JobClient:     FILE_BYTES_WRITTEN=153667
INFO mapred.JobClient:   File Input Format Counters 
INFO mapred.JobClient:     Bytes Read=190
INFO mapred.JobClient:   Map-Reduce Framework
INFO mapred.JobClient:     Reduce input groups=35
INFO mapred.JobClient:     Map output materialized bytes=436
INFO mapred.JobClient:     Combine output records=0
INFO mapred.JobClient:     Map input records=16
INFO mapred.JobClient:     Reduce shuffle bytes=0
INFO mapred.JobClient:     Physical memory (bytes) snapshot=0
INFO mapred.JobClient:     Reduce output records=35
INFO mapred.JobClient:     Spilled Records=78
INFO mapred.JobClient:     Map output bytes=346
INFO mapred.JobClient:     CPU time spent (ms)=0
INFO mapred.JobClient:     Total committed heap usage (bytes)=728760320
INFO mapred.JobClient:     Virtual memory (bytes) snapshot=0
INFO mapred.JobClient:     Combine input records=0
INFO mapred.JobClient:     Map output records=39
INFO mapred.JobClient:     SPLIT_RAW_BYTES=216
INFO mapred.JobClient:     Reduce input records=39

Finalmente, este es el resultado del MapReduce:
Este 1
This 1
a 2
an 1
are 1
contar 1
count 1
cuantas 1
de 1
del 2
dentro 1
ejemplo 1
es 1
estan 1
example 1
fichero 2
going 1
hay 1
in 1
is 1
it 1
of 1
palabras 2
que 1
sus 1
text 1
that 1
the 1
to 1
un 1
vamos 1
ver 1
we 1
words 1
y 1

miércoles, mayo 07, 2014

Instalando Hadoop en una Raspberry Pi

¿Hadoop en una Raspberry Pi?

En cierto modo, se podría encontrar el motivo de esta entrada entre lo absurdo y lo innecesario. La capacidad de computación de una Raspberry Pi ni su memoria RAM no la hace excesivamente interesante para trabajos de Big Data, pero es cierto que ofrece una manera barata de montar un cluster con fines educativos en casa (especialmente si en casa tienes dos Raspberry Pi v2 una Raspberry Pi v1 y una BeagleBone Black)

La instalación de Hadoop en una Raspberry Pi resulta más sencillo de lo que cabe esperar. Y es que el pequeño ordenador de open hardware no se diferencia tanto de cualquier portátil u ordenador de sobremesa si obviamos la RAM de que dispone la v2 (512mb) y la capacidad del microprocesador (bueno, y el almacenamiento en sd, etc etc).

Básicamente, siguiendo el primero de los tutoriales sobre instalación de Hadoop e incluso modo tienes el 
90% del trabajo hecho. Sólo falta un pequeño detalle....

Y es que los trabajos de computación que se llegan a realizar en un Job MapReduce suelen requerir de gran cantidad de RAM y como nuestra pequeña Rpi no es que vaya sobrada precisamente, debemos indicar a la instalación de Hadoop unos límites para que el sistema operativo no se cargue los procesos del DataNode y del TaskTracker.

Esto es igualmente válido para ordenadores de consumo que no son los típicos que se pueden encontrar en un cluster de Hadoop de producción con chorrocientos Gb de RAM. En algún caso nos podríamos ver forzados a establecer límites en máquinas con más RAM, en alguna prueba que he hecho he tenido que establecer límites de 3gb en una máquina con 4Gb de RAM de 64 bits.

Déjate de rollos y dime que tengo que hacer

Fácil, para establecer estos límites tenemos que entrar en carpeta de configuración de Hadoop que en nuestra versión es $HADOOP_INSTALL/conf aunque también podría ser /etc/hadoop o en $HADOOP_INSTALL/etc/conf.

Una vez localizada la carpeta, abrir el archivo mapred-site.xml e introducir las siguientes propiedades:


    mapred.child.java.opt
    -Xmx386



    mapred.tasktracker.map.tasks.maximum
    1



    mapred.tasktracker.reduce.tasks.maximum
    1

Y voilá! Aunque parezca increíble esto fue lo único que tuve que hacer para agregar la Raspberry Pi a mi cluster casero. La explicación no es compleja: la primera propiedad limita la RAM utilizada por el proceso de Map o Reduce de la Rpi a 386, lo cual le da un pequeño margen para que el sistema operativo no nos mate el proceso. Las otras dos opciones limitan el número de operaciones de Map o Reduce que se pueden ejecutar en paralelo dentro de cada nodo de trabajo. Y es que si cada trabajo de Map o de Reduce es computacionalmente no muy intenso, se puede ajustar este parámetro para ejecutar varios trabajos en paralelo. En general he comprobado que en la Rpi no suele ser posible aumentar en número de tareas por encima de uno incluso en pequeños trabajos.

Y puestos a hablar de clusters caseros de Raspberrys Pi, ahí dejo un link de un señor que tuvo las pelotas.... Por la ley del máximo esfuerzo, de montarse un cluster con 40 Rpi Raspberry Pi 40 nodes cluster

lunes, mayo 05, 2014

Primeros pasos con Hive. Oootro contador de palabras

Descargando e instalando Hive

  1. Página web oficial: http://hive.apache.org/
  2. Site de releases: http://www.apache.org/dyn/closer.cgi/hive/
  3. Release que vamos a usar: http://ftp.cixug.es/apache/hive/hive-0.13.0/apache-hive-0.13.0-bin.tar.gz
Lo primero descargar y ejecutar la release que queramos usar:

wget 'http://ftp.cixug.es/apache/hive/hive-0.13.0/apache-hive-0.13.0-bin.tar.gz'
tar -zxvf apache-hive-0.13.0-bin.tar.gz
cd apache-hive-0.13.0-bin/bin
export HIVE_HOME=$HOME/apache-hive-q0.13.0-bin
export PATH=$PATH:$HIVE_HOME/bin

Arrancando el cluster de Hadoop

Hive necesita al menos Hadoop 1.2.1 corriendo para funcionar. Si no sabes como instalar Hadoop aún te recomiendo que te pases por el tutorial Instalando Hadoop en modo pseudo-distribuido (local)

Una vez tengamos el cluster corriendo, Hive necesita de dos carpetas dentro del HDFS con permisos de grupo para poder funcionar, para crear esas carpetas y darles los permisos necesarios ejecutaremos las órdenes siguientes:
hadoop fs -mkdir /user
hadoop fs -mkdir /user/hive
hadoop fs -mkdir /user/hive/warehouse
hadoop fs -mkdir /tmp
hadoop fs -chmod g+w /tmp
hadoop fs -chmod g+x /user/hive/warehouse

Como se puede apreciar, Hive necesita de las carpetas /tmp y /user/hive/warehouse para poder funcionar.

Ahora vamos a crear un fichero de ejemplo con el siguiente contenido para contar sus palabras. Lo llamaremos ejemplo.txt y lo vamos a guardar dentro de la carpeta bin de la carpeta de Hive:
Este es
un fichero de
ejemplo en
del que vamos
a contar
sus palabras
El cual tendremos que copiar dentro del HDFS. Recordamos el comando:
hadoop fs -copyFromLocal ejemplo.txt /tmp/ejemplo.txt
Ahora, ejecutamos la consola de Hive con el comando "hive" (sin comiillas) y creamos una tabla para almacenar el fichero de texto en ella:
hive>CREATE TABLE ejemplo (linea STRING);
Con el comando anterior, sólo hemos creado una tabla nueva, pero no la hemos cargado de información. Para cargar un fichero en una tabla ejecutaremos la orden siguiente:
hive>LOAD DATA INPATH '/tmp/ejemplo.txt' OVERWRITE INTO TABLE ejemplo;

Deleted hdfs://asusnotebook:54310/user/hive/warehouse/texto

Table default.texto stats: [numFiles=1, numRows=0, totalSize=73, rawDataSize=0]

OK

Time taken: 1.612 seconds
Ya tenemos el fichero cargado en la tabla ejemplo. Ahora hay que crear otra tabla para almacenar el resultado del conteo de palabras:

hive>CREATE TABLE contador AS 
SELECT palabra, count(1) AS cuenta FROM 
(SELECT explode(split(linea,' ')) AS palabra FROM texto) w 
GROUP BY palabra 
ORDER BY cuenta;
Esto comenzará creará dos MapReduce que comenzarán a trabajar de inmediato. Los mensajes que aparecen son como los siguientes (resumido)
[...]

Stage-2 map = 0%,  reduce = 0%

Stage-2 map = 100%,  reduce = 0%, Cumulative CPU 0.84 sec

Stage-2 map = 100%,  reduce = 100%, Cumulative CPU 2.12 sec

OK

Time taken: 42.304 seconds

Vamos a ver ahora que se ha creado dentro de la tabla contador

hive>SELECT * FROM contador;

a 1
contar 1
cuantas 1
de 1
del 2
dentro 1
ejemplo 1
es 1
estan 1
fichero 2
hay 1
palabras 2
que 1
sus 1
un 1
vamos 1
ver 1
y 1
Y ahí lo tenemos. Por poco casi no repito una palabra pero como se puede apreciar, la palabra "del", "fichero" y "palabras" aparecen 2 veces cada una.