💡 Inspiration

Every Data Engineer knows the pain of "dirty data." Typos in user locations (e.g., "Nw York"), inconsistent product names ("Lptop" vs "Laptop"), and missing fields ruin downstream analytics and ML models. Traditionally, cleaning this data requires maintaining complex, brittle rule-based systems or manual intervention.

I was inspired by the potential of Generative AI to understand context: Can I replace hundreds of hard-coded cleaning rules with a single, intelligent AI prompt integrated directly into a real-time data stream?

⚙️ What it does

Stream Refinery is an intelligent ETL (Extract, Transform, Load) pipeline that turns chaotic raw data into high-quality analytical assets in milliseconds.

  1. Ingests a stream of simulated raw transactions containing intentional errors (typos, non-standard formats).
  2. Processes each message in real-time using a Python consumer.
  3. Cleans & Enriches the data by sending it to Google's Gemini 2.5 Flash model. The AI intelligently fixes typos (e.g., changing "San Fran" to "San Francisco"), standardizes capitalization, and adds an enrichment status tag (see the illustrative before/after record following this list).
  4. Delivers the validated, clean JSON back to a dedicated Confluent Cloud topic, ready for use by analytics dashboards or data warehouses.
  5. Visualizes the transformation via a live Streamlit Dashboard, showing the "Dirty" vs. "Clean" data side-by-side in real-time.
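
For a concrete picture, here is roughly what a single record looks like before and after the AI pass. The field names and values below are illustrative, not the project's exact schema; the fixes mirror the examples above (typo repair, city-name expansion, enrichment tag).

```python
# Illustrative record, before the AI pass (schema is an assumption):
raw_event = {
    "transaction_id": "txn-1042",
    "product": "Lptop",
    "location": "San Fran",
    "amount": 999.99,
    "timestamp": "2025-01-15T12:00:00Z",
}

# ...and after Gemini cleans and enriches it:
clean_event = {
    "transaction_id": "txn-1042",         # preserved verbatim
    "product": "Laptop",                  # typo fixed
    "location": "San Francisco",          # non-standard name expanded
    "amount": 999.99,                     # untouched
    "timestamp": "2025-01-15T12:00:00Z",  # untouched
    "enrichment_status": "enriched",      # tag added by the pipeline
}
```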

🏗️ How I built it (The Architecture)

I followed a decoupled microservices architecture powered by Confluent Cloud:

  • The "Dirty" Source: A Python script acts as a producer, generating mock financial transaction data with randomized errors and sending it to a raw-data Kafka topic in Confluent.
  • The AI Processor: A separate Python consumer subscribes to the raw-data topic and acts as the bridge between the streaming world and the AI world, using the google-generativeai library to send asynchronous requests to the Gemini 2.5 Flash model (a simplified sketch of this consumer follows the list).
  • Prompt Engineering: I crafted a strict Data Engineering prompt to ensure the AI preserves essential fields (IDs, timestamps) while only fixing specific target fields, preventing AI "hallucinations."
  • The Real-Time Dashboard: I built a Streamlit frontend that consumes both the raw-data and clean-data topics simultaneously. It uses session state to maintain a scrolling history stack, letting users visually compare the input and output streams instantly (also sketched after this list).
  • The "Clean" Sink: Once the AI returns the cleaned JSON, the processor validates its structure and produces it to a final clean-data topic in Confluent.

🧠 Challenges I ran into

  • AI Hallucinations & Data Integrity: Initially, Gemini would sometimes fix typos but accidentally drop other crucial fields like the transaction_id. I had to refine the prompt significantly, adding strict rules like "PRESERVE all original fields" to ensure data integrity.
  • Rate Limiting in a Streaming Context: Streaming data moves fast, but AI APIs have rate limits, and I would have hit the Google AI free-tier limit (approx. 15 requests per minute) almost immediately. I solved this by implementing smart throttling in the consumer to pace the requests without breaking the pipeline (see the sketch below).
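
The throttling can be as simple as pacing calls so they never exceed the budget. A minimal sketch, assuming a budget of roughly 15 requests per minute and a genai model object like the one above (the actual implementation may differ):

```python
import time

REQUESTS_PER_MINUTE = 15                  # free-tier budget described above
MIN_INTERVAL_S = 60 / REQUESTS_PER_MINUTE
_last_call = 0.0

def throttled_generate(model, prompt: str):
    """Call Gemini no more often than once every MIN_INTERVAL_S seconds."""
    global _last_call
    wait = MIN_INTERVAL_S - (time.monotonic() - _last_call)
    if wait > 0:
        time.sleep(wait)                  # pause the consumer instead of dropping data
    _last_call = time.monotonic()
    return model.generate_content(prompt)
```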

🏅 Accomplishments that I'm proud of

I am proud of building a truly end-to-end working pipeline that solves a real-world business problem. Seeing a raw message come in with "Lptop, NY" on the dashboard and instantly transform into "Laptop, New York" in the clean column feels like magic. It successfully demonstrates the power of combining "Data in Motion" (Confluent) with modern Generative AI.
