Microservice Communication with Queues

Development

Reading Time: 9 minutes

Microservices are small programs that each handle one task. A microservice that is never used is useless, though; it’s the system as a whole that provides value to the user. Microservices work together, passing messages back and forth, to accomplish that larger task.

Communication is key, but there are a variety of ways it can be accomplished. A fairly standard way is through a RESTful API, passing JSON back and forth over HTTP. In one sense, this is great: it’s a form of communication that’s well understood. However, it isn’t without costs, because it drags in other concerns, such as HTTP status codes and receiving/parsing requests and responses.

What other ways might microservices communicate back and forth? In this article, we’re going to explore the use of a queue, more specifically RabbitMQ.

What Does RabbitMQ Do?

RabbitMQ provides a language-agnostic way for programs to send messages to each other. In simple terms, it allows a “Publisher/Producer” to send a message and allows for a “Consumer” to listen for those messages.

In one of its simpler models, it resembles what many Rails developers are used to with Sidekiq: the ability to distribute asynchronous tasks among one or more workers. Sidekiq is one of the first things I install on all my Rails projects. I don’t think RabbitMQ would necessarily take its place, especially for things that work more easily within a Rails environment: sending emails, interacting with Rails models, etc.

It doesn’t stop there though. RabbitMQ can also handle Pub/Sub functionality, where a single “event” can be published and one or more consumers can subscribe to that event. You can take this further where consumers can subscribe only to specific events and/or events that match the pattern they’re watching for.

Finally, RabbitMQ can allow for RPCs (Remote Procedure Calls), where you’re looking for an answer right away from another program… basically calling a function that exists in another program.

In this article, we’ll be taking a look at both the “Topic” or pattern-based Pub/Sub approach, as well as how an RPC can be accomplished.


Event-based and asynchronous

The first example we’ll be working with today is a sports news provider that receives incoming data about scores, goals, players, teams, etc. It has to parse the data, store it, and perform various tasks depending on the incoming data.

To make things a little clearer, let’s imagine that, in one of the incoming data streams, we’ll be notified about soccer goals.

When we discover that a goal has happened, there are a number of things that we need to do:

  • Parse and normalize the information
  • Store the details locally
  • Update the “box-score” for the game that the goal took place in
  • Update the league leaderboard showing who the top goal scorer is
  • Notify all subscribers (push notification) of a particular league, team, or player
  • And any number of other tasks or analysis that we need to do

Do we need to do all of those tasks in order? Should the program in charge of processing incoming data need to know about all of these tasks and how to accomplish them? I’d suggest that, other than parsing/normalizing the incoming data and perhaps saving it locally, the rest of the tasks can be done asynchronously, and the program shouldn’t really know or care about them.

What we can do is have the parser program emit an event (soccer.mls.goal for example), along with its accompanying information:

{
  league: 'MLS',
  team: 'Toronto FC',
  player: 'Sebastian Giovinco',
  opponent: 'New York City FC',
  time: '14:21'
}

The parser can then forget about it! It’s done its work of emitting the event. The rest of the work will be done by any number of consumers who have subscribed to this specific event.
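Because the payload crosses process (and language) boundaries, it travels as JSON. A quick round trip shows what a consumer actually receives; note that the symbol keys come back as strings:

```ruby
require 'json'

# The event payload is serialized to JSON by the producer and
# parsed back by the consumer (keys come back as strings).
payload = {
  league: 'MLS',
  team: 'Toronto FC',
  player: 'Sebastian Giovinco',
  opponent: 'New York City FC',
  time: '14:21'
}

wire = payload.to_json
parsed = JSON.parse(wire)

puts parsed['player'] # => "Sebastian Giovinco"
```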

Producing in Ruby

To produce or emit events in Ruby, the first thing we need to do is install the bunny gem, which allows Ruby to communicate with RabbitMQ. As an example, here is some fake incoming data that needs to trigger the goal event for soccer.

# Imagine the parsing happens here :)
soccer = Soccer.new
soccer.emit_goal(
  league: 'MLS',
  team: 'Toronto FC',
  player: 'Sebastian Giovinco',
  opponent: 'New York City FC',
  time: '14:21'
)

Let’s next take a look at the emit_goal function inside of the Soccer class, which builds the event slug and packages the data together to be included in the event being emitted:

class Soccer
  include EventEmitter

  def emit_goal(raw_details)
    slug = "soccer.#{raw_details[:league]}.goal".downcase # "soccer.mls.goal"
    payload = raw_details.slice(:league, :team, :player, :opponent, :time)
    emit('live_events', slug, payload)
  end
end

The 'live_events' string has to do with which Exchange to publish the event to. An Exchange is basically like a router that decides which Queue(s) the event should be placed into. The emit method is inside of a Module I created to simplify emitting events:

module EventEmitter
  def emit(topic, slug, payload)
    conn = Bunny.new
    conn.start

    ch = conn.create_channel
    x = ch.topic(topic)

    x.publish(payload.to_json, routing_key: slug)
    puts " [OUT] #{slug}:#{payload}"

    conn.close
  end
end

It receives the topic, event slug, and event payload and sends that information to RabbitMQ.

Consuming in Ruby

So far we have produced an event, but without a consumer to consume it, the event will be lost. Let’s create a Ruby consumer that is listening for all soccer goal events.


You may have noticed that what I was calling the event slug (or the routing_key) looked like "soccer.mls.goal". Picking a pattern to follow is important, because consumers can choose which events to listen for based on a pattern such as "soccer.*.goal": all soccer goals regardless of the league.
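RabbitMQ’s topic routing treats "*" as exactly one dot-separated word and "#" as any number of words. As a rough illustration, here is a toy matcher of my own (it is not part of bunny or RabbitMQ, and it simplifies the zero-word case of "#"):

```ruby
# A toy re-implementation of RabbitMQ's topic-routing rules:
# "*" matches exactly one dot-separated word, "#" matches one or more.
# (Real RabbitMQ also lets "#" match zero words; this sketch doesn't.)
def topic_match?(pattern, routing_key)
  regex = pattern.split('.').map do |word|
    case word
    when '*' then '[^.]+'
    when '#' then '.+'
    else Regexp.escape(word)
    end
  end.join('\.')

  !!(routing_key =~ /\A#{regex}\z/)
end

puts topic_match?('soccer.*.goal', 'soccer.mls.goal')  # => true
puts topic_match?('soccer.*.goal', 'hockey.nhl.goal')  # => false
puts topic_match?('*.*.goal', 'soccer.mls.goal')       # => true
```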

The consumer in this case will be some code which updates the leaderboard for the top goal scorers in the league. It is kicked off by running a Ruby file with this line:

SoccerLeaderboard.new.live_updates

The SoccerLeaderboard class has a method called live_updates, which calls a receive method provided by an included Module. It passes the topic, the pattern of event slug/routing_key to listen for, and a block of code to be called any time there is a new event to process.

class SoccerLeaderboard
  include EventReceiver

  def live_updates
    receive('live_events', 'soccer.*.goal') do |payload|
      puts "#{payload['player']} has scored a new goal."
    end
  end
end

The EventReceiver Module is a little larger, but for the most part it’s just setting up a connection to RabbitMQ and telling it what it wants to listen for.

module EventReceiver
  def receive(topic, pattern, &block)
    conn = Bunny.new
    conn.start

    ch = conn.create_channel
    x = ch.topic(topic)
    q = ch.queue("", exclusive: true)
    q.bind(x, routing_key: pattern)

    puts " [INFO] Waiting for events. To exit press CTRL+C"

    begin
      q.subscribe(block: true) do |delivery_info, properties, body|
        puts " [IN] #{delivery_info.routing_key}:#{body}"
        block.call(JSON.parse(body))
      end
    rescue Interrupt => _
      ch.close
      conn.close
    end
  end
end

Consuming in Elixir

I mentioned that RabbitMQ is language agnostic. What I mean by this is that we can have not only a consumer in Ruby listening for events, but also a consumer in Elixir listening for the same events at the same time.

In Elixir, the package I used to connect to RabbitMQ was amqp. One gotcha was that it relies on amqp_client, which was giving me problems with Erlang 19. To solve that, I had to point directly at the GitHub repository, because it doesn’t appear that the fix has been published to Hex yet.

defp deps do
  [
    {:amqp_client, git: "https://github.com/dsrosario/amqp_client.git", branch: "erlang_otp_19", override: true},
    {:amqp, "~> 0.1.5"}
  ]
end

The code to listen for events in Elixir looks like the following. Most of the code inside the start_listening function just creates a connection to RabbitMQ and tells it what to subscribe to. wait_for_messages is where the event processing takes place.

defmodule GoalNotifications do
  def start_listening do
    {:ok, connection} = AMQP.Connection.open
    {:ok, channel} = AMQP.Channel.open(connection)
    AMQP.Exchange.declare(channel, "live_events", :topic)
    {:ok, %{queue: queue_name}} = AMQP.Queue.declare(channel, "", exclusive: true)
    AMQP.Queue.bind(channel, queue_name, "live_events", routing_key: "*.*.goal")
    AMQP.Basic.consume(channel, queue_name, nil, no_ack: true)
    IO.puts " [INFO] Waiting for messages. To exit press CTRL+C, CTRL+C"
    wait_for_messages(channel)
  end

  def wait_for_messages(channel) do
    receive do
      {:basic_deliver, payload, meta} ->
        IO.puts " [x] Received [#{meta.routing_key}] #{payload}"
        wait_for_messages(channel)
    end
  end
end


GoalNotifications.start_listening

RPC… when you need an answer right away

Remote Procedure Calls can be accomplished with RabbitMQ, but I’ll be honest: it’s more involved than the Pub/Sub examples above. To me, it felt like each side has to act as both a producer and a consumer.

The flow is a little like this:

  • Program A asks Program B for some information, providing a unique ID for the request
  • Program A listens for responses that match that same unique ID
  • Program B receives the request, does the work, and provides a response carrying the same unique ID
  • Program A runs its callback once the matching unique ID is found in the response from Program B
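That handshake can be simulated in-process, with two plain Ruby queues standing in for RabbitMQ; this is an illustration of the correlation-ID idea only, with no broker involved:

```ruby
require 'securerandom'

# Two in-memory queues stand in for RabbitMQ's request and reply queues.
requests = Queue.new
replies  = Queue.new

# "Program B": consume a request, do the work, reply with the same ID.
server = Thread.new do
  msg = requests.pop
  replies.push(correlation_id: msg[:correlation_id], body: msg[:body].upcase)
end

# "Program A": publish a request tagged with a unique correlation ID...
call_id = SecureRandom.uuid
requests.push(correlation_id: call_id, body: 'abc123')

# ...then wait for the reply whose correlation ID matches.
reply = replies.pop
server.join
puts reply[:body] if reply[:correlation_id] == call_id # => "ABC123"
```

In RabbitMQ proper, the reply queue is an exclusive, server-named queue, and the correlation ID travels in the message properties rather than the body.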

In this example, we’ll be talking about a product’s inventory… an answer we need right away so we can be sure there is stock available for a customer to purchase.

inventory = ProductInventory.new('abc123').inventory
puts "Product has inventory of #{inventory}"

The ProductInventory class is quite simple, mostly because I’ve hidden the complexity of the RPC call inside of a class called RemoteCall.

class ProductInventory
  attr_accessor :product_sku

  def initialize(product_sku)
    @product_sku = product_sku
  end

  def inventory
    RemoteCall.new('inventory').response(product_sku)
  end
end

Now let’s take a look at how RemoteCall is handling it:

require 'bunny'
require 'securerandom'

class RemoteCall
  attr_reader :lock, :condition
  attr_accessor :conn, :channel, :exchange, :reply_queue,
    :remote_response, :call_id, :queue_name

  def initialize(queue_name)
    @queue_name = queue_name
    @conn = Bunny.new
    @conn.start
    @channel = conn.create_channel
    @exchange = channel.default_exchange
    @reply_queue = channel.queue('', exclusive: true)
  end

  def response(payload)
    @lock = Mutex.new
    @condition = ConditionVariable.new
    response_callback(reply_queue)

    self.call_id = SecureRandom.uuid
    puts "Awaiting call with correlation ID #{call_id}"
    exchange.publish(payload,
      routing_key: queue_name,
      correlation_id: call_id,
      reply_to: reply_queue.name
    )
    lock.synchronize { condition.wait(lock) }

    remote_response
  end

  private

  def response_callback(reply_queue)
    that = self
    reply_queue.subscribe do |delivery_info, properties, payload|
      if properties.correlation_id == that.call_id
        that.remote_response = payload
        that.lock.synchronize { that.condition.signal }
      end
    end
  end
end

So if all that code was for the Producer, what does the Consumer look like? It’s kicked off with:

server = InventoryServer.new
server.start

And the InventoryServer looks like:

require 'bunny'

class InventoryServer
  QUEUE_NAME = 'inventory'.freeze
  attr_reader :conn

  def initialize
    @conn = Bunny.new
  end

  def start
    conn.start
    channel = conn.create_channel
    queue = channel.queue(QUEUE_NAME)
    exchange = channel.default_exchange
    subscribe(queue, exchange)
  rescue Interrupt => _
    channel.close
    conn.close
  end

  def subscribe(queue, exchange)
    puts "Listening for inventory calls"

    queue.subscribe(block: true) do |delivery_info, properties, payload|
      puts "Received call with correlation ID #{properties.correlation_id}"
      product_sku = payload
      response = self.class.inventory(product_sku)
      exchange.publish(response.to_s,
        routing_key: properties.reply_to,
        correlation_id: properties.correlation_id
      )
    end
  end

  def self.inventory(product_sku)
    42
  end
end

Wow… that’s a lot of work to make an RPC! RabbitMQ has a great guide explaining how this works in a variety of different languages.

Conclusion

Microservices don’t always need to communicate synchronously, and they don’t always need to communicate over HTTP/JSON either. They can, but next time you’re thinking about how they should speak to each other, why not consider doing it asynchronously using RabbitMQ? It comes with a great interface for monitoring the activity of the queue and has fantastic client support in a variety of popular languages. It’s fast, reliable, and scalable.

Microservices aren’t free though… it’s worth considering whether the extra complexity of setting up separate services and giving them a way to communicate might be better handled with something like Sidekiq and clean, modular code.


Join the Discussion



  • edelpero

    I’ll start by saying nice article.

    Regarding using RabbitMQ with Microservices I would like to share with you my experience and opinions.

    I love RabbitMQ for Pub/Sub, mostly because of its flexibility when you use topic exchanges.

    I prefer to avoid using RabbitMQ for RPC unless you want to keep the messages within Rabbit in case a request fails or something.

    Using HTTP/JSON scales better and is simpler to maintain. Adding RabbitMQ in the middle of two services adds an extra dependency to the architecture of the app, and when you experience latency issues, this will kill you.

    Also, another thing that I consider a good practice is to use different RabbitMQ layers per microservice, somewhat like mailboxes for processes in Erlang. This way, if one layer fails, the problem will be isolated and won’t affect other parts of your system. This also gives you flexibility to perform all other kinds of operations.

    Again, thanks for writing this article.

  • Tiago Cardoso

    It is an interesting post indeed. I developed a variation of this “microservice” approach, where one service would live waiting for certain requests on a requests rabbitmq channel, enqueue a sidekiq job to redis, fetch the job from redis, process the job in a worker, and enqueue it to a responses channel in rabbitmq to be consumed by the other service which requested it in the first place.
    Now what’s obvious in this setup is the network indirection overhead, or “pub to mq, sub from mq, enqueue to redis/sidekiq, dequeue from redis/sidekiq”. The “problem” with this setup is the guarantees offered by the sidekiq service (retry jobs queue, dead jobs queue, jobs running queue, nice webUI), which just can’t be reproduced with a plain “mq + workers pool” setup without a significant amount of work.
    So this setup is based only on feature richness. One could stay only with the queue approach, if all the other guarantees are negligible. But even that can be too much sometimes. What if I can have my producers talk directly with the consumer and vice versa? That would be the zeroMQ approach. It eliminates the overhead of enqueueing to a 3rd party, but it’s probably harder to guarantee availability and integration of the messages. Again, a trade-off.

  • randito

    I used Ruby and RabbitMQ on my last project. We ended up using the library Hutch — https://github.com/gocardless/hutch — to add some nice syntactic sugar on top of everything to avoid the nitty gritty of setting up queues, communication, etc.

    It ended up being very nice. There was a consumer class that used a worker class — passing along messages to a #work method.

    We also used a convention that I came up with (but probably stole from somewhere). Where if events were sent over a ‘project.function.action’ type queue then results were sent over the ‘project.function.action.results’ queue and errors on the ‘project.function.action.error’ queue with logs sent over ‘project.function.action.log’. It let us create simple ‘*.error’ error handlers and ‘*.log’ loggers. Plus, you can easily chain message processors together by listening to a specific ‘.result’ channel.

    But, the sad part is… we struggled with the “query” or “RPC” side of things and never ended up implementing it; the RabbitMQ side was eclipsed by HTTP for queries.

  • Asynchronous Messaging is a bit like a face tattoo; you better be sure you really want it.

    • Douwe de vries

      Can you elaborate?

      Because I’m thinking of using RabbitMQ for communicating events between microservices and don’t know the pros and cons of implementing this.

      • Sure. There’s a lot more than I could reasonably cover in this forum but in a nutshell, like all tech choices, Async Messaging solves some problems and creates others. Is reacting to/capturing your events critical or best efforts for example? The answer to that will have a significant impact on how you configure your queues. And then there is the configuration and management of RabbitMQ, not a trivial task. I’m not saying don’t, I’m saying don’t take that decision lightly. As well as looking at RabbitMQ you might also want to take a look at ZeroMQ and perhaps even EventSourcing (I’m taking a wild punt here, this may be well out of scope for what you are trying to do).

        • Douwe de vries

          thank you for your insights. I’m going to use this in our discussion if we really need/want it.

          Perhaps start with some proof-of-concepts to see what works and doesn’t work.

  • fmvilas

    Very nice article. You might want to have a look at my post on Nordic APIs: http://nordicapis.com/asynchronous-apis-in-choreographed-microservices/.

    We have a Slack channel for Async APIs, you can join here: https://async-apis-slack.herokuapp.com/