
Saturday, February 25, 2017

Apache Kafka: Multiple Ways to Produce or Push Messages to Kafka Topics

Today, I am going to describe the various ways to put messages into topics in Apache Kafka. Apache Kafka supports several languages and provides APIs for Java; one reason is that Java is the primary language of the JVM, and most JVM-based languages can use Java libraries easily.

Kafka has concepts of topics, partitions, and so on, which you can explore in the Apache Kafka documentation or the Confluent documentation. For putting messages into a Kafka queue, Kafka supports serialization and various message formats. Some formats Kafka provides by default, but the recommended format for Kafka is Apache Avro. Avro is a lightweight and type-safe format for serialized data. For more, you can explore Apache Avro.

Kafka has the concept of a Producer and a Consumer: the producer pushes data onto the queue and the consumer pulls data from it. Today we are creating various Kafka producers to produce data into a Kafka topic.

Prerequisites:

  • Install JDK 8.
  • Download Apache Kafka.
  • Download Zookeeper.
  • Download the Confluent Platform (it provides the schema registry used below).
  • An IDE.
  • A build tool (we are using SBT); typical startup commands are sketched after this list.
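
Once everything is downloaded, a local stack can be brought up like this (a sketch: run the first two commands from the Kafka distribution directory and the third from the Confluent distribution; script names and flags vary a little between versions):

# Start Zookeeper, then the Kafka broker (Kafka distribution)
bin/zookeeper-server-start.sh config/zookeeper.properties
bin/kafka-server-start.sh config/server.properties

# Start the schema registry, needed for the Avro examples (Confluent distribution)
bin/schema-registry-start etc/schema-registry/schema-registry.properties

# Create the topic used by the first example
bin/kafka-topics.sh --create --zookeeper localhost:2181 \
    --replication-factor 1 --partitions 1 --topic CustomerCountry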

I. Simple Producer: 

First, we create a simple Kafka producer that produces messages to a Kafka topic using Java.

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class SimpleProducer {

    private static Properties kafkaProps = new Properties();
    private static KafkaProducer<String, String> kafkaProducer;

    static {
        kafkaProps.put("bootstrap.servers", "localhost:9092");
        kafkaProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        kafkaProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        kafkaProducer = new KafkaProducer<>(kafkaProps);
    }

    // Fire-and-forget: send the record and ignore the broker's response.
    public static void fireAndForget(ProducerRecord<String, String> record) {
        kafkaProducer.send(record);
    }

    // Asynchronous send: the callback runs once the broker acknowledges (or the send fails).
    public static void asyncSend(ProducerRecord<String, String> record) {
        kafkaProducer.send(record, (recordMetaData, ex) -> {
            if (ex != null) {
                ex.printStackTrace();
                return;
            }
            System.out.println("Offset: " + recordMetaData.offset());
            System.out.println("Topic: " + recordMetaData.topic());
            System.out.println("Partition: " + recordMetaData.partition());
            System.out.println("Timestamp: " + recordMetaData.timestamp());
        });
    }

    public static void main(String[] args) throws InterruptedException {
        ProducerRecord<String, String> record1 = new ProducerRecord<>("CustomerCountry",
                "Record 1", "Japan1"
        );
        ProducerRecord<String, String> record2 = new ProducerRecord<>("CustomerCountry",
                "Record 2", "Punjab1"
        );

        fireAndForget(record1);
        asyncSend(record2);

        Thread.sleep(10000);
    }
}

In this example, we just produce data onto the queue with Kafka's default `StringSerializer`. `fireAndForget` ignores the broker's response, while `asyncSend` registers a callback that receives the record metadata, or an exception, once the send completes.
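
A third pattern, not shown in the class above, is a synchronous send: block on the Future returned by send until the broker replies. A minimal sketch of such a method (it would sit next to fireAndForget and asyncSend inside SimpleProducer, and additionally needs java.util.concurrent.ExecutionException and org.apache.kafka.clients.producer.RecordMetadata imports):

    // Synchronous send: wait for the broker's acknowledgement before continuing.
    public static void syncSend(ProducerRecord<String, String> record)
            throws ExecutionException, InterruptedException {
        RecordMetadata metadata = kafkaProducer.send(record).get();
        System.out.println("Acknowledged at offset: " + metadata.offset());
    }

This trades throughput for a hard guarantee that the record was written before the method returns.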

II. Apache Avro Serialization Generic Format: 

To use Apache Avro, we need to create a schema for our messages, because the schema lets us deserialize messages with type safety. Various build tools offer plugins that generate POJO classes from an Avro schema file. It is good practice to use these tools: messages can get complex, and writing the classes by hand invites mistakes.

NOTE: To use Avro with Kafka, we need to start the Confluent schema registry server, because the registry manages the message schemas while the messages themselves flow through the Kafka queues. For more details, please visit the documentation.

For SBT, I am using the sbt-avro plugin. It depends on which build tool you are using, or you can write the classes manually.
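
For reference, wiring the plugin in means adding a line like the following to project/plugins.sbt (the organization and version here are placeholders, not verified coordinates; check the sbt-avro README for the exact line for your SBT version):

addSbtPlugin("com.cavorite" % "sbt-avro" % "<plugin-version>")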
As we discussed, for Avro we need to create a schema first. In my example I have the following schema:

{"namespace": "com.harmeetsingh13.java",
    "type": "record",
    "name": "Customer",
    "fields": [{"name": "id","type": "int"},
            {"name": "name","type": "string"}]
}

With the sbt-avro plugin, a Customer POJO is generated from this schema automatically; we will use it in the specific-format examples below. For the generic format, though, we do not need the generated class at all: we parse the schema at runtime and build a GenericRecord against it. Now, we create our Kafka producer using Avro.

import java.io.IOException;
import java.util.Properties;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import io.confluent.kafka.serializers.KafkaAvroSerializer;

public class AvroGenericProducer {

    private static Properties kafkaProps = new Properties();
    private static KafkaProducer<String, GenericRecord> kafkaProducer;

    static {
        kafkaProps.put("bootstrap.servers", "localhost:9092");
        kafkaProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        kafkaProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);
        kafkaProps.put("schema.registry.url", "http://localhost:8081");
        kafkaProducer = new KafkaProducer<>(kafkaProps);
    }

    public static void asyncSend(ProducerRecord<String, GenericRecord> record) {
        kafkaProducer.send(record, (recordMetaData, ex) -> {
            if (ex != null) {
                ex.printStackTrace();
                return;
            }
            System.out.println("Offset: " + recordMetaData.offset());
            System.out.println("Topic: " + recordMetaData.topic());
            System.out.println("Partition: " + recordMetaData.partition());
            System.out.println("Timestamp: " + recordMetaData.timestamp());
        });
    }

    public static void main(String[] args) throws InterruptedException, IOException {
        // Parse the schema at runtime; the generic format needs no generated class.
        Schema schema = new Schema.Parser().parse(AvroGenericProducer.class
                .getClassLoader().getResourceAsStream("avro/customer.avsc"));

        GenericRecord customer1 = new GenericData.Record(schema);
        customer1.put("id", 1001);
        customer1.put("name", "Jimmy");

        GenericRecord customer2 = new GenericData.Record(schema);
        customer2.put("id", 1002);
        customer2.put("name", "James");

        ProducerRecord<String, GenericRecord> record1 = new ProducerRecord<>("AvroGenericProducerTopic",
                "KeyOne", customer1
        );
        ProducerRecord<String, GenericRecord> record2 = new ProducerRecord<>("AvroGenericProducerTopic",
                "KeyOne", customer2
        );

        asyncSend(record1);
        asyncSend(record2);

        Thread.sleep(1000);
    }
}

In this example, the one thing to note is that for serializing the message key we are still using Kafka's `StringSerializer` class, because when we deserialized a string key using Avro's `KafkaAvroDeserializer` we faced this issue:

Error deserializing Avro message for id 351 org.apache.kafka.common.errors.SerializationException: 
Error deserializing Avro message for id 351 Caused org.apache.kafka.common.errors.SerializationException: 
string specified by the writers schema could not be instantiated to find the readers schema

For more details, you can look into this discussion.

As discussed, there are various ways to serialize messages to a Kafka queue using Avro. In the above example we used the generic way of message serialization, but we can also serialize messages in a more specific way, which we discuss in the next examples.

III. Apache Avro Serialization Specific Format One: 

Another Avro example serializes messages in a specific way, using the Customer class generated from the schema:

import java.io.IOException;
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import io.confluent.kafka.serializers.KafkaAvroSerializer;

public class AvroSpecificProducerOne {

    private static Properties kafkaProps = new Properties();
    private static KafkaProducer<String, Customer> kafkaProducer;

    static {
        kafkaProps.put("bootstrap.servers", "localhost:9092");
        kafkaProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        kafkaProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);
        kafkaProps.put("schema.registry.url", "http://localhost:8081");
        kafkaProducer = new KafkaProducer<>(kafkaProps);
    }

    public static void fireAndForget(ProducerRecord<String, Customer> record) {
        kafkaProducer.send(record);
    }

    public static void asyncSend(ProducerRecord<String, Customer> record) {
        kafkaProducer.send(record, (recordMetaData, ex) -> {
            if (ex != null) {
                ex.printStackTrace();
                return;
            }
            System.out.println("Offset: " + recordMetaData.offset());
            System.out.println("Topic: " + recordMetaData.topic());
            System.out.println("Partition: " + recordMetaData.partition());
            System.out.println("Timestamp: " + recordMetaData.timestamp());
        });
    }

    public static void main(String[] args) throws InterruptedException, IOException {
        // Customer is the POJO generated from customer.avsc by the sbt-avro plugin.
        Customer customer1 = new Customer(1001, "Jimmy");
        Customer customer2 = new Customer(1002, "James");

        ProducerRecord<String, Customer> record1 = new ProducerRecord<>("AvroSpecificProducerOneTopic",
                "KeyOne", customer1
        );
        ProducerRecord<String, Customer> record2 = new ProducerRecord<>("AvroSpecificProducerOneTopic",
                "KeyOne", customer2
        );

        asyncSend(record1);
        asyncSend(record2);

        Thread.sleep(1000);
    }
}

IV. Apache Avro Serialization Specific Format Two: 

Another way to serialize messages using Avro is to serialize the record to bytes ourselves, as below:

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.Properties;

import org.apache.avro.Schema;
import org.apache.avro.Schema.Parser;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.EncoderFactory;
import org.apache.avro.specific.SpecificDatumWriter;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import io.confluent.kafka.serializers.KafkaAvroSerializer;

public class AvroSpecificProducerTwo {

    private static Properties kafkaProps = new Properties();
    private static KafkaProducer<String, byte[]> kafkaProducer;

    static {
        kafkaProps.put("bootstrap.servers", "localhost:9092");
        kafkaProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        kafkaProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);
        kafkaProps.put("schema.registry.url", "http://localhost:8081");
        kafkaProducer = new KafkaProducer<>(kafkaProps);
    }

    public static void fireAndForget(ProducerRecord<String, byte[]> record) {
        kafkaProducer.send(record);
    }

    public static void asyncSend(ProducerRecord<String, byte[]> record) {
        kafkaProducer.send(record, (recordMetaData, ex) -> {
            if (ex != null) {
                ex.printStackTrace();
                return;
            }
            System.out.println("Offset: " + recordMetaData.offset());
            System.out.println("Topic: " + recordMetaData.topic());
            System.out.println("Partition: " + recordMetaData.partition());
            System.out.println("Timestamp: " + recordMetaData.timestamp());
        });
    }

    // Serialize a Customer to Avro binary by hand, using the schema from the classpath.
    private static byte[] convertCustomerToAvroBytes(Customer customer) throws IOException {
        Parser parser = new Parser();
        Schema schema = parser.parse(AvroSpecificProducerTwo.class
                .getClassLoader().getResourceAsStream("avro/customer.avsc"));

        SpecificDatumWriter<Customer> writer = new SpecificDatumWriter<>(schema);
        try (ByteArrayOutputStream os = new ByteArrayOutputStream()) {
            BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(os, null);
            writer.write(customer, encoder);
            encoder.flush();

            return os.toByteArray();
        }
    }

    public static void main(String[] args) throws InterruptedException, IOException {
        Customer customer1 = new Customer(1001, "Jimmy");
        Customer customer2 = new Customer(1002, "James");

        byte[] customer1AvroBytes = convertCustomerToAvroBytes(customer1);
        byte[] customer2AvroBytes = convertCustomerToAvroBytes(customer2);

        ProducerRecord<String, byte[]> record1 = new ProducerRecord<>("AvroSpecificProducerTwoTopic",
                "KeyOne", customer1AvroBytes
        );
        ProducerRecord<String, byte[]> record2 = new ProducerRecord<>("AvroSpecificProducerTwoTopic",
                "KeyOne", customer2AvroBytes
        );

        asyncSend(record1);
        asyncSend(record2);

        Thread.sleep(1000);
    }
}
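
To verify what actually landed on a topic, the Confluent distribution ships an Avro-aware console consumer. An invocation sketch (the exact flags differ between Confluent versions, so treat these options as assumptions and check your version's help output):

bin/kafka-avro-console-consumer --topic AvroSpecificProducerOneTopic \
    --zookeeper localhost:2181 --from-beginning \
    --property schema.registry.url=http://localhost:8081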

There are still more ways to serialize messages for Kafka, using Avro or other serializers. In the next post, we will discuss Kafka consumers and the various ways to consume messages from a Kafka queue using Avro.

You can also download the code for the above examples from the github repo. 

Thursday, May 22, 2014

Integrate JOOQ With Spring and Perform CRUD Operations.

Introduction:

In the last POST we discussed generating Java classes using the JOOQ Maven generator. Those Java files represent tables in the database, much like the JBoss tools for Hibernate generate entities corresponding to tables. Today we perform CRUD operations with the help of JOOQ; for generating the Java classes, refer to my previous POST. Here we integrate JOOQ with the SPRING framework. Many blog posts helped me create this example, as did the JOOQ website, which shares lots of examples. 

Step 1:

The dependencies we need, based on the imports used throughout this example, are jOOQ, Spring (context, jdbc, and tx), Apache Commons DBCP2, and the MySQL JDBC driver:
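
A minimal Maven sketch of those dependencies (the versions are left as placeholders rather than the ones used originally; pick versions current for your project):

<dependencies>
    <dependency>
        <groupId>org.jooq</groupId>
        <artifactId>jooq</artifactId>
        <version><!-- your jOOQ version --></version>
    </dependency>
    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-context</artifactId>
        <version><!-- your Spring version --></version>
    </dependency>
    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-jdbc</artifactId>
        <version><!-- your Spring version --></version>
    </dependency>
    <dependency>
        <groupId>org.apache.commons</groupId>
        <artifactId>commons-dbcp2</artifactId>
        <version><!-- your DBCP2 version --></version>
    </dependency>
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version><!-- your driver version --></version>
    </dependency>
</dependencies>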


Step 2:

Create a class to translate jOOQ exceptions into Spring's standard exception hierarchy. Most frameworks follow this pattern: framework-specific exceptions are wrapped into Spring's standard DataAccessException hierarchy so they can be handled uniformly. Here we create the class that handles jOOQ exceptions and wraps them into that standard form. 

import org.jooq.ExecuteContext;
import org.jooq.SQLDialect;
import org.jooq.impl.DefaultExecuteListener;
import org.springframework.jdbc.support.SQLErrorCodeSQLExceptionTranslator;
import org.springframework.jdbc.support.SQLExceptionTranslator;
import org.springframework.jdbc.support.SQLStateSQLExceptionTranslator;

public class JOOQToSpringExceptionTransformer extends DefaultExecuteListener {

  private static final long serialVersionUID = -5749466061513426635L;

  @Override
  public void exception(ExecuteContext ctx) {
    SQLDialect dialect = ctx.configuration().dialect();
    SQLExceptionTranslator sqlExceptionTranslator = null;
    if (dialect != null) {
      sqlExceptionTranslator = new SQLErrorCodeSQLExceptionTranslator(dialect.getName());
    } else {
      sqlExceptionTranslator = new SQLStateSQLExceptionTranslator();
    }

    ctx.exception(sqlExceptionTranslator.translate("JOOQ", ctx.sql(), ctx.sqlException()));
  }
}

In this:
DefaultExecuteListener: the public default implementation of the ExecuteListener interface, which provides listener methods for the different life-cycle events of a single query execution.

Step 3: 

In this step we cover the Java-based database configuration.

Properties file: application.properties
#Database Configuration
db.driver=com.mysql.jdbc.Driver
db.url=jdbc:mysql://localhost:3306/jooq_test
db.username=test
db.password=root

#jOOQ Configuration
jooq.sql.dialect=MYSQL
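
All the CRUD examples below run against a user_detail table whose shape can be read off the generated UserDetailRecord: an id, a name, and an age. A DDL sketch, assuming MySQL (the column types and sizes are my assumption, not taken from the generator output):

CREATE TABLE user_detail (
  id   INT         NOT NULL PRIMARY KEY,
  name VARCHAR(50) NOT NULL,
  age  INT         NOT NULL
);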

Database configuration file: PersistenceContext.java

import javax.sql.DataSource;

import org.apache.commons.dbcp2.BasicDataSource;
import org.jooq.DSLContext;
import org.jooq.SQLDialect;
import org.jooq.impl.DataSourceConnectionProvider;
import org.jooq.impl.DefaultConfiguration;
import org.jooq.impl.DefaultDSLContext;
import org.jooq.impl.DefaultExecuteListenerProvider;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.PropertySource;
import org.springframework.core.env.Environment;
import org.springframework.jdbc.datasource.DataSourceTransactionManager;
import org.springframework.jdbc.datasource.LazyConnectionDataSourceProxy;
import org.springframework.jdbc.datasource.TransactionAwareDataSourceProxy;
import org.springframework.transaction.annotation.EnableTransactionManagement;

/**
 * @author Programmers
 *
 */
@Configuration
@ComponentScan({"com.the13star.service.test", "com.the13star.service.impl", "com.the13star.dao.impl" })
@EnableTransactionManagement
@PropertySource("classpath:application.properties")
public class PersistenceContext {

    @Autowired
    private Environment environment;

    @Bean(destroyMethod = "close") // destroyMethod closes the pool when the context shuts down
    public DataSource dataSource() {
        BasicDataSource dataSource = new BasicDataSource();
        dataSource.setDriverClassName(environment.getRequiredProperty("db.driver").trim());
        dataSource.setUrl(environment.getRequiredProperty("db.url").trim());
        dataSource.setUsername(environment.getRequiredProperty("db.username").trim());
        dataSource.setPassword(environment.getRequiredProperty("db.password").trim());
        dataSource.setInitialSize(5);
        dataSource.setMaxTotal(5);
        return dataSource;
    }

    // To delay opening a JDBC connection until the first actual SQL statement
    // happens, use LazyConnectionDataSourceProxy.
    @Bean
    public LazyConnectionDataSourceProxy lazyConnectionDataSource() {
        return new LazyConnectionDataSourceProxy(dataSource());
    }

    /**
     * Configure the TransactionAwareDataSourceProxy bean. This bean ensures
     * that all JDBC connections are aware of Spring-managed transactions. In
     * other words, JDBC connections participate in thread-bound transactions.
     */
    @Bean
    public TransactionAwareDataSourceProxy transactionAwareDataSource() {
        return new TransactionAwareDataSourceProxy(lazyConnectionDataSource());
    }

    /**
     * Configure the DataSourceTransactionManager bean. We must pass the
     * LazyConnectionDataSourceProxy bean as a constructor argument when we
     * create a new DataSourceTransactionManager object.
     */
    @Bean
    public DataSourceTransactionManager dataSourceTransactionManager() {
        return new DataSourceTransactionManager(lazyConnectionDataSource());
    }

    /**
     * Configure the DataSourceConnectionProvider bean. jOOQ will get its
     * connections from the DataSource given as a constructor argument. We must
     * pass the TransactionAwareDataSourceProxy bean as a constructor argument
     * when we create a new DataSourceConnectionProvider object. This ensures
     * that queries created by jOOQ participate in Spring-managed transactions.
     */
    @Bean
    public DataSourceConnectionProvider connectionProvider() {
        return new DataSourceConnectionProvider(transactionAwareDataSource());
    }

    @Bean
    public JOOQToSpringExceptionTransformer jooqToSpringExceptionTranslator() {
        return new JOOQToSpringExceptionTransformer();
    }

    /**
     * Configure jOOQ: set the connection provider, register the exception
     * translator, and pick the SQL dialect from application.properties.
     */
    @Bean
    public DefaultConfiguration configuration() {
        DefaultConfiguration configuration = new DefaultConfiguration();
        configuration.set(connectionProvider());
        configuration.set(new DefaultExecuteListenerProvider(
                jooqToSpringExceptionTranslator()));

        String sqlDialect = environment.getRequiredProperty("jooq.sql.dialect");
        SQLDialect dialect = SQLDialect.valueOf(sqlDialect);
        configuration.set(dialect);

        return configuration;
    }

    /**
     * Configure the DSL object, optionally overriding jOOQ exceptions with
     * Spring exceptions. We use this bean when we create database queries
     * with jOOQ.
     */
    @Bean
    public DSLContext dslContext() {
        return new DefaultDSLContext(configuration());
    }

    /**
     * We could use this bean to create the database schema when the
     * application starts (if you don't use an embedded database, you don't
     * have to configure this bean).
     */
    /*
    @Bean
    public DataSourceInitializer dataSourceInitializer() {
        DataSourceInitializer dataSourceInitializer = new DataSourceInitializer();
        dataSourceInitializer.setDataSource(dataSource());

        ResourceDatabasePopulator databasePopulator = new ResourceDatabasePopulator();
        databasePopulator.addScript(new ClassPathResource(environment.getRequiredProperty("")));

        dataSourceInitializer.setDatabasePopulator(databasePopulator);
        return dataSourceInitializer;
    }
    */
}

Step 4: 

Create the service layer:

import java.util.List;

import com.the13star.dbmetadata.tables.records.UserDetailRecord;

public interface UserDetailService {

    public void saveUserDetail(int id, String name, int age);

    public List<UserDetailRecord> getAllUsers();

    public UserDetailRecord getUserByID(int id);

    public int updateUserById(UserDetailRecord userDetailRecord);

    public int deleteUserById(int id);
}

import java.util.List;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import com.the13star.dao.UserDetailDao;
import com.the13star.dbmetadata.tables.records.UserDetailRecord;
import com.the13star.service.UserDetailService;

/**
 * @author Programmers
 */
@Service
public class UserDetailServiceImpl implements UserDetailService {

    @Autowired
    private UserDetailDao userDetailDao;

    @Override
    public void saveUserDetail(int id, String name, int age) {
        UserDetailRecord record = new UserDetailRecord();
        record.setId(id);
        record.setName(name);
        record.setAge(age);

        userDetailDao.insertNewUser(record);
    }

    @Override
    public List<UserDetailRecord> getAllUsers() {
        return userDetailDao.getAllUsers();
    }

    @Override
    public UserDetailRecord getUserByID(int id) {
        return userDetailDao.getUserByID(id);
    }

    @Override
    public int updateUserById(UserDetailRecord userDetailRecord) {
        return userDetailDao.updateUserById(userDetailRecord);
    }

    @Override
    public int deleteUserById(int id) {
        return userDetailDao.deleteUserById(id);
    }
}

Step 5:

Create the DAO layer:

import java.util.List;

import com.the13star.dbmetadata.tables.records.UserDetailRecord;

/**
 * @author Programmers
 */
public interface UserDetailDao {

    public void insertNewUser(UserDetailRecord userDetailRecord);

    public List<UserDetailRecord> getAllUsers();

    public UserDetailRecord getUserByID(int id);

    public int updateUserById(UserDetailRecord userDetailRecord);

    public int deleteUserById(int id);
}

import java.util.ArrayList;
import java.util.List;

import org.jooq.DSLContext;
import org.jooq.DeleteConditionStep;
import org.jooq.DeleteWhereStep;
import org.jooq.InsertValuesStep3;
import org.jooq.Result;
import org.jooq.UpdateConditionStep;
import org.jooq.UpdateSetFirstStep;
import org.jooq.UpdateSetMoreStep;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Repository;

import com.the13star.dao.UserDetailDao;
import com.the13star.dbmetadata.tables.UserDetail;
import com.the13star.dbmetadata.tables.records.UserDetailRecord;

/**
 * @author Programmers
 */
@Repository
public class UserDetailDaoImpl implements UserDetailDao {

    @Autowired
    DSLContext dslContext;

    @Override
    public void insertNewUser(UserDetailRecord userDetailRecord) {
        InsertValuesStep3<UserDetailRecord, Integer, String, Integer> userDetails = dslContext
                .insertInto(UserDetail.USER_DETAIL, UserDetail.USER_DETAIL.ID,
                        UserDetail.USER_DETAIL.NAME, UserDetail.USER_DETAIL.AGE);
        userDetails.values(userDetailRecord.getId(),
                userDetailRecord.getName(), userDetailRecord.getAge());
        userDetails.execute();
    }

    @Override
    public List<UserDetailRecord> getAllUsers() {
        Result<UserDetailRecord> userDetails = dslContext
                .fetch(UserDetail.USER_DETAIL);
        return new ArrayList<>(userDetails);
    }

    @Override
    public UserDetailRecord getUserByID(int id) {
        return dslContext.fetchOne(UserDetail.USER_DETAIL,
                UserDetail.USER_DETAIL.ID.equal(id));
    }

    @Override
    public int updateUserById(UserDetailRecord userDetailRecord) {
        UpdateSetFirstStep<UserDetailRecord> updateSetFirstStep = dslContext
                .update(UserDetail.USER_DETAIL);
        UpdateSetMoreStep<UserDetailRecord> updateSetMoreStep = updateSetFirstStep
                .set(UserDetail.USER_DETAIL.NAME, userDetailRecord.getName())
                .set(UserDetail.USER_DETAIL.AGE, userDetailRecord.getAge());
        UpdateConditionStep<UserDetailRecord> updateConditionStep = updateSetMoreStep
                .where(UserDetail.USER_DETAIL.ID.equal(userDetailRecord.getId()));
        return updateConditionStep.execute();
    }

    @Override
    public int deleteUserById(int id) {
        DeleteWhereStep<UserDetailRecord> deleteWhereStep = dslContext.delete(UserDetail.USER_DETAIL);
        DeleteConditionStep<UserDetailRecord> deleteConditionStep = deleteWhereStep
                .where(UserDetail.USER_DETAIL.ID.equal(id));
        return deleteConditionStep.execute();
    }
}
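
As a design note, naming each step type as above makes the query stages explicit, but jOOQ's DSL is normally written as one fluent chain. The insert, for example, collapses to the following (same behavior, just chained):

    dslContext.insertInto(UserDetail.USER_DETAIL,
            UserDetail.USER_DETAIL.ID, UserDetail.USER_DETAIL.NAME, UserDetail.USER_DETAIL.AGE)
        .values(userDetailRecord.getId(), userDetailRecord.getName(), userDetailRecord.getAge())
        .execute();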

Step 6: 

Launch Our Code:

import java.util.List;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.stereotype.Component;

import com.the13star.configurations.PersistenceContext;
import com.the13star.dbmetadata.tables.records.UserDetailRecord;
import com.the13star.service.UserDetailService;

/**
 * @author Programmers
 */
@Component
public class UserDetailTest {

    @Autowired
    private UserDetailService userDetailService;

    private void start() {
        //userDetailService.saveUserDetail(3, "MICKY", 21);
        List<UserDetailRecord> userDetails = userDetailService.getAllUsers();
        for (UserDetailRecord record : userDetails) {
            System.out.println(record);
        }
        /*
        UserDetailRecord record = userDetailService.getUserByID(1);
        System.out.println(record);
        */
        /*
        UserDetailRecord userDetailRecord = new UserDetailRecord();
        userDetailRecord.setId(3);
        userDetailRecord.setName("Micky");
        userDetailRecord.setAge(26);
        int result = userDetailService.updateUserById(userDetailRecord);
        */
        /*
        int result = userDetailService.deleteUserById(2);
        System.out.println("Result : " + result);
        */
    }

    public static void main(String[] args) {
        AnnotationConfigApplicationContext applicationContext =
                new AnnotationConfigApplicationContext(PersistenceContext.class);
        UserDetailTest userDetailTest = applicationContext.getBean(UserDetailTest.class);
        userDetailTest.start();
        applicationContext.close();
    }
}

To download the example code, go to this link.