martes, junio 10, 2014

Introducción a Apache Kafka... un recolector de mensajes

Introducción a Apache Kafka: Ingeniería de software de parte de Linkedin

Apache Kafka es un sistema distribuido de mensajería de suscripción-publicación sobre Scala. Su web oficial es http://kafka.apache.org/ y la distribución de la que vamos a hacer uso en este tutorial es la 0.8.1.1 para Scala 2.10

Descargando y extrayendo el paquete


cd $HOME
wget http://ftp.cixug.es/apache/kafka/0.8.1.1/kafka_2.10-0.8.1.1.tgz
tar -zxvf kafka_2.10-0.8.1.1.tgz
mv kafka_2.10-0.8.1.1.tgz kafka
export KAFKA_HOME=$HOME/kafka
cd kafka

Vamos a necesitar un total de cuatro terminales abiertos. Lo primero que hay que hacer, después de haber descargado y extraído el tar, es iniciar el servidor de Zookeeper. Como se puede apreciar, Zookeeper forma un pilar fundamental de muchos de los paquetes que envuelven el ecosistema de Hadoop. Luego tenemos que iniciar el broker the Kafka y, por último, crear un topic de Kafka :
# Arrancar el servidor de Zookeeper
bin/zookeeper-server-start.sh config/zookeeper.properties

# Crea el broker
bin/kafka-server-start.sh config/server.properties

# Crea el topic
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic kafkatopic
Created topic "kafkatopic".

Ahora que tenemos un Topic, tenemos que lanzar un "productor" para mandar mensajes. El productor se encarga de meter mensajes en la cola para un "consumidor" pueda recibirlos.

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic kafkatopic

Por último, vamos a crear un consumidor para recibir los mensajes con el siguiente comando

bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic kafkatopic --from-beginning

Bien, volvemos al terminal del "productor" de mensajes para mandar el primero de nuestros mensajes:

# Terminal del productor. Escribimos el mensaje
http://mariocaster.blogspot.com

# Terminal del consumidor. El mensaje es recibido
http://mariocaster.blogspot.com

Con esto, podríamos usar la API para Java y escribir un Productor de mensajes. Un ejemplo sencillo sería como el siguiente:
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

import java.util.Properties;

public class Main {
    public static void main(String args[]){

        Properties props = new Properties();
        props.put("metadata.broker.list", "localhost:9092");
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        props.put("partitioner.class", "kafka.producer.DefaultPartitioner");
        props.put("request.required.acks", "1");
        props.put("zookeeper.connect", "localhost:2181");
        props.put("group.id", "kafkagroup");

        ProducerConfig config = new ProducerConfig(props);

        Producer producer = new Producer(config);
        KeyedMessage\\ msg = new KeyedMessage\("kafkatopic", null, "Message");

        producer.send(msg);
        producer.close();
    }
}

Con el código de arriba, cada vez que compilemos y ejecutemos, podremos ver el mensaje "Message" en el terminal del consumidor. Por otro lado, también podemos usar la API de Java para crear un Consumidor (un suscriptor):

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class Main {
 
 public static void main(String[] args) {
  Properties props = new Properties();
  props.put("metadata.broker.list", "localhost:9092");
  props.put("serializer.class", "kafka.serializer.StringEncoder");
  props.put("partitioner.class", "kafka.producer.DefaultPartitioner");
  props.put("request.required.acks", "1");
  props.put("zookeeper.connect", "localhost:2181");
  props.put("group.id", "kafkagroup");

  ConsumerConnector consumer = Consumer.createJavaConsumerConnector(new ConsumerConfig(props));

  Map topicCount = new HashMap();
  topicCount.put("kafkatopic", new Integer(1));
  Map>> consumerStreams = consumer.createMessageStreams(topicCount);
  List> streams = consumerStreams.get("kafkatopic");
  
  for (KafkaStream stream : streams) {
   ConsumerIterator consumerIte = stream.iterator();
   while (consumerIte.hasNext())
    System.out.println("Message from Single Topic :: " + new String(consumerIte.next().message()));
  }
 }
}


En este caso, al compilar y ejecutar la consola se quedará abierta recibiendo extrayendo los mensajes de la cola que está manteniendo Kafka. Abriendo el terminal del "productor" y escribiendo cualquier palabra podremos ver el resultado en nuestra consola de Java. Con esto terminamos el tutorial sobre Apache Kafka. En el siguiente tutorial en el que usemos Kafka, vamos a escribir un spout de Storm para recuperar los mensajes de Kafka y procesarlos con unos Bolts.

Primeros pasos con Apache Spark. Un contador de palabras en Scala y Spark

¿Qué es Apache Spark?

Spark es un motor para analítica de datos open source hecho como un framework de Scala y que pertenece al ecosistema de Hadoop, entre otras cosas, por hacer uso del HDFS (Hadoop Distributed File System).

Algunas de sus principales virtudes son que resulta mas fácil de usar que Hadoop MapReduce así como puede resultar computacionalmente mucho mas rápido en ciertos casos.

Su principal característica diferenciadora si lo comparamos con Hadoop MapReduce es que Spark realiza los cálculos en "in-memory" para alcanzar esas velocidades de cómputo superiores hasta 100 veces mas rápido que Hadoop MapReduce en memoria y hasta 10 veces mas rápido en disco.

Instalación

Lo primero será descargar Apache Spark desde la web oficial: http://spark.apache.org/ Para el propósito de este tutorial, y para guardar relación con el resto de tutoriales que usan Hadoop 1.2.1, vamos a descargar la versión binaria para Hadoop 1, escrito en la sección de descargas como "Download binaries for Hadoop 1 (HDP1, CDH3)"

Una vez descargado, extraemos y entramos en el directorio de se acaba de crear (opcional: vamos también a cambiar el nombre de la carpeta a simplemente "spark" para mejor legibilidad) Nosotros vamos a descargarlo e instalarlo en $HOME/spark

wget http://d3kbcqa49mib13.cloudfront.net/spark-0.9.1-bin-hadoop1.tgz
tar -zxvf spark-0.9.1-bin-hadoop1.tgz
mv spark-0.9.1-bin-hadoop1.tgz spark-0.9.1-bin-hadoop1 spark
cd spark
pwd
/home/ubuntu/spark

Bien, ahora vamos a exportar tanto la carpeta bin dentro de la instalación como el home de Spark y también vamos a compilar y ensamblar el paquete:


# Exportar directorios
export PATH=$PATH:$HOME/spark/bin
export SPARK_HOME=$HOME/spark

# Compilar y ensamblar
sbt/sbt clean compile
sbt/sbt assembly

# OPCIONAL: Agregar SPARK_HOME a nuestro ~/.bashrc o similar
echo 'export SPARK_HOME=$HOME/spark' >> ~/.bashrc

NOTA: Para agregar la ruta de spark/bin al PATH lo mas facil y seguro es abrir directamente el ~/.bashrc con un editor de textos y buscar la linea donde pone "export $PATH". Ahora tendremos que ejecutar nuestro cluster de Hadoop en modo pseudo-distribuido o distribuido (real). Si aun no sabes como montar un cluster de Hadoop con HDFS, que es en realidad lo que va a usar Spark, aquí puedes encontrar un gran tutorial al respecto: Instalando Hadoop en modo pseudo-distribuido

# Arrancar todos los procesos de Hadoop
$HADOOP_INSTALL/bin/start-all.sh

Primeras pruebas

Una vez tengamos todos los procesos corriendo: NameNode, SecondaryNameNode, JobTracker (para el nodo master) y TaskTracker y DataNode (para los esclavos o workers), ya podemos ejecutar Spark. Aun así, vamos a comprobar la correcta instalación de Spark iniciando uno de los ejemplos que vienen. Para ello nos vamos a la raiz donde hemos instalado spark (en nuestro caso $HOME/spark) y ejecutamos:

bin/run-example org.apachebin/run-example org.apache.spark.examples.SparkPi local 50

El resultado, como suele ser habitual, contiene un montón de mensajes de log. Veremos ir pasando cada una de las 50 iteraciones pero, en algún punto ya al final, aparecerá algo como:
Pi is roughly 3.1419392
Lo cual nos indica el éxito de la operación.

El siguiente paso es escribir nuestro ya famoso contador de palabras. En Spark se puede Scala, Java y Python. Para este tutorial vamos a usar Scala ya que es el lenguaje "nativo" para Spark y es el que más funcionalidades soporta. Mas adelante haŕe algún ejemplo también en Python.


import org.apache.spark.{SparkContext, SparkConf}

object WordCount {
  def main(args: Array[String]) {


    val conf = new SparkConf(true)
    conf.setAppName("cassandra-example-hello")
    conf.setMaster("local")

    val spark = new SparkContext(conf)

    val textFile = spark.textFile("/tmp/test")
    val counts = textFile.flatMap(line => line.split(" "))
      .map(word => (word, 1))
      .reduceByKey(_ + _)
    counts.saveAsTextFile("/tmp/testResult")
  }
}

#Resultado
res10: Array[(String, Int)] = Array((y,1), (palabras,1), (a,2), (este,1), (Spark,1), (de,2), (contar,1), (usar,1), (capacidades,1), (las,2), (fichero.,1), (Vamos,2), (HDFS,1))

# Opcional: Guardar los contenidos del resultado en el disco
reducer.saveAsTextFile("results")


Espero que os haya gustado el tutorial. Los próximos tutoriales sobre Spark estarán orientado a la escritura de scripts, el uso de la API de Python y Java, al uso de HDFS y al uso de Apache Shark

lunes, junio 09, 2014

Instalación y primeros pasos con Apache HBase

HBase

¿Qué es HBase? Atendiendo a lo que pone en su web, es la base de datos de Hadoop: un sistema de almacenamiento escalable y distribuible para Big Data.

Instalando HBase

La instalación es sencilla y sigue los procesos habituales: descargar, descomprimir y crear una variable de entorno para el directorio de instalación:

cd $HOME
wget http://ftp.cixug.es/apache/hbase/hbase-0.98.3/hbase-0.98.3-hadoop1-bin.tar.gz
tar -zxvf hbase-0.98.3-hadoop1-bin.tar.gz
mv hbase-0.98.3-hadoop1-bin.tar.gz hbase
cd hbase

Configuración de HBase

Hbase cuenta con una carpeta conf donde se encuentran los ficheros de configuración. En este caso vamos a configurar el hbase-site.xml:


<property>
 <name>hbase.rootdir</name>
 <value>/home/fedora/hbase</value>
</property>
<property>
 <name>hbase.zookeeper.property.dataDir</name>
 <value>/home/fedora/hbase</value>
</property>


Una vez tengamos esto configurado, tenemos que asegurarnos que nuestro máquina aparece en el listado de /etc/hosts apuntando a 127.0.0.1.
127.0.0.1 localhost

Probando HBase

Ahora vamos a iniciar HBase y la Shell de HBase para probar que esté funcionando:

bin/start-hbase.sh
bin/hbase shell

#La respuesta debe ser similar a la siguiente:
$ bin/hbase shell
HBase Shell; enter 'help' for list of supported commands.
Type "exit" to leave the HBase Shell
Version 0.94.19, r1588997, Tue Apr 22 00:21:01 UTC 2014

hbase(main):001:0>

#Como se puede leer, al escribir "help" podemos recibir una lista de comandos disponibles.


#Vamos a hacer un list de tablas, que debe aparecer vacío para comprobar que todo funciona correctamente
hbase(main):001:0> list
TABLE                                                                                                                                                                   
0 row(s) in 0.6230 seconds

hbase(main):002:0>

#Salimos del shell con Ctrl+C y paramos el servidor HBase
hbase(main):002:0>^C
$bin/stop-hbase.sh


Con esto ya tendríamos la instalación de HBase lista. Nos vemos en el siguiente tutorial.

viernes, junio 06, 2014

Introducción a la logística de datos con Apache Flume

Descargando e instalando Apache Flume

  1. Página web oficial: http://flume.apache.org/
  2. Release del tutorial: Apache Flume 1.5.0
cd $HOME
tar -zxvf apache-flume-1.5.0-bin.tar.gz
mv apache-flume-1.5.0-bin flume
cd flume

Terminología

  1. Source: Un source es una fuente de datos que queremos extraer. Es el punto de recogida de los datos. La ingestión se puede realizar con contenidos de un directorio (spooldir) con ejecuciones de comandos (exec) o recibiendo peticiones http (http). Link: Flume Sources
  2. Sink: Un sink es al lugar donde queremos que vaya nuestro source para usarlo. Es el punto de recepción y entrega de los datos. Esto puede incluir el log (logger), el HDFS (hdfs), el sistema local de archivos (file_roll). Link: Flume Sinks
  3. Channel: El channel es el medio por el cual se distribuye el source al sink. Este medio puede ser la propia memoria (memory), una BBDD (jdbc) o un fichero en el disco duro (file). Link: Flume Channels
  4. Agent: Finalmente, un agent es un conjunto de sources, sinks y channels y es el objetivo de este tutorial.

Creando un fichero de configuración para un Agente

Con el fichero de configuración, vamos a indicar al agente los sources, sinks y channels que vamos a usar. En nuestro ejemplo la configuración es la siguiente:
  1. Source: Nuestro source va a ser el contenido de un fichero dentro de nuestro sistema de archivos local. El fichero estará almacenado dentro de la carpeta /tmp con nombre test. Lo que vamos a hacer es un típico tail -f para leer las nuevas líneas que se escriban en el fichero
  2. Sink: El sink va a ser el log de Flume, comúnmente usado para debuggear aplicaciones. De esta manera podremos ver los nuevos contenidos del fichero /tmp/test impresos por consola
  3. Channel: El channel que vamos a usar para conectar nuestro source con nuestro sink va a ser la memoria del pc, para simplificar.
# Asignar nombres al channel, source y sink del agente
miAgente.sources = testDir
miAgente.channels = memoryChannel
miAgente.sinks = flumeLogger

# Configurar el source para hacer un tail -f al fichero /tmp/test
miAgente.sources.testDir.type = exec
miAgente.sources.testDir.command = tail -f /tmp/test
miAgente.sources.testDir.channels = memoryChannel

# Configurar el channel para usar la memoria como canal
miAgente.channels.memoryChannel.type = memory
miAgente.channels.memoryChannel.capacity = 100000
miAgente.channels.memoryChannel.transactioncapacity = 10000

# Conectar el sink y el source al channel
miAgente.sinks.flumeLogger.type = logger
miAgente.sinks.flumeLogger.channel = memoryChannel

Una vez tengamos el Agent configurado, para lanzarlo tenemos que decirle el nombre del agente con el parámetro "-n", que hemos llamado "miAgente" y el nombre del archivo que aloja la configuración del Agent con el parámetro "-f".
cd $FLUME_HOME
bin/flume-ng agent -f conf/miagente-conf.properties -n miAgente -Dflume.root.logger=INFO,console
INFO node.PollingPropertiesFileConfigurationProvider: Configuration provider starting
INFO node.PollingPropertiesFileConfigurationProvider: Reloading configuration file:conf/myAgent-conf.properties
INFO conf.FlumeConfiguration: Added sinks: flumeLogger Agent: myAgent
INFO conf.FlumeConfiguration: Processing:flumeLogger
INFO conf.FlumeConfiguration: Processing:flumeLogger
INFO conf.FlumeConfiguration: Post-validation flume configuration contains configuration for agents: [myAgent]
INFO node.AbstractConfigurationProvider: Creating channels
INFO channel.DefaultChannelFactory: Creating instance of channel memoryChannel type memory
INFO node.AbstractConfigurationProvider: Created channel memoryChannel
INFO source.DefaultSourceFactory: Creating instance of source testDir, type exec
INFO sink.DefaultSinkFactory: Creating instance of sink: flumeLogger, type: logger
INFO node.AbstractConfigurationProvider: Channel memoryChannel connected to [testDir, flumeLogger]
INFO node.Application: Starting new configuration:{ sourceRunners:{testDir=EventDrivenSourceRunner: { source:org.apache.flume.source.ExecSource{name:testDir,state:IDLE} }} sinkRunners:{flumeLogger=SinkRunner: { policy:org.apache.flume.sink.DefaultSinkProcessor@617c66f0 counterGroup:{ name:null counters:{} } }} channels:{memoryChannel=org.apache.flume.channel.MemoryChannel{name: memoryChannel}} }
INFO node.Application: Starting Channel memoryChannel
INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: CHANNEL, name: memoryChannel: Successfully registered new MBean.
INFO instrumentation.MonitoredCounterGroup: Component type: CHANNEL, name: memoryChannel started
INFO node.Application: Starting Sink flumeLogger
INFO node.Application: Starting Source testDir
INFO source.ExecSource: Exec source starting with command:tail -f /tmp/test
INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: SOURCE, name: testDir: Successfully registered new MBean.
INFO instrumentation.MonitoredCounterGroup: Component type: SOURCE, name: testDir started
INFO sink.LoggerSink: Event: { headers:{} body: 61 73 64 66 61 64 73 66                         asdfadsf }
INFO sink.LoggerSink: Event: { headers:{} body: 61 73 64 66 61 64 61 73 64 73 66                asdfadasdsf }
INFO sink.LoggerSink: Event: { headers:{} body: 61 73 64 66 61 64 61 73 32 33 34 33 34 64 73 66 asdfadas23434dsf }
INFO sink.LoggerSink: Event: { headers:{} body: 61 73 64 66 61 64 61 73 32 33 34 33 34 64 73 66 asdfadas23434dsf }
Al principio nos da bastante información de debug. Al final nos está imprimiendo los contenidos actuales del fichero /tmp/test. Ahora, con el agente corriendo, podemos agregar información al archivo y se reflejará en el agente. Vamos a escribir el nombre del blog a ver que pasa:
echo "mariocaster.blogspot.com" >> /tmp/test
14/06/04 18:52:58 INFO sink.LoggerSink: Event: { headers:{} body: 6D 61 72 69 6F 63 61 73 74 65 72                mariocaster }


Con esto, ya podríamos definir sinks para llenar BBDD, escribir ficheros dentro del HDFS para procesar con Hadoop y mucho mas. Espero que os haya gustado.

martes, junio 03, 2014

Apache Hive: Escribiendo scripts en Hive para ejecutarlos en un cluster Hadoop

Creando el script

El script es, de nuevo, el contador de palabras que hicimos en un tutorial anterior (Primeros pasos con Apache Hive. Creando el contador de palabras en Apache Hive). El script va a importar un fichero del HDFS y creará una tabla para hacer consultas en el mismo. Luego, creará una tabla nueva con el resultado del conteo de palabras. El script es el siguiente:

CREATE TABLE IF NOT EXISTS ejemplo (linea STRING);

LOAD DATA INPATH '/user/MaC/pg4300.txt' OVERWRITE INTO TABLE ejemplo;

CREATE TABLE contador AS 
 SELECT palabra, count(1) AS cuenta 
 FROM (SELECT explode(split(linea,' ')) AS palabra FROM ejemplo) w 
 GROUP BY palabra 
 ORDER BY cuenta;

El cual, crea una tabla llamada "ejemplo" donde vuelca los contenidos del fichero pg4300.txt. De hecho se puede ver el contenido de este dentro del cluster en la ruta del warehouse de Hive:

hadoop fs -cat /user/hive/warehouse/ejemplo/pg4300.txt

También crea otra tabla llamada "contador" con las columnas "palabra" y "cuenta" que guarda el resultado del conteo de palabras.

El cluster de Hadoop

Si no tienes un cluster de Hadoop todavía o no sabes como arrancarlo, te recomiendo que te pases por este tutorial para saber el método para instalar uno: Instalando Hadoop en modo pseudo-distribuido.

Copiando datos al HDFS para probar

Vamos a usar el poema "Rime of the ancient mariner" de Samuel Taylor Coleridge (para los que no lo sepan, Iron Maiden tiene una versión del mismo). El archivo se llama pg4300.txt y tenemos que copiarlo al HDFS de la siguiente manera:

hadoop fs -copyFromLocal pg4300.txt /user/mariocaster

Ahora, la localización de nuestro fichero sería hdfs://localhost/user/mariocaster/pg4300.txt

Ejecutando el script

La ejecución del script es sencilla. Solamente hay que llamar al comando hive -f y pasarle el nombre del fichero que, en nuestro caso, hemos llamado "wordcount.q"

$ $HIVE_HOME/bin/hive -f wordcount.q

Logging initialized using configuration in jar:file:/var/hadoop/hive/lib/hive-common-0.13.0.jar!/hive-log4j.properties
OK
Time taken: 1.31 seconds
Loading data to table default.ejemplo
Table default.ejemplo stats: [numFiles=1, numRows=0, totalSize=1573150, rawDataSize=0]
OK
Time taken: 0.778 seconds
Total jobs = 2
Launching Job 1 out of 2
Number of reduce tasks not specified. Estimated from input data size: 1
In order to change the average load for a reducer (in bytes):
  set hive.exec.reducers.bytes.per.reducer=
In order to limit the maximum number of reducers:
  set hive.exec.reducers.max=
In order to set a constant number of reducers:
  set mapred.reduce.tasks=
Starting Job = job_201406031222_0005, Tracking URL = http://localhost:50030/jobdetails.jsp?jobid=job_201406031222_0005
Kill Command = /var/hadoop/bin/hadoop job  -kill job_201406031222_0005
Hadoop job information for Stage-1: number of mappers: 1; number of reducers: 1
2014-06-03 15:53:16,940 Stage-1 map = 0%,  reduce = 0%
2014-06-03 15:53:22,992 Stage-1 map = 100%,  reduce = 0%, Cumulative CPU 4.85 sec
2014-06-03 15:53:31,075 Stage-1 map = 100%,  reduce = 33%, Cumulative CPU 4.85 sec
2014-06-03 15:53:33,091 Stage-1 map = 100%,  reduce = 100%, Cumulative CPU 8.7 sec
MapReduce Total cumulative CPU time: 8 seconds 700 msec
Ended Job = job_201406031222_0005
Launching Job 2 out of 2
Number of reduce tasks determined at compile time: 1
In order to change the average load for a reducer (in bytes):
  set hive.exec.reducers.bytes.per.reducer=
In order to limit the maximum number of reducers:
  set hive.exec.reducers.max=
In order to set a constant number of reducers:
  set mapred.reduce.tasks=
Starting Job = job_201406031222_0006, Tracking URL = http://localhost:50030/jobdetails.jsp?jobid=job_201406031222_0006
Kill Command = /var/hadoop/bin/hadoop job  -kill job_201406031222_0006
Hadoop job information for Stage-2: number of mappers: 1; number of reducers: 1
2014-06-03 15:53:40,018 Stage-2 map = 0%,  reduce = 0%
2014-06-03 15:53:43,038 Stage-2 map = 100%,  reduce = 0%, Cumulative CPU 3.34 sec
2014-06-03 15:53:51,116 Stage-2 map = 100%,  reduce = 33%, Cumulative CPU 3.34 sec
2014-06-03 15:53:53,129 Stage-2 map = 100%,  reduce = 100%, Cumulative CPU 7.18 sec
MapReduce Total cumulative CPU time: 7 seconds 180 msec
Ended Job = job_201406031222_0006
Moving data to: hdfs://localhost:54310/user/hive/warehouse/contador
Table default.contador stats: [numFiles=1, numRows=50108, totalSize=527742, rawDataSize=477634]
MapReduce Jobs Launched: 
Job 0: Map: 1  Reduce: 1   Cumulative CPU: 8.7 sec   HDFS Read: 1573365 HDFS Write: 1340154 SUCCESS
Job 1: Map: 1  Reduce: 1   Cumulative CPU: 7.18 sec   HDFS Read: 1340609 HDFS Write: 527821 SUCCESS
Total MapReduce CPU Time Spent: 15 seconds 880 msec
OK
Time taken: 45.941 seconds

Viendo los resultados

Si prestamos atención, podemos observar que se ha ejecutado como un trabajo MapReduce en nuestro cluster. Si queremos confirmarlo sólo tenemos que entrar en la web UI de nuestro JobTracker para ver el Job:


Para ver los resultados del map reduce, lo mejor es abrir un prompt the Hive y hacer un select:

bin/hive
hive> SELECT * FROM contador;
...
her 1505
it 1679
for 1789
on 1894
was 2005
that 2168
with 2391
I 2432
he 2712
his 3035
in 4606
to 4787
a 5842
and 6542
of 8127
 10116
the 13600
Time taken: 0.114 seconds, Fetched: 50108 row(s)

Espero que os haya gustado

Usar un cluster de Hadoop con Pig (modo distribuido)

Vamos a contar palabras de nuevo en modo distribuido con Hadoop y Pig. Para realizar este tutorial, que en realidad es tremendamente sencillo; hay que tener un cluster de Hadoop corriendo en modo distribuido (o pseudo-distribuido) y tener una instalación completa de Pig.

  1. Instalando Apache Hadoop en modo Pseudo-Distribuido
  2. Primeros pasos con Apache Pig. Usando Pig para hacer un contador de palabras
  3. Creando scripts en Apache Pig para Hadoop
Cuando tengamos el paso 3 terminado, tendremos un fichero con un script en Pig listo para ejecutarse en el cluster. Supongamos que el nombre del script es "wordcountPig.pig" y que el fichero del que queremos contar se llama "prueba.txt".

Lo primero que tendremos que hacer es arrancar el cluster de Hadoop como indica el primero de los tutoriales y copiar el fichero "prueba.txt" al HDFS


hadoop fs -mkdir /pig
hadoop fs -put prueba.txt /pig


Una vez tengamos el fichero dentro del HDFS, vamos a usar el mismo script del tercer tutorial con una ligera variación para indicarle la localización del fichero:
/* Cargar el fichero */
myfile = LOAD '/pig/tweets' AS (words:chararray);
 
/* Separar las palabras dentro de cada linea */
wordsList = FOREACH myfile GENERATE TOKENIZE($0);
 
/* Separar las palabras a una linea por palabra */
words = FOREACH wordsList GENERATE FLATTEN($0);       
 
/* Agrupar las palabras iguales en la misma linea */
groupedWords = GROUP words BY $0;
 
/* Contar las palabras */
final = FOREACH groupedWords GENERATE $0, COUNT($1);  
 
/* Ordenar las palabras */
sortedWords = ORDER final BY $1 ASC;
 
/* Guardar los resultados en una carpeta llamada pig_wordcount */
STORE sortedWords into '/pig/pig_wordcount';


Las dos líneas que han variado son la que indica la ruta para cargar el fichero y la última, que indica la ruta donde guardar el fichero con los resultados.
Por último, vamos a ejecutar el script:
pig wordcount.pig


Cabe resaltar que esta vez hemos omitido el "-x" y la palabra "local" ya que vamos a hacer una ejecución real dentro del cluster. El resultado se guarda también dentro del HDFS y es el esperado:

hadoop fs -ls /user/MaC/pig/pig_wordcount
Found 3 items
-rw-r--r--   2 mariocaster supergroup          0 2014-06-02 17:43 /pig/pig_wordcount/_SUCCESS
drwxr-xr-x   - mariocaster supergroup          0 2014-06-02 17:42 /pig/pig_wordcount/_logs
-rw-r--r--   2 mariocaster supergroup       6768 2014-06-02 17:43 /pig/pig_wordcount/part-r-00000

Podéis ver el resultado del script en el archivo part-r-00000. Espero que os haya gustado.

lunes, junio 02, 2014

Logística de datos con Apache Sqoop: Exportando datos del HDFS a MySQL

En este tutorial vamos a hacer el proceso inverso al que hicimos en el primer tutorial sobre importación de tablas SQL hacia el HDFS. En este caso vamos a exportar los datos que tenemos en el HDFS hacia una tabla en MySQL.

Preparando una tabla

Primero hay que tener una tabla preparada para la recepción de los datos con una estructura igual al de los datos que se van a recibir. Los datos que vamos a usar son los mismos que emitimos en el tutorial anterior:
1,mario
2,caster

Vamos a crear una tabla en mysql que tenga un int en la primera columna y un varchar(50) en la segunda:
CREATE TABLE `sqoop-import`( id int NOT NULL, name varchar(50) NOT NULL, PRIMARY KEY (id) );


Ya tenemos todo preparado con los archivos dentro de la carpeta "/user/mariocaster/sqoop-table/part-m-*". Para importarlos el comando a usar es el siguiente:

sqoop export --connect jdbc:mysql://localhost/sqoop-test --export-dir /user/MaC/sqoop-table --table sqoop-import --username root --password pass -m 1 --input-fields-terminated-by ',' --direct


  1. El comando que tenemos que usar es sqoop import para importar bases de datos
  2. Se le pasa el argumento --connect jdbc:mysql://[host]/[bbdd] para indicarle el host y la bbdd de la que queramos importar una tabla
  3. Ahora le indicamos el directorio del HDFS que vamos a exportar a la BBDD, en este caso con el argumento --export-dir /[ruta]
  4. Después le indicamos el nombre de la tabla a donde vamos a importar los datos, la hemos llamado "sqoop-import". El argumento a usar es: -table sqoop-import
  5. En el caso de la BBDD que tenemos localmente, hay que usar unas credenciales para permitir el acceso. Se las pasamos con: --username [user] y --password [pass]
  6. Con -m 1 le indicamos que solamente queremos usar un mapper para realizar la exportación
  7. --input-fields-terminated-by ','  es usado para indicarle al script que los campos de los archivos del HDFS están separados por ',' (coma)
  8. Activado el "modo directo" con --direct para MySQL que nos permite obtener unas operaciones mas rápidas haciendo uso de la herramienta mysqlimport


Podemos ver nuestros conocidos logs de MapReduce y es que, Sqoop, usa MapReduce en segundo plano para ejecutar sus consultas (de una manera similar a Hive y Pig)

Una vez ejecutada la importación, vamos a usar un prompt the mysql para ver que datos se han cargado dentro de la tabla.

mysql>USE `sqoop-test`;
mysql>SELECT * FROM `sqoop-import`;
+----+--------+
| id | name   |
+----+--------+
|  1 | mario  |
|  2 | caster |
+----+--------+
2 rows in set (0.00 sec)


Con esto ya tenemos, los datos del HDFS de vuelta en la BBDD de MySQL. Espero que os haya gustado el tutorial.

viernes, mayo 30, 2014

Logística de datos con Apache Sqoop: Importando bases de datos al HDFS

Exportando BBDD SQL con Apache Sqoop


cd $HOME
wget http://apache.rediris.es/sqoop/1.4.4/sqoop-1.4.4.bin__hadoop-0.23.tar.gz
tar -zxvf sqoop-1.4.4.bin__hadoop-0.23.tar.gz
Vamos a crear una tabla en MySQL muy básica que se llame "sqoop-table" que contenga ID y un nombre de usuario simplemente dentro de la DDBB "sqoop-test"

CREATE TABLE `sqoop-table` ( id int NOT NULL, name varchar(255) NOT NULL, PRIMARY KEY (id) );
INSERT INTO `sqoop-table` VALUES (1, "mario");
INSERT INTO `sqoop-table` VALUES (2, "caster");
+------+-------+
| id   | name  |
+------+-------+
| 1    | mario |
| 2    | caster|
+------+-------+


Un problema que nos podemos encontrar hoy dia es que la instalación de Sqoop no incluya el driver jdbc de mysql ya que este fue sacado de la carpeta "lib" para poder mantener el proyecto con licencia Apache. El driver, se encuentra en "/usr/share/java/mysql-connector-java" así que simplemente vamos a hacer un link a dicha libreria con el comando "ln" desde la carpeta de Sqoop:

ln -s /usr/share/java/mysql-connector-java.jar lib/


Ya estamos listos, lo que vamos a hacer es importar la tabla con valores por defecto de Sqoop, esto nos va a generar un archivo por cada mapper con una linea por fila y con los valores separados por comas. Para simplificarlo vamos a usar un único mapper:

bin/sqoop import --connect jdbc:mysql://localhost/sqoop-test --table sqoop-table -m 1 --username root --password mypass


Vamos a explicar paso a paso la siguiente ejecución:

  1. Llamamos a la orden import ya que es la acción que queremos realizar
  2. --connect jdbc:mysql://localhost/sqoop-test Le decimos cómo encontrar nuestra base de datos: usando el driver JDBC de MySQL para Java y le pasamos la URI de la BBDD
  3. Le indicamos que tabla queremos usar de la BBDD: --table sqoop-table
  4. -m 1: Se le indica el número de Mappers a usar. Con un único mapper tendremos un único output y, ya que el script no va a usar reducers, también nos generará un único archivo.
  5. --username root: Lo normal es tener acceso restringido a la BBDD así que le pasamos las credenciales de acceso, en nuestro caso es root.
  6. --password mypass: Y le pasamos la pass para acceder a la BBDD.
Tras ejecutar la orden, un jar de MapReduce se generará y ejecutará de la manera habitual en la consola. El chorreo de logs debería ser conocido ya a estas alturas:
INFO mapred.JobClient: Running job: job_201405291246_0003
INFO mapred.JobClient:  map 0% reduce 0%
INFO mapred.JobClient:  map 100% reduce 0%
INFO mapred.JobClient: Job complete: job_201405291246_0003


Bien, nuestra tabla ya se encuentra en el HDFS en un único fichero en formato csv. Vamos a ver su contenido:

hadoop fs -ls /user/mariocaster
drwxr-xr-x   - mariocaster supergroup          0 2014-05-29 13:31 /user/mariocaster/sqoop-table

hadoop fs -ls /user/mariocaster/sqoop-table
-rw-r--r--   2 mariocaster supergroup          0 2014-05-29 15:23 /user/MaC/sqoop-table/_SUCCESS
drwxr-xr-x   - mariocaster supergroup          0 2014-05-29 15:22 /user/MaC/sqoop-table/_logs
-rw-r--r--   2 mariocaster supergroup         17 2014-05-29 15:22 /user/MaC/sqoop-table/part-m-00000

hadoop fs -cat /user/mariocaster/sqoop-table/part-m-00000
1,mario
2,caster


Como podéis apreciar, ya tenemos el contenido de nuestra base de datos como un fichero dentro del HDSF. Esperemos que hayáis disfrutado del tutorial. El siguiente tutorial sobre Sqoop será cómo exportar datos desde el HDFS a una MySQL. ¡Saludos!

jueves, mayo 29, 2014

Real Time Analytics con Apache Storm: Un contador de palabras en tiempo real

Introducción:

La manera en la que va a funcionar nuestro contador de palabras en Storm es ligeramente diferente a como funcionaba en Hadoop MapReduce pero esencialmente similar.

La idea de los Spouts y Bolts es la de dividir el trabajo de manera similar a como lo hacen los mappers y reducers de Hadoop MapReduce. En este caso, un spout se va a encargar de recopilar la información y pasársela a una cadena de Bolts que se van a encargar de procesarla de la manera que le indiquemos. Cabe destacar que, para hacer este tutorial no es necesario tener un cluster de Storm instalado ya que vamos a ejecutar la topología en modo local

Arquitectura

La manera en que vamos a organizar los Spouts y Bolts es la siguiente:
  1. Spout: LectorLineasSpout: se va a encargar de leer un fichero que le pasemos como argumento y de ir cada linea del archivo de texto a un Bolt
  2. Bolt 1: SeparadorPalabras: A este bolt le van a llegar las líneas que emita el spout anterior y se va a encargar de extraer las palabras de cada línea y emitirlas a un segundo bolt
  3. Bolt 2: ContadorPalabras: Este último Bolt va a recibir la emisión de palabras del SeparadorPalabras y las irá agregando a un HashMap que usaremos como contador.
  4. Topologia.java: Esta última clase es la que junta y da coherencia a todas las anteriores. Aquí definiremos el orden de ejecución (Spout, Bolt1, Bolt2), el modo de ejecución (local o cluster), etc. etc.

Creando el proyecto en Maven

Para simplificar un poco la creación del proyecto y la gestión de las dependencias, vamos a usar maven. Vamos a ejecutar los siguientes comandos en orden para crear un directorio para el proyecto, y creamos un "pom" de Maven.:


cd $HOME
mkdir contador-storm
cd contador-storm
touch pom.xml

En el POM, vamos a poner lo siguiente:


<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>mariocaster.blogspot.com.hadoop</groupId>
  <artifactId>storm-word-count</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  
 <build>
  <plugins>
   <plugin>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-compiler-plugin</artifactId>
    <version>2.3.2</version>
    <configuration>
     <source>1.6</source>
     <target>1.6</target>
     <compilerVersion>1.6</compilerVersion>
    </configuration>
   </plugin>
  </plugins>
 </build>
 
 <dependencies>
  <!-- Storm Dependency -->
  <dependency>
   <groupId>org.apache.storm</groupId>
   <artifactId>storm-core</artifactId>
   <version>0.9.1-incubating</version>
      <scope>compile</scope>
  </dependency>
 </dependencies>
 
 <repositories>
     
     <repository>
        <id>clojars.org</id>
        <url>http://clojars.org/repo</url>
     </repository>
     
  </repositories>
</project>

Escribiendo el Spout

Escribir el Spout es bastante sencillo, tenemos que hacerle leer un fichero línea por línea y hacer que emita cada una de esas líneas. Además tenemos que indicarle un nombre para el dato que estamos emitiendo que, en este caso, va a ser "linea".


public class LectorLineasSpout extends BaseRichSpout
{
 /**
  * 
  */
 private static final long serialVersionUID = 6785329165603525275L;
 private boolean completado = false;
 private FileReader lectorFichero;
 private SpoutOutputCollector colector;
 private TopologyContext ctx;

 /**
  * Este es el primer método que se llama en el spout
  * 
  * En este caso se encarga de abrir el archivo del que
  * vamos a leer
  */
 @Override
 public void open(@SuppressWarnings("rawtypes") Map map, TopologyContext ctx, 
   SpoutOutputCollector colector) 
 {
  try 
  {
   this.setCtx(ctx);
   
   //Creamos un lector de ficheros
   this.lectorFichero = new FileReader(
     map.get("archivoPalabras").toString());
  } catch (FileNotFoundException e) {
   //Bloque de ejecución en caso de que el archivo
   //no exista o haya algún error en la apertura
   throw new RuntimeException(
     "Error reading file [" + map.get("wordFile")+"]"
   );
  }
  this.colector = colector;
 }

 /**
  * Este método es el que lee cada línea del fichero
  * de entrada y se encarga de emitirlas para ser
  * procesadas por los bolt
  */
 @Override
 public void nextTuple() 
 {
  //Comprueba si el fichero se ha terminado de leer
  if(this.completado)
  {
   //Esperar antes de intentar leer el fichero de nuevo
   try { Thread.sleep(1000);}
   catch (InterruptedException e) {}
  }
  
  //Creamos un lector de ficheros para leer linea por linea
  String linea;
  BufferedReader lector = new BufferedReader(this.lectorFichero);
  try 
  {
   while((linea = lector.readLine()) != null)
   {
    //Emitimos cada linea por el colector para
    //que los bolts las procesen
    this.colector.emit(new Values(linea), linea);
   }
  } catch (Exception e){
   throw new RuntimeException("Error leyendo", e);
  } finally {
   //Indicamos que el fichero se ha terminado de leer
   completado = true;
  }
 }

 @Override
 public void declareOutputFields(OutputFieldsDeclarer declarador)
 {
  declarador.declare(new Fields("linea"));
 }

 public TopologyContext getCtx() {
  return ctx;
 }

 public void setCtx(TopologyContext ctx) {
  this.ctx = ctx;
 }

}

Escribiendo el Bolt 1: SeparadorPalabras

Pasamos a escribir el primer Bolt, el SeparadorPalabras. Este Bolt recibe líneas con palabras que se tiene encargar de separar y limpiar de espacios en blanco, caracteres de puntuación y números:


public class SeparadorPalabrasBolt extends BaseRichBolt 
{
 /**
  * 
  */
 private static final long serialVersionUID = -505615409788951751L;
 private OutputCollector colector;

 /**
  * El bolt recibe una linea de texto por llamada
  * 
  * Lo que hace es separar las palabras, quitarles
   * todo lo que no sean letras de la "a" a la "z"
   * y pasarlo todo a minúsculas
  * 
  */
 @Override
 public void execute(Tuple lineaEntrada)
 {
  //Creamos un array de palabras con la linea de entrada
  String linea = lineaEntrada.getString(0);
  String[] palabras = linea.split(" ");
  
  //Recorremos todas las palabras 
  //para pasarlas al siguiente bolt
  for(String palabra: palabras)
  {
   //"Limpiamos" la palabra de espacios, caracteres y números
   palabra = palabra.toString().replaceAll("[^A-Za-z\\s]", "");
   
   if(!palabra.isEmpty())
   {
    palabra = palabra.toLowerCase();
    
    //Emitir la palabra al siguiente bolt
    this.colector.emit(new Values(palabra));
   }
  }
  
  //Ack a la tupla
  colector.ack(lineaEntrada);
 }

 /**
  * Este método se llama el primero dentro de la clase
  * Prepara el bolt para los datos que va a tener que usar
  */
 @Override
 public void prepare(@SuppressWarnings("rawtypes") Map map, TopologyContext ctx, OutputCollector colector)
 {
  this.colector = colector;
 }

 /**
  * 
  */
 @Override
 public void declareOutputFields(OutputFieldsDeclarer declarador)
 {
  declarador.declare(new Fields("palabra"));
 }

}

Escribirendo el Bolt 2: ContadorPalabras:

Nuestro último Bolt, el contador de palabras, se va a encargar de recoger las palabras emitidas por el primer bolt y agregarlas a un diccionario clave/valor que vamos a tener a tal efecto. Un trabajo sencillo:


public class ContadorPalabrasBolt extends BaseRichBolt 
{
 /**
  * 
  */
 private static final long serialVersionUID = -9211454481547491256L;
 private String nombre;
 private Integer id;
 private OutputCollector colector;
 private Map cuenta;
 
 @Override
 public void execute(Tuple palabraEntrada) 
 {
  String palabra = palabraEntrada.getString(0);
  
  /**
  * Comprueba si existe ya la palabra en el Map
  * y la crea si no existe todavia
  */
  if(!this.cuenta.containsKey(palabra))
  {
   this.cuenta.put(palabra, 1);
  } else {
   Integer c = this.cuenta.get(palabra) + 1;
   this.cuenta.put(palabra, c);
  }
  
  //Ack a la tupla
  colector.ack(palabraEntrada);
 }

 @Override
 public void prepare(@SuppressWarnings("rawtypes") Map map, TopologyContext ctx, OutputCollector colector)
 {
  this.cuenta = new HashMap();
  this.colector = colector;
  
  this.nombre = ctx.getThisComponentId();
  this.id = ctx.getThisTaskId();
 }

 /**
  * Al finalizar el Spout, mostrar por consola la
  * cuenta de palabras
  */
 @Override
 public void cleanup() {
  super.cleanup();
  System.out.println(
    "-- Contador palabras [" + nombre + "-" + id + "] --");
  
  for(Map.Entry resultado: cuenta.entrySet())
  {
   System.out.println(
     resultado.getKey() + ", " + resultado.getValue());
  }
 }

 @Override
 public void declareOutputFields(OutputFieldsDeclarer declarador)
 {
  //Ultimo bolt que no llega a emitir nada mas
 }

}

Juntándolo todo: Escribiendo la topología

Por último, nos quedaría escribir la clase con la topología con toda la configuración. Esta clase se divide en tres fases:
  1. Definir la topología
    1. Aquí le indicamos el spout y el orden de ejecución de los bolts.
    2. Además le vamos a indicar el tipo de agrupación. Si os fijáis hay dos tipos de agrupación:
      1. shuffleGrouping: Donde cada emisión es lanzada a un bolt aleatorio
      2. fieldsGrouping: Donde emisiones iguales son enviadas al mismo bolt. De esta agrupamos los resultados de palabras de manera parecida a cuando hacemos un "GROUP BY" en SQL (en realidad, el problema radica en la naturaleza de la aplicación distribuida que, en caso de que la misma palabra no llegue al mismo Bolt, se haría una cuenta por separado de esa palabra en ese Bolt y en el resultado final nos aparecería dicha palabra varias veces, una por cada nodo Bolt que le haya llegado la palabra).
  2. Creamos un objeto de configuración clave/valor donde le asignamos a la clave "archivoPalabras" el argumento que le hayamos pasado al ejecutar la aplicación
  3. Arrancamos la topología indicando que vamos a hacer una prueba en local y le damos un nombre a la topología.

public class Topologia
{
 public static void main(String[] args) throws InterruptedException
 {
  //Definicion de la topología
  TopologyBuilder constructor = new TopologyBuilder();
  constructor.setSpout("lector-lineas", new LectorLineasSpout());
  constructor.setBolt("separador-palabras", new SeparadorPalabrasBolt())
   .shuffleGrouping("lector-lineas");
  constructor.setBolt("contador-palabras", new ContadorPalabrasBolt())
   .fieldsGrouping("separador-palabras", new Fields("palabra"));
  
  //Configuracion
  Config conf = new Config();
  conf.put("archivoPalabras", args[0]);
  conf.setDebug(false);
  conf.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 1);
  
  //Arrancar la topología
  LocalCluster cluster = new LocalCluster();
  cluster.submitTopology(
    "Mi-primera-topología", 
    conf, 
    constructor.createTopology()
   );
  Thread.sleep(1000);
  cluster.shutdown();
 }
}
Para ejecutar nuestra topología, sólo tenemos que ejecutar en la linea de comandos:

mvn compile
mvn exec:java -Dexec.mainClass="topology.Topologia" -Dexec.args="res/texto"
El resultado es un montón de información escrita con log4j donde, en algún punto, podréis ver lo siguiente:

-- Contador palabras [contador-palabras-2] --
palabras, 2
a, 1
contar, 1
ver, 1
un, 1
cuantas, 1
dentro, 1
de, 1
sus, 1
ejemplo, 1
hay, 1
fichero, 2
estan, 1
del, 2
que, 1
este, 1
vamos, 1
es, 1
y, 1

En el siguiente tutorial sobre Storm, haremos una instalación de un cluster para poder lanzar el trabajo en él. Un saludo!

miércoles, mayo 28, 2014

Instalando un servidor de Zookeeper

Descargando e instalando Zookeeper

Para descargar y descomprimir la release, hay que usar los comandos a continuación.

cd $HOME
wget 'http://apache.rediris.es/zookeeper/zookeeper-3.4.6/zookeeper-3.4.6.tar.gz'
tar -zxvf zookeeper-3.4.6.tar.gz
mv zookeeper-3.4.6.tar.gz zookeeper
cd zookeeper
export ZOOKEEPER_HOME=$HOME/zookeeper

Para los propósitos de uso de Storm, con un Zookeeper en modo single sería suficiente, no obstante es recomendable usar al menos tres. Para mantener el tutorial simple, vamos a hacer la configuración para usar un único Zookeeper.

El siguiente paso a realizar, es crear un fichero de configuración de Zookeeper. El propio Zookeeper viene con un fichero de ejemplo que con renombrarlo ya lo tendremos todo hecho para el modo single node:


mv conf/zoo_sample.conf conf/zoo.conf

Lo último que tenemos que hacer para arrancar el servidor es ejecutar el fichero "zkServer.sh start" dentro de la carpeta bin que iniciará el servidor en segundo plano:


$ZOOKEEPER_HOME/bin/zkServer.sh start

¡Hecho! Ya tenemos el servidor Zookeeper corriendo ahora vamos a hacer alguna prueba para comprobar que esté funcionando sin problemas. Para ello usaremos el cliente Zookeeper que viene con la release descargada y le indicaremos el servidor:puerto donde se encuentra el servidor de Zookeeper (NOTA: si no hemos tocado nada de la configuración del servidor de Zookeeper, este viene predeterminado en el puerto 2182):

$ZOOKEEPER_HOME/bin/zkCli.sh -server 127.0.0.1:2182

Nos saldrán unos mensajes de log y un prompt. Si escribimos help podemos ver una serie de comandos que podemos ejecutar en el terminal.
Welcome to ZooKeeper!
2014-04-27 22:56:38,225 [myid:] - INFO  [main-SendThread(172.17.0.2:2181):ClientCnxn$SendThread@975] - Opening socket connection to server 172.17.0.2/172.17.0.2:2181. Will not attempt to authenticate using SASL (unknown error)
JLine support is enabled
2014-04-27 22:56:38,234 [myid:] - INFO  [main-SendThread(172.17.0.2:2181):ClientCnxn$SendThread@852] - Socket connection established to 172.17.0.2/172.17.0.2:2181, initiating session
[zk: 172.17.0.2:2181(CONNECTING) 0] 2014-04-27 22:56:38,655 [myid:] - INFO  [main-SendThread(172.17.0.2:2181):ClientCnxn$SendThread@1235] - Session establishment complete on server 172.17.0.2/172.17.0.2:2181, sessionid = 0x145a4f801ef0000, negotiated timeout = 30000

WATCHER::

WatchedEvent state:SyncConnected type:None path:null

[zk: 172.17.0.2:2181(CONNECTED) 0] help
ZooKeeper -server host:port cmd args
 connect host:port
 get path [watch]
 ls path [watch]
 set path data [version]
 rmr path
 delquota [-n|-b] path
 quit 
 printwatches on|off
 create [-s] [-e] path data acl
 stat path [watch]
 close 
 ls2 path [watch]
 history 
 listquota path
 setAcl path acl
 getAcl path
 sync path
 redo cmdno
 addauth scheme auth
 delete path [version]
 setquota -n|-b val path

Ya tenemos nuestro cliente y prompt abierto. Vamos a crear un nuevo znode llamado maricaster y asignarle el nombre de este blog (mariocaster.blogspot.com)

[zk: 172.17.0.2:2181(CONNECTED) 1] create /mariocaster mariocaster.blogspot.com 
Created /mariocaster
[zk: 172.17.0.2:2181(CONNECTED) 2] ls /
[mariocaster, zookeeper]

Si hacemos "ls /", uno de los comandos básicos para listar znodes, podemos ver que tenemos un nuevo znode llamado "mariocaster". Ahora vamos a ver el contenido del znode con la orden "get":


[zk: 172.17.0.2:2181(CONNECTED) 3] get /mariocaster
mariocaster.blogspot.com
cZxid = 0x2
ctime = Sun Apr 27 22:59:32 CEST 2014
mZxid = 0x2
mtime = Sun Apr 27 22:59:32 CEST 2014
pZxid = 0x2
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 24
numChildren = 0
[zk: 172.17.0.2:2181(CONNECTED) 4]

Como podemos ver en el resultado, aparece el nombre del blog. Ahora vamos a cambiar el valor para agregarle el http:// que hemos olvidado con la orden "set"

[zk: 172.17.0.2:2181(CONNECTED) 4] set /mariocaster http://mariocaster.blogspot.com
cZxid = 0x2
ctime = Sun Apr 27 22:59:32 CEST 2014
mZxid = 0x3
mtime = Sun Apr 27 23:03:41 CEST 2014
pZxid = 0x2
cversion = 0
dataVersion = 1
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 31
numChildren = 0
[zk: 172.17.0.2:2181(CONNECTED) 5] get /mariocaster
http://mariocaster.blogspot.com
cZxid = 0x2
ctime = Sun Apr 27 22:59:32 CEST 2014
mZxid = 0x3
mtime = Sun Apr 27 23:03:41 CEST 2014
pZxid = 0x2
cversion = 0
dataVersion = 1
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 31
numChildren = 0
[zk: 172.17.0.2:2181(CONNECTED) 6]

Hemos hecho un get al final para comprobar el valor actual del znode que, como podemos comprobar, efectivamente ha cambiado. Por último vamos a borrar este znode recién creado:

[zk: 172.17.0.2:2181(CONNECTED) 6] delete /mariocaster
[zk: 172.17.0.2:2181(CONNECTED) 7] ls /      
[zookeeper]
[zk: 172.17.0.2:2181(CONNECTED) 8]

Y con esto termina el tutorial introductorio a Zookeeper, herramienta que vamos a necesitar para la instalación de Apache Storm, motor para aplicaciones de tiempo real distribuidas que veremos en otro tutorial

lunes, mayo 26, 2014

Primeros pasos con machine learning usando Mahout... Un recomendador de productos

Recomendador de productos

Lo que vamos a hacer con Apache Mahout es el "Hello World" del machine learning: un recomendador de productos. En esencia, un recomendador de productos compara los productos que le gustan a un usuario con usuarios que gustos similares para recomendar al primero un producto similar basándose en los gustos del segundo.

Dicho con un ejemplo: Pongamos que:

A Jaimito le gustan los Refrescos de Cola y no le gustan los Batidos de chocolate.
A Pepito le gustan los Refrescos de Naranja y no le gustan los Batidos de Vainilla.
Entonces nuestro programa de machine learning podría decir que a Jaimito le podrían gustar los refrescos de Naranja, ya que le gustan los refrescos, por lo que le recomendaríamos la Fanta Naranja.

Esto, aunque sea una explicación un poco arcaica e inexacta, es una manera fácil y rápida de entender cómo funciona un recomendador de productos a través de machine learning. Ya habrá tiempo de ponerse purista :D

Descargando e instalando Mahout

  1. Web oficial: http://mahout.apache.org/
  2. Release a utilizar 0.9

cd $HOME
tar -zxvf mahout-distribution-0.9.tar.gz

Creando la clase del recomendador


public class MlMain {

 public static void main(String[] args) throws IOException, TasteException {
  //String pathToCsvFile = "/var/hadoop/examples-files/input/ml-dataset";
  String pathToCsvFile = "/var/hadoop/examples-files/input/ml-node-gen";
  
  SampleRecommender sampleRecommender = new SampleRecommender(pathToCsvFile);
  
  for(int i=1;i<=10;i++){
   List recommendationsList = sampleRecommender.getRecommendations(i,2);
   
   for (RecommendedItem recommendation : recommendationsList) {
    System.out.println(
      "User:" + i + 
      ", recommended item: " + recommendation.getItemID() + 
      " (value: " + recommendation.getValue() + ")");
   } 
  }
 }
 
 static class SampleRecommender {
  private DataModel model = null;
  private UserBasedRecommender recommender = null;
  
  public SampleRecommender(String path) throws IOException, TasteException
  {
   model = new FileDataModel(new File(path));
   UserSimilarity similarity = new PearsonCorrelationSimilarity(model);
   UserNeighborhood neighborhood = new ThresholdUserNeighborhood(0.1, similarity, model);
   recommender = new GenericUserBasedRecommender(model, neighborhood, similarity);
  }
  
  public List getRecommendations(int user, int itemNumber) throws TasteException
  {
   if(this.model!=null && this.recommender!=null)
    return recommender.recommend(user, itemNumber);
   else
    return null;
  }
 }
}

Si nos fijamos un poco, podemos ver claramente que el "core" del recomendador se encuentra dentro del constructor llamado SampleRecommender. Tiene 4 lineas:

  1. Primero cargamos el dataset y creamos el modelo del mismo
  2. Le indicamos a Mahout qué algoritmo queremos usar para hacer comparaciones de las interacciones que tienen los usuarios con los distintos productos. El método que usamos aquí es el de coeficiente de correlación de Pearson con el cual le indicaremos el umbral de correlación que queremos utilizar. Mirando el link de wikipedia podemos ver que:
    1. Si r = 1, existe una correlación positiva perfecta. 
    2. Si 0 < r < 1, existe una correlación positiva.
    3. Si r = 0, no existe relación lineal pero podría ser no lineal
  3. Vamos a usar un umbral de correlación de 0.1 de tal manera que con que la correlación se levemente positiva, consideraremos que el producto se puede recomendar
  4. Por último, pasamos todos las variables previamente generadas para recoger nuestro generador.
Cuando ya tenemos el generador, lo único que tenemos que hacer es preguntarle sobre el usuario del que queramos sacar una recomendación y cuantas recomendaciones queremos obtener con el método recommender.recommend(user, itemNumber);

El resultado que podríais obtener es como el siguiente:

User:4, recommended item: 2 (value: 14.750008)
User:5, recommended item: 2 (value: 12.130403)
User:7, recommended item: 4 (value: 17.72526)
User:8, recommended item: 3 (value: 11.70087)
User:9, recommended item: 4 (value: 15.584593)
User:9, recommended item: 1 (value: 13.836835)

Los resultados obtenidos, al haber usado un dataset generado aleatoriamente, serán distintos cada vez que generemos un dataset nuevo. Además, dada la aleatoriedad del "algoritmo" de generación de datasets, es incluso posible que los resultados no guarden la relación necesaria para poder usarlos con el recomendador pero, con un par de intentos debería generar un dataset válido.

En el siguiente tutorial sobre Mahout, vamos a distribuir el recomendador en el cluster de Hadoop.

martes, mayo 20, 2014

Usando Amazon EMR para ejecutar un Hadoop MapReduce

AWS Elastic MapReduce (EMR)

Lo que vamos a hacer es usar las capacidad de Amazon EMR (Elastic Map Reduce) para ejecutar nuestro contador de palabras sin tener que preocuparnos de instalaciones de clusters.

NOTA IMPORTANTE:

Amazon EMR NO es gratuito y no entra dentro de las posibilidades de la AWS Free Tier por lo que realizar este tutorial, aunque cuesta menos de 1€, no es gratuito. Al final del todo pondré una captura de pantalla del coste total que fueron 0.21€

Creando un bucket S3 y subiendo los archivos

Lo primero que tendremos que hacer es crear un bucket S3 para subir archivos dentro. S3 es un sistema de almacenamiento en la nube que, con la AWS Free tier nos otorga 5gb de almacenamiento gratuito. Habrá que meterse en la cuenta de AWS y seleccionar, dentro de los servicios, el S3:


Una vez en la ventana de S3, hay que hacer click en "Create bucket":

Y darle un nombre al bucket:

Subir los archivos al bucket

Ahora que ya tenemos un bucket de almacenamiento, hay que seleccionar "Actions-Upload" para seleccionar los dos archivos que queremos subir, uno de ellos el fichero del que queremos contar las palabras y otro el JAR que contiene el Wordcount que escribimos en el tutorial de 



Para este caso, he subido una versión del Quijote en texto plano que encontré por la red y el JAR con el Wordcount . El bucket debe presentar este aspecto:

Creando el cluster

Paso 1: Cluster configuration

Ahora, seleccionamos el servicio de Amazon EMR de la selección de servicios. En la ventana que nos aparece hay que seleccionar "Create cluster":

Primer paso para la creación de un cluster



Atención al botón de "Configure sample application". Como se puede intuir, este botón nos abre otra ventana para poder ejecutar el cluster con algunos trabajos de ejemplo (entre ellos el propio contador de palabras). Como lo que queremos es aprender como se hace, vamos a evitar usar el botón.

Paso 2: Software configuration

Una vez le hemos dado un nombre al cluster y hemos seleccionado una ubicación en nuestro S3 para el almacenamiento de los logs, pasamos a seleccionar la AMI y versión de Hadoop a usar:
Vamos a usar la version 2.4.2 de la AMI con hadoop 1.0.3. Como podeis ver, también nos da la opción de instalar Hive o Pig. Para los propósitos de este tutorial no será necesario pero no pasa nada si se instalan.

Paso 3: Hardware configuration

Pasamos al siguiente paso que va a ser la configuración de acceso. Para los propósitos de este tutorial vamos a usar una instancia small para el contenedor del Namenode, el SecondaryNamenode y el JobTracker y 2 instancias small con un DataNode y un TaskTracker cada una. Por último, aunque no es estrictamente necesario, hay que seleccionar una clave PEM para el acceso a las máquinas. A nosotros no nos va a hacer falta ya que sólo vamos a arrancar las máquinas para hacer el conteo y se van a parar al finalizar.

Paso 4: Steps

El último de los pasos para la creación del cluster son los "Steps", que son los trabajos a realizar. Nuestro step va a ser ejecutar el Wordcount que vendrá configurado automáticamente (lo seleccionamos en un paso anterior cuando abrimos el cuadro de diálogo de "Configure Sample Application").


Debemos editar el paso para indicarle el fichero del que queremos contar el número de palabras. En la nueva ventana que aparece, sólo tendremos que abrir el cuadro de diálogo de "Input S3 Location" y seleccionar el fichero de "quijote.txt" como se puede apreciar a continuación:
Asegurarse de tener elegido "Terminate cluster" en "Action on Failure"

Navegar y seleccionar el archivo a contar palabras

Una vez tengamos todo, seleccionamos "Select" dentro de la ventana de "Select S3 Folder" y "Save" en la ventana de "Add Step". Ya está el trabajo y el cluster configurado para ejecutar.

Finalmente hacemos click en "Create cluster" y nuestro MapReduce ya estará corriendo.

Algunos aspectos básicos de AWS EMR: Si por alguna circunstancia hay un error de configuración dentro del cluster, no se cargará ningún gasto a vuestra cuenta. Sólo se carga cuando el MapReduce consigue iniciarse, en cuyo caso, si hay error; sí que habrá un pequeño cargo.

Resultado


Los resultados de la ejecución los podremos ver en nuestro S3, dentro de la carpeta que venía configurada  en el "Output S3 folder" dentro de la ventana de "Add Step". En la siguiente imagen podéis ver en el resultado del Wordcount dentro del bucket S3:


Notas finales

Como se puede apreciar, la ejecución de una tarea EMR en AWS es realmente sencilla, mucho mas que crear nuestro propio cluster, nuestro propio MapReduce, etc. Esto demuestra las tremendas posibilidades de usar AWS EMR para la ejecución de docenas o cientos de "Workers" a sólo unos clicks de ratón. A mi, personalmente, me dejó impresionado, sobre todo teniendo en cuenta que también se pueden ejecutar scripts Pig y Hive con la misma facilidad.

lunes, mayo 19, 2014

Instalando Hadoop en modo distribuido

Hadoop "Fully Distributed"

Aunque pueda asustar, la instalación de un sistema distribuido de Hadoop no es tan complicado como pueda parecer. En realidad, siguiendo el tutorial sobre como instalarlo en modo pseudo-distribuido ya tenemos la mayor parte del trabajo hecho: Instalando Hadoop en modo pseudo-distribuido

Las diferencias  mas simples entre el modo distribuido y el modo pseudo distribuido son las siguientes:
  1. El nodo NameNode debe poder acceder por SSH sin contraseña a todos los nodos esclavos y a si mismo. (no es necesario que los nodos esclavos puedan acceder al nodo maestro).
  2. Todos los nodos deben compartir la misma configuración de Hadoop (el contenido de "conf", "etc/conf", o "/etc/hadoop" dependiendo de la versión).
  3. La reglas de Firewall deben permitir la comunicación via TCP entre todos los tipos de nodos en modo bidireccional

Comenzando

Lo primero, vamos a usar dos portátiles que vamos a llamar E1 y E2. Tenemos que instalar Hadoop en los dos equipos como si fuera modo pseudo-distribuido usando el tutorial del link anterior. Ahora lo importante es cambiar las direcciones IP's por direcciones reales dentro de nuestra LAN por lo que, si nuestro localhost es el 192.168.1.10 dentro de nuestra LAN, tendremos que usar esta IP.

Una vez tengamos los dos equipos con Hadoop en modo pseudo-distribuido vamos a ver como dividimos las tareas entre los dos equipos:
  1. E1: El equipo 1 va a ser el nodo "Maestro", por tanto vamos a tener en ejecución los siguientes hilos:
    1. Namenode
    2. SecondaryNamenode
    3. JobTracker
  2. E3: El Equipo 2 va a ser el nodo "Worker" por lo que las tareas en ejecución serán las siguientes:
    1. TaskTracker
    2. DataNode
Entraremos a la consola del E1, nuestra IP es 192.168.1.10. Vamos a editar, dentro de la carpeta de Hadoop, el fichero de configuración de esclavos donde vamos a indicar la IP del E2 (192.168.1.20). Dicho fichero lo encontraremos en conf/slaves:

192.168.1.20
Además, también hay que indicar la máquina que va a tener el SecondaryNamenode, para ello tenermos que editar el fichero conf/masters para agregrar la nuestra IP (la IP del E1):

192.168.1.10

NOTA: Aunque el fichero se llame "masters", no indica las máquinas donde se van a alojar el NameNode ni el Jobtracker, quedando estas configuraciones a disposición del fichero mapred-site.xml y del nodo que arranque el NameNode (en nuestro caso será el E1)

Configurar el acceso SSH

El E1 debe tener acceso por SSH sin contraseña al E2 y a sí mismo. Si aún no sabes como conseguir esto, está todo explicado en el tutorial para instalar Hadoop en modo pseudo-distribuido.

Configurar el mapred-site y el hdfs-site

Ahora, tendremos que tener comunes los ficheros hdfs-site y mapred-site a lo largo de nuestro cluster. En mi experiencia he encontrado bastante sencilla la sincronización a través de un repositorio git como puede ser Github pero cualquier solución es igualmente válida:







	
		dfs.replication
		2
	







    mapred.job.tracker
    192.168.1.10:54311




En el primero, el hdfs, estamos diciéndole que cada bloque de información lo duplique a lo largo del cluster (dfs-replication=2). En el segundo le estamos pasando la IP del E1 que es el que va a ejecutar el JobTracker.

Arrancando el cluster

Por último, hay que arrancar el cluster, desde el E1 y el directorio de instalación de Hadoop ejecutaremos:

NameNode
JobTracker
SecondaryNameNode

Y, si nos vamos al E2, podremos ver los siguientes:
DataNode
TaskTracker

Si resulta que te encuentras el Datanode y el Tasktracker dentro de la E1 es probablemente porque todavía conservas la IP de1 dentro del fichero conf/slaves.

Siguientes pasos, ¡instalar Hadoop en todas las máquinas que tenga por casa!

viernes, mayo 09, 2014

Escribiendo un Hadoop MapReduce en Java: Un Wordcount mejorado

En este tutorial, vamos a crear nuestro propio MapReduce contador de palabras y luego lo vamos a usar en nuestro cluster pseudo-distribuido de Hadoop.

Un trabajo Map Reduce se divide en 2 clases (Mapper y Reducer) mas el método de entrada "main". En este caso, el mapper va a crear un listado del tipo y el reducer va a "reducir" el listado de valores a un único valor que, en nuestro caso, será el número de ocurrencias de la clave.

/**
 *  Mapper.
 *  Recibirá, línea tras línea (el objeto Text), los contenidos del fichero que
 *  le pasemos por los argumentos.
 *
 *  Vamos a mejorar el Mapper inicial quitando todos los caracteres no alfabéticos
 *  de la cadena de entrada para evitar que, por ejemplo, "casa" y "casa." se
 *  contabilicen como palabras distintas
 */
public static class WordCountMapper extends Mapper {
  private Text word = new Text();

  @Override
  public void map(LongWritable key, Text value, Context context)
      throws IOException, InterruptedException 
  {
    //Escribimos una expresión regular para eliminar todo caracter no alfabético
    String cleanString = value.toString().replaceAll("[^A-Za-z\\s]", "");

    //Ahora cogemos el texto limpio y lo separamos por espacios
    StringTokenizer token = new StringTokenizer(cleanString);
    
    //Recorremos el tokenizer para recoger todas las palabras hasta que no haya mas
    while(token.hasMoreTokens())
    {
      //Palabra actual
      String tok = token.nextToken();

      //Asignar la palabra al objeto que se va a pasar al reducer
      word.set(tok);

      /*
       * Guardar la palabra con un valor número de 1 de tal manera que, si tenemos
       * la palabra "casa" estamos guardando  Si la palabra casa volviera
       * a aparecer, como ya la hemos establecido una vez, el resultado sería 
       *  y así por cada ocurrencia
       */
      context.write(word, new IntWritable(1));
    }
  }
}
Vamos ahora con el Reducer, ojo al atributo estático de la clase
/*
 * El reducer va a contar el número de ítems en la lista de palabras 
 * que pasamos desde el Mapper
 */
public static class WordCountReducer extends Reducer {

  @Override
  public void reduce(Text word, Iterable list, Context context)
      throws IOException, InterruptedException
  {
    //Ponemos el contador de palabras a 0
    int total = 0;
    
    //Recorremos el objeto Iterable list y sumamos uno al contador
    for(IntWritable count : list)
    {
      total++;
    }
    
    /* Escribimos el resultado del reducer, para el ejemplo de casa, escribiría
     * 
     */
    context.write(word, new IntWritable(total));
  }
  
}
Ahora solo falta la clase main de entrada a la aplicación donde se configura todas las opciones del MapReduce
// Entrada de la aplicación
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException 
{
  //Creamos un fichero de configuracion y le damos un nombre al Job
  Configuration conf = new Configuration();
  Job job = new Job(conf, "word count");

  //Le tenemos que indicar la clase que hay que usar para llamar a los mappers y reducers
  job.setJarByClass(Wordcount.class);

  //Le indicamos el nombre de la clase Mapper y de la clase Reducer
  job.setMapperClass(WordCountMapper.class);
  job.setReducerClass(WordCountReducer.class);

  /* 
   * Le tenemos que indicar el formato que va a tener el resultado, en nuestro caso vamos
   * a recuperar un resultado del tipo  por lo que, usando
   * los tipos primitivos de Hadoop, esto equivaldría a Text.class y IntWritable.class
   */
  job.setOutputKeyClass(Text.class);
  job.setOutputValueClass(IntWritable.class);

  /* Le indicamos el fichero de entrada y de salida que, por lo general, los vamos a recoger
   * de los parámetros que le pasemos a la clase JAR
   */
  FileInputFormat.addInputPath(job, new Path(args[0]));
  FileOutputFormat.setOutputPath(job, new Path(args[1]));

  //Esperamos a que el trabajo termine
  System.exit(job.waitForCompletion(true) ? 0 : 1);
}
Nos queda exportar el JAR y ejecutarlo con algún fichero de prueba para contar sus palabras. Para exportarlo vamos a usar Eclipse, por su amplia uso actual y su simplicidad. Tendremos que exportarlo como un JAR ejecutable asegurándonos de tener las siguientes opciones seleccionadas. Es importante seleccionar "Extract required libraries into generated JAR" en el cuadro de diálogo de exportación que nos aparece:

Ahora sólo falta probarlo. Para ello podemos usar cualquiera de los 3 modos de Hadoop (local, pseudo-distribuido o distribuido). Por simplicidad vamos a usar el modo Local:

cd $HADOOP_INSTALL
bin/hadoop jar mariocaster.blogspot.com-wordcount.jar example-files/input example-files/output
Recordar que la carpeta example-files/input debe contener los ficheros de texto a contar y la carpeta example-files/output NO debe existir cuando ejecutemos el script porque fallará (Hadoop se encargará de crear la carpeta). El resultado es similar al siguiente:

INFO util.NativeCodeLoader: Loaded the native-hadoop library
WARN mapred.JobClient: Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same.
WARN mapred.JobClient: No job jar file set.  User classes may not be found. See JobConf(Class) or JobConf#setJar(String).
INFO input.FileInputFormat: Total input paths to process : 2
WARN snappy.LoadSnappy: Snappy native library not loaded
INFO mapred.JobClient: Running job: job_local1360868529_0001
INFO mapred.LocalJobRunner: Waiting for map tasks
INFO mapred.LocalJobRunner: Starting task: attempt_local1360868529_0001_m_000000_0
INFO util.ProcessTree: setsid exited with exit code 0
INFO mapred.Task:  Using ResourceCalculatorPlugin : org.apache.hadoop.util.LinuxResourceCalculatorPlugin@4f69385e
INFO mapred.MapTask: Processing split: file:/var/hadoop/examples-files/input/text2:0+118
INFO mapred.MapTask: io.sort.mb = 100
INFO mapred.MapTask: data buffer = 79691776/99614720
INFO mapred.MapTask: record buffer = 262144/327680
INFO mapred.MapTask: Starting flush of map output
INFO mapred.MapTask: Finished spill 0
INFO mapred.Task: Task:attempt_local1360868529_0001_m_000000_0 is done. And is in the process of commiting
INFO mapred.LocalJobRunner: 
INFO mapred.Task: Task 'attempt_local1360868529_0001_m_000000_0' done.
INFO mapred.LocalJobRunner: Finishing task: attempt_local1360868529_0001_m_000000_0
INFO mapred.LocalJobRunner: Starting task: attempt_local1360868529_0001_m_000001_0
INFO mapred.Task:  Using ResourceCalculatorPlugin : org.apache.hadoop.util.LinuxResourceCalculatorPlugin@4e3600d
INFO mapred.MapTask: Processing split: file:/var/hadoop/examples-files/input/texto:0+72
INFO mapred.MapTask: io.sort.mb = 100
INFO mapred.MapTask: data buffer = 79691776/99614720
INFO mapred.MapTask: record buffer = 262144/327680
INFO mapred.MapTask: Starting flush of map output
INFO mapred.MapTask: Finished spill 0
INFO mapred.Task: Task:attempt_local1360868529_0001_m_000001_0 is done. And is in the process of commiting
INFO mapred.LocalJobRunner: 
INFO mapred.Task: Task 'attempt_local1360868529_0001_m_000001_0' done.
INFO mapred.LocalJobRunner: Finishing task: attempt_local1360868529_0001_m_000001_0
INFO mapred.LocalJobRunner: Map task executor complete.
INFO mapred.Task:  Using ResourceCalculatorPlugin : org.apache.hadoop.util.LinuxResourceCalculatorPlugin@58017a75
INFO mapred.LocalJobRunner: 
INFO mapred.Merger: Merging 2 sorted segments
INFO mapred.Merger: Down to the last merge-pass, with 2 segments left of total size: 428 bytes
INFO mapred.LocalJobRunner: 
INFO mapred.Task: Task:attempt_local1360868529_0001_r_000000_0 is done. And is in the process of commiting
INFO mapred.LocalJobRunner: 
INFO mapred.Task: Task attempt_local1360868529_0001_r_000000_0 is allowed to commit now
INFO output.FileOutputCommitter: Saved output of task 'attempt_local1360868529_0001_r_000000_0' to examples-files/ouput
INFO mapred.LocalJobRunner: reduce > reduce
INFO mapred.Task: Task 'attempt_local1360868529_0001_r_000000_0' done.
INFO mapred.JobClient:  map 100% reduce 100%
INFO mapred.JobClient: Job complete: job_local1360868529_0001
INFO mapred.JobClient: Counters: 20
INFO mapred.JobClient:   File Output Format Counters 
INFO mapred.JobClient:     Bytes Written=249
INFO mapred.JobClient:   FileSystemCounters
INFO mapred.JobClient:     FILE_BYTES_READ=2253
INFO mapred.JobClient:     FILE_BYTES_WRITTEN=153667
INFO mapred.JobClient:   File Input Format Counters 
INFO mapred.JobClient:     Bytes Read=190
INFO mapred.JobClient:   Map-Reduce Framework
INFO mapred.JobClient:     Reduce input groups=35
INFO mapred.JobClient:     Map output materialized bytes=436
INFO mapred.JobClient:     Combine output records=0
INFO mapred.JobClient:     Map input records=16
INFO mapred.JobClient:     Reduce shuffle bytes=0
INFO mapred.JobClient:     Physical memory (bytes) snapshot=0
INFO mapred.JobClient:     Reduce output records=35
INFO mapred.JobClient:     Spilled Records=78
INFO mapred.JobClient:     Map output bytes=346
INFO mapred.JobClient:     CPU time spent (ms)=0
INFO mapred.JobClient:     Total committed heap usage (bytes)=728760320
INFO mapred.JobClient:     Virtual memory (bytes) snapshot=0
INFO mapred.JobClient:     Combine input records=0
INFO mapred.JobClient:     Map output records=39
INFO mapred.JobClient:     SPLIT_RAW_BYTES=216
INFO mapred.JobClient:     Reduce input records=39

Finalmente, este es el resultado del MapReduce:
Este 1
This 1
a 2
an 1
are 1
contar 1
count 1
cuantas 1
de 1
del 2
dentro 1
ejemplo 1
es 1
estan 1
example 1
fichero 2
going 1
hay 1
in 1
is 1
it 1
of 1
palabras 2
que 1
sus 1
text 1
that 1
the 1
to 1
un 1
vamos 1
ver 1
we 1
words 1
y 1

miércoles, mayo 07, 2014

Instalando Hadoop en una Raspberry Pi

¿Hadoop en una Raspberry Pi?

En cierto modo, se podría encontrar el motivo de esta entrada entre lo absurdo y lo innecesario. La capacidad de computación de una Raspberry Pi ni su memoria RAM no la hace excesivamente interesante para trabajos de Big Data, pero es cierto que ofrece una manera barata de montar un cluster con fines educativos en casa (especialmente si en casa tienes dos Raspberry Pi v2 una Raspberry Pi v1 y una BeagleBone Black)

La instalación de Hadoop en una Raspberry Pi resulta más sencillo de lo que cabe esperar. Y es que el pequeño ordenador de open hardware no se diferencia tanto de cualquier portátil u ordenador de sobremesa si obviamos la RAM de que dispone la v2 (512mb) y la capacidad del microprocesador (bueno, y el almacenamiento en sd, etc etc).

Básicamente, siguiendo el primero de los tutoriales sobre instalación de Hadoop e incluso modo tienes el 
90% del trabajo hecho. Sólo falta un pequeño detalle....

Y es que los trabajos de computación que se llegan a realizar en un Job MapReduce suelen requerir de gran cantidad de RAM y como nuestra pequeña Rpi no es que vaya sobrada precisamente, debemos indicar a la instalación de Hadoop unos límites para que el sistema operativo no se cargue los procesos del DataNode y del TaskTracker.

Esto es igualmente válido para ordenadores de consumo que no son los típicos que se pueden encontrar en un cluster de Hadoop de producción con chorrocientos Gb de RAM. En algún caso nos podríamos ver forzados a establecer límites en máquinas con más RAM, en alguna prueba que he hecho he tenido que establecer límites de 3gb en una máquina con 4Gb de RAM de 64 bits.

Déjate de rollos y dime que tengo que hacer

Fácil, para establecer estos límites tenemos que entrar en carpeta de configuración de Hadoop que en nuestra versión es $HADOOP_INSTALL/conf aunque también podría ser /etc/hadoop o en $HADOOP_INSTALL/etc/conf.

Una vez localizada la carpeta, abrir el archivo mapred-site.xml e introducir las siguientes propiedades:


    mapred.child.java.opt
    -Xmx386



    mapred.tasktracker.map.tasks.maximum
    1



    mapred.tasktracker.reduce.tasks.maximum
    1

Y voilá! Aunque parezca increíble esto fue lo único que tuve que hacer para agregar la Raspberry Pi a mi cluster casero. La explicación no es compleja: la primera propiedad limita la RAM utilizada por el proceso de Map o Reduce de la Rpi a 386, lo cual le da un pequeño margen para que el sistema operativo no nos mate el proceso. Las otras dos opciones limitan el número de operaciones de Map o Reduce que se pueden ejecutar en paralelo dentro de cada nodo de trabajo. Y es que si cada trabajo de Map o de Reduce es computacionalmente no muy intenso, se puede ajustar este parámetro para ejecutar varios trabajos en paralelo. En general he comprobado que en la Rpi no suele ser posible aumentar en número de tareas por encima de uno incluso en pequeños trabajos.

Y puestos a hablar de clusters caseros de Raspberrys Pi, ahí dejo un link de un señor que tuvo las pelotas.... Por la ley del máximo esfuerzo, de montarse un cluster con 40 Rpi Raspberry Pi 40 nodes cluster

lunes, mayo 05, 2014

Primeros pasos con Hive. Oootro contador de palabras

Descargando e instalando Hive

  1. Página web oficial: http://hive.apache.org/
  2. Site de releases: http://www.apache.org/dyn/closer.cgi/hive/
  3. Release que vamos a usar: http://ftp.cixug.es/apache/hive/hive-0.13.0/apache-hive-0.13.0-bin.tar.gz
Lo primero descargar y ejecutar la release que queramos usar:

wget 'http://ftp.cixug.es/apache/hive/hive-0.13.0/apache-hive-0.13.0-bin.tar.gz'
tar -zxvf apache-hive-0.13.0-bin.tar.gz
cd apache-hive-0.13.0-bin/bin
export HIVE_HOME=$HOME/apache-hive-q0.13.0-bin
export PATH=$PATH:$HIVE_HOME/bin

Arrancando el cluster de Hadoop

Hive necesita al menos Hadoop 1.2.1 corriendo para funcionar. Si no sabes como instalar Hadoop aún te recomiendo que te pases por el tutorial Instalando Hadoop en modo pseudo-distribuido (local)

Una vez tengamos el cluster corriendo, Hive necesita de dos carpetas dentro del HDFS con permisos de grupo para poder funcionar, para crear esas carpetas y darles los permisos necesarios ejecutaremos las órdenes siguientes:
hadoop fs -mkdir /user
hadoop fs -mkdir /user/hive
hadoop fs -mkdir /user/hive/warehouse
hadoop fs -mkdir /tmp
hadoop fs -chmod g+w /tmp
hadoop fs -chmod g+x /user/hive/warehouse

Como se puede apreciar, Hive necesita de las carpetas /tmp y /user/hive/warehouse para poder funcionar.

Ahora vamos a crear un fichero de ejemplo con el siguiente contenido para contar sus palabras. Lo llamaremos ejemplo.txt y lo vamos a guardar dentro de la carpeta bin de la carpeta de Hive:
Este es
un fichero de
ejemplo en
del que vamos
a contar
sus palabras
El cual tendremos que copiar dentro del HDFS. Recordamos el comando:
hadoop fs -copyFromLocal ejemplo.txt /tmp/ejemplo.txt
Ahora, ejecutamos la consola de Hive con el comando "hive" (sin comiillas) y creamos una tabla para almacenar el fichero de texto en ella:
hive>CREATE TABLE ejemplo (linea STRING);
Con el comando anterior, sólo hemos creado una tabla nueva, pero no la hemos cargado de información. Para cargar un fichero en una tabla ejecutaremos la orden siguiente:
hive>LOAD DATA INPATH '/tmp/ejemplo.txt' OVERWRITE INTO TABLE ejemplo;

Deleted hdfs://asusnotebook:54310/user/hive/warehouse/texto

Table default.texto stats: [numFiles=1, numRows=0, totalSize=73, rawDataSize=0]

OK

Time taken: 1.612 seconds
Ya tenemos el fichero cargado en la tabla ejemplo. Ahora hay que crear otra tabla para almacenar el resultado del conteo de palabras:

hive>CREATE TABLE contador AS 
SELECT palabra, count(1) AS cuenta FROM 
(SELECT explode(split(linea,' ')) AS palabra FROM texto) w 
GROUP BY palabra 
ORDER BY cuenta;
Esto comenzará creará dos MapReduce que comenzarán a trabajar de inmediato. Los mensajes que aparecen son como los siguientes (resumido)
[...]

Stage-2 map = 0%,  reduce = 0%

Stage-2 map = 100%,  reduce = 0%, Cumulative CPU 0.84 sec

Stage-2 map = 100%,  reduce = 100%, Cumulative CPU 2.12 sec

OK

Time taken: 42.304 seconds

Vamos a ver ahora que se ha creado dentro de la tabla contador

hive>SELECT * FROM contador;

a 1
contar 1
cuantas 1
de 1
del 2
dentro 1
ejemplo 1
es 1
estan 1
fichero 2
hay 1
palabras 2
que 1
sus 1
un 1
vamos 1
ver 1
y 1
Y ahí lo tenemos. Por poco casi no repito una palabra pero como se puede apreciar, la palabra "del", "fichero" y "palabras" aparecen 2 veces cada una.