martes, junio 10, 2014

Introducción a Apache Kafka... un recolector de mensajes

Introducción a Apache Kafka: Ingeniería de software de parte de Linkedin

Apache Kafka es un sistema distribuido de mensajería de suscripción-publicación sobre Scala. Su web oficial es http://kafka.apache.org/ y la distribución de la que vamos a hacer uso en este tutorial es la 0.8.1.1 para Scala 2.10

Descargando y extrayendo el paquete


cd $HOME
wget http://ftp.cixug.es/apache/kafka/0.8.1.1/kafka_2.10-0.8.1.1.tgz
tar -zxvf kafka_2.10-0.8.1.1.tgz
mv kafka_2.10-0.8.1.1.tgz kafka
export KAFKA_HOME=$HOME/kafka
cd kafka

Vamos a necesitar un total de cuatro terminales abiertos. Lo primero que hay que hacer, después de haber descargado y extraído el tar, es iniciar el servidor de Zookeeper. Como se puede apreciar, Zookeeper forma un pilar fundamental de muchos de los paquetes que envuelven el ecosistema de Hadoop. Luego tenemos que iniciar el broker the Kafka y, por último, crear un topic de Kafka :
# Arrancar el servidor de Zookeeper
bin/zookeeper-server-start.sh config/zookeeper.properties

# Crea el broker
bin/kafka-server-start.sh config/server.properties

# Crea el topic
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic kafkatopic
Created topic "kafkatopic".

Ahora que tenemos un Topic, tenemos que lanzar un "productor" para mandar mensajes. El productor se encarga de meter mensajes en la cola para un "consumidor" pueda recibirlos.

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic kafkatopic

Por último, vamos a crear un consumidor para recibir los mensajes con el siguiente comando

bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic kafkatopic --from-beginning

Bien, volvemos al terminal del "productor" de mensajes para mandar el primero de nuestros mensajes:

# Terminal del productor. Escribimos el mensaje
http://mariocaster.blogspot.com

# Terminal del consumidor. El mensaje es recibido
http://mariocaster.blogspot.com

Con esto, podríamos usar la API para Java y escribir un Productor de mensajes. Un ejemplo sencillo sería como el siguiente:
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

import java.util.Properties;

public class Main {
    public static void main(String args[]){

        Properties props = new Properties();
        props.put("metadata.broker.list", "localhost:9092");
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        props.put("partitioner.class", "kafka.producer.DefaultPartitioner");
        props.put("request.required.acks", "1");
        props.put("zookeeper.connect", "localhost:2181");
        props.put("group.id", "kafkagroup");

        ProducerConfig config = new ProducerConfig(props);

        Producer producer = new Producer(config);
        KeyedMessage\\ msg = new KeyedMessage\("kafkatopic", null, "Message");

        producer.send(msg);
        producer.close();
    }
}

Con el código de arriba, cada vez que compilemos y ejecutemos, podremos ver el mensaje "Message" en el terminal del consumidor. Por otro lado, también podemos usar la API de Java para crear un Consumidor (un suscriptor):

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class Main {
 
 public static void main(String[] args) {
  Properties props = new Properties();
  props.put("metadata.broker.list", "localhost:9092");
  props.put("serializer.class", "kafka.serializer.StringEncoder");
  props.put("partitioner.class", "kafka.producer.DefaultPartitioner");
  props.put("request.required.acks", "1");
  props.put("zookeeper.connect", "localhost:2181");
  props.put("group.id", "kafkagroup");

  ConsumerConnector consumer = Consumer.createJavaConsumerConnector(new ConsumerConfig(props));

  Map topicCount = new HashMap();
  topicCount.put("kafkatopic", new Integer(1));
  Map>> consumerStreams = consumer.createMessageStreams(topicCount);
  List> streams = consumerStreams.get("kafkatopic");
  
  for (KafkaStream stream : streams) {
   ConsumerIterator consumerIte = stream.iterator();
   while (consumerIte.hasNext())
    System.out.println("Message from Single Topic :: " + new String(consumerIte.next().message()));
  }
 }
}


En este caso, al compilar y ejecutar la consola se quedará abierta recibiendo extrayendo los mensajes de la cola que está manteniendo Kafka. Abriendo el terminal del "productor" y escribiendo cualquier palabra podremos ver el resultado en nuestra consola de Java. Con esto terminamos el tutorial sobre Apache Kafka. En el siguiente tutorial en el que usemos Kafka, vamos a escribir un spout de Storm para recuperar los mensajes de Kafka y procesarlos con unos Bolts.

Primeros pasos con Apache Spark. Un contador de palabras en Scala y Spark

¿Qué es Apache Spark?

Spark es un motor para analítica de datos open source hecho como un framework de Scala y que pertenece al ecosistema de Hadoop, entre otras cosas, por hacer uso del HDFS (Hadoop Distributed File System).

Algunas de sus principales virtudes son que resulta mas fácil de usar que Hadoop MapReduce así como puede resultar computacionalmente mucho mas rápido en ciertos casos.

Su principal característica diferenciadora si lo comparamos con Hadoop MapReduce es que Spark realiza los cálculos en "in-memory" para alcanzar esas velocidades de cómputo superiores hasta 100 veces mas rápido que Hadoop MapReduce en memoria y hasta 10 veces mas rápido en disco.

Instalación

Lo primero será descargar Apache Spark desde la web oficial: http://spark.apache.org/ Para el propósito de este tutorial, y para guardar relación con el resto de tutoriales que usan Hadoop 1.2.1, vamos a descargar la versión binaria para Hadoop 1, escrito en la sección de descargas como "Download binaries for Hadoop 1 (HDP1, CDH3)"

Una vez descargado, extraemos y entramos en el directorio de se acaba de crear (opcional: vamos también a cambiar el nombre de la carpeta a simplemente "spark" para mejor legibilidad) Nosotros vamos a descargarlo e instalarlo en $HOME/spark

wget http://d3kbcqa49mib13.cloudfront.net/spark-0.9.1-bin-hadoop1.tgz
tar -zxvf spark-0.9.1-bin-hadoop1.tgz
mv spark-0.9.1-bin-hadoop1.tgz spark-0.9.1-bin-hadoop1 spark
cd spark
pwd
/home/ubuntu/spark

Bien, ahora vamos a exportar tanto la carpeta bin dentro de la instalación como el home de Spark y también vamos a compilar y ensamblar el paquete:


# Exportar directorios
export PATH=$PATH:$HOME/spark/bin
export SPARK_HOME=$HOME/spark

# Compilar y ensamblar
sbt/sbt clean compile
sbt/sbt assembly

# OPCIONAL: Agregar SPARK_HOME a nuestro ~/.bashrc o similar
echo 'export SPARK_HOME=$HOME/spark' >> ~/.bashrc

NOTA: Para agregar la ruta de spark/bin al PATH lo mas facil y seguro es abrir directamente el ~/.bashrc con un editor de textos y buscar la linea donde pone "export $PATH". Ahora tendremos que ejecutar nuestro cluster de Hadoop en modo pseudo-distribuido o distribuido (real). Si aun no sabes como montar un cluster de Hadoop con HDFS, que es en realidad lo que va a usar Spark, aquí puedes encontrar un gran tutorial al respecto: Instalando Hadoop en modo pseudo-distribuido

# Arrancar todos los procesos de Hadoop
$HADOOP_INSTALL/bin/start-all.sh

Primeras pruebas

Una vez tengamos todos los procesos corriendo: NameNode, SecondaryNameNode, JobTracker (para el nodo master) y TaskTracker y DataNode (para los esclavos o workers), ya podemos ejecutar Spark. Aun así, vamos a comprobar la correcta instalación de Spark iniciando uno de los ejemplos que vienen. Para ello nos vamos a la raiz donde hemos instalado spark (en nuestro caso $HOME/spark) y ejecutamos:

bin/run-example org.apachebin/run-example org.apache.spark.examples.SparkPi local 50

El resultado, como suele ser habitual, contiene un montón de mensajes de log. Veremos ir pasando cada una de las 50 iteraciones pero, en algún punto ya al final, aparecerá algo como:
Pi is roughly 3.1419392
Lo cual nos indica el éxito de la operación.

El siguiente paso es escribir nuestro ya famoso contador de palabras. En Spark se puede Scala, Java y Python. Para este tutorial vamos a usar Scala ya que es el lenguaje "nativo" para Spark y es el que más funcionalidades soporta. Mas adelante haŕe algún ejemplo también en Python.


import org.apache.spark.{SparkContext, SparkConf}

object WordCount {
  def main(args: Array[String]) {


    val conf = new SparkConf(true)
    conf.setAppName("cassandra-example-hello")
    conf.setMaster("local")

    val spark = new SparkContext(conf)

    val textFile = spark.textFile("/tmp/test")
    val counts = textFile.flatMap(line => line.split(" "))
      .map(word => (word, 1))
      .reduceByKey(_ + _)
    counts.saveAsTextFile("/tmp/testResult")
  }
}

#Resultado
res10: Array[(String, Int)] = Array((y,1), (palabras,1), (a,2), (este,1), (Spark,1), (de,2), (contar,1), (usar,1), (capacidades,1), (las,2), (fichero.,1), (Vamos,2), (HDFS,1))

# Opcional: Guardar los contenidos del resultado en el disco
reducer.saveAsTextFile("results")


Espero que os haya gustado el tutorial. Los próximos tutoriales sobre Spark estarán orientado a la escritura de scripts, el uso de la API de Python y Java, al uso de HDFS y al uso de Apache Shark

lunes, junio 09, 2014

Instalación y primeros pasos con Apache HBase

HBase

¿Qué es HBase? Atendiendo a lo que pone en su web, es la base de datos de Hadoop: un sistema de almacenamiento escalable y distribuible para Big Data.

Instalando HBase

La instalación es sencilla y sigue los procesos habituales: descargar, descomprimir y crear una variable de entorno para el directorio de instalación:

cd $HOME
wget http://ftp.cixug.es/apache/hbase/hbase-0.98.3/hbase-0.98.3-hadoop1-bin.tar.gz
tar -zxvf hbase-0.98.3-hadoop1-bin.tar.gz
mv hbase-0.98.3-hadoop1-bin.tar.gz hbase
cd hbase

Configuración de HBase

Hbase cuenta con una carpeta conf donde se encuentran los ficheros de configuración. En este caso vamos a configurar el hbase-site.xml:


<property>
 <name>hbase.rootdir</name>
 <value>/home/fedora/hbase</value>
</property>
<property>
 <name>hbase.zookeeper.property.dataDir</name>
 <value>/home/fedora/hbase</value>
</property>


Una vez tengamos esto configurado, tenemos que asegurarnos que nuestro máquina aparece en el listado de /etc/hosts apuntando a 127.0.0.1.
127.0.0.1 localhost

Probando HBase

Ahora vamos a iniciar HBase y la Shell de HBase para probar que esté funcionando:

bin/start-hbase.sh
bin/hbase shell

#La respuesta debe ser similar a la siguiente:
$ bin/hbase shell
HBase Shell; enter 'help' for list of supported commands.
Type "exit" to leave the HBase Shell
Version 0.94.19, r1588997, Tue Apr 22 00:21:01 UTC 2014

hbase(main):001:0>

#Como se puede leer, al escribir "help" podemos recibir una lista de comandos disponibles.


#Vamos a hacer un list de tablas, que debe aparecer vacío para comprobar que todo funciona correctamente
hbase(main):001:0> list
TABLE                                                                                                                                                                   
0 row(s) in 0.6230 seconds

hbase(main):002:0>

#Salimos del shell con Ctrl+C y paramos el servidor HBase
hbase(main):002:0>^C
$bin/stop-hbase.sh


Con esto ya tendríamos la instalación de HBase lista. Nos vemos en el siguiente tutorial.

viernes, junio 06, 2014

Introducción a la logística de datos con Apache Flume

Descargando e instalando Apache Flume

  1. Página web oficial: http://flume.apache.org/
  2. Release del tutorial: Apache Flume 1.5.0
cd $HOME
tar -zxvf apache-flume-1.5.0-bin.tar.gz
mv apache-flume-1.5.0-bin flume
cd flume

Terminología

  1. Source: Un source es una fuente de datos que queremos extraer. Es el punto de recogida de los datos. La ingestión se puede realizar con contenidos de un directorio (spooldir) con ejecuciones de comandos (exec) o recibiendo peticiones http (http). Link: Flume Sources
  2. Sink: Un sink es al lugar donde queremos que vaya nuestro source para usarlo. Es el punto de recepción y entrega de los datos. Esto puede incluir el log (logger), el HDFS (hdfs), el sistema local de archivos (file_roll). Link: Flume Sinks
  3. Channel: El channel es el medio por el cual se distribuye el source al sink. Este medio puede ser la propia memoria (memory), una BBDD (jdbc) o un fichero en el disco duro (file). Link: Flume Channels
  4. Agent: Finalmente, un agent es un conjunto de sources, sinks y channels y es el objetivo de este tutorial.

Creando un fichero de configuración para un Agente

Con el fichero de configuración, vamos a indicar al agente los sources, sinks y channels que vamos a usar. En nuestro ejemplo la configuración es la siguiente:
  1. Source: Nuestro source va a ser el contenido de un fichero dentro de nuestro sistema de archivos local. El fichero estará almacenado dentro de la carpeta /tmp con nombre test. Lo que vamos a hacer es un típico tail -f para leer las nuevas líneas que se escriban en el fichero
  2. Sink: El sink va a ser el log de Flume, comúnmente usado para debuggear aplicaciones. De esta manera podremos ver los nuevos contenidos del fichero /tmp/test impresos por consola
  3. Channel: El channel que vamos a usar para conectar nuestro source con nuestro sink va a ser la memoria del pc, para simplificar.
# Asignar nombres al channel, source y sink del agente
miAgente.sources = testDir
miAgente.channels = memoryChannel
miAgente.sinks = flumeLogger

# Configurar el source para hacer un tail -f al fichero /tmp/test
miAgente.sources.testDir.type = exec
miAgente.sources.testDir.command = tail -f /tmp/test
miAgente.sources.testDir.channels = memoryChannel

# Configurar el channel para usar la memoria como canal
miAgente.channels.memoryChannel.type = memory
miAgente.channels.memoryChannel.capacity = 100000
miAgente.channels.memoryChannel.transactioncapacity = 10000

# Conectar el sink y el source al channel
miAgente.sinks.flumeLogger.type = logger
miAgente.sinks.flumeLogger.channel = memoryChannel

Una vez tengamos el Agent configurado, para lanzarlo tenemos que decirle el nombre del agente con el parámetro "-n", que hemos llamado "miAgente" y el nombre del archivo que aloja la configuración del Agent con el parámetro "-f".
cd $FLUME_HOME
bin/flume-ng agent -f conf/miagente-conf.properties -n miAgente -Dflume.root.logger=INFO,console
INFO node.PollingPropertiesFileConfigurationProvider: Configuration provider starting
INFO node.PollingPropertiesFileConfigurationProvider: Reloading configuration file:conf/myAgent-conf.properties
INFO conf.FlumeConfiguration: Added sinks: flumeLogger Agent: myAgent
INFO conf.FlumeConfiguration: Processing:flumeLogger
INFO conf.FlumeConfiguration: Processing:flumeLogger
INFO conf.FlumeConfiguration: Post-validation flume configuration contains configuration for agents: [myAgent]
INFO node.AbstractConfigurationProvider: Creating channels
INFO channel.DefaultChannelFactory: Creating instance of channel memoryChannel type memory
INFO node.AbstractConfigurationProvider: Created channel memoryChannel
INFO source.DefaultSourceFactory: Creating instance of source testDir, type exec
INFO sink.DefaultSinkFactory: Creating instance of sink: flumeLogger, type: logger
INFO node.AbstractConfigurationProvider: Channel memoryChannel connected to [testDir, flumeLogger]
INFO node.Application: Starting new configuration:{ sourceRunners:{testDir=EventDrivenSourceRunner: { source:org.apache.flume.source.ExecSource{name:testDir,state:IDLE} }} sinkRunners:{flumeLogger=SinkRunner: { policy:org.apache.flume.sink.DefaultSinkProcessor@617c66f0 counterGroup:{ name:null counters:{} } }} channels:{memoryChannel=org.apache.flume.channel.MemoryChannel{name: memoryChannel}} }
INFO node.Application: Starting Channel memoryChannel
INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: CHANNEL, name: memoryChannel: Successfully registered new MBean.
INFO instrumentation.MonitoredCounterGroup: Component type: CHANNEL, name: memoryChannel started
INFO node.Application: Starting Sink flumeLogger
INFO node.Application: Starting Source testDir
INFO source.ExecSource: Exec source starting with command:tail -f /tmp/test
INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: SOURCE, name: testDir: Successfully registered new MBean.
INFO instrumentation.MonitoredCounterGroup: Component type: SOURCE, name: testDir started
INFO sink.LoggerSink: Event: { headers:{} body: 61 73 64 66 61 64 73 66                         asdfadsf }
INFO sink.LoggerSink: Event: { headers:{} body: 61 73 64 66 61 64 61 73 64 73 66                asdfadasdsf }
INFO sink.LoggerSink: Event: { headers:{} body: 61 73 64 66 61 64 61 73 32 33 34 33 34 64 73 66 asdfadas23434dsf }
INFO sink.LoggerSink: Event: { headers:{} body: 61 73 64 66 61 64 61 73 32 33 34 33 34 64 73 66 asdfadas23434dsf }
Al principio nos da bastante información de debug. Al final nos está imprimiendo los contenidos actuales del fichero /tmp/test. Ahora, con el agente corriendo, podemos agregar información al archivo y se reflejará en el agente. Vamos a escribir el nombre del blog a ver que pasa:
echo "mariocaster.blogspot.com" >> /tmp/test
14/06/04 18:52:58 INFO sink.LoggerSink: Event: { headers:{} body: 6D 61 72 69 6F 63 61 73 74 65 72                mariocaster }


Con esto, ya podríamos definir sinks para llenar BBDD, escribir ficheros dentro del HDFS para procesar con Hadoop y mucho mas. Espero que os haya gustado.

martes, junio 03, 2014

Apache Hive: Escribiendo scripts en Hive para ejecutarlos en un cluster Hadoop

Creando el script

El script es, de nuevo, el contador de palabras que hicimos en un tutorial anterior (Primeros pasos con Apache Hive. Creando el contador de palabras en Apache Hive). El script va a importar un fichero del HDFS y creará una tabla para hacer consultas en el mismo. Luego, creará una tabla nueva con el resultado del conteo de palabras. El script es el siguiente:

CREATE TABLE IF NOT EXISTS ejemplo (linea STRING);

LOAD DATA INPATH '/user/MaC/pg4300.txt' OVERWRITE INTO TABLE ejemplo;

CREATE TABLE contador AS 
 SELECT palabra, count(1) AS cuenta 
 FROM (SELECT explode(split(linea,' ')) AS palabra FROM ejemplo) w 
 GROUP BY palabra 
 ORDER BY cuenta;

El cual, crea una tabla llamada "ejemplo" donde vuelca los contenidos del fichero pg4300.txt. De hecho se puede ver el contenido de este dentro del cluster en la ruta del warehouse de Hive:

hadoop fs -cat /user/hive/warehouse/ejemplo/pg4300.txt

También crea otra tabla llamada "contador" con las columnas "palabra" y "cuenta" que guarda el resultado del conteo de palabras.

El cluster de Hadoop

Si no tienes un cluster de Hadoop todavía o no sabes como arrancarlo, te recomiendo que te pases por este tutorial para saber el método para instalar uno: Instalando Hadoop en modo pseudo-distribuido.

Copiando datos al HDFS para probar

Vamos a usar el poema "Rime of the ancient mariner" de Samuel Taylor Coleridge (para los que no lo sepan, Iron Maiden tiene una versión del mismo). El archivo se llama pg4300.txt y tenemos que copiarlo al HDFS de la siguiente manera:

hadoop fs -copyFromLocal pg4300.txt /user/mariocaster

Ahora, la localización de nuestro fichero sería hdfs://localhost/user/mariocaster/pg4300.txt

Ejecutando el script

La ejecución del script es sencilla. Solamente hay que llamar al comando hive -f y pasarle el nombre del fichero que, en nuestro caso, hemos llamado "wordcount.q"

$ $HIVE_HOME/bin/hive -f wordcount.q

Logging initialized using configuration in jar:file:/var/hadoop/hive/lib/hive-common-0.13.0.jar!/hive-log4j.properties
OK
Time taken: 1.31 seconds
Loading data to table default.ejemplo
Table default.ejemplo stats: [numFiles=1, numRows=0, totalSize=1573150, rawDataSize=0]
OK
Time taken: 0.778 seconds
Total jobs = 2
Launching Job 1 out of 2
Number of reduce tasks not specified. Estimated from input data size: 1
In order to change the average load for a reducer (in bytes):
  set hive.exec.reducers.bytes.per.reducer=
In order to limit the maximum number of reducers:
  set hive.exec.reducers.max=
In order to set a constant number of reducers:
  set mapred.reduce.tasks=
Starting Job = job_201406031222_0005, Tracking URL = http://localhost:50030/jobdetails.jsp?jobid=job_201406031222_0005
Kill Command = /var/hadoop/bin/hadoop job  -kill job_201406031222_0005
Hadoop job information for Stage-1: number of mappers: 1; number of reducers: 1
2014-06-03 15:53:16,940 Stage-1 map = 0%,  reduce = 0%
2014-06-03 15:53:22,992 Stage-1 map = 100%,  reduce = 0%, Cumulative CPU 4.85 sec
2014-06-03 15:53:31,075 Stage-1 map = 100%,  reduce = 33%, Cumulative CPU 4.85 sec
2014-06-03 15:53:33,091 Stage-1 map = 100%,  reduce = 100%, Cumulative CPU 8.7 sec
MapReduce Total cumulative CPU time: 8 seconds 700 msec
Ended Job = job_201406031222_0005
Launching Job 2 out of 2
Number of reduce tasks determined at compile time: 1
In order to change the average load for a reducer (in bytes):
  set hive.exec.reducers.bytes.per.reducer=
In order to limit the maximum number of reducers:
  set hive.exec.reducers.max=
In order to set a constant number of reducers:
  set mapred.reduce.tasks=
Starting Job = job_201406031222_0006, Tracking URL = http://localhost:50030/jobdetails.jsp?jobid=job_201406031222_0006
Kill Command = /var/hadoop/bin/hadoop job  -kill job_201406031222_0006
Hadoop job information for Stage-2: number of mappers: 1; number of reducers: 1
2014-06-03 15:53:40,018 Stage-2 map = 0%,  reduce = 0%
2014-06-03 15:53:43,038 Stage-2 map = 100%,  reduce = 0%, Cumulative CPU 3.34 sec
2014-06-03 15:53:51,116 Stage-2 map = 100%,  reduce = 33%, Cumulative CPU 3.34 sec
2014-06-03 15:53:53,129 Stage-2 map = 100%,  reduce = 100%, Cumulative CPU 7.18 sec
MapReduce Total cumulative CPU time: 7 seconds 180 msec
Ended Job = job_201406031222_0006
Moving data to: hdfs://localhost:54310/user/hive/warehouse/contador
Table default.contador stats: [numFiles=1, numRows=50108, totalSize=527742, rawDataSize=477634]
MapReduce Jobs Launched: 
Job 0: Map: 1  Reduce: 1   Cumulative CPU: 8.7 sec   HDFS Read: 1573365 HDFS Write: 1340154 SUCCESS
Job 1: Map: 1  Reduce: 1   Cumulative CPU: 7.18 sec   HDFS Read: 1340609 HDFS Write: 527821 SUCCESS
Total MapReduce CPU Time Spent: 15 seconds 880 msec
OK
Time taken: 45.941 seconds

Viendo los resultados

Si prestamos atención, podemos observar que se ha ejecutado como un trabajo MapReduce en nuestro cluster. Si queremos confirmarlo sólo tenemos que entrar en la web UI de nuestro JobTracker para ver el Job:


Para ver los resultados del map reduce, lo mejor es abrir un prompt the Hive y hacer un select:

bin/hive
hive> SELECT * FROM contador;
...
her 1505
it 1679
for 1789
on 1894
was 2005
that 2168
with 2391
I 2432
he 2712
his 3035
in 4606
to 4787
a 5842
and 6542
of 8127
 10116
the 13600
Time taken: 0.114 seconds, Fetched: 50108 row(s)

Espero que os haya gustado

Usar un cluster de Hadoop con Pig (modo distribuido)

Vamos a contar palabras de nuevo en modo distribuido con Hadoop y Pig. Para realizar este tutorial, que en realidad es tremendamente sencillo; hay que tener un cluster de Hadoop corriendo en modo distribuido (o pseudo-distribuido) y tener una instalación completa de Pig.

  1. Instalando Apache Hadoop en modo Pseudo-Distribuido
  2. Primeros pasos con Apache Pig. Usando Pig para hacer un contador de palabras
  3. Creando scripts en Apache Pig para Hadoop
Cuando tengamos el paso 3 terminado, tendremos un fichero con un script en Pig listo para ejecutarse en el cluster. Supongamos que el nombre del script es "wordcountPig.pig" y que el fichero del que queremos contar se llama "prueba.txt".

Lo primero que tendremos que hacer es arrancar el cluster de Hadoop como indica el primero de los tutoriales y copiar el fichero "prueba.txt" al HDFS


hadoop fs -mkdir /pig
hadoop fs -put prueba.txt /pig


Una vez tengamos el fichero dentro del HDFS, vamos a usar el mismo script del tercer tutorial con una ligera variación para indicarle la localización del fichero:
/* Cargar el fichero */
myfile = LOAD '/pig/tweets' AS (words:chararray);
 
/* Separar las palabras dentro de cada linea */
wordsList = FOREACH myfile GENERATE TOKENIZE($0);
 
/* Separar las palabras a una linea por palabra */
words = FOREACH wordsList GENERATE FLATTEN($0);       
 
/* Agrupar las palabras iguales en la misma linea */
groupedWords = GROUP words BY $0;
 
/* Contar las palabras */
final = FOREACH groupedWords GENERATE $0, COUNT($1);  
 
/* Ordenar las palabras */
sortedWords = ORDER final BY $1 ASC;
 
/* Guardar los resultados en una carpeta llamada pig_wordcount */
STORE sortedWords into '/pig/pig_wordcount';


Las dos líneas que han variado son la que indica la ruta para cargar el fichero y la última, que indica la ruta donde guardar el fichero con los resultados.
Por último, vamos a ejecutar el script:
pig wordcount.pig


Cabe resaltar que esta vez hemos omitido el "-x" y la palabra "local" ya que vamos a hacer una ejecución real dentro del cluster. El resultado se guarda también dentro del HDFS y es el esperado:

hadoop fs -ls /user/MaC/pig/pig_wordcount
Found 3 items
-rw-r--r--   2 mariocaster supergroup          0 2014-06-02 17:43 /pig/pig_wordcount/_SUCCESS
drwxr-xr-x   - mariocaster supergroup          0 2014-06-02 17:42 /pig/pig_wordcount/_logs
-rw-r--r--   2 mariocaster supergroup       6768 2014-06-02 17:43 /pig/pig_wordcount/part-r-00000

Podéis ver el resultado del script en el archivo part-r-00000. Espero que os haya gustado.

lunes, junio 02, 2014

Logística de datos con Apache Sqoop: Exportando datos del HDFS a MySQL

En este tutorial vamos a hacer el proceso inverso al que hicimos en el primer tutorial sobre importación de tablas SQL hacia el HDFS. En este caso vamos a exportar los datos que tenemos en el HDFS hacia una tabla en MySQL.

Preparando una tabla

Primero hay que tener una tabla preparada para la recepción de los datos con una estructura igual al de los datos que se van a recibir. Los datos que vamos a usar son los mismos que emitimos en el tutorial anterior:
1,mario
2,caster

Vamos a crear una tabla en mysql que tenga un int en la primera columna y un varchar(50) en la segunda:
CREATE TABLE `sqoop-import`( id int NOT NULL, name varchar(50) NOT NULL, PRIMARY KEY (id) );


Ya tenemos todo preparado con los archivos dentro de la carpeta "/user/mariocaster/sqoop-table/part-m-*". Para importarlos el comando a usar es el siguiente:

sqoop export --connect jdbc:mysql://localhost/sqoop-test --export-dir /user/MaC/sqoop-table --table sqoop-import --username root --password pass -m 1 --input-fields-terminated-by ',' --direct


  1. El comando que tenemos que usar es sqoop import para importar bases de datos
  2. Se le pasa el argumento --connect jdbc:mysql://[host]/[bbdd] para indicarle el host y la bbdd de la que queramos importar una tabla
  3. Ahora le indicamos el directorio del HDFS que vamos a exportar a la BBDD, en este caso con el argumento --export-dir /[ruta]
  4. Después le indicamos el nombre de la tabla a donde vamos a importar los datos, la hemos llamado "sqoop-import". El argumento a usar es: -table sqoop-import
  5. En el caso de la BBDD que tenemos localmente, hay que usar unas credenciales para permitir el acceso. Se las pasamos con: --username [user] y --password [pass]
  6. Con -m 1 le indicamos que solamente queremos usar un mapper para realizar la exportación
  7. --input-fields-terminated-by ','  es usado para indicarle al script que los campos de los archivos del HDFS están separados por ',' (coma)
  8. Activado el "modo directo" con --direct para MySQL que nos permite obtener unas operaciones mas rápidas haciendo uso de la herramienta mysqlimport


Podemos ver nuestros conocidos logs de MapReduce y es que, Sqoop, usa MapReduce en segundo plano para ejecutar sus consultas (de una manera similar a Hive y Pig)

Una vez ejecutada la importación, vamos a usar un prompt the mysql para ver que datos se han cargado dentro de la tabla.

mysql>USE `sqoop-test`;
mysql>SELECT * FROM `sqoop-import`;
+----+--------+
| id | name   |
+----+--------+
|  1 | mario  |
|  2 | caster |
+----+--------+
2 rows in set (0.00 sec)


Con esto ya tenemos, los datos del HDFS de vuelta en la BBDD de MySQL. Espero que os haya gustado el tutorial.