
Ashes47/Reddit-search


Reddit-search

Worker Pipeline Overview

The process of finding leads for a campaign involves a series of Celery workers passing data through different queues. Here's the typical flow and filtering logic:

  1. API Trigger (/campaigns/{id}/run)

    • Input: Campaign ID.
    • Action: Kicks off the process by enqueuing the first task.
    • Calls: crawler.crawl_reddit_for_campaign (on crawl queue).
  2. Crawler (crawler.py::crawl_reddit_for_campaign)

    • Queue: crawl
    • Input: Campaign ID.
    • Action: Fetches the campaign's keywords and searches Reddit (via asyncpraw) for matching submissions and comments.
    • Filtering (Initial Metadata Gate): For each found document:
      • Deduplication: Skip if the reddit_id has already been seen for this campaign (Redis SISMEMBER check against a set whose key expires after REDDIT_DEDUP_TTL_SECONDS).
      • Post Age: Skip if older than REDDIT_MAX_POST_AGE_MONTHS.
      • Upvote Ratio: Skip if below REDDIT_MIN_UPVOTE_RATIO.
      • NSFW: Skip if NSFW and REDDIT_SKIP_NSFW is true.
      • Subreddit Subscribers: Skip if subreddit has fewer than REDDIT_MIN_SUB_SUBSCRIBERS.
      • Subreddit Activity Ratio: Skip if subscribers / hot_posts_count < REDDIT_MIN_SUB_RATIO. This check only runs when REDDIT_MIN_SUB_RATIO > 0.
      • Author Karma: Skip if author karma is below REDDIT_MIN_AUTHOR_KARMA.
      • (Rejection Logging: Filtered documents are logged to a CSV file in the directory specified by REJECTION_LOG_DIR.)
    • Calls:
      • indexer.index_document (on index queue) - multiple times (for docs passing filters).
      • batcher.process_embedding_batches (on batch queue) - once (after all keywords have been searched for the campaign).
  3. Indexer (indexer.py::index_document)

    • Queue: index
    • Input: Document data (doc_data) that passed crawler filters.
    • Action: Upserts document data (reddit_id, title, text, score, etc.) into the Typesense collection for searching. Adds the document's reddit_id to a Redis list for the campaign, accumulating IDs for the batcher.
    • Calls: None.
  4. Batcher (batcher.py::process_embedding_batches)

    • Queue: batch
    • Input: Campaign ID.
    • Action: Reads all reddit_ids accumulated in the campaign's Redis list and deduplicates them so the same document is not processed more than once in later stages. Fetches the full document data from Typesense for these unique IDs, groups the documents into batches (e.g., size 128), and triggers the embedder for each batch.
    • Calls: embedder.generate_embeddings_for_batch (on embed queue) - multiple times (once per batch).
  5. Embedder (embedder.py::generate_embeddings_for_batch)

    • Queue: embed
    • Input: Batch of document data (doc_batch).
    • Action:
      • Calculates the centroid embedding for the campaign's keywords.
      • Performs a keyword search against Typesense (using campaign keywords, filtered by the reddit_ids in the current batch) and takes Typesense's text_match relevance score as the bm25_score used downstream.
      • Retrieves document embeddings (e.g., from OpenAI's text-embedding-3-small) for the batch.
      • Calculates cosine similarity between each document embedding and the keyword centroid.
      • Initial Lead Creation: Upserts basic lead information (campaign_id, reddit_id, embedding, reddit_url, author) into the leads PostgreSQL table. At this stage, extracted_data and relevance_score are NULL.
    • Filtering (Cosine Similarity Gate):
      • Cosine Similarity: Keep document only if cosine_similarity(doc_embedding, keyword_centroid) >= COSINE_THRESHOLD.
      • (Rejection Logging: Filtered documents are logged)
    • Calls: ranker.hybrid_rank (on rank queue) - multiple times (for docs passing cosine filter), passing doc_data, cosine_similarity, and bm25_score.
  6. Ranker (ranker.py::hybrid_rank)

    • Queue: rank
    • Input: Payload with doc_data, cosine_similarity, and bm25_score.
    • Action: Calculates a fused score (log-scaled BM25 + weighted cosine) to represent overall relevance.
    • Filtering (Heuristic Gate):
      • Engagement: Keep only if doc_data['score'] >= MIN_ENGAGEMENT_SCORE.
      • Keyword Hits: Keep only if the number of unique campaign keywords found in the text is >= MIN_KEYWORD_HITS, OR if cosine_similarity >= 0.30.
      • Fused Score: Keep only if fused_score >= MIN_FUSED_SCORE_THRESHOLD.
      • (Rejection Logging: Filtered documents are logged)
    • Calls: extractor.llm_extract (on llm queue) - multiple times (for docs passing all filters).
  7. Extractor (extractor.py::llm_extract)

    • Queue: llm
    • Input: Payload with doc_data and fused_score (used as relevance_score).
    • Action: Formats a prompt including the document text. Calls an LLM (e.g., GPT-4o mini) to extract structured lead information (e.g., pain point, contact info) based on the prompt. Parses the LLM's JSON response.
    • Calls: persist.save_lead (on persist queue) - multiple times.
  8. Persist (persist.py::save_lead)

    • Queue: persist
    • Input: Payload with original doc_data, LLM extracted_data, and relevance_score (the fused_score from the ranker).
    • Action:
      • Final Lead Update: Upserts the complete lead information into the leads PostgreSQL table using the reddit_id and campaign_id. This adds the extracted_data JSON and the final relevance_score.
      • Increments the leads_found counter for the campaign in Redis.
    • Calls: None.
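The batcher's deduplicate-then-batch logic (step 4) can be sketched as below. This is a minimal illustration, assuming the reddit_ids have already been read from the campaign's Redis list (for example with LRANGE key 0 -1); unique_in_order and chunk are hypothetical helper names, and 128 is the example batch size from step 4, not necessarily the configured value.

```python
from typing import Iterable, Iterator, List

BATCH_SIZE = 128  # example size from step 4; the real setting may differ


def unique_in_order(reddit_ids: Iterable[str]) -> List[str]:
    """Drop repeated IDs while keeping the order in which they were first seen."""
    # dict keys preserve insertion order (Python 3.7+), so this acts as an ordered set
    return list(dict.fromkeys(reddit_ids))


def chunk(items: List[str], size: int = BATCH_SIZE) -> Iterator[List[str]]:
    """Yield consecutive slices of at most `size` items (the last one may be shorter)."""
    if size <= 0:
        raise ValueError("size must be positive")
    for start in range(0, len(items), size):
        yield items[start:start + size]
```

Keeping first-seen order is not required for correctness, but it makes batches reproducible between runs, which helps when comparing rejection logs.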

Because each stage consumes its own Celery queue, the stages can run in parallel and the workers for each queue can be scaled independently, while each module handles a single concern.
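One way to declare the stage-to-queue mapping is Celery's task_routes setting. The sketch below is hypothetical: the task names are copied from the module.function labels in this README, but Celery registers tasks under their full import path (or an explicit name= argument), so the real names may differ.

```python
# Hypothetical routing table: one Celery queue per pipeline stage.
# Keys assume tasks are registered as "<module>.<function>" exactly as written
# in this README; adjust them to the actual registered task names.
TASK_ROUTES = {
    "crawler.crawl_reddit_for_campaign": {"queue": "crawl"},
    "indexer.index_document": {"queue": "index"},
    "batcher.process_embedding_batches": {"queue": "batch"},
    "embedder.generate_embeddings_for_batch": {"queue": "embed"},
    "ranker.hybrid_rank": {"queue": "rank"},
    "extractor.llm_extract": {"queue": "llm"},
    "persist.save_lead": {"queue": "persist"},
}

# With Celery installed, the table would be applied to the app with:
#   app.conf.task_routes = TASK_ROUTES
# and a worker pool started per queue, e.g. (<app_module> is a placeholder):
#   celery -A <app_module> worker -Q embed --concurrency=2
```

Routing by name keeps the queue choice out of the calling code, so a stage can be moved to a different queue (or a dedicated host) by editing one table.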

Filter Parameters

Values are sourced primarily from src/core/settings.py unless noted otherwise.

Crawler (crawler.py)

  • REDDIT_MAX_POST_AGE_MONTHS: 18.0
  • REDDIT_MIN_UPVOTE_RATIO: 0.40 (was 0.75)
  • REDDIT_SKIP_NSFW: True
  • REDDIT_MIN_SUB_SUBSCRIBERS: 1000 (was 2000)
  • REDDIT_MIN_SUB_RATIO: 0.0 (disabled, was 5.0)
  • REDDIT_MIN_AUTHOR_KARMA: 15 (was 100)
  • REDDIT_DEDUP_TTL_SECONDS: 604800 (7 days)
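The crawler's metadata gate can be sketched with these defaults. This is a minimal, hypothetical version: DocMeta and metadata_gate are illustrative names, a Python set stands in for the Redis dedup set, and converting months to days at 30.44 days per month is an assumption about how the age cutoff is computed. Marking a document as seen even when a later check rejects it is also an assumption.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from typing import Optional, Set, Tuple

# Defaults copied from the crawler settings listed above.
REDDIT_MAX_POST_AGE_MONTHS = 18.0
REDDIT_MIN_UPVOTE_RATIO = 0.40
REDDIT_SKIP_NSFW = True
REDDIT_MIN_SUB_SUBSCRIBERS = 1000
REDDIT_MIN_SUB_RATIO = 0.0  # 0.0 disables the activity-ratio check
REDDIT_MIN_AUTHOR_KARMA = 15

DAYS_PER_MONTH = 30.44  # assumption: month-to-day conversion for the age cutoff


@dataclass
class DocMeta:
    """Hypothetical container for the metadata the crawler reads from asyncpraw."""
    reddit_id: str
    created_utc: float
    upvote_ratio: float
    over_18: bool
    sub_subscribers: int
    sub_hot_posts: int
    author_karma: int


def metadata_gate(doc: DocMeta, seen: Set[str],
                  now: Optional[datetime] = None) -> Tuple[bool, str]:
    """Return (keep, reason); `seen` stands in for the per-campaign Redis set."""
    now = now or datetime.now(timezone.utc)
    if doc.reddit_id in seen:
        return False, "duplicate"
    seen.add(doc.reddit_id)  # assumption: rejected IDs are not re-checked later
    created = datetime.fromtimestamp(doc.created_utc, tz=timezone.utc)
    if now - created > timedelta(days=REDDIT_MAX_POST_AGE_MONTHS * DAYS_PER_MONTH):
        return False, "too_old"
    if doc.upvote_ratio < REDDIT_MIN_UPVOTE_RATIO:
        return False, "low_upvote_ratio"
    if REDDIT_SKIP_NSFW and doc.over_18:
        return False, "nsfw"
    if doc.sub_subscribers < REDDIT_MIN_SUB_SUBSCRIBERS:
        return False, "small_subreddit"
    # The activity-ratio check only applies when the threshold is positive.
    if REDDIT_MIN_SUB_RATIO > 0 and doc.sub_hot_posts > 0:
        if doc.sub_subscribers / doc.sub_hot_posts < REDDIT_MIN_SUB_RATIO:
            return False, "low_sub_activity_ratio"
    if doc.author_karma < REDDIT_MIN_AUTHOR_KARMA:
        return False, "low_author_karma"
    return True, "ok"
```

With Redis, the check and the insert can be combined into one atomic step: SADD returns 1 only when the member was newly added, which avoids a race between two workers that both see SISMEMBER return false for the same reddit_id.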

Embedder (embedder.py)

  • COSINE_THRESHOLD: 0.30 (hardcoded)
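The embedder's cosine gate compares each document embedding with the centroid of the keyword embeddings. A minimal sketch in pure Python, assuming the centroid is the unweighted element-wise mean of the keyword vectors (the README does not say whether the real embedder weights or normalizes them); centroid and passes_cosine_gate are illustrative names.

```python
import math
from typing import List, Sequence

COSINE_THRESHOLD = 0.30  # value hardcoded in embedder.py per the list above


def centroid(vectors: Sequence[Sequence[float]]) -> List[float]:
    """Element-wise mean of the keyword embeddings (assumed unweighted)."""
    if not vectors:
        raise ValueError("need at least one keyword embedding")
    dim = len(vectors[0])
    return [sum(v[i] for v in vectors) / len(vectors) for i in range(dim)]


def cosine_similarity(a: Sequence[float], b: Sequence[float]) -> float:
    """Dot product divided by the product of the vector lengths."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    if norm_a == 0 or norm_b == 0:
        return 0.0  # treat a zero vector as unrelated rather than dividing by zero
    return dot / (norm_a * norm_b)


def passes_cosine_gate(doc_embedding: Sequence[float],
                       keyword_centroid: Sequence[float]) -> bool:
    return cosine_similarity(doc_embedding, keyword_centroid) >= COSINE_THRESHOLD
```

Cosine similarity ignores vector length, so the mean of several keyword vectors does not need to be re-normalized before the comparison.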

Ranker (ranker.py)

  • MIN_ENGAGEMENT_SCORE: 1 (hardcoded, lowered for testing)
  • MIN_KEYWORD_HITS: 1 (hardcoded, lowered for testing)
  • MIN_FUSED_SCORE_THRESHOLD: 0.10 (hardcoded, lowered for testing)
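The ranker's heuristic gate can be sketched as follows. The README describes the fused score only as "log-scaled BM25 + weighted cosine", so BM25_WEIGHT, COSINE_WEIGHT and the use of math.log1p are placeholder assumptions, as is case-insensitive whole-word keyword matching. The three thresholds and the 0.30 cosine fallback come from the list above and from step 6.

```python
import math
import re
from typing import Iterable, Tuple

MIN_ENGAGEMENT_SCORE = 1
MIN_KEYWORD_HITS = 1
MIN_FUSED_SCORE_THRESHOLD = 0.10
KEYWORD_HIT_COSINE_FALLBACK = 0.30  # cosine value that bypasses the keyword check

# Placeholder weights: the real values in ranker.py are not documented here.
BM25_WEIGHT = 0.05
COSINE_WEIGHT = 1.0


def fused_score(bm25_score: float, cosine_similarity: float) -> float:
    # log1p compresses large keyword scores (Typesense text_match values can be
    # very large integers) and maps a score of 0 to 0.
    return BM25_WEIGHT * math.log1p(max(bm25_score, 0.0)) + COSINE_WEIGHT * cosine_similarity


def keyword_hits(text: str, keywords: Iterable[str]) -> int:
    """Count distinct keywords present in the text (case-insensitive, whole word)."""
    lowered = text.lower()
    unique_keywords = {k.lower() for k in keywords}
    return sum(
        1 for kw in unique_keywords
        if re.search(r"\b" + re.escape(kw) + r"\b", lowered)
    )


def heuristic_gate(doc: dict, keywords: Iterable[str],
                   cosine_similarity: float, bm25_score: float) -> Tuple[bool, float]:
    """Return (keep, fused_score) after the engagement, keyword and fused-score checks."""
    fused = fused_score(bm25_score, cosine_similarity)
    if doc.get("score", 0) < MIN_ENGAGEMENT_SCORE:
        return False, fused
    text = doc.get("title", "") + " " + doc.get("text", "")
    if (keyword_hits(text, keywords) < MIN_KEYWORD_HITS
            and cosine_similarity < KEYWORD_HIT_COSINE_FALLBACK):
        return False, fused
    if fused < MIN_FUSED_SCORE_THRESHOLD:
        return False, fused
    return True, fused
```

Computing the fused score before the early returns means it is available for the rejection log even when a document is filtered out.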

Other Settings

  • REJECTION_LOG_DIR: "/app/rejection_logs" (from settings.py, requires Docker volume mapping)
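A minimal sketch of writing rejections to a CSV file under REJECTION_LOG_DIR. The per-stage file name and the four columns are assumptions; the README only says rejected documents are logged to a CSV file in that directory. When running in Docker, the directory must be mounted as a volume (for example ./rejection_logs:/app/rejection_logs in a Compose file) for the files to be visible on the host.

```python
import csv
import os
from datetime import datetime, timezone

REJECTION_LOG_DIR = "/app/rejection_logs"  # default from settings.py


def log_rejection(stage: str, campaign_id: str, reddit_id: str, reason: str,
                  log_dir: str = REJECTION_LOG_DIR) -> str:
    """Append one row to <log_dir>/<stage>_rejections.csv (hypothetical layout)."""
    os.makedirs(log_dir, exist_ok=True)
    path = os.path.join(log_dir, f"{stage}_rejections.csv")
    new_file = not os.path.exists(path)
    with open(path, "a", newline="", encoding="utf-8") as fh:
        writer = csv.writer(fh)
        if new_file:
            # Write a header only when the file is first created.
            writer.writerow(["timestamp", "campaign_id", "reddit_id", "reason"])
        writer.writerow(
            [datetime.now(timezone.utc).isoformat(), campaign_id, reddit_id, reason]
        )
    return path
```

Appends from separate worker processes are not coordinated here, and two processes creating the file at the same moment could both write the header row; including os.getpid() in the file name avoids both issues.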
