Production-ready Pub/Sub library and standalone service for Go
Works both as a library for embedding in your application AND as a standalone microservice with REST API.
- 📨 Reliable Message Delivery - Guaranteed delivery with exponential backoff retry
- 🔄 Exponential Backoff - 30s → 1m → 2m → 4m → 8m → 16m → 30m (max)
- 💀 Dead Letter Queue (DLQ) - Automatic handling of failed messages after 5 attempts
- 📊 DLQ Statistics - Track failure reasons and resolution metrics
- 🎯 Domain-Driven Design - Rich domain models with business logic
- 🗄️ Repository Pattern - Clean data access abstraction
- 🔌 Pluggable - Bring your own Logger, Notification system
- ⚙️ Options Pattern - Modern Go API (2025 best practices)
- 🏗️ Clean Architecture - Services, Repositories, Models separation
- ✅ Battle-Tested - Production-proven in FreiCON Railway Management System
- 🐬 MySQL - Full support with Relica adapters
- 🐘 PostgreSQL - Full support with Relica adapters
- 🪶 SQLite - Full support with Relica adapters
- ⚡ Zero Dependencies - Relica query builder (no ORM bloat)
- 📚 As Library - Embed in your Go application
- 🐳 As Service - Standalone PubSub server with REST API
- ☸️ Docker Ready - Production Dockerfile + docker-compose
- 🌐 Cloud Native - 12-factor app, ENV config, health checks
go get github.com/coregx/pubsub@latest# Using Docker (recommended)
cd cmd/pubsub-server
docker-compose up -d
# Or build from source
go build ./cmd/pubsub-server# Windows
cd cmd/pubsub-server
start.bat
# Linux/Mac
cd cmd/pubsub-server
docker-compose up -dAccess API at http://localhost:8080
See Server Documentation for API endpoints.
package main
import (
"context"
"database/sql"
"time"
"github.com/coregx/pubsub"
"github.com/coregx/pubsub/adapters/relica"
_ "github.com/go-sql-driver/mysql"
)
func main() {
// Connect to database
db, _ := sql.Open("mysql", "user:pass@tcp(localhost:3306)/pubsub?parseTime=true")
// Create repositories (production-ready Relica adapters!)
repos := relica.NewRepositories(db, "mysql")
// Create services
publisher, _ := pubsub.NewPublisher(
pubsub.WithPublisherRepositories(
repos.Message, repos.Queue, repos.Subscription, repos.Topic,
),
pubsub.WithPublisherLogger(logger),
)
// Publish message
result, _ := publisher.Publish(context.Background(), pubsub.PublishRequest{
TopicCode: "user.signup",
Identifier: "user-123",
Data: `{"userId": 123, "email": "user@example.com"}`,
})
// Create worker for background processing
worker, _ := pubsub.NewQueueWorker(
pubsub.WithRepositories(repos.Queue, repos.Message, repos.Subscription, repos.DLQ),
pubsub.WithDelivery(transmitterProvider, gateway),
pubsub.WithLogger(logger),
)
// Run worker (processes queue every 30 seconds)
worker.Run(context.Background(), 30*time.Second)
}import "github.com/coregx/pubsub/migrations"
// Apply all migrations
if err := migrations.ApplyAll(db); err != nil {
log.Fatal(err)
}# MySQL
mysql -u user -p database < migrations/mysql/001_core_tables.sql
mysql -u user -p database < migrations/mysql/002_retry_fields.sql
mysql -u user -p database < migrations/mysql/003_dead_letter_queue.sql
# PostgreSQL
psql -U user -d database -f migrations/postgres/001_core_tables.sql
...
# SQLite
sqlite3 pubsub.db < migrations/sqlite/001_core_tables.sql
...┌─────────────────────────────────────┐
│ Your Application │
│ (or REST API for standalone) │
└─────────────┬───────────────────────┘
│
┌─────────────▼───────────────────────┐
│ Services Layer │
│ - Publisher │
│ - SubscriptionManager │
│ - QueueWorker │
└─────────────┬───────────────────────┘
│
┌─────────────▼───────────────────────┐
│ Relica Adapters │
│ (Production-Ready Implementations) │
│ - Zero dependencies │
│ - MySQL / PostgreSQL / SQLite │
└─────────────┬───────────────────────┘
│
┌─────────────▼───────────────────────┐
│ Database │
└─────────────────────────────────────┘
When running as standalone service, PubSub-Go exposes these endpoints:
POST /api/v1/publish
Content-Type: application/json
{
"topicCode": "user.signup",
"identifier": "optional-dedup-key",
"data": {
"userId": 123,
"email": "user@example.com"
}
}POST /api/v1/subscribe
{
"subscriberId": 1,
"topicCode": "user.signup",
"identifier": "webhook-receiver-1"
}GET /api/v1/subscriptions?subscriberId=1DELETE /api/v1/subscriptions/123GET /api/v1/healthSee API Documentation for full details.
// Options Pattern (2025 best practice)
worker, err := pubsub.NewQueueWorker(
pubsub.WithRepositories(queueRepo, msgRepo, subRepo, dlqRepo),
pubsub.WithDelivery(transmitterProvider, gateway),
pubsub.WithLogger(logger),
pubsub.WithBatchSize(100), // optional
pubsub.WithRetryStrategy(customStrategy), // optional
pubsub.WithNotifications(notifService), // optional
)# Server
SERVER_HOST=0.0.0.0
SERVER_PORT=8080
# Database
DB_DRIVER=mysql
DB_HOST=localhost
DB_PORT=3306
DB_USER=pubsub
DB_PASSWORD=your_password
DB_NAME=pubsub
DB_PREFIX=pubsub_
# Worker
PUBSUB_BATCH_SIZE=100
PUBSUB_WORKER_INTERVAL=30
PUBSUB_ENABLE_NOTIFICATIONS=trueSee .env.example for all options.
1. PUBLISH
Publisher → Topic → Create Message
→ Find Active Subscriptions
→ Create Queue Items (one per subscription)
2. WORKER (Background)
QueueWorker → Find Pending/Retryable Items (batch)
→ Deliver to Subscribers (via webhooks/gateway)
→ On Success: Mark as SENT
→ On Failure: Retry with exponential backoff
→ After 5 failures: Move to DLQ
3. DLQ (Dead Letter Queue)
Failed items → Manual review
→ Resolve or Delete
Attempt 1: Immediate
Attempt 2: +30 seconds
Attempt 3: +1 minute
Attempt 4: +2 minutes
Attempt 5: +4 minutes
Attempt 6: +8 minutes (moves to DLQ after this)
# Run all tests
go test ./...
# With coverage
go test ./... -cover
# Model tests (95.9% coverage)
go test ./model/... -cover
# Integration tests (requires database)
go test ./adapters/relica/... -covercd cmd/pubsub-server
docker-compose up -d# Build image
docker build -t pubsub-server:0.1.0 -f cmd/pubsub-server/Dockerfile .
# Run with environment
docker run -d \
-p 8080:8080 \
-e DB_DRIVER=mysql \
-e DB_HOST=mysql \
-e DB_PASSWORD=secret \
pubsub-server:0.1.0- Basic Example - Simple QueueWorker setup with Relica
- Server Example - Full standalone service
- Core PubSub functionality
- Relica adapters (MySQL/PostgreSQL/SQLite)
- Publisher + SubscriptionManager services
- Standalone REST API server
- Docker support
- Health checks
- Delivery providers (HTTP webhooks, gRPC)
- Message encryption
- Rate limiting
- Metrics (Prometheus)
- Admin UI
- OpenAPI/Swagger docs
- Authentication/Authorization
- Multi-tenancy
- Message replay
- Full test coverage (>90%)
This is an alpha release. Contributions welcome!
- Fork the repository
- Create feature branch (
git checkout -b feature/amazing) - Commit changes (
git commit -m 'feat: add amazing feature') - Push to branch (
git push origin feature/amazing) - Open Pull Request
MIT License - see LICENSE file for details.
- Relica - Type-safe query builder (github.com/coregx/relica)
- FreiCON - Original production testing ground
- CoreGX Ecosystem - Part of CoreGX microservices suite
- 🐛 Issues: GitHub Issues
- 📖 Documentation: Wiki
- 💬 Discussions: GitHub Discussions
This is a pre-release version (v0.1.0 development). The library is production-ready and battle-tested in FreiCON Railway Management System with 95.9% test coverage and zero linter issues. APIs may evolve before v1.0.0 LTS release.
📦 Dependencies
This library uses Relica for type-safe database operations. All dependencies are properly published and available through Go modules.
Made with ❤️ by CoreGX Team