lunes, abril 28, 2014

Escribiendo scripts en Apache Pig para Hadoop

Descargando y ejecutando Pig en modo local

Si aún no has hecho el primero de los tutoriales sobre Pig, te recomiendo pasarte por él ya que va a ser el mínimo necesario para que puedas realizar este tutorial: Primeros pasos con Apache Pig. Usando Pig para hacer un contador de palabras

Descripción breve de un script Pig

Un script Pig no es mas que una serie de órdenes ejecutadas secuencialmente para realizar una consulta. Es exactamente lo mismo que utilizar la consola sólo que además le puedes poner comentarios. La manera de escribir comentarios en el código Pig es precediendo las líneas con un doble guión (--) o para hacerlo multilínea sería con el típico /* */

El contador de palabras en un script Pig

Para hacer el contador de palabras en un script Pig, lo único que tenemos que hacer es crear un fichero con la extensión pig e introducir las órdenes que utilizamos en el tutorial anterior sobre Pig en el fichero. Vamos a crear un fichero llamado wordcounter.pig y dentro escribiremos lo siguiente:

/* Cargar el fichero */
myfile = LOAD 'tweets' AS (words:chararray);

/* Separar las palabras dentro de cada linea */
wordsList = FOREACH myfile GENERATE TOKENIZE($0);

/* Separar las palabras a una linea por palabra */
words = FOREACH wordsList GENERATE FLATTEN($0);       

/* Agrupar las palabras iguales en la misma linea */
groupedWords = GROUP words BY $0;

/* Contar las palabras */
final = FOREACH groupedWords GENERATE $0, COUNT($1);  

/* Ordenar las palabras */
sortedWords = ORDER final BY $1 ASC;

/* Mostrar los resultados por pantalla */
DUMP sortedWords;


Ahora, para ejecutar el script en modo local, sólo tenemos que escribir:
pig -x local wordcounter.pig

Bonus: Guardando los resultados en un archivo

En la mayoría de los casos no vamos a querer mostar los resultados por pantalla sino que vamos a querer que los resultados se guarden en disco para su posterior análisis. Nada mas fácil en Pig. Lo único que tenemos que hacer es reemplazar la instrucción DUMP con la instrucción STORE. El código nos quedaría así:


/* Cargar el fichero */
myfile = LOAD 'tweets' AS (words:chararray);

/* Separar las palabras dentro de cada linea */
wordsList = FOREACH myfile GENERATE TOKENIZE($0);

/* Separar las palabras a una linea por palabra */
words = FOREACH wordsList GENERATE FLATTEN($0);       

/* Agrupar las palabras iguales en la misma linea */
groupedWords = GROUP words BY $0;

/* Contar las palabras */
final = FOREACH groupedWords GENERATE $0, COUNT($1);  

/* Ordenar las palabras */
sortedWords = ORDER final BY $1 ASC;

/* Guardar los resultados en una carpeta llamada pig_wordcount */
STORE sortedWords into 'pig_wordcount';


Esto creará una carpeta llamada "pig_wordcount" que contendrá los resultados de la consulta en archivos con un formato "part-r-*". Para ver los resultados sólo habrá que ejecutar desde el bash:
cat pig_wordcount/part-r-*

sábado, abril 26, 2014

Primeros pasos con Apache Pig. Usando Apache Pig para hacer un Wordcount en modo local

Descargando e instalando Pig

Algunos links de interés:

Lo primero será descargar, extraer Pig y decirle donde está la instalación de Hadoop (ver tutorial Instalando Hadoop en modo pseudo-distribuido (esto no es estrictamente necesario en el modo local pero nos ahorrará tiempo y quebraderos de cabeza en el futuro):
 
wget 'http://apache.rediris.es/pig/pig-0.12.1/pig-0.12.1.tar.gz'
tar -zxvf pig-0.12.1.tar.gz
cd pig-0.12.1/bin
export PIG_CLASSPATH=$HADOOP_INSTALL


Bien, ahora vamos a hacer el mismo contador de palabras del tutorial anterior pero con Pig, para eso vamos a descargar el mismo archivo en formato de texto plano http://www.gutenberg.org/ebooks/11101 y lo vamos a poner en la carpeta bin dentro de Pig. Deberíamos tener los siguientes ficheros dentro de la carpeta de Pig:
total 204K
-rwxr-xr-x 1 fedora fedora 13K abr 5 10:43 pig
-rw-r--r-- 1 fedora fedora 161K abr 19 23:51 pg11101.txt
-rwxr-xr-x 1 fedora fedora 5,6K abr 5 10:43 pig.cmd
-rwxr-xr-x 1 fedora fedora 14K abr 5 10:43 pig.py
/home/fedora/Descargas/pig-0.12.1/bin
[fedora@localhost bin]$


Ejecutando Pig en modo local

pig -x local
Despues de unos cuantos mensajes de log aparece grunt
grunt>

Ahora vamos a escribir el contador de palabras en Pig, lo primero será cargar el archivo con la instrucción LOAD:
grunt>myfile = LOAD 'pg11101.txt' AS (words:chararray);

Si usamos la instrucción DUMP para ver los contenidos de la variable myfile, podemos ver la estructura general que tiene ahora cada linea:
grunt>DUMP myfile;
...
(EBooks posted since November 2003, with etext numbers OVER #10000, are)
(filed in a different way. The year of a release date is no longer part)
(of the directory path. The path is based on the etext number (which is)
...

Todavía falta bastante, ni siquiera tenemos una lista de palabras. Para poder separarlo por palabras, podemos usar la instrucción TOKENIZE:
grunt> wordsList = FOREACH myfile GENERATE TOKENIZE($0);
...
({(EBooks),(posted),(since),(November),(2003),(with),(etext),(numbers),(OVER),(#10000),(are)})
({(filed),(in),(a),(different),(way.),(The),(year),(of),(a),(release),(date),(is),(no),(longer),(part)})
({(of),(the),(directory),(path.),(The),(path),(is),(based),(on),(the),(etext),(number),(which),(is)})
...

Si nos fijamos bien, ahora tenemos tuplas (arrays) de palabras pero seguimos sin tener un listado de palabras separadas: Para conseguir dicho listado, tenemos la instrucción FLATTEN:
grunt>words = FOREACH wordsList GENERATE FLATTEN($0);
...
(at:)
()
(http://www.gutenberg.net/1/0/2/3/10234)
()
(or)
(filename)
(24689)
(would)
(be)como
(found)
(at:)
...

Con lo que ahora tenemos las listas separadas, una palabra por linea (aunque todavía están repetidas). Ahora es el momento de agruparlas usando la instrucción GROUP:
grunt>groupedWords = GROUP words BY $0;
...
(www.gutenberg.net,{(www.gutenberg.net),(www.gutenberg.net),(www.gutenberg.net)})
(Constantinopolitan,{(Constantinopolitan),(Constantinopolitan)})
...

La cosa marcha, ahora tenemos listados agrupados de la misma palabra por cada línea. Sólo nos falta contar cuantas ocurrencias hay por línea. Para ello le diremos que por cada linea nos tiene que crear una nueva pareja clave/valor que clave ($0) siga siendo el nombre de la palabra que estamos contando, como hasta ahora y el valor la cuenta de ítems en la segunda posición ($1) con la instrucción COUNT.
grunt>final = FOREACH groupedWords GENERATE $0, COUNT($1);
...
(www.gutenberg.net,3)
(Constantinopolitan,2)
...

Y ahí lo tenemos, nuestro conteo de palabras.


Bonus: Ordenando ocurrencias en Pig

Y como extra, vamos a ordenar el resultado del conteo de palabras en orden descendente, de tal manera que el último de los resultados de la lista sea la palabra mas usada de todo el texto:
grunt>sortedWords = ORDER final BY $1 ASC;
...
(a,697)
(in,833)
(and,1137)
(of,1323)
(the,2047)
...

Por fin, parece que nuestro ganador es la palabra "the". Espero que el tutorial haya resulado interesante :)

miércoles, abril 23, 2014

El Ecosistema de Hadoop - Un resumen de las tecnologías que envuelven Hadoop

El ecosistema de Hadoop

O como tener un montón de tecnologías juntas con un objetivo común

Cuando empecé a aprender Hadoop, una de las primeras cosas que me pasaron fue que me sentía completamente sobrepasado por el nivel de información y términos que rodean Hadoop. HDFS, MapReduce, Streaming, Pig, Hive... puede que incluso frases como "commodity hardware" puedan resultar desconocidas.

Y es que Hadoop es un completo ecosistema de soluciones con un objetivo común, la explotación de grandes cantidades de datos.

Así que, para intentar aclarar un poco las cosas, comparto un link en el que se puede leer un resumen de todo el ecosistema de Hadoop (eso si, en inglés, intentaré traducirlo al español si saco un hueco).

¡Saludos!

lunes, abril 21, 2014

Big-Data con Hadoop

La ley del máximo esfuerzo


A lo largo de las próximas semanas, voy a estar haciendo un poco de investigación a bajo nivel de Hadoop y todo su ecosistema. Algunos se preguntarán (y con razón), ¿porqué motivo reinventar la rueda cuando hay soluciones excelentes ya montadas para la explotación del Big Data como los que ofrecen Cloudera (http://www.cloudera.com/), Horton Works (hortonworks.com) o IBM InfoSphere BigInsights (http://www-01.ibm.com/software/data/infosphere/biginsights)?

Bueno, por la ley del máximo esfuerzo, por supuesto, aquella por la cual cuando tienes sed caminas decenas (o cientos) de kilómetros para ir al mar, llenar un vaso con agua, desalinizarlo y bebértelo... sólo para darte cuenta de que sigues teniendo sed... pero ya sabes como funciona un proceso de desalinización.

NOTA: Al hilo de la ley del máximo esfuerzo, hay un excelente libro de un británico que se hizo una tostadora de cero a base de conseguir los materiales que necesitaba, trabajarlos y montarlos. El libro en cuestión se llama "The Toaster Project" de Thomas Thwaites y se puede adquirir en formato kindle en The Toaster Project en Amazon.es

Tecnologías Big Data

La investigación se va a centrar, en este orden mas o menos, en los siguientes puntos:

  1. Instalando Apache Hadoop en modo Pseudo-Distribuido
  2. El Ecosistema de Hadoop - Un resumen de las tecnologías que envuelven Hadoop
  3. Escribiendo un Hadoop MapReduce en Java: Un Wordcount mejorado
  4. Instalando Apache Hadoop en modo Distribuido
  5. Usando Amazon EMR para ejecutar un Hadoop MapReduce
  6. Primeros pasos con Apache Pig. Usando Pig para hacer un contador de palabras
  7. Usando una Rapsberry Pi como esclavo en Apache Hadoop
  8. Creando scripts en Apache Pig para Hadoop
  9. Usando Apache Pig en modo distribuido (o pseudo-distribuido)
  10. Usando Amazon EMR para ejecutar nuestras consultas con Apache Pig
  11. Primeros pasos con Apache Hive. Creando el contador de palabras en Apache Hive
  12. Aplicaciones distribuidas con Zoekeeper.
  13. Real Time Analytics con Apache Storm: Un contador de palabras en tiempo real
  14. Instalando un cluster de Apache Storm
  15. Creando scripts de Apache Hive y ejecutándolos en el cluster de Hadoop
  16. Usando Amazon EMR para ejecutar nuestras consultas en Apache Hive
  17. Primeros pasos con machine-learning usando Mahout... un recomendador de productos
  18. Creando flujos de trabajos con Oozie
  19. Introducción a la logística de datos con Apache Flume
  20. Logística de datos con Apache Sqoop: Importando bases de datos al HDFS
  21. Logística de datos con Apache Sqoop: Exportando datos del HDFS a MySQL
  22. Analizando datos con R y Hadoop.
  23. Introducción a Apache Spark
  24. Introducción y primeros pasos con Hbase
  25. Introducción a Apache Kafka: un recolector de mensajes
  26. Tanteando Shark y sus posibilidades
  27. Mas machine learning con MLlib
  28. Escapando del modo consola con GraphX. Gráficos para Big Data
  29. Haciendo dashboards de Big Data con Intellicus
  30. Mas...?

Páginas webs oficiales

Apache Hadoop: hadoop.apache.org/‎
Amazon AWS: aws.amazon.com/‎
Apache Hive: hive.apache.org
Apache Flume: flume.apache.org
Apache Mahout: http://mahout.apache.org

sábado, abril 19, 2014

Instalando Hadoop en modo pseudo-distribuido

Bueno, lo primero de todo es que yo no soy ningún experto en Hadoop por lo que el post va más orientado a mis "notas personales" sobre cómo instalé un sistema pseudo-distribuido de Hadoop. En el siguiente post explicaré un método para ejecutar Hadoop en tu propia máquina "engañando" (aunque de engañar nada) a la máquina para que piense que tiene un nodo "esclavo".

Primero un poco de terminología básica y MUY resumida en modo "Entendimiento de abuela":
  1. Nodo "Maestro":
    1. Namenode: Nodo jefe si se le quiere llamar así. Es el que maneja el meollo de todo.
    2. Jobtracker: Se encarga de asignar las tareas de computación.
    3. SecondaryNamenode: Tantea que todo esté funcionando correctamente de vez en cuando.
  2. Nodo "Esclavo":
    1. Tasktracker: Busca tareas de computación para realizar.
    2. Datanode: Es el proceso que hace el trabajo "sucio". Osea, el que "computa" realmente.
Y básicamente, el sistema pseudo-distribuido lo que hace es tener todos esos procesos en la misma máquina.

Para este tutorial vamos a usar una versión que ha estado siendo usada en producción muy a menudo. La versión 1.2.1.

Pre-requisitos

  1. Java 6
  2. Yum
  3. SSH sin contraseña
Mi máquina es una Fedora por lo que en general usaré la terminología de Fedora a la hora de instalar paquetes. Afortunadamente, para trasladar esto a Ubuntu o similares basta con cambiar la palabra "yum" con la palabra "apt-get". Para instalar Java simplemente hay que ejecutar:
sudo yum update
sudo yum install -y opendjk-6-jdk
echo "export JAVA_HOME=/usr/lib/jvm/openjdk-6-jdk" >> ~/.bashrc
Osea, actualizar el sistema. Instalar Java 6 y exportar la variable de entorno JAVA_HOME en caso de que no se hubiera exportado automáticamente.

Hay que habilitar el acceso por SSH sin contraseña a los nodos esclavos. En este caso el nodo esclavo es la misma máquina así que tenemos que permitir un acceso por SSH sin contraseña... ¡a nosotros mismos!

ssh-keygen -t rsa -P ""

Preguntará la ubicación del archivo que, generalmente, irá en ~/.ssh/id_rsa. Mi consejo es llamarlo id_rsa_passless para diferenciarlo del resto de claves rsa que podamos tener. Ahora habrá que agregarla a nuestra lista de claves autorizadas:
cat ~/.ssh/id_rsa_passless.pub >> ~/.ssh/authorized_keys

Ahora, si hacemos ssh localhost, si es la primera vez nos pregutará si confiamos en la máquina, le decimos que "yes" y ya podremos entrar si contraseña.

Descargando Hadoop 

Descargar la version 1.2.1 desde la web oficial http://hadoop.apache.org/. NOTA: No hace falta descargar la versión src, con la normal vale. Extraer el contenido del archivo comprimido
tar -zxvf hadoop-1.2.1.tar.gz
 Mover la carpeta extraída a la localización final de Hadoop. Ojo, la localización de la carpeta es importante ya que luego tendremos que hacer referencia a ella.
sudo mv hadoop-1.2.1.tar.gz   /var/hadoop
Asignar el usuario y grupo correspondiente:
sudo chown -R [user] /var/hadoop
sudo chgrp -R [user] /var/hadoop
mkdir /var/hadoop/hdfs
echo "export HADOOP_INSTALL=/var/hadoop" >> ~/.bashrc
echo "export HADOOP_OPTS=-Djava.net.preferIPv4Stack=true" >> ~/.bashrc
Donde [user] es el nombre del usuario que esté ejecutándose en la máquina. Algunos tutoriales aconsejan hacer un usuario hduser y un grupo hadoop. Esto es recomendable pero para hacer las cosas lo mas simples posibles he preferido omitirlo. Por último creamos la carpeta hdfs dentro del directorio de Hadoop para usarla como el sistema de ficheros HDFS (Hadoop Distributed File System). Por último exportamos el directorio de instalación de Hadoop y le indicamos que sólo use direcciones IPv4.

Configurar Hadoop

Hadoop consta, en la versión de producción mas usada, de tres ficheros de configuraciones fundamentales. Estos los vamos a tener de esta forma:

core-site.xml: 

Configurar la carpeta para el sistema de ficheros HDFS y configurar la máquina del Namenode.

<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
<property>
  <name>hadoop.tmp.dir</name>
  <value>/var/hadoop/hdfs</value>
</property>

<property>
  <name>fs.default.name</name>
  <value>hdfs://localhost:54310</value>
</property>
</configuration>

hdfs-site.xml

Para este fichero le estamos diciendo que:

  1. Use sólo una replicación por bloque (por ejemplo, con 3 replicaciones cada bloque de datos existe 3 veces en el cluster para, en caso de fallo, tener una copia del mismo).
  2. No use permisos para el sistema de ficheros HDFS. Esto NO debería hacerse así, pero por simplicidad lo dejamos así que nos va a ahorrar algunos problemas.
  3. No usar mas de 3072 mb de RAM. Esto es para evitar que el propio sistema linux mate el proceso si ve que está bloqueando el resto del sistema. Para una máquina de 4Gb de RAM es un umbral razonable.
  4. Permitir un número de procesos hasta 4096. Esto evita ciertos errores que he tenido comúnmente que, al parecer, estaban relacionados con este valor

<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>

<property>
  <name>dfs.replication</name>
  <value>1</value>
</property>

<property>
    <name>dfs.permissions</name>
    <value>false</value>
</property>

<property>
<name>mapred.child.java.opts</name>
<value>-Xmx3072m</value>
</property>

<property>
<name>dfs.datanode.max.xcievers</name>
<value>4096</value>
</property>

</configuration>

mapred-site.xml

En el fichero Mapred le vamos a indicar en qué máquina está el JobTracker:

<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
<property>
  <name>mapred.job.tracker</name>
  <value>localhost:54311</value>
</property>
</configuration>

masters

Contendrá un listado de máquinas para usar el SecondaryNamenode en ellas:
localhost

slaves

Contendrá un listado de máquinas que van a efectuar trabajos de computación. Como el nuestro es un sistema pseudo-distribuido, es el mismo que el namenode, secondarynamenode, etc:
localhost 

Formatear el sistema de ficheros HDFS

Aunque tenga la palabra formatear y asuste mucho, en realidad lo único que formatea son los contenidos de la carpeta que hayamos definido anteriormente en el fichero core-site.xml, la propiedad hadoop.tmp.dir. En nuestro caso es la carpeta /var/hadoop/hdfs
cd /var/hadoop
bin/hadoop namenode -format

 Copiar un archivo para ejecutar el trabajo sobre él

Para nuestro ejemplo, vamos a usar el poema de Samuel Taylor Coleridge "Ancient Mariner and selected poems" disponible en el Proyecto Gutenberg -> http://www.gutenberg.org/ebooks/11101 (descargar la versión plain text). Movemos el archivo descargado a la carpeta /var/hadoop y creamos una carpeta en el HDFS para copiar el archivo dentro.
bin/hadoop fs -mkdir /rime
bin/hadoop fs -copyFromLocal pg11101.txt /rime

Iniciar el pseudo-cluster

Ahora, para iniciar el cluster y que toda la magia se ponga en marcha hay que ejecutar el siguiente comando:
sbin/start-all.sh
Podemos comprobar si todo ha ido bien escribiendo en el terminal jps lo cual debe darnos un resultado similar al siguiente:
6246 Jps
2435 DataNode
2740 JobTracker
2313 NameNode
2605 SecondaryNameNode
2875 TaskTracker
Si algo ha ido mal, se puede comprobar el log que se almacenará en la carpeta logs del directorio de hadoop.

Todo listo: Iniciar el trabajo del contador de palabras

Todos los procesos corriendo, el fichero de prueba en el HDFS....
bin/hadoop jar share/hadoop/mapreduce/hadoop-mapreduce-examples-0.23.9.jar wordcount /rime/pg11101.txt /rime/output
Osea:

  1. bin/hadoop = Ejecuta el binario de Hadoop...
  2. jar = ...para ejecutar el jar... 
  3. share/hadoop/mapreduce/hadoop-mapreduce-examples-0.23.9.jar = (jar a ejecutar) del cual usa el subproceso...
  4. wordcount = ...subproceso a ejecutar...
  5. /rime/pg11101.txt = ...para contar este fichero de mi HDFS...
  6. /rime/output = ...y guardarme el resultado en esta carpeta.
Este es un resultado típico de ejecutar un mapreduce:

INFO input.FileInputFormat: Total input paths to process : 1
INFO mapred.JobClient: Running job: job_201404191919_0002
INFO mapred.JobClient: map 0% reduce 0%
INFO mapred.JobClient: map 100% reduce 0%
INFO mapred.JobClient: map 100% reduce 100%
INFO mapred.JobClient: Job complete: job_201404191919_0002
INFO mapred.JobClient: Counters: 29
INFO mapred.JobClient: Job Counters
INFO mapred.JobClient: Launched reduce tasks=1
INFO mapred.JobClient: SLOTS_MILLIS_MAPS=13320
INFO mapred.JobClient: Total time spent by all reduces waiting after reserving slots (ms)=0
INFO mapred.JobClient: Total time spent by all maps waiting after reserving slots (ms)=0
INFO mapred.JobClient: Launched map tasks=1
INFO mapred.JobClient: Data-local map tasks=1
INFO mapred.JobClient: SLOTS_MILLIS_REDUCES=10400
INFO mapred.JobClient: File Output Format Counters
INFO mapred.JobClient: Bytes Written=103002
INFO mapred.JobClient: FileSystemCounters
INFO mapred.JobClient: FILE_BYTES_READ=143849
INFO mapred.JobClient: HDFS_BYTES_READ=231867
INFO mapred.JobClient: FILE_BYTES_WRITTEN=330699
INFO mapred.JobClient: HDFS_BYTES_WRITTEN=103002
INFO mapred.JobClient: File Input Format Counters
INFO mapred.JobClient: Bytes Read=231760
INFO mapred.JobClient: Map-Reduce Framework
INFO mapred.JobClient: Map output materialized bytes=143849
INFO mapred.JobClient: Map input records=4989
INFO mapred.JobClient: Reduce shuffle bytes=143849
INFO mapred.JobClient: Spilled Records=20624
INFO mapred.JobClient: Map output bytes=356537
INFO mapred.JobClient: Total committed heap usage (bytes)=251133952
INFO mapred.JobClient: CPU time spent (ms)=3950
INFO mapred.JobClient: Combine input records=36475
INFO mapred.JobClient: SPLIT_RAW_BYTES=107
INFO mapred.JobClient: Reduce input records=10312
INFO mapred.JobClient: Reduce input groups=10312
INFO mapred.JobClient: Combine output records=10312
INFO mapred.JobClient: Physical memory (bytes) snapshot=267968512
INFO mapred.JobClient: Reduce output records=10312
INFO mapred.JobClient: Virtual memory (bytes) snapshot=10779578368
INFO mapred.JobClient: Map output records=36475

Y ya está

Si todo ha ido bien el siguiente comando nos mostrará el conteo de palabras del fichero que le hemos pasado:
bin/hadoop fs -cat /rime/output/part-r-00000
wrath 1
wrath*, 1
wreathless 1
wreaths 1
wrenched 1
wretch! 1
wretched 2
wretchedness, 1
write 3
writes 3
writing 6
writings 3
written 17
written, 2
written. 1
wrong 2
wrong, 1
wrong; 1
wronged 1
wrote 25
wrote, 1
wrote: 2
wroth 1
www.gutenberg.net 2
xi; 1
xiv.). 1
xvii., 1
ye 10
year 13
year, 9
year. 3
year: 1
year; 1
Como se puede apreciar en el resultado de ejemplo, quedaría una última vuelta de tuerca en el programa MapReduce del Wordcount en el cual habría que intentar agrupar las palabras con contuvieran únicamente caracteres alfanuméricos, esto lo haremos en el tutorial sobre como escribir nuestro propio MapReduce: Un Wordcount mejorado.