
Apache Kafka integration


Apache Kafka is a distributed streaming platform for building real-time data pipelines and streaming applications. The Aspire Apache Kafka integration lets you connect to existing Kafka instances or create new instances from Aspire with the confluentinc/confluent-local container image.

The Apache Kafka hosting integration models a Kafka resource as the KafkaServerResource type. To access this type and its APIs, add the 📦 Aspire.Hosting.Kafka NuGet package in your AppHost project:

Aspire CLI — Add Aspire.Hosting.Kafka package
aspire add kafka

The Aspire CLI is interactive; be sure to select the appropriate search result when prompted:

Aspire CLI — Example output prompt
Select an integration to add:
> kafka (Aspire.Hosting.Kafka)
> Other results listed as selectable options...

In your AppHost project, call AddKafka on the builder instance to add a Kafka server resource:

C# — AppHost.cs
var builder = DistributedApplication.CreateBuilder(args);

var kafka = builder.AddKafka("kafka");

builder.AddProject<Projects.ExampleProject>()
       .WithReference(kafka);

// After adding all resources, run the app...

When Aspire adds a container image to the AppHost, as in the preceding example with the confluentinc/confluent-local image, it creates a new Kafka server instance on your local machine.

To add the Kafka UI to the Kafka server resource, call the WithKafkaUI method:

C# — AppHost.cs
var builder = DistributedApplication.CreateBuilder(args);

var kafka = builder.AddKafka("kafka")
                   .WithKafkaUI();

builder.AddProject<Projects.ExampleProject>()
       .WithReference(kafka);

The Kafka UI is a free, open-source web UI to monitor and manage Apache Kafka clusters.

To change the Kafka UI host port, chain a call to the WithHostPort method:

C# — AppHost.cs
var builder = DistributedApplication.CreateBuilder(args);

var kafka = builder.AddKafka("kafka")
                   .WithKafkaUI(kafkaUI => kafkaUI.WithHostPort(9100));

builder.AddProject<Projects.ExampleProject>()
       .WithReference(kafka);

Add Kafka server resource with data volume


To add a data volume to the Kafka server resource, call the WithDataVolume method:

C# — AppHost.cs
var builder = DistributedApplication.CreateBuilder(args);

var kafka = builder.AddKafka("kafka")
                   .WithDataVolume(isReadOnly: false);

builder.AddProject<Projects.ExampleProject>()
       .WithReference(kafka);

The data volume is used to persist the Kafka server data outside the lifecycle of its container.

The Aspire Kafka integration deploys a container from the confluentinc/confluent-local image to your local container host. This image provides a simple Apache Kafka cluster that runs in KRaft mode and requires no further configuration. It’s ideal for developing and testing producers and consumers. However, this image is for local experimentation only and isn’t supported by Confluent.

In the following AppHost code, a local container is used in run mode; otherwise, a connection string provides the URLs and port numbers of the Kafka brokers:

var kafka = builder.ExecutionContext.IsRunMode
    ? builder.AddKafka("kafka").WithKafkaUI()
    : builder.AddConnectionString("kafka");

When you reference a Kafka resource using WithReference, the following connection properties are made available to the consuming project:

Property Name  Description
Host           The host-facing Kafka listener hostname or IP address
Port           The host-facing Kafka listener port

Example properties:

Host: localhost
Port: 9092
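
In the consuming project, these properties surface as a connection string named after the resource. The client integration described below resolves it for you, but as a minimal sketch, assuming a project that references the "kafka" resource, you can also read it directly:

C# — Program.cs
var builder = Host.CreateApplicationBuilder(args);

// "kafka" matches the resource name used in the AppHost; at run time it
// resolves to a broker address such as "localhost:9092".
string? connectionString = builder.Configuration.GetConnectionString("kafka");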

The Kafka hosting integration automatically adds a health check for the Kafka server resource. The health check verifies that a Kafka producer with the specified connection name is able to connect and persist a topic to the Kafka server.

To get started with the Aspire Apache Kafka client integration, install the 📦 Aspire.Confluent.Kafka NuGet package:

.NET CLI — Add Aspire.Confluent.Kafka package
dotnet add package Aspire.Confluent.Kafka

In the Program.cs file of your client-consuming project, call the AddKafkaProducer extension method to register an IProducer<TKey, TValue> for use via the dependency injection container:

builder.AddKafkaProducer<string, string>("kafka");

You can then retrieve the IProducer<TKey, TValue> instance using dependency injection:

internal sealed class Worker(IProducer<string, string> producer) : BackgroundService
{
    // Use producer...
}
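
For example, here's a minimal sketch of a worker that publishes messages; the topic name "messages" is a hypothetical placeholder:

C# — Worker.cs
using Confluent.Kafka;

internal sealed class Worker(IProducer<string, string> producer) : BackgroundService
{
    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        while (!stoppingToken.IsCancellationRequested)
        {
            // "messages" is a hypothetical topic name for illustration.
            await producer.ProduceAsync(
                "messages",
                new Message<string, string> { Key = "example", Value = DateTimeOffset.UtcNow.ToString() },
                stoppingToken);

            await Task.Delay(TimeSpan.FromSeconds(1), stoppingToken);
        }
    }
}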

To register an IConsumer<TKey, TValue> for use via the dependency injection container, call the AddKafkaConsumer extension method:

builder.AddKafkaConsumer<string, string>("kafka");

You can then retrieve the IConsumer<TKey, TValue> instance using dependency injection:

internal sealed class Worker(IConsumer<string, string> consumer) : BackgroundService
{
    // Use consumer...
}
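
Similarly, here's a minimal sketch of a consume loop, assuming the same hypothetical "messages" topic and a consumer group ID supplied through configuration (the consumer requires one):

C# — Worker.cs
using Confluent.Kafka;

internal sealed class Worker(IConsumer<string, string> consumer) : BackgroundService
{
    protected override Task ExecuteAsync(CancellationToken stoppingToken) =>
        Task.Run(() =>
        {
            consumer.Subscribe("messages");

            while (!stoppingToken.IsCancellationRequested)
            {
                // Consume blocks until a message arrives or the token is
                // canceled, which ends the loop via OperationCanceledException.
                ConsumeResult<string, string> result = consumer.Consume(stoppingToken);
                Console.WriteLine($"Received: {result.Message.Value}");
            }
        }, stoppingToken);
}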

There might be situations where you want to register multiple producer or consumer instances with different connection names. For these scenarios, use the keyed registration methods (see the sketch after this list):

  • AddKeyedKafkaProducer: Registers a keyed Kafka producer.
  • AddKeyedKafkaConsumer: Registers a keyed Kafka consumer.
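
Here's a minimal sketch of keyed registration and retrieval, assuming two connection names, kafka1 and kafka2, exist in configuration:

C# — Program.cs
builder.AddKeyedKafkaProducer<string, string>("kafka1");
builder.AddKeyedKafkaConsumer<string, string>("kafka2");

C# — Worker.cs
using Confluent.Kafka;
using Microsoft.Extensions.DependencyInjection;

internal sealed class Worker(
    [FromKeyedServices("kafka1")] IProducer<string, string> producer,
    [FromKeyedServices("kafka2")] IConsumer<string, string> consumer) : BackgroundService
{
    // Use producer and consumer...
}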

The Apache Kafka integration provides multiple options to configure the connection.

When using a connection string from the ConnectionStrings configuration section, provide the name when calling builder.AddKafkaProducer() or builder.AddKafkaConsumer():

builder.AddKafkaProducer<string, string>("kafka");

Example configuration:

{
  "ConnectionStrings": {
    "kafka": "broker:9092"
  }
}

The Apache Kafka integration supports Microsoft.Extensions.Configuration. It loads the KafkaProducerSettings or KafkaConsumerSettings from configuration using the Aspire:Confluent:Kafka:Producer and Aspire:Confluent:Kafka:Consumer keys. Example appsettings.json:

{
  "Aspire": {
    "Confluent": {
      "Kafka": {
        "Producer": {
          "DisableHealthChecks": false,
          "Config": {
            "Acks": "All"
          }
        }
      }
    }
  }
}

The Config properties bind to instances of the Confluent.Kafka ProducerConfig and ConsumerConfig types, respectively.

The Apache Kafka integration supports named configuration for multiple instances:

{
  "Aspire": {
    "Confluent": {
      "Kafka": {
        "Producer": {
          "kafka1": {
            "DisableHealthChecks": false,
            "Config": {
              "Acks": "All"
            }
          },
          "kafka2": {
            "DisableHealthChecks": true,
            "Config": {
              "Acks": "Leader"
            }
          }
        }
      }
    }
  }
}

Use the connection names when calling the registration methods:

builder.AddKafkaProducer<string, string>("kafka1");
builder.AddKafkaConsumer<string, string>("kafka2");

You can pass the Action<KafkaProducerSettings> delegate to set up options inline:

builder.AddKafkaProducer<string, string>(
    "kafka",
    static settings => settings.DisableHealthChecks = true);

To configure Confluent.Kafka builders, pass an Action<ProducerBuilder<TKey, TValue>>:

builder.AddKafkaProducer<string, MyMessage>(
    "kafka",
    static producerBuilder =>
    {
        var messageSerializer = new MyMessageSerializer();
        producerBuilder.SetValueSerializer(messageSerializer);
    });
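
Here, MyMessage and MyMessageSerializer are placeholders. A minimal sketch of such a serializer, assuming a simple record type and System.Text.Json:

C# — MyMessageSerializer.cs
using System.Text.Json;
using Confluent.Kafka;

public sealed record MyMessage(string Text);

public sealed class MyMessageSerializer : ISerializer<MyMessage>
{
    // Serialize the message as UTF-8 JSON bytes.
    public byte[] Serialize(MyMessage data, SerializationContext context) =>
        JsonSerializer.SerializeToUtf8Bytes(data);
}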

By default, Aspire integrations enable health checks for all services. The Apache Kafka integration handles the following health check scenarios (a wiring sketch follows the list):

  • Adds the Aspire.Confluent.Kafka.Producer health check when DisableHealthChecks is false.
  • Adds the Aspire.Confluent.Kafka.Consumer health check when DisableHealthChecks is false.
  • Integrates with the /health HTTP endpoint.
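
In a project created from the Aspire templates, the /health endpoint is typically wired up by the shared ServiceDefaults project. A minimal sketch, assuming the template-generated AddServiceDefaults and MapDefaultEndpoints extension methods are available:

C# — Program.cs
var builder = WebApplication.CreateBuilder(args);

builder.AddServiceDefaults();                      // template-generated: health checks, telemetry, resilience
builder.AddKafkaProducer<string, string>("kafka"); // contributes its health check

var app = builder.Build();

app.MapDefaultEndpoints(); // template-generated: maps /health and /alive in development

app.Run();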

The Apache Kafka integration uses the following log categories:

  • Aspire.Confluent.Kafka

The Apache Kafka integration doesn’t emit distributed traces.

The Apache Kafka integration emits the following metrics using OpenTelemetry:

  • Aspire.Confluent.Kafka
    • messaging.kafka.network.tx
    • messaging.kafka.network.transmitted
    • messaging.kafka.network.rx
    • messaging.kafka.network.received
    • messaging.publish.messages
    • messaging.kafka.message.transmitted
    • messaging.receive.messages
    • messaging.kafka.message.received