Apache Kafka is an open-source message broker from the Apache Software Foundation, written in Scala. According to Apache, a single Kafka broker can handle hundreds of megabytes of reads and writes per second from thousands of clients, and each broker can handle terabytes of messages without performance impact.
To quickly get started, this is how the Kafka messaging service works, and the following are its major components (the commands given below ship with the Kafka binaries; kafka_2.11-0.9.0.0 was installed for this walkthrough).
1) ZooKeeper: a service required by Kafka for maintaining all the required configuration information and for providing distributed synchronization.
bin/zookeeper-server-start.sh config/zookeeper.properties (to start the ZooKeeper service)
2) Kafka server (the message broker; note that Kafka is not a JMS implementation, though it plays a similar role)
bin/kafka-server-start.sh config/server.properties (to start the Kafka broker)
If we want a multi-broker cluster, we can simply make a copy of "server.properties", edit the broker id (a unique id for each node in the cluster) and the port in the copy, and start another broker instance with it; a minimal sketch of such a config follows below. In a multi-node environment, one node acts as the leader for a given partition and is responsible for all read and write operations on that partition, while the rest of the nodes act as followers and hold replicas of the leader's data. In terms of fault tolerance, if the leader node goes down, one of the followers becomes the new leader and is ready for the next set of read and write operations.
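For instance, a copy of "server.properties" for a second broker on the same host might differ only in the following entries (the file name and values here are illustrative assumptions, not taken from the original setup):

# config/server-1.properties: hypothetical config for a second broker
broker.id=1                 # must be unique for each broker in the cluster
port=9093                   # a second broker on the same host needs its own port
log.dirs=/tmp/kafka-logs-1  # each broker also needs its own log directory

bin/kafka-server-start.sh config/server-1.properties (to start the second broker)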
3) Topic:
Kafka maintains messages in categories called topics, and each topic stores its data as a set of partitions.
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test (creates a topic named "test"; a topic can also be created using the Kafka API, as sketched below)
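For completeness, here is a minimal sketch of creating the same topic through the Kafka API (AdminUtils, as shipped with the 0.9.x libraries); the ZooKeeper address localhost:2181 and the timeout values are assumptions for a local setup:

package com.ravisha.kafka.poc.example;

import java.util.Properties;

import kafka.admin.AdminUtils;
import kafka.utils.ZkUtils;

public class TopicCreator {
    public static void main(String[] args) {
        // Connect to ZooKeeper (session and connection timeouts in ms are assumed values).
        ZkUtils zkUtils = ZkUtils.apply("localhost:2181", 10000, 10000, false);
        // Create topic "test" with 1 partition and a replication factor of 1.
        AdminUtils.createTopic(zkUtils, "test", 1, 1, new Properties());
        zkUtils.close();
    }
}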
4) Consumer:
Each message published to a topic is delivered to one consumer instance within each subscribing consumer group.
Consumer instances can be in separate processes or on separate machines.
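A quick way to observe this behavior is to start several instances of the TestConsumer class shown later in this post (the group ids here are illustrative; since consume() blocks, each instance gets its own thread in this hypothetical demo):

package com.ravisha.kafka.poc.example;

// Hypothetical demo class; relies on the TestConsumer sample shown below.
public class ConsumerGroupDemo {
    public static void main(String[] args) {
        // Two consumers in the same group: messages published to "test" are split between them.
        // (The topic created above has a single partition, so only one of these two is actually assigned it.)
        new Thread(() -> new TestConsumer("slc08fha:2181", "group1", "test").consume()).start();
        new Thread(() -> new TestConsumer("slc08fha:2181", "group1", "test").consume()).start();
        // A consumer in a different group independently receives every message.
        new Thread(() -> new TestConsumer("slc08fha:2181", "group2", "test").consume()).start();
    }
}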
Note: apart from the above, Kafka also provides connectors (Kafka Connect) for reading from and writing to external systems.
Sample producer and consumer:
Producer:
package com.ravisha.kafka.poc.example;

import java.util.Properties;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class TestProducer {
    public static void main(String[] args) {
        long events = 5; // Long.parseLong(args[0]);

        // Configuration for the old (pre-0.9) producer API that ships with these binaries.
        Properties props = new Properties();
        props.put("metadata.broker.list", "slc08fha:9092"); // broker(s) to bootstrap metadata from
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        // props.put("partitioner.class", "example.producer.SimplePartitioner");
        props.put("request.required.acks", "1"); // wait for the partition leader to acknowledge each write

        ProducerConfig config = new ProducerConfig(props);
        Producer<String, String> producer = new Producer<String, String>(config);

        // Publish a handful of messages to the "test" topic, keyed by the source host.
        for (long nEvents = 0; nEvents < events; nEvents++) {
            String ip = "slc08fha";
            String msg = "this is from java code";
            KeyedMessage<String, String> data = new KeyedMessage<String, String>("test", ip, msg);
            producer.send(data);
        }
        producer.close();
    }
}
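To verify that the messages were published, one can run the console consumer that ships with the same binaries (assuming ZooKeeper runs on slc08fha:2181, as in the consumer sample below):
bin/kafka-console-consumer.sh --zookeeper slc08fha:2181 --topic test --from-beginning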
Consumer:
package com.ravisha.kafka.poc.example;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

public class TestConsumer {
    private final ConsumerConnector consumer;
    private final String topicName;

    public TestConsumer(String zooKeeper, String groupID, String topicName) {
        consumer = kafka.consumer.Consumer.createJavaConsumerConnector(createConsumerConfig(zooKeeper, groupID));
        this.topicName = topicName;
    }

    private static ConsumerConfig createConsumerConfig(String a_zookeeper, String a_groupId) {
        // The old high-level consumer is configured through ZooKeeper rather than the brokers.
        Properties props = new Properties();
        props.put("zookeeper.connect", a_zookeeper);
        props.put("group.id", a_groupId);
        props.put("zookeeper.session.timeout.ms", "400");
        props.put("zookeeper.sync.time.ms", "200");
        props.put("auto.commit.interval.ms", "1000"); // commit consumed offsets once a second
        return new ConsumerConfig(props);
    }

    public void consume() {
        // Request a single stream for the topic; the iterator below blocks until messages arrive.
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put(topicName, 1);
        Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
        List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topicName);
        for (final KafkaStream<byte[], byte[]> stream : streams) {
            ConsumerIterator<byte[], byte[]> it = stream.iterator();
            while (it.hasNext())
                System.out.println(new String(it.next().message()));
        }
    }

    public static void main(String[] args) {
        String zooKeeper = "slc08fha:2181";
        String groupId = "group3";
        String topic = "test";
        TestConsumer example = new TestConsumer(zooKeeper, groupId, topic);
        example.consume();
    }
}
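One thing to note about the sample above: consume() blocks forever on the stream iterator. For anything beyond a quick test, it is worth releasing the consumer's ZooKeeper connection on exit, e.g. with a shutdown hook added inside TestConsumer (a minimal sketch, placed at the end of the constructor):

// Disconnect the consumer from ZooKeeper cleanly when the JVM exits.
Runtime.getRuntime().addShutdownHook(new Thread() {
    public void run() {
        consumer.shutdown();
    }
});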