Spring Batch: Complete Guide
- New learners → start at What is Spring Batch? and Core Architecture to understand the foundational model.
- Senior engineers → jump to Scaling Patterns, Chunk Transaction Boundaries, Partitioning, or Production Patterns.
What is Spring Batch?
Spring Batch is a lightweight, open-source framework for processing large volumes of data reliably. It provides the infrastructure for batch workloads (reading millions of records, transforming them, writing the output) with built-in restart, skip, retry, and transaction management.
It does not schedule jobs; that is the job of Quartz, @Scheduled, or cron. Spring Batch is the engine that runs when a scheduled trigger fires.
The assembly line analogy
The clearest mental model: a factory assembly line.
| Factory concept | Spring Batch equivalent | What it does |
|---|---|---|
| Assembly line | Job | The entire process from start to finish |
| Phase of the line | Step | One logical stage (e.g. read CSV → write to DB) |
| Robot arm that picks parts | ItemReader | Reads one record at a time from the source |
| Painting / welding robot | ItemProcessor | Transforms or filters each record |
| Truck that delivers in batches | ItemWriter | Writes a batch of records to the destination |
| Factory logbook | JobRepository | Records exactly where the line stopped, which enables restart |
Without Spring Batch (naive loop):
Read row 1 → Process → Write | Read row 2 → Process → Write | ... × 5,000,000
- If it crashes at row 3,451,000, you start over from row 1.
- If everything runs in one giant transaction, one failure rolls back everything.
- No visibility into progress.
With Spring Batch:
Read 100 rows → Process 100 → Write 100 → COMMIT ✓
Read 100 rows → Process 100 → Write 100 → COMMIT ✓
... crash right after the chunk ending at row 3,451,000 has committed ...
Restart → resumes from row 3,451,001 ✓
When to use Spring Batch
| Use case | Spring Batch? | Why |
|---|---|---|
| Nightly ETL: migrate 10M records from legacy DB to new schema | ✅ | Chunked transactions, restartability, parallel partitioning |
| Generate monthly invoices for 500K customers | ✅ | Sequential processing with audit trail, fault tolerance |
| Send bulk emails / notifications | ✅ | Rate-limiting with chunk size, skip bad addresses |
| Process uploaded CSV files asynchronously | ✅ | FlatFileItemReader + async job launch |
| Real-time event processing (per-event, millisecond latency) | ❌ | Use Kafka Streams or Spring Integration instead |
| A simple CRUD API with a scheduled cleanup query | ❌ | @Scheduled + @Query with @Modifying is simpler |
| Stream processing with complex windowing | ❌ | Apache Flink or Kafka Streams |
Core Architecture
Component hierarchy
JobLauncher
   │
   ▼
Job ──── JobParameters (unique run identifier)
   │
   ├── Step 1: Read CSV → Validate → Write to staging DB
   │      ├── ItemReader
   │      ├── ItemProcessor
   │      └── ItemWriter
   │
   ├── Step 2: Aggregate staging data → Write to reporting DB
   │
   └── Step 3: Send email summary → Archive CSV file
JobRepository (PostgreSQL / H2)
   └── Stores: JobInstance, JobExecution, StepExecution metadata
Core components explained
| Component | Role | Example |
|---|---|---|
| Job | Top-level batch process: a named sequence of Steps | "monthlyInvoiceJob" |
| Step | One processing phase within a Job | "readCsvStep", "writeDbStep" |
| ItemReader<I> | Reads one item at a time from a source | FlatFileItemReader, JdbcPagingItemReader |
| ItemProcessor<I, O> | Transforms or filters one item; returning null filters it out | Validate, enrich, convert |
| ItemWriter<O> | Writes a list (chunk) of items to a destination | JdbcBatchItemWriter, JpaItemWriter |
| JobLauncher | Starts a Job with given JobParameters | Triggered by REST, @Scheduled, or CLI |
| JobRepository | Persists Job/Step execution state to a DB | Enables restart, monitoring, deduplication |
| JobParameters | Key-value pairs that make a Job run unique | {"date":"2024-01-15","file":"/data/jan.csv"} |
JobRepository: the restart engine
The JobRepository is what makes Spring Batch fundamentally different from a loop. Before processing a single row, it writes to its metadata tables:
"Job 'invoiceJob' with parameters {month=2024-01} → STARTED at 02:00:00"
After every successful chunk commit:
"Step 'processInvoices' → processed 500 rows, last committed item index = 500"
On a restart after a crash, Spring Batch reads this metadata and resumes from the last committed chunk boundary. A JobRepository is mandatory, and a persistent, database-backed one is what makes restart possible.
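To make the restart mechanics concrete, here is a plain-Java sketch with no Spring Batch classes. Each chunk's output and the reader's position are saved together at commit, and a restart skips the items that were already committed. The map standing in for the persisted step ExecutionContext and the `reader.read.count` key are illustrative assumptions. Spring Batch's built-in item-counting readers save a comparable per-reader count.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Plain-Java sketch: commit a chunk and its read count together, then resume from the count. */
final class RestartSketch {

    /** Hypothetical key in the map that stands in for the persisted step ExecutionContext. */
    static final String READ_COUNT_KEY = "reader.read.count";

    /**
     * Processes source items in chunks, starting after the saved read count.
     * Reaching crashAtIndex throws before the current chunk commits, so neither
     * its output nor its read count is saved (pass -1 for "no crash").
     */
    static void run(List<String> source, int chunkSize, Map<String, Long> context,
                    List<String> committedOutput, int crashAtIndex) {
        int start = context.getOrDefault(READ_COUNT_KEY, 0L).intValue();
        for (int chunkStart = start; chunkStart < source.size(); chunkStart += chunkSize) {
            int chunkEnd = Math.min(chunkStart + chunkSize, source.size());
            List<String> uncommitted = new ArrayList<>();
            for (int i = chunkStart; i < chunkEnd; i++) {
                if (i == crashAtIndex) {
                    throw new IllegalStateException("simulated crash at item " + i);
                }
                uncommitted.add(source.get(i).toUpperCase());
            }
            // COMMIT: the chunk's output and the new read count are saved together
            committedOutput.addAll(uncommitted);
            context.put(READ_COUNT_KEY, (long) chunkEnd);
        }
    }
}
```

With a chunk size of 3 and a crash inside the third chunk, only the first six items are committed. The rerun picks up at item 6 and duplicates nothing.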
# application.yaml: Spring Boot 3.x auto-configures the JobRepository with your DataSource
spring:
  batch:
    jdbc:
      initialize-schema: always   # creates the BATCH_* metadata tables on startup
  datasource:
    url: jdbc:postgresql://localhost:5432/mydb
In production, consider isolating batch metadata in its own schema or database so it does not clutter your application's tables. The table prefix (default BATCH_) may be schema-qualified. With a non-default prefix you create the tables yourself, because the bundled DDL scripts use the default BATCH_ names:
spring.batch.jdbc.table-prefix: batch_meta.BATCH_   # "batch_meta" is a hypothetical schema name
Chunk-Oriented Processing
Chunk processing is the heart of Spring Batch. Instead of processing one record at a time (one transaction per record) or all records at once (one giant transaction), it processes records in configurable chunks, and each chunk is one database transaction.
The chunk lifecycle
┌──────────────────────── One Transaction ────────────────────────
│
│  Read item 1   ──► [item1]
│  Read item 2   ──► [item1, item2]
│  ...
│  Read item 100 ──► [item1 ... item100]   ← chunk size reached
│        │
│        ▼
│  ItemProcessor (transforms each item)
│        │
│        ▼
│  [output1 ... output100]
│        │
│        ▼
│  ItemWriter (bulk INSERT all 100 at once)
│        │
│        ▼
│  COMMIT
│
└──────────────────────────────────────────────────────────────────
Repeat until ItemReader returns null (source exhausted)
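Key property: if the ItemWriter throws an exception mid-chunk, only that chunk of 100 rolls back, not the entire job, and chunks committed earlier stay committed. Depending on the fault-tolerance configuration, Spring Batch then retries the chunk, skips the failing item(s) and continues, or fails the step. In the last case, a restart resumes from that chunk.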
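The lifecycle above can be sketched in plain Java. This is not Spring Batch's implementation, which also handles listeners, retries, and skips. It only shows the order of operations: read until the chunk is full or the reader returns null, process each item (null filters it), write the whole chunk in one call, then commit. The `Reader` and `Writer` interfaces and the in-memory "table" are hypothetical stand-ins for ItemReader, ItemWriter, and the database.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Function;

/** Plain-Java sketch of one chunk-oriented step: read, process, write, commit. */
final class ChunkLoopSketch {

    /** Like ItemReader: returns null once the source is exhausted. */
    interface Reader<T> { T read(); }

    /** Like ItemWriter: receives the whole chunk in one call. */
    interface Writer<T> { void write(List<T> chunk); }

    static <T> Reader<T> fromList(List<T> items) {
        Iterator<T> it = items.iterator();
        return () -> it.hasNext() ? it.next() : null;
    }

    /** committed stands in for the target table; it only grows when a chunk "commits". */
    static <I, O> void run(Reader<I> reader, Function<I, O> processor, Writer<O> writer,
                           int chunkSize, List<O> committed) {
        while (true) {
            List<I> inputs = new ArrayList<>();
            I item;
            while (inputs.size() < chunkSize && (item = reader.read()) != null) {
                inputs.add(item);                   // 1. read up to chunkSize items
            }
            if (inputs.isEmpty()) {
                return;                             // reader returned null: step is done
            }
            List<O> outputs = new ArrayList<>();
            for (I in : inputs) {
                O out = processor.apply(in);        // 2. process each item
                if (out != null) {
                    outputs.add(out);               //    null means "filtered out"
                }
            }
            writer.write(outputs);                  // 3. write the chunk in one call
            committed.addAll(outputs);              // 4. COMMIT; if write() threw, this line is
                                                    //    never reached, which models the rollback
        }
    }
}
```

In this sketch a writer exception propagates out of `run`, so the failing chunk never reaches the table while every earlier chunk is already there. That mirrors the "only this chunk rolls back" property.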
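Basic step configuration
@Configuration
// Spring Boot 3.x: no @EnableBatchProcessing here. Adding it makes Boot's batch
// auto-configuration back off (no schema initialization, no job launch at startup)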
public class BatchConfig {
@Bean
public Step processCsvStep(JobRepository jobRepository,
PlatformTransactionManager txManager,
ItemReader<UserCsvDto> reader,
ItemProcessor<UserCsvDto, UserEntity> processor,
ItemWriter<UserEntity> writer) {
return new StepBuilder("processCsvStep", jobRepository)
.<UserCsvDto, UserEntity>chunk(100, txManager) // chunk size = 100
.reader(reader)
.processor(processor)
.writer(writer)
.build();
}
@Bean
public Job importUsersJob(JobRepository jobRepository, Step processCsvStep) {
return new JobBuilder("importUsersJob", jobRepository)
.start(processCsvStep)
.build();
}
}
Choosing chunk size
| Dataset / Item type | Recommended chunk size | Why |
|---|---|---|
| Small items (< 1 KB each), fast write | 500-1,000 | More items per transaction = fewer commits |
| Medium items (~5 KB each) with DB write | 100-500 | Balance between commit frequency and memory |
| Large items (> 50 KB), complex processor | 10-50 | Avoid heap bloat; smaller transaction window |
| External API write (per-item HTTP call) | 1-10 | API latency dominates; small chunk = faster retry |
| Chunk with Async processor | 50-200 | Futures resolve concurrently; larger batch improves throughput |
Setting chunk(10_000) to reduce commits sounds efficient but creates:
- A long-running DB transaction that holds row/table locks for seconds, causing lock waits and deadlocks with live user traffic.
- 10,000 large objects in JVM heap simultaneously, with OutOfMemoryError risk.
- On failure, the entire 10,000-item chunk rolls back, losing more work and making retry more expensive.
Rule of thumb: start at 100, measure memory + throughput, tune upward cautiously.
ItemReader: Reading Data
ItemReader<T> reads one item per call. Spring Batch calls it repeatedly until it returns null, which signals the end of input.
- CSV / Flat File
- JDBC Paging (thread-safe)
- JDBC Cursor (single-thread only)
- JPA / Repository
- CompositeItemReader (multiple sources)
@Bean
public FlatFileItemReader<UserCsvDto> csvReader() {
return new FlatFileItemReaderBuilder<UserCsvDto>()
.name("userCsvReader")
.resource(new FileSystemResource("/data/users.csv"))
.delimited()
.delimiter(",")
.names("id", "firstName", "lastName", "email", "status")
.targetType(UserCsvDto.class)
.linesToSkip(1) // skip header row
.encoding("UTF-8")
.build();
}
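For very large files, you can enlarge the read buffer. The default factory already wraps the file in a BufferedReader with the JDK's default buffer size: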
.bufferedReaderFactory((resource, encoding) ->
new BufferedReader(new InputStreamReader(resource.getInputStream(), encoding), 65536))
// JdbcPagingItemReader: thread-safe, recommended for multi-threaded steps
@Bean
public JdbcPagingItemReader<UserEntity> jdbcPagingReader(DataSource dataSource) {
Map<String, Order> sortKeys = Map.of("id", Order.ASCENDING);
return new JdbcPagingItemReaderBuilder<UserEntity>()
.name("userPagingReader")
.dataSource(dataSource)
.selectClause("SELECT id, first_name, last_name, email, status")
.fromClause("FROM users")
.whereClause("WHERE status = 'PENDING' AND created_at < :cutoff")
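.parameterValues(Map.of("cutoff", LocalDate.now().minusDays(30))) // evaluated once, at bean creation; use @StepScope + a job parameter for a per-run value
.sortKeys(sortKeys) // REQUIRED: must be unique; each page resumes after the last key read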
.pageSize(100) // fetches 100 rows per JDBC round trip
.rowMapper(new BeanPropertyRowMapper<>(UserEntity.class))
.build();
}
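Why the sort key must be unique: JdbcPagingItemReader does not page with OFFSET. Each page after the first is fetched with a condition on the last sort-key value already read, roughly `WHERE id > :lastId ORDER BY id LIMIT :pageSize`. The in-memory sketch below imitates that keyset approach over a sorted list of keys; the list stands in for the table, and no JDBC is involved. It also shows that a non-unique key silently loses rows that share a value across a page boundary.

```java
import java.util.ArrayList;
import java.util.List;

/** In-memory imitation of keyset paging ("WHERE key > :last ORDER BY key LIMIT n"). */
final class KeysetPagingSketch {

    /** One page: keys strictly greater than lastKey (null = first page), at most pageSize. */
    static List<Integer> page(List<Integer> sortedKeys, Integer lastKey, int pageSize) {
        List<Integer> page = new ArrayList<>();
        for (Integer k : sortedKeys) {
            if (lastKey != null && k <= lastKey) {
                continue;                     // WHERE key > :last
            }
            page.add(k);
            if (page.size() == pageSize) {
                break;                        // LIMIT :pageSize
            }
        }
        return page;
    }

    /** Reads every page, remembering only the last key of the previous page. */
    static List<Integer> readAll(List<Integer> sortedKeys, int pageSize) {
        List<Integer> all = new ArrayList<>();
        Integer last = null;
        while (true) {
            List<Integer> page = page(sortedKeys, last, pageSize);
            if (page.isEmpty()) {
                return all;
            }
            all.addAll(page);
            last = page.get(page.size() - 1);
        }
    }
}
```

With keys 1, 2, 2, 2, 3 and a page size of 2, the second page starts after "2", so two rows are never read. Sorting on the primary key (or a unique composite key) avoids this.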
// JdbcCursorItemReader: fastest for single-threaded steps; NOT thread-safe
@Bean
public JdbcCursorItemReader<UserEntity> jdbcCursorReader(DataSource dataSource) {
return new JdbcCursorItemReaderBuilder<UserEntity>()
.name("userCursorReader")
.dataSource(dataSource)
.sql("SELECT id, first_name, email FROM users WHERE status = 'PENDING'")
.rowMapper(new BeanPropertyRowMapper<>(UserEntity.class))
.fetchSize(100) // JDBC prefetch batch size
.build();
}
// ⚠️ Do NOT use the cursor reader above in a multi-threaded Step: threads would share one open ResultSet, so rows can be duplicated, skipped, or fail to read
@Bean
public RepositoryItemReader<User> jpaReader(UserRepository repo) {
return new RepositoryItemReaderBuilder<User>()
.name("userJpaReader")
.repository(repo)
.methodName("findByStatus") // needs a Pageable last parameter, e.g. Page<User> findByStatus(String status, Pageable pageable)
.arguments(List.of("PENDING"))
.sorts(Map.of("id", Sort.Direction.ASC)) // required for stable paging
.pageSize(100)
.build();
}
// Read from multiple sources sequentially in one Step.
// CompositeItemReader requires Spring Batch 5.2+; on older versions, MultiResourceItemReader
// covers the common "several files, same format" case. No SynchronizedItemStreamReader
// wrapper is needed: the delegates are read one after another on a single thread.
@Bean
public CompositeItemReader<UserCsvDto> compositeReader() {
    return new CompositeItemReader<>(List.of(csvReaderForFileA(), csvReaderForFileB()));
}
ItemReader comparison
| Reader | Thread-safe | Use for | Notes |
|---|---|---|---|
| FlatFileItemReader | ❌ | CSV, fixed-width, delimited files | Wrap with SynchronizedItemStreamReader for multi-threaded steps |
| JdbcCursorItemReader | ❌ | Large DB reads, single-threaded steps | Fastest; holds an open DB cursor |
| JdbcPagingItemReader | ✅ | DB reads in multi-threaded steps | Pages on the sort key (keyset), not OFFSET; requires a unique sort key |
| JpaPagingItemReader | ✅ | JPA entity reads | Manages its own EntityManager per page |
| RepositoryItemReader | ✅ | Spring Data repositories | Convenient; less control over SQL |
| JsonItemReader | ❌ | JSON array files | Jackson-based |
| AxonFrameworkEventReader | N/A | Event store replay | Specialised |
ItemProcessor: Transforming Data
ItemProcessor<I, O> receives one item and returns the transformed output, or returns null to filter the item out without counting it as an error.
@Slf4j // Lombok (like the builder() below): provides the log field
@Component
public class UserProcessor implements ItemProcessor<UserCsvDto, UserEntity> {
@Override
public UserEntity process(UserCsvDto csv) throws Exception {
// Return null to silently filter this item: it will not be passed to the writer
if (csv.getEmail() == null || !csv.getEmail().contains("@")) {
log.warn("Filtering invalid email for user id={}", csv.getId());
return null; // ← filtered item
}
return UserEntity.builder()
.externalId(csv.getId())
.firstName(csv.getFirstName().trim())
.lastName(csv.getLastName().trim())
.email(csv.getEmail().toLowerCase())
.status(Status.PENDING)
.importedAt(Instant.now())
.build();
}
}
CompositeItemProcessor: chaining processors
When transformation has multiple distinct stages, compose them rather than building one monolithic processor:
@Bean
public CompositeItemProcessor<UserCsvDto, UserEntity> compositeProcessor(
ValidationProcessor validationProcessor,
EnrichmentProcessor enrichmentProcessor,
MappingProcessor mappingProcessor) {
CompositeItemProcessor<UserCsvDto, UserEntity> processor = new CompositeItemProcessor<>();
processor.setDelegates(List.of(
validationProcessor, // Stage 1: validate fields
enrichmentProcessor, // Stage 2: call external API to enrich
mappingProcessor // Stage 3: map to entity
));
return processor;
}
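CompositeItemProcessor passes each item through its delegates in order. If any delegate returns null, the item is filtered and the remaining delegates are not called, so a failed validation never reaches the enrichment API. A plain-Java sketch of that chaining rule follows. The `UnaryOperator<Object>` stages and the three lambdas in the test are illustrative stand-ins, not Spring Batch types.

```java
import java.util.List;
import java.util.function.UnaryOperator;

/** Sketch of composite-processor chaining: stages run in order, null stops the chain. */
final class ProcessorChainSketch {

    static Object process(Object item, List<UnaryOperator<Object>> stages) {
        Object result = item;
        for (UnaryOperator<Object> stage : stages) {
            if (result == null) {
                return null;              // filtered by an earlier stage: skip the rest
            }
            result = stage.apply(result);
        }
        return result;
    }
}
```

Ordering matters for cost: put cheap validation first so that expensive stages (such as an external enrichment call) only run for items that survive it.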
ItemWriter: Writing Data
ItemWriter<T> receives a Chunk<T> (the full list of processed items) and writes them all at once. This is where bulk INSERT happens.
- JDBC Batch Writer
- JPA Writer
- Flat File Writer
- CompositeItemWriter (multiple targets)
@Bean
public JdbcBatchItemWriter<UserEntity> jdbcWriter(DataSource dataSource) {
return new JdbcBatchItemWriterBuilder<UserEntity>()
.dataSource(dataSource)
.sql("""
INSERT INTO users (external_id, first_name, last_name, email, status, imported_at)
VALUES (:externalId, :firstName, :lastName, :email, :status, :importedAt)
ON CONFLICT (external_id) DO UPDATE
SET first_name = EXCLUDED.first_name,
last_name = EXCLUDED.last_name,
status = EXCLUDED.status
""")
.beanMapped() // binds each :named parameter to the matching bean property of the item
.assertUpdates(false) // default is true: throw if any statement updated zero rows
.build();
}
@Bean
public JpaItemWriter<UserEntity> jpaWriter(EntityManagerFactory emf) {
JpaItemWriter<UserEntity> writer = new JpaItemWriter<>();
writer.setEntityManagerFactory(emf);
// Uses EntityManager.merge(): handles insert and update automatically
// Warning: triggers dirty checking per entity, so it is slower than JdbcBatchItemWriter
return writer;
}
@Bean
public FlatFileItemWriter<UserEntity> csvWriter() {
return new FlatFileItemWriterBuilder<UserEntity>()
.name("userCsvWriter")
.resource(new FileSystemResource("/output/users-processed.csv"))
.delimited()
.delimiter(",")
.names("id", "firstName", "email", "status")
.headerCallback(w -> w.write("id,firstName,email,status"))
.shouldDeleteIfExists(true)
.build();
}
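// Write to two destinations in one chunk. Both writers run inside the chunk transaction (FlatFileItemWriter buffers its output until commit by default), but this is not an XA/two-phase commit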
@Bean
public CompositeItemWriter<UserEntity> compositeWriter(
JdbcBatchItemWriter<UserEntity> dbWriter,
FlatFileItemWriter<UserEntity> auditWriter) {
CompositeItemWriter<UserEntity> writer = new CompositeItemWriter<>();
writer.setDelegates(List.of(dbWriter, auditWriter));
return writer;
}
Multi-Step Jobs and Flow Control
A Job can contain multiple Steps executed conditionally based on the outcome of previous steps.
Sequential steps
@Bean
public Job etlJob(JobRepository repo, Step extract, Step transform, Step load) {
return new JobBuilder("etlJob", repo)
.start(extract)
.next(transform)
.next(load)
.build();
}
Conditional flow with exit codes
@Bean
public Job conditionalJob(JobRepository repo,
                          Step validateStep,
                          Step processStep,
                          Step errorReportStep) {
    // Each .on() applies to the most recently added step, so branching a second
    // time on validateStep needs .from(validateStep).
    return new JobBuilder("conditionalJob", repo)
            .start(validateStep)
                .on("FAILED").to(errorReportStep)      // validate fails → report
            .from(validateStep)
                .on("COMPLETED").to(processStep)       // validate passes → process
            .from(processStep)
                .on("*").end()                         // any outcome → done
            .from(errorReportStep)
                .on("*").fail()                        // mark the Job FAILED after the report
            .end()                                     // close the flow definition
            .build();
}
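JobParameters: making runs unique
Every JobInstance is identified by the Job name plus its identifying JobParameters. Launching an instance whose last execution COMPLETED is rejected with JobInstanceAlreadyCompleteException (an idempotency guard); launching one whose last execution FAILED restarts it: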
// Launching a job programmatically
@Service
public class BatchLaunchService {
@Autowired private JobLauncher jobLauncher;
@Autowired private Job importUsersJob;
public void launch(String filePath) throws Exception {
JobParameters params = new JobParametersBuilder()
.addString("filePath", filePath)
.addLocalDateTime("runAt", LocalDateTime.now()) // makes it unique per run
.toJobParameters();
JobExecution execution = jobLauncher.run(importUsersJob, params);
log.info("Job status: {}", execution.getStatus());
}
}
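The rule behind that guard can be sketched in plain Java. A JobInstance is keyed by the job name plus its identifying parameters. Launching a key whose last execution COMPLETED is refused, a key whose last execution FAILED is restarted, and an unseen key creates a new instance. The in-memory map and the `Decision` enum are hypothetical stand-ins for the JobRepository tables and for the exception Spring Batch throws in the refusal case. The sketch ignores running, stopped, and non-restartable executions.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

/** Simplified JobInstance identity check: name + identifying parameters. */
final class JobInstanceGuardSketch {

    enum Status { COMPLETED, FAILED }
    enum Decision { NEW_INSTANCE, RESTART, REJECT_ALREADY_COMPLETE }

    /** Last known execution status per JobInstance key (stand-in for the BATCH_* tables). */
    private final Map<String, Status> lastStatusByInstance = new HashMap<>();

    /** Sorting the parameters makes the key independent of insertion order. */
    static String instanceKey(String jobName, Map<String, String> params) {
        return jobName + new TreeMap<>(params);
    }

    Decision launch(String jobName, Map<String, String> params) {
        Status last = lastStatusByInstance.get(instanceKey(jobName, params));
        if (last == null) {
            return Decision.NEW_INSTANCE;
        }
        return last == Status.COMPLETED ? Decision.REJECT_ALREADY_COMPLETE : Decision.RESTART;
    }

    void record(String jobName, Map<String, String> params, Status status) {
        lastStatusByInstance.put(instanceKey(jobName, params), status);
    }
}
```

Adding a `runAt` timestamp, as in the launcher above, changes the key on every launch, so the guard never fires. That is exactly the trade-off discussed under idempotency later in this guide.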
Fault Tolerance: Skip and Retry
Skip: ignore bad records
@Bean
public Step tolerantStep(JobRepository repo, PlatformTransactionManager tx) {
return new StepBuilder("tolerantStep", repo)
.<UserCsvDto, UserEntity>chunk(100, tx)
.reader(reader())
.processor(processor())
.writer(writer())
.faultTolerant()
.skipLimit(50) // allow up to 50 skipped items total
.skip(ValidationException.class) // skip on validation error
.skip(DataIntegrityViolationException.class) // skip on DB constraint violation
.noSkip(FileNotFoundException.class) // never skip: always fail
.build();
}
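Skip lifecycle: a skippable exception thrown by the reader simply drops that item. One thrown by the processor or writer rolls the chunk back. After a processor skip, the chunk is re-processed without the bad item. After a writer skip, Spring Batch re-writes the chunk one item at a time, each in its own transaction ("scanning"), so the offending item is isolated and skipped while the rest are committed normally.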
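The write-phase isolation can be sketched in plain Java. When writing the whole chunk fails with a skippable exception, the chunk is rolled back and its items are written again one at a time, each in its own "transaction", so only the failing items are skipped. Spring Batch scans item by item in this way; it does not bisect the chunk. The sketch omits retry, listeners, and read/process-phase skips. `writeItem`, the in-memory table, and the use of IllegalArgumentException as the "skippable" exception are illustrative assumptions.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Sketch of write-phase skip handling: chunk attempt, then item-by-item scan. */
final class SkipScanSketch {

    static final class SkipLimitExceededException extends RuntimeException {
        SkipLimitExceededException(int limit) { super("skip limit " + limit + " exceeded"); }
    }

    /** Writes chunk into table; returns the skipped items. IllegalArgumentException = skippable. */
    static <T> List<T> writeChunk(List<T> chunk, Consumer<T> writeItem, List<T> table,
                                  int skipLimit, int skippedSoFar) {
        List<T> staged = new ArrayList<>();
        try {
            for (T item : chunk) {           // first attempt: whole chunk in one transaction
                writeItem.accept(item);
                staged.add(item);
            }
            table.addAll(staged);            // COMMIT
            return List.of();
        } catch (IllegalArgumentException chunkFailure) {
            // ROLLBACK: staged items are discarded; fall through to the scan
        }
        List<T> skipped = new ArrayList<>();
        for (T item : chunk) {               // scan: one item per transaction
            try {
                writeItem.accept(item);
                table.add(item);             // COMMIT (single item)
            } catch (IllegalArgumentException itemFailure) {
                skipped.add(item);
                if (skippedSoFar + skipped.size() > skipLimit) {
                    throw new SkipLimitExceededException(skipLimit);
                }
            }
        }
        return skipped;
    }
}
```

Scanning costs one transaction per item for the affected chunk only. This is another reason to keep chunks moderate when bad rows are common.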
SkipListener: log what you skipped
@Component
public class UserSkipListener implements SkipListener<UserCsvDto, UserEntity> {
@Override
public void onSkipInRead(Throwable t) {
log.warn("Skipped during READ: {}", t.getMessage());
}
@Override
public void onSkipInProcess(UserCsvDto item, Throwable t) {
log.warn("Skipped during PROCESS: item={}, reason={}", item.getId(), t.getMessage());
}
@Override
public void onSkipInWrite(UserEntity item, Throwable t) {
log.warn("Skipped during WRITE: entity={}, reason={}", item.getExternalId(), t.getMessage());
// Write to a dead-letter table for manual review
deadLetterRepository.save(new DeadLetter(item, t.getMessage()));
}
}
// Register on the step:
.faultTolerant()
.skipLimit(50)
.skip(ValidationException.class)
.listener(userSkipListener)
Retry: transient failures
Use retry for transient failures where the same operation may succeed on a later attempt (network blip, optimistic lock, temporary API rate limit):
.faultTolerant()
.retryLimit(3) // at most 3 attempts per item, including the first one
.retry(OptimisticLockingFailureException.class)
.retry(ResourceAccessException.class) // RestTemplate / network timeout
.noRetry(DataIntegrityViolationException.class) // don't retry constraint violations
.backOffPolicy(new ExponentialBackOffPolicy()) // defaults: 100 ms, ×2 per wait (here 100 ms, then 200 ms); setInitialInterval(1000) gives 1 s, 2 s
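Both numbers in a retry configuration are easy to misread. retryLimit counts attempts including the first one, so retryLimit(3) allows at most two waits. Spring Retry's ExponentialBackOffPolicy starts at 100 ms with a multiplier of 2 and a 30 s cap unless you change initialInterval, multiplier, and maxInterval. This plain-Java helper (not Spring Retry code) computes the wait sequence under those rules.

```java
import java.util.ArrayList;
import java.util.List;

/** Computes exponential back-off waits between attempts (sketch, not Spring Retry). */
final class BackOffScheduleSketch {

    /** maxAttempts includes the first attempt, so there are maxAttempts - 1 waits. */
    static List<Long> waits(int maxAttempts, long initialIntervalMs, double multiplier,
                            long maxIntervalMs) {
        List<Long> waits = new ArrayList<>();
        double next = initialIntervalMs;
        for (int attempt = 1; attempt < maxAttempts; attempt++) {
            waits.add(Math.min((long) next, maxIntervalMs));   // each wait is capped
            next *= multiplier;
        }
        return waits;
    }
}
```

Under these rules a "1 s, 2 s, 4 s" schedule needs four attempts (retryLimit(4)) and a policy configured with setInitialInterval(1000).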
Skip vs Retry: when to use each
| | Skip | Retry |
|---|---|---|
| Use for | Permanently bad data (invalid email, corrupt row) | Transient failures (network, lock contention) |
| Outcome | Item is discarded | Same item is re-attempted |
| Audit | Log to dead-letter table | Log retry attempts |
| Example exception | ValidationException, DataIntegrityViolationException | ResourceAccessException, OptimisticLockingFailureException |
Listeners: Hooks into the Job Lifecycle
@Component
public class JobCompletionListener implements JobExecutionListener {
@Override
public void beforeJob(JobExecution jobExecution) {
log.info("Job '{}' starting with params: {}",
jobExecution.getJobInstance().getJobName(),
jobExecution.getJobParameters());
}
@Override
public void afterJob(JobExecution jobExecution) {
if (jobExecution.getStatus() == BatchStatus.COMPLETED) {
log.info("Job completed. Items: read={}, written={}, skipped={}",
jobExecution.getStepExecutions().stream()
.mapToLong(s -> s.getReadCount()).sum(),
jobExecution.getStepExecutions().stream()
.mapToLong(s -> s.getWriteCount()).sum(),
jobExecution.getStepExecutions().stream()
.mapToLong(s -> s.getSkipCount()).sum());
} else {
log.error("Job FAILED: {}", jobExecution.getFailureExceptions());
alertService.notifyJobFailure(jobExecution);
}
}
}
// Register on the job:
@Bean
public Job importJob(JobRepository repo, Step step, JobCompletionListener listener) {
return new JobBuilder("importJob", repo)
.listener(listener)
.start(step)
.build();
}
Scaling Patterns
A single-threaded job reading 50 million rows with a 200 ms processor per item takes roughly 115 days (50,000,000 × 0.2 s = 10,000,000 s). Senior engineers know when and how to scale.
Pattern 1: Multi-threaded Step
Run multiple threads executing the chunk lifecycle in parallel within a single step. Each thread reads → processes → writes its own chunk independently.
@Bean
public Step multiThreadedStep(JobRepository repo, PlatformTransactionManager tx) {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(4);
executor.setMaxPoolSize(8);
executor.setQueueCapacity(25);
executor.setThreadNamePrefix("batch-worker-");
executor.initialize();
return new StepBuilder("multiThreadedStep", repo)
.<UserCsvDto, UserEntity>chunk(100, tx)
.reader(synchronizedReader()) // ← MUST be thread-safe
.processor(processor()) // thread-safe only if it keeps no mutable state
.writer(writer()) // JdbcBatchItemWriter is thread-safe
.taskExecutor(executor)
.throttleLimit(4) // deprecated since Spring Batch 5.0: cap concurrency with the executor's pool size instead
.build();
}
// CRITICAL: wrap non-thread-safe readers
@Bean
public SynchronizedItemStreamReader<UserCsvDto> synchronizedReader() {
SynchronizedItemStreamReader<UserCsvDto> reader = new SynchronizedItemStreamReader<>();
reader.setDelegate(csvReader()); // FlatFileItemReader is NOT thread-safe
return reader;
}
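FlatFileItemReader and JdbcCursorItemReader are not thread-safe: concurrent reads can return the same row twice, skip rows, or corrupt the file position. Wrap file readers in SynchronizedItemStreamReader, or switch to JdbcPagingItemReader (its reads are synchronized) for database sources. Either way, reader state saved for restart is unreliable in a multi-threaded step, so consider saveState(false) on the reader and a restart strategy that does not depend on it.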
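A plain-Java illustration of what a synchronized reader provides: several worker threads pull from one shared reader, and because `read()` is synchronized, every item is handed out exactly once. `SharedReader` is a hypothetical stand-in that plays the role SynchronizedItemStreamReader plays around a FlatFileItemReader. It is not the Spring Batch class.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/** Sketch: one shared, synchronized cursor drained by several worker threads. */
final class SynchronizedReaderSketch {

    /** A plain index-based cursor made safe by synchronizing read(). */
    static final class SharedReader {
        private final List<Integer> source;
        private int position;                       // racy without the synchronized read()

        SharedReader(List<Integer> source) { this.source = source; }

        synchronized Integer read() {               // returns null when exhausted
            return position < source.size() ? source.get(position++) : null;
        }
    }

    /** Drains the reader with the given number of threads; returns every item any worker saw. */
    static List<Integer> drainConcurrently(SharedReader reader, int threads) {
        List<Integer> seen = Collections.synchronizedList(new ArrayList<>());
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (int t = 0; t < threads; t++) {
            pool.submit(() -> {
                Integer item;
                while ((item = reader.read()) != null) {
                    seen.add(item);
                }
            });
        }
        pool.shutdown();
        try {
            if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("workers did not finish");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return seen;
    }
}
```

Removing `synchronized` from `read()` typically makes the exactly-once check fail under contention with duplicated or missing items, which is the failure mode described above.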
Pattern 2: AsyncItemProcessor / AsyncItemWriter
The processor submits work to a thread pool and immediately returns a Future<O>. The AsyncItemWriter waits for all futures in the chunk to resolve, then hands the results to its delegate writer for the bulk write. This is ideal when the processor makes slow external API calls. Both classes ship in the spring-batch-integration module.
@Bean
public AsyncItemProcessor<UserCsvDto, UserEntity> asyncProcessor(
UserProcessor delegate,
ThreadPoolTaskExecutor executor) {
AsyncItemProcessor<UserCsvDto, UserEntity> async = new AsyncItemProcessor<>();
async.setDelegate(delegate);
async.setTaskExecutor(executor);
return async;
}
@Bean
public AsyncItemWriter<UserEntity> asyncWriter(JdbcBatchItemWriter<UserEntity> delegate) {
AsyncItemWriter<UserEntity> async = new AsyncItemWriter<>();
async.setDelegate(delegate);
return async;
}
@Bean
public Step asyncProcessingStep(JobRepository repo, PlatformTransactionManager tx,
                                ItemReader<UserCsvDto> reader,
                                AsyncItemProcessor<UserCsvDto, UserEntity> asyncProcessor,
                                AsyncItemWriter<UserEntity> asyncWriter) {
    return new StepBuilder("asyncStep", repo)
            .<UserCsvDto, Future<UserEntity>>chunk(100, tx)   // note: Future<> output type
            .reader(reader)
            .processor(asyncProcessor)
            .writer(asyncWriter)
            .build();
}
Throughput model:
Sync: 100 items × 200 ms/item = 20 seconds per chunk
Async (10 threads): 100 items / 10 threads × 200 ms = 2 seconds per chunk → 10× faster
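The AsyncItemProcessor/AsyncItemWriter split can be imitated with a plain ExecutorService. The "processor" step submits each item and keeps only the Future. The "writer" step waits for the chunk's futures in order, then passes the results on for the bulk write. This is a sketch of the pattern, not the Spring Batch classes. The `delegate` function stands in for a slow external call.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Function;

/** Sketch of the async processor / async writer pattern with plain futures. */
final class AsyncChunkSketch {

    /** "AsyncItemProcessor": submit each item, return immediately with a Future. */
    static <I, O> List<Future<O>> processAsync(List<I> chunk, Function<I, O> delegate,
                                               ExecutorService pool) {
        List<Future<O>> futures = new ArrayList<>();
        for (I item : chunk) {
            futures.add(pool.submit(() -> delegate.apply(item)));
        }
        return futures;
    }

    /** "AsyncItemWriter": resolve futures in chunk order, then hand results to the real writer. */
    static <O> List<O> resolveForWrite(List<Future<O>> futures) {
        List<O> results = new ArrayList<>();
        for (Future<O> f : futures) {
            try {
                results.add(f.get());                          // blocks until this item is done
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            } catch (ExecutionException e) {
                throw new IllegalStateException(e.getCause()); // a processor failure fails the chunk
            }
        }
        return results;
    }
}
```

Because the futures are resolved in submission order, the written chunk keeps the reader's order even though items finish out of order.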
Pattern 3: Partitioning
Partitioning splits a dataset into non-overlapping partitions and assigns each to an independent worker step. Workers execute fully in parallel, either on the same JVM (local) or across multiple servers (remote).
How partitioning works
Master Step (Partitioner)
   │
   ├── Partition 1: user_id 1-100,000        → Worker Step 1
   ├── Partition 2: user_id 100,001-200,000  → Worker Step 2
   ├── Partition 3: user_id 200,001-300,000  → Worker Step 3
   └── Partition 4: user_id 300,001-400,000  → Worker Step 4
                      (all run in parallel)
Local partitioning (same JVM)
// Step 1: Define how to split the dataset
@Component
public class UserPartitioner implements Partitioner {
@Autowired private JdbcTemplate jdbcTemplate;
@Override
public Map<String, ExecutionContext> partition(int gridSize) {
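Long minId = jdbcTemplate.queryForObject("SELECT MIN(id) FROM users", Long.class);
Long maxId = jdbcTemplate.queryForObject("SELECT MAX(id) FROM users", Long.class);
if (minId == null || maxId == null) {
return Map.of(); // empty table: MIN/MAX are NULL, so there is nothing to partition
}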
long rangeSize = (maxId - minId) / gridSize + 1;
Map<String, ExecutionContext> partitions = new HashMap<>();
for (int i = 0; i < gridSize; i++) {
long start = minId + (i * rangeSize);
long end = Math.min(start + rangeSize - 1, maxId);
ExecutionContext ctx = new ExecutionContext();
ctx.putLong("minId", start);
ctx.putLong("maxId", end);
partitions.put("partition-" + i, ctx);
}
return partitions; // e.g. {partition-0: {minId:1, maxId:100000}, ...}
}
}
// Step 2: Worker step reads only its assigned partition range
@Bean
@StepScope // CRITICAL: new instance per partition execution context
public JdbcPagingItemReader<UserEntity> partitionedReader(
DataSource ds,
@Value("#{stepExecutionContext['minId']}") Long minId,
@Value("#{stepExecutionContext['maxId']}") Long maxId) {
return new JdbcPagingItemReaderBuilder<UserEntity>()
.name("partitionedUserReader")
.dataSource(ds)
.selectClause("SELECT *")
.fromClause("FROM users")
.whereClause("WHERE id >= :minId AND id <= :maxId")
.parameterValues(Map.of("minId", minId, "maxId", maxId))
.sortKeys(Map.of("id", Order.ASCENDING))
.pageSize(100)
.rowMapper(new BeanPropertyRowMapper<>(UserEntity.class))
.build();
}
// Step 3: Wire the master + worker steps
@Bean
public Step workerStep(JobRepository repo, PlatformTransactionManager tx) {
return new StepBuilder("workerStep", repo)
.<UserEntity, UserEntity>chunk(100, tx)
.reader(partitionedReader(null, null, null)) // Spring injects at runtime
.processor(processor())
.writer(writer())
.build();
}
@Bean
public Step masterStep(JobRepository repo, UserPartitioner partitioner) {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(4);
executor.initialize();
return new StepBuilder("masterStep", repo)
.partitioner("workerStep", partitioner)
.step(workerStep(null, null))
.gridSize(4) // 4 partitions → 4 worker threads
.taskExecutor(executor)
.build();
}
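The range arithmetic inside UserPartitioner can be checked in isolation. This plain-Java helper is hypothetical and uses no Spring types. It returns the same [minId, maxId] pairs the partitioner puts into each ExecutionContext, with one addition: it stops early instead of emitting empty ranges when there are more partitions than IDs. The tests confirm that the ranges are contiguous and cover every ID exactly once.

```java
import java.util.ArrayList;
import java.util.List;

/** Equal-width ID ranges, as computed by the UserPartitioner example. */
final class RangePartitionSketch {

    record Range(long minId, long maxId) {}

    static List<Range> partition(long minId, long maxId, int gridSize) {
        long rangeSize = (maxId - minId) / gridSize + 1;
        List<Range> ranges = new ArrayList<>();
        for (int i = 0; i < gridSize; i++) {
            long start = minId + i * rangeSize;
            if (start > maxId) {
                break;                                   // more partitions than IDs
            }
            long end = Math.min(start + rangeSize - 1, maxId);   // last range is clipped
            ranges.add(new Range(start, end));
        }
        return ranges;
    }
}
```

Equal-width ranges assume IDs are spread fairly evenly. With large gaps or skew, some workers finish early while others carry most of the rows.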
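Remote partitioning (across servers)
For truly massive datasets (hundreds of millions of rows), distribute worker steps across multiple server instances via a message queue:
Master (Server A)            Message Queue (RabbitMQ / Kafka)
   │                                  │
   ├── Partition 1 message ─────────► │ ───► Worker Server B processes partition 1
   ├── Partition 2 message ─────────► │ ───► Worker Server C processes partition 2
   ├── Partition 3 message ─────────► │ ───► Worker Server D processes partition 3
   └── Partition 4 message ─────────► │ ───► Worker Server E processes partition 4
Remote partitioning requires the spring-batch-integration dependency. Spring Boot has no dedicated remote-partitioning properties. The manager and worker steps are configured in Java, for example with RemotePartitioningManagerStepBuilderFactory (enabled by @EnableBatchIntegration). When no reply channel is configured, the manager polls the JobRepository for worker status, and the builder's pollInterval(...) sets how often.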
Scaling pattern comparison
| Pattern | Complexity | Best for | Limitation |
|---|---|---|---|
| Single-threaded | Low | < 1M simple records | Bottleneck at slow processors |
| Multi-threaded Step | Medium | I/O-bound reads + fast processor | Reader must be thread-safe |
| AsyncItemProcessor | Medium | Heavy external API calls in processor | Complex Future generics |
| Local Partitioning | High | 1M-100M records, single server | Limited by one JVM's threads |
| Remote Partitioning | Very high | 100M+ records, multiple servers | Requires messaging infrastructure |
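Chunk Transaction Boundaries
Understanding what happens at the JVM and database level during a chunk transaction is essential for diagnosing performance problems.
What a chunk transaction controls
BEGIN TRANSACTION
   │
   ├── JDBC batch INSERT (100 rows)   ── locks on the written rows held until COMMIT
   │
   ├── BATCH_STEP_EXECUTION UPDATE    ── "commit count = N" recorded in JobRepository
   │
COMMIT
   │
   └── Locks released → live user traffic can proceed
Transaction duration = time to read and process the chunk + time to write it. Row locks taken by the writer are held from the INSERT until COMMIT. Locks taken earlier in the same transaction (for example by a processor that updates rows, or a SELECT ... FOR UPDATE) are held for the whole duration. A chunk of 1000 items with a slow processor therefore keeps the transaction, and any such locks, open roughly 10× longer than a chunk of 100, blocking concurrent user writes to the same rows.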
Transaction isolation and batch conflicts
// If the batch writes to a table that live users also write to, prefer READ COMMITTED
// (PostgreSQL's default; MySQL InnoDB defaults to REPEATABLE READ)
@Bean
public Step stepWithExplicitIsolation(JobRepository repo,
                                      PlatformTransactionManager transactionManager,
                                      ItemReader<UserEntity> reader,
                                      ItemWriter<UserEntity> writer) {
    DefaultTransactionAttribute attr = new DefaultTransactionAttribute();
    attr.setIsolationLevel(TransactionDefinition.ISOLATION_READ_COMMITTED);
    attr.setTimeout(30); // fail if one chunk transaction takes > 30 seconds
    return new StepBuilder("step", repo)
            .<UserEntity, UserEntity>chunk(100, transactionManager)
            .reader(reader)
            .writer(writer)
            .transactionAttribute(attr)   // applied to every chunk transaction
            .build();
}
The optimal chunk size formula
Target chunk duration = 1-5 seconds (keeps locks short, throughput high)
Chunk size = Target duration (s) × end-to-end items per second
Stages inside a chunk run one after another, so their per-item times add up:
seconds per item = 1 / processor rate + 1 / writer rate (reading time ignored here)
Example:
Processor speed: 500 items/sec → 2 ms/item
Writer speed: 2000 items/sec → 0.5 ms/item
End-to-end: 2.5 ms/item → 400 items/sec (the processor dominates)
Target duration: 2 seconds
→ Chunk size = 400 × 2 = 800 items
BUT: if each item is 5 KB:
→ 800 × 5 KB = 4 MB in heap per chunk thread
→ With 4 threads: 16 MB → acceptable
→ With 20 MB items: 800 × 20 MB = 16 GB → OutOfMemoryError → reduce chunk size
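A small helper for the chunk-size formula, as a sketch under two stated assumptions. First, the stages inside a chunk run one after another, so their per-item times add up (500 and 2,000 items/s give 400 items/s end to end, not 500). Second, `heapBudgetBytes` is a hypothetical per-chunk memory budget that you choose.

```java
/** Chunk-size arithmetic: duration target first, then a heap cap. */
final class ChunkSizeSketch {

    /** Items per chunk so that one chunk takes about targetSeconds end to end. */
    static long chunkSizeForDuration(double targetSeconds, double... itemsPerSecondPerStage) {
        double secondsPerItem = 0;
        for (double rate : itemsPerSecondPerStage) {
            secondsPerItem += 1.0 / rate;          // stages run one after another in a chunk
        }
        return Math.round(targetSeconds / secondsPerItem);
    }

    /** Caps the chunk size so that one chunk of items fits in heapBudgetBytes (at least 1). */
    static long capByHeap(long chunkSize, long itemBytes, long heapBudgetBytes) {
        return Math.max(1, Math.min(chunkSize, heapBudgetBytes / itemBytes));
    }
}
```

For example, `capByHeap(chunkSizeForDuration(2.0, 500, 2000), 20L * 1024 * 1024, 512L * 1024 * 1024)` shrinks the 800-item chunk to 25 items of 20 MB each under a 512 MB budget.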
Production Patterns
🔬 Senior deep-dive: @StepScope and @JobScope
@StepScope creates a new bean instance per Step execution and resolves late-binding expressions such as #{jobParameters[...]} and #{stepExecutionContext[...]} against that execution. This is mandatory for partitioned readers (each worker needs its own reader with its own ID range):
@Bean
@StepScope   // ← creates one instance per step execution
public FlatFileItemReader<UserCsvDto> scopedReader(
        @Value("#{jobParameters['filePath']}") String filePath) {
    // filePath is injected from JobParameters when the step starts, not at bean creation
    return new FlatFileItemReaderBuilder<UserCsvDto>()
            .name("scopedReader")            // required while saveState is true (the default)
            .resource(new FileSystemResource(filePath))
            .delimited()
            .names("id", "firstName", "lastName", "email", "status")
            .targetType(UserCsvDto.class)
            .linesToSkip(1)
            .build();
}
Without @StepScope (or @JobScope), the #{jobParameters[...]} expression is evaluated at application startup, before any Job has run, when no jobParameters variable exists, so the bean cannot be created with the run's values.
@JobScope is similar but scoped to the entire Job execution rather than a single Step. Use it for beans that must be shared across all steps of one job run but recreated for each new job run.
🔬 Senior deep-dive: idempotency and re-runnability
Spring Batch prevents re-running a completed JobInstance by default: the same job name + the same identifying parameters = "already done". But real production systems must handle re-runs:
Strategy 1: Include a unique run timestamp in parameters
new JobParametersBuilder()
.addString("file", "/data/users.csv")
.addLocalDateTime("runAt", LocalDateTime.now()) // unique per trigger
.toJobParameters();
Downside: every trigger is a new instance, so you lose the "don't run the same file twice" protection.
Strategy 2: Make the writer idempotent using ON CONFLICT
INSERT INTO users (external_id, ...)
VALUES (...)
ON CONFLICT (external_id) DO UPDATE SET ...
Now re-running the same file is safe: duplicate rows update rather than fail.
Strategy 3: Explicit deduplication check
@Override
public UserEntity process(UserCsvDto csv) {
if (userRepository.existsByExternalId(csv.getId())) {
return null; // skip โ already imported
}
return mapper.toEntity(csv);
}
🔬 Senior deep-dive: monitoring with Actuator and Micrometer
Spring Batch records metrics through Micrometer out of the box; with spring-boot-starter-actuator on the classpath, they are exposed through the metrics endpoint:
management:
  endpoints:
    web:
      exposure:
        include: health, metrics
  metrics:
    tags:
      application: my-batch-app
Key metrics to monitor:
| Metric | Alert condition |
|---|---|
| spring.batch.job (timer) | Duration beyond the expected SLA (e.g. must finish before 6 AM) |
| spring.batch.step (timer) | Sudden spike → processor bottleneck |
| spring.batch.chunk.write (timer) | Spike → DB write contention or locks |
| Skip count (StepExecution.getSkipCount(), published from a listener) | Rising → data quality degradation |
| hikaricp.connections.pending | > 0 sustained → connection pool pressure from batch |
// Custom metric: track business-level KPI alongside technical metrics
@Component
public class BatchMetricsListener implements StepExecutionListener {
@Autowired private MeterRegistry meterRegistry;
@Override
public ExitStatus afterStep(StepExecution stepExecution) {
meterRegistry.counter("batch.users.imported",
"job", stepExecution.getJobExecution().getJobInstance().getJobName(),
"status", stepExecution.getStatus().name())
.increment(stepExecution.getWriteCount());
return stepExecution.getExitStatus(); // afterStep returns ExitStatus; the current one leaves it unchanged
}
}
🔬 Senior deep-dive: JDBC batch performance tuning
Even with JdbcBatchItemWriter, writes can be slow if JDBC batching is not enabled at the driver level:
# application.yaml: enable driver-level batch rewriting (PostgreSQL shown)
spring:
  datasource:
    url: jdbc:postgresql://localhost:5432/mydb?reWriteBatchedInserts=true
    # PostgreSQL: reWriteBatchedInserts=true rewrites N batched INSERTs into
    # multi-row INSERTs (3-5× faster); MySQL's equivalent is rewriteBatchedStatements=true
  jpa:
    properties:
      hibernate:
        jdbc.batch_size: 100        # JPA writes only; usually set equal to the chunk size
        order_inserts: true         # group same-table inserts together
        order_updates: true
        generate_statistics: false  # turn off in production
# Note: Hibernate cannot batch inserts for entities that use GenerationType.IDENTITY
Batch INSERT performance comparison (10,000 rows, PostgreSQL):
| Approach | Time |
|---|---|
| 10,000 individual INSERTs | ~8 seconds |
| JdbcBatchItemWriter without reWriteBatchedInserts | ~3 seconds |
| JdbcBatchItemWriter + reWriteBatchedInserts=true | ~0.4 seconds |
| COPY FROM (PostgreSQL native bulk load) | ~0.1 seconds |
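For maximum throughput, bypass JdbcBatchItemWriter and use PostgreSQL's COPY command in a custom writer. Obtain the connection through DataSourceUtils so the COPY runs inside the chunk transaction (this assumes the step's transaction manager manages the same DataSource):
```java
import java.io.StringReader;
import java.sql.Connection;
import javax.sql.DataSource;
import org.postgresql.PGConnection;
import org.postgresql.copy.CopyManager;
import org.springframework.batch.item.Chunk;
import org.springframework.batch.item.ItemWriter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jdbc.datasource.DataSourceUtils;
import org.springframework.stereotype.Component;
@Component
public class PostgresCopyWriter implements ItemWriter<UserEntity> {
    @Autowired private DataSource dataSource;
    @Override
    public void write(Chunk<? extends UserEntity> chunk) throws Exception {
        // Join the chunk's Spring-managed transaction; dataSource.getConnection() would open
        // a separate auto-commit connection whose rows survive a chunk rollback
        Connection conn = DataSourceUtils.getConnection(dataSource);
        try {
            CopyManager cm = conn.unwrap(PGConnection.class).getCopyAPI();
            StringBuilder sb = new StringBuilder();
            for (UserEntity u : chunk) {
                // COPY text format: tab-separated columns, one row per line.
                // Assumes values contain no tabs, newlines or backslashes (these need escaping).
                sb.append(u.getExternalId()).append('\t')
                  .append(u.getEmail()).append('\n');
            }
            cm.copyIn("COPY users(external_id, email) FROM STDIN",
                      new StringReader(sb.toString()));
        } finally {
            DataSourceUtils.releaseConnection(conn, dataSource);
        }
    }
}
```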
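The writer above appends raw values to COPY's text format, where backslash, tab, carriage return and newline are special and \N marks a NULL. A minimal sketch of an escaping helper the loop could call on each value; CopyText is a hypothetical class, not part of pgjdbc.

```java
import java.util.List;
import java.util.StringJoiner;

final class CopyText {

    // Escape one value for COPY text format. Backslash is escaped first so the
    // backslashes added for tab/CR/LF are not doubled; a Java null becomes \N.
    static String escape(Object value) {
        if (value == null) {
            return "\\N";
        }
        return value.toString()
                .replace("\\", "\\\\")
                .replace("\t", "\\t")
                .replace("\n", "\\n")
                .replace("\r", "\\r");
    }

    // Render one row: escaped values joined by tabs, terminated by a newline
    static String row(List<?> values) {
        StringJoiner line = new StringJoiner("\t", "", "\n");
        for (Object v : values) {
            line.add(escape(v));
        }
        return line.toString();
    }
}
```

In the writer loop, sb.append(CopyText.row(Arrays.asList(u.getExternalId(), u.getEmail()))) would replace the manual appends; Arrays.asList is used because List.of rejects null elements.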
Testing Spring Batch Jobs
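```java
import static org.assertj.core.api.Assertions.assertThat;

import java.time.LocalDateTime;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.springframework.batch.core.BatchStatus;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.test.JobLauncherTestUtils;
import org.springframework.batch.test.JobRepositoryTestUtils;
import org.springframework.batch.test.context.SpringBatchTest;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.ActiveProfiles;

@SpringBatchTest // provides JobLauncherTestUtils, JobRepositoryTestUtils
@SpringBootTest
@ActiveProfiles("test")
class ImportUsersJobTest {

    @Autowired private JobLauncherTestUtils jobLauncherTestUtils;
    @Autowired private JobRepositoryTestUtils jobRepositoryTestUtils;
    @Autowired private UserRepository userRepository;

    @BeforeEach
    void cleanBatchMetadata() {
        // Remove previous job executions from the in-memory H2 database so tests don't conflict
        jobRepositoryTestUtils.removeJobExecutions();
    }

    @Test
    void importUsersJob_completes_and_persists_users() throws Exception {
        // Provide job parameters (e.g. point to a test CSV file)
        JobParameters params = new JobParametersBuilder()
                .addString("filePath", "classpath:test-data/users.csv")
                .addLocalDateTime("runAt", LocalDateTime.now())
                .toJobParameters();
        JobExecution execution = jobLauncherTestUtils.launchJob(params);

        // Assert job status
        assertThat(execution.getStatus()).isEqualTo(BatchStatus.COMPLETED);

        // Assert step metrics
        StepExecution stepExecution = execution.getStepExecutions().iterator().next();
        assertThat(stepExecution.getReadCount()).isEqualTo(5);
        assertThat(stepExecution.getFilterCount()).isEqualTo(1); // processor returned null once
        assertThat(stepExecution.getWriteCount()).isEqualTo(4);
        assertThat(stepExecution.getSkipCount()).isEqualTo(0);

        // Assert business outcome
        assertThat(userRepository.count()).isEqualTo(4);
    }

    @Test
    void importUsersJob_skips_invalid_rows_and_completes() throws Exception {
        JobParameters params = new JobParametersBuilder()
                .addString("filePath", "classpath:test-data/users-with-corrupt-rows.csv")
                .addLocalDateTime("runAt", LocalDateTime.now())
                .toJobParameters();
        JobExecution execution = jobLauncherTestUtils.launchJob(params);

        assertThat(execution.getStatus()).isEqualTo(BatchStatus.COMPLETED);
        StepExecution step = execution.getStepExecutions().iterator().next();
        assertThat(step.getSkipCount()).isGreaterThan(0);
        assertThat(step.getWriteCount()).isGreaterThan(0); // some rows still succeeded
    }

    @Test
    void testStepInIsolation() throws Exception {
        // Run a single step without the full job; pass the same parameters
        // so a step-scoped reader can still resolve filePath
        JobParameters params = new JobParametersBuilder()
                .addString("filePath", "classpath:test-data/users.csv")
                .addLocalDateTime("runAt", LocalDateTime.now())
                .toJobParameters();
        JobExecution execution = jobLauncherTestUtils.launchStep("processCsvStep", params);
        assertThat(execution.getStatus()).isEqualTo(BatchStatus.COMPLETED);
    }
}
```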
Common Mistakes
| Mistake | Problem | Fix |
|---|---|---|
| JdbcCursorItemReader in a multi-threaded Step | Threads share one cursor: rows are read twice or skipped, corrupting the output | Use JdbcPagingItemReader or wrap with SynchronizedItemStreamReader |
| FlatFileItemReader in a multi-threaded Step | Concurrent access to the file position corrupts reads | Wrap with SynchronizedItemStreamReader |
| Missing @StepScope on partitioned readers | All workers share one reader instance, so every worker processes the same ID range | Add @StepScope and use @Value("#{stepExecutionContext[...]}") |
| Very large chunk size (> 1000) for complex items | OOM risk; long-held DB locks cause deadlocks with live traffic | Tune to 100–500; measure heap per item |
| @Modifying bulk updates inside a Step processor | Bypasses the persistence context (L1 cache), so later entity reads are stale | Use clearAutomatically = true or avoid mixing bulk DML with entity reads |
| Re-running a completed job with identical parameters | Spring Batch rejects it with JobInstanceAlreadyCompleteException | Add a unique runAt timestamp parameter or use a JobParametersIncrementer |
| No SkipListener alongside skipLimit | Skipped items are discarded with no audit trail (only the skip count is recorded) | Always add a SkipListener that writes to a dead-letter table |
| Not enabling reWriteBatchedInserts (PostgreSQL) | The driver still executes one INSERT per row despite the batch writer, which is several times slower | Add reWriteBatchedInserts=true to the JDBC URL |
🎯 Interview Questions
Q1. What is Spring Batch and how does it differ from @Scheduled tasks?
@Scheduled is a trigger mechanism: it runs a method at a time interval. Spring Batch is a processing engine for large-volume data with built-in chunk transactions, restartability, skip/retry, and execution metadata. @Scheduled can trigger a Spring Batch job, but Spring Batch does the actual work. A @Scheduled method that loops over a million records has no transaction management, no progress tracking, and no restart capability: if it crashes at item 900,000 you start from zero.
Q2. What is chunk-oriented processing and why is it better than processing one record at a time?
Chunk processing groups N records into one database transaction. One-at-a-time processing means one commit per record: 1 million records = 1 million commits, each with transaction overhead. A single giant transaction means locking the table for the entire job duration and losing all work on failure. Chunking is the middle ground: every N records commit together, giving fewer commits than per-record processing and shorter lock windows than one transaction, and on failure only the current chunk rolls back, not the entire job.
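A plain-Java sketch of the chunk loop, with the transaction reduced to a counter so the arithmetic is visible. ChunkLoop and its parameters are hypothetical; Spring Batch's real implementation (ChunkOrientedTasklet and its chunk processor) also handles retry, skip and listeners.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

final class ChunkLoop {

    // Reads up to chunkSize items, processes each (null = filtered out),
    // writes the survivors as one list, then "commits". Returns the number of commits.
    static <I, O> int run(Iterator<I> reader, Function<I, O> processor,
                          Consumer<List<O>> writer, int chunkSize) {
        int commits = 0;
        while (reader.hasNext()) {
            List<O> chunk = new ArrayList<>(chunkSize);
            int read = 0;
            while (read < chunkSize && reader.hasNext()) {
                O out = processor.apply(reader.next());
                read++;
                if (out != null) {
                    chunk.add(out);
                }
            }
            writer.accept(chunk); // one bulk write per chunk
            commits++;            // stands in for the transaction commit
        }
        return commits;
    }
}
```

With 1,000,000 items and a chunk size of 100 this loop commits 10,000 times, compared with 1,000,000 commits for per-record processing or one commit for a single giant transaction.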
Q3. What is the JobRepository and why is it mandatory?
The JobRepository persists Job and Step execution metadata to a relational database. Before processing starts, it records the Job instance and its parameters. After every successful chunk, it updates the StepExecution (commit, read and write counts) and the step's ExecutionContext (for example the reader's position) in the same transaction as the chunk's writes. On restart after a crash, Spring Batch reads this metadata to know exactly which chunk committed last and resumes from there. Without it there is no restart, no protection against re-running a completed job instance with the same parameters, and no execution history.
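The restart mechanics can be sketched without Spring: after each committed chunk the loop records how many items are safely written, and a rerun skips that many before continuing. The Map<String, Long> is a hypothetical stand-in for the JobRepository and step ExecutionContext, and the "read.count" key is illustrative, loosely modelled on how item-counting readers save their position.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

final class RestartableRun {

    // Processes items in chunks and, after every "commit", stores the committed count
    // in metadata. failAt is the index of an item whose processing simulates a crash
    // (-1 for no crash).
    static void run(List<String> items, int chunkSize, Map<String, Long> metadata,
                    List<String> sink, int failAt) {
        // Resume from the last committed position (0 on the first run)
        int start = metadata.getOrDefault("read.count", 0L).intValue();
        for (int from = start; from < items.size(); from += chunkSize) {
            int to = Math.min(from + chunkSize, items.size());
            List<String> chunk = new ArrayList<>();
            for (int i = from; i < to; i++) {
                if (i == failAt) {
                    // Crash mid-chunk: nothing from this chunk is written or recorded
                    throw new IllegalStateException("crash at item " + i);
                }
                chunk.add(items.get(i));
            }
            sink.addAll(chunk);                    // write the whole chunk...
            metadata.put("read.count", (long) to); // ...and record progress with it
        }
    }
}
```

With 10 items and a chunk size of 3, a crash at item 7 leaves 6 items committed; the rerun starts at item 6 and writes the remaining 4, so nothing is lost or duplicated.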
Q4. Why can't you use JdbcCursorItemReader in a multi-threaded Step?
JdbcCursorItemReader holds a single JDBC ResultSet cursor on a database connection. The cursor has a current position (the next row to return). Multiple threads calling read() concurrently advance the same cursor: some threads read the same row, others skip rows, producing duplicate and missing data. The fix is JdbcPagingItemReader (it fetches one page at a time with its own query keyed on the sort key, and its read() is synchronized, so it is thread-safe) or wrapping with SynchronizedItemStreamReader (which serialises all read calls with a lock).
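The serialising fix can be sketched with plain JDK types: a non-thread-safe, cursor-like reader wrapped so each read() happens under a lock. CursorReader and SynchronizedReader are hypothetical stand-ins for JdbcCursorItemReader and SynchronizedItemStreamReader; the real decorator also delegates open, update and close.

```java
import java.util.List;

// Hypothetical non-thread-safe reader: one shared position, like a JDBC cursor.
final class CursorReader {
    private final List<Integer> rows;
    private int position = 0;

    CursorReader(List<Integer> rows) {
        this.rows = rows;
    }

    // Unsynchronized check-then-advance: concurrent callers can get the same row or skip one
    Integer read() {
        if (position >= rows.size()) {
            return null; // end of input, as with ItemReader.read()
        }
        return rows.get(position++);
    }
}

// Decorator that serialises read() so each row is handed out exactly once.
final class SynchronizedReader {
    private final CursorReader delegate;

    SynchronizedReader(CursorReader delegate) {
        this.delegate = delegate;
    }

    synchronized Integer read() {
        return delegate.read();
    }
}
```

Only the read is serialised; processing and writing still run in parallel on each thread, which is why this wrapper usually costs little when the processor or writer dominates.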
Q5. Explain partitioning and when you would use it over a multi-threaded Step.
Partitioning splits a dataset into non-overlapping ranges and assigns each to an independent worker step with its own reader, processor, and writer. A multi-threaded Step runs multiple threads sharing one reader on the same JVM. Partitioning is used when: (1) the dataset is so large that one JVM's thread count isn't enough, (2) you need remote partitioning across multiple servers for horizontal scaling, or (3) the ItemReader is not safely shareable across threads and partitioning is cleaner than synchronization. Each worker in a partitioned step is a completely independent processing unit with its own transaction boundary.
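Splitting an ID range into non-overlapping partitions is the heart of a column-range partitioner. The sketch returns one {minId, maxId} pair per worker; in a real job each pair would be placed in a worker ExecutionContext by a custom Partitioner and read back with @Value("#{stepExecutionContext['minId']}"). RangePartitioner and the minId/maxId keys are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

final class RangePartitioner {

    // Split the inclusive range [min, max] into at most gridSize contiguous,
    // non-overlapping ranges that together cover every ID exactly once.
    static List<long[]> partition(long min, long max, int gridSize) {
        if (gridSize < 1 || max < min) {
            throw new IllegalArgumentException("need gridSize >= 1 and max >= min");
        }
        long total = max - min + 1;
        long size = (total + gridSize - 1) / gridSize; // ceiling division
        List<long[]> ranges = new ArrayList<>();
        for (long start = min; start <= max; start += size) {
            long end = Math.min(start + size - 1, max);
            ranges.add(new long[] {start, end}); // {minId, maxId} for one worker
        }
        return ranges;
    }
}
```

For IDs 1 to 1000 and a grid size of 4 this yields 1–250, 251–500, 501–750 and 751–1000, so each worker's reader queries only its own slice.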
Q6. (Senior) What is the risk of a large chunk size, and how do you determine the optimal value?
Large chunk sizes hold a database transaction open for longer, increasing lock contention with concurrent user traffic and risking deadlocks. They also load more objects into the JVM heap simultaneously, risking OutOfMemoryError for large objects. Optimal chunk size depends on: (1) item size in bytes (smaller items allow larger chunks), (2) processor speed (a slower processor calls for smaller chunks to reduce lock duration), (3) acceptable lock duration (target 1–5 seconds per chunk). Start at 100, measure heap usage per thread × number of threads, and tune upward while monitoring P99 write latency on the target table.
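The heap half of that estimate is plain arithmetic: items held per chunk × bytes per item × concurrent threads. A hypothetical helper that also inverts it to find the largest chunk size fitting a heap budget; the bytes-per-item figure must come from measurement (for example a heap dump), not from this code.

```java
final class ChunkSizing {

    // Approximate heap held by in-flight chunks: each thread holds one chunk at a time.
    static long inFlightBytes(int chunkSize, long bytesPerItem, int threads) {
        return (long) chunkSize * bytesPerItem * threads;
    }

    // Largest chunk size whose in-flight items fit the heap budget, capped at maxChunk
    // so lock duration (not heap) still bounds the final choice.
    static int maxChunkSize(long heapBudgetBytes, long bytesPerItem, int threads, int maxChunk) {
        long fit = heapBudgetBytes / (bytesPerItem * threads);
        return (int) Math.max(1, Math.min(fit, maxChunk));
    }
}
```

For example, 20 KB items with a chunk size of 500 across 8 threads keep about 80 MB of items live at once; a 64 MiB budget for the same items allows a chunk size of 419.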
Q7. (Senior) How does AsyncItemProcessor improve throughput, and what is its trade-off?
AsyncItemProcessor submits each item to a thread pool and returns a Future<O> immediately, so the calling thread moves on to submit the next item rather than blocking. The AsyncItemWriter receives a chunk of Futures, waits for all of them to resolve, and bulk-writes the results. This parallelises the processor stage: 100 items with a 200ms API call each take 20 seconds synchronously but ~2 seconds with a 10-thread pool (10× improvement). Trade-offs: the generic type becomes Future<O>, which complicates wiring; if any future in a chunk throws, the entire chunk fails; and the thread pool size must be tuned against the external dependency's rate limits and your connection pool capacity.
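The submit-then-await pattern behind AsyncItemProcessor and AsyncItemWriter can be sketched with a plain ExecutorService: every item is submitted first, then the writer side waits on each Future in input order. AsyncChunk is hypothetical and omits the Spring wiring; a failed item surfaces as an ExecutionException, which fails the whole chunk as described above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.function.Function;

final class AsyncChunk {

    // Processor side: submit every item and return immediately with Futures.
    static <I, O> List<Future<O>> processAll(List<I> items, Function<I, O> processor,
                                             ExecutorService pool) {
        List<Future<O>> futures = new ArrayList<>(items.size());
        for (I item : items) {
            futures.add(pool.submit(() -> processor.apply(item)));
        }
        return futures;
    }

    // Writer side: block until every Future resolves, preserving input order.
    // Any failed item throws ExecutionException, so the whole chunk fails.
    static <O> List<O> awaitAll(List<Future<O>> futures)
            throws InterruptedException, ExecutionException {
        List<O> results = new ArrayList<>(futures.size());
        for (Future<O> f : futures) {
            results.add(f.get());
        }
        return results;
    }
}
```

With a 10-thread pool, ten 200ms calls overlap, so a chunk's wall-clock time approaches the number of items ÷ pool size × call latency rather than the sum of all calls.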