viernes, junio 06, 2014

Introducción a la logística de datos con Apache Flume

Descargando e instalando Apache Flume

  1. Página web oficial:
  2. Release del tutorial: Apache Flume 1.5.0
cd $HOME
tar -zxvf apache-flume-1.5.0-bin.tar.gz
mv apache-flume-1.5.0-bin flume
cd flume


  1. Source: Un source es una fuente de datos que queremos extraer. Es el punto de recogida de los datos. La ingestión se puede realizar con contenidos de un directorio (spooldir) con ejecuciones de comandos (exec) o recibiendo peticiones http (http). Link: Flume Sources
  2. Sink: Un sink es al lugar donde queremos que vaya nuestro source para usarlo. Es el punto de recepción y entrega de los datos. Esto puede incluir el log (logger), el HDFS (hdfs), el sistema local de archivos (file_roll). Link: Flume Sinks
  3. Channel: El channel es el medio por el cual se distribuye el source al sink. Este medio puede ser la propia memoria (memory), una BBDD (jdbc) o un fichero en el disco duro (file). Link: Flume Channels
  4. Agent: Finalmente, un agent es un conjunto de sources, sinks y channels y es el objetivo de este tutorial.

Creando un fichero de configuración para un Agente

Con el fichero de configuración, vamos a indicar al agente los sources, sinks y channels que vamos a usar. En nuestro ejemplo la configuración es la siguiente:
  1. Source: Nuestro source va a ser el contenido de un fichero dentro de nuestro sistema de archivos local. El fichero estará almacenado dentro de la carpeta /tmp con nombre test. Lo que vamos a hacer es un típico tail -f para leer las nuevas líneas que se escriban en el fichero
  2. Sink: El sink va a ser el log de Flume, comúnmente usado para debuggear aplicaciones. De esta manera podremos ver los nuevos contenidos del fichero /tmp/test impresos por consola
  3. Channel: El channel que vamos a usar para conectar nuestro source con nuestro sink va a ser la memoria del pc, para simplificar.
# Asignar nombres al channel, source y sink del agente
miAgente.sources = testDir
miAgente.channels = memoryChannel
miAgente.sinks = flumeLogger

# Configurar el source para hacer un tail -f al fichero /tmp/test
miAgente.sources.testDir.type = exec
miAgente.sources.testDir.command = tail -f /tmp/test
miAgente.sources.testDir.channels = memoryChannel

# Configurar el channel para usar la memoria como canal
miAgente.channels.memoryChannel.type = memory
miAgente.channels.memoryChannel.capacity = 100000
miAgente.channels.memoryChannel.transactioncapacity = 10000

# Conectar el sink y el source al channel
miAgente.sinks.flumeLogger.type = logger = memoryChannel

Una vez tengamos el Agent configurado, para lanzarlo tenemos que decirle el nombre del agente con el parámetro "-n", que hemos llamado "miAgente" y el nombre del archivo que aloja la configuración del Agent con el parámetro "-f".
bin/flume-ng agent -f conf/ -n miAgente -Dflume.root.logger=INFO,console
INFO node.PollingPropertiesFileConfigurationProvider: Configuration provider starting
INFO node.PollingPropertiesFileConfigurationProvider: Reloading configuration file:conf/
INFO conf.FlumeConfiguration: Added sinks: flumeLogger Agent: myAgent
INFO conf.FlumeConfiguration: Processing:flumeLogger
INFO conf.FlumeConfiguration: Processing:flumeLogger
INFO conf.FlumeConfiguration: Post-validation flume configuration contains configuration for agents: [myAgent]
INFO node.AbstractConfigurationProvider: Creating channels
INFO channel.DefaultChannelFactory: Creating instance of channel memoryChannel type memory
INFO node.AbstractConfigurationProvider: Created channel memoryChannel
INFO source.DefaultSourceFactory: Creating instance of source testDir, type exec
INFO sink.DefaultSinkFactory: Creating instance of sink: flumeLogger, type: logger
INFO node.AbstractConfigurationProvider: Channel memoryChannel connected to [testDir, flumeLogger]
INFO node.Application: Starting new configuration:{ sourceRunners:{testDir=EventDrivenSourceRunner: { source:org.apache.flume.source.ExecSource{name:testDir,state:IDLE} }} sinkRunners:{flumeLogger=SinkRunner: { policy:org.apache.flume.sink.DefaultSinkProcessor@617c66f0 counterGroup:{ name:null counters:{} } }} channels:{{name: memoryChannel}} }
INFO node.Application: Starting Channel memoryChannel
INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: CHANNEL, name: memoryChannel: Successfully registered new MBean.
INFO instrumentation.MonitoredCounterGroup: Component type: CHANNEL, name: memoryChannel started
INFO node.Application: Starting Sink flumeLogger
INFO node.Application: Starting Source testDir
INFO source.ExecSource: Exec source starting with command:tail -f /tmp/test
INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: SOURCE, name: testDir: Successfully registered new MBean.
INFO instrumentation.MonitoredCounterGroup: Component type: SOURCE, name: testDir started
INFO sink.LoggerSink: Event: { headers:{} body: 61 73 64 66 61 64 73 66                         asdfadsf }
INFO sink.LoggerSink: Event: { headers:{} body: 61 73 64 66 61 64 61 73 64 73 66                asdfadasdsf }
INFO sink.LoggerSink: Event: { headers:{} body: 61 73 64 66 61 64 61 73 32 33 34 33 34 64 73 66 asdfadas23434dsf }
INFO sink.LoggerSink: Event: { headers:{} body: 61 73 64 66 61 64 61 73 32 33 34 33 34 64 73 66 asdfadas23434dsf }
Al principio nos da bastante información de debug. Al final nos está imprimiendo los contenidos actuales del fichero /tmp/test. Ahora, con el agente corriendo, podemos agregar información al archivo y se reflejará en el agente. Vamos a escribir el nombre del blog a ver que pasa:
echo "" >> /tmp/test
14/06/04 18:52:58 INFO sink.LoggerSink: Event: { headers:{} body: 6D 61 72 69 6F 63 61 73 74 65 72                mariocaster }

Con esto, ya podríamos definir sinks para llenar BBDD, escribir ficheros dentro del HDFS para procesar con Hadoop y mucho mas. Espero que os haya gustado.

0 comentarios:

Publicar un comentario