FinLake — Real-Time Financial Lakehouse Platform

A production-grade financial data Lakehouse built on Apache Kafka, Apache Spark, and Apache Iceberg — implementing a Bronze → Silver → Gold medallion architecture with bi-temporal data modelling, automated schema evolution, and a data quality reconciliation engine.



Architecture

┌─────────────────────────────────────────────────────────────────────┐
│                        DATA SOURCES                                  │
│  Binance WebSocket (real-time)   yfinance (historical backfill)     │
└──────────────┬──────────────────────────────┬───────────────────────┘
               │                              │
               ▼                              ▼
┌──────────────────────────┐    ┌─────────────────────────────────────┐
│  Kafka + Schema Registry │    │       Spark Batch Backfill          │
│  Topic: market.ticks     │    │       (Avro → Bronze Iceberg)       │
│  Schema: Avro (v1/v2)    │    └─────────────────────────────────────┘
└──────────────┬───────────┘
               │ Spark Structured Streaming
               ▼
┌─────────────────────────────────────────────────────────────────────┐
│                    BRONZE LAYER  (raw ingest)                        │
│   Iceberg table: finlake.bronze.market_ticks                        │
│   Partitioned by: date, symbol  |  Format: Parquet                  │
│   Bi-temporal: valid_from/to (event time) + sys_from/to (load time) │
└──────────────┬──────────────────────────────────────────────────────┘
               │ Spark Batch (hourly)
               ▼
┌─────────────────────────────────────────────────────────────────────┐
│                    SILVER LAYER  (cleansed, normalised)              │
│   OHLCV aggregates  |  Schema-validated  |  Deduped                 │
│   Natural key: (symbol, date)  |  Surrogate key: tick_id (UUID)     │
└──────────────┬──────────────────────────────────────────────────────┘
               │ Spark Batch (daily)
               ▼
┌─────────────────────────────────────────────────────────────────────┐
│                    GOLD LAYER  (curated data products)               │
│   30/90-day returns  |  Volatility  |  Rolling Sharpe ratio         │
│   Ready for: analytics, reporting, AI/ML use cases                  │
└──────────────┬──────────────────────────────────────────────────────┘
               │
               ▼
┌──────────────────────────┐    ┌────────────────────────────┐
│  Snowflake (BI queries)  │    │  Reconciliation Engine     │
│  External Iceberg tables │    │  MISSING/DUPLICATE/VALUE/  │
│  + Dashboards            │    │  SCHEMA_DRIFT detection    │
└──────────────────────────┘    └────────────────────────────┘
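The Gold layer metrics named in the diagram (N-day returns, volatility, rolling Sharpe ratio) can be illustrated in plain Python. This is a minimal sketch, not the repository's Spark implementation: the function names, the 252 trading-day annualisation factor, and the zero risk-free rate are all assumptions made for the example.

```python
import math
from statistics import mean, stdev


def simple_returns(closes):
    """Period-over-period simple returns from a list of closing prices."""
    return [curr / prev - 1.0 for prev, curr in zip(closes, closes[1:])]


def window_return(closes, window):
    """Total return over the last `window` periods (e.g. 30 or 90 days)."""
    if len(closes) <= window:
        raise ValueError("need more than `window` closing prices")
    return closes[-1] / closes[-1 - window] - 1.0


def annualised_volatility(returns, periods_per_year=252):
    """Sample standard deviation of returns, scaled to a yearly figure.

    252 trading days per year is an assumption; crypto markets trade 365.
    """
    return stdev(returns) * math.sqrt(periods_per_year)


def rolling_sharpe(returns, window, risk_free_per_period=0.0, periods_per_year=252):
    """Annualised Sharpe ratio over each trailing window of returns.

    Emits None for a window whose returns have zero variance.
    """
    out = []
    for end in range(window, len(returns) + 1):
        excess = [r - risk_free_per_period for r in returns[end - window:end]]
        sd = stdev(excess)
        out.append(None if sd == 0 else mean(excess) / sd * math.sqrt(periods_per_year))
    return out
```

In a Spark job the same calculations would more likely be expressed as window functions partitioned by symbol and ordered by trade date; the list-based version above only shows the arithmetic.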

Key Features

| Feature | Detail |
|---|---|
| Real-time ingestion | Binance WebSocket → Kafka → Spark Structured Streaming |
| Bi-temporal model | valid_from/to (business time) + sys_from/to (system time) for full auditability |
| Schema evolution | Avro + Confluent Schema Registry; backward/forward compatible |
| Medallion architecture | Bronze (raw) → Silver (cleansed) → Gold (curated) on Apache Iceberg |
| Data quality | Reconciliation engine classifies: MISSING, DUPLICATE, VALUE_DRIFT, SCHEMA_DRIFT |
| Partitioning | Iceberg hidden partitioning by date + symbol for query performance |
| Surrogate keys | UUIDs in Silver/Gold; natural keys preserved from source |
| CI/CD | GitHub Actions: lint → unit tests → integration tests → Docker build |
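To make the "backward/forward compatible" schema evolution claim concrete, the sketch below checks the most common Avro rule: a field that the reader expects but the writer never wrote must carry a default. The `MarketTick` record and its field names are hypothetical (the real schemas live under `infra/kafka/schemas/` and are not shown here), and the check is deliberately simplified. It ignores type promotion, aliases, and nested records, all of which Schema Registry's own compatibility check covers.

```python
# Hypothetical v1 schema for the market.ticks topic.
TICK_V1 = {
    "type": "record",
    "name": "MarketTick",
    "fields": [
        {"name": "symbol", "type": "string"},
        {"name": "price", "type": "double"},
        {"name": "event_time", "type": "long"},
    ],
}

# Hypothetical v2: adds an optional field WITH a default, so old data stays readable.
TICK_V2 = {
    "type": "record",
    "name": "MarketTick",
    "fields": TICK_V1["fields"]
    + [{"name": "exchange", "type": ["null", "string"], "default": None}],
}


def missing_defaults(reader, writer):
    """Fields the reader expects, the writer did not write, and that lack a default."""
    written = {field["name"] for field in writer["fields"]}
    return [
        field["name"]
        for field in reader["fields"]
        if field["name"] not in written and "default" not in field
    ]


def is_backward_compatible(new, old):
    """Consumers on the NEW schema can read data produced with the OLD one."""
    return not missing_defaults(reader=new, writer=old)


def is_forward_compatible(new, old):
    """Consumers still on the OLD schema can read data produced with the NEW one."""
    return not missing_defaults(reader=old, writer=new)
```

Adding `exchange` without a default would make the change backward-incompatible, because a v2 consumer would have no value to fill in when reading v1 records.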

Technology Stack

| Layer | Technology |
|---|---|
| Streaming ingest | Apache Kafka, Confluent Schema Registry, Avro |
| Batch processing | Apache Spark 3.5 (PySpark) |
| Table format | Apache Iceberg |
| Object storage | MinIO (local) / AWS S3 (prod) |
| Data formats | Avro (Kafka), Parquet (Iceberg) |
| Data warehouse | Snowflake (external Iceberg tables) |
| Orchestration | Apache Airflow |
| Containerisation | Docker, Docker Compose |
| CI/CD | GitHub Actions |
| Testing | pytest, pytest-spark |

Quick Start (Local)

Prerequisites

  • Docker + Docker Compose
  • Python 3.11+

1. Start the local stack

git clone https://github.com/ritesxh/finlake.git
cd finlake
docker compose -f infra/docker/docker-compose.yml up -d

This starts Kafka, ZooKeeper, Schema Registry, MinIO, and Spark.

2. Install Python dependencies

pip install -e ".[dev]"

3. Run historical backfill (yfinance → Bronze)

python src/ingestion/backfill.py --symbols AAPL,MSFT,GOOGL --days 90

4. Start real-time Kafka producer (Binance WebSocket)

python src/ingestion/kafka_producer.py --symbols btcusdt,ethusdt

5. Start Spark Structured Streaming job (Bronze layer)

python src/processing/bronze_stream.py

6. Run Silver and Gold batch jobs

python src/processing/silver_batch.py
python src/processing/gold_batch.py

7. Run reconciliation checks

python src/quality/reconciliation.py --layer silver --date 2025-01-15

Project Structure

finlake/
├── infra/
│   ├── docker/
│   │   └── docker-compose.yml      # Kafka, Spark, MinIO, Schema Registry
│   ├── kafka/
│   │   └── schemas/                # Avro schemas (v1, v2)
│   └── spark/
│       └── spark-defaults.conf
├── src/
│   ├── ingestion/
│   │   ├── kafka_producer.py       # Binance WebSocket → Kafka
│   │   └── backfill.py             # yfinance → Bronze (batch)
│   ├── processing/
│   │   ├── bronze_stream.py        # Spark Streaming: Kafka → Iceberg Bronze
│   │   ├── silver_batch.py         # Spark Batch: Bronze → Silver
│   │   └── gold_batch.py           # Spark Batch: Silver → Gold
│   ├── quality/
│   │   ├── reconciliation.py       # Break detection engine
│   │   └── validators.py           # Completeness / accuracy / consistency
│   └── models/
│       └── temporal.py             # Bi-temporal model helpers
├── tests/
│   ├── unit/
│   └── integration/
├── .github/
│   └── workflows/
│       └── ci.yml
├── pyproject.toml
└── README.md

Data Model

Bi-temporal Schema (Silver layer)

-- Silver: market_ohlcv
CREATE TABLE finlake.silver.market_ohlcv (
    tick_id        STRING,      -- surrogate key (UUID)
    symbol         STRING,      -- natural key
    trade_date     DATE,        -- natural key
    open           DOUBLE,
    high           DOUBLE,
    low            DOUBLE,
    close          DOUBLE,
    volume         BIGINT,
    -- Business time (when the event was valid in the real world)
    valid_from     TIMESTAMP,
    valid_to       TIMESTAMP,   -- 9999-12-31 for current records
    -- System time (when the record was loaded into the system)
    sys_from       TIMESTAMP,
    sys_to         TIMESTAMP,   -- 9999-12-31 for current records
    is_current     BOOLEAN
)
USING iceberg
PARTITIONED BY (days(trade_date), symbol);
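The way the two time axes interact is easiest to see with a correction. The sketch below is an illustration under stated assumptions, not the contents of `src/models/temporal.py`: the `OhlcvRow` class, `correct`, and `as_known_at` are hypothetical names, only `close` is carried from the OHLCV columns for brevity, and issuing a fresh `tick_id` per version is a design choice made for the example. When a corrected value arrives, the old row is closed on the system-time axis rather than overwritten, so earlier states of the table remain queryable.

```python
from dataclasses import dataclass, replace
from datetime import date, datetime
from uuid import uuid4

# Sentinel for "still current", matching the 9999-12-31 convention in the DDL.
OPEN_END = datetime(9999, 12, 31)


@dataclass(frozen=True)
class OhlcvRow:
    tick_id: str
    symbol: str
    trade_date: date
    close: float
    valid_from: datetime
    valid_to: datetime
    sys_from: datetime
    sys_to: datetime
    is_current: bool


def correct(row, new_close, loaded_at):
    """Record a corrected close: close the old version, open a new one.

    Business time (valid_from/valid_to) is unchanged; only system time moves.
    """
    closed = replace(row, sys_to=loaded_at, is_current=False)
    opened = replace(
        row,
        tick_id=str(uuid4()),  # assumption: each version gets its own surrogate key
        close=new_close,
        sys_from=loaded_at,
        sys_to=OPEN_END,
        is_current=True,
    )
    return closed, opened


def as_known_at(rows, system_time):
    """Rows the system held as true at `system_time` (half-open interval)."""
    return [r for r in rows if r.sys_from <= system_time < r.sys_to]
```

Querying with `as_known_at` for a time before the correction returns the original close, which is what makes the audit trail reproducible.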

Reconciliation Engine

The ReconciliationEngine compares Bronze ↔ Silver ↔ Gold counts and values, classifying breaks as:

| Break Type | Description |
|---|---|
| MISSING | Record present in Bronze but absent in Silver/Gold |
| DUPLICATE | Same natural key appears more than once |
| VALUE_DRIFT | Aggregated value differs by >0.01% between layers |
| SCHEMA_DRIFT | Column added/removed/type-changed vs. registered schema |
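The four break types can be expressed as a small classification routine. This is a sketch under assumptions rather than the code in `src/quality/reconciliation.py`: `classify_breaks` and its argument shapes are hypothetical, rows are reduced to a natural key plus one aggregated value, and schema drift is detected only as added or removed column names (type changes are not modelled).

```python
from collections import Counter

VALUE_TOLERANCE = 0.0001  # 0.01% expressed as a fraction


def classify_breaks(source, target, expected_columns, actual_columns):
    """Compare an upstream layer with a downstream one.

    source: dict mapping natural key -> aggregated value (upstream layer)
    target: list of (natural key, value) pairs (downstream; may contain duplicates)
    Returns a list of (break_type, subject) tuples.
    """
    breaks = []
    counts = Counter(key for key, _ in target)
    target_values = dict(target)  # for duplicated keys the last value wins

    # DUPLICATE: the same natural key appears more than once downstream.
    for key, n in counts.items():
        if n > 1:
            breaks.append(("DUPLICATE", key))

    for key, src_val in source.items():
        if key not in counts:
            # MISSING: present upstream, absent downstream.
            breaks.append(("MISSING", key))
            continue
        tgt_val = target_values[key]
        # VALUE_DRIFT: relative difference above the tolerance.
        if src_val != 0 and abs(tgt_val - src_val) / abs(src_val) > VALUE_TOLERANCE:
            breaks.append(("VALUE_DRIFT", key))

    # SCHEMA_DRIFT: columns present on one side only.
    for column in sorted(set(expected_columns) ^ set(actual_columns)):
        breaks.append(("SCHEMA_DRIFT", column))
    return breaks
```

For example, a downstream close of 200.1 against an upstream 200.0 is a relative difference of 0.05%, which exceeds the 0.01% tolerance and is reported as VALUE_DRIFT.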

Roadmap

  • Phase 1: Local infra (Kafka + MinIO + Spark)
  • Phase 2: Bronze streaming ingestion
  • Phase 3: Silver batch with bi-temporal model
  • Phase 4: Gold curated data products
  • Phase 5: Reconciliation engine
  • Phase 6: Snowflake external tables
  • Phase 7: Airflow DAGs
  • Phase 8: Monitoring + alerting

License

MIT
