ConfluentBot is an AI application that combines Confluent Kafka for real-time data streaming with Google Cloud Vertex AI for machine learning predictions. It demonstrates how to run AI models over continuous data streams to generate predictions, detect anomalies, and unlock real-world business value from data in motion.
This solution addresses the Confluent Challenge by leveraging:
- Confluent Kafka to ingest real-time data streams from multiple sources
- Google Cloud Vertex AI to apply advanced ML models for instant predictions
- Reactive Stream Processing to enrich data and generate insights
- Interactive Bot Interface for natural language access to streaming intelligence
```
┌─────────────────────────────────────────────┐
│        Bot Framework User Interface         │
│         (Chat, Teams, Slack, Web)           │
└──────────────────────┬──────────────────────┘
                       │
┌──────────────────────▼──────────────────────┐
│     Stream Analytics Dialog & REST APIs     │
│  Messages │ Health │ Predictions │ Metrics  │
└──────────────────────┬──────────────────────┘
                       │
┌──────────────────────▼──────────────────────┐
│   Stream Processing Pipeline (Real-Time)    │
│   ┌─────────────┐     ┌─────────────────┐   │
│   │   Feature   │────▶│  Enrichment &   │   │
│   │  Extraction │     │ Transformation  │   │
│   └─────────────┘     └────────┬────────┘   │
│                                │            │
│                       ┌────────▼────────┐   │
│                       │  Google Vertex  │   │
│                       │  AI Prediction  │   │
│                       └────────┬────────┘   │
│                                │            │
│                       ┌────────▼────────┐   │
│                       │   Telemetry &   │   │
│                       │     Metrics     │   │
│                       └─────────────────┘   │
└──────────────────────┬──────────────────────┘
                       │
           ┌───────────┴────────────┐
           │                        │
┌──────────▼─────────┐   ┌──────────▼─────────┐
│  Confluent Kafka   │   │   Google Cloud     │
│  Real-Time Topics  │   │    Vertex AI       │
│  (Transactions,    │   │    Endpoints       │
│   Events, Metrics, │   │   (Model APIs)     │
│   Anomalies)       │   └────────────────────┘
└────────────────────┘
```
- Multi-Topic Consumption: Simultaneously consume from transactions, events, metrics, and anomaly topics
- Event-Driven Architecture: Reactive subscription model for asynchronous processing
- Efficient Buffering: Configurable message buffers with statistics and health metrics
- Partition Tracking: Track offset, partition, and timestamp metadata
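A minimal sketch of this consumption pattern using the Confluent.Kafka .NET client (topic names and group id mirror the sample configuration later in this README; buffering, error handling, and DI wiring are omitted):

```csharp
using System;
using System.Threading;
using Confluent.Kafka;

// Sketch only: one consumer subscribed to all four stream topics.
var config = new ConsumerConfig
{
    BootstrapServers = "kafka.example.com:9092",
    GroupId = "ConfluentBot-Consumer-Group",
    AutoOffsetReset = AutoOffsetReset.Earliest
};

using var consumer = new ConsumerBuilder<Ignore, string>(config).Build();
consumer.Subscribe(new[] { "transactions", "events", "metrics", "anomalies" });

using var cts = new CancellationTokenSource();
while (!cts.IsCancellationRequested)
{
    var result = consumer.Consume(cts.Token);

    // Partition tracking: topic, partition, offset, and timestamp metadata.
    Console.WriteLine(
        $"{result.Topic}[{result.Partition.Value}]@{result.Offset.Value} " +
        $"({result.Message.Timestamp.UtcDateTime:O}): {result.Message.Value}");
}
```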
- Online Predictions: Real-time single-instance queries with <50ms latency
- Batch Predictions: High-throughput processing of multiple instances
- Automatic Conversion: Type conversion for numeric, categorical, and text features
- Confidence Extraction: Interpretable confidence scores for model outputs
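A sketch of the online-prediction call, assuming the Google.Cloud.AIPlatform.V1 client library and a deployed tabular model (the feature names here are illustrative, not a fixed schema):

```csharp
using System;
using Google.Cloud.AIPlatform.V1;
using Google.Protobuf.WellKnownTypes;

// Sketch only: project, location, and endpoint ids come from the VertexAI
// config section; the instance schema depends on the deployed model.
var client = new PredictionServiceClientBuilder
{
    Endpoint = "us-central1-aiplatform.googleapis.com"
}.Build();

var endpoint = EndpointName.FromProjectLocationEndpoint(
    "your-gcp-project-id", "us-central1", "your-deployed-model-endpoint-id");

var instance = Value.ForStruct(new Struct
{
    Fields =
    {
        ["amount"] = Value.ForNumber(1500),
        ["category"] = Value.ForString("online_purchase")
    }
});

PredictResponse response = client.Predict(endpoint, new[] { instance }, Value.ForNull());
Console.WriteLine(response.Predictions[0]); // e.g. per-class confidence scores
```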
- Feature Extraction: Parse JSON payloads and extract relevant fields
- Enrichment: Add contextual metadata (topic, partition, offset, processing delay)
- Normalization: Scale numeric features for optimal model performance
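A sketch of the extract-and-enrich step over consumed messages (the `EnrichedEvent` record is illustrative, not the project's actual type):

```csharp
using System;
using System.Text.Json;
using Confluent.Kafka;

// Illustrative enriched-message shape.
public record EnrichedEvent(
    JsonElement Features, string Topic, int Partition, long Offset, double ProcessingDelayMs);

public static class StreamEnricher
{
    public static EnrichedEvent Enrich(ConsumeResult<Ignore, string> result)
    {
        // Feature extraction: parse the raw JSON payload.
        var features = JsonSerializer.Deserialize<JsonElement>(result.Message.Value);

        // Enrichment: attach topic/partition/offset plus the ingest-to-process delay.
        var delayMs =
            (DateTime.UtcNow - result.Message.Timestamp.UtcDateTime).TotalMilliseconds;

        return new EnrichedEvent(
            features, result.Topic, result.Partition.Value, result.Offset.Value, delayMs);
    }
}
```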
- End-to-End Monitoring: Track metrics from ingestion to prediction
- Throughput Metrics: Messages processed per second and trending
- Latency Analysis: p50, p75, p90, p95, p99 percentile calculations
- Accuracy Tracking: Prediction success rates and confidence distribution
- Health Status: HEALTHY, DEGRADED, SLOW, CRITICAL status indicators
- Historical Analysis: Time-windowed metric aggregation
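The percentile figures above can be computed from a sliding window of observed latencies; a minimal nearest-rank sketch:

```csharp
using System;
using System.Linq;

public static class LatencyStats
{
    // Nearest-rank percentile over a window of latency samples, in ms.
    public static double Percentile(double[] windowMs, double p)
    {
        if (windowMs.Length == 0) throw new ArgumentException("empty window");
        var sorted = windowMs.OrderBy(x => x).ToArray();
        int rank = (int)Math.Ceiling(p / 100.0 * sorted.Length);
        return sorted[Math.Max(rank - 1, 0)];
    }
}

// Usage: var p99 = LatencyStats.Percentile(lastWindow, 99);
```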
- Natural Language Queries: Ask about stream data in plain English
- Topic Selection: Choose from transactions, events, metrics, or anomalies
- Analysis Types: Latest messages, health status, predictions, or anomalies
- Formatted Results: Rich, readable output with emoji indicators
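A toy sketch of how a plain-English query could be routed to a topic and analysis type (keyword matching only; the actual dialog logic may be richer):

```csharp
using System.Linq;

public static class QueryRouter
{
    // Toy intent mapping: first matching topic keyword wins.
    public static (string Topic, string AnalysisType) Route(string text)
    {
        var lower = text.ToLowerInvariant();

        var topic = new[] { "transactions", "events", "metrics", "anomalies" }
            .FirstOrDefault(t => lower.Contains(t)) ?? "transactions";

        var analysis =
            lower.Contains("health")  ? "health" :
            lower.Contains("predict") ? "prediction" :
            lower.Contains("anomal")  ? "anomalies" : "latest";

        return (topic, analysis);
    }
}
```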
REST API endpoints:

```
GET  /api/streamanalytics/messages/{topic}?count=10
GET  /api/streamanalytics/health/{topic}
POST /api/streamanalytics/predict
GET  /api/streamanalytics/system-health
GET  /api/streamanalytics/metrics/{topic}?durationSeconds=300
GET  /api/streamanalytics/pipeline-metrics
```
Prerequisites:

- .NET 6.0 SDK or later
- Confluent Kafka cluster (or Confluent Cloud)
- Google Cloud project with Vertex AI API enabled
- GCP Service Account with appropriate permissions
```bash
# Clone the repository
git clone https://github.com/Raiff1982/ConfluentBot.git
cd ConfluentBot

# Restore NuGet packages
dotnet restore

# Build the project
dotnet build
```

Configure appsettings.json with your Kafka bootstrap servers and Vertex AI credentials:
```json
{
  "Kafka": {
    "BootstrapServers": "kafka.example.com:9092",
    "GroupId": "ConfluentBot-Consumer-Group",
    "Topics": ["transactions", "events", "metrics", "anomalies"],
    "BufferSize": 100
  },
  "VertexAI": {
    "ProjectId": "your-gcp-project-id",
    "Location": "us-central1",
    "EndpointId": "your-deployed-model-endpoint-id"
  }
}
```

Environment variables (for Docker/K8s):
```bash
export GOOGLE_APPLICATION_CREDENTIALS=/path/to/service-account-key.json
export Kafka__BootstrapServers=kafka:9092
export VertexAI__ProjectId=your-project-id
```

Run the bot:

```bash
# Option 1: Direct execution
dotnet run

# Option 2: Using Visual Studio
# Open ConfluentBot.csproj and press F5

# Option 3: Docker
docker build -t confluentbot:latest .
docker run -p 3978:3978 confluentbot:latest
```

The bot will be available at http://localhost:3978.
Test with the Bot Framework Emulator:
- Open the Bot Framework Emulator
- Connect to http://localhost:3978/api/messages
Example use cases:

**Fraud Detection**
- Input Stream: Credit card transactions (100K+ events/sec)
- Model: XGBoost fraud classifier (99.2% accuracy)
- Output: Real-time fraud scores (0-1) and a block/allow decision
- Action: Block the transaction or flag it for review
- Latency: <25ms per prediction
- Business Impact: Prevent $5M+ in annual fraud
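A toy sketch of the score-to-decision step (the thresholds are illustrative, not taken from the project):

```csharp
public static class FraudPolicy
{
    // Illustrative thresholds only; real cutoffs would be tuned per model.
    public static string Decide(double fraudScore) => fraudScore switch
    {
        >= 0.90 => "BLOCK",            // near-certain fraud: reject the transaction
        >= 0.60 => "FLAG_FOR_REVIEW",  // suspicious: route to manual review
        _       => "APPROVE"           // low risk: allow
    };
}
```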
**Supply Chain & Logistics**
- Input Stream: IoT sensor data from shipments (temperature, humidity, location)
- Model: Delivery delay predictor + anomaly detector
- Output: ETA adjustments and risk scores
- Action: Proactive customer notification
- Latency: <100ms per event
- Business Impact: Improve on-time delivery by 15%
**Predictive Maintenance**
- Input Stream: Equipment metrics (temperature, vibration, power draw)
- Model: Component failure classifier
- Output: Time-to-failure estimates
- Action: Schedule maintenance before failure
- Latency: <75ms per prediction
- Business Impact: Reduce unplanned downtime by 40%
**Customer Churn Prevention**
- Input Stream: Clickstream and user behavior events
- Model: Churn probability + next-action predictor
- Output: Intervention recommendations
- Action: Personalized offers or support outreach
- Latency: <60ms per prediction
- Business Impact: Reduce churn by 20%
**Network Threat Detection**
- Input Stream: Network traffic + security events
- Model: Anomaly detector for intrusions/DDoS
- Output: Threat classification and severity
- Action: Automated blocking rules
- Latency: <40ms per prediction
- Business Impact: Detect threats in real time
| Metric | Typical Value | Tuning Lever |
|---|---|---|
| Throughput | 5,000-50,000 msg/sec | Increase replicas, batch size |
| Latency p50 | 15-30ms | Reduce model complexity |
| Latency p99 | 50-100ms | Optimize feature extraction |
| Prediction Accuracy | 88-99% | Retrain model with new data |
| System Availability | 99.9%+ | Multi-region deployment |
| Error Rate | <0.1% | Better error handling |
Fetch recent messages:

```bash
curl "http://localhost:3978/api/streamanalytics/messages/transactions?count=5"
```

Response:

```json
{
  "success": true,
  "result": [
    {
      "amount": 1500,
      "category": "online_purchase",
      "timestamp": 1705315200000,
      "card_type": "credit"
    }
  ],
  "metadata": {
    "topic": "transactions",
    "message_count": 1
  },
  "completedAt": "2024-01-15T10:30:00Z"
}
```

Run predictions over buffered messages:

```bash
curl -X POST http://localhost:3978/api/streamanalytics/predict \
  -H "Content-Type: application/json" \
  -d '{
    "topic": "transactions",
    "analysisType": "prediction",
    "parameters": {"batch_size": 5}
  }'
```

Response:
```json
{
  "success": true,
  "result": [
    {
      "itemId": "msg-abc123",
      "status": "SUCCESS",
      "prediction": {
        "fraudulent": 0.92,
        "legitimate": 0.08
      },
      "processingDurationMs": 23.5
    }
  ],
  "metadata": {
    "topic": "transactions",
    "predictions_count": 1
  }
}
```

Check system health:

```bash
curl http://localhost:3978/api/streamanalytics/system-health
```

Response:
```json
{
  "success": true,
  "result": {
    "uptimeSeconds": 3600,
    "totalMessagesProcessed": 500000,
    "totalPredictionsMade": 125000,
    "totalErrors": 45,
    "topicsCount": 4,
    "averageLatencyMs": 34.2,
    "averageConfidence": 0.887,
    "overallErrorRate": 0.009,
    "status": "HEALTHY",
    "checkedAt": "2024-01-15T10:30:00Z"
  }
}
```

Kafka security configuration (SASL/SSL):

```json
{
  "Kafka": {
    "BootstrapServers": "kafka.example.com:9093",
    "SecurityProtocol": "SaslSsl",
    "SaslMechanism": "Plain",
    "SaslUsername": "${KAFKA_USERNAME}",
    "SaslPassword": "${KAFKA_PASSWORD}"
  }
}
```

- Create Service Account with minimal required roles
- Enable Workload Identity for Kubernetes
- Rotate Credentials regularly
- Use VPC Service Controls for data residency
- Implement authentication/authorization (OAuth2, API Keys)
- Rate limit sensitive endpoints
- Enable CORS for trusted domains only
- Log all prediction requests
- Monitor for anomalous usage
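One way to cover authentication on the REST endpoints is a simple API-key check; an illustrative ASP.NET Core sketch (the header name and config key are assumptions, not the project's actual setup):

```csharp
using Microsoft.AspNetCore.Builder;
using Microsoft.AspNetCore.Http;

var builder = WebApplication.CreateBuilder(args);
var app = builder.Build();

// Illustrative API-key middleware; header name and key source are assumptions.
app.Use(async (context, next) =>
{
    var expected = app.Configuration["Api:Key"];
    if (string.IsNullOrEmpty(expected) ||
        context.Request.Headers["X-Api-Key"] != expected)
    {
        context.Response.StatusCode = StatusCodes.Status401Unauthorized;
        return;
    }
    await next();
});

app.Run();
```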
Build and test:

```bash
# Build
dotnet build

# Run any tests (when available)
dotnet test

# Code coverage
dotnet-coverage collect -f cobertura -o coverage.cobertura.xml "dotnet test"
```

Dockerfile:

```dockerfile
FROM mcr.microsoft.com/dotnet/aspnet:6.0-alpine AS runtime
WORKDIR /app
COPY bin/Release/net6.0/publish .
ENV ASPNETCORE_URLS=http://+:3978
EXPOSE 3978
ENTRYPOINT ["dotnet", "ConfluentBot.dll"]
```

Kubernetes deployment:
```yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: confluentbot
spec:
  replicas: 3
  selector:
    matchLabels:
      app: confluentbot
  template:
    metadata:
      labels:
        app: confluentbot
    spec:
      containers:
        - name: confluentbot
          image: confluentbot:latest
          ports:
            - containerPort: 3978
          env:
            - name: Kafka__BootstrapServers
              valueFrom:
                configMapKeyRef:
                  name: kafka-config
                  key: bootstrap-servers
```

- STREAMING_ANALYTICS_GUIDE.md - Comprehensive architecture guide
- Confluent Kafka Documentation
- Google Cloud Vertex AI
- Bot Framework
- Time-series forecasting (ARIMA, Prophet)
- Advanced anomaly detection (Isolation Forest, Autoencoders)
- Model versioning and A/B testing
- Automated retraining pipelines
- Real-time dashboards (Grafana)
- Multi-model ensemble predictions
- Feature store integration
- Stream-SQL capabilities (KSQL)
MIT License - see LICENSE file for details
- Confluent for Kafka platform
- Google Cloud for Vertex AI
- Microsoft for Bot Framework
- Open source community
For questions or issues:
- Check Issues
- Review STREAMING_ANALYTICS_GUIDE.md
- Create a new issue with details
*Transforming Data in Motion into Real-Time Intelligence*

Built with ❤️ for the Confluent Challenge