Descargando e instalando Apache Flume
- Página web oficial: http://flume.apache.org/
- Release del tutorial: Apache Flume 1.5.0
cd $HOME tar -zxvf apache-flume-1.5.0-bin.tar.gz mv apache-flume-1.5.0-bin flume cd flume
Terminología
- Source: Un source es una fuente de datos que queremos extraer. Es el punto de recogida de los datos. La ingestión se puede realizar con contenidos de un directorio (spooldir) con ejecuciones de comandos (exec) o recibiendo peticiones http (http). Link: Flume Sources
- Sink: Un sink es al lugar donde queremos que vaya nuestro source para usarlo. Es el punto de recepción y entrega de los datos. Esto puede incluir el log (logger), el HDFS (hdfs), el sistema local de archivos (file_roll). Link: Flume Sinks
- Channel: El channel es el medio por el cual se distribuye el source al sink. Este medio puede ser la propia memoria (memory), una BBDD (jdbc) o un fichero en el disco duro (file). Link: Flume Channels
- Agent: Finalmente, un agent es un conjunto de sources, sinks y channels y es el objetivo de este tutorial.
Creando un fichero de configuración para un Agente
Con el fichero de configuración, vamos a indicar al agente los sources, sinks y channels que vamos a usar. En nuestro ejemplo la configuración es la siguiente:
- Source: Nuestro source va a ser el contenido de un fichero dentro de nuestro sistema de archivos local. El fichero estará almacenado dentro de la carpeta /tmp con nombre test. Lo que vamos a hacer es un típico tail -f para leer las nuevas líneas que se escriban en el fichero
- Sink: El sink va a ser el log de Flume, comúnmente usado para debuggear aplicaciones. De esta manera podremos ver los nuevos contenidos del fichero /tmp/test impresos por consola
- Channel: El channel que vamos a usar para conectar nuestro source con nuestro sink va a ser la memoria del pc, para simplificar.
# Asignar nombres al channel, source y sink del agente miAgente.sources = testDir miAgente.channels = memoryChannel miAgente.sinks = flumeLogger # Configurar el source para hacer un tail -f al fichero /tmp/test miAgente.sources.testDir.type = exec miAgente.sources.testDir.command = tail -f /tmp/test miAgente.sources.testDir.channels = memoryChannel # Configurar el channel para usar la memoria como canal miAgente.channels.memoryChannel.type = memory miAgente.channels.memoryChannel.capacity = 100000 miAgente.channels.memoryChannel.transactioncapacity = 10000 # Conectar el sink y el source al channel miAgente.sinks.flumeLogger.type = logger miAgente.sinks.flumeLogger.channel = memoryChannel
Una vez tengamos el Agent configurado, para lanzarlo tenemos que decirle el nombre del agente con el parámetro "-n", que hemos llamado "miAgente" y el nombre del archivo que aloja la configuración del Agent con el parámetro "-f".
cd $FLUME_HOME bin/flume-ng agent -f conf/miagente-conf.properties -n miAgente -Dflume.root.logger=INFO,console INFO node.PollingPropertiesFileConfigurationProvider: Configuration provider starting INFO node.PollingPropertiesFileConfigurationProvider: Reloading configuration file:conf/myAgent-conf.properties INFO conf.FlumeConfiguration: Added sinks: flumeLogger Agent: myAgent INFO conf.FlumeConfiguration: Processing:flumeLogger INFO conf.FlumeConfiguration: Processing:flumeLogger INFO conf.FlumeConfiguration: Post-validation flume configuration contains configuration for agents: [myAgent] INFO node.AbstractConfigurationProvider: Creating channels INFO channel.DefaultChannelFactory: Creating instance of channel memoryChannel type memory INFO node.AbstractConfigurationProvider: Created channel memoryChannel INFO source.DefaultSourceFactory: Creating instance of source testDir, type exec INFO sink.DefaultSinkFactory: Creating instance of sink: flumeLogger, type: logger INFO node.AbstractConfigurationProvider: Channel memoryChannel connected to [testDir, flumeLogger] INFO node.Application: Starting new configuration:{ sourceRunners:{testDir=EventDrivenSourceRunner: { source:org.apache.flume.source.ExecSource{name:testDir,state:IDLE} }} sinkRunners:{flumeLogger=SinkRunner: { policy:org.apache.flume.sink.DefaultSinkProcessor@617c66f0 counterGroup:{ name:null counters:{} } }} channels:{memoryChannel=org.apache.flume.channel.MemoryChannel{name: memoryChannel}} } INFO node.Application: Starting Channel memoryChannel INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: CHANNEL, name: memoryChannel: Successfully registered new MBean. INFO instrumentation.MonitoredCounterGroup: Component type: CHANNEL, name: memoryChannel started INFO node.Application: Starting Sink flumeLogger INFO node.Application: Starting Source testDir INFO source.ExecSource: Exec source starting with command:tail -f /tmp/test INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: SOURCE, name: testDir: Successfully registered new MBean. INFO instrumentation.MonitoredCounterGroup: Component type: SOURCE, name: testDir started INFO sink.LoggerSink: Event: { headers:{} body: 61 73 64 66 61 64 73 66 asdfadsf } INFO sink.LoggerSink: Event: { headers:{} body: 61 73 64 66 61 64 61 73 64 73 66 asdfadasdsf } INFO sink.LoggerSink: Event: { headers:{} body: 61 73 64 66 61 64 61 73 32 33 34 33 34 64 73 66 asdfadas23434dsf } INFO sink.LoggerSink: Event: { headers:{} body: 61 73 64 66 61 64 61 73 32 33 34 33 34 64 73 66 asdfadas23434dsf }Al principio nos da bastante información de debug. Al final nos está imprimiendo los contenidos actuales del fichero /tmp/test. Ahora, con el agente corriendo, podemos agregar información al archivo y se reflejará en el agente. Vamos a escribir el nombre del blog a ver que pasa:
echo "mariocaster.blogspot.com" >> /tmp/test 14/06/04 18:52:58 INFO sink.LoggerSink: Event: { headers:{} body: 6D 61 72 69 6F 63 61 73 74 65 72 mariocaster }
Con esto, ya podríamos definir sinks para llenar BBDD, escribir ficheros dentro del HDFS para procesar con Hadoop y mucho mas. Espero que os haya gustado.
0 comentarios:
Publicar un comentario