Carotte 🥕

Package Version Hex Docs

A type-safe RabbitMQ client for Gleam that provides a clean, idiomatic interface for message queue operations on the Erlang VM.

Features

Installation

gleam add carotte

Quick Start

import carotte
import gleam/erlang/process
import gleam/io

pub fn main() {
  // Connect to RabbitMQ
  let assert Ok(client) =
    carotte.ClientConfig(
      ..carotte.default_client(),
      host: "localhost",
      port: 5672,
    )
    |> carotte.start()

  // Open a channel
  let assert Ok(ch) = carotte.open_channel(client)

  // Declare an exchange
  let assert Ok(_) =
    carotte.Exchange(..carotte.exchange("my_exchange"), exchange_type: carotte.Direct)
    |> carotte.declare_exchange(ch)

  // Declare a durable queue
  let assert Ok(_) =
    carotte.QueueConfig(..carotte.default_queue("my_queue"), durable: True)
    |> carotte.declare_queue(ch)

  // Bind queue to exchange
  let assert Ok(_) =
    carotte.bind_queue(
      channel: ch,
      queue: "my_queue",
      exchange: "my_exchange",
      routing_key: "my_routing_key",
    )

  // Publish a message (payload is BitArray)
  let assert Ok(_) =
    carotte.publish(
      channel: ch,
      exchange: "my_exchange",
      routing_key: "my_routing_key",
      payload: <<"Hello, RabbitMQ!">>,
      options: [],
    )

  // Start a consumer supervisor
  let consumers = process.new_name("consumers")
  let assert Ok(consumer) = carotte.start_consumer(consumers)

  // Subscribe to messages (supervised) - returns consumer_tag string
  let assert Ok(consumer_tag) =
    carotte.subscribe(
      consumer,
      channel: ch,
      queue: "my_queue",
      callback: fn(msg, _deliver) {
        // msg.payload is BitArray - convert to string if needed
        let assert Ok(text) = bit_array.to_string(msg.payload)
        io.println("Received: " <> text)
        // Messages are auto-acknowledged by default
      },
    )

  // Clean up
  let assert Ok(_) = carotte.unsubscribe(channel: ch, consumer_tag:)
  let assert Ok(_) = carotte.close(client)
}

Core Concepts

Connection Management

Create and configure a RabbitMQ connection:

import gleam/time/duration

let assert Ok(client) =
  carotte.ClientConfig(
    ..carotte.default_client(),
    username: "admin",
    password: "secret",
    host: "rabbitmq.example.com",
    virtual_host: "/production",
    heartbeat: duration.seconds(30),
    connection_timeout: duration.seconds(60),
  )
  |> carotte.start()

// Check connection status
assert carotte.is_connected(client) == True

// Reconnect if needed
case carotte.is_connected(client) {
  True -> client
  False -> {
    let assert Ok(new_client) = carotte.reconnect(client)
    new_client
  }
}

Exchanges

Carotte supports all RabbitMQ exchange types:

// Create a durable topic exchange
carotte.Exchange(
  ..carotte.exchange("logs"),
  exchange_type: carotte.Topic,
  durable: True,
)
|> carotte.declare_exchange(channel)

// Available exchange types:
// - Direct: Route based on exact routing key match
// - Topic: Route based on routing key patterns
// - Fanout: Route to all bound queues
// - Headers: Route based on message headers

Queues

Declare and configure queues using record update syntax:

carotte.QueueConfig(
  ..carotte.default_queue("task_queue"),
  durable: True,       // Survive broker restart
  exclusive: True,     // Only one consumer allowed
  auto_delete: True,   // Delete when last consumer disconnects
)
|> carotte.declare_queue(channel)

Publishing Messages

Publish messages with various options. The payload is a BitArray, which allows sending any binary data:

import gleam/bit_array
import gleam/time/duration

// For text/JSON, convert string to BitArray
let json_payload = bit_array.from_string(json.to_string(user_data))

carotte.publish(
  channel: ch,
  exchange: "notifications",
  routing_key: "user.signup",
  payload: json_payload,
  options: [
    carotte.Persistent(True),
    carotte.ContentType("application/json"),
    carotte.MessageHeaders(
      carotte.headers_from_list([
        #("user_id", carotte.StringHeader("123")),
        #("retry_count", carotte.IntHeader(0)),
      ])
    ),
    carotte.Expiration(duration.seconds(60)), // Message expires in 60 seconds
  ]
)

Supervised Consumers

Carotte integrates with gleam_otp for proper OTP supervision of consumers. The recommended approach is to add the consumer supervisor to your application’s supervision tree using consumer_supervised:

import gleam/erlang/process
import gleam/otp/static_supervisor

pub fn start_app() {
  // Create a name for the consumer supervisor at program startup
  let consumers_name = process.new_name("consumers")

  // Create the child specification
  let consumer_spec = carotte.consumer_supervised(consumers_name)

  // Add to your application's supervision tree
  let assert Ok(_) =
    static_supervisor.new(static_supervisor.OneForOne)
    |> static_supervisor.add(consumer_spec)
    |> static_supervisor.start()

  // Later, get the consumer reference to subscribe
  let consumer = carotte.named_consumer(consumers_name)

  // Subscribe to queues (consumers are supervised) - returns consumer_tag
  let assert Ok(consumer_tag) =
    carotte.subscribe(
      consumer,
      channel: ch,
      queue: "work_queue",
      callback: fn(payload, deliver) {
        // payload.payload is BitArray - convert to string for text messages
        let assert Ok(text) = bit_array.to_string(payload.payload)
        io.println("Processing: " <> text)
        io.println("Exchange: " <> deliver.exchange)
        io.println("Routing key: " <> deliver.routing_key)
        // If callback crashes, consumer will be restarted by supervisor
      }
    )
}

Standalone mode (for simpler use cases without a supervision tree):

// Start supervisor directly (linked to calling process)
let consumers = process.new_name("consumers")
let assert Ok(consumer) = carotte.start_consumer(consumers)

let assert Ok(consumer_tag) = carotte.subscribe(consumer, channel: ch, queue: "my_queue", callback: handler)

Manual Acknowledgment

For more control over message acknowledgment:

let assert Ok(consumer_tag) =
  carotte.subscribe_with_options(
    consumer,
    channel: ch,
    queue: "work_queue",
    callback: fn(msg, deliver) {
      // Process the message
      case process_message(msg) {
        Ok(_) -> {
          // Acknowledge on success
          let assert Ok(_) = carotte.ack_single(ch, deliver.delivery_tag)
        }
        Error(_) -> {
          // Don't ack - message will be redelivered
        }
      }
      Nil
    },
    options: [carotte.AutoAck(False)],
  )

// Acknowledge multiple messages at once
let assert Ok(_) = carotte.ack(ch, deliver.delivery_tag, True)

Message Headers

Carotte supports reading and writing message headers. Headers can contain various types of values:

// Available header types
carotte.BoolHeader(True)
carotte.IntHeader(42)
carotte.FloatHeader(3.14)
carotte.StringHeader("hello")
carotte.ListHeader([carotte.IntHeader(1), carotte.IntHeader(2)])

Sending headers:

carotte.publish(
  channel: ch,
  exchange: "my_exchange",
  routing_key: "my_key",
  payload: <<"Hello!">>,
  options: [
    carotte.MessageHeaders(
      carotte.headers_from_list([
        #("user_id", carotte.StringHeader("123")),
        #("priority", carotte.IntHeader(1)),
      ])
    ),
  ],
)

Reading headers from received messages:

carotte.subscribe(
  consumer,
  channel: ch,
  queue: "my_queue",
  callback: fn(payload, _deliver) {
    // Convert headers to a list of name-value pairs
    let headers = carotte.headers_to_list(payload.headers)

    // Find a specific header
    let user_id = list.find(headers, fn(h) { h.0 == "user_id" })

    case user_id {
      Ok(#(_, carotte.StringHeader(id))) -> io.println("User: " <> id)
      _ -> io.println("No user_id header found")
    }
  },
)

Error Handling

Carotte provides operation-specific error types for precise error handling. Each operation category has its own error type, making it easy to handle errors appropriately.

Error Types

Error TypeUsed ByVariants
ConnectionErrorstart, close, reconnectConnectionBlocked, ConnectionClosed, ConnectionAuthFailure, ConnectionRefused, ConnectionTimeout, NotConnected, AlreadyConnected, ReconnectionFailed, ConnectionUnknownError
ChannelErroropen_channelChannelClosed, ChannelProcessNotFound, ChannelConnectionClosed, ChannelUnknownError
ExchangeErrordeclare_exchange, delete_exchange, bind_exchange, unbind_exchangeExchangeNotFound, ExchangeAccessRefused, ExchangePreconditionFailed, ExchangeChannelClosed, ExchangeUnknownError
QueueErrordeclare_queue, delete_queue, bind_queue, unbind_queue, purge_queue, queue_statusQueueNotFound, QueueAccessRefused, QueuePreconditionFailed, QueueResourceLocked, QueueChannelClosed, QueueUnknownError
PublishErrorpublishPublishNoRoute, PublishChannelClosed, PublishUnknownError
ConsumeErrorsubscribe, unsubscribe, ackConsumeInitTimeout, ConsumeInitFailed, ConsumeProcessNotFound, ConsumeChannelClosed, ConsumeUnknownError

Handling Errors

// Connection errors
case carotte.start(client_config) {
  Ok(client) -> process_messages(client)
  Error(carotte.ConnectionAuthFailure(msg)) -> {
    io.println("Authentication failed: " <> msg)
  }
  Error(carotte.ConnectionTimeout(msg)) -> {
    io.println("Connection timeout: " <> msg)
  }
  Error(other) -> {
    io.println("Connection error: " <> carotte.describe_connection_error(other))
  }
}

// Queue errors
case carotte.declare_queue(my_queue, channel) {
  Ok(queue) -> use_queue(queue)
  Error(carotte.QueueAccessRefused(msg)) -> {
    io.println("Access refused: " <> msg)
  }
  Error(carotte.QueuePreconditionFailed(msg)) -> {
    io.println("Queue configuration mismatch: " <> msg)
  }
  Error(other) -> {
    io.println("Queue error: " <> carotte.describe_queue_error(other))
  }
}

// Publish errors
case carotte.publish(channel:, exchange:, routing_key:, payload:, options: [carotte.Mandatory(True)]) {
  Ok(_) -> io.println("Message published")
  Error(carotte.PublishNoRoute(msg)) -> {
    io.println("No route for message: " <> msg)
  }
  Error(other) -> {
    io.println("Publish error: " <> carotte.describe_publish_error(other))
  }
}

Error Description Functions

Each error type has a corresponding describe_*_error function that converts the error to a human-readable string:

carotte.describe_connection_error(err)  // ConnectionError -> String
carotte.describe_channel_error(err)     // ChannelError -> String
carotte.describe_exchange_error(err)    // ExchangeError -> String
carotte.describe_queue_error(err)       // QueueError -> String
carotte.describe_publish_error(err)     // PublishError -> String
carotte.describe_consume_error(err)     // ConsumeError -> String

Advanced Features

Asynchronous Operations

Most operations have async variants for non-blocking execution:

// Async queue declaration
carotte.declare_queue_async(my_queue, channel)

// Async exchange deletion
carotte.delete_exchange_async(channel:, exchange: "old_exchange", if_unused: True)

// Async queue binding
carotte.bind_queue_async(
  channel:,
  queue: "my_queue",
  exchange: "my_exchange",
  routing_key: "key"
)

Queue Management

Perform administrative operations on queues:

// Get queue status
let assert Ok(status) = carotte.queue_status(channel:, queue: "my_queue")
io.println("Messages: " <> int.to_string(status.message_count))
io.println("Consumers: " <> int.to_string(status.consumer_count))

// Purge all messages from a queue
let assert Ok(message_count) = carotte.purge_queue(channel:, queue: "my_queue")

// Delete a queue
let assert Ok(_) = carotte.delete_queue(
  channel:,
  queue: "my_queue",
  if_unused: True,  // Only delete if no consumers
  if_empty: True    // Only delete if empty
)

Quality of Service (QoS)

Control message prefetch for load balancing across consumers:

// Limit to 10 unacknowledged messages per consumer
let assert Ok(_) = carotte.set_qos(ch, 10, False)

// This ensures messages are evenly distributed across multiple consumers
// Without QoS, one fast consumer might get all messages

QoS is essential for production deployments with multiple consumers. It prevents any single consumer from being overwhelmed while others sit idle.

Pulling Messages (Alternative to Subscribing)

For polling scenarios or one-off message retrieval, use get_message:

case carotte.get_message(ch, queue: "my_queue", auto_ack: True) {
  Ok(Some(#(payload, deliver))) -> {
    let assert Ok(text) = bit_array.to_string(payload.payload)
    io.println("Got: " <> text)
  }
  Ok(None) -> io.println("Queue is empty")
  Error(e) -> io.println("Error: " <> carotte.describe_consume_error(e))
}

Note: For continuous consumption, subscribe() is more efficient. Use get_message() for:

Transactions

Ensure atomic publishing of message batches:

// Enable transaction mode on channel
let assert Ok(_) = carotte.tx_select(ch)

// Publish multiple messages
let assert Ok(_) = carotte.publish(channel: ch, exchange: "orders", routing_key: "new", payload: <<"order 1">>, options: [])
let assert Ok(_) = carotte.publish(channel: ch, exchange: "orders", routing_key: "new", payload: <<"order 2">>, options: [])
let assert Ok(_) = carotte.publish(channel: ch, exchange: "orders", routing_key: "new", payload: <<"order 3">>, options: [])

// Commit - all messages delivered atomically
let assert Ok(_) = carotte.tx_commit(ch)

// Or rollback to discard all messages
// let assert Ok(_) = carotte.tx_rollback(ch)

Transactions ensure all-or-nothing delivery. Either all messages are delivered, or none are.

Channel Management

Close channels when done to free resources:

// Open a channel
let assert Ok(ch) = carotte.open_channel(client)

// Use the channel...

// Close it when done
let assert Ok(_) = carotte.close_channel(ch)

// Connection remains open for other channels

RabbitMQ best practice: Use one connection with multiple channels rather than multiple connections.

Exchange Bindings

Create complex routing topologies:

// Bind exchange to exchange
carotte.bind_exchange(
  channel:,
  source: "raw_logs",
  destination: "processed_logs",
  routing_key: "*.error"
)

// Unbind when no longer needed
carotte.unbind_exchange(
  channel:,
  source: "raw_logs",
  destination: "processed_logs",
  routing_key: "*.error"
)

Development

# Run tests (requires local RabbitMQ on localhost:5672)
gleam test

# Build documentation
gleam docs build

# Format code
gleam format

Contributing

Contributions are welcome! Please feel free to submit a Pull Request. For major changes, please open an issue first to discuss what you would like to change.

License

This project is licensed under the MIT License - see the LICENSE file for details.

Acknowledgments

Support

Search Document