The process of finding leads for a campaign involves a series of Celery workers passing data through different queues. Here's the typical flow and filtering logic:
- API Trigger (`/campaigns/{id}/run`)
  - Input: Campaign ID.
  - Action: Kicks off the process by enqueuing the first task.
  - Calls: `crawler.crawl_reddit_for_campaign` (on the `crawl` queue).
- Crawler (`crawler.py::crawl_reddit_for_campaign`)
  - Queue: `crawl`
  - Input: Campaign ID.
  - Action: Fetches keywords, searches Reddit (`asyncpraw`) for submissions and comments.
  - Filtering (Initial Metadata Gate): For each found document:
    - Deduplication: Skip if `reddit_id` was seen before for this campaign (Redis `SISMEMBER`, with a `REDDIT_DEDUP_TTL_SECONDS` TTL).
    - Post Age: Skip if older than `REDDIT_MAX_POST_AGE_MONTHS`.
    - Upvote Ratio: Skip if below `REDDIT_MIN_UPVOTE_RATIO`.
    - NSFW: Skip if NSFW and `REDDIT_SKIP_NSFW` is true.
    - Subreddit Subscribers: Skip if the subreddit has fewer than `REDDIT_MIN_SUB_SUBSCRIBERS`.
    - Subreddit Activity Ratio: Skip if `subscribers / hot_posts_count` < `REDDIT_MIN_SUB_RATIO` (applied only when `REDDIT_MIN_SUB_RATIO` > 0).
    - Author Karma: Skip if author karma is below `REDDIT_MIN_AUTHOR_KARMA`.
    - (Rejection Logging: Filtered documents are logged to a CSV file in the directory specified by `REJECTION_LOG_DIR`.)
  - Calls:
    - `indexer.index_document` (on the `index` queue), multiple times (once per document that passes the filters).
    - `batcher.process_embedding_batches` (on the `batch` queue), once (after all keywords have been searched for the campaign).
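The metadata gate above can be sketched as a single function that returns the first failed check. This is a minimal illustration, not the project's code: the `Candidate` container, the reason strings, the 30-day month approximation, and the in-memory `seen_ids` set (standing in for the Redis set) are all assumptions.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Optional, Set

# Threshold values as listed in this document's settings section.
REDDIT_MAX_POST_AGE_MONTHS = 18.0
REDDIT_MIN_UPVOTE_RATIO = 0.40
REDDIT_SKIP_NSFW = True
REDDIT_MIN_SUB_SUBSCRIBERS = 1000
REDDIT_MIN_SUB_RATIO = 0.0  # 0.0 disables the activity-ratio check
REDDIT_MIN_AUTHOR_KARMA = 15


@dataclass
class Candidate:
    """Hypothetical container for the metadata the gate inspects."""
    reddit_id: str
    created_utc: datetime
    upvote_ratio: float
    over_18: bool
    subreddit_subscribers: int
    hot_posts_count: int
    author_karma: int


def rejection_reason(doc: Candidate, seen_ids: Set[str], now: datetime) -> Optional[str]:
    """Return the name of the first failed check, or None if the document passes."""
    if doc.reddit_id in seen_ids:  # stand-in for the Redis SISMEMBER lookup
        return "duplicate"
    # Assumption: one month is approximated as 30 days.
    if now - doc.created_utc > timedelta(days=30 * REDDIT_MAX_POST_AGE_MONTHS):
        return "too_old"
    if doc.upvote_ratio < REDDIT_MIN_UPVOTE_RATIO:
        return "low_upvote_ratio"
    if REDDIT_SKIP_NSFW and doc.over_18:
        return "nsfw"
    if doc.subreddit_subscribers < REDDIT_MIN_SUB_SUBSCRIBERS:
        return "small_subreddit"
    # The ratio check runs only when enabled; the zero guard avoids division by zero.
    if REDDIT_MIN_SUB_RATIO > 0 and doc.hot_posts_count > 0:
        if doc.subreddit_subscribers / doc.hot_posts_count < REDDIT_MIN_SUB_RATIO:
            return "low_activity_ratio"
    if doc.author_karma < REDDIT_MIN_AUTHOR_KARMA:
        return "low_author_karma"
    return None
```

Returning a reason string rather than a boolean makes it straightforward to write the same value into the rejection log.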
- Indexer (`indexer.py::index_document`)
  - Queue: `index`
  - Input: Document data (`doc_data`) that passed crawler filters.
  - Action: Upserts document data (`reddit_id`, `title`, `text`, `score`, etc.) into the Typesense collection for searching. Adds the document's `reddit_id` to a Redis list for the campaign, accumulating IDs for the batcher.
  - Calls: None.
- Batcher (`batcher.py::process_embedding_batches`)
  - Queue: `batch`
  - Input: Campaign ID.
  - Action: Reads all `reddit_id`s accumulated in the campaign's Redis list. Deduplicates this list so that later stages do not process the same document more than once. Fetches full document data from Typesense for these unique IDs. Groups documents into batches (e.g., size 128) and triggers the embedder for each batch.
  - Calls: `embedder.generate_embeddings_for_batch` (on the `embed` queue), multiple times (once per batch).
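The deduplicate-then-batch step described for the batcher could look roughly like the following. The helper names `unique_in_order` and `chunked` are hypothetical, and the batch size of 128 is the example value quoted above rather than a confirmed setting.

```python
from typing import Iterator, List, Sequence

BATCH_SIZE = 128  # example size mentioned in the batcher description


def unique_in_order(ids: Sequence[str]) -> List[str]:
    """Drop repeated IDs while keeping the first occurrence of each."""
    return list(dict.fromkeys(ids))


def chunked(items: Sequence[str], size: int = BATCH_SIZE) -> Iterator[List[str]]:
    """Yield consecutive slices of at most `size` items."""
    for start in range(0, len(items), size):
        yield list(items[start:start + size])
```

In this sketch each yielded chunk would correspond to one `generate_embeddings_for_batch` call.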
- Embedder (`embedder.py::generate_embeddings_for_batch`)
  - Queue: `embed`
  - Input: Batch of document data (`doc_batch`).
  - Action:
    - Calculates the centroid embedding for the campaign's keywords.
    - Performs a BM25 keyword search against Typesense (using campaign keywords, filtered by the `reddit_id`s in the current batch) to retrieve relevance scores (`text_match`).
    - Retrieves document embeddings (e.g., from OpenAI's `text-embedding-3-small`) for the batch.
    - Calculates cosine similarity between each document embedding and the keyword centroid.
    - Initial Lead Creation: Upserts basic lead information (`campaign_id`, `reddit_id`, `embedding`, `reddit_url`, `author`) into the `leads` PostgreSQL table. At this stage, `extracted_data` and `relevance_score` are NULL.
  - Filtering (Cosine Similarity Gate):
    - Cosine Similarity: Keep document only if `cosine_similarity(doc_embedding, keyword_centroid) >= COSINE_THRESHOLD`.
    - (Rejection Logging: Filtered documents are logged.)
  - Calls: `ranker.hybrid_rank` (on the `rank` queue), multiple times (once per document that passes the cosine filter), passing `doc_data`, `cosine_similarity`, and `bm25_score`.
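To make the centroid and cosine gate concrete, here is a dependency-free sketch. It assumes the centroid is the element-wise mean of the keyword embeddings, which the description does not spell out; the function names are illustrative, and a real implementation would more likely use NumPy.

```python
import math
from typing import List, Sequence

COSINE_THRESHOLD = 0.30  # value listed for the embedder in this document


def centroid(vectors: Sequence[Sequence[float]]) -> List[float]:
    """Element-wise mean of equally sized vectors (assumed centroid definition)."""
    dim = len(vectors[0])
    return [sum(v[i] for v in vectors) / len(vectors) for i in range(dim)]


def cosine_similarity(a: Sequence[float], b: Sequence[float]) -> float:
    """Cosine of the angle between a and b; 0.0 if either vector is all zeros."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    if norm_a == 0.0 or norm_b == 0.0:
        return 0.0
    return dot / (norm_a * norm_b)


def passes_cosine_gate(doc_embedding: Sequence[float],
                       keyword_centroid: Sequence[float]) -> bool:
    """Keep the document only if it is close enough to the keyword centroid."""
    return cosine_similarity(doc_embedding, keyword_centroid) >= COSINE_THRESHOLD
```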
- Ranker (`ranker.py::hybrid_rank`)
  - Queue: `rank`
  - Input: Payload with `doc_data`, `cosine_similarity`, and `bm25_score`.
  - Action: Calculates a fused score (log-scaled BM25 + weighted cosine) to represent overall relevance.
  - Filtering (Heuristic Gate):
    - Engagement: Keep only if `doc_data['score'] >= MIN_ENGAGEMENT_SCORE`.
    - Keyword Hits: Keep only if the number of unique campaign keywords in the text >= `MIN_KEYWORD_HITS` OR `cosine_similarity >= 0.30`.
    - Fused Score: Keep only if `fused_score >= MIN_FUSED_SCORE_THRESHOLD`.
    - (Rejection Logging: Filtered documents are logged.)
  - Calls: `extractor.llm_extract` (on the `llm` queue), multiple times (once per document that passes all filters).
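The ranker's scoring and gate might be sketched as below. The exact fusion formula is not given here, so `log1p` scaling and the `COSINE_WEIGHT` value are assumptions, as is counting keyword hits by case-insensitive substring match; only the three thresholds come from this document.

```python
import math
from typing import Dict, Sequence

MIN_ENGAGEMENT_SCORE = 1
MIN_KEYWORD_HITS = 1
MIN_FUSED_SCORE_THRESHOLD = 0.10
COSINE_WEIGHT = 0.5  # hypothetical weight, not taken from the source


def fused_score(bm25_score: float, cosine_similarity: float) -> float:
    """Log-scaled BM25 plus weighted cosine (assumed form of the fusion)."""
    return math.log1p(max(bm25_score, 0.0)) + COSINE_WEIGHT * cosine_similarity


def count_keyword_hits(text: str, keywords: Sequence[str]) -> int:
    """Number of unique keywords that occur in the text, ignoring case."""
    lowered = text.lower()
    return sum(1 for kw in {k.lower() for k in keywords} if kw in lowered)


def passes_heuristic_gate(doc_data: Dict, keywords: Sequence[str],
                          cosine_similarity: float, bm25_score: float) -> bool:
    """Apply the engagement, keyword-hit, and fused-score checks in order."""
    if doc_data["score"] < MIN_ENGAGEMENT_SCORE:
        return False
    hits = count_keyword_hits(doc_data.get("text", ""), keywords)
    if hits < MIN_KEYWORD_HITS and cosine_similarity < 0.30:
        return False
    return fused_score(bm25_score, cosine_similarity) >= MIN_FUSED_SCORE_THRESHOLD
```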
- Extractor (`extractor.py::llm_extract`)
  - Queue: `llm`
  - Input: Payload with `doc_data` and `fused_score` (used as `relevance_score`).
  - Action: Formats a prompt including the document text. Calls an LLM (e.g., GPT-4o mini) to extract structured lead information (e.g., pain point, contact info) based on the prompt. Parses the LLM's JSON response.
  - Calls: `persist.save_lead` (on the `persist` queue), multiple times.
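The prompt-formatting and JSON-parsing halves of the extractor can be illustrated without calling any LLM. The prompt wording, the `pain_point` and `contact_info` keys, and the tolerant "outermost braces" parsing strategy are assumptions made for this sketch; the actual prompt and schema are not shown in this document.

```python
import json
from typing import Any, Dict, Optional

# Hypothetical prompt; the real template is not part of this document.
PROMPT_TEMPLATE = (
    "Extract lead information from the Reddit post below.\n"
    "Return a JSON object with the keys pain_point and contact_info.\n\n"
    "Post:\n{text}"
)


def build_prompt(doc_text: str) -> str:
    """Insert the document text into the prompt template."""
    return PROMPT_TEMPLATE.format(text=doc_text)


def parse_llm_json(raw: str) -> Optional[Dict[str, Any]]:
    """Parse the outermost JSON object in the reply, or return None on failure."""
    start = raw.find("{")
    end = raw.rfind("}")
    if start == -1 or end <= start:
        return None
    try:
        data = json.loads(raw[start:end + 1])
    except json.JSONDecodeError:
        return None
    return data if isinstance(data, dict) else None
```

Slicing between the first `{` and the last `}` tolerates replies that wrap the JSON in extra prose, at the cost of rejecting anything that is not a single object.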
- Persist (`persist.py::save_lead`)
  - Queue: `persist`
  - Input: Payload with original `doc_data`, LLM `extracted_data`, and `relevance_score` (the `fused_score` from the ranker).
  - Action:
    - Final Lead Update: Upserts the complete lead information into the `leads` PostgreSQL table using the `reddit_id` and `campaign_id`. This adds the `extracted_data` JSON and the final `relevance_score`.
    - Increments the `leads_found` counter for the campaign in Redis.
  - Calls: None.
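The two-stage write to `leads` (initial row from the embedder, final update from persist) follows an upsert pattern. The sketch below uses an in-memory SQLite database purely so that it is self-contained; the pipeline itself uses PostgreSQL, which accepts the same `ON CONFLICT ... DO UPDATE` form. The table layout is a guess based on the columns named above, and the `embedding` column is omitted for brevity.

```python
import json
import sqlite3


def make_demo_db() -> sqlite3.Connection:
    """Create an in-memory stand-in for the leads table (hypothetical schema)."""
    conn = sqlite3.connect(":memory:")
    conn.execute(
        """
        CREATE TABLE leads (
            campaign_id     INTEGER NOT NULL,
            reddit_id       TEXT    NOT NULL,
            reddit_url      TEXT,
            author          TEXT,
            extracted_data  TEXT,
            relevance_score REAL,
            PRIMARY KEY (campaign_id, reddit_id)
        )
        """
    )
    return conn


def create_initial_lead(conn, campaign_id, reddit_id, reddit_url, author):
    """Embedder stage: insert basic fields; extracted_data and score stay NULL."""
    conn.execute(
        "INSERT INTO leads (campaign_id, reddit_id, reddit_url, author) "
        "VALUES (?, ?, ?, ?) "
        "ON CONFLICT (campaign_id, reddit_id) DO UPDATE SET "
        "reddit_url = excluded.reddit_url, author = excluded.author",
        (campaign_id, reddit_id, reddit_url, author),
    )


def save_final_lead(conn, campaign_id, reddit_id, extracted_data, relevance_score):
    """Persist stage: fill in the LLM output and the final relevance score."""
    conn.execute(
        "INSERT INTO leads (campaign_id, reddit_id, extracted_data, relevance_score) "
        "VALUES (?, ?, ?, ?) "
        "ON CONFLICT (campaign_id, reddit_id) DO UPDATE SET "
        "extracted_data = excluded.extracted_data, "
        "relevance_score = excluded.relevance_score",
        (campaign_id, reddit_id, json.dumps(extracted_data), relevance_score),
    )
```

Keying both statements on `(campaign_id, reddit_id)` means a retried task updates the existing row instead of creating a duplicate lead.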
This pipeline structure allows for parallel processing and separation of concerns across different stages.
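One way to express the task-to-queue mapping described above is a routing table of the shape Celery's `task_routes` setting accepts. The task names below assume the `<module>.<function>` naming used in this document; the names actually registered with Celery depend on the project's package layout, so treat them as placeholders.

```python
from typing import Dict, List

# Task name -> routing options, one entry per pipeline stage.
TASK_ROUTES: Dict[str, Dict[str, str]] = {
    "crawler.crawl_reddit_for_campaign": {"queue": "crawl"},
    "indexer.index_document": {"queue": "index"},
    "batcher.process_embedding_batches": {"queue": "batch"},
    "embedder.generate_embeddings_for_batch": {"queue": "embed"},
    "ranker.hybrid_rank": {"queue": "rank"},
    "extractor.llm_extract": {"queue": "llm"},
    "persist.save_lead": {"queue": "persist"},
}


def queues_in_order() -> List[str]:
    """Queue names in the order the stages are listed above."""
    return [route["queue"] for route in TASK_ROUTES.values()]
```

With a mapping like this, each queue can be served by its own worker pool, which is what lets the stages scale independently.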
The current values of the thresholds referenced above are listed below. They are sourced primarily from `src/core/settings.py` unless noted otherwise.
Crawler (`crawler.py`)
- `REDDIT_MAX_POST_AGE_MONTHS`: 18.0
- `REDDIT_MIN_UPVOTE_RATIO`: 0.40 (was 0.75)
- `REDDIT_SKIP_NSFW`: True
- `REDDIT_MIN_SUB_SUBSCRIBERS`: 1000 (was 2000)
- `REDDIT_MIN_SUB_RATIO`: 0.0 (disabled, was 5.0)
- `REDDIT_MIN_AUTHOR_KARMA`: 15 (was 100)
- `REDDIT_DEDUP_TTL_SECONDS`: 604800 (7 days)

Embedder (`embedder.py`)
- `COSINE_THRESHOLD`: 0.30 (hardcoded)

Ranker (`ranker.py`)
- `MIN_ENGAGEMENT_SCORE`: 1 (hardcoded, lowered for testing)
- `MIN_KEYWORD_HITS`: 1 (hardcoded, lowered for testing)
- `MIN_FUSED_SCORE_THRESHOLD`: 0.10 (hardcoded, lowered for testing)

Other Settings
- `REJECTION_LOG_DIR`: "/app/rejection_logs" (from `settings.py`, requires Docker volume mapping)
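For the rejection logging that several stages mention, an append-only CSV writer along these lines would be enough. The file naming scheme and the column set are assumptions; only the idea of writing CSV files under `REJECTION_LOG_DIR` comes from this document.

```python
import csv
import os
from datetime import datetime, timezone


def log_rejection(log_dir: str, stage: str, reddit_id: str, reason: str) -> str:
    """Append one rejection row to <log_dir>/<stage>_rejections.csv and return its path."""
    os.makedirs(log_dir, exist_ok=True)
    path = os.path.join(log_dir, f"{stage}_rejections.csv")
    is_new_file = not os.path.exists(path)
    with open(path, "a", newline="", encoding="utf-8") as handle:
        writer = csv.writer(handle)
        if is_new_file:
            # Hypothetical column layout; written once per file.
            writer.writerow(["timestamp", "stage", "reddit_id", "reason"])
        timestamp = datetime.now(timezone.utc).isoformat()
        writer.writerow([timestamp, stage, reddit_id, reason])
    return path
```

When the workers run in Docker, the directory passed as `log_dir` has to be on a mapped volume for the files to outlive the container, as noted for `REJECTION_LOG_DIR`.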