Using Apache Kafka for Asynchronous Communication in Microservices


While microservice architecture might not be a silver bullet for all systems, it definitely has its advantages, especially when building a complex system with a lot of different components. Of course, if you’re considering microservices, you have to give serious thought to how the different services will communicate.

In this post, we’ll look at how to set up an Apache Kafka instance, create a user service to publish data to topics, and build a notification service to consume data from those topics. Specifically, we’re going to build a two-factor authentication app where a user will register, receive a mail with a verification code, and complete their registration using the code. The source code can be found here.

Why Apache Kafka?

Kafka is a distributed streaming platform, originally created at LinkedIn and open-sourced in 2011, built to handle high-throughput, low-latency transmission and processing of streams of records in real time. Its three major capabilities make it a good fit for this use case:

  • Publishing and subscribing to streams of records. In this respect, it is similar to a message queue or enterprise messaging system.
  • Storing streams of records in a fault-tolerant way.
  • Processing streams of records as they occur.

Setting Up Apache Kafka

Before starting this tutorial, the following will be required:

  • Docker for Mac or Docker for Windows
  • Knowledge of Docker Compose
  • Knowledge of Node.js

We will be using the Wurstmeister Kafka Docker image. Note that Kafka uses Zookeeper for coordination between different Kafka nodes.

A docker-compose.yml similar to the one below pulls and runs the images for Kafka and Zookeeper. One of the required configuration options for the Kafka service is KAFKA_ZOOKEEPER_CONNECT, which tells Kafka where to find the Zookeeper instance.

version: '2.1'
services:
  zookeeper:
    container_name: zookeeper
    image: wurstmeister/zookeeper
    ports:
      - "2181:2181"
  kafka:
    container_name: kafka
    image: wurstmeister/kafka
    ports:
      - "9092"
    depends_on:
      - "zookeeper"
    environment:
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
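
With this file in place, running docker-compose up -d starts both containers in the background, and docker-compose ps confirms that they are running; the Kafka broker registers itself with Zookeeper at zookeeper:2181. (The exact commands may vary slightly depending on your Docker Compose version.)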

Publishing Data to Kafka Topics

To publish data to a Kafka topic, we are going to create a user service that provides two endpoints:

  • /api/register – Stores the user's details in an in-memory store (node-cache) and publishes the user data to the user_account_created Kafka topic.
  • /api/verify – Checks that the code submitted by the user matches the stored one and publishes the user data to the user_account_verified Kafka topic.

We use the node-rdkafka NPM package, a Node.js wrapper around the librdkafka C library, to create a producer that connects to Kafka from our Node.js app:

  const kafka = require('node-rdkafka');

  const logger = require('./logger');

  const KAFKA_BROKER_LIST = process.env.KAFKA_BROKER_LIST;

  let producerReady;
  const producer = new kafka.Producer({
    debug: 'all',
    'client.id': 'user-api',
    'metadata.broker.list': KAFKA_BROKER_LIST,
    'compression.codec': 'gzip',
    'retry.backoff.ms': 200,
    'message.send.max.retries': 10,
    'socket.keepalive.enable': true,
    'queue.buffering.max.messages': 100000,
    'queue.buffering.max.ms': 1000,
    'batch.num.messages': 1000000,
    dr_cb: true // enable delivery report events
  });

  // Connect to the broker(s); an error here usually means Kafka is unreachable
  producer.connect({}, err => {
    if (err) {
      logger.error('connect', err);
    }
  });

  // Resolves once the producer has connected and is ready to publish
  producerReady = new Promise(resolve => {
    producer.on('ready', () => {
      logger.info('producer ready');
      resolve(producer);
    });
  });

We create a new promise object that resolves to a producer that is ready to start publishing data. This is used in our sendMessage function, which publishes data to a Kafka topic partition:

  KafkaService.prototype.sendMessage = function sendMessage(
    topic,
    payload,
    partition = 0
  ) {
    return producerReady
      .then(producer => {
        const message = Buffer.from(JSON.stringify(payload));
        producer.produce(topic, partition, message);
      })
      .catch(error => logger.error('unable to send message', error));
  };
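
To show how the user service might wire this together, here is a rough sketch of the /api/register handler. The Express app, the node-cache instance, the generated code, and the ./kafka module name are assumptions for illustration rather than code from the original service:

  // Hypothetical /api/register handler; Express, node-cache, and ./kafka are assumed
  const express = require('express');
  const NodeCache = require('node-cache');
  const kafkaService = require('./kafka'); // assumed to expose the sendMessage shown above

  const app = express();
  app.use(express.json());

  const cache = new NodeCache({ stdTTL: 600 }); // keep pending registrations for 10 minutes

  app.post('/api/register', (req, res) => {
    const { email, first_name } = req.body;
    const code = Math.floor(100000 + Math.random() * 900000).toString(); // 6-digit code

    // Keep the pending user in memory until /api/verify is called
    cache.set(email, { email, first_name, code });

    // Publish the event; the notification service emails the code to the user
    kafkaService
      .sendMessage('user_account_created', { email, first_name, code })
      .then(() => res.status(201).json({ message: 'verification code sent' }));
  });

  app.listen(3000);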

Consuming Data from Kafka Topics

To consume data from our Kafka topics, we are going to create a notification service that listens for data arriving on those topics and sends an email containing either a verification code or a success message, depending on which topic the data came from.

We create a consumer that connects to Kafka. KAFKA_BROKER_LIST is a comma-separated list of all Kafka brokers (for example, kafka:9092 for the Compose setup above).

  process.stdin.resume(); // keep process alive

  require('dotenv').config();

  const Kafka = require('node-rdkafka');

  const logger = require('./logger');

  const sendMail = require('./email');

  const KAFKA_BROKER_LIST = process.env.KAFKA_BROKER_LIST;

  const consumer = new Kafka.KafkaConsumer({
    //'debug': 'all',
    'metadata.broker.list': KAFKA_BROKER_LIST,
    'group.id': 'notification-service',
    'enable.auto.commit': false
  });

The consumer returned by node-rdkafka is an event emitter (node-rdkafka also offers a streaming API). We wait for the ready event to subscribe to our topics, user_account_created and user_account_verified, and then listen for data arriving on them:

  const topics = [
    'user_account_created',
    'user_account_verified'
  ];

  // commit offsets manually once every numMessages messages
  let counter = 0;
  const numMessages = 5;

  consumer.on('ready', function(arg) {
    logger.info('consumer ready.' + JSON.stringify(arg));

    consumer.subscribe(topics);
    //start consuming messages
    consumer.consume();
  });

  consumer.on('data', function(metadata) {
    counter++;

    //committing offsets every numMessages
    if (counter % numMessages === 0) {
      logger.info('calling commit');
      consumer.commit(metadata);
    }

    // Output the actual message contents
    const data = JSON.parse(metadata.value.toString());
    logger.info('data value', data);

    if (metadata.topic === 'user_account_created') {
      const to = data.email;
      const subject = 'Verify Account';
      const content = `Hello ${data.first_name}, 
      Please use this code ${data.code} to complete your verification`;
      sendMail(subject, content, to);
    } else if (metadata.topic === 'user_account_verified') {
      const to = data.email;
      const subject = 'Account Verified';
      const content = `Hello ${data.first_name}, 
      You have successfully been verified`;
      sendMail(subject, content, to);
    }

  });

  consumer.on('disconnected', function(arg) {
    logger.info('consumer disconnected. ' + JSON.stringify(arg));
  });

  //logging all errors
  consumer.on('event.error', function(err) {
    logger.error('Error from consumer', err, 'code: ', err.code);
  });

  // connect the consumer; consumption starts once the ready event fires
  consumer.connect();

The data event handler is called for every message consumed from the topics we subscribed to. Here we parse the incoming message and check its topic field to determine which topic the data came from, so we can carry out the appropriate action.
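
The sendMail helper required from ./email is not shown in this post. A minimal sketch of what it might look like, assuming Nodemailer and SMTP credentials supplied through environment variables, is:

  // ./email.js, a sketch only; Nodemailer and the SMTP_* variables are assumptions
  const nodemailer = require('nodemailer');

  const transporter = nodemailer.createTransport({
    host: process.env.SMTP_HOST,
    port: Number(process.env.SMTP_PORT) || 587,
    auth: {
      user: process.env.SMTP_USER,
      pass: process.env.SMTP_PASS
    }
  });

  // Matches the sendMail(subject, content, to) signature used by the consumer above
  module.exports = function sendMail(subject, content, to) {
    return transporter
      .sendMail({ from: process.env.SMTP_USER, to, subject, text: content })
      .catch(err => console.error('unable to send email', err));
  };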

Conclusion

Our two-factor authentication app demonstrates this communication pattern between just two microservices using Apache Kafka (other messaging systems such as RabbitMQ or ZeroMQ could play a similar role), but by decoupling communication between those services we gain flexibility for the future. For example, if we later add a recommendation service that needs to react whenever a new user signs up, it simply subscribes to the user_account_verified topic; the user service does not need to change at all.
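
As a rough illustration of that flexibility, a hypothetical recommendation service would only need its own consumer group on the existing topic; none of the following is part of the original code:

  // Hypothetical recommendation service: a new consumer group on an existing topic
  const Kafka = require('node-rdkafka');

  const consumer = new Kafka.KafkaConsumer({
    'metadata.broker.list': process.env.KAFKA_BROKER_LIST,
    'group.id': 'recommendation-service' // its own group, so it receives every event independently
  });

  consumer.on('ready', () => {
    consumer.subscribe(['user_account_verified']);
    consumer.consume();
  });

  consumer.on('data', message => {
    const user = JSON.parse(message.value.toString());
    // ...generate and send recommendations for the newly verified user
    console.log('new verified user', user.email);
  });

  consumer.connect();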


