Introducción a Apache Kafka: Ingeniería de software de parte de Linkedin
Apache Kafka es un sistema distribuido de mensajería de suscripción-publicación sobre Scala. Su web oficial es http://kafka.apache.org/ y la distribución de la que vamos a hacer uso en este tutorial es la 0.8.1.1 para Scala 2.10
Descargando y extrayendo el paquete
cd $HOME wget http://ftp.cixug.es/apache/kafka/0.8.1.1/kafka_2.10-0.8.1.1.tgz tar -zxvf kafka_2.10-0.8.1.1.tgz mv kafka_2.10-0.8.1.1.tgz kafka export KAFKA_HOME=$HOME/kafka cd kafka
Vamos a necesitar un total de cuatro terminales abiertos. Lo primero que hay que hacer, después de haber descargado y extraído el tar, es iniciar el servidor de Zookeeper. Como se puede apreciar, Zookeeper forma un pilar fundamental de muchos de los paquetes que envuelven el ecosistema de Hadoop. Luego tenemos que iniciar el broker the Kafka y, por último, crear un topic de Kafka :
# Arrancar el servidor de Zookeeper bin/zookeeper-server-start.sh config/zookeeper.properties # Crea el broker bin/kafka-server-start.sh config/server.properties # Crea el topic bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic kafkatopic Created topic "kafkatopic".
Ahora que tenemos un Topic, tenemos que lanzar un "productor" para mandar mensajes. El productor se encarga de meter mensajes en la cola para un "consumidor" pueda recibirlos.
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic kafkatopic
Por último, vamos a crear un consumidor para recibir los mensajes con el siguiente comando
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic kafkatopic --from-beginning
Bien, volvemos al terminal del "productor" de mensajes para mandar el primero de nuestros mensajes:
# Terminal del productor. Escribimos el mensaje http://mariocaster.blogspot.com # Terminal del consumidor. El mensaje es recibido http://mariocaster.blogspot.com
Con esto, podríamos usar la API para Java y escribir un Productor de mensajes. Un ejemplo sencillo sería como el siguiente:
import kafka.javaapi.producer.Producer; import kafka.producer.KeyedMessage; import kafka.producer.ProducerConfig; import java.util.Properties; public class Main { public static void main(String args[]){ Properties props = new Properties(); props.put("metadata.broker.list", "localhost:9092"); props.put("serializer.class", "kafka.serializer.StringEncoder"); props.put("partitioner.class", "kafka.producer.DefaultPartitioner"); props.put("request.required.acks", "1"); props.put("zookeeper.connect", "localhost:2181"); props.put("group.id", "kafkagroup"); ProducerConfig config = new ProducerConfig(props); Producerproducer = new Producer (config); KeyedMessage\\ msg = new KeyedMessage\ ("kafkatopic", null, "Message"); producer.send(msg); producer.close(); } }
Con el código de arriba, cada vez que compilemos y ejecutemos, podremos ver el mensaje "Message" en el terminal del consumidor. Por otro lado, también podemos usar la API de Java para crear un Consumidor (un suscriptor):
import kafka.consumer.Consumer; import kafka.consumer.ConsumerConfig; import kafka.consumer.ConsumerIterator; import kafka.consumer.KafkaStream; import kafka.javaapi.consumer.ConsumerConnector; import kafka.javaapi.producer.Producer; import kafka.producer.KeyedMessage; import kafka.producer.ProducerConfig; public class Main { public static void main(String[] args) { Properties props = new Properties(); props.put("metadata.broker.list", "localhost:9092"); props.put("serializer.class", "kafka.serializer.StringEncoder"); props.put("partitioner.class", "kafka.producer.DefaultPartitioner"); props.put("request.required.acks", "1"); props.put("zookeeper.connect", "localhost:2181"); props.put("group.id", "kafkagroup"); ConsumerConnector consumer = Consumer.createJavaConsumerConnector(new ConsumerConfig(props)); MapEn este caso, al compilar y ejecutar la consola se quedará abierta recibiendo extrayendo los mensajes de la cola que está manteniendo Kafka. Abriendo el terminal del "productor" y escribiendo cualquier palabra podremos ver el resultado en nuestra consola de Java. Con esto terminamos el tutorial sobre Apache Kafka. En el siguiente tutorial en el que usemos Kafka, vamos a escribir un spout de Storm para recuperar los mensajes de Kafka y procesarlos con unos Bolts.topicCount = new HashMap (); topicCount.put("kafkatopic", new Integer(1)); Map >> consumerStreams = consumer.createMessageStreams(topicCount); List > streams = consumerStreams.get("kafkatopic"); for (KafkaStream stream : streams) { ConsumerIterator consumerIte = stream.iterator(); while (consumerIte.hasNext()) System.out.println("Message from Single Topic :: " + new String(consumerIte.next().message())); } } }