====== 대용량 처리를 위한 멀티스레드 기반의 병행처리 예제 ======
===== 개요 =====
배치 수행 시, 대용량 처리를 위해 Job을 멀티스레드 기반으로 병행처리하는 과정을 보여준다. 실행 유형별로 멀티쓰레드 방식, Parallel 방식, 파티셔닝 방식 등이 있다.
===== 설명 =====
==== 멀티쓰레드(Multi-threaded Step) 예제 ====
병행처리멀티스레드(Multi-threaded Step)은 하나의 step을 멀티쓰레드로 처리하는 방식이다. chunk단위로 각 쓰레드에서 병행으로 처리한다.
===Job설정===
== 병행처리멀티스레드 예제의 Job 설정파일인 parallelJob.xml을 확인한다.==
멀티스레드 처리를 원하는 step의 tasklet에 비동기 설정을 한다.
===JunitTest 구성 및 수행===
== JunitTest 구성 ==
병행처리멀티스레드 설정과 관련 클래스들로 Junit Test를 수행한다. 이 때 배치가 수행되고, 관련된 내용을 확인할 수 있다.
\\ ✔ JunitTest 클래스의 구조는 [[egovframework:rte2:brte:batch_example:run_junit_Test|배치실행환경 예제 Junit Test 설명]]을 참고한다.
\\ ✔ assertEquals(BatchStatus.COMPLETED, execution.getStatus()) : 배치수행결과가 COMPLETED 인지 확인한다.
\\ ✔ assertEquals(after - before, execution.getStepExecutions().iterator().next().getReadCount()) : BATCH_STAGING의 data와 stepexcution의 결과를 비교하여 staging의 step의 결과를 확인한다.
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = { "/egovframework/batch/simple-job-launcher-context.xml", "/egovframework/batch/jobs/parallelJob.xml",
"/egovframework/batch/job-runner-context.xml" })
public class EgovParallelJobFunctionalTests {
//배치작업을 test하기 위한 JobLauncherTestUtils
@Autowired
private JobLauncherTestUtils jobLauncherTestUtils;
//DB 사용을 위한 SimpleJdbcTemplate
private SimpleJdbcTemplate jdbcTemplate;
...
@Test
public void testLaunchJob() throws Exception {
int before = SimpleJdbcTestUtils.countRowsInTable(jdbcTemplate, "BATCH_STAGING");
JobExecution execution = jobLauncherTestUtils.launchJob();
int after = SimpleJdbcTestUtils.countRowsInTable(jdbcTemplate, "BATCH_STAGING");
assertEquals(BatchStatus.COMPLETED, execution.getStatus());
assertEquals(after - before, execution.getStepExecutions().iterator().next().getReadCount());
}
}
== JunitTest 수행==
수행방법은 [[egovframework:dev2:tst:test_case#Test Case 실행| JunitTest 실행]]을 참고한다.
=== 결과확인 ===
== 멀티쓰레드로 병행처리한 setp은 쓰레드마다의 처리 속도차이에 의해 순차적인 처리가 되지 않는다. loading의 step 결과인 TRADE의 data를 보면 확인할 수 있다. ==
{{:egovframework:rte2:brte:batch_example:multithread_data.png|}}
==== Parallel 예제 ====
parallelStep이란 분리된 flow을 각 thread에서 병행으로 처리하는 방식이다. 두 개의 flow1, flow2 는 각각 thread에서 병행으로 처리된다.
===Job설정===
==ParallelStep 예제의 Job 설정파일인 parallelStep.xml을 확인한다.==
✔ split 태그에 비동기 설정이 있어야 병행처리가 가능하다.
=== JunitTest 구성 및 수행 ===
== JunitTest 구성 ==
parallelstep 설정과 관련 클래스들로 Junit Test를 수행한다. 이 때 배치가 수행되고, 관련된 내용을 확인할 수 있다.
\\ ✔ JunitTest 클래스의 구조는 [[egovframework:rte2:brte:batch_example:run_junit_Test|배치실행환경 예제 Junit Test 설명]]을 참고한다.
\\ ✔ assertEquals(BatchStatus.COMPLETED, jobExecution.getStatus()): 배치수행결과가 COMPLETED 인지 확인한다.
\\ ✔ getUniqueJobParameters에서 JobParameter에 배치에 필요한 입력 리소스, 출력 리소스 위치정보를 넘긴다.
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations ={"/egovframework/batch/simple-job-launcher-context.xml", "/egovframework/batch/jobs/parallelStep.xml","/egovframework/batch/job-runner-context.xml"})
public class EgovParallelStepFunctionalTests{
//배치작업을 test하기 위한 JobLauncherTestUtils
@Autowired
private JobLauncherTestUtils jobLauncherTestUtils;
/**
* 배치작업 테스트
*/
@Test
public void testLaunchJob() throws Exception {
JobExecution jobExecution = jobLauncherTestUtils.launchJob(this.getUniqueJobParameters());
assertEquals(BatchStatus.COMPLETED, jobExecution.getStatus());
// /target/test-outputs/parallelStep폴더의 output파일 확인
}
/**
* 잡파라미터를 설정하기 위한 메소드
* @return jobParameters
*/
protected JobParameters getUniqueJobParameters() {
return new JobParametersBuilder().addString("inputFile","/egovframework/data/input/delimited.csv")
.addString("outputFile1","file:./target/test-outputs/parallelStep/delimitedOutput1.csv")
.addString("outputFile2","file:./target/test-outputs/parallelStep/delimitedOutput2.csv")
.addString("outputFile3","file:./target/test-outputs/parallelStep/delimitedOutput3.csv")
.addString("outputFile4","file:./target/test-outputs/parallelStep/delimitedOutput4.csv")
.addParameter("timestamp", new JobParameter(new Date().getTime())).toJobParameters();
}
}
==JunitTest 수행==
수행방법은 [[egovframework:dev2:tst:test_case#Test Case 실행| JunitTest 실행]]을 참고한다.
=== 결과확인 ===
split된 두 개의 flow가 멀티쓰레드(SimpleAsyncTaskExecutor-1,SimpleAsyncTaskExecutor-2)로 실행한 결과는 콘솔창의 로그내용으로 확인할 수 있다. step1과 step3은 서로 다른 쓰레드에서 실행한 사실을 확인할 수 있다.
{{:egovframework:rte2:brte:batch_example:parallelstep_step1.png|}}
** ...**
{{:egovframework:rte2:brte:batch_example:parallelstep_step3.png|}}
==== 파티셔닝(Partitioning) 예제====
=== ===
파티션 정의
File, DB의 데이터를 처리하는 병행처리 방법의 하나로 File 데이터, DB 데이터를 Partition 하여 멀티쓰레드 방식으로 처리한다. 입력리소스에 따라 DB 파티셔닝예제와 과 File 파티셔닝예제를 보여주고, 여러리소스를 읽어 하나의 타켓파일에 쓰는 SingleFile 파티셔닝 예제를 보여준다.
^입력 리소스 타입^관계^예제^
|DB| |[[egovframework:rte2:brte:batch_example:db_partition|DB Partition 예제]]|
|File|N:N|[[egovframework:rte2:brte:batch_example:file_partition|N:N Partition 예제]]|
|File|N:1|[[egovframework:rte2:brte:batch_example:single_file_partition|N:1 Partition 예제]]|
===== 참고자료 =====
* [[egovframework:rte2:brte:batch_core:parallel_process|병행처리]]