Apache Kafka integration
Apache Kafka is a distributed streaming platform for building real-time data pipelines and streaming applications. The Aspire Apache Kafka integration enables you to connect to existing Kafka instances, or to create new instances from Aspire using the confluentinc/confluent-local container image.
Hosting integration
The Apache Kafka hosting integration models a Kafka resource as the `KafkaServerResource` type. To access this type and its APIs, add the 📦 Aspire.Hosting.Kafka NuGet package to your AppHost project:
```shell
aspire add kafka
```

The Aspire CLI is interactive; be sure to select the appropriate search result when prompted:
```
Select an integration to add:

> kafka (Aspire.Hosting.Kafka)
> Other results listed as selectable options...
```

Alternatively, for a single-file AppHost, add the package directive:

```csharp
#:package Aspire.Hosting.Kafka@*
```

Or add a `PackageReference` to the AppHost project file:

```xml
<PackageReference Include="Aspire.Hosting.Kafka" Version="*" />
```

Add Kafka server resource
In your AppHost project, call AddKafka on the builder instance to add a Kafka server resource:
```csharp
var builder = DistributedApplication.CreateBuilder(args);

var kafka = builder.AddKafka("kafka");

builder.AddProject<Projects.ExampleProject>()
       .WithReference(kafka);

// After adding all resources, run the app...
```

When Aspire adds a container image to the AppHost, it creates a new Kafka server instance on your local machine.
Add Kafka UI
To add the Kafka UI to the Kafka server resource, call the WithKafkaUI method:
```csharp
var builder = DistributedApplication.CreateBuilder(args);

var kafka = builder.AddKafka("kafka")
                   .WithKafkaUI();

builder.AddProject<Projects.ExampleProject>()
       .WithReference(kafka);
```

The Kafka UI is a free, open-source web UI for monitoring and managing Apache Kafka clusters.
Change the Kafka UI host port
To change the Kafka UI host port, chain a call to the WithHostPort method:
```csharp
var builder = DistributedApplication.CreateBuilder(args);

var kafka = builder.AddKafka("kafka")
                   .WithKafkaUI(kafkaUI => kafkaUI.WithHostPort(9100));

builder.AddProject<Projects.ExampleProject>()
       .WithReference(kafka);
```

Add Kafka server resource with data volume
To add a data volume to the Kafka server resource, call the WithDataVolume method:
```csharp
var builder = DistributedApplication.CreateBuilder(args);

var kafka = builder.AddKafka("kafka")
                   .WithDataVolume(isReadOnly: false);

builder.AddProject<Projects.ExampleProject>()
       .WithReference(kafka);
```

The data volume persists the Kafka server data outside the lifecycle of its container.
Work with larger Kafka clusters
The Aspire Kafka integration deploys a container from the confluentinc/confluent-local image to your local container host. This image provides a simple Apache Kafka cluster that runs in KRaft mode and requires no further configuration. It’s ideal for developing and testing producers and consumers. However, this image is for local experimentation only and isn’t supported by Confluent.
In the following AppHost code, a local container is used in run mode. In publish mode, a connection string provides the URLs and port numbers of the Kafka brokers:
```csharp
var kafka = builder.ExecutionContext.IsRunMode
    ? builder.AddKafka("kafka").WithKafkaUI()
    : builder.AddConnectionString("kafka");
```

Connection properties
When you reference a Kafka resource using WithReference, the following connection properties are made available to the consuming project:
Kafka server
The Kafka server resource exposes the following connection properties:
| Property name | Description |
|---|---|
| `Host` | The host-facing Kafka listener hostname or IP address. |
| `Port` | The host-facing Kafka listener port. |
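In the consuming project, a resource referenced with WithReference surfaces as a standard .NET connection string, which you can read with the framework's configuration APIs. A minimal sketch (the builder type and variable names are illustrative):

```csharp
var builder = WebApplication.CreateBuilder(args);

// The "kafka" connection string is injected by Aspire when this project
// references the Kafka resource via WithReference(kafka).
var bootstrapServers = builder.Configuration.GetConnectionString("kafka");
// e.g. "localhost:9092", combining the Host and Port properties above.
```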
Example properties:
```
Host: localhost
Port: 9092
```

Hosting integration health checks
The Kafka hosting integration automatically adds a health check for the Kafka server resource. The health check verifies that a Kafka producer with the specified connection name is able to connect and persist a topic to the Kafka server.
Client integration
To get started with the Aspire Apache Kafka client integration, install the 📦 Aspire.Confluent.Kafka NuGet package:
```shell
dotnet add package Aspire.Confluent.Kafka
```

Alternatively, for a single-file app, add the package directive:

```csharp
#:package Aspire.Confluent.Kafka@*
```

Or add a `PackageReference` to the project file:

```xml
<PackageReference Include="Aspire.Confluent.Kafka" Version="*" />
```

Add Kafka producer
In the Program.cs file of your client-consuming project, call the AddKafkaProducer extension method to register an `IProducer<TKey, TValue>` for use via the dependency injection container:
```csharp
builder.AddKafkaProducer<string, string>("kafka");
```

You can then retrieve the `IProducer<TKey, TValue>` instance using dependency injection:
```csharp
internal sealed class Worker(IProducer<string, string> producer) : BackgroundService
{
    // Use producer...
}
```

Add Kafka consumer
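As a sketch of what the elided producer logic might look like, the following hypothetical `ExecuteAsync` override publishes a message with the Confluent.Kafka `ProduceAsync` API; the topic name `demo-topic` and the key/value contents are assumptions for illustration:

```csharp
using Confluent.Kafka;

internal sealed class Worker(IProducer<string, string> producer) : BackgroundService
{
    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        // "demo-topic" is a placeholder topic name for illustration.
        var result = await producer.ProduceAsync(
            "demo-topic",
            new Message<string, string> { Key = "greeting", Value = "Hello, Kafka!" },
            stoppingToken);

        // result.Offset reports where the broker persisted the message.
    }
}
```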
To register an `IConsumer<TKey, TValue>` for use via the dependency injection container, call the AddKafkaConsumer extension method:
```csharp
builder.AddKafkaConsumer<string, string>("kafka");
```

You can then retrieve the `IConsumer<TKey, TValue>` instance using dependency injection:
```csharp
internal sealed class Worker(IConsumer<string, string> consumer) : BackgroundService
{
    // Use consumer...
}
```

Add keyed Kafka producers or consumers
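As a sketch of what the elided consumer logic might look like, the following hypothetical worker subscribes and polls with the Confluent.Kafka `Subscribe` and `Consume` APIs; the topic name `demo-topic` is an assumption for illustration:

```csharp
using Confluent.Kafka;

internal sealed class Worker(IConsumer<string, string> consumer) : BackgroundService
{
    protected override Task ExecuteAsync(CancellationToken stoppingToken)
    {
        // Consume is a blocking call, so run the poll loop off the startup path.
        return Task.Run(() =>
        {
            // "demo-topic" is a placeholder topic name for illustration.
            consumer.Subscribe("demo-topic");

            while (!stoppingToken.IsCancellationRequested)
            {
                // Blocks until a message arrives or the token is canceled.
                var result = consumer.Consume(stoppingToken);
                Console.WriteLine($"{result.Message.Key}: {result.Message.Value}");
            }
        }, stoppingToken);
    }
}
```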
There might be situations where you want to register multiple producer or consumer instances with different connection names:
- `AddKeyedKafkaProducer`: Registers a keyed Kafka producer.
- `AddKeyedKafkaConsumer`: Registers a keyed Kafka consumer.
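As a sketch of how keyed registration works (the connection names `kafka1` and `kafka2` are assumptions), register the instances in Program.cs:

```csharp
// Each registration is keyed by its connection name.
builder.AddKeyedKafkaProducer<string, string>("kafka1");
builder.AddKeyedKafkaProducer<string, string>("kafka2");
```

A keyed instance can then be resolved with the standard keyed-services attribute from Microsoft.Extensions.DependencyInjection:

```csharp
using Confluent.Kafka;
using Microsoft.Extensions.DependencyInjection;

internal sealed class Worker(
    [FromKeyedServices("kafka1")] IProducer<string, string> producer)
    : BackgroundService
{
    // Use producer...
}
```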
Configuration
The Apache Kafka integration provides multiple options to configure the connection.
Use a connection string
When using a connection string from the ConnectionStrings configuration section, provide the connection string name when calling builder.AddKafkaProducer() or builder.AddKafkaConsumer():
```csharp
builder.AddKafkaProducer<string, string>("kafka");
```

Example configuration:
```json
{
  "ConnectionStrings": {
    "kafka": "broker:9092"
  }
}
```

Use configuration providers
The Apache Kafka integration supports Microsoft.Extensions.Configuration. It loads the KafkaProducerSettings or KafkaConsumerSettings from configuration using the `Aspire:Confluent:Kafka:Producer` and `Aspire:Confluent:Kafka:Consumer` keys. Example appsettings.json:
```json
{
  "Aspire": {
    "Confluent": {
      "Kafka": {
        "Producer": {
          "DisableHealthChecks": false,
          "Config": {
            "Acks": "All"
          }
        }
      }
    }
  }
}
```

The `Config` properties bind to instances of ProducerConfig and ConsumerConfig.
Use named configuration
The Apache Kafka integration supports named configuration for multiple instances:
```json
{
  "Aspire": {
    "Confluent": {
      "Kafka": {
        "Producer": {
          "kafka1": {
            "DisableHealthChecks": false,
            "Config": {
              "Acks": "All"
            }
          },
          "kafka2": {
            "DisableHealthChecks": true,
            "Config": {
              "Acks": "Leader"
            }
          }
        }
      }
    }
  }
}
```

Use the connection names when calling the registration methods:
```csharp
builder.AddKafkaProducer<string, string>("kafka1");
builder.AddKafkaConsumer<string, string>("kafka2");
```

Use inline delegates
You can pass the `Action<KafkaProducerSettings>` delegate to set up options inline:
```csharp
builder.AddKafkaProducer<string, string>(
    "kafka",
    static settings => settings.DisableHealthChecks = true);
```

To configure Confluent.Kafka builders, pass an `Action<ProducerBuilder<TKey, TValue>>`:
```csharp
builder.AddKafkaProducer<string, MyMessage>(
    "kafka",
    static producerBuilder =>
    {
        var messageSerializer = new MyMessageSerializer();
        producerBuilder.SetValueSerializer(messageSerializer);
    });
```

Client integration health checks
By default, Aspire integrations enable health checks for all services. The Apache Kafka integration handles the following health check scenarios:
- Adds the `Aspire.Confluent.Kafka.Producer` health check when `DisableHealthChecks` is `false`.
- Adds the `Aspire.Confluent.Kafka.Consumer` health check when `DisableHealthChecks` is `false`.
- Integrates with the `/health` HTTP endpoint.
Observability and telemetry
Logging
The Apache Kafka integration uses the following log categories:
- `Aspire.Confluent.Kafka`
Tracing
The Apache Kafka integration doesn’t emit distributed traces.
Metrics
The Apache Kafka integration emits the following metrics using OpenTelemetry:
- `Aspire.Confluent.Kafka`
  - `messaging.kafka.network.tx`
  - `messaging.kafka.network.transmitted`
  - `messaging.kafka.network.rx`
  - `messaging.kafka.network.received`
  - `messaging.publish.messages`
  - `messaging.kafka.message.transmitted`
  - `messaging.receive.messages`
  - `messaging.kafka.message.received`