배치 수행 시, 대용량 처리를 위해 Job을 멀티스레드 기반으로 병행처리하는 과정을 보여준다. 실행 유형별로 멀티쓰레드 방식, Parallel 방식, 파티셔닝 방식 등이 있다.
병행처리멀티스레드(Multi-threaded Step)은 하나의 step을 멀티쓰레드로 처리하는 방식이다. chunk단위로 각 쓰레드에서 병행으로 처리한다.
멀티스레드 처리를 원하는 step의 tasklet에 비동기 설정을 한다.
<job id="parallelJob" xmlns="http://www.springframework.org/schema/batch"> <step id="staging" next="loading"> <tasklet> <chunk reader="fileItemReader" processor="validatingProcessor" writer="stagingItemWriter" commit-interval="2"/> </tasklet> </step> <step id="loading"> <tasklet task-executor="taskExecutor"> <chunk reader="stagingReader" processor="stagingProcessor" writer="tradeWriter" commit-interval="3"/> </tasklet> </step> </job> <bean id="taskExecutor" class="org.springframework.core.task.SimpleAsyncTaskExecutor" />
병행처리멀티스레드 설정과 관련 클래스들로 Junit Test를 수행한다. 이 때 배치가 수행되고, 관련된 내용을 확인할 수 있다.
✔ JunitTest 클래스의 구조는 배치실행환경 예제 Junit Test 설명을 참고한다.
✔ assertEquals(BatchStatus.COMPLETED, execution.getStatus()) : 배치수행결과가 COMPLETED 인지 확인한다.
✔ assertEquals(after - before, execution.getStepExecutions().iterator().next().getReadCount()) : BATCH_STAGING의 data와 stepexcution의 결과를 비교하여 staging의 step의 결과를 확인한다.
@RunWith(SpringJUnit4ClassRunner.class) @ContextConfiguration(locations = { "/egovframework/batch/simple-job-launcher-context.xml", "/egovframework/batch/jobs/parallelJob.xml", "/egovframework/batch/job-runner-context.xml" }) public class EgovParallelJobFunctionalTests { //배치작업을 test하기 위한 JobLauncherTestUtils @Autowired private JobLauncherTestUtils jobLauncherTestUtils; //DB 사용을 위한 SimpleJdbcTemplate private SimpleJdbcTemplate jdbcTemplate; ... @Test public void testLaunchJob() throws Exception { int before = SimpleJdbcTestUtils.countRowsInTable(jdbcTemplate, "BATCH_STAGING"); JobExecution execution = jobLauncherTestUtils.launchJob(); int after = SimpleJdbcTestUtils.countRowsInTable(jdbcTemplate, "BATCH_STAGING"); assertEquals(BatchStatus.COMPLETED, execution.getStatus()); assertEquals(after - before, execution.getStepExecutions().iterator().next().getReadCount()); } }
수행방법은 JunitTest 실행을 참고한다.
parallelStep이란 분리된 flow을 각 thread에서 병행으로 처리하는 방식이다. 두 개의 flow1, flow2 는 각각 thread에서 병행으로 처리된다.
✔ split 태그에 비동기 설정이 있어야 병행처리가 가능하다.
<job id="parallelStep" xmlns="http://www.springframework.org/schema/batch"> <split id="split1" task-executor="taskExecutor" next="step4"> <flow> <step id="step1" next="step2"> <tasklet> <chunk reader="itemReader" writer="itemWriter1" commit-interval="1" /> </tasklet> </step> <step id="step2"> <tasklet> <chunk reader="itemReader" writer="itemWriter2" commit-interval="2" /> </tasklet> </step> </flow> <flow> <step id="step3"> <tasklet> <chunk reader="itemReader" writer="itemWriter3" commit-interval="2" /> </tasklet> </step> </flow> </split> <step id="step4"> <tasklet> <chunk reader="itemReader" writer="itemWriter4" commit-interval="2" /> </tasklet> </step> </job> <bean id="taskExecutor" class="org.springframework.core.task.SimpleAsyncTaskExecutor" />
parallelstep 설정과 관련 클래스들로 Junit Test를 수행한다. 이 때 배치가 수행되고, 관련된 내용을 확인할 수 있다.
✔ JunitTest 클래스의 구조는 배치실행환경 예제 Junit Test 설명을 참고한다.
✔ assertEquals(BatchStatus.COMPLETED, jobExecution.getStatus()): 배치수행결과가 COMPLETED 인지 확인한다.
✔ getUniqueJobParameters에서 JobParameter에 배치에 필요한 입력 리소스, 출력 리소스 위치정보를 넘긴다.
@RunWith(SpringJUnit4ClassRunner.class) @ContextConfiguration(locations ={"/egovframework/batch/simple-job-launcher-context.xml", "/egovframework/batch/jobs/parallelStep.xml","/egovframework/batch/job-runner-context.xml"}) public class EgovParallelStepFunctionalTests{ //배치작업을 test하기 위한 JobLauncherTestUtils @Autowired private JobLauncherTestUtils jobLauncherTestUtils; /** * 배치작업 테스트 */ @Test public void testLaunchJob() throws Exception { JobExecution jobExecution = jobLauncherTestUtils.launchJob(this.getUniqueJobParameters()); assertEquals(BatchStatus.COMPLETED, jobExecution.getStatus()); // /target/test-outputs/parallelStep폴더의 output파일 확인 } /** * 잡파라미터를 설정하기 위한 메소드 * @return jobParameters */ protected JobParameters getUniqueJobParameters() { return new JobParametersBuilder().addString("inputFile","/egovframework/data/input/delimited.csv") .addString("outputFile1","file:./target/test-outputs/parallelStep/delimitedOutput1.csv") .addString("outputFile2","file:./target/test-outputs/parallelStep/delimitedOutput2.csv") .addString("outputFile3","file:./target/test-outputs/parallelStep/delimitedOutput3.csv") .addString("outputFile4","file:./target/test-outputs/parallelStep/delimitedOutput4.csv") .addParameter("timestamp", new JobParameter(new Date().getTime())).toJobParameters(); } }
수행방법은 JunitTest 실행을 참고한다.
split된 두 개의 flow가 멀티쓰레드(SimpleAsyncTaskExecutor-1,SimpleAsyncTaskExecutor-2)로 실행한 결과는 콘솔창의 로그내용으로 확인할 수 있다. step1과 step3은 서로 다른 쓰레드에서 실행한 사실을 확인할 수 있다.
…
파티션 정의 File, DB의 데이터를 처리하는 병행처리 방법의 하나로 File 데이터, DB 데이터를 Partition 하여 멀티쓰레드 방식으로 처리한다. 입력리소스에 따라 DB 파티셔닝예제와 과 File 파티셔닝예제를 보여주고, 여러리소스를 읽어 하나의 타켓파일에 쓰는 SingleFile 파티셔닝 예제를 보여준다.
입력 리소스 타입 | 관계 | 예제 |
---|---|---|
DB | DB Partition 예제 | |
File | N:N | N:N Partition 예제 |
File | N:1 | N:1 Partition 예제 |