일괄(배치)처리 작업 수행 시, 작업처리가 종료될 때까지 대기하는 동기방식 처리와 작업처리의 종료를 Callback매커니즘을 이용하여 전달받는 비동기처리에 대한 예제를 보여준다.
Job 수행시, 동기와 비동기 방식으로 데이터를 처리할 수 있으며, 이 예제에서는 동기 처리가 기본값으로 설정되어 있다. 설정위치는 Launcher 설정파일의 jobLauncher 빈에서 taskExecutor 프로퍼티이며, 참조하는 값으로 다음 두 가지를 설정할 수 있다.
<bean id="jobLauncher" class="org.springframework.batch.core.launch.support.SimpleJobLauncher"> <property name="jobRepository" ref="jobRepository" /> <property name="taskExecutor" ref="sync"/> <!-- 비동기시 ref="async" --> </bean> <!-- 동기 처리시 sync --> <bean id="sync" class="org.springframework.core.task.SyncTaskExecutor" /> <!-- 비동기 처리시 async --> <bean id="async" class="org.springframework.core.task.SimpleAsyncTaskExecutor" />
동기/비동기 처리 예제를 위해 특별히 Job을 설정하는 내용은 없다. 이 예제에서 제공하는 Job의 상세 내용은 기존 업무 재사용 예제의 Job 설정과 같으므로 이를 참고한다.
<job id="delegateJob" xmlns="http://www.springframework.org/schema/batch"> <step id="delegateStep1"> <tasklet> <chunk reader="reader" writer="writer" commit-interval="3"/> </tasklet> </step> </job> <bean id="reader" class="org.springframework.batch.item.adapter.ItemReaderAdapter"> <property name="targetObject" ref="delegateObject" /> <property name="targetMethod" value="getData" /> </bean> <bean id="writer" class="org.springframework.batch.item.adapter.PropertyExtractingDelegatingItemWriter"> <property name="targetObject" ref="delegateObject" /> <property name="targetMethod" value="processPerson" /> <property name="fieldsUsedAsTargetMethodArguments"> <list> <value>firstName</value> <value>address.city</value> </list> </property> </bean> <bean id="delegateObject" class="egovframework.brte.sample.common.domain.person.PersonService" />
Item Processor를 비동기 처리하기 위해 Spring Batch에서 AsyncItemProcessor 서비를 지원한다.
AsyncItemProcessor 서비스를 이용한 설정은 아래와 같다.
<task:executor id="taskExecutor" pool-size="100"/> <bean id="itemProcessorAsync" class="org.springframework.batch.integration.async.AsyncItemProcessor"> <!-- delegate통해 실제 동작 할 Item Processor를 설정한다. --> <property name="delegate" ref="fixedLengthToFixedLengthJob.fixedLengthToFixedLengthStep.itemProcessor"/> <!-- Executor를 설정한다. --> <property name="taskExecutor" ref="taskExecutor" /> </bean>
✔ JunitTest 클래스의 구조는 배치실행환경 예제 Junit Test 설명을 참고한다.
✔ assertEquals(“COMPLETED”, jobExecution.getExitStatus().getExitCode()) : 배치수행결과가 COMPLETED 인지 확인한다.
✔ Thread.sleep(4000) : 비동기로 배치를 수행 시, DB에 배치상태(UNKNOWN)를 셋팅하고 DB연결이 종료되어 Job이 정상적으로 수행되더라도 종료상태(COMPLETED,FAILED)를 확인할 수 없다. 예제에서는 Job결과를 확인하기 위해 Thread를 적정시간동안 정지시켜 인위적으로 종료상태를 확인하도록 설정하였다.
@ContextConfiguration(locations = { "/egovframework/batch/sync-job-launcher-context.xml", "/egovframework/batch/jobs/delegatingJob.xml", "/egovframework/batch/job-runner-context.xml" }) public class EgovSyncDelegatingJobFunctionalTests { ... @Test public void testLaunchJob() throws Exception { JobExecution jobExecution=null; try{ jobExecution =jobLauncherTestUtils.launchJob(); //Async 로 수행되는 경우 Exit Status는 UNKNOWN으로 설정 됨 assertEquals("UNKNOWN", jobExecution.getExitStatus().getExitCode()); Thread.sleep(4000); }catch (InterruptedException ie){ ie.printStackTrace(); } assertTrue(personService.getReturnedCount() > 0); assertEquals(personService.getReturnedCount(), personService.getReceivedCount()) ; assertEquals("COMPLETED", jobExecution.getExitStatus().getExitCode()); } }
수행방법은 JunitTest 실행을 참고한다.