
Saturday, February 25, 2017

Apache Kafka: Multiple Ways to Produce or Push Messages to Kafka Topics

Today, I am going to describe the various ways to put messages into Apache Kafka topics. Apache Kafka supports several languages and provides APIs for Java; one reason is that Java is the primary language of the JVM, and most JVM-based languages can use Java libraries easily.

Kafka has concepts such as topics and partitions, which you can explore in the Apache Kafka documentation or the Confluent documentation. To put messages onto a Kafka topic, Kafka supports serialization in various message formats. Some formats are provided by Kafka out of the box, but the recommended format for Kafka is Apache Avro. Avro is a lightweight and type-safe format for serialized data. For more, you can explore Apache Avro.

Kafka also has the concept of producers and consumers: a producer writes data to the queue and a consumer reads data from it. Today we are creating various Kafka producers for publishing data to a Kafka topic.

Prerequisite:

  • Install JDK 8.
  • Download Apache Kafka.
  • Download Zookeeper.
  • Download the Confluent kit (needed for the Schema Registry).
  • An IDE.
  • A build tool (we are using SBT).

I. Simple Producer: 

First, we create a simple Kafka producer that publishes messages to a Kafka topic using Java.

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class SimpleProducer {

    private static Properties kafkaProps = new Properties();
    private static KafkaProducer<String, String> kafkaProducer;

    static {
        kafkaProps.put("bootstrap.servers", "localhost:9092");
        kafkaProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        kafkaProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        kafkaProducer = new KafkaProducer<>(kafkaProps);
    }

    // Fire-and-forget: send the record and ignore the result.
    public static void fireAndForget(ProducerRecord<String, String> record) {
        kafkaProducer.send(record);
    }

    // Asynchronous send: the callback is invoked once the broker responds.
    public static void asyncSend(ProducerRecord<String, String> record) {
        kafkaProducer.send(record, (recordMetaData, ex) -> {
            if (ex != null) {
                ex.printStackTrace();
                return;
            }
            System.out.println("Offset: " + recordMetaData.offset());
            System.out.println("Topic: " + recordMetaData.topic());
            System.out.println("Partition: " + recordMetaData.partition());
            System.out.println("Timestamp: " + recordMetaData.timestamp());
        });
    }

    public static void main(String[] args) throws InterruptedException {
        ProducerRecord<String, String> record1 = new ProducerRecord<>("CustomerCountry",
                "Record 1", "Japan1"
        );

        ProducerRecord<String, String> record2 = new ProducerRecord<>("CustomerCountry",
                "Record 2", "Punjab1"
        );

        fireAndForget(record1);
        asyncSend(record2);

        Thread.sleep(10000);
    }
}

In this example, we just produce data to the topic with Kafka's default `StringSerializer`.
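
Besides fire-and-forget and asynchronous sends, a producer can also send synchronously by blocking on the `Future` returned by `send()`. A minimal sketch, assuming it is added to the `SimpleProducer` class above (it additionally needs the `org.apache.kafka.clients.producer.RecordMetadata` and `java.util.concurrent.ExecutionException` imports):

    // Synchronous send: block until the broker acknowledges the write (or fails).
    public static void syncSend(ProducerRecord<String, String> record)
            throws ExecutionException, InterruptedException {
        RecordMetadata metadata = kafkaProducer.send(record).get();
        System.out.println("Sync send completed, offset: " + metadata.offset());
    }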

II. Apache Avro Serialization Generic Format: 

To use Apache Avro, we need to create a schema for our messages, because that schema helps us deserialize messages with type safety. Various build-tool plugins can generate POJO classes from an Avro schema file. It is good practice to use such tools, because sometimes our messages are complex and writing the classes manually invites mistakes.

NOTE: To use Avro with Kafka, we need to start the Confluent Schema Registry server, because that registry server is used to manage message schemas, while the messages themselves are handled by the Kafka topics. For more details, please visit the documentation.

For SBT, I am using the sbt-avro plugin. It depends on which build tool you are using; you can also write the classes manually.
As we discussed, for Avro we first need to create a schema. In my example I have the following schema:

{"namespace": "com.harmeetsingh13.java",
    "type": "record",
    "name": "Customer",
    "fields": [{"name": "id","type": "int"},
            {"name": "name","type": "string"}]
}

By using the sbt-avro plugin, my POJO class is generated automatically; we will use it in the specific-record examples below. First, let's create a Kafka producer that uses Avro's `GenericRecord` API directly against the schema (the class and topic names below are illustrative):

import io.confluent.kafka.serializers.KafkaAvroSerializer;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.io.IOException;
import java.util.Properties;

// Generic-record producer; class and topic names here are illustrative.
public class AvroGenericProducer {

    private static Properties kafkaProps = new Properties();
    private static KafkaProducer<String, GenericRecord> kafkaProducer;

    static {
        kafkaProps.put("bootstrap.servers", "localhost:9092");
        kafkaProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        kafkaProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);
        kafkaProps.put("schema.registry.url", "http://localhost:8081");
        kafkaProducer = new KafkaProducer<>(kafkaProps);
    }

    public static void fireAndForget(ProducerRecord<String, GenericRecord> record) {
        kafkaProducer.send(record);
    }

    public static void asyncSend(ProducerRecord<String, GenericRecord> record) {
        kafkaProducer.send(record, (recordMetaData, ex) -> {
            if (ex != null) {
                ex.printStackTrace();
                return;
            }
            System.out.println("Offset: " + recordMetaData.offset());
            System.out.println("Topic: " + recordMetaData.topic());
            System.out.println("Partition: " + recordMetaData.partition());
            System.out.println("Timestamp: " + recordMetaData.timestamp());
        });
    }

    public static void main(String[] args) throws InterruptedException, IOException {
        // Build records directly from the schema, without a generated POJO.
        Schema schema = new Schema.Parser().parse(AvroGenericProducer.class
                .getClassLoader().getResourceAsStream("avro/customer.avsc"));

        GenericRecord customer1 = new GenericData.Record(schema);
        customer1.put("id", 1001);
        customer1.put("name", "Jimmy");

        GenericRecord customer2 = new GenericData.Record(schema);
        customer2.put("id", 1002);
        customer2.put("name", "James");

        ProducerRecord<String, GenericRecord> record1 = new ProducerRecord<>("AvroGenericProducerTopic",
                "KeyOne", customer1
        );
        ProducerRecord<String, GenericRecord> record2 = new ProducerRecord<>("AvroGenericProducerTopic",
                "KeyOne", customer2
        );

        asyncSend(record1);
        asyncSend(record2);

        Thread.sleep(1000);
    }
}

One thing to note in these examples is that we are still using Kafka's `StringSerializer` for the message key, because when we tried to deserialize a plain string with the Avro `KafkaAvroDeserializer` we faced this issue:

Error deserializing Avro message for id 351 org.apache.kafka.common.errors.SerializationException: 
Error deserializing Avro message for id 351 Caused org.apache.kafka.common.errors.SerializationException: 
string specified by the writers schema could not be instantiated to find the readers schema

For more details, you can look into this discussion.

As discussed, there are various ways to serialize messages to a Kafka topic using Avro. In the above example we used the generic way of serialization, but we can also serialize messages in more specific ways, which we discuss in the next examples.

III. Apache Avro Serialization Specific Format One:

The next Avro example serializes messages using the specific-record approach, with the `Customer` POJO generated from the schema:

import io.confluent.kafka.serializers.KafkaAvroSerializer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.io.IOException;
import java.util.Properties;

public class AvroSpecificProducerOne {

    private static Properties kafkaProps = new Properties();
    private static KafkaProducer<String, Customer> kafkaProducer;

    static {
        kafkaProps.put("bootstrap.servers", "localhost:9092");
        kafkaProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        kafkaProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);
        kafkaProps.put("schema.registry.url", "http://localhost:8081");
        kafkaProducer = new KafkaProducer<>(kafkaProps);
    }

    public static void fireAndForget(ProducerRecord<String, Customer> record) {
        kafkaProducer.send(record);
    }

    public static void asyncSend(ProducerRecord<String, Customer> record) {
        kafkaProducer.send(record, (recordMetaData, ex) -> {
            if (ex != null) {
                ex.printStackTrace();
                return;
            }
            System.out.println("Offset: " + recordMetaData.offset());
            System.out.println("Topic: " + recordMetaData.topic());
            System.out.println("Partition: " + recordMetaData.partition());
            System.out.println("Timestamp: " + recordMetaData.timestamp());
        });
    }

    public static void main(String[] args) throws InterruptedException, IOException {
        // Customer is the specific-record class generated from customer.avsc.
        Customer customer1 = new Customer(1001, "Jimmy");
        Customer customer2 = new Customer(1002, "James");

        ProducerRecord<String, Customer> record1 = new ProducerRecord<>("AvroSpecificProducerOneTopic",
                "KeyOne", customer1
        );
        ProducerRecord<String, Customer> record2 = new ProducerRecord<>("AvroSpecificProducerOneTopic",
                "KeyOne", customer2
        );

        asyncSend(record1);
        asyncSend(record2);

        Thread.sleep(1000);
    }
}

IV. Apache Avro Serialization Specific Format Two:

Another way to serialize messages using Avro is to serialize the record to bytes manually and publish the raw bytes, as below:

import io.confluent.kafka.serializers.KafkaAvroSerializer;
import org.apache.avro.Schema;
import org.apache.avro.Schema.Parser;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.EncoderFactory;
import org.apache.avro.specific.SpecificDatumWriter;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.Properties;

public class AvroSpecificProducerTwo {

    private static Properties kafkaProps = new Properties();
    private static KafkaProducer<String, byte[]> kafkaProducer;

    static {
        kafkaProps.put("bootstrap.servers", "localhost:9092");
        kafkaProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        kafkaProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);
        kafkaProps.put("schema.registry.url", "http://localhost:8081");
        kafkaProducer = new KafkaProducer<>(kafkaProps);
    }

    public static void fireAndForget(ProducerRecord<String, byte[]> record) {
        kafkaProducer.send(record);
    }

    public static void asyncSend(ProducerRecord<String, byte[]> record) {
        kafkaProducer.send(record, (recordMetaData, ex) -> {
            if (ex != null) {
                ex.printStackTrace();
                return;
            }
            System.out.println("Offset: " + recordMetaData.offset());
            System.out.println("Topic: " + recordMetaData.topic());
            System.out.println("Partition: " + recordMetaData.partition());
            System.out.println("Timestamp: " + recordMetaData.timestamp());
        });
    }

    // Manually serialize the Customer specific record to Avro binary bytes.
    private static byte[] convertCustomerToAvroBytes(Customer customer) throws IOException {
        Parser parser = new Parser();
        Schema schema = parser.parse(AvroSpecificProducerOne.class
                .getClassLoader().getResourceAsStream("avro/customer.avsc"));

        SpecificDatumWriter<Customer> writer = new SpecificDatumWriter<>(schema);
        try (ByteArrayOutputStream os = new ByteArrayOutputStream()) {
            BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(os, null);
            writer.write(customer, encoder);
            encoder.flush();

            return os.toByteArray();
        }
    }

    public static void main(String[] args) throws InterruptedException, IOException {
        Customer customer1 = new Customer(1001, "Jimmy");
        Customer customer2 = new Customer(1002, "James");

        byte[] customer1AvroBytes = convertCustomerToAvroBytes(customer1);
        byte[] customer2AvroBytes = convertCustomerToAvroBytes(customer2);

        ProducerRecord<String, byte[]> record1 = new ProducerRecord<>("AvroSpecificProducerTwoTopic",
                "KeyOne", customer1AvroBytes
        );
        ProducerRecord<String, byte[]> record2 = new ProducerRecord<>("AvroSpecificProducerTwoTopic",
                "KeyOne", customer2AvroBytes
        );

        asyncSend(record1);
        asyncSend(record2);

        Thread.sleep(1000);
    }
}

There are still more ways to serialize messages for Kafka, using Avro or other serializers. In the next post, we will discuss Kafka consumers and the various ways to consume messages from Kafka topics using Avro.

For the above examples, you can also download the code from the GitHub repo.

Tuesday, February 7, 2017

Building Microservices Based Enterprise Applications in Java Using Lagom - Part I

As we know, nowadays most enterprise applications are designed as a "microservices architecture" because of scalability, sharding, loose coupling and many other reasons. On the other hand, Java EE helps us build enterprise applications, but with the Java EE monolithic approach our applications are not as scalable as microservices.
That's why Lagom comes into the picture. Lagom provides a way to build an enterprise application with a microservices-based architecture and gives us responsive, resilient, elastic and message-driven features, in other words a reactive approach. The Lagom design philosophy is:

  1. Asynchronous.
  2. Distributed Persistent.
  3. Developer Productivity.

In this blog, we are building a sample application for managing a user module and performing CRUD operations on users.

Note: Lagom takes a strict approach to designing applications in a Domain Driven Design (DDD) manner and also follows CQRS for event sourcing.

Step I

We are building a Maven-based project using Lagom, with a parent project and sub-modules for each domain (created in the next step).


Step II

As we know, Lagom follows the DDD approach, so in the Maven-based project we create sub-modules according to our domains. In this sample we manage the user domain, so we create two Maven sub-modules, "user-api" and "user-impl". user-api contains just the specification and declarations of the methods for our REST endpoints, and user-impl contains the actual implementation of those services.
In user-api, we create an interface "UserService" and declare all our endpoints as in the code below:

public interface UserService extends Service {

    ServiceCall<NotUsed, Optional<User>> user(String id);

    ServiceCall<User, Done> newUser();

    ServiceCall<User, Done> updateUser();

    ServiceCall<NotUsed, User> delete(String id);

    ServiceCall<NotUsed, Optional<User>> currentState(String id);

    @Override
    default Descriptor descriptor() {

        return named("user").withCalls(
                restCall(GET, "/api/user/:id", this::user),
                restCall(POST, "/api/user", this::newUser),
                restCall(PUT, "/api/user", this::updateUser),
                restCall(DELETE, "/api/user/:id", this::delete),
                restCall(GET, "/api/user/current-state/:id", this::currentState)
        ).withAutoAcl(true);
    }
}

Step III

Now, in our user-impl module, we provide the implementation of all the services. To design a service, Lagom gives us a strict model following DDD: we need entities, commands, events and more. Initially we define the commands for our module as below:


public interface UserCommand extends Jsonable {

    @Value
    @Builder
    @JsonDeserialize
    final class CreateUser implements UserCommand, PersistentEntity.ReplyType<Done> {
        User user;
    }

    @Value
    @Builder
    @JsonDeserialize
    final class UpdateUser implements UserCommand, PersistentEntity.ReplyType<Done> {
        User user;
    }

    @Value
    @Builder
    @JsonDeserialize
    final class DeleteUser implements UserCommand, PersistentEntity.ReplyType<User> {
        User user;
    }

    @Immutable
    @JsonDeserialize
    final class UserCurrentState implements UserCommand, PersistentEntity.ReplyType<Optional<User>> {}
}

For designing POJOs in Java, I am using the Lombok library to create immutable classes and remove boilerplate code (a sketch of the `User` model we use is shown after the note below). Lagom also provides other options, as mentioned in its documentation.

Note: You can use any library, but before using it, configure your IDE accordingly.
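
The `User` model class referenced throughout this post is not shown above. Here is a hypothetical sketch in the Lombok style just described, with the id, name and age fields inferred from the read-side queries later in this post (the real class in the repository may differ):

import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
import com.lightbend.lagom.serialization.Jsonable;
import lombok.Builder;
import lombok.Value;

// Hypothetical sketch of the User model; field names match the users table below.
@Value
@Builder
@JsonDeserialize
public class User implements Jsonable {
    String id;
    String name;
    int age;
}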

Step IV

We are following CQRS, so we need to define events as well: 

public interface UserEvent extends Jsonable, AggregateEvent<UserEvent> {

    @Override
    default AggregateEventTagger<UserEvent> aggregateTag() {
        return UserEventTag.INSTANCE;
    }

    @Value
    @Builder
    @JsonDeserialize
    final class UserCreated implements UserEvent, CompressedJsonable {
        User user;
        String entityId;
    }

    @Value
    @Builder
    @JsonDeserialize
    final class UserUpdated implements UserEvent, CompressedJsonable {
        User user;
        String entityId;
    }

    @Value
    @Builder
    @JsonDeserialize
    final class UserDeleted implements UserEvent, CompressedJsonable {
        User user;
        String entityId;
    }
}

Our events are marked CompressedJsonable so they can be stored in compressed form, because the whole event is persisted in the database. For more details, go through the Lagom documentation.
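
The `UserEventTag.INSTANCE` returned by `aggregateTag()` above is not shown in the post. A minimal sketch of what such a tag class could look like, assuming a single, non-sharded tag:

import com.lightbend.lagom.javadsl.persistence.AggregateEventTag;

// Hypothetical sketch of UserEventTag: one shared tag that the read-side
// processor uses to look up all UserEvent instances.
public class UserEventTag {
    public static final AggregateEventTag<UserEvent> INSTANCE =
            AggregateEventTag.of(UserEvent.class);
}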

Step V

We need to define our entity and describe its behaviour for each command and event, as below:

public class UserEntity extends PersistentEntity<UserCommand, UserEvent, UserState> {

    @Override
    public Behavior initialBehavior(Optional<UserState> snapshotState) {

        // initial behaviour of user
        BehaviorBuilder behaviorBuilder = newBehaviorBuilder(
                UserState.builder().user(Optional.empty())
                        .timestamp(LocalDateTime.now().toString()).build()
        );

        behaviorBuilder.setCommandHandler(CreateUser.class, (cmd, ctx) ->
                ctx.thenPersist(UserCreated.builder().user(cmd.getUser())
                        .entityId(entityId()).build(), evt -> ctx.reply(Done.getInstance()))
        );

        behaviorBuilder.setEventHandler(UserCreated.class, evt ->
                UserState.builder().user(Optional.of(evt.getUser()))
                        .timestamp(LocalDateTime.now().toString()).build()
        );

        behaviorBuilder.setCommandHandler(UpdateUser.class, (cmd, ctx) ->
                ctx.thenPersist(UserUpdated.builder().user(cmd.getUser()).entityId(entityId()).build()
                        , evt -> ctx.reply(Done.getInstance()))
        );

        behaviorBuilder.setEventHandler(UserUpdated.class, evt ->
                UserState.builder().user(Optional.of(evt.getUser()))
                        .timestamp(LocalDateTime.now().toString()).build()
        );

        behaviorBuilder.setCommandHandler(DeleteUser.class, (cmd, ctx) ->
                ctx.thenPersist(UserDeleted.builder().user(cmd.getUser()).entityId(entityId()).build(),
                        evt -> ctx.reply(cmd.getUser()))
        );

        behaviorBuilder.setEventHandler(UserDeleted.class, evt ->
                UserState.builder().user(Optional.empty())
                        .timestamp(LocalDateTime.now().toString()).build()
        );

        behaviorBuilder.setReadOnlyCommandHandler(UserCurrentState.class, (cmd, ctx) ->
                ctx.reply(state().getUser())
        );

        return behaviorBuilder.build();
    }
}

By default, Lagom recommends Cassandra for storing events, but Lagom supports RDBMSs as well; for more details please click on this link.
Here we define how our commands and events are handled. In simple words, this is where we decide what happens for a specific command and for a specific event, and maintain the state in memory.
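
The `UserState` used by the entity above is also defined elsewhere in the project. A hypothetical sketch, assuming the Lombok style used for the commands, holding the optional user plus a timestamp exactly as the behaviour builders expect:

import java.util.Optional;

import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
import com.lightbend.lagom.serialization.CompressedJsonable;
import lombok.Builder;
import lombok.Value;

// Hypothetical sketch of UserState: the in-memory state kept by UserEntity.
@Value
@Builder
@JsonDeserialize
public class UserState implements CompressedJsonable {
    Optional<User> user;
    String timestamp;
}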

Step VI

Sometimes we also need to persist our data separately from the events. In that case Lagom provides the "ReadSideProcessor" abstract class, which performs operations according to the events that happened, just like below:

public class UserEventProcessor extends ReadSideProcessor<UserEvent> {

    private static final Logger LOGGER = LoggerFactory.getLogger(UserEventProcessor.class);

    private final CassandraSession session;
    private final CassandraReadSide readSide;

    private PreparedStatement writeUsers;
    private PreparedStatement deleteUsers;

    @Inject
    public UserEventProcessor(final CassandraSession session, final CassandraReadSide readSide) {
        this.session = session;
        this.readSide = readSide;
    }

    @Override
    public PSequence<AggregateEventTag<UserEvent>> aggregateTags() {
        LOGGER.info(" aggregateTags method ... ");
        return TreePVector.singleton(UserEventTag.INSTANCE);
    }

    @Override
    public ReadSideHandler<UserEvent> buildHandler() {
        LOGGER.info(" buildHandler method ... ");
        return readSide.builder("users_offset")
                .setGlobalPrepare(this::createTable)
                .setPrepare(evtTag -> prepareWriteUser()
                        .thenCombine(prepareDeleteUser(), (d1, d2) -> Done.getInstance())
                )
                .setEventHandler(UserCreated.class, this::processPostAdded)
                .setEventHandler(UserUpdated.class, this::processPostUpdated)
                .setEventHandler(UserDeleted.class, this::processPostDeleted)
                .build();
    }

    // Executed only once when the application starts
    private CompletionStage<Done> createTable() {
        return session.executeCreateTable(
                "CREATE TABLE IF NOT EXISTS users ( " +
                        "id TEXT, name TEXT, age INT, PRIMARY KEY(id))"
        );
    }

    /*
    * START: Prepared statement for inserting user values into the users table.
    * This is just the creation of the prepared statement; we bind it to our event below.
    */
    private CompletionStage<Done> prepareWriteUser() {
        return session.prepare(
                "INSERT INTO users (id, name, age) VALUES (?, ?, ?)"
        ).thenApply(ps -> {
            setWriteUsers(ps);
            return Done.getInstance();
        });
    }

    private void setWriteUsers(PreparedStatement statement) {
        this.writeUsers = statement;
    }

    // Bind the prepared statement when the UserCreated event is processed
    private CompletionStage<List<BoundStatement>> processPostAdded(UserCreated event) {
        BoundStatement bindWriteUser = writeUsers.bind();
        bindWriteUser.setString("id", event.getUser().getId());
        bindWriteUser.setString("name", event.getUser().getName());
        bindWriteUser.setInt("age", event.getUser().getAge());
        return CassandraReadSide.completedStatements(Arrays.asList(bindWriteUser));
    }
    /* ******************* END ****************************/

    /* START: Prepared statement for updating the data in the users table.
    * This is just the creation of the prepared statement; we bind it to our event below.
    */
    private CompletionStage<List<BoundStatement>> processPostUpdated(UserUpdated event) {
        BoundStatement bindWriteUser = writeUsers.bind();
        bindWriteUser.setString("id", event.getUser().getId());
        bindWriteUser.setString("name", event.getUser().getName());
        bindWriteUser.setInt("age", event.getUser().getAge());
        return CassandraReadSide.completedStatements(Arrays.asList(bindWriteUser));
    }
    /* ******************* END ****************************/

    /* START: Prepared statement for deleting the user from the table.
    * This is just the creation of the prepared statement; we bind it to our event below.
    */
    private CompletionStage<Done> prepareDeleteUser() {
        return session.prepare(
                "DELETE FROM users WHERE id=?"
        ).thenApply(ps -> {
            setDeleteUsers(ps);
            return Done.getInstance();
        });
    }

    private void setDeleteUsers(PreparedStatement deleteUsers) {
        this.deleteUsers = deleteUsers;
    }

    private CompletionStage<List<BoundStatement>> processPostDeleted(UserDeleted event) {
        BoundStatement bindWriteUser = deleteUsers.bind();
        bindWriteUser.setString("id", event.getUser().getId());
        return CassandraReadSide.completedStatements(Arrays.asList(bindWriteUser));
    }
    /* ******************* END ****************************/
}

For more details, please click on this link.

Step VII

Finally, we define the implementation of our REST service endpoints as below:


public class UserServiceImpl implements UserService {

    private final PersistentEntityRegistry persistentEntityRegistry;
    private final CassandraSession session;

    @Inject
    public UserServiceImpl(final PersistentEntityRegistry registry, ReadSide readSide, CassandraSession session) {
        this.persistentEntityRegistry = registry;
        this.session = session;

        persistentEntityRegistry.register(UserEntity.class);
        readSide.register(UserEventProcessor.class);
    }

    @Override
    public ServiceCall<NotUsed, Optional<User>> user(String id) {
        return request -> {
            CompletionStage<Optional<User>> userFuture =
                    session.selectAll("SELECT * FROM users WHERE id = ?", id)
                            .thenApply(rows ->
                                    rows.stream()
                                            .map(row -> User.builder().id(row.getString("id"))
                                                    .name(row.getString("name")).age(row.getInt("age"))
                                                    .build()
                                            )
                                            .findFirst()
                            );
            return userFuture;
        };
    }

    @Override
    public ServiceCall<User, Done> newUser() {
        return user -> {
            PersistentEntityRef<UserCommand> ref = userEntityRef(user);
            return ref.ask(CreateUser.builder().user(user).build());
        };
    }

    @Override
    public ServiceCall<User, Done> updateUser() {
        return user -> {
            PersistentEntityRef<UserCommand> ref = userEntityRef(user);
            return ref.ask(UpdateUser.builder().user(user).build());
        };
    }

    @Override
    public ServiceCall<NotUsed, User> delete(String id) {
        return request -> {
            User user = User.builder().id(id).build();
            PersistentEntityRef<UserCommand> ref = userEntityRef(user);
            return ref.ask(DeleteUser.builder().user(user).build());
        };
    }

    @Override
    public ServiceCall<NotUsed, Optional<User>> currentState(String id) {
        return request -> {
            User user = User.builder().id(id).build();
            PersistentEntityRef<UserCommand> ref = userEntityRef(user);
            return ref.ask(new UserCurrentState());
        };
    }

    private PersistentEntityRef<UserCommand> userEntityRef(User user) {
        return persistentEntityRegistry.refFor(UserEntity.class, user.getId());
    }
}
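
One piece of wiring not shown above is the Guice module that binds the `UserService` descriptor to `UserServiceImpl`. A minimal sketch, with the class name assumed (and note that, depending on the Lagom version, the binding call may instead be `bindServices(serviceBinding(...))`):

import com.google.inject.AbstractModule;
import com.lightbend.lagom.javadsl.server.ServiceGuiceSupport;

// Hypothetical sketch of the service module; it is enabled through
// play.modules.enabled in application.conf.
public class UserModule extends AbstractModule implements ServiceGuiceSupport {

    @Override
    protected void configure() {
        bindService(UserService.class, UserServiceImpl.class);
    }
}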

There are still lots of things Lagom provides for developing enterprise applications, which we will cover in the next blog.

For the full source of this example, please click on this link.

Wednesday, January 25, 2017

Short Interview With SMACK Tech Stack !!!

Hello guys, today we conduct a short interview with SMACK about its architecture and its uses. Let's start with some introduction.

Interviewer: How would you describe yourself?
SMACK: I am SMACK (Spark, Mesos, Akka, Cassandra and Kafka), and I am built entirely from open source technologies. A Mesosphere and Cisco collaboration bundled these technologies together into a product called Infinity, which is used to solve pipeline data challenges where the speed of response matters, such as fraud detection systems.

Interviewer: Why SMACK?
SMACK: Today's modern data-processing challenges are:
  • Data is getting bigger, or more accurately, the number of data sources is increasing.
  • In many modern business models, data from one hour ago is practically obsolete.
  • Data analysis becomes too slow to get any return on investment.
  • One modern requirement is horizontal scaling at low cost.
  • We live in an age where data freshness often matters more than the amount or size of data.
There are many challenges, and SMACK exists because one technology alone doesn't make an architecture. SMACK is a pipelined architecture model for data processing.

Interviewer: The Lambda Architecture is a data-processing architecture that has the advantages of both batch and stream processing methods. So how is SMACK different?
SMACK: Yes, the Lambda Architecture has these features, but most Lambda solutions cannot meet two needs at the same time:
  1. Handle a massive data stream in real time.
  2. Handle multiple and different data models from multiple data sources.
For these, Apache Spark is responsible for real-time analysis of both historical and recent data from a massive information torrent, and all such information and analysis results are persisted in Apache Cassandra. So, in case of failure, we can recover the real-time data from any point in time. With the Lambda Architecture that is not always possible.

Interviewer: SMACK, can you briefly describe your technologies?
SMACK: Yes, sure. As we discussed, SMACK is basically a pipelined data architecture for online stream processing. There are lots of books and articles available on each technology, but each one is used for a specific purpose:
  • Apache Spark: the processing engine.
  • Akka: the model.
  • Apache Kafka: the broker.
  • Apache Cassandra: the storage.
  • Apache Mesos: the container.
See, all are Apache projects with the exception of Akka.

Interviewer: Is SMACK the only solution?
SMACK: No, you can replace individual components as per your requirements. For example, YARN could be used as the cluster scheduler instead of Mesos, and Apache Flink would be a suitable batch and stream processing alternative to Akka. There are many alternatives to SMACK.

Interviewer: Could you discuss one of your case studies with us?
SMACK: Yes, but not now. I need to go hang out with my technologies; we will discuss that further in our next interview.

Interviewer: Can I take one picture of you?
SMACK: Yes, sure, cheeezzzz .....

