martes, junio 10, 2014

Introducción a Apache Kafka... un recolector de mensajes

Introducción a Apache Kafka: Ingeniería de software de parte de Linkedin

Apache Kafka es un sistema distribuido de mensajería de suscripción-publicación sobre Scala. Su web oficial es http://kafka.apache.org/ y la distribución de la que vamos a hacer uso en este tutorial es la 0.8.1.1 para Scala 2.10

Descargando y extrayendo el paquete


cd $HOME
wget http://ftp.cixug.es/apache/kafka/0.8.1.1/kafka_2.10-0.8.1.1.tgz
tar -zxvf kafka_2.10-0.8.1.1.tgz
mv kafka_2.10-0.8.1.1.tgz kafka
export KAFKA_HOME=$HOME/kafka
cd kafka

Vamos a necesitar un total de cuatro terminales abiertos. Lo primero que hay que hacer, después de haber descargado y extraído el tar, es iniciar el servidor de Zookeeper. Como se puede apreciar, Zookeeper forma un pilar fundamental de muchos de los paquetes que envuelven el ecosistema de Hadoop. Luego tenemos que iniciar el broker the Kafka y, por último, crear un topic de Kafka :
# Arrancar el servidor de Zookeeper
bin/zookeeper-server-start.sh config/zookeeper.properties

# Crea el broker
bin/kafka-server-start.sh config/server.properties

# Crea el topic
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic kafkatopic
Created topic "kafkatopic".

Ahora que tenemos un Topic, tenemos que lanzar un "productor" para mandar mensajes. El productor se encarga de meter mensajes en la cola para un "consumidor" pueda recibirlos.

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic kafkatopic

Por último, vamos a crear un consumidor para recibir los mensajes con el siguiente comando

bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic kafkatopic --from-beginning

Bien, volvemos al terminal del "productor" de mensajes para mandar el primero de nuestros mensajes:

# Terminal del productor. Escribimos el mensaje
http://mariocaster.blogspot.com

# Terminal del consumidor. El mensaje es recibido
http://mariocaster.blogspot.com

Con esto, podríamos usar la API para Java y escribir un Productor de mensajes. Un ejemplo sencillo sería como el siguiente:
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

import java.util.Properties;

public class Main {
    public static void main(String args[]){

        Properties props = new Properties();
        props.put("metadata.broker.list", "localhost:9092");
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        props.put("partitioner.class", "kafka.producer.DefaultPartitioner");
        props.put("request.required.acks", "1");
        props.put("zookeeper.connect", "localhost:2181");
        props.put("group.id", "kafkagroup");

        ProducerConfig config = new ProducerConfig(props);

        Producer producer = new Producer(config);
        KeyedMessage\\ msg = new KeyedMessage\("kafkatopic", null, "Message");

        producer.send(msg);
        producer.close();
    }
}

Con el código de arriba, cada vez que compilemos y ejecutemos, podremos ver el mensaje "Message" en el terminal del consumidor. Por otro lado, también podemos usar la API de Java para crear un Consumidor (un suscriptor):

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class Main {
 
 public static void main(String[] args) {
  Properties props = new Properties();
  props.put("metadata.broker.list", "localhost:9092");
  props.put("serializer.class", "kafka.serializer.StringEncoder");
  props.put("partitioner.class", "kafka.producer.DefaultPartitioner");
  props.put("request.required.acks", "1");
  props.put("zookeeper.connect", "localhost:2181");
  props.put("group.id", "kafkagroup");

  ConsumerConnector consumer = Consumer.createJavaConsumerConnector(new ConsumerConfig(props));

  Map topicCount = new HashMap();
  topicCount.put("kafkatopic", new Integer(1));
  Map>> consumerStreams = consumer.createMessageStreams(topicCount);
  List> streams = consumerStreams.get("kafkatopic");
  
  for (KafkaStream stream : streams) {
   ConsumerIterator consumerIte = stream.iterator();
   while (consumerIte.hasNext())
    System.out.println("Message from Single Topic :: " + new String(consumerIte.next().message()));
  }
 }
}


En este caso, al compilar y ejecutar la consola se quedará abierta recibiendo extrayendo los mensajes de la cola que está manteniendo Kafka. Abriendo el terminal del "productor" y escribiendo cualquier palabra podremos ver el resultado en nuestra consola de Java. Con esto terminamos el tutorial sobre Apache Kafka. En el siguiente tutorial en el que usemos Kafka, vamos a escribir un spout de Storm para recuperar los mensajes de Kafka y procesarlos con unos Bolts.

0 comentarios:

Publicar un comentario