GenStage for continuous job processing

We currently use Elixir at uSwitch to process user-submitted forms. We send the data to a 3rd party API and then parse the output, saving the results to a database.

Our first implementation was a recursive loop (sketched in code below):

Step 1: Make a long-running SQS request; as soon as at least one message is available, it returns up to N messages
Step 2: For every message returned, spawn a new process to handle it
Step 3: Go back to Step 1
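
A stripped-down version of that loop might look like this - SQS.get_messages and Processor.process_message stand in for our actual SQS client and form processor, and the Poller module name is only for illustration:

defmodule Poller do
  def loop do
    # Step 1: long-running SQS request; returns as soon as messages are available
    messages = SQS.get_messages

    # Step 2: one new process per message, with no upper bound
    Enum.each(messages, fn message ->
      spawn(fn -> Processor.process_message(message) end)
    end)

    # Step 3: go back to Step 1
    loop()
  end
end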

This usually works great, but it is a very greedy approach. If we received a spike in traffic (any time our comparison service is mentioned in the media), we'd try to spin up a lot of processes at once, each consuming some memory, until the whole application eventually ran out of memory and crashed. Thanks to Supervisors it would recover, but because the queue had only grown in the meantime, the problem would repeat.

It was clear we needed an alternative approach. Our first hotfix was to sleep for a few seconds after Step 2 (sketched below), giving processes enough time to finish. We then ran multiple instances of the application to handle the load and minimise the effect of the artificial delay. This was only a temporary solution, as it creates another set of problems:

  • There is a delay between calls - if a message is sent during that time, it has to wait regardless of the current workload
  • There is no guarantee that the jobs will finish in the allocated time - the delay should not be set in stone and should instead adapt to the workload, i.e. new jobs should only start when others finish.
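
For completeness, the hotfix amounted to adding a fixed pause to the Poller.loop sketch above (the 5-second value here is illustrative; in practice we tuned it by hand):

  def loop do
    messages = SQS.get_messages

    Enum.each(messages, fn message ->
      spawn(fn -> Processor.process_message(message) end)
    end)

    # Hotfix: give in-flight processes some time to finish before polling again
    :timer.sleep(5_000)

    loop()
  end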

GenStage

GenStage is a behaviour (in Elixir terms, an interface) that defines how events should be exchanged between Elixir processes. It's suitable when your events go through stages, possibly with side effects; another way of looking at it is to imagine a workflow similar to a conveyor line. GenStage defines three types of stages: Producer (creating events), Producer-Consumer (consuming events from another stage as well as producing them for the next stage) and Consumer (just consuming events). This allows you to specify the workflow and determine the demand each stage can handle (as different stages will usually take different amounts of time and processing power).

In our case, there's a Producer (SQS) and Consumer (form processor). The first iteration looked something like this:

defmodule Producer do
  alias Experimental.GenStage
  use GenStage

  def start_link() do
    GenStage.start_link(__MODULE__, :ok, name: __MODULE__)
  end

  def init(:ok) do
    {:producer, :ok}
  end

  def handle_demand(_demand, _state) do
    # Ignore the actual demand value and fire a single long-running SQS request
    messages = SQS.get_messages

    {:noreply, messages, :ok}
  end
end

defmodule Consumer do
  alias Experimental.GenStage
  use GenStage

  def start_link() do
    GenStage.start_link(__MODULE__, :ok)
  end

  def init(:ok) do
    {:consumer, :the_state_does_not_matter, subscribe_to: [Producer]}
  end

  def handle_events(messages, _from, _state) do
    # Spawn a Task per message; nothing limits how many run at once yet
    Enum.each(messages, &Task.start_link(Processor, :process_message, [&1]))

    {:noreply, [], :the_state_does_not_matter}
  end
end

When Consumers are started, they subscribe to the Producer and send demand for events. The Producer's handle_demand callback is then called, messages are retrieved from SQS and passed to handle_events in the Consumer, which in turn starts individual Tasks to process the jobs.
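
To try this first iteration out, both stages can be started by hand, e.g. in IEx (in the real application they would sit under a supervisor, as shown towards the end of this post):

{:ok, _producer} = Producer.start_link()
{:ok, _consumer} = Consumer.start_link()
# The Consumer subscribes in init/1 and sends demand, which triggers the
# Producer's handle_demand/2 and, with it, a single SQS request.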

Unfortunately, if the long-running SQS request does not return anything (in our case, once its upper limit of 20s is reached), the demand is never fulfilled - and another request is never made.

Continuous polling for messages

The GenStage documentation covers a scenario like this using BroadcastDispatcher and keeping a queue and the pending demand in the producer's state. One part didn't quite fit our setup: having to manually call sync_notify to push events in. We needed a way to continuously request data ourselves and send it to Consumers whenever messages arrived.
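
For reference, a condensed version of that documented pattern looks roughly like this (adapted from the GenStage docs for the pre-0.11, Experimental-namespaced release we were using; this is not the code we ended up with):

defmodule QueuedProducer do
  alias Experimental.GenStage
  use GenStage

  def start_link() do
    GenStage.start_link(__MODULE__, :ok, name: __MODULE__)
  end

  # Events have to be pushed in manually; the caller blocks until dispatch
  def sync_notify(event, timeout \\ 5_000) do
    GenStage.call(__MODULE__, {:notify, event}, timeout)
  end

  def init(:ok) do
    # State holds a queue of waiting events and the demand we couldn't satisfy yet
    {:producer, {:queue.new(), 0}, dispatcher: GenStage.BroadcastDispatcher}
  end

  def handle_call({:notify, event}, from, {queue, pending_demand}) do
    dispatch_events(:queue.in({from, event}, queue), pending_demand, [])
  end

  def handle_demand(incoming_demand, {queue, pending_demand}) do
    dispatch_events(queue, incoming_demand + pending_demand, [])
  end

  defp dispatch_events(queue, 0, events), do: {:noreply, Enum.reverse(events), {queue, 0}}
  defp dispatch_events(queue, demand, events) do
    case :queue.out(queue) do
      {{:value, {from, event}}, queue} ->
        GenStage.reply(from, :ok)
        dispatch_events(queue, demand - 1, [event | events])

      {:empty, queue} ->
        {:noreply, Enum.reverse(events), {queue, demand}}
    end
  end
end

It works, but every event has to be handed to the producer by some other process calling sync_notify - which is exactly the part that didn't fit a pull-based SQS setup.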

The solution to that is to create a callback with handle_cast that calls itself recursively, and call it once at startup:

def handle_cast(:check_for_messages, state) do
  messages = SQS.get_messages

  # Queue up the next poll before emitting whatever we got (possibly an empty list)
  GenStage.cast(__MODULE__, :check_for_messages)

  {:noreply, messages, state}
end

def handle_demand(_demand, state) do
  # Kick off the recursive polling loop the first time demand arrives
  GenStage.cast(__MODULE__, :check_for_messages)

  {:noreply, [], state}
end

When Consumers start, they send demand and handle_demand is called, which kicks off the recursive loop. Each iteration returns either some messages from SQS or an empty list; those are passed on to the Consumer, and the loop keeps itself going.

Rate-limiting

The problem with continuous polling is that, during a traffic spike, it will keep spawning new tasks until we run out of memory. To solve this, we replaced the Consumer with a DynamicSupervisor (renamed to ConsumerSupervisor in GenStage v0.11.0). This allows us to specify :max_demand, which caps how many child processes can be running at once.

The Producer, however, doesn't know about this limit, so it will keep polling SQS and sending events. Once max_demand is reached, any extra events start filling GenStage's internal buffer. In our case, we didn't want messages to be read from the queue until we knew we had the capacity to process them (another instance of the application might be able to pick them up instead). By keeping the current demand from Consumers in the Producer's state, we only request data from SQS when there is demand for it.

The combination of both these approaches looks like this:

defmodule Producer do
  alias Experimental.GenStage
  use GenStage

  def start_link(state) do
    GenStage.start_link(__MODULE__, state, name: __MODULE__)
  end

  def init(state) do
    {:producer, state}
  end

  # No demand left: stop polling until handle_demand bumps the counter again
  def handle_cast(:check_messages, 0) do
    {:noreply, [], 0}
  end
  def handle_cast(:check_messages, state) do
    messages = SQS.get_messages

    GenStage.cast(__MODULE__, :check_messages)

    # Emit the messages and subtract them from the outstanding demand
    {:noreply, messages, state - Enum.count(messages)}
  end

  def handle_demand(demand, state) do
    GenStage.cast(__MODULE__, :check_messages)

    # Accumulate demand; it represents how many Processor slots are free
    {:noreply, [], demand + state}
  end
end

defmodule Consumer do
  alias Experimental.DynamicSupervisor
  use DynamicSupervisor

  def start_link() do
    DynamicSupervisor.start_link(__MODULE__, :ok)
  end

  def init(:ok) do
    children = [
      worker(Processor, [], restart: :temporary),
    ]

    # At most 10 Processor children at a time; each finished child frees a slot
    {:ok, children, strategy: :one_for_one, subscribe_to: [{Producer, max_demand: 10, min_demand: 1}]}
  end
end

defmodule Processor do
  require Logger

  def start_link(message) do
    Task.start_link(__MODULE__, :process_message, [message])
  end

  def process_message(message) do
    # Do work with the message here
    Logger.debug("Sleeping for 15s")
    :timer.sleep(15_000)
  end
end

The important part is keeping the number of available processes in the Producer's state. As messages arrive, this number goes down; when it reaches zero, SQS requests stop. When processes finish, the demand (and with it the counter) goes back up.

This ensures that, at most, 10 tasks are processing SQS messages at any time. As soon as an SQS request returns messages, they are handed to new tasks while the next SQS request starts, eliminating any delay between them.

It's also worth noting that our SQS requests return no more than one message at a time - otherwise the count of available consumers kept in the Producer's state could go negative whenever a batch is larger than the remaining demand.
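
If your SQS wrapper can return batches, a defensive variant of the polling clause would instead cap each fetch at the remaining demand. This is not what we run, and SQS.get_messages/1 with a :max_messages option is hypothetical, but on AWS it would map onto MaxNumberOfMessages (itself capped at 10):

def handle_cast(:check_messages, state) when state > 0 do
  # Never fetch more messages than we have demand for, so the counter
  # can't go negative. :max_messages is a hypothetical wrapper option.
  messages = SQS.get_messages(max_messages: min(state, 10))

  GenStage.cast(__MODULE__, :check_messages)

  {:noreply, messages, state - Enum.count(messages)}
end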

All of this would be Supervised when the application is booted up:

def start(_type, _args) do
  import Supervisor.Spec

  children = [
    # Start the Producer with zero outstanding demand
    worker(Producer, [0]),
    worker(Consumer, [])
  ]

  Supervisor.start_link(children, strategy: :one_for_one)
end

Cast vs send()

It's possible that GenStage.cast/2 is not the best choice for a GenStage process sending messages to itself - casts are arguably better suited to external callers, while internally you can simply use Kernel.send/2 and handle the message in handle_info:

def handle_info(:check_messages, 0), do: {:noreply, [], 0}
def handle_info(:check_messages, state) do
  # Requires `require Logger` in the module
  Logger.debug("Reading from SQS... got #{state} workers ready")

  messages = SQS.get_messages

  send(self(), :check_messages)

  {:noreply, messages, state - Enum.count(messages)}
end

def handle_demand(demand, state) do
  send(self(), :check_messages)

  {:noreply, [], demand + state}
end

As far as we could tell, the behaviour is the same.

Summary

There's a lot of moving parts to this architecture:

Supervisor -> Producer + (ConsumerSupervisor -> Processors)

However, given how little code was needed - much of it boilerplate - it felt surprisingly easy to build a rate-limited, continuous background process that can recover when any of its parts fail.

Thank you to the Elixir Slack community for helping us figure this out, in particular @sschneider and @hellopatrick. Any feedback is appreciated - please reach out to me (@tomasztomczyk) on Slack!