sábado, abril 19, 2014

Instalando Hadoop en modo pseudo-distribuido

Bueno, lo primero de todo es que yo no soy ningún experto en Hadoop por lo que el post va más orientado a mis "notas personales" sobre cómo instalé un sistema pseudo-distribuido de Hadoop. En el siguiente post explicaré un método para ejecutar Hadoop en tu propia máquina "engañando" (aunque de engañar nada) a la máquina para que piense que tiene un nodo "esclavo".

Primero un poco de terminología básica y MUY resumida en modo "Entendimiento de abuela":
  1. Nodo "Maestro":
    1. Namenode: Nodo jefe si se le quiere llamar así. Es el que maneja el meollo de todo.
    2. Jobtracker: Se encarga de asignar las tareas de computación.
    3. SecondaryNamenode: Tantea que todo esté funcionando correctamente de vez en cuando.
  2. Nodo "Esclavo":
    1. Tasktracker: Busca tareas de computación para realizar.
    2. Datanode: Es el proceso que hace el trabajo "sucio". Osea, el que "computa" realmente.
Y básicamente, el sistema pseudo-distribuido lo que hace es tener todos esos procesos en la misma máquina.

Para este tutorial vamos a usar una versión que ha estado siendo usada en producción muy a menudo. La versión 1.2.1.

Pre-requisitos

  1. Java 6
  2. Yum
  3. SSH sin contraseña
Mi máquina es una Fedora por lo que en general usaré la terminología de Fedora a la hora de instalar paquetes. Afortunadamente, para trasladar esto a Ubuntu o similares basta con cambiar la palabra "yum" con la palabra "apt-get". Para instalar Java simplemente hay que ejecutar:
sudo yum update
sudo yum install -y opendjk-6-jdk
echo "export JAVA_HOME=/usr/lib/jvm/openjdk-6-jdk" >> ~/.bashrc
Osea, actualizar el sistema. Instalar Java 6 y exportar la variable de entorno JAVA_HOME en caso de que no se hubiera exportado automáticamente.

Hay que habilitar el acceso por SSH sin contraseña a los nodos esclavos. En este caso el nodo esclavo es la misma máquina así que tenemos que permitir un acceso por SSH sin contraseña... ¡a nosotros mismos!

ssh-keygen -t rsa -P ""

Preguntará la ubicación del archivo que, generalmente, irá en ~/.ssh/id_rsa. Mi consejo es llamarlo id_rsa_passless para diferenciarlo del resto de claves rsa que podamos tener. Ahora habrá que agregarla a nuestra lista de claves autorizadas:
cat ~/.ssh/id_rsa_passless.pub >> ~/.ssh/authorized_keys

Ahora, si hacemos ssh localhost, si es la primera vez nos pregutará si confiamos en la máquina, le decimos que "yes" y ya podremos entrar si contraseña.

Descargando Hadoop 

Descargar la version 1.2.1 desde la web oficial http://hadoop.apache.org/. NOTA: No hace falta descargar la versión src, con la normal vale. Extraer el contenido del archivo comprimido
tar -zxvf hadoop-1.2.1.tar.gz
 Mover la carpeta extraída a la localización final de Hadoop. Ojo, la localización de la carpeta es importante ya que luego tendremos que hacer referencia a ella.
sudo mv hadoop-1.2.1.tar.gz   /var/hadoop
Asignar el usuario y grupo correspondiente:
sudo chown -R [user] /var/hadoop
sudo chgrp -R [user] /var/hadoop
mkdir /var/hadoop/hdfs
echo "export HADOOP_INSTALL=/var/hadoop" >> ~/.bashrc
echo "export HADOOP_OPTS=-Djava.net.preferIPv4Stack=true" >> ~/.bashrc
Donde [user] es el nombre del usuario que esté ejecutándose en la máquina. Algunos tutoriales aconsejan hacer un usuario hduser y un grupo hadoop. Esto es recomendable pero para hacer las cosas lo mas simples posibles he preferido omitirlo. Por último creamos la carpeta hdfs dentro del directorio de Hadoop para usarla como el sistema de ficheros HDFS (Hadoop Distributed File System). Por último exportamos el directorio de instalación de Hadoop y le indicamos que sólo use direcciones IPv4.

Configurar Hadoop

Hadoop consta, en la versión de producción mas usada, de tres ficheros de configuraciones fundamentales. Estos los vamos a tener de esta forma:

core-site.xml: 

Configurar la carpeta para el sistema de ficheros HDFS y configurar la máquina del Namenode.

<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
<property>
  <name>hadoop.tmp.dir</name>
  <value>/var/hadoop/hdfs</value>
</property>

<property>
  <name>fs.default.name</name>
  <value>hdfs://localhost:54310</value>
</property>
</configuration>

hdfs-site.xml

Para este fichero le estamos diciendo que:

  1. Use sólo una replicación por bloque (por ejemplo, con 3 replicaciones cada bloque de datos existe 3 veces en el cluster para, en caso de fallo, tener una copia del mismo).
  2. No use permisos para el sistema de ficheros HDFS. Esto NO debería hacerse así, pero por simplicidad lo dejamos así que nos va a ahorrar algunos problemas.
  3. No usar mas de 3072 mb de RAM. Esto es para evitar que el propio sistema linux mate el proceso si ve que está bloqueando el resto del sistema. Para una máquina de 4Gb de RAM es un umbral razonable.
  4. Permitir un número de procesos hasta 4096. Esto evita ciertos errores que he tenido comúnmente que, al parecer, estaban relacionados con este valor

<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>

<property>
  <name>dfs.replication</name>
  <value>1</value>
</property>

<property>
    <name>dfs.permissions</name>
    <value>false</value>
</property>

<property>
<name>mapred.child.java.opts</name>
<value>-Xmx3072m</value>
</property>

<property>
<name>dfs.datanode.max.xcievers</name>
<value>4096</value>
</property>

</configuration>

mapred-site.xml

En el fichero Mapred le vamos a indicar en qué máquina está el JobTracker:

<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
<property>
  <name>mapred.job.tracker</name>
  <value>localhost:54311</value>
</property>
</configuration>

masters

Contendrá un listado de máquinas para usar el SecondaryNamenode en ellas:
localhost

slaves

Contendrá un listado de máquinas que van a efectuar trabajos de computación. Como el nuestro es un sistema pseudo-distribuido, es el mismo que el namenode, secondarynamenode, etc:
localhost 

Formatear el sistema de ficheros HDFS

Aunque tenga la palabra formatear y asuste mucho, en realidad lo único que formatea son los contenidos de la carpeta que hayamos definido anteriormente en el fichero core-site.xml, la propiedad hadoop.tmp.dir. En nuestro caso es la carpeta /var/hadoop/hdfs
cd /var/hadoop
bin/hadoop namenode -format

 Copiar un archivo para ejecutar el trabajo sobre él

Para nuestro ejemplo, vamos a usar el poema de Samuel Taylor Coleridge "Ancient Mariner and selected poems" disponible en el Proyecto Gutenberg -> http://www.gutenberg.org/ebooks/11101 (descargar la versión plain text). Movemos el archivo descargado a la carpeta /var/hadoop y creamos una carpeta en el HDFS para copiar el archivo dentro.
bin/hadoop fs -mkdir /rime
bin/hadoop fs -copyFromLocal pg11101.txt /rime

Iniciar el pseudo-cluster

Ahora, para iniciar el cluster y que toda la magia se ponga en marcha hay que ejecutar el siguiente comando:
sbin/start-all.sh
Podemos comprobar si todo ha ido bien escribiendo en el terminal jps lo cual debe darnos un resultado similar al siguiente:
6246 Jps
2435 DataNode
2740 JobTracker
2313 NameNode
2605 SecondaryNameNode
2875 TaskTracker
Si algo ha ido mal, se puede comprobar el log que se almacenará en la carpeta logs del directorio de hadoop.

Todo listo: Iniciar el trabajo del contador de palabras

Todos los procesos corriendo, el fichero de prueba en el HDFS....
bin/hadoop jar share/hadoop/mapreduce/hadoop-mapreduce-examples-0.23.9.jar wordcount /rime/pg11101.txt /rime/output
Osea:

  1. bin/hadoop = Ejecuta el binario de Hadoop...
  2. jar = ...para ejecutar el jar... 
  3. share/hadoop/mapreduce/hadoop-mapreduce-examples-0.23.9.jar = (jar a ejecutar) del cual usa el subproceso...
  4. wordcount = ...subproceso a ejecutar...
  5. /rime/pg11101.txt = ...para contar este fichero de mi HDFS...
  6. /rime/output = ...y guardarme el resultado en esta carpeta.
Este es un resultado típico de ejecutar un mapreduce:

INFO input.FileInputFormat: Total input paths to process : 1
INFO mapred.JobClient: Running job: job_201404191919_0002
INFO mapred.JobClient: map 0% reduce 0%
INFO mapred.JobClient: map 100% reduce 0%
INFO mapred.JobClient: map 100% reduce 100%
INFO mapred.JobClient: Job complete: job_201404191919_0002
INFO mapred.JobClient: Counters: 29
INFO mapred.JobClient: Job Counters
INFO mapred.JobClient: Launched reduce tasks=1
INFO mapred.JobClient: SLOTS_MILLIS_MAPS=13320
INFO mapred.JobClient: Total time spent by all reduces waiting after reserving slots (ms)=0
INFO mapred.JobClient: Total time spent by all maps waiting after reserving slots (ms)=0
INFO mapred.JobClient: Launched map tasks=1
INFO mapred.JobClient: Data-local map tasks=1
INFO mapred.JobClient: SLOTS_MILLIS_REDUCES=10400
INFO mapred.JobClient: File Output Format Counters
INFO mapred.JobClient: Bytes Written=103002
INFO mapred.JobClient: FileSystemCounters
INFO mapred.JobClient: FILE_BYTES_READ=143849
INFO mapred.JobClient: HDFS_BYTES_READ=231867
INFO mapred.JobClient: FILE_BYTES_WRITTEN=330699
INFO mapred.JobClient: HDFS_BYTES_WRITTEN=103002
INFO mapred.JobClient: File Input Format Counters
INFO mapred.JobClient: Bytes Read=231760
INFO mapred.JobClient: Map-Reduce Framework
INFO mapred.JobClient: Map output materialized bytes=143849
INFO mapred.JobClient: Map input records=4989
INFO mapred.JobClient: Reduce shuffle bytes=143849
INFO mapred.JobClient: Spilled Records=20624
INFO mapred.JobClient: Map output bytes=356537
INFO mapred.JobClient: Total committed heap usage (bytes)=251133952
INFO mapred.JobClient: CPU time spent (ms)=3950
INFO mapred.JobClient: Combine input records=36475
INFO mapred.JobClient: SPLIT_RAW_BYTES=107
INFO mapred.JobClient: Reduce input records=10312
INFO mapred.JobClient: Reduce input groups=10312
INFO mapred.JobClient: Combine output records=10312
INFO mapred.JobClient: Physical memory (bytes) snapshot=267968512
INFO mapred.JobClient: Reduce output records=10312
INFO mapred.JobClient: Virtual memory (bytes) snapshot=10779578368
INFO mapred.JobClient: Map output records=36475

Y ya está

Si todo ha ido bien el siguiente comando nos mostrará el conteo de palabras del fichero que le hemos pasado:
bin/hadoop fs -cat /rime/output/part-r-00000
wrath 1
wrath*, 1
wreathless 1
wreaths 1
wrenched 1
wretch! 1
wretched 2
wretchedness, 1
write 3
writes 3
writing 6
writings 3
written 17
written, 2
written. 1
wrong 2
wrong, 1
wrong; 1
wronged 1
wrote 25
wrote, 1
wrote: 2
wroth 1
www.gutenberg.net 2
xi; 1
xiv.). 1
xvii., 1
ye 10
year 13
year, 9
year. 3
year: 1
year; 1
Como se puede apreciar en el resultado de ejemplo, quedaría una última vuelta de tuerca en el programa MapReduce del Wordcount en el cual habría que intentar agrupar las palabras con contuvieran únicamente caracteres alfanuméricos, esto lo haremos en el tutorial sobre como escribir nuestro propio MapReduce: Un Wordcount mejorado.