
Saturday, February 25, 2017

Apache Kafka: Multiple Ways to Consume or Read Messages from a Kafka Topic

In our previous post, we used Apache Avro to produce messages to the Kafka queue. In this post, we create Kafka consumers for consuming those Avro-formatted messages from the Kafka queue. Just as there are multiple ways to produce messages, there are multiple ways to consume messages from Kafka topics.

As per the previous post, with Avro we use the Confluent Schema Registry to manage message schemas. Before deserializing a message, the Kafka consumer reads the schema from the registry server and deserializes the message with type safety. To check whether our schema is registered on the schema server, we can hit the following endpoints using any REST tool:

  • http://localhost:8081/subjects (lists all saved schemas)
  • http://localhost:8081/subjects/<schema-name>/versions/1 (detailed structure of a schema)
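For example, the subjects endpoint responds with a JSON array of registered subject names; by default the Confluent serializers register one subject per topic, suffixed with `-value` (the output below is illustrative):

    ["AvroGenericProducerTopics-value", "AvroSpecificProducerOneTopic-value"]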

I: Simple Consumer

Like the simple producer, we have a simple consumer for consuming the messages that the simple producer publishes. As a best practice, we should produce and consume messages from a topic the same way, with matching serializers and deserializers on both sides.

import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class SimpleConsumer {

    private static Properties kafkaProps = new Properties();

    static {
        kafkaProps.put("bootstrap.servers", "localhost:9092");
        kafkaProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        kafkaProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        kafkaProps.put(ConsumerConfig.GROUP_ID_CONFIG, "CustomerCountryGroup");
    }

    private static void infinitePollLoop() {
        try (KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(kafkaProps)) {
            kafkaConsumer.subscribe(Arrays.asList("CustomerCountry"));
            while (true) {
                // Poll with a 100 ms timeout; returns an empty batch when no records arrive.
                ConsumerRecords<String, String> records = kafkaConsumer.poll(100);
                records.forEach(record -> {
                    System.out.printf("Topic: %s, Partition: %s, Offset: %s, Key: %s, Value: %s%n",
                            record.topic(), record.partition(), record.offset(),
                            record.key(), record.value());
                });
            }
        }
    }

    public static void main(String[] args) {
        infinitePollLoop();
    }
}
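Running this against the records sent by the simple producer from the previous post prints something like the following (output is illustrative):

    Topic: CustomerCountry, Partition: 0, Offset: 0, Key: Record 1, Value: Japan1
    Topic: CustomerCountry, Partition: 0, Offset: 1, Key: Record 2, Value: Punjab1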

II: Apache Avro Deserialization Generic Format:

Like the Generic-format producer, we have a matching Generic-format consumer. On the consumer side we need the same artifacts: the Avro schema, or the POJO class generated from it by a build-tool plugin.

import java.util.Arrays;
import java.util.Properties;

import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig;
import org.apache.avro.specific.SpecificData;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class AvroGenericConsumer {

    private static Properties kafkaProps = new Properties();

    static {
        // 'earliest', 'latest' and 'none' control where the consumer starts
        // reading when the group has no committed offset.
        kafkaProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        kafkaProps.put("bootstrap.servers", "localhost:9092");
        kafkaProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        kafkaProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class);
        kafkaProps.put(ConsumerConfig.GROUP_ID_CONFIG, "AvroGenericConsumer-GroupOne");
        kafkaProps.put("schema.registry.url", "http://localhost:8081");
        kafkaProps.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, true);
    }

    public static void infiniteConsumer() {
        try (KafkaConsumer<String, Object> kafkaConsumer = new KafkaConsumer<>(kafkaProps)) {
            kafkaConsumer.subscribe(Arrays.asList("AvroGenericProducerTopics"));

            while (true) {
                ConsumerRecords<String, Object> records = kafkaConsumer.poll(100);

                records.forEach(record -> {
                    // Copy the decoded value into the generated CustomerGeneric type.
                    CustomerGeneric customer = (CustomerGeneric) SpecificData.get()
                            .deepCopy(CustomerGeneric.SCHEMA$, record.value());
                    System.out.println("Key : " + record.key());
                    System.out.println("Value: " + customer);
                });
            }
        }
    }

    public static void main(String[] args) {
        infiniteConsumer();
    }
}

III: Apache Avro Deserialization Specific Format One:

This example deserializes Kafka messages in a specific format. It pairs with the Apache Avro Serialization Specific Format One producer from our previous post. Here we use Kafka Streams to deserialize the messages, as sketched below. There is another, simpler way to deserialize the messages, which we will look at in the next example.

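Here is a minimal sketch of such a Streams-based consumer, assuming the `kafka-streams` and Confluent `kafka-streams-avro-serde` dependencies are on the classpath (the class name and application id below are illustrative):

import java.util.Collections;
import java.util.Properties;

import io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig;
import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;

public class AvroSpecificStreamsConsumer {

    public static void main(String[] args) {
        Properties props = new Properties();
        // The application id identifies the Streams app and doubles as its consumer group id.
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "AvroSpecificStreamsConsumer-App");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        // Value serde that fetches schemas from the registry and returns generated Customer objects.
        SpecificAvroSerde<Customer> customerSerde = new SpecificAvroSerde<>();
        customerSerde.configure(Collections.singletonMap(
                AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8081"),
                false); // false = this serde is for record values, not keys

        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, Customer> stream = builder.stream("AvroSpecificProducerOneTopic",
                Consumed.with(Serdes.String(), customerSerde));
        stream.foreach((key, customer) -> {
            System.out.println("Key : " + key);
            System.out.println("Value: " + customer);
        });

        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();
    }
}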

IV: Apache Avro Deserialization Specific Format Two:

This deserializer reads messages written by the Apache Avro Serialization Specific Format One producer. The previous example used Kafka Streams, but this one deserializes the messages the simpler way, with a plain consumer.

import java.util.Arrays;
import java.util.Properties;

import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class AvroSpecificDeserializerThree {

    private static Properties kafkaProps = new Properties();

    static {
        // 'earliest', 'latest' and 'none' control where the consumer starts
        // reading when the group has no committed offset.
        kafkaProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        kafkaProps.put("bootstrap.servers", "localhost:9092");
        kafkaProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        kafkaProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class);
        kafkaProps.put(ConsumerConfig.GROUP_ID_CONFIG, "AvroSpecificDeserializerThree-GroupOne");
        kafkaProps.put("schema.registry.url", "http://localhost:8081");
        // With the specific reader enabled, values come back as generated Customer objects.
        kafkaProps.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, true);
    }

    public static void infiniteConsumer() {
        try (KafkaConsumer<String, Customer> kafkaConsumer = new KafkaConsumer<>(kafkaProps)) {
            kafkaConsumer.subscribe(Arrays.asList("AvroSpecificProducerOneTopic"));

            while (true) {
                ConsumerRecords<String, Customer> records = kafkaConsumer.poll(100);

                records.forEach(record -> {
                    Customer customer = record.value();
                    System.out.println("Key : " + record.key());
                    System.out.println("Value: " + customer);
                });
            }
        }
    }

    public static void main(String[] args) {
        infiniteConsumer();
    }
}



V: Apache Avro Deserialization Specific Format Three:

In this example we deserialize messages from Kafka by decoding the Avro binary ourselves, in a specific format. It pairs with the Apache Avro Serialization Specific Format Two producer from our previous post.

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Properties;

import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.specific.SpecificDatumReader;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class AvroSpecificDeserializerTwo {

    private static Properties kafkaProps = new Properties();

    static {
        // 'earliest', 'latest' and 'none' control where the consumer starts
        // reading when the group has no committed offset.
        kafkaProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        kafkaProps.put("bootstrap.servers", "localhost:9092");
        kafkaProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        kafkaProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class);
        kafkaProps.put(ConsumerConfig.GROUP_ID_CONFIG, "AvroSpecificDeserializerTwo-GroupOne");
        kafkaProps.put("schema.registry.url", "http://localhost:8081");
        kafkaProps.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, true);
    }

    public static void infiniteConsumer() {
        try (KafkaConsumer<String, ByteBuffer> kafkaConsumer = new KafkaConsumer<>(kafkaProps)) {
            kafkaConsumer.subscribe(Arrays.asList("AvroSpecificProducerTwoTopic"));

            while (true) {
                ConsumerRecords<String, ByteBuffer> records = kafkaConsumer.poll(100);

                records.forEach(record -> {
                    // The producer sent pre-encoded Avro binary, so KafkaAvroDeserializer
                    // hands it back as a ByteBuffer; decode the Customer from it manually.
                    DatumReader<Customer> customerDatumReader = new SpecificDatumReader<>(Customer.SCHEMA$);
                    ByteBuffer buffer = record.value();
                    byte[] payload = new byte[buffer.remaining()];
                    buffer.get(payload);
                    BinaryDecoder binaryDecoder = DecoderFactory.get().binaryDecoder(payload, null);
                    try {
                        Customer customer = customerDatumReader.read(null, binaryDecoder);
                        System.out.println("Key : " + record.key());
                        System.out.println("Value: " + customer);
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                });
            }
        }
    }

    public static void main(String[] args) {
        infiniteConsumer();
    }
}

There are still many more ways to produce and consume messages with Kafka. We have discussed some of them here; the others may come up in future posts. Please feel free to send feedback and post comments.


References: 

Apache Kafka: Multiple ways for Produce or Push Message to Kafka topics

Today, I am going to describe the various ways in Apache Kafka to put messages into topics. Apache Kafka supports several languages and provides APIs for Java; one reason is that Java is the primary language of the JVM, and most JVM-based languages can use Java libraries easily.

Kafka has concepts such as topics and partitions, which you can explore in the Apache Kafka documentation or the Confluent documentation. To put messages into a Kafka queue, Kafka supports serialization and various message formats. Kafka provides some formats by default, but the recommended format is Apache Avro, a lightweight and type-safe format for serialized data. For more, you can explore Apache Avro.

Kafka is built around producers and consumers: the producer writes data to the queue and the consumer reads data from it. Today we are creating various Kafka producers to produce data into a Kafka topic.

Prerequisites:

  • Install JDK 8.
  • Download Apache Kafka.
  • Download Zookeeper.
  • Download the Confluent Kafka kit.
  • An IDE
  • A build tool (we are using SBT)

I. Simple Producer: 

First, we create a simple Kafka producer for producing messages to a Kafka topic using Java.

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class SimpleProducer {

    private static Properties kafkaProps = new Properties();
    private static KafkaProducer<String, String> kafkaProducer;

    static {
        kafkaProps.put("bootstrap.servers", "localhost:9092");
        kafkaProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        kafkaProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        kafkaProducer = new KafkaProducer<>(kafkaProps);
    }

    public static void fireAndForget(ProducerRecord<String, String> record) {
        // Send without waiting for, or inspecting, the broker's response.
        kafkaProducer.send(record);
    }

    public static void asyncSend(ProducerRecord<String, String> record) {
        // Send with a callback that fires once the broker acknowledges the write.
        kafkaProducer.send(record, (recordMetaData, ex) -> {
            if (ex != null) {
                ex.printStackTrace();
                return;
            }
            System.out.println("Offset: " + recordMetaData.offset());
            System.out.println("Topic: " + recordMetaData.topic());
            System.out.println("Partition: " + recordMetaData.partition());
            System.out.println("Timestamp: " + recordMetaData.timestamp());
        });
    }

    public static void main(String[] args) throws InterruptedException {
        ProducerRecord<String, String> record1 = new ProducerRecord<>("CustomerCountry",
                "Record 1", "Japan1");

        ProducerRecord<String, String> record2 = new ProducerRecord<>("CustomerCountry",
                "Record 2", "Punjab1");

        fireAndForget(record1);
        asyncSend(record2);

        // Give the async sends time to complete before the JVM exits.
        Thread.sleep(10000);
    }
}

In this example, we just produce data into the queue with Kafka's default `StringSerializer`.

II. Apache Avro Serialization Generic Format: 

To use Apache Avro, we need to create a schema for our messages, because the schema is what lets us deserialize messages with type safety. Various build-tool plugins can generate our POJO classes from an Avro schema file. It is good practice to use these tools: messages can become complex, and writing the classes manually invites mistakes.

NOTE: To use Avro, we need to start the Confluent Schema Registry server, because the registry manages the message schemas while the messages themselves flow through Kafka queues. For more details, please visit the documentation.

For SBT, I am using the sbt-avro plugin. It depends on which build tool you are using, or you can write the classes manually.
As discussed, for Avro we need to create a schema first. In my example I have the following schema:

{"namespace": "com.harmeetsingh13.java",
    "type": "record",
    "name": "Customer",
    "fields": [{"name": "id","type": "int"},
            {"name": "name","type": "string"}]
}

By using the sbt-avro plugin, my POJO class is generated automatically; a trimmed sketch of it appears below. With that in place, we can create our Kafka producer using Avro.
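For reference, here is an illustrative, trimmed version of what the plugin generates from customer.avsc; the real generated file contains builders and more plumbing:

import org.apache.avro.AvroRuntimeException;
import org.apache.avro.Schema;
import org.apache.avro.specific.SpecificRecordBase;

public class Customer extends SpecificRecordBase {

    public static final Schema SCHEMA$ = new Schema.Parser().parse(
            "{\"namespace\": \"com.harmeetsingh13.java\", \"type\": \"record\", "
                    + "\"name\": \"Customer\", \"fields\": ["
                    + "{\"name\": \"id\", \"type\": \"int\"}, "
                    + "{\"name\": \"name\", \"type\": \"string\"}]}");

    private int id;
    private CharSequence name;

    public Customer() {
    }

    public Customer(Integer id, CharSequence name) {
        this.id = id;
        this.name = name;
    }

    @Override
    public Schema getSchema() {
        return SCHEMA$;
    }

    @Override
    public Object get(int field) {
        switch (field) {
            case 0: return id;
            case 1: return name;
            default: throw new AvroRuntimeException("Bad index");
        }
    }

    @Override
    public void put(int field, Object value) {
        switch (field) {
            case 0: id = (Integer) value; break;
            case 1: name = (CharSequence) value; break;
            default: throw new AvroRuntimeException("Bad index");
        }
    }
}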

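Here is a minimal sketch of a Generic-format producer, assuming the same customer.avsc schema file is on the classpath; the class name is illustrative, and the topic matches the Generic consumer above:

import java.io.IOException;
import java.util.Properties;

import io.confluent.kafka.serializers.KafkaAvroSerializer;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class AvroGenericProducer {

    public static void main(String[] args) throws IOException {
        Properties kafkaProps = new Properties();
        kafkaProps.put("bootstrap.servers", "localhost:9092");
        kafkaProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        kafkaProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);
        kafkaProps.put("schema.registry.url", "http://localhost:8081");

        Schema schema = new Schema.Parser().parse(AvroGenericProducer.class
                .getClassLoader().getResourceAsStream("avro/customer.avsc"));

        // Build a GenericRecord directly against the schema -- no generated POJO needed.
        GenericRecord customer = new GenericData.Record(schema);
        customer.put("id", 1001);
        customer.put("name", "Jimmy");

        try (KafkaProducer<String, GenericRecord> kafkaProducer = new KafkaProducer<>(kafkaProps)) {
            // Closing the producer flushes any buffered records before the JVM exits.
            kafkaProducer.send(new ProducerRecord<>("AvroGenericProducerTopics", "KeyOne", customer));
        }
    }
}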

In this example, the one thing to note is that for serializing our message key we still use Kafka's `StringSerializer` class. When we instead deserialized the string key with Avro's `KafkaAvroDeserializer`, we hit this issue:

org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id 351
Caused by: org.apache.kafka.common.errors.SerializationException:
string specified by the writers schema could not be instantiated to find the readers schema

For more details, you can look into this discussion.

As discussed, there are various ways to serialize messages to a Kafka queue using Avro. In the above example we used the Generic way of message serialization, but we can also serialize messages in more specific ways, which we discuss in the next examples.

III: Apache Avro Serialization Specific Format One: 

Another Avro example serializes messages in a specific way, as below:

import java.util.Properties;

import io.confluent.kafka.serializers.KafkaAvroSerializer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class AvroSpecificProducerOne {

    private static Properties kafkaProps = new Properties();
    private static KafkaProducer<String, Customer> kafkaProducer;

    static {
        kafkaProps.put("bootstrap.servers", "localhost:9092");
        kafkaProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        // KafkaAvroSerializer registers the schema and encodes the Customer value.
        kafkaProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);
        kafkaProps.put("schema.registry.url", "http://localhost:8081");
        kafkaProducer = new KafkaProducer<>(kafkaProps);
    }

    public static void fireAndForget(ProducerRecord<String, Customer> record) {
        kafkaProducer.send(record);
    }

    public static void asyncSend(ProducerRecord<String, Customer> record) {
        kafkaProducer.send(record, (recordMetaData, ex) -> {
            if (ex != null) {
                ex.printStackTrace();
                return;
            }
            System.out.println("Offset: " + recordMetaData.offset());
            System.out.println("Topic: " + recordMetaData.topic());
            System.out.println("Partition: " + recordMetaData.partition());
            System.out.println("Timestamp: " + recordMetaData.timestamp());
        });
    }

    public static void main(String[] args) throws InterruptedException {
        Customer customer1 = new Customer(1001, "Jimmy");
        Customer customer2 = new Customer(1002, "James");

        ProducerRecord<String, Customer> record1 = new ProducerRecord<>("AvroSpecificProducerOneTopic",
                "KeyOne", customer1);
        ProducerRecord<String, Customer> record2 = new ProducerRecord<>("AvroSpecificProducerOneTopic",
                "KeyOne", customer2);

        asyncSend(record1);
        asyncSend(record2);

        // Give the async sends time to complete before the JVM exits.
        Thread.sleep(1000);
    }
}

IV: Apache Avro Serialization Specific Format Two: 

Another way to serialize messages using Avro is to encode the Avro binary ourselves and hand Kafka the resulting bytes, as below:

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.Properties;

import io.confluent.kafka.serializers.KafkaAvroSerializer;
import org.apache.avro.Schema;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.EncoderFactory;
import org.apache.avro.specific.SpecificDatumWriter;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class AvroSpecificProducerTwo {

    private static Properties kafkaProps = new Properties();
    private static KafkaProducer<String, byte[]> kafkaProducer;

    static {
        kafkaProps.put("bootstrap.servers", "localhost:9092");
        kafkaProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        // The serializer wraps our pre-encoded byte arrays as Avro 'bytes' values.
        kafkaProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);
        kafkaProps.put("schema.registry.url", "http://localhost:8081");
        kafkaProducer = new KafkaProducer<>(kafkaProps);
    }

    public static void fireAndForget(ProducerRecord<String, byte[]> record) {
        kafkaProducer.send(record);
    }

    public static void asyncSend(ProducerRecord<String, byte[]> record) {
        kafkaProducer.send(record, (recordMetaData, ex) -> {
            if (ex != null) {
                ex.printStackTrace();
                return;
            }
            System.out.println("Offset: " + recordMetaData.offset());
            System.out.println("Topic: " + recordMetaData.topic());
            System.out.println("Partition: " + recordMetaData.partition());
            System.out.println("Timestamp: " + recordMetaData.timestamp());
        });
    }

    private static byte[] convertCustomerToAvroBytes(Customer customer) throws IOException {
        Schema schema = new Schema.Parser().parse(AvroSpecificProducerTwo.class
                .getClassLoader().getResourceAsStream("avro/customer.avsc"));

        // Encode the Customer into raw Avro binary by hand.
        SpecificDatumWriter<Customer> writer = new SpecificDatumWriter<>(schema);
        try (ByteArrayOutputStream os = new ByteArrayOutputStream()) {
            BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(os, null);
            writer.write(customer, encoder);
            encoder.flush();

            return os.toByteArray();
        }
    }

    public static void main(String[] args) throws InterruptedException, IOException {
        Customer customer1 = new Customer(1001, "Jimmy");
        Customer customer2 = new Customer(1002, "James");

        byte[] customer1AvroBytes = convertCustomerToAvroBytes(customer1);
        byte[] customer2AvroBytes = convertCustomerToAvroBytes(customer2);

        ProducerRecord<String, byte[]> record1 = new ProducerRecord<>("AvroSpecificProducerTwoTopic",
                "KeyOne", customer1AvroBytes);
        ProducerRecord<String, byte[]> record2 = new ProducerRecord<>("AvroSpecificProducerTwoTopic",
                "KeyOne", customer2AvroBytes);

        asyncSend(record1);
        asyncSend(record2);

        Thread.sleep(1000);
    }
}

There are still multiple ways to serialize messages for Kafka, using Avro or other serializers. In the next post, we will discuss Kafka consumers and the various ways to consume messages from a Kafka queue using Avro.

For the above examples, you can also download the code from the GitHub repo.