ank is a Python streaming system for building pipelines with REST APIs, scheduled tasks, and message queues (RabbitMQ, ZeroMQ, Kafka, Redis).
ANK lets you compose processors into chains and pipelines. Each processor is a link in the chain; connect them to form data flows.
Features:
- Pipeline and streaming processing
- REST API interface (Flask)
- Scheduled tasks (cron-style)
- Message queues: RabbitMQ, ZeroMQ, Kafka, Redis
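The chain idea can be sketched in plain Python: each processor exposes a `process` hook, and a chain feeds every message through the hooks in order. This is an illustrative sketch only; the `Inc`/`Double` classes and the standalone `chain_process` helper are invented for this example, and ank wires real chains from `services.yml`:

```python
# Illustrative sketch of chaining -- not ank's internal implementation.
class Inc:
    def process(self, message=None):
        return {'content': message['content'] + 1}

class Double:
    def process(self, message=None):
        return {'content': message['content'] * 2}

def chain_process(chain, message):
    # Pass the message through each processor's process() hook in order.
    for processor in chain:
        message = processor.process(message)
    return message

print(chain_process([Inc(), Double()], {'content': 3}))  # {'content': 8}
```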
| Use Case | Description |
|---|---|
| ETL pipelines | Extract, transform, load data between systems (DB → queue → API) |
| Event processing | Consume events from Kafka/RabbitMQ, process, and forward |
| API backends | REST endpoints that trigger pipelines or queue operations |
| Scheduled jobs | Cron-style tasks (reports, cleanup, sync) |
| Batch processing | Join multiple messages, process in batches, split results |
| Microservice glue | Connect services via queues without heavy orchestration |
| Data transformation | Chain processors for validation, enrichment, formatting |
Comparison with similar projects:

| Project | Focus | When to prefer |
|---|---|---|
| Celery | Distributed task queue, async workers | Complex async tasks, retries, rate limiting |
| Luigi | Data pipelines, dependency graphs | DAGs, file-based workflows, Hadoop/Spark |
| Apache Airflow | Workflow orchestration | Complex DAGs, scheduling, monitoring UI |
| Prefect | Modern workflow | Cloud-native, observability, dynamic flows |
| Kafka Streams | Stream processing (JVM) | High-throughput, exactly-once, stateful streams |
| ANK | Lightweight pipelines, queues, REST | Simple chains, minimal setup, YAML config |
Requirements:

- Python 3.10+
From PyPI:

```shell
pip install ank
```

From Docker (no local Python required):
```shell
# Build from source
git clone git@github.com:sunary/ank.git
cd ank
docker build -t ank .

# Or pull from Docker Hub (when published)
# docker pull sunary/ank
```

From source:
```shell
git clone git@github.com:sunary/ank.git
cd ank
pip install -e .
```

With ank installed (pip/Docker):
```shell
ank create TestService
# or with app type:
ank create TestService -c APIApp
cd TestService
```

With Docker only (no local install):

```shell
docker run --rm -v $(pwd):/workspace -w /workspace ank create TestService
cd TestService
```

App types: `BaseApp` (pipeline, default), `APIApp` (REST API), `ScheduleApp` (cron-based).
Edit `processor.py`:

```python
from ank.components.pipe_app import PipeApp


class ExampleApp(PipeApp):

    def start(self):
        for i in range(100):
            self.chain_process({'content': i})

    def process(self, message=None):
        return message['content'] + 1
```

Edit `services.yml` to wire processors and dependencies:
```yaml
services:
  StartApp:
    class: processor.StartApp
    arguments: [$Redis, '%batch_size%']
  Redis:
    class: redis.Redis
    arguments: ['%redis_host%', '%redis_port%']
  LogApp:
    class: ank.components.log_app.LogApp
    arguments: ~
chains:
  - StartApp
  - LogApp
```

Edit `settings.yml` with your parameters:
```yaml
parameters:
  redis_host: localhost
  redis_port: 6379
  batch_size: 100
```

Run the service:

```shell
ank run -fs settings.yml
```

Or programmatically:
```python
from ank.program_loader import main

main(file_setting='settings.yml')
```

| Pattern | Description |
|---|---|
| 1–1 | Single processor → single processor |
| 1–n | One processor → multiple branching processors |
| n–1 | Multiple processors → merge into one |
| n–n | Multiple processors → multiple processors |
| Join | Combine messages: [msg1, msg2, msg3] |
| Split | Split message into multiple messages |
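The Join and Split patterns above can be illustrated with plain Python. This is a sketch of the message semantics only, assuming the `{'content': ...}` message shape used elsewhere in this README; it is not ank's `JoinApp`/`SplitApp` implementation:

```python
def join(messages):
    # Combine several messages into one batch message.
    return {'content': [m['content'] for m in messages]}

def split(message):
    # The inverse: one batch message back into individual messages.
    return [{'content': c} for c in message['content']]

batch = join([{'content': 1}, {'content': 2}, {'content': 3}])
print(batch)         # {'content': [1, 2, 3]}
print(split(batch))  # [{'content': 1}, {'content': 2}, {'content': 3}]
```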
For branching processors, return a message with a `flags` key to control routing:

```python
def process(self, message=None):
    return {
        'content': result,
        'flags': [True, False]  # process the 1st branch only
    }
```

- `[True, True]` → process both branches
- `[True, False]` → process the 1st branch only
- `[False, True]` → process the 2nd branch only
- `[False, False]` → stop the chain
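As a concrete sketch, a processor could route even values to the first branch and odd values to the second. The parity rule here is invented for this example:

```python
def process(message=None):
    # Even content -> 1st branch, odd content -> 2nd branch.
    even = message['content'] % 2 == 0
    return {'content': message['content'], 'flags': [even, not even]}

print(process({'content': 4})['flags'])  # [True, False]
print(process({'content': 7})['flags'])  # [False, True]
```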
| App | Description |
|---|---|
| PipeApp | Base pipeline processor |
| APIApp | REST API interface (Flask) |
| ScheduleApp | Scheduled tasks (cron format) |
| Component | Description |
|---|---|
| LogApp | Log every message |
| JoinApp | Join multiple messages into one |
| SplitApp | Split message into multiple |
| *Consumer | Consume from queue (Redis, Kafka, RabbitMQ, ZMQ) |
| *Producer | Produce to queue |
| Command | Description |
|---|---|
| `ank create <name> [-c BaseApp\|APIApp\|ScheduleApp]` | Create new service |
| `ank gen_setting -fs _setting.yml` | Generate settings template |
| `ank gen_processor -fs settings.yml` | Generate processor from settings |
| `ank run -fs settings.yml` | Run service |
| `ank run -d` | Run as daemon (background) |
| `ank test` | Run tests |
| `ank build` | Build Docker image |
```yaml
services:
  ServiceName:
    class: module.ClassName
    arguments: [$OtherService, '%param%']  # ~ for no args
chains:
  - Service1
  - Service2
  - [BranchA, BranchB]  # branching
```

- `$Service`: inject another service
- `%param%`: value from `settings.yml`
Parameters referenced in `services.yml` are read from `settings.yml`. Use `ank gen_setting` to create a template.
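The substitution itself amounts to replacing `%name%` placeholders in service arguments with values from the `parameters` mapping. A minimal sketch of that idea (the `resolve` helper is hypothetical, not ank's loader API):

```python
import re

def resolve(arguments, parameters):
    # Replace '%name%' placeholders with values from settings.yml parameters;
    # anything else is passed through unchanged.
    resolved = []
    for arg in arguments:
        match = re.fullmatch(r'%(\w+)%', str(arg))
        resolved.append(parameters[match.group(1)] if match else arg)
    return resolved

params = {'redis_host': 'localhost', 'redis_port': 6379}
print(resolve(['%redis_host%', '%redis_port%'], params))
# ['localhost', 6379]
```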
See the `examples/` directory:

- `streaming_app/`: pipeline with Join/Split
- `api_app/`: REST API
- `schedule_app/`: scheduled tasks
Use ank without installing Python locally:

```shell
# Create a new app (output in the current directory)
docker run --rm -v $(pwd):/workspace -w /workspace ank create MyApp

# Run a service
docker run --rm -v $(pwd):/workspace -w /workspace ank run -fs settings.yml

# Generate settings
docker run --rm -v $(pwd):/workspace -w /workspace ank gen_setting -fs _setting.yml
```

From your project directory (after `ank create`):
```shell
ank build
docker run --entrypoint /bin/sh <image_id>
```

See LICENSE.