Apache Kafka – Glimpse into the Producer Code

A very simple example showing how a producer can send messages to a topic. I am not using partitioning for now; the next blog post will look at how partitioning can be implemented. The code and explanation are below:

Step 1. Start ZooKeeper

zkServer start

Step 2. Start the Kafka server

kafka-server-start /usr/local/etc/kafka/server.properties

Step 3. Since we have not implemented a consumer yet, start the console consumer that ships with Kafka:

kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning

Note: 2181 is the default ZooKeeper port, and "test" is the topic to which the producer will send messages and from which the consumer will read them.

Step 4. Write the producer code

package com.kafka.producer;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

import java.util.Properties;

/**
 * Created by lakshay on 08/10/16.
 *
 * a) metadata.broker.list: comma-separated host:port list of brokers the producer contacts first.
 *    Ideally at least two brokers should be listed in case the first one is unavailable.
 * b) serializer.class: the encoder used to serialize each message. A custom Encoder can be
 *    supplied to serialize messages of your own type.
 * c) partitioner.class: the class that decides which partition of the topic a message is sent to.
 *    If it is not provided, the producer falls back to its default partitioner; a message sent
 *    without a key, as in this example, goes to a randomly chosen partition.
 * d) request.required.acks: controls whether the producer waits for an acknowledgement from the
 *    broker after a message has been sent.
 */
public class Publisher {

    // One producer per Publisher instance; it is created once in the constructor.
    private final Producer<String, String> producer;

    public Publisher(String broker) {
        Properties properties = new Properties();
        properties.put("metadata.broker.list", broker);
        properties.put("serializer.class", "kafka.serializer.StringEncoder");
        // Optional settings, see the class comment above. Values are given as strings.
        // properties.put("partitioner.class", "com.kafka.producer.SimplePartitioner");
        // properties.put("request.required.acks", "1");
        ProducerConfig producerConfig = new ProducerConfig(properties);
        producer = new Producer<String, String>(producerConfig);
    }

    public static void main(String[] args) {
        String message = "One message for the topic test";
        String topic = "test";
        // String partitionKey = "";
        String broker = "localhost:9092";
        Publisher publisher = new Publisher(broker);
        publisher.publishMessage(topic, message);
    }

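    private void publishMessage(String topic, String message) {
        // No key is passed, so the producer chooses the partition itself.
        KeyedMessage<String, String> data = new KeyedMessage<String, String>(topic, message);
        producer.send(data);
        // Closing the producer here means this Publisher can send only one message.
        producer.close();
    }
}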
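The settings described in the class comment can be collected in a small helper before they are handed to ProducerConfig. The following is only a sketch that uses nothing but java.util.Properties: the class name ProducerSettings, the method build and the second broker address localhost:9093 are assumptions made for illustration, not part of Kafka or of the code above. Values are stored as strings because Properties.getProperty only returns entries whose value is a String.

```java
import java.util.Properties;

public class ProducerSettings {

    // Hypothetical helper (not a Kafka class): gathers the producer settings
    // discussed in the class comment into one Properties object.
    public static Properties build(String brokerList, int requiredAcks) {
        Properties properties = new Properties();
        // Comma-separated host:port pairs; listing two brokers gives a fallback.
        properties.setProperty("metadata.broker.list", brokerList);
        properties.setProperty("serializer.class", "kafka.serializer.StringEncoder");
        // Stored as a string so that getProperty can read it back.
        properties.setProperty("request.required.acks", Integer.toString(requiredAcks));
        return properties;
    }

    public static void main(String[] args) {
        // localhost:9093 is an assumed second broker, used only as an example.
        Properties properties = build("localhost:9092,localhost:9093", 1);
        properties.list(System.out);
    }
}
```

The resulting object could then be passed to new ProducerConfig(properties) in the Publisher constructor in place of the inline Properties.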

Step 5. Execute the producer code. The message will be picked up by the consumer and shown on the consumer console.
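Point c) of the class comment mentions that a partitioner decides which partition a message goes to. The basic idea of a key-based choice can be sketched without any Kafka dependency, as below. PartitionSketch and choosePartition are hypothetical names used only for illustration; this is not Kafka's partitioner interface, and a real partitioner would have to implement the interface that ships with the Kafka client.

```java
public class PartitionSketch {

    // Hypothetical helper (not Kafka's API): maps a key to a partition number
    // in the range [0, numPartitions).
    public static int choosePartition(String key, int numPartitions) {
        // Clear the sign bit so the hash is non-negative, then wrap into range.
        return (key.hashCode() & 0x7fffffff) % numPartitions;
    }

    public static void main(String[] args) {
        int numPartitions = 3;
        // The same key always maps to the same partition.
        for (String key : new String[] {"user-1", "user-2", "user-1"}) {
            System.out.println(key + " -> partition " + choosePartition(key, numPartitions));
        }
    }
}
```

Because the result depends only on the key and the partition count, all messages with the same key land on the same partition as long as the number of partitions does not change.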