<job id="partitionJob" restartable="true" xmlns="http://www.springframework.org/schema/batch"> <step id="step"> <partition step="step1" partitioner="partitioner"> <handler grid-size="4" task-executor="taskExecutor" /> </partition> </step> </job>
<bean id="partitioner" class="org.springframework.batch.core.partition.support.MultiResourcePartitioner"> <property name="resources" value="classpath:egovframework/data/input/delimited*.csv" /> </bean>
<step id="step1" xmlns="http://www.springframework.org/schema/batch"> <tasklet transaction-manager="transactionManager"> <chunk writer="itemWriter" reader="itemReader" commit-interval="5" /> <listeners> <listener ref="fileNameListener" /> </listeners> </tasklet> </step>
<bean id="fileNameListener" class="egovframework.brte.sample.example.listener.EgovOutputFileListener" scope="step"> <property name="path" value="file:./target/test-outputs/partition/file/" /> </bean>
스프링에서 제공하는 MultiResourcePartitioner 를 사용하며 입력리소스의 수(입력파일 수)만큼 Context를 생성하고 출력위치를 셋팅하는 역할을 한다.(Context 생성으로 여러 Slave Step들을 사용할 수 있게 됨)
public class MultiResourcePartitioner implements Partitioner { ... public Map<String, ExecutionContext> partition(int gridSize) { Map<String, ExecutionContext> map = new HashMap<String, ExecutionContext>(gridSize); int i = 0; for (Resource resource : resources) { ExecutionContext context = new ExecutionContext(); Assert.state(resource.exists(), "Resource does not exist: "+resource); try { context.putString(keyName, resource.getURL().toExternalForm()); } catch (IOException e) { throw new IllegalArgumentException("File could not be located for: "+resource, e); } map.put(PARTITION_KEY + i, context); i++; } return map; } }
Step 수행 이전에 호출이 되며, 입력리소스의 정보를 이용하여 출력파일에 대한 정보를 생성 및 저장한다.
✔ 출력은 path의 기본값인 “file:./target/output/” 이하에 생성이 되며, 이 경로는 Job 설정파일(path 프로퍼티)에서 변경할 수 있다.
public class EgovOutputFileListener { // outputKeyName private String outputKeyName = "outputFile"; // inputKeyName private String inputKeyName = "fileName"; // path private String path = "file:./target/output/"; ... @BeforeStep public void createOutputNameFromInput(StepExecution stepExecution) { ExecutionContext executionContext = stepExecution.getExecutionContext(); String inputName = stepExecution.getStepName().replace(":", "-"); if (executionContext.containsKey(inputKeyName)) { inputName = executionContext.getString(inputKeyName); } if (!executionContext.containsKey(outputKeyName)) { executionContext.putString(outputKeyName, path + FilenameUtils.getBaseName(inputName) + ".csv"); } } }
✔ JunitTest 클래스의 구조는 배치실행환경 예제 Junit Test 설명을 참고한다.
✔ assertEquals(“COMPLETED”, jobExecution.getExitStatus().getExitCode()) : 배치수행결과가 COMPLETED 인지 확인한다.
✔ 데이터처리 결과를 확인하기 위해, 배치수행과 개별적으로 배치수행전 후의 inputs, outputs을 생성하여 비교한다.
@Test public void testUpdateCredit() throws Exception { //Job 의 output 자료들을 얻음 open(inputReader); List<CustomerCredit> inputs = new ArrayList<CustomerCredit>(getCredits(inputReader)); close(inputReader); //Job 수행 JobExecution jobExecution = jobLauncherTestUtils.launchJob(); assertEquals(BatchStatus.COMPLETED, jobExecution.getStatus()); //Job 의 output 자료들을 얻음 @SuppressWarnings("unchecked") ItemReader<CustomerCredit> outputReader = (ItemReader<CustomerCredit>) applicationContext.getBean("outputTestReader"); open(outputReader); List<CustomerCredit> outputs = new ArrayList<CustomerCredit>(getCredits(outputReader)); close(outputReader); //input과 output 의 자료중 credit 의 값 비교 assertEquals(inputs.size(), outputs.size()); int itemCount = inputs.size(); inputs.iterator(); for (int i = 0; i < itemCount; i++) { assertEquals(inputs.get(i).getCredit().intValue(), outputs.get(i).getCredit().intValue()); }
수행방법은 JunitTest 실행을 참고한다.