Local development with LocalStack and Elixir Broadway
In this blog post, we’ll focus on how to create a simple Broadway app that can read and write messages using the SQS LocalStack service.
LocalStack is a fully functional local AWS cloud implementation that allows us to develop and test our cloud & serverless apps offline. Broadway is a library for building concurrent multi-stage data ingestion and processing pipelines in Elixir.
Creating a new mix application
Start by creating a new mix application with an application supervision tree.
mix new sqs_demo --sup
cd sqs_demo
Configuring LocalStack
Then install the LocalStack CLI, which lets you manage LocalStack services easily.
brew install localstack
We’ll use the Docker version of LocalStack, so this blog post assumes that you have Docker installed on your machine. For this, create a new docker-compose.yml file in the root of the project with the following:
version: "3.8"
services:
  localstack:
    container_name: "localstack_main"
    image: localstack/localstack
    network_mode: bridge
    ports:
      - "127.0.0.1:4566:4566"
    environment:
      - SERVICES=sqs
      - INIT_SCRIPTS_DIR=/docker-entrypoint-initaws.d
      - DEBUG=true
      - DATA_DIR=/tmp/localstack/data
      - HOST_TMP_FOLDER=/tmp/localstack
      - DOCKER_HOST=unix:///var/run/docker.sock
    volumes:
      - ./aws:/docker-entrypoint-initaws.d
      - "/tmp/localstack:/tmp/localstack"
      - "/var/run/docker.sock:/var/run/docker.sock"
The compose file mounts the ./aws folder as the init scripts directory, so we also need an initialization script to put there. That’s what we’ll do next.
Adding a LocalStack initialization script
Create a shell script to create a new SQS queue when initializing LocalStack.
mkdir aws
touch aws/init.sh
This creates a folder named aws and a file named init.sh inside it. In init.sh, add:
awslocal sqs create-queue --queue-name="sqs-demo"
Then, save the file.
This tells LocalStack to create a new queue named sqs-demo when it initializes.
Testing the queue manually
First, we need to start LocalStack. Open a new shell, navigate to the root of the project, and run:
docker-compose up
You should see the LocalStack Docker service initializing.
In a different shell, we’re going to ssh into the LocalStack container using the LocalStack CLI. Remember to navigate to the root of the project first.
localstack ssh
Send a message to the queue with
awslocal sqs send-message \
--queue-url http://localhost:4566/000000000000/sqs-demo \
--message-body '{ "hello":"world" }'
It should return something like this:
{
  "MD5OfMessageBody": "befc643223f5103570d18fed795d336d",
  "MessageId": "4f64ff51-cf5d-47be-07e8-3fd07871448a"
}
And read the message back with
awslocal sqs receive-message \
--queue-url http://localhost:4566/000000000000/sqs-demo
You should get this message back:
{
  "Messages": [
    {
      "MessageId": "4f64ff51-cf5d-47be-07e8-3fd07871448a",
      "ReceiptHandle": "psqicjvtungvqvbqunrzpbdjvfolyyujamfagzixdpaybcinpfbccneknktljiwccsfpeztrsnkdakonydvyqhqpmlkowbzxynsijhmtqphtwschxsajgzuwjvmyhnnobvomfstbgnvgsbcrcqegzubjhjnzqaqzhlzvmztyskuxisbjwiqvyacok",
      "MD5OfBody": "befc643223f5103570d18fed795d336d",
      "Body": "{ \"hello\":\"world\" }"
    }
  ]
}
Awesome! Now we’re able to manually send and read messages from the queue.
Before we continue, purge all the test messages from the queue with
awslocal sqs purge-queue \
--queue-url http://localhost:4566/000000000000/sqs-demo
Exit from the container with exit.
Sending messages from Elixir
Sending messages manually is great, but it won’t scale. Let’s add some code to send messages from the Elixir app. For that, we’ll use the ex_aws and ex_aws_sqs libraries.
In the mix.exs file, add the following:
defp deps do
  [
    {:ex_aws, "~> 2.1"},
    {:ex_aws_sqs, "~> 3.3"},
    {:poison, "~> 3.0"},
    {:hackney, "~> 1.9"},
    {:saxy, "~> 1.1"}
  ]
end
And get the new dependencies with
mix deps.get
Create a new function to send messages to the queue. In the file lib/sqs_demo.ex, add a new function:
def send_message do
  config = [
    scheme: "http://",
    host: "localhost",
    port: 4566,
    access_key_id: "",
    secret_access_key: ""
  ]

  queue_url = "http://localhost:4566/000000000000/sqs-demo"
  message = Poison.encode!(%{"foo" => "bar"})

  queue_url
  |> ExAws.SQS.send_message(message)
  |> ExAws.request!(config)
end
Let’s see what each line is doing.
config = [
  scheme: "http://",
  host: "localhost",
  port: 4566,
  access_key_id: "",
  secret_access_key: ""
]
This tells ExAws to use our local version of AWS: LocalStack is running on localhost and port 4566.
queue_url = "http://localhost:4566/000000000000/sqs-demo"
That is the URL of the queue created by the initialization script.
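The queue URL has a fixed shape: the LocalStack endpoint, then an account ID, then the queue name. As a small sketch, the URL could be assembled from those parts instead of being hard-coded. The module and function names below are hypothetical (they are not part of ex_aws), and the defaults simply mirror the values used in this post.

```elixir
defmodule SqsDemo.QueueUrl do
  @moduledoc """
  Hypothetical helper (not part of ex_aws) that assembles a LocalStack
  SQS queue URL from its parts.
  """

  # Defaults mirror the values used throughout this post.
  @defaults [
    scheme: "http",
    host: "localhost",
    port: 4566,
    account_id: "000000000000"
  ]

  @doc "Builds the URL for `queue_name`; `opts` may override any default."
  def build(queue_name, opts \\ []) do
    opts = Keyword.merge(@defaults, opts)

    scheme = Keyword.fetch!(opts, :scheme)
    host = Keyword.fetch!(opts, :host)
    port = Keyword.fetch!(opts, :port)
    account_id = Keyword.fetch!(opts, :account_id)

    "#{scheme}://#{host}:#{port}/#{account_id}/#{queue_name}"
  end
end
```

Calling SqsDemo.QueueUrl.build("sqs-demo") gives the same URL that is hard-coded in send_message.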
message = Poison.encode!(%{"foo" => "bar"})
And this is the message we want to send.
queue_url
|> ExAws.SQS.send_message(message)
|> ExAws.request!(config)
Lastly, this tells ExAws to send a message to SQS using the configuration defined at the beginning.
Let’s run the app. Make sure LocalStack is still running in the other shell.
iex -S mix
iex> SqsDemo.send_message()
You should get a response like
%{
  body: %{
    md5_of_message_attributes: "",
    md5_of_message_body: "8fc97148f4cc01e99d0028c320d8971a",
    message_id: "bd225fd3-4355-af0c-8e44-2bbd325270b9",
    request_id: "AA55J52HEKUSHQIDN61UO1F3BBJODASKAY3PLE09OHFYEMM6MEWT"
  },
  headers: [
    ...
  ],
  status_code: 200
}
Great! This means that we’re able to send messages from our Elixir app to the LocalStack SQS queue.
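As an aside, the connection settings do not have to live inside the function. A minimal sketch, assuming a config/config.exs file is added to the project if it does not already have one, is to put the same values into ex_aws's application configuration. The values are copied from send_message above; nothing else is changed.

```elixir
# config/config.exs
import Config

# Same (empty) credentials that send_message/0 passes explicitly.
config :ex_aws,
  access_key_id: "",
  secret_access_key: ""

# Point only the SQS service at the local LocalStack endpoint.
config :ex_aws, :sqs,
  scheme: "http://",
  host: "localhost",
  port: 4566
```

With this in place, the last step of the pipeline could call ExAws.request!() without the config argument, since the overrides are optional.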
Reading messages with Broadway
Broadway is an Elixir library that makes working with message brokers a breeze. It lets us build a data-processing pipeline that ingests messages from the LocalStack SQS service and handles them in stages that Broadway calls processors and batchers.
We’ll use the broadway_sqs producer. So, open the mix.exs file and add the following:
defp deps do
  [
    ...
    {:broadway_sqs, "~> 0.7"}
  ]
end
Get the new dependency
mix deps.get
Now let’s configure a new Broadway pipeline. Broadway uses this configuration to dynamically build and start all parts of the data-ingestion pipeline for us. We need to define three things:
- :producer contains the configuration for the source of events. By default the concurrency is 1, which means that only one producer process will be spawned. In our case, the producer reads from the LocalStack SQS queue.
- :processors configures the processing part of the pipeline, which is the one that will do most of the work. By default, the number of processor processes is given by System.schedulers_online() * 2. Processors use the callback handle_message to do the work, which is called once per message.
- :batchers configures the part of the pipeline that processes messages in batches. The default number of batcher processes is 1 and the default batch size is 100. Batchers use the callback handle_batch and are important because messages are acknowledged once their batch has been handled.
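To make these sections more concrete, here is a minimal sketch of how concurrency and batching could be set explicitly instead of relying on the defaults. The module name, the function name and the specific numbers are illustrative assumptions, not recommendations; :concurrency, :batch_size and :batch_timeout are the Broadway option names.

```elixir
defmodule SqsDemo.PipelineOptions do
  @moduledoc """
  Hypothetical helper that returns the processors/batchers part of a
  Broadway configuration with explicit, illustrative values.
  """

  def tuned do
    [
      processors: [
        # One processor process per scheduler.
        default: [concurrency: System.schedulers_online()]
      ],
      batchers: [
        # Two batcher processes; a batch is emitted at 20 messages or
        # after 2 seconds (2_000 ms), whichever comes first.
        default: [concurrency: 2, batch_size: 20, batch_timeout: 2_000]
      ]
    ]
  end
end
```

The returned keyword list has the same shape as the processors and batchers entries passed to Broadway.start_link, so it could be merged with the name and producer options.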
Create the file lib/sqs_demo/test_pipeline.ex and add the following:
defmodule SqsDemo.TestPipeline do
  use Broadway

  alias Broadway.Message

  def start_link(_opts) do
    Broadway.start_link(__MODULE__,
      name: __MODULE__,
      producer: [
        module: {
          BroadwaySQS.Producer,
          queue_url: "http://localhost:4566/000000000000/sqs-demo",
          config: [
            scheme: "http://",
            host: "localhost",
            port: 4566,
            access_key_id: "",
            secret_access_key: ""
          ]
        }
      ],
      processors: [
        default: []
      ],
      batchers: [
        default: []
      ]
    )
  end

  @impl true
  def handle_message(_, %Message{data: data} = message, _) do
    IO.inspect(data, label: "******* Message *******")
    message
  end

  @impl true
  def handle_batch(_, messages, _, _) do
    messages
  end
end
We’re using the default processor and batchers configuration to keep it simple. Broadway supports a ton of customization; see the Broadway documentation for the full list of options.
Let’s see what we’re doing here.
defmodule SqsDemo.TestPipeline do
  use Broadway
Define a new module and tell it to use the Broadway behavior.
BroadwaySQS.Producer,
queue_url: "http://localhost:4566/000000000000/sqs-demo",
config: [
  scheme: "http://",
  host: "localhost",
  port: 4566,
  access_key_id: "",
  secret_access_key: ""
]
Add a new SQS producer with the same configuration we used before to send the message to the queue.
@impl true
def handle_message(_, %Message{data: data} = message, _) do
  IO.inspect(data, label: "******* Message *******")
  message
end
This callback is called by the processor for each message. We inspect the message data, which prints it on the screen, and return the message unchanged.
Before we can test our new pipeline, we need to add it to the application supervision tree.
Open lib/sqs_demo/application.ex and update the children collection with:
children = [
  {SqsDemo.TestPipeline, []}
]
Now let’s test our new Broadway pipeline. Start a new iex session with iex -S mix and call the send_message function. Remember to have LocalStack running in the other shell.
iex> SqsDemo.send_message()
You should see the message printed in the iex output:
******* Message *******: "{\"foo\":\"bar\"}"
Perfect! It worked!
Recap
We built a test application that sends messages to and receives messages from the LocalStack SQS service using the Broadway and ExAws libraries. Moreover, we did all this without the need for AWS.
- We’re running a local instance of LocalStack using Docker.
- We’ve created an SQS queue using an initialization script.
- We’ve manually tested the LocalStack queue by connecting to the Docker container with localstack ssh.
- Using ex_aws with ex_aws_sqs, we’ve sent messages to the queue from the Elixir app.
- Using Broadway with the BroadwaySQS producer, we’ve read the messages back from the queue.
I hope you find it useful. This setup is great for development and testing and doesn’t rely on AWS.