In this blog post, we’ll focus on how to create a simple Broadway app that can read and write messages using the SQS LocalStack service.

LocalStack is a fully functional local AWS cloud implementation that allows us to develop and test our cloud & serverless apps offline. Broadway is a library for building concurrent multi-stage data ingestion and processing pipelines in Elixir.

Creating a new mix application

Start by creating a new mix application with an application supervision tree.

mix new sqs_demo --sup
cd sqs_demo

Configuring LocalStack

Then install the LocalStack CLI, which makes it easy to manage LocalStack services.

brew install localstack

We’ll use the Docker version of LocalStack, so we’ll create a new Docker Compose file in the root of our project. This blog post assumes that you have Docker installed on your machine.

Create the docker-compose.yml file with the following:

version: "3.8"

services:
  localstack:
    container_name: "localstack_main"
    image: localstack/localstack
    network_mode: bridge
    ports:
      - "127.0.0.1:4566:4566"
    environment:
      - SERVICES=sqs
      - INIT_SCRIPTS_DIR=/docker-entrypoint-initaws.d
      - DEBUG=true
      - DATA_DIR=/tmp/localstack/data
      - HOST_TMP_FOLDER=/tmp/localstack
      - DOCKER_HOST=unix:///var/run/docker.sock
    volumes:
      - ./aws:/docker-entrypoint-initaws.d
      - "/tmp/localstack:/tmp/localstack"
      - "/var/run/docker.sock:/var/run/docker.sock"

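The ./aws folder is mounted at /docker-entrypoint-initaws.d, the directory this setup uses for initialization scripts, so we need to create one next. Newer LocalStack releases read these scripts from /etc/localstack/init/ready.d instead, so adjust the volume mount if you run a recent image and the queue is not created.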

Adding a LocalStack initialization script

Create a shell script to create a new SQS queue when initializing LocalStack.

mkdir aws
touch aws/init.sh

This creates a folder named aws and a file named init.sh inside it. In init.sh, add:

awslocal sqs create-queue --queue-name="sqs-demo"

Then, save the file. This tells LocalStack to create a new queue named sqs-demo when it starts.

Testing the queue manually

First, we need to start LocalStack. Open a new shell, navigate to the root of the project and type

docker-compose up

You should see the LocalStack Docker service initializing.

In a different shell, we’re going to open a shell inside the LocalStack container using the LocalStack CLI. Remember to navigate to the root of the project first.

localstack ssh

Send a message to the queue with

awslocal sqs send-message \
   --queue-url http://localhost:4566/000000000000/sqs-demo \
   --message-body '{ "hello":"world" }'

It should return something like this:

{
    "MD5OfMessageBody": "befc643223f5103570d18fed795d336d",
    "MessageId": "4f64ff51-cf5d-47be-07e8-3fd07871448a"
}

And read the message back with

awslocal sqs receive-message \
   --queue-url http://localhost:4566/000000000000/sqs-demo

You should get this message back:

{
    "Messages": [
        {
            "MessageId": "4f64ff51-cf5d-47be-07e8-3fd07871448a",
            "ReceiptHandle": "psqicjvtungvqvbqunrzpbdjvfolyyujamfagzixdpaybcinpfbccneknktljiwccsfpeztrsnkdakonydvyqhqpmlkowbzxynsijhmtqphtwschxsajgzuwjvmyhnnobvomfstbgnvgsbcrcqegzubjhjnzqaqzhlzvmztyskuxisbjwiqvyacok",
            "MD5OfBody": "befc643223f5103570d18fed795d336d",
            "Body": "{ \"hello\":\"world\" }"
        }
    ]
}

Awesome! Now we’re able to manually send and read messages from the queue.

Before we continue, purge all the test messages from the queue with

awslocal sqs purge-queue \
   --queue-url http://localhost:4566/000000000000/sqs-demo

Leave the container by running exit.

Sending messages from Elixir

Sending messages manually is great, but it won’t scale. Let’s add some code to send messages from the Elixir app. For that, we’ll use the ex_aws and ex_aws_sqs libraries.

In the mix.exs file, add the following:

defp deps do
  [
    {:ex_aws, "~> 2.1"},
    {:ex_aws_sqs, "~> 3.3"},
    {:poison, "~> 3.0"},
    {:hackney, "~> 1.9"},
    {:saxy, "~> 1.1"}
  ]
end

And get the new dependencies with

mix deps.get

Create a new function to send messages to the queue. In the file lib/sqs_demo.ex add a new function

def send_message do
  config = [
    scheme: "http://",
    host: "localhost",
    port: 4566,
    access_key_id: "",
    secret_access_key: ""
  ]

  queue_url = "http://localhost:4566/000000000000/sqs-demo"
  message = Poison.encode!(%{"foo" => "bar"})

  queue_url
  |> ExAws.SQS.send_message(message)
  |> ExAws.request!(config)
end

Let’s see what each line is doing.

config = [
  scheme: "http://", 
  host: "localhost", 
  port: 4566,
  access_key_id: "",
  secret_access_key: ""
]

This tells ExAws to talk to our local version of AWS instead of the real one: LocalStack is running on localhost, port 4566.
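The connection settings are hard-coded in the function. As a minimal sketch, the same keyword list could be built from environment variables with LocalStack defaults as the fallback. The module name and the variable names SQS_SCHEME, SQS_HOST, and SQS_PORT are assumptions made for illustration; they are not part of ExAws or LocalStack.

```elixir
defmodule SqsDemo.LocalConfig do
  @moduledoc """
  Hypothetical helper (not part of ExAws): builds the connection options
  used in this post, falling back to the LocalStack values shown above.
  """

  # The environment variable names below are assumptions for illustration.
  # `env` defaults to the current system environment (a map of strings).
  def build(env \\ System.get_env()) do
    [
      scheme: Map.get(env, "SQS_SCHEME", "http://"),
      host: Map.get(env, "SQS_HOST", "localhost"),
      port: env |> Map.get("SQS_PORT", "4566") |> String.to_integer(),
      # The post passes empty credentials to LocalStack.
      access_key_id: "",
      secret_access_key: ""
    ]
  end
end
```

The result of SqsDemo.LocalConfig.build() could then be passed to ExAws.request!/2 in place of the literal config list.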

queue_url = "http://localhost:4566/000000000000/sqs-demo"

That is the URL of the queue created by the initialization script.
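The URL is made of the LocalStack endpoint, an account ID, and the queue name; 000000000000 is the account ID LocalStack uses by default. A small sketch of assembling it, where SqsDemo.QueueUrl is a hypothetical helper introduced only for illustration:

```elixir
defmodule SqsDemo.QueueUrl do
  # Hypothetical helper: assembles a queue URL in the
  # http://<host>:<port>/<account_id>/<queue_name> shape used in this post.
  @default_account_id "000000000000"

  def build(queue_name, opts \\ []) do
    host = Keyword.get(opts, :host, "localhost")
    port = Keyword.get(opts, :port, 4566)
    account_id = Keyword.get(opts, :account_id, @default_account_id)

    "http://#{host}:#{port}/#{account_id}/#{queue_name}"
  end
end

IO.puts(SqsDemo.QueueUrl.build("sqs-demo"))
# prints http://localhost:4566/000000000000/sqs-demo
```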

message = Poison.encode!(%{"foo" => "bar"})

And this is the message we want to send.

queue_url
|> ExAws.SQS.send_message(message)
|> ExAws.request!(config)

Lastly, this tells ExAws to send a message to SQS using the configuration defined at the beginning.
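If you want to send several messages at once, SQS also offers a batch call that accepts at most 10 entries per request. The sketch below only prepares the entries; SqsDemo.Batch is a hypothetical helper, and the entry shape (a keyword list with :id and :message_body) is my understanding of what ExAws.SQS.send_message_batch/2 accepts, so check it against the ex_aws_sqs version you use.

```elixir
defmodule SqsDemo.Batch do
  # SQS accepts at most 10 entries per SendMessageBatch request.
  @max_entries 10

  # Turns already-encoded message bodies into groups of batch entries.
  # Each entry gets an id that is unique within its group.
  def entries(bodies) when is_list(bodies) do
    bodies
    |> Enum.chunk_every(@max_entries)
    |> Enum.map(fn chunk ->
      chunk
      |> Enum.with_index()
      |> Enum.map(fn {body, index} -> [id: "msg-#{index}", message_body: body] end)
    end)
  end
end

# Twelve small JSON bodies end up in two groups.
bodies = for n <- 1..12, do: ~s({"n":#{n}})
groups = SqsDemo.Batch.entries(bodies)
IO.inspect(Enum.map(groups, &length/1), charlists: :as_lists)
# prints [10, 2]
```

Each group could then be piped through ExAws.SQS.send_message_batch/2 and ExAws.request!/2 in the same way as the single message above.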

Let’s run the app. Make sure LocalStack is still running in the other shell.

iex -S mix
iex> SqsDemo.send_message()

You should get a response like

%{
  body: %{
    md5_of_message_attributes: "",
    md5_of_message_body: "8fc97148f4cc01e99d0028c320d8971a",
    message_id: "bd225fd3-4355-af0c-8e44-2bbd325270b9",
    request_id: "AA55J52HEKUSHQIDN61UO1F3BBJODASKAY3PLE09OHFYEMM6MEWT"
  },
  headers: [
    ...
  ],
  status_code: 200
}

Great! This means that we’re able to send messages from our Elixir app to the LocalStack SQS queue.
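ExAws.request!/2 raises on failure. If you would rather handle errors yourself, the non-raising ExAws.request/2 returns {:ok, response} or {:error, reason}. A minimal sketch of pulling the message ID out of such a result, where SqsDemo.SendResult is a hypothetical helper and the sample data is copied from the response above:

```elixir
defmodule SqsDemo.SendResult do
  # Hypothetical helper: summarises the result of the non-raising
  # ExAws.request/2, which returns {:ok, response} or {:error, reason}.
  def message_id({:ok, %{status_code: 200, body: %{message_id: id}}}), do: {:ok, id}
  def message_id({:ok, response}), do: {:error, {:unexpected_response, response}}
  def message_id({:error, reason}), do: {:error, reason}
end

# Sample response copied from the output above (headers omitted).
sample =
  {:ok,
   %{
     body: %{message_id: "bd225fd3-4355-af0c-8e44-2bbd325270b9"},
     status_code: 200
   }}

IO.inspect(SqsDemo.SendResult.message_id(sample))
# prints {:ok, "bd225fd3-4355-af0c-8e44-2bbd325270b9"}
```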

Reading messages with Broadway

Broadway is an Elixir library that makes working with message brokers a breeze. It’ll allow us to build a data-processing pipeline that ingests messages from the LocalStack SQS service and handles them in stages that Broadway calls processors and batchers.

We’ll use the broadway_sqs producer. Open the mix.exs file and add the dependency:

defp deps do
  [
    ...
    {:broadway_sqs, "~> 0.7"}
  ]
end

Get the new dependency

mix deps.get

Now let’s configure a new Broadway pipeline. Broadway uses this configuration to dynamically build and start all parts of the data-ingestion pipeline for us. We need to define three things:

  • :producer configures the source of events. Its default concurrency is 1, which means that only one producer process is spawned. In our case, BroadwaySQS.Producer reads from the LocalStack SQS queue.

  • :processors configures the processing stage of the pipeline, which does most of the work. By default, the number of processor processes is System.schedulers_online() * 2. Processors run the handle_message callback, which is called once per message.

  • :batchers configures the stages that handle messages in batches. By default there is one batcher process and the batch size is 100. Batchers run the handle_batch callback, and messages that pass through a batcher are acknowledged after it returns.
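The processor and batcher options can also be set explicitly instead of relying on defaults. A minimal sketch follows; SqsDemo.PipelineOptions is a hypothetical helper, and the numbers are illustrative assumptions, not recommendations. The option names :concurrency, :batch_size, and :batch_timeout are the ones Broadway documents for these stages.

```elixir
defmodule SqsDemo.PipelineOptions do
  # Hypothetical helper: returns processor and batcher options with
  # explicit, illustrative values instead of Broadway's defaults.
  def tuned do
    [
      processors: [
        # Four processes call handle_message/3 concurrently.
        default: [concurrency: 4]
      ],
      batchers: [
        # One batcher process; a batch is emitted after 10 messages
        # or after 2 seconds, whichever comes first.
        default: [concurrency: 1, batch_size: 10, batch_timeout: 2_000]
      ]
    ]
  end
end

IO.inspect(SqsDemo.PipelineOptions.tuned()[:batchers][:default][:batch_size])
# prints 10
```

These two keys would replace the processors: and batchers: entries passed to Broadway.start_link/2.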

Create the file lib/sqs_demo/test_pipeline.ex and add the following:

defmodule SqsDemo.TestPipeline do
  use Broadway

  alias Broadway.Message

  def start_link(_opts) do
    Broadway.start_link(__MODULE__,
      name: __MODULE__,
      producer: [
        module: {
          BroadwaySQS.Producer,
          queue_url: "http://localhost:4566/000000000000/sqs-demo",
          config: [
            scheme: "http://",
            host: "localhost",
            port: 4566,
            access_key_id: "",
            secret_access_key: ""
          ]
        }
      ],
      processors: [
        default: []
      ],
      batchers: [
        default: []
      ]
    )
  end

  @impl true
  def handle_message(_processor, %Message{data: data} = message, _context) do
    # Print the raw message body, then pass the message on unchanged.
    IO.inspect(data, label: "******* Message *******")

    message
  end

  @impl true
  def handle_batch(_batcher, messages, _batch_info, _context) do
    # Return the batch unchanged so Broadway can acknowledge the messages.
    messages
  end
end

We’re using the default processor and batcher configuration to keep it simple. Broadway supports a ton of customization; see the Broadway documentation for the full list of options.

Let’s see what we’re doing here:

defmodule SqsDemo.TestPipeline do
  use Broadway

Define a new module and tell it to use the Broadway behavior.

BroadwaySQS.Producer,
queue_url: "http://localhost:4566/000000000000/sqs-demo",
config: [
  scheme: "http://",
  host: "localhost",
  port: 4566,
  access_key_id: "",
  secret_access_key: ""
]

Add a new SQS producer with the same configuration we used before to send the message to the queue.

@impl true
def handle_message(_, %Message{data: data} = message, _) do
  IO.inspect data, label: "******* Message *******"

  message
end

The processor calls this callback once for each message. We inspect the message data and print it on the screen.
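In a real pipeline, handle_message would usually decode and check the payload rather than just print it. A rough sketch of one way to do that follows, assuming the message body is the JSON produced by send_message. SqsDemo.Payload is a hypothetical module; Poison.decode/1, Broadway.Message.update_data/2, and Broadway.Message.failed/2 come from the dependencies added earlier, so process/1 only runs inside the project.

```elixir
defmodule SqsDemo.Payload do
  # Hypothetical helper: checks that a decoded payload has the "foo" key
  # that send_message/0 puts in the message body.
  def validate(%{"foo" => value}) when is_binary(value), do: {:ok, value}
  def validate(other), do: {:error, {:unexpected_payload, other}}

  # Sketch of a body for handle_message/3. It replaces the raw JSON with
  # the extracted value, or marks the message as failed if decoding or
  # validation does not succeed. Needs Poison and Broadway at runtime.
  def process(message) do
    with {:ok, decoded} <- Poison.decode(message.data),
         {:ok, value} <- validate(decoded) do
      Broadway.Message.update_data(message, fn _raw -> value end)
    else
      error -> Broadway.Message.failed(message, error)
    end
  end
end
```

Inside the pipeline module, handle_message/3 could then simply return SqsDemo.Payload.process(message).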

Before we can test our new pipeline, we need to add it to the application supervision tree.

Open lib/sqs_demo/application.ex and update the children collection with

children = [
  {SqsDemo.TestPipeline, []}
]
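For reference, here is a sketch of what the whole lib/sqs_demo/application.ex file might look like after this change. It assumes the default module that mix new --sup generates; your generated file may differ slightly.

```elixir
defmodule SqsDemo.Application do
  @moduledoc false

  use Application

  @impl true
  def start(_type, _args) do
    children = [
      # Starts the pipeline by calling SqsDemo.TestPipeline.start_link([]).
      {SqsDemo.TestPipeline, []}
    ]

    # Restart the pipeline on its own if it crashes.
    opts = [strategy: :one_for_one, name: SqsDemo.Supervisor]
    Supervisor.start_link(children, opts)
  end
end
```

With this in place, the pipeline starts automatically whenever the application boots, including inside iex -S mix.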

Now let’s test our new Broadway pipeline. Start a new iex session with iex -S mix and call the send_message function. Remember to have LocalStack running in the other shell.

iex> SqsDemo.send_message()

You should see the message printed in the iex output:

******* Message *******: "{\"foo\":\"bar\"}"

Perfect! It worked!

Recap

We created a test application that can send and receive messages through the LocalStack SQS service using the Broadway and ExAws libraries. Moreover, we did all this without the need for AWS.

We’re running a local instance of LocalStack using Docker. We’ve created an SQS queue using an initialization script. We’ve manually tested the LocalStack queue from a shell inside the Docker container. Using ex_aws with ex_aws_sqs, we’ve sent messages to the queue from the Elixir app. Using Broadway with the BroadwaySQS producer, we’ve read the messages back from the queue.

I hope you find it useful. This setup is great for development and testing and doesn’t rely on AWS.