Do you want to learn how to use Spring Batch with Spring Boot to upload millions of records efficiently, without human intervention?
If so, let's walk through the process of implementing batch processing in your application. It will deepen your knowledge and is a skill worth showing off in your programming career.
As discussed in the previous article, where I described what Spring Batch is and how it works, in this article let us go through the steps for uploading a huge CSV file of about 500,000 records into your schema in minutes.
Before starting the implementation, if you don't know what batch processing is or what its architecture looks like, read about it here: Batch Processing
Step 1: Adding Dependencies
Now, let us start the implementation. Today we will see how to set up Spring Batch in Spring Boot using Maven.
To use batch processing in Spring Boot we first need to add a few dependencies. Look the packages up and choose versions that match your Spring Boot and Java versions, or add the dependencies given below to your pom.xml file directly.
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-batch</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-jpa</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
    <groupId>com.mysql</groupId>
    <artifactId>mysql-connector-j</artifactId>
    <scope>runtime</scope>
</dependency>
<dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
    <optional>true</optional>
</dependency>
Once you have added the dependencies and synced your project, let's start writing the code.
Step 2: Configuration
First, open your application.properties or application.yml file, depending on your setup, and add the following configuration.
spring:
  batch:
    jdbc:
      initialize-schema: always
    job:
      enabled: false
Or, equivalently, in application.properties:
spring.batch.jdbc.initialize-schema=always
spring.batch.job.enabled=false
Now let us understand what exactly this configuration does.
The first property, spring.batch.jdbc.initialize-schema: always, tells Spring Boot to create the metadata tables that Spring Batch needs in the database.
The second property, spring.batch.job.enabled: false, tells our application that we don't want the job to run automatically as soon as the application starts; we will trigger it ourselves later.
After adding the Spring Batch configuration, let us add the configuration for the database. For this example I am using a MySQL database, so let's add the properties for connecting to it.
spring:
  datasource:
    url: jdbc:mysql://localhost:3306/your_db_name
    username: your_username
    password: your_password
    driver-class-name: com.mysql.cj.jdbc.Driver
  jpa:
    hibernate:
      ddl-auto: update
    show-sql: true
    properties:
      hibernate:
        format_sql: true
    database: mysql
    database-platform: org.hibernate.dialect.MySQLDialect
Or, equivalently, in application.properties:
spring.datasource.url=jdbc:mysql://localhost:3306/your_db_name
spring.datasource.username=your_username
spring.datasource.password=your_password
spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driver
spring.jpa.hibernate.ddl-auto=update
spring.jpa.show-sql=true
spring.jpa.properties.hibernate.dialect=org.hibernate.dialect.MySQLDialect
Once the configuration is updated, we are ready to write the code for processing the bulk data and converting it to fit our needs. Let's jump into it.
Step 3: ItemReader Setup
As explained in the previous article, there are multiple layers we need to configure to get the batch process up and running.
Let's begin with the configuration class and our ItemReader. Create a Java class that will hold the Spring Batch configuration code.
@Configuration
public class SpringBatchConfig {}
By marking the class with the @Configuration annotation we tell our application that this Java class contains configuration code, so it will be treated as a configuration source for the application.
As we want to read the data from a CSV file and insert it into the database, we will use a class called FlatFileItemReader to do the reading. Assume the CSV file is stored in your resources folder and looks like the sample below.
Index,Customer Id,First Name,Last Name,Company,City,Country,Phone 1,Phone 2,Email,Subscription Date,Website
1,e685B8690f9fbce,Erik,Little,Blankenship PLC,Caitlynmouth,Sao Tome and Principe,457-542-6899,055.415.2664x5425,shanehester@campbell.org,2021-12-23,https://wagner.com/
2,6EDdBA3a2DFA7De,Yvonne,Shaw,Jensen and Sons,Janetfort,Palestinian Territory,9610730173,531-482-3000x7085,kleinluis@vang.com,2021-01-01,https://www.paul.org/
3,b9Da13bedEc47de,Jeffery,Ibarra,"Rose, Deleon and Sanders",Darlenebury,Albania,(840)539-1797x479,209-519-5817,deckerjamie@bartlett.biz,2020-03-30,https://www.morgan-phelps.com/
4,710D4dA2FAa96B5,James,Walters,Kline and Sons,Donhaven,Bahrain,+1-985-596-1072x3040,(528)734-8924x054,dochoa@carey-morse.com,2022-01-18,https://brennan.com/
Let us write our ItemReader.
@Configuration
public class SpringBatchConfig {
@Bean
public FlatFileItemReader<User> itemReader(){
FlatFileItemReader<User> itemReader = new FlatFileItemReader<>();
itemReader.setResource(new ClassPathResource("customerDataSample.csv"));
itemReader.setName("studentDataReader");
itemReader.setLinesToSkip(1);
itemReader.setLineMapper(lineMapper());
return itemReader;
}
}
Let us go through the code above and understand what each line does.
Since we are reading from a CSV file, we create a bean of FlatFileItemReader, which takes a generic type; here User is the entity class defined in our code (we will write it in a minute).
A FlatFileItemReader instance needs to know which file to read the data from; in our case that is customerDataSample.csv in the resources folder, which we point to using the setResource() method.
The setName() method sets a name for our item reader.
Because the first line of the CSV file contains the column names, which we don't want to insert into the database, we skip it and start reading from the second line using setLinesToSkip(<number of lines to skip>).
Finally, the reader needs to know how to split each line into fields, so we call setLineMapper() and pass it a method, which we will write next, that breaks the line apart and hands it to a LineMapper.
@Configuration
public class SpringBatchConfig {
@Bean
public FlatFileItemReader<User> itemReader(){
FlatFileItemReader<User> itemReader = new FlatFileItemReader<>();
itemReader.setResource(new ClassPathResource("customerDataSample.csv"));
itemReader.setName("studentDataReader");
itemReader.setLinesToSkip(1);
itemReader.setLineMapper(lineMapper());
return itemReader;
}
private LineMapper<User> lineMapper() {
DefaultLineMapper<User> lineMapper = new DefaultLineMapper<>();
// Set up a delimited line tokenizer for processing the CSV data
DelimitedLineTokenizer delimitedLineTokenizer = new DelimitedLineTokenizer();
delimitedLineTokenizer.setDelimiter(",");
delimitedLineTokenizer.setStrict(false);
// The names must match those read by the FieldSetMapper (which follow the entity's fields).
delimitedLineTokenizer.setNames("id", "customerId", "firstName", "lastName", "company", "city", "country", "phone1", "phone2", "email", "subscriptionDate", "Website");
lineMapper.setLineTokenizer(delimitedLineTokenizer);
lineMapper.setFieldSetMapper(new UserFieldSetMapper());
return lineMapper;
}
}
Step 4: User Entity & UserFieldSetMapper setup
Up to this point we have written the item reader and set up the line tokenizer to split the data in the CSV. Now let us write our User entity class as well as our UserFieldSetMapper class, which will convert each CSV field into the right type as per the entity class.
package com.samyam.etl.demo.DataExtractionService.Models;
import jakarta.persistence.Entity;
import jakarta.persistence.Id;
import jakarta.persistence.Table;
import lombok.Data;
import java.time.LocalDate;
@Entity
@Table(name = "USER_DATA")
@Data
public class User {
@Id
private Integer id;
private String customerId;
private String firstName;
private String lastName;
private String company;
private String city;
private String country;
private String phone1;
private String phone2;
private String email;
private LocalDate subscriptionDate;
private String website;
}
Once the entity class is defined, let us add another class, UserFieldSetMapper, which maps the data coming from the CSV into our User entity.
package com.samyam.etl.demo.DataExtractionService.Mapper;
import com.samyam.etl.demo.DataExtractionService.Models.User;
import org.springframework.batch.item.file.mapping.FieldSetMapper;
import org.springframework.batch.item.file.transform.FieldSet;
import org.springframework.validation.BindException;
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;
public class UserFieldSetMapper implements FieldSetMapper<User> {
private static final DateTimeFormatter DATE_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd");
@Override
public User mapFieldSet(FieldSet fieldSet) throws BindException {
User user = new User();
user.setId(fieldSet.readInt("id"));
user.setCustomerId(fieldSet.readString("customerId"));
user.setFirstName(fieldSet.readString("firstName"));
user.setLastName(fieldSet.readString("lastName"));
user.setCompany(fieldSet.readString("company"));
user.setCity(fieldSet.readString("city"));
user.setCountry(fieldSet.readString("country"));
user.setPhone1(fieldSet.readString("phone1"));
user.setPhone2(fieldSet.readString("phone2"));
user.setEmail(fieldSet.readString("email"));
user.setSubscriptionDate(
LocalDate.parse(fieldSet.readString("subscriptionDate"),DATE_FORMATTER));
user.setWebsite(fieldSet.readString("Website"));
return user;
    }
}
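As an aside, if your CSV might contain rows with a blank subscription date, the date mapping can be made a little more defensive. Here is a minimal sketch (not part of the original example) of a helper method you could add to the mapper class and call from mapFieldSet():
// Sketch: tolerate blank subscription dates instead of failing the whole line.
private LocalDate parseSubscriptionDate(FieldSet fieldSet) {
    String rawDate = fieldSet.readString("subscriptionDate");
    return (rawDate == null || rawDate.isBlank())
            ? null
            : LocalDate.parse(rawDate, DATE_FORMATTER);
}
You would then replace the parsing line with user.setSubscriptionDate(parseSubscriptionDate(fieldSet)).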
With the mapper class in place, transforming the string data coming from the CSV into the proper data types, our ItemReader is ready. Now let us move on to writing our ItemProcessor.
Step 5: ItemProcessor Setup
Let us start by creating a class which implements the ItemProcessor interface. It takes two generic types, i.e. ItemProcessor<InputType, OutputType>, describing the type of data coming in and the type it is converted to.
public class UserDataProcessor implements ItemProcessor<User, User> {
@Override
public User process(User user) throws Exception {
// Transformation / business logic goes here
return user;
}
}
In this item processor class we are not doing any transformation; each record is passed through unchanged.
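If you did want to transform records at this stage, the processor is the place to do it. As a small sketch (not part of the original example), you could normalize the email and drop rows without one; returning null from process() tells Spring Batch to filter that item out:
public class UserDataProcessor implements ItemProcessor<User, User> {
    @Override
    public User process(User user) throws Exception {
        // Skip records without an email address; returning null filters the item out.
        if (user.getEmail() == null || user.getEmail().isBlank()) {
            return null;
        }
        // Normalize the email before it is written to the database.
        user.setEmail(user.getEmail().trim().toLowerCase());
        return user;
    }
}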
Now let's go back to our configuration class and add a bean that returns the custom ItemProcessor implementation we wrote above.
// Item Processor
@Bean
public UserDataProcessor processor(){
return new UserDataProcessor();
}
With that bean in place our item processor is done. Now it is time to load the data into the database. To do that we still have to write our ItemWriter bean and pass it into the Step. Let us get that done next.
Step 6: ItemWriter Setup
// Item Writer
@Bean
public RepositoryItemWriter<User> writer(){
RepositoryItemWriter<User> writer = new RepositoryItemWriter<>();
writer.setRepository(userRepository);
writer.setMethodName("save");
return writer;
}
Here we use a RepositoryItemWriter, which writes the processed data into the database. The setMethodName("save") call tells the writer which repository method to invoke for each record, and we pass in userRepository, an interface that extends JpaRepository.
public interface UserRepository extends JpaRepository<User,Integer> { }
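As a side note, RepositoryItemWriter saves entities through JPA, which is convenient but not the fastest option for very large files. A possible alternative (a sketch, assuming the USER_DATA column names follow Hibernate's default snake_case naming) is a JdbcBatchItemWriter that issues batched INSERT statements directly:
// Hypothetical alternative writer using batched JDBC inserts instead of JPA saves.
// Assumes imports for JdbcBatchItemWriter, JdbcBatchItemWriterBuilder and javax.sql.DataSource.
@Bean
public JdbcBatchItemWriter<User> jdbcWriter(DataSource dataSource) {
    return new JdbcBatchItemWriterBuilder<User>()
            .dataSource(dataSource)
            .sql("INSERT INTO USER_DATA (id, customer_id, first_name, last_name, company, city, "
                    + "country, phone1, phone2, email, subscription_date, website) "
                    + "VALUES (:id, :customerId, :firstName, :lastName, :company, :city, "
                    + ":country, :phone1, :phone2, :email, :subscriptionDate, :website)")
            .beanMapped() // bind the named parameters from the User getters
            .build();
}
In this article we stick with the RepositoryItemWriter to keep the example simple.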
With the code above our ItemReader, ItemProcessor and ItemWriter beans are ready to perform the operation. But we still need to wire them into the Step, which is their parent.
Step 7: Step Setup
Let us inject them in the Step next.
@Bean
public Step importStep(){
return new StepBuilder("studentDataReader",jobRepository)
.<User,User>chunk(5000, platformTransactionManager)
.reader(itemReader())
.processor(processor())
.writer(writer())
.build();
}
We create a bean which returns a Step, as shown above. The <User,User> generics declare the input and output types (in our case both are the User entity), while chunk(5000, platformTransactionManager) sets the chunk size, i.e. how many records are read, processed and written per transaction. Then we simply pass in the reader, processor and writer.
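With half a million rows there is a fair chance a few lines are malformed. If you don't want a single bad record to fail the whole job, the step can also be made fault tolerant. Here is a minimal sketch (not part of the original example, assuming an import for FlatFileParseException):
@Bean
public Step importStep(){
    return new StepBuilder("studentDataReader", jobRepository)
            .<User, User>chunk(5000, platformTransactionManager)
            .reader(itemReader())
            .processor(processor())
            .writer(writer())
            .faultTolerant()
            // Skip up to 100 lines that fail CSV parsing instead of failing the job.
            .skip(FlatFileParseException.class)
            .skipLimit(100)
            .build();
}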
Step 8: Job Setup
With the step set up we are almost done with the code for batch processing our huge data file, but not quite. If you remember from the previous article, a Step is part of a Job: the Job calls the step, and the step runs the reader, processor and writer. Let's now create our Job bean. You can add it by defining a bean of type Job and passing the step into it, as shown below.
@Bean
public Job runJob(){
return new JobBuilder("importUsersData", jobRepository)
.start(importStep())
.build();
}
Here importUsersData is the name of the job. Naming jobs becomes useful when you have multiple jobs to execute, because you can use @Qualifier to tell the application which Job bean to inject.
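For example, if the application defined more than one Job bean, you could pick a specific one by its bean name (which defaults to the @Bean method name, runJob in our case). A rough sketch, using explicit constructor injection instead of Lombok:
// Hypothetical: inject a specific job when several Job beans exist.
@RestController
@RequestMapping("/api/users")
public class UserDataImporter {
    private final Job job;
    private final JobLauncher jobLauncher;

    public UserDataImporter(@Qualifier("runJob") Job job, JobLauncher jobLauncher) {
        this.job = job;
        this.jobLauncher = jobLauncher;
    }
}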
Configuration Class Final Code
With this our configuration class code looks like this.
package com.samyam.etl.demo.DataExtractionService.Config;
import com.samyam.etl.demo.DataExtractionService.Mapper.UserFieldSetMapper;
import com.samyam.etl.demo.DataExtractionService.Models.User;
import com.samyam.etl.demo.DataExtractionService.Processors.UserDataProcessor;
import com.samyam.etl.demo.DataExtractionService.Repositories.UserRepository;
import lombok.RequiredArgsConstructor;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.job.builder.JobBuilder;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.step.builder.StepBuilder;
import org.springframework.batch.item.data.RepositoryItemWriter;
import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.batch.item.file.LineMapper;
import org.springframework.batch.item.file.mapping.DefaultLineMapper;
import org.springframework.batch.item.file.transform.DelimitedLineTokenizer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.ClassPathResource;
import org.springframework.core.task.SimpleAsyncTaskExecutor;
import org.springframework.core.task.TaskExecutor;
import org.springframework.transaction.PlatformTransactionManager;
@Configuration
@RequiredArgsConstructor
public class SpringBatchConfig {
private final UserRepository userRepository;
private final JobRepository jobRepository;
private final PlatformTransactionManager platformTransactionManager;
// Item Reader
@Bean
public FlatFileItemReader<User> itemReader(){
FlatFileItemReader<User> itemReader = new FlatFileItemReader<>();
itemReader.setResource(new ClassPathResource("customerDataSample.csv"));
itemReader.setName("studentDataReader");
itemReader.setLinesToSkip(1);
itemReader.setLineMapper(lineMapper());
return itemReader;
}
private LineMapper<User> lineMapper() {
DefaultLineMapper<User> lineMapper = new DefaultLineMapper<>();
// Set up a delimited line tokenizer for processing the CSV data
DelimitedLineTokenizer delimitedLineTokenizer = new DelimitedLineTokenizer();
delimitedLineTokenizer.setDelimiter(",");
delimitedLineTokenizer.setStrict(false);
// The names must match those read by the FieldSetMapper (which follow the entity's fields).
delimitedLineTokenizer.setNames("id", "customerId", "firstName", "lastName", "company", "city",
"country", "phone1", "phone2", "email", "subscriptionDate", "Website");
lineMapper.setLineTokenizer(delimitedLineTokenizer);
lineMapper.setFieldSetMapper(new UserFieldSetMapper());
return lineMapper;
}
// Item Processor
@Bean
public UserDataProcessor processor(){
return new UserDataProcessor();
}
// Item Writer
@Bean
public RepositoryItemWriter<User> writer(){
RepositoryItemWriter<User> writer = new RepositoryItemWriter<>();
writer.setRepository(userRepository);
writer.setMethodName("save");
return writer;
}
/*
* Configuration for Step Builder
* */
@Bean
public Step importStep(){
return new StepBuilder("studentDataReader",jobRepository)
.<User,User>chunk(5000, platformTransactionManager)
.reader(itemReader())
.processor(processor())
.writer(writer())
.build();
}
/*
* Configuration for Job setup
* */
@Bean
public Job runJob(){
return new JobBuilder("importUsersData", jobRepository)
.start(importStep())
.build();
}
}
Step 9: Rest API Controller
Now we are ready to write a simple API endpoint which will trigger the job so the data starts processing. Let us create a simple REST endpoint for it. As we recall from the batch architecture, everything interacts with the JobRepository, and a Job is triggered through a job launcher, so we will inject the JobLauncher interface as shown below.
package com.samyam.etl.demo.DataExtractionService.Controller;
import lombok.RequiredArgsConstructor;
import org.springframework.http.HttpStatus;
import org.springframework.batch.core.*;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.core.repository.JobExecutionAlreadyRunningException;
import org.springframework.batch.core.repository.JobInstanceAlreadyCompleteException;
import org.springframework.batch.core.repository.JobRestartException;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.HashMap;
import java.util.Map;
@RestController
@RequestMapping("/api/users")
@RequiredArgsConstructor
public class UserDataImporter {
private final Job job;
private final JobLauncher jobLauncher;
@PostMapping("/import-from-csv")
public ResponseEntity<HashMap<String, Object>> importUserDataFromCSV(){
HashMap<String, Object> response = new HashMap<>(
Map.of("success", false,
"message", "Error Loading Data"
)
);
try {
JobParameters jobParameters = new JobParametersBuilder()
.addLong("startAt", System.currentTimeMillis())
.toJobParameters();
jobLauncher.run(job,jobParameters);
response.put("success",true);
response.put("message", "Data loaded successfully");
return ResponseEntity.status(HttpStatus.OK).body(response);
} catch (JobExecutionAlreadyRunningException
| JobRestartException
| JobInstanceAlreadyCompleteException
| JobParametersInvalidException e) {
response.put("message", "Error: " + e.getMessage());
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(response);
}
}
}
Note: JobLauncher's run() method expects two arguments, a Job and JobParameters. Job parameters identify a particular run of the job; in this example I add the current timestamp with addLong("startAt", System.currentTimeMillis()) so that every call produces a unique parameter set, which lets the same job be launched again each time the API is hit (Spring Batch will not re-run a job instance whose parameters have already completed successfully).
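Job parameters can also carry real input, not just a timestamp. For instance, if you wanted the CSV file name to come from the request instead of being hard-coded in the reader, one possible approach (a sketch, not part of the original example) is a step-scoped reader that reads the file name from the job parameters:
// Hypothetical: make the file name a job parameter instead of hard-coding it.
// Assumes imports for @StepScope and @Value.
@Bean
@StepScope
public FlatFileItemReader<User> itemReader(
        @Value("#{jobParameters['fileName']}") String fileName) {
    FlatFileItemReader<User> itemReader = new FlatFileItemReader<>();
    itemReader.setResource(new ClassPathResource(fileName));
    itemReader.setName("studentDataReader");
    itemReader.setLinesToSkip(1);
    itemReader.setLineMapper(lineMapper());
    return itemReader;
}
The controller would then add .addString("fileName", "customerDataSample.csv") to the JobParametersBuilder before launching the job.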
By following these steps we have completed our Spring Batch example: we read the data from a CSV file using the ItemReader, processed it with our custom ItemProcessor class, and wrote it with the ItemWriter.
Step 10: Batch Processing Enhancement (Optional)
We can improve processing performance even further by adding another bean, a TaskExecutor, which lets the step run multiple concurrent tasks at the same time. You can do this by modifying your configuration class as shown below.
@Bean
public Step importStep(){
return new StepBuilder("studentDataReader",jobRepository)
.<User,User>chunk(5000, platformTransactionManager)
.reader(itemReader())
.processor(processor())
.writer(writer())
.taskExecutor(taskExecutor()) // Add this code in the importStep method.
.build();
}
@Bean
public TaskExecutor taskExecutor() {
SimpleAsyncTaskExecutor asyncTaskExecutor = new SimpleAsyncTaskExecutor();
// Here 10 is the number of threads processing at a time. Tune it to your hardware,
// as a misconfigured value might slow down your laptop or server.
asyncTaskExecutor.setConcurrencyLimit(10);
return asyncTaskExecutor;
}
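One caveat before turning this on: FlatFileItemReader keeps internal state and is not thread-safe, so with a multi-threaded step the reads should be synchronized. A minimal sketch (not part of the original example) wraps the existing reader in a SynchronizedItemStreamReader:
// Hypothetical: wrap the flat file reader so concurrent threads can read from it safely.
// Assumes an import for org.springframework.batch.item.support.SynchronizedItemStreamReader.
@Bean
public SynchronizedItemStreamReader<User> synchronizedItemReader() {
    SynchronizedItemStreamReader<User> reader = new SynchronizedItemStreamReader<>();
    reader.setDelegate(itemReader());
    return reader;
}
The step would then use .reader(synchronizedItemReader()) instead of .reader(itemReader()). Also keep in mind that a multi-threaded step gives up restartability of the reader's saved state.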
Hope this guide enhances your understanding of batch processing and helps you implement it effectively in your projects.