Spring Batch 6 Guide Capstone: Marketplace Analytics Pipeline — ETL · KPI Aggregation · K8s CronJob



Introduction

Through Part 6 we learned chunks, transactions, scheduling, parallelism, observability, and testing — each separately. The capstone gathers those pieces into one project.

What we build is a marketplace analytics pipeline. Every night it moves the orders accumulated in the operational DB into an analytics store (ETL), then computes daily and monthly KPIs on top of it (total revenue, order count, revenue by category, top-10 products) into tables a dashboard reads. Three jobs run in sequence, separated by time gaps. All of them are idempotent and observable, and each runs as a single execution launched by a K8s CronJob.

Parts 1–6 all meet inside this one pipeline — Part 2’s Reader/Writer, Part 3’s idempotency/restart, Part 4’s scheduling/data sources, Part 5’s partitioning, Part 6’s observability/deployment. Each section marks “this is Part N’s technique,” so when you get stuck, return to that part.

The target reader is a backend engineer who has read Parts 1–6 or has built and operated Spring Batch jobs.

  • Part 1 — Job · Step · Metadata Identity
  • Part 2 — Chunk-Oriented Processing — Reader · Processor · Writer
  • Part 3 — Transactions · Failure Handling — Skip · Retry · Restart
  • Part 4 — Job Launch · Scheduling · Operations
  • Part 5 — Performance · Parallelism — Multi-thread · Partitioning · Remote Workers
  • Part 6 — Observability · Testing · Deployment
  • Capstone — Marketplace Analytics Pipeline (this post)

TL;DR

  • The capstone gathers Parts 1–6 into one pipeline — chunks, idempotency, partitioning, observability, and CronJob meet in one project. Each section marks its source part.
  • Domain: operational schema → analytics schema. Move orders from the operational marketplace schema into the analytics schema. It’s a simplified variant of Part 4 §6’s pattern D (analytics warehouse): schemas split inside one PostgreSQL 16 instance instead of a separate engine.
  • Three jobs, a time-gap DAG (directed acyclic graph): daily-etl (01:00) → daily-kpi (02:00) → monthly-kpi (1st of month, 03:00). Three CronJobs with a simple time-based dependency.
  • Two datasources split transaction boundaries per schema: operationalDataSource (read-only) and analyticsDataSource (write). Same instance, separated by currentSchema and role.
  • Technique roundup: the ETL uses JDBC + partitioning (Part 5), the KPIs use SQL aggregation (window functions, Part 6), and everything relies on idempotent upsert (Part 3), observability (Part 6), and single-execution CronJobs (Part 4 §7).

1. What We Build — Architecture

This is the simplest structure for running analytics without putting aggregation load on the operational tables. Orders pile up in the operational schema; the batch copies them into the analytics schema and aggregates only there.

flowchart LR
    subgraph OP["marketplace schema (operational)"]
        ORD["order · order_item<br/>member · product · category"]
    end
    subgraph AN["analytics schema"]
        FACT["sales_fact<br/>(denormalized)"]
        KD["kpi_daily"]
        KM["kpi_monthly"]
    end
    ORD -->|"daily-etl 01:00<br/>(JDBC + partitioning)"| FACT
    FACT -->|"daily-kpi 02:00<br/>(SQL aggregation)"| KD
    KD -->|"monthly-kpi 1st of month 03:00"| KM
    KD --> DASH["dashboard / Grafana"]
    KM --> DASH

1.1 Why one instance + schema split

Among the five data-source patterns from Part 4 §6, this capstone is a simplified variant of pattern D (analytics warehouse). A full D setup uses a separate analytics engine (BigQuery, Redshift, …); here we split only schemas inside one PostgreSQL 16 instance.

  • Local runs are simple — spin up a single PostgreSQL with Docker Compose and a reader can run the whole pipeline in 30 minutes.
  • Schema split is a clear-enough boundary — separate marketplace and analytics by role/grant to split operational and analytics permissions.
  • The evolution path stays open: when operational-load isolation genuinely becomes necessary, point operationalDataSource at a read replica, or move analyticsDataSource to a separate instance, by changing only the JDBC URL. The code stays the same (Part 4 §6’s A → C → D evolution).

2. Data Model

The operational schema reproduces the comprehensive-assignment domain in PostgreSQL. The analytics schema holds denormalized tables optimized for aggregation.

erDiagram
    MEMBER ||--o{ ORDERS : places
    ORDERS ||--|{ ORDER_ITEM : contains
    PRODUCT ||--o{ ORDER_ITEM : "ordered in"
    CATEGORY ||--o{ PRODUCT : classifies

    SALES_FACT {
        date sale_date
        long order_item_id
        long member_id
        long category_id
        long product_id
        int quantity
        long amount
    }
    KPI_DAILY {
        date sale_date
        long category_id
        long total_amount
        long order_count
    }
    KPI_MONTHLY {
        string year_month
        long category_id
        long total_amount
    }

  • Operational (marketplace): member · product · category · orders · order_item. A normalized transactional model.
  • Analytics (analytics.sales_fact): a denormalized fact that flattens each order line into date, member, category, product, quantity, and amount. The ETL fills it.
  • Analytics (analytics.kpi_daily · kpi_monthly): aggregation results. The dashboard reads these.
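The Kotlin snippets in §4 use OrderRow and SalesFactRow without defining them. Here is a minimal sketch, assuming the operational reader joins order_item with orders and product so each row already carries the order timestamp, member, category, product, quantity, and a unit price. SalesFactRow’s property names mirror the named parameters of the §4.2 upsert, which is what beanMapped() resolves through getters. The OrderRow fields and the toSalesFact helper are hypothetical.

```kotlin
import java.time.LocalDate
import java.time.LocalDateTime

// Hypothetical shape of one operational row: order_item joined with orders and product.
data class OrderRow(
    val orderItemId: Long,
    val orderedAt: LocalDateTime,
    val memberId: Long,
    val categoryId: Long,
    val productId: Long,
    val quantity: Int,
    val unitPrice: Long, // assumed: price in the smallest currency unit
)

// Property names match the :named parameters of the sales_fact upsert,
// so JdbcBatchItemWriterBuilder.beanMapped() can read them via getters.
data class SalesFactRow(
    val saleDate: LocalDate,
    val orderItemId: Long,
    val memberId: Long,
    val categoryId: Long,
    val productId: Long,
    val quantity: Int,
    val amount: Long,
)

// Flatten one order line into the denormalized fact (the processor's job in §4.1).
// Assumes amount = unit price × quantity, i.e. no discounts or shipping fees.
fun toSalesFact(row: OrderRow): SalesFactRow =
    SalesFactRow(
        saleDate = row.orderedAt.toLocalDate(),
        orderItemId = row.orderItemId,
        memberId = row.memberId,
        categoryId = row.categoryId,
        productId = row.productId,
        quantity = row.quantity,
        amount = row.unitPrice * row.quantity,
    )
```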

3. Two-Datasource Setup

To treat operational and analytics with different transaction boundaries, we keep two datasources. They point at the same instance but differ by currentSchema and role.

app:
  datasource:
    operational:    # operational schema — read-only account
      jdbc-url: jdbc:postgresql://localhost:5432/market?currentSchema=marketplace
      username: etl_reader
      password: ${ETL_READER_PW}
    analytics:      # analytics schema — write account
      jdbc-url: jdbc:postgresql://localhost:5432/market?currentSchema=analytics
      username: analytics_writer
      password: ${ANALYTICS_WRITER_PW}

import com.zaxxer.hikari.HikariDataSource
import org.springframework.boot.context.properties.ConfigurationProperties
import org.springframework.boot.jdbc.DataSourceBuilder
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.context.annotation.Primary
import javax.sql.DataSource

@Configuration
class DataSourceConfig {

    @Bean
    @Primary   // Spring Batch metadata (JobRepository) lives on the operational datasource
    @ConfigurationProperties("app.datasource.operational")
    fun operationalDataSource(): DataSource =
        DataSourceBuilder.create().type(HikariDataSource::class.java).build()

    @Bean
    @ConfigurationProperties("app.datasource.analytics")
    fun analyticsDataSource(): DataSource =
        DataSourceBuilder.create().type(HikariDataSource::class.java).build()
}

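Note: the ETL job reads from the operational datasource and writes to the analytics one. Because Reader and Writer use different datasources, a chunk’s commit boundary (Parts 2–3) is anchored on the write side, the analytics transaction manager; the paging reads on the operational connection are not part of that transaction. Metadata sits on the @Primary (operational) datasource for simplicity. However, the JobRepository inserts and updates rows in the BATCH_* tables, so the otherwise read-only etl_reader account needs write grants on those tables. If you want cleaner separation, use a dedicated batch datasource.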
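etlWorkerStep asks for an analyticsTxManager, but the configuration above only declares datasources. Here is a minimal sketch of the two transaction managers this layout implies. The bean names are assumptions chosen to match the parameter names in §4.1. One pitfall motivates declaring both. Spring Boot’s auto-configured JDBC transaction manager backs off as soon as any transaction manager bean exists. If you declare only analyticsTxManager, the JobRepository, whose tables live on the operational datasource, could end up on the analytics manager. Because operationalTxManager is @Primary, an unqualified PlatformTransactionManager parameter may resolve to the primary bean, depending on your Spring version. Add @Qualifier("analyticsTxManager") where etlWorkerStep injects it.

```kotlin
import org.springframework.beans.factory.annotation.Qualifier
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.context.annotation.Primary
import org.springframework.jdbc.support.JdbcTransactionManager
import org.springframework.transaction.PlatformTransactionManager
import javax.sql.DataSource

@Configuration
class TransactionManagerConfig {

    // Metadata side: the JobRepository writes BATCH_* rows through the @Primary datasource,
    // so its transaction manager must be bound to that same datasource.
    @Bean
    @Primary
    fun operationalTxManager(
        @Qualifier("operationalDataSource") operationalDataSource: DataSource,
    ): PlatformTransactionManager = JdbcTransactionManager(operationalDataSource)

    // Write side: chunk commits in etlWorkerStep and the KPI steps go through this manager.
    @Bean
    fun analyticsTxManager(
        @Qualifier("analyticsDataSource") analyticsDataSource: DataSource,
    ): PlatformTransactionManager = JdbcTransactionManager(analyticsDataSource)
}
```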


4. Job A — daily-etl (Operational → Analytics)

Every day at 01:00, move yesterday’s orders from marketplace to analytics.sales_fact. To minimize operational load it uses JDBC Reader/Writer (Part 2 §4) bypassing JPA, processes large order volumes in parallel with partitioning (Part 5), and stays safe to re-run with idempotent upsert (Part 3 §5).

4.1 Partitioning + JDBC worker Step

Part 5’s ColumnRangePartitioner splits yesterday’s order ids into four ranges, and each worker reads only its range and upserts into sales_fact.
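The splitting rule is simple enough to show on its own. This pure-Kotlin sketch mirrors what the classic Spring Batch sample ColumnRangePartitioner does. It takes the min and max id and cuts the span into at most gridSize contiguous ranges of roughly equal width. Each range then becomes one worker’s [minValue, maxValue] pair. splitIdRange is a hypothetical helper for illustration, not the partitioner’s API. In the real step, min and max would come from a query over yesterday’s orders.

```kotlin
// Hypothetical helper mirroring the id-range split a column-range partitioner performs.
// Each LongRange corresponds to one worker's [minValue, maxValue] execution context.
fun splitIdRange(minId: Long, maxId: Long, gridSize: Int): List<LongRange> {
    require(gridSize > 0) { "gridSize must be positive" }
    if (maxId < minId) return emptyList() // no orders yesterday → no partitions
    // Width chosen so that gridSize ranges always cover the whole span.
    val targetSize = (maxId - minId) / gridSize + 1
    val ranges = mutableListOf<LongRange>()
    var start = minId
    while (start <= maxId) {
        val end = minOf(start + targetSize - 1, maxId)
        ranges += start..end
        start = end + 1
    }
    return ranges
}
```

With gridSize(4) and ids 1..100, each worker gets 25 ids. Sparse id spans can yield fewer than four partitions, which is harmless.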

@Bean
fun etlMasterStep(
    jobRepository: JobRepository,
    etlWorkerStep: Step,
    batchTaskExecutor: ThreadPoolTaskExecutor,
): Step =
    StepBuilder("etlMasterStep", jobRepository)
        .partitioner(etlWorkerStep.name, ColumnRangePartitioner(/* yesterday's id range */))
        .step(etlWorkerStep)
        .gridSize(4)
        .taskExecutor(batchTaskExecutor)
        .build()

@Bean
fun etlWorkerStep(
    jobRepository: JobRepository,
    analyticsTxManager: PlatformTransactionManager,   // write = analytics transaction boundary
    orderRangeReader: JdbcPagingItemReader<OrderRow>, // @StepScope, operational datasource
    salesFactWriter: JdbcBatchItemWriter<SalesFactRow>, // analytics datasource, upsert
): Step =
    StepBuilder("etlWorkerStep", jobRepository)
        .chunk<OrderRow, SalesFactRow>(1000, analyticsTxManager)
        .reader(orderRangeReader)
        .processor(orderToSalesFactProcessor())  // order → denormalized fact (Part 2 §3)
        .writer(salesFactWriter)
        .faultTolerant()
        .skip(org.springframework.batch.item.validator.ValidationException::class.java)
        .skipLimit(100)                           // tolerate up to 100 dirty orders (Part 3 §2)
        .build()
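etlWorkerStep injects an orderRangeReader that the snippet only describes in a comment. Here is a minimal sketch of one, under several assumptions. First, the partitioner stores minValue and maxValue in each worker’s step execution context, as the Spring Batch sample ColumnRangePartitioner does. Second, the operational tables have the columns used below (orders.ordered_at, order_item.unit_price, product.category_id). Third, OrderRow is a Kotlin data class whose properties match the selected columns. The join is wrapped in a derived table so the paging sort key is a plain, unique column. The imports assume Spring Batch 6’s relocated org.springframework.batch.infrastructure.item packages; if your version still uses org.springframework.batch.item, adjust them.

```kotlin
import org.springframework.batch.core.configuration.annotation.StepScope
import org.springframework.batch.infrastructure.item.database.JdbcPagingItemReader
import org.springframework.batch.infrastructure.item.database.Order
import org.springframework.batch.infrastructure.item.database.builder.JdbcPagingItemReaderBuilder
import org.springframework.beans.factory.annotation.Qualifier
import org.springframework.beans.factory.annotation.Value
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.jdbc.core.DataClassRowMapper
import javax.sql.DataSource

@Configuration
class EtlReaderConfig {

    @Bean
    @StepScope // one reader instance per partition, so each worker sees its own range
    fun orderRangeReader(
        @Qualifier("operationalDataSource") operationalDataSource: DataSource,
        @Value("#{stepExecutionContext['minValue']}") minValue: Long,
        @Value("#{stepExecutionContext['maxValue']}") maxValue: Long,
    ): JdbcPagingItemReader<OrderRow> =
        JdbcPagingItemReaderBuilder<OrderRow>()
            .name("orderRangeReader")
            .dataSource(operationalDataSource)
            .selectClause("SELECT order_item_id, ordered_at, member_id, category_id, product_id, quantity, unit_price")
            // Derived table (hypothetical column names): paging can sort and resume on a plain column.
            .fromClause(
                """
                FROM (SELECT oi.id AS order_item_id, o.id AS order_id, o.ordered_at, o.member_id,
                             p.category_id, oi.product_id, oi.quantity, oi.unit_price
                      FROM order_item oi
                      JOIN orders o ON o.id = oi.order_id
                      JOIN product p ON p.id = oi.product_id) AS line
                """.trimIndent(),
            )
            .whereClause("WHERE order_id BETWEEN :minValue AND :maxValue") // this partition's slice
            .sortKeys(mapOf("order_item_id" to Order.ASCENDING))            // unique key for paging
            .parameterValues(mapOf("minValue" to minValue, "maxValue" to maxValue))
            .rowMapper(DataClassRowMapper(OrderRow::class.java))            // order_item_id → orderItemId, ...
            .pageSize(1000)                                                 // same as the chunk size
            .build()
}
```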

4.2 The idempotent Writer

Put UNIQUE (sale_date, order_item_id) on sales_fact and write with ON CONFLICT DO UPDATE. Running the same date twice won’t duplicate rows (Part 3 §5.3).

@Bean
fun salesFactWriter(analyticsDataSource: DataSource): JdbcBatchItemWriter<SalesFactRow> =
    JdbcBatchItemWriterBuilder<SalesFactRow>()
        .dataSource(analyticsDataSource)
        .sql(
            """
            INSERT INTO sales_fact (sale_date, order_item_id, member_id, category_id, product_id, quantity, amount)
            VALUES (:saleDate, :orderItemId, :memberId, :categoryId, :productId, :quantity, :amount)
            ON CONFLICT (sale_date, order_item_id)
            DO UPDATE SET amount = EXCLUDED.amount, quantity = EXCLUDED.quantity
            """.trimIndent(),
        )
        .beanMapped()
        .build()
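To see why the date key plus upsert makes re-runs safe, it helps to model the table as a map keyed exactly like the UNIQUE constraint. This is an in-memory illustration only: FakeSalesFact and runEtl are hypothetical, and the real guarantee comes from PostgreSQL’s ON CONFLICT handling.

```kotlin
import java.time.LocalDate

// Key mirrors UNIQUE (sale_date, order_item_id); value holds the columns DO UPDATE overwrites.
data class FactKey(val saleDate: LocalDate, val orderItemId: Long)
data class FactValue(val quantity: Int, val amount: Long)

class FakeSalesFact {
    val rows = mutableMapOf<FactKey, FactValue>()

    // Mirrors INSERT ... ON CONFLICT (sale_date, order_item_id) DO UPDATE SET amount, quantity:
    // a new key inserts a row, an existing key overwrites the updated columns.
    fun upsert(key: FactKey, value: FactValue) {
        rows[key] = value
    }
}

// One "ETL run" for a date: items are (orderItemId, quantity, amount).
fun runEtl(table: FakeSalesFact, date: LocalDate, items: List<Triple<Long, Int, Long>>) {
    for ((orderItemId, quantity, amount) in items) {
        table.upsert(FactKey(date, orderItemId), FactValue(quantity, amount))
    }
}
```

Running the same date twice leaves the row count unchanged. A corrected source row simply overwrites its fact.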

5. Jobs B · C — KPI Aggregation

Aggregation is done by PostgreSQL itself, not in memory. Put the window functions and GROUP BY from Part 6 §5 into the Reader query so the Reader streams already-aggregated rows one at a time, and the Writer just upserts.

5.1 daily-kpi — daily revenue by category

@Bean
@org.springframework.batch.core.configuration.annotation.StepScope
fun categoryKpiReader(
    analyticsDataSource: DataSource,
    @Value("#{jobParameters['targetDate']}") targetDate: LocalDate,
): JdbcPagingItemReader<KpiDailyRow> {
    // SELECT category_id, SUM(amount) AS total_amount, COUNT(DISTINCT ...) AS order_count
    // FROM sales_fact WHERE sale_date = :targetDate GROUP BY category_id
    // → one row per category; the Reader streams aggregated rows
    return JdbcPagingItemReaderBuilder<KpiDailyRow>()
        .name("categoryKpiReader")
        .dataSource(analyticsDataSource)
        .pageSize(500)
        // .queryProvider(...) GROUP BY category_id
        .parameterValues(mapOf("targetDate" to targetDate))
        .build()
}

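Put UNIQUE (sale_date, category_id) on kpi_daily and upsert, and daily-kpi is idempotent too. “Top-10 products” is computed with the ROW_NUMBER() OVER (ORDER BY SUM(amount) DESC) window function into a separate KPI table. Validate this SQL on real PostgreSQL (Part 6 §5.1). Recent H2 versions do support window functions, but H2’s PostgreSQL compatibility mode only approximates PostgreSQL, so test dialect-specific constructs such as ON CONFLICT ... DO UPDATE against the real engine.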
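For the top-10 table, a ranking query and an in-memory oracle make a handy pair for a Testcontainers test (Part 6). Run the SQL against real PostgreSQL, compute the same ranking in Kotlin over the fixture rows, and compare. This is a hypothetical sketch, not the author’s query. The SQL text, its column aliases, and the topProducts helper are assumptions, and product_id is added as a tie-breaker so the ranking is deterministic.

```kotlin
// Hypothetical top-10 query: aggregate per product, then rank the aggregates.
// PostgreSQL evaluates window functions after GROUP BY, so ROW_NUMBER() can order by SUM(amount).
const val TOP10_PRODUCTS_SQL = """
    SELECT product_id, total_amount, rn AS product_rank
    FROM (
        SELECT product_id,
               SUM(amount) AS total_amount,
               ROW_NUMBER() OVER (ORDER BY SUM(amount) DESC, product_id) AS rn
        FROM sales_fact
        WHERE sale_date = :targetDate
        GROUP BY product_id
    ) ranked
    WHERE rn <= 10
"""

data class RankedProduct(val productId: Long, val totalAmount: Long, val rank: Int)

// In-memory oracle with the same ordering rule: amount descending, then product_id ascending.
// facts are (productId, amount) pairs from the test fixture.
fun topProducts(facts: List<Pair<Long, Long>>, limit: Int = 10): List<RankedProduct> =
    facts.groupBy({ it.first }, { it.second })
        .map { (productId, amounts) -> productId to amounts.sum() }
        .sortedWith(compareByDescending<Pair<Long, Long>> { it.second }.thenBy { it.first })
        .take(limit)
        .mapIndexed { i, (productId, total) -> RankedProduct(productId, total, i + 1) }
```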

5.2 monthly-kpi — monthly rollup

On the 1st of each month at 03:00, re-aggregate kpi_daily by month into kpi_monthly. Since the source is already aggregated daily, it’s light.

// SELECT to_char(sale_date, 'YYYY-MM') AS year_month, category_id, SUM(total_amount)
// FROM kpi_daily WHERE sale_date >= :monthStart AND sale_date < :nextMonth
// GROUP BY year_month, category_id  → upsert kpi_monthly
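The :monthStart and :nextMonth parameters have to be derived from the run date. This sketch assumes the 1st-of-month run rolls up the month that just ended, with a half-open [monthStart, nextMonth) range as in the query above. MonthWindow and previousMonthWindow are hypothetical names.

```kotlin
import java.time.LocalDate
import java.time.YearMonth

// Parameters for the monthly rollup query.
data class MonthWindow(val yearMonth: String, val monthStart: LocalDate, val nextMonth: LocalDate)

// Assumes the 03:00 run on the 1st aggregates the previous calendar month.
fun previousMonthWindow(runDate: LocalDate): MonthWindow {
    val target = YearMonth.from(runDate).minusMonths(1)
    return MonthWindow(
        yearMonth = target.toString(),             // "YYYY-MM", same shape as to_char(sale_date, 'YYYY-MM')
        monthStart = target.atDay(1),              // inclusive lower bound
        nextMonth = target.plusMonths(1).atDay(1), // exclusive upper bound
    )
}
```

YearMonth handles the January → December rollover, so the year boundary needs no special case.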

The KPI jobs could switch to JpaPagingItemReader (Part 2 §2.3) where domain-object transforms are needed, but for pure aggregation SQL is fastest and simplest.


6. Orchestration & Operations

6.1 Three CronJobs, a time-gap DAG

Inter-job dependency is simplified with time gaps (Part 4 §2). Leave time for the ETL to finish before running KPIs.

flowchart TD
    C1["CronJob: daily-etl<br/>schedule 0 1 * * *"] --> ETL["load sales_fact"]
    C2["CronJob: daily-kpi<br/>schedule 0 2 * * *"] --> KD["load kpi_daily"]
    C3["CronJob: monthly-kpi<br/>schedule 0 3 1 * *"] --> KM["load kpi_monthly"]
    ETL -.->|"1-hour gap"| KD
    KD -.->|"1st of month"| KM

If you need strict dependency (KPI only after ETL succeeds), wire them as a DAG in Argo Workflows/Airflow (Part 4 §2). The time-gap approach is simple but risks the KPI aggregating stale data if the ETL doesn’t finish within the hour — scale up to a DAG as volume grows.

6.2 Operational requirements — the series answers them all

| Requirement | Solution | Source |
|---|---|---|
| Safe same-date re-run | JobParameters date key + ON CONFLICT upsert | Part 3 §5 |
| Partial-failure recovery | faultTolerant + skipLimit + SkipListener logging | Part 3 §2–3 |
| Failure alerts | Slack from JobExecutionListener.afterJob | Part 4 §5 |
| Throughput · failure rate · duration | Micrometer + Prometheus + Grafana | Part 6 §1 |
| Single execution across instances | K8s CronJob concurrencyPolicy: Forbid | Part 4 §7 |
| Accelerate large ETL | Partitioning into 4 | Part 5 §2 |
| Operational-load isolation | Read-only account + (if needed) read replica | Part 4 §6 |

6.3 Docker Compose locally, CronJob in production

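Locally, spin up a single PostgreSQL 16 with Docker Compose and run the jobs directly (Appendix A). In production, deploy the same image as three K8s CronJobs (Part 6 §6.2). No long-running replicas race to fire the same schedule. Each CronJob creates one Job per scheduled time, and concurrencyPolicy: Forbid skips a new run while the previous one is still active. Kubernetes still documents that a CronJob can occasionally create two Jobs, or none, for one schedule, which is exactly why the date-keyed upsert matters.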


7. Series Map — Which Part Went Where

How this single pipeline absorbed the whole series, at a glance:

| Part | Technique | Capstone application point |
|---|---|---|
| Part 1 | JobRepository metadata | Execution history & restart basis for the 3 jobs |
| Part 2 | Chunks · JDBC Reader/Writer | The ETL’s JdbcPagingItemReader + JdbcBatchItemWriter |
| Part 3 | Idempotency · Skip · restart | ON CONFLICT upsert, faultTolerant, date key |
| Part 4 | Scheduling · data sources | CronJob triggers, D-pattern schema split, single execution |
| Part 5 | Partitioning | ETL worker Step split into 4 |
| Part 6 | Observability · testing · deployment | Micrometer/MDC, Testcontainers, multi-stage + CronJob |

Recap

The key takeaways from the capstone, one line each:

  • One pipeline absorbs the whole series — chunks, idempotency, partitioning, observability, and CronJob all operate together inside the three jobs daily-etl · daily-kpi · monthly-kpi.
  • Schema split separates operational and analytics — one PostgreSQL 16 instance, two datasources (currentSchema + role). When load isolation is needed, change only the URL to evolve to a read replica or separate instance.
  • PostgreSQL does the aggregation — put GROUP BY/window functions in the Reader query to stream already-aggregated rows; the Writer just upserts. Validate on real PostgreSQL, not H2.
  • Idempotency + single execution are the two pillars of operations — date key + upsert for safe re-runs, CronJob Forbid to block duplicate execution.
  • Same code for local and production — spin it up in 30 minutes with Docker Compose, deploy the same image as CronJobs.

This post closes the Spring Batch 6 Guide series. Starting from Part 1’s Hello Tasklet, through chunks and transactions, past scheduling and parallelism and observability, we’ve arrived at a real analytics pipeline. A beginner who follows the series end to end can design, build, and operate “batch that runs safely” with Spring Batch 6. Natural next topics: extending this pipeline’s alerts to be event-driven with Spring Cloud Stream, or deepening observability with OpenTelemetry.


Appendix

A. Running locally — Docker Compose

One PostgreSQL 16 instance hosting both schemas:

services:
  postgres:
    image: postgres:16
    environment:
      POSTGRES_DB: market
      POSTGRES_PASSWORD: postgres
    ports:
      - "5432:5432"
    volumes:
      - ./init:/docker-entrypoint-initdb.d   # SQL creating marketplace·analytics schemas·roles

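Put the SQL that creates the two schemas and the roles (etl_reader · analytics_writer) in init/, and the postgres image applies it automatically on the container’s first start. Then launch a job directly. Pass the job name as a property and the job parameter as a plain argument: --spring.batch.job.name=daily-etl targetDate=2026-05-16,java.time.LocalDate. Spring Boot turns only non-option arguments into job parameters, so a --targetDate=... option would never reach jobParameters.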
