Carotte 🥕
A type-safe RabbitMQ client for Gleam that provides a clean, idiomatic interface for message queue operations on the Erlang VM.
Features
- Type-safe API - Leverage Gleam’s type system for safe message handling
- High Performance - Built on top of the battle-tested amqp_client Erlang library
- Idiomatic Gleam - Clean, functional API with everything in a single module
- Complete Feature Set - Support for exchanges, queues, publishing, consuming, and more
- Supervised Consumers - OTP-based consumer supervision with automatic restarts
- Connection Helpers - Built-in reconnection support and connection monitoring
- Async Operations - Non-blocking operations with _async variants
- Flexible Message Acknowledgment - Manual acknowledgment support for reliable message processing
- Full Headers Support - Send and receive message headers with type-safe accessors
- Operation-Specific Error Types - Granular error types for precise error handling
Installation
gleam add carotte
Quick Start
import carotte
import gleam/bit_array
import gleam/erlang/process
import gleam/io
pub fn main() {
// Connect to RabbitMQ
let assert Ok(client) =
carotte.ClientConfig(
..carotte.default_client(),
host: "localhost",
port: 5672,
)
|> carotte.start()
// Open a channel
let assert Ok(ch) = carotte.open_channel(client)
// Declare an exchange
let assert Ok(_) =
carotte.Exchange(..carotte.exchange("my_exchange"), exchange_type: carotte.Direct)
|> carotte.declare_exchange(ch)
// Declare a durable queue
let assert Ok(_) =
carotte.QueueConfig(..carotte.default_queue("my_queue"), durable: True)
|> carotte.declare_queue(ch)
// Bind queue to exchange
let assert Ok(_) =
carotte.bind_queue(
channel: ch,
queue: "my_queue",
exchange: "my_exchange",
routing_key: "my_routing_key",
)
// Publish a message (payload is BitArray)
let assert Ok(_) =
carotte.publish(
channel: ch,
exchange: "my_exchange",
routing_key: "my_routing_key",
payload: <<"Hello, RabbitMQ!">>,
options: [],
)
// Start a consumer supervisor
let consumers = process.new_name("consumers")
let assert Ok(consumer) = carotte.start_consumer(consumers)
// Subscribe to messages (supervised) - returns consumer_tag string
let assert Ok(consumer_tag) =
carotte.subscribe(
consumer,
channel: ch,
queue: "my_queue",
callback: fn(msg, _deliver) {
// msg.payload is BitArray - convert to string if needed
let assert Ok(text) = bit_array.to_string(msg.payload)
io.println("Received: " <> text)
// Messages are auto-acknowledged by default
},
)
// Clean up
let assert Ok(_) = carotte.unsubscribe(channel: ch, consumer_tag:)
let assert Ok(_) = carotte.close(client)
}
Core Concepts
Connection Management
Create and configure a RabbitMQ connection:
import gleam/time/duration
let assert Ok(client) =
carotte.ClientConfig(
..carotte.default_client(),
username: "admin",
password: "secret",
host: "rabbitmq.example.com",
virtual_host: "/production",
heartbeat: duration.seconds(30),
connection_timeout: duration.seconds(60),
)
|> carotte.start()
// Check connection status
assert carotte.is_connected(client) == True
// Reconnect if needed
case carotte.is_connected(client) {
True -> client
False -> {
let assert Ok(new_client) = carotte.reconnect(client)
new_client
}
}
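A single reconnect call can itself fail if the broker is still unavailable. As a minimal sketch, a small retry helper can wrap any fallible operation. Note that retry is a hypothetical helper written for this example, not part of Carotte, and it retries immediately without any delay between attempts:

```gleam
/// Run `operation` up to `attempts` times, returning the first success
/// or the error from the final attempt.
/// Hypothetical helper for illustration; not part of Carotte.
pub fn retry(attempts: Int, operation: fn() -> Result(a, e)) -> Result(a, e) {
  case operation() {
    Ok(value) -> Ok(value)
    // Out of attempts: give up and return the last error
    Error(error) if attempts <= 1 -> Error(error)
    // Otherwise try again with one fewer attempt remaining
    Error(_) -> retry(attempts - 1, operation)
  }
}
```

It could then be used as retry(3, fn() { carotte.reconnect(client) }). In a real application you would likely pause between attempts, for example with process.sleep from gleam/erlang/process.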
Exchanges
Carotte supports all RabbitMQ exchange types:
// Create a durable topic exchange
carotte.Exchange(
..carotte.exchange("logs"),
exchange_type: carotte.Topic,
durable: True,
)
|> carotte.declare_exchange(channel)
// Available exchange types:
// - Direct: Route based on exact routing key match
// - Topic: Route based on routing key patterns
// - Fanout: Route to all bound queues
// - Headers: Route based on message headers
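To make the Topic pattern rules concrete: RabbitMQ splits routing keys into dot-separated words, where * matches exactly one word and # matches zero or more words. The sketch below reimplements that matching rule in plain Gleam purely as an illustration. The matching itself happens inside the broker, and topic_matches is a hypothetical function, not part of Carotte:

```gleam
import gleam/string

/// Check whether a routing key matches a topic binding pattern.
/// Illustration of the broker's rules only; not part of Carotte.
pub fn topic_matches(pattern: String, routing_key: String) -> Bool {
  match_words(string.split(pattern, "."), string.split(routing_key, "."))
}

fn match_words(pattern: List(String), key: List(String)) -> Bool {
  case pattern, key {
    // Both exhausted at the same time: full match
    [], [] -> True
    // "#" can match zero words, so it may be dropped at the end of the key
    ["#", ..rest], [] -> match_words(rest, [])
    // "#" either matches zero words here, or consumes one word and stays
    ["#", ..rest], [_, ..key_rest] ->
      match_words(rest, key) || match_words(pattern, key_rest)
    // "*" consumes exactly one word
    ["*", ..rest], [_, ..key_rest] -> match_words(rest, key_rest)
    // A literal word must be equal to the corresponding key word
    [word, ..rest], [key_word, ..key_rest] if word == key_word ->
      match_words(rest, key_rest)
    _, _ -> False
  }
}
```

For example, the pattern *.error matches app.error but not app.db.error, while logs.# matches logs, logs.app, and logs.app.error.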
Queues
Declare and configure queues using record update syntax:
carotte.QueueConfig(
..carotte.default_queue("task_queue"),
durable: True, // Survive broker restart
exclusive: True, // Usable only by the declaring connection; deleted when it closes
auto_delete: True, // Delete when last consumer disconnects
)
|> carotte.declare_queue(channel)
Publishing Messages
Publish messages with various options. The payload is a BitArray, which allows sending any binary data:
import gleam/bit_array
import gleam/json
import gleam/time/duration
// For text/JSON, convert string to BitArray
let json_payload = bit_array.from_string(json.to_string(user_data))
carotte.publish(
channel: ch,
exchange: "notifications",
routing_key: "user.signup",
payload: json_payload,
options: [
carotte.Persistent(True),
carotte.ContentType("application/json"),
carotte.MessageHeaders(
carotte.headers_from_list([
#("user_id", carotte.StringHeader("123")),
#("retry_count", carotte.IntHeader(0)),
])
),
carotte.Expiration(duration.seconds(60)), // Message expires in 60 seconds
]
)
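Because payloads are raw bytes, a consumer cannot assume every message is valid UTF-8 text. The following sketch uses only the Gleam standard library to decode a payload defensively instead of crashing with let assert. The function name payload_to_text and the placeholder format are assumptions made for this example:

```gleam
import gleam/bit_array
import gleam/int

/// Decode a payload as UTF-8 text, or describe it if it is not valid text.
/// Hypothetical helper for illustration; not part of Carotte.
pub fn payload_to_text(payload: BitArray) -> String {
  case bit_array.to_string(payload) {
    Ok(text) -> text
    // Not valid UTF-8: report the size rather than crashing
    Error(Nil) ->
      "<" <> int.to_string(bit_array.byte_size(payload)) <> " bytes of binary data>"
  }
}
```

Inside a subscribe callback this could be called as payload_to_text(msg.payload), which keeps one malformed message from crashing the consumer.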
Supervised Consumers
Carotte integrates with gleam_otp for proper OTP supervision of consumers. The recommended approach is to add the consumer supervisor to your application’s supervision tree using consumer_supervised:
import gleam/bit_array
import gleam/erlang/process
import gleam/io
import gleam/otp/static_supervisor
pub fn start_app() {
// Create a name for the consumer supervisor at program startup
let consumers_name = process.new_name("consumers")
// Create the child specification
let consumer_spec = carotte.consumer_supervised(consumers_name)
// Add to your application's supervision tree
let assert Ok(_) =
static_supervisor.new(static_supervisor.OneForOne)
|> static_supervisor.add(consumer_spec)
|> static_supervisor.start()
// Later, get the consumer reference to subscribe
let consumer = carotte.named_consumer(consumers_name)
// Subscribe to queues (consumers are supervised) - returns consumer_tag
// `ch` is a channel opened earlier with carotte.open_channel
let assert Ok(consumer_tag) =
carotte.subscribe(
consumer,
channel: ch,
queue: "work_queue",
callback: fn(payload, deliver) {
// payload.payload is BitArray - convert to string for text messages
let assert Ok(text) = bit_array.to_string(payload.payload)
io.println("Processing: " <> text)
io.println("Exchange: " <> deliver.exchange)
io.println("Routing key: " <> deliver.routing_key)
// If callback crashes, consumer will be restarted by supervisor
}
)
}
Standalone mode (for simpler use cases without a supervision tree):
// Start supervisor directly (linked to calling process)
let consumers = process.new_name("consumers")
let assert Ok(consumer) = carotte.start_consumer(consumers)
let assert Ok(consumer_tag) = carotte.subscribe(consumer, channel: ch, queue: "my_queue", callback: handler)
Manual Acknowledgment
For more control over message acknowledgment:
let assert Ok(consumer_tag) =
carotte.subscribe_with_options(
consumer,
channel: ch,
queue: "work_queue",
callback: fn(msg, deliver) {
// Process the message
case process_message(msg) {
Ok(_) -> {
// Acknowledge on success
let assert Ok(_) = carotte.ack_single(ch, deliver.delivery_tag)
Nil
}
// Don't ack: the message stays unacknowledged and RabbitMQ requeues it
// once the channel or connection closes
Error(_) -> Nil
}
Nil
},
options: [carotte.AutoAck(False)],
)
// Inside a callback: acknowledge this delivery and all earlier unacknowledged ones at once
let assert Ok(_) = carotte.ack(ch, deliver.delivery_tag, True)
Message Headers
Carotte supports reading and writing message headers. Headers can contain various types of values:
// Available header types
carotte.BoolHeader(True)
carotte.IntHeader(42)
carotte.FloatHeader(3.14)
carotte.StringHeader("hello")
carotte.ListHeader([carotte.IntHeader(1), carotte.IntHeader(2)])
Sending headers:
carotte.publish(
channel: ch,
exchange: "my_exchange",
routing_key: "my_key",
payload: <<"Hello!">>,
options: [
carotte.MessageHeaders(
carotte.headers_from_list([
#("user_id", carotte.StringHeader("123")),
#("priority", carotte.IntHeader(1)),
])
),
],
)
Reading headers from received messages:
import gleam/list
carotte.subscribe(
consumer,
channel: ch,
queue: "my_queue",
callback: fn(payload, _deliver) {
// Convert headers to a list of name-value pairs
let headers = carotte.headers_to_list(payload.headers)
// Find a specific header
let user_id = list.find(headers, fn(h) { h.0 == "user_id" })
case user_id {
Ok(#(_, carotte.StringHeader(id))) -> io.println("User: " <> id)
_ -> io.println("No user_id header found")
}
},
)
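Since the headers convert to a list of name-value pairs, lookups can be factored into small helpers. This is a minimal sketch using only gleam/list and gleam/result. The names find_header and header_or are hypothetical, and the helpers are generic over the value type, so they make no assumptions about Carotte's header types:

```gleam
import gleam/list
import gleam/result

/// Look up a header by name in a list of name-value pairs.
/// Hypothetical helper for illustration; not part of Carotte.
pub fn find_header(
  headers: List(#(String, value)),
  name: String,
) -> Result(value, Nil) {
  list.key_find(headers, name)
}

/// Look up a header by name, falling back to `default` when it is absent.
pub fn header_or(
  headers: List(#(String, value)),
  name: String,
  default: value,
) -> value {
  find_header(headers, name) |> result.unwrap(default)
}
```

In a callback, the list would come from carotte.headers_to_list(payload.headers), and the result can be pattern matched on the specific header variant you expect.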
Error Handling
Carotte provides operation-specific error types for precise error handling. Each operation category has its own error type, making it easy to handle errors appropriately.
Error Types
| Error Type | Used By | Variants |
|---|---|---|
| ConnectionError | start, close, reconnect | ConnectionBlocked, ConnectionClosed, ConnectionAuthFailure, ConnectionRefused, ConnectionTimeout, NotConnected, AlreadyConnected, ReconnectionFailed, ConnectionUnknownError |
| ChannelError | open_channel | ChannelClosed, ChannelProcessNotFound, ChannelConnectionClosed, ChannelUnknownError |
| ExchangeError | declare_exchange, delete_exchange, bind_exchange, unbind_exchange | ExchangeNotFound, ExchangeAccessRefused, ExchangePreconditionFailed, ExchangeChannelClosed, ExchangeUnknownError |
| QueueError | declare_queue, delete_queue, bind_queue, unbind_queue, purge_queue, queue_status | QueueNotFound, QueueAccessRefused, QueuePreconditionFailed, QueueResourceLocked, QueueChannelClosed, QueueUnknownError |
| PublishError | publish | PublishNoRoute, PublishChannelClosed, PublishUnknownError |
| ConsumeError | subscribe, unsubscribe, ack | ConsumeInitTimeout, ConsumeInitFailed, ConsumeProcessNotFound, ConsumeChannelClosed, ConsumeUnknownError |
Handling Errors
// Connection errors
case carotte.start(client_config) {
Ok(client) -> process_messages(client)
Error(carotte.ConnectionAuthFailure(msg)) -> {
io.println("Authentication failed: " <> msg)
}
Error(carotte.ConnectionTimeout(msg)) -> {
io.println("Connection timeout: " <> msg)
}
Error(other) -> {
io.println("Connection error: " <> carotte.describe_connection_error(other))
}
}
// Queue errors
case carotte.declare_queue(my_queue, channel) {
Ok(queue) -> use_queue(queue)
Error(carotte.QueueAccessRefused(msg)) -> {
io.println("Access refused: " <> msg)
}
Error(carotte.QueuePreconditionFailed(msg)) -> {
io.println("Queue configuration mismatch: " <> msg)
}
Error(other) -> {
io.println("Queue error: " <> carotte.describe_queue_error(other))
}
}
// Publish errors
case carotte.publish(channel:, exchange:, routing_key:, payload:, options: [carotte.Mandatory(True)]) {
Ok(_) -> io.println("Message published")
Error(carotte.PublishNoRoute(msg)) -> {
io.println("No route for message: " <> msg)
}
Error(other) -> {
io.println("Publish error: " <> carotte.describe_publish_error(other))
}
}
Error Description Functions
Each error type has a corresponding describe_*_error function that converts the error to a human-readable string:
carotte.describe_connection_error(err) // ConnectionError -> String
carotte.describe_channel_error(err) // ChannelError -> String
carotte.describe_exchange_error(err) // ExchangeError -> String
carotte.describe_queue_error(err) // QueueError -> String
carotte.describe_publish_error(err) // PublishError -> String
carotte.describe_consume_error(err) // ConsumeError -> String
Advanced Features
Asynchronous Operations
Most operations have async variants for non-blocking execution:
// Async queue declaration
carotte.declare_queue_async(my_queue, channel)
// Async exchange deletion
carotte.delete_exchange_async(channel:, exchange: "old_exchange", if_unused: True)
// Async queue binding
carotte.bind_queue_async(
channel:,
queue: "my_queue",
exchange: "my_exchange",
routing_key: "key"
)
Queue Management
Perform administrative operations on queues:
// Get queue status
let assert Ok(status) = carotte.queue_status(channel:, queue: "my_queue")
io.println("Messages: " <> int.to_string(status.message_count))
io.println("Consumers: " <> int.to_string(status.consumer_count))
// Purge all messages from a queue
let assert Ok(message_count) = carotte.purge_queue(channel:, queue: "my_queue")
// Delete a queue
let assert Ok(_) = carotte.delete_queue(
channel:,
queue: "my_queue",
if_unused: True, // Only delete if no consumers
if_empty: True // Only delete if empty
)
Quality of Service (QoS)
Control message prefetch for load balancing across consumers:
// Limit to 10 unacknowledged messages per consumer
let assert Ok(_) = carotte.set_qos(ch, 10, False)
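// With a prefetch limit, RabbitMQ stops sending to a consumer that already
// holds 10 unacknowledged messages, so work flows to consumers that keep up
QoS matters for production deployments with multiple consumers. Without a prefetch limit, a slow consumer can accumulate a large backlog of unacknowledged messages while other consumers sit idle. The limit only applies to consumers that use manual acknowledgment.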
Pulling Messages (Alternative to Subscribing)
For polling scenarios or one-off message retrieval, use get_message:
import gleam/option.{None, Some}
case carotte.get_message(ch, queue: "my_queue", auto_ack: True) {
Ok(Some(#(payload, deliver))) -> {
let assert Ok(text) = bit_array.to_string(payload.payload)
io.println("Got: " <> text)
}
Ok(None) -> io.println("Queue is empty")
Error(e) -> io.println("Error: " <> carotte.describe_consume_error(e))
}
Note: For continuous consumption, subscribe() is more efficient. Use get_message() for:
- Polling queues at intervals
- One-off message retrieval
- Testing and debugging
Transactions
Ensure atomic publishing of message batches:
// Enable transaction mode on channel
let assert Ok(_) = carotte.tx_select(ch)
// Publish multiple messages
let assert Ok(_) = carotte.publish(channel: ch, exchange: "orders", routing_key: "new", payload: <<"order 1">>, options: [])
let assert Ok(_) = carotte.publish(channel: ch, exchange: "orders", routing_key: "new", payload: <<"order 2">>, options: [])
let assert Ok(_) = carotte.publish(channel: ch, exchange: "orders", routing_key: "new", payload: <<"order 3">>, options: [])
// Commit - all messages delivered atomically
let assert Ok(_) = carotte.tx_commit(ch)
// Or rollback to discard all messages
// let assert Ok(_) = carotte.tx_rollback(ch)
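Transactions make a batch of publishes all-or-nothing: tx_commit applies every message published since the transaction began, and tx_rollback discards them. Note that RabbitMQ guarantees this atomicity only when the transaction affects a single queue.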
Channel Management
Close channels when done to free resources:
// Open a channel
let assert Ok(ch) = carotte.open_channel(client)
// Use the channel...
// Close it when done
let assert Ok(_) = carotte.close_channel(ch)
// Connection remains open for other channels
RabbitMQ best practice: Use one connection with multiple channels rather than multiple connections.
Exchange Bindings
Create complex routing topologies:
// Bind exchange to exchange
carotte.bind_exchange(
channel:,
source: "raw_logs",
destination: "processed_logs",
routing_key: "*.error"
)
// Unbind when no longer needed
carotte.unbind_exchange(
channel:,
source: "raw_logs",
destination: "processed_logs",
routing_key: "*.error"
)
Development
# Run tests (requires local RabbitMQ on localhost:5672)
gleam test
# Build documentation
gleam docs build
# Format code
gleam format
Contributing
Contributions are welcome! Please feel free to submit a Pull Request. For major changes, please open an issue first to discuss what you would like to change.
License
This project is licensed under the MIT License - see the LICENSE file for details.
Acknowledgments
- Built on top of the robust amqp_client Erlang library
- Inspired by RabbitMQ clients in other languages
- Thanks to the Gleam community for their support and feedback