목차

     

    Quartz 와 Batch

    Quartz 

    언제 실행시킬지와 관련 = 스케줄링

    - Scheduling

     

    Batch 

    - 무엇을 실행시킬지와 관련

    - Batch job

    - 보통 배치를 짠다는 말은 Batch job 개발을 한다는 의미

    Quartz와 Batch의 관계 및 함께 사용하는 이유

    • 일괄처리(로직)을 batch job 으로, batch job 을 스케쥴링 하기 위해 quartz 를 사용
    • 높은 Spring Version은 @Scheduled 어노테이션으로 Crontrigger와 같은 기능을 제공하여 Quartz 가 필요 없다곤 하나
    • Quartz 의 Clustering 기능, DB 값을 기준으로 동작 제어가 가능하다는 점에 Quartz를 함께 사용

    쿼츠 Quartz

    쿼츠란, 오픈소스 스케줄러이다. 쿼츠는 자바 환경의 규모와 상관없이 사용이 가능하고 잡 실행에 유용한 스프링 부트 지원과 같이 오래전부터 스프링 연동을 지원하고있다. 쿼츠는 아래 3가지 컴포넌트를 제공한다.

    쿼츠는 스케줄러, 잡, 트리거라는 세 가지 주요 컴포넌트를 가진다. 

    스케줄러 (Scheduler)

    스케줄러는 SchedulerFactory 를 통해서 가져올 수 있으며 JobDetails 및 트리거의 저장소 기능을 한다. 

    스케줄러는 SchedulerFactory를 통해서 가져올 수 있으며 JobDetails 및 트리거의 저장소 기능을한다. 또한 스케줄러는 연관된 트리거가 작동할 때 잡을 실행하는 역할을 한다. 

    잡 (Job)

    실행의 작업 단위이다.

    트리거 (Trigger)

    : 작업(=잡) 실행 시점을 정의한다.

    : 트리거가 작동되어 쿼츠에게 잡을 실행하도록 지시하면, 잡의 개별 실행을 정의하는 JobDetails 객체가 생성된다.

    참고

    https://blog.kingbbode.com/38

     

    Quartz + Spring Batch 조합하기

    Zum  에서 BeyondJ2EE 김태기 팀장님과 표준화 프로젝트를 진행하며, Zum  에서의 Batch  에 대한 표준을 작성하며 알게 된 Quartz Framework  의 매력과 직접 개발해본 Spring 과의 조합 및 궁합 을 소개해

    blog.kingbbode.com

     

    목차

      잡의 재시작 방지

      스프링 배치의 모든 잡은 실패하거나 중지될 때 다시 실행할 수 있었다. 스프링 배치는 기본적으로 이렇게 동작하므로 우리는 다시 실행하면 안되는 잡이 있을시 재시작을 방지해야한다. 

      preventRestart()

      preventRestart() 메서드를 호출하여 잡이 실패하거나 어떤 이유로 중지된 경우에도 다시 실행할 수 없다.

      /**
      * 잡 실행
      * @return
      */
      @Bean
      public Job transactionJob() {
            return this.jobBuilderFactory.get("transactionJob")
            .preventRestart() /* 잡은 기본적으로 재시작이 가능하도록 구성되어있다. 잡의 재시작 방지 */
            .start(...)
            .next(...)
            .build();
        }

      잡의 재시작 횟수 제한

      startLimit(n) : 재시작 횟수를  n번으로 제한한다.

      @Bean
      public Step importTransactionFileStep() {
            return this.stepBuilderFactory.get("importTransactionFileStep")
            .startLimit(2) /* 잡의 재시작 횟수 제한 */
            .<Transaction, Transaction>chunk(100)
            .reader(...)
            .writer(...)
            .listener(...) /* 스텝 빌드하기 전 실행할 리스너 등록 */
            .build();
      }

      완료된 스텝 재실행하기

      allowStartIfComplete(true)

      스텝이 잘 완료됐더라도, 다시 실행할 수 있어야 할때 사용한다. 주의할 점은, 잡의 ExitStatus 가 COMPLETE 라면 모든 스텝에 allowStartIfComplete(true) 를 적용하더라도 이와 관계없이 잡 인스턴스는 다시 실행할 수 없다.

      잡이 재실행될때 무조건 실행되어야할 스텝이 존재할 경우 설정한다.

      @Bean
      public Step importTransactionFileStep() {
            return this.stepBuilderFactory.get("importTransactionFileStep")
            .<Transaction, Transaction>chunk(100)
            .reader(...)
            .writer(...)
            .allowStartIfComplete(true) /* 잡이 재시작될시, 스텝이 다시 실행될 수 있도록 재시작 허용 */
            .listener(...) /* 스텝 빌드하기 전 실행할 리스너 등록 */
            .build();
      }

      목차

        소개

        일반적으로 Spring Batch는 단일 쓰레드에서 실행됩니다.
        즉, 모든 것이 순차적으로 실행되는 것을 의미하는데요.
        Spring Batch에서는 이를 병렬로 실행할 수 있는 방법을 여러가지 지원합니다.
        이번 시간에는 그 중 하나인 멀티스레드로 Step을 실행하는 방법에 대해서 알아보겠습니다.

        Spring Batch의 멀티쓰레드 Step은 Spring의 TaskExecutor를 이용하여 각 쓰레드가 Chunk 단위로 실행되게 하는 방식입니다.

        Spring Batch Chunk에 대한 내용은 이전 포스팅에 소개되어있습니다.

        여기서 어떤 TaskExecutor 를 선택하냐에 따라 모든 Chunk 단위별로 쓰레드가 계속 새로 생성될 수도 있으며 (SimpleAsyncTaskExecutor) 혹은 쓰레드풀 내에서 지정된 갯수의 쓰레드만을 재사용하면서 실행 될 수도 있습니다. (ThreadPoolTaskExecutor)

        Spring Batch에서 멀티쓰레드 환경을 구성하기 위해서 가장 먼저 해야할 일은 사용하고자 하는 Reader와 Writer가 멀티쓰레드를 지원하는지 확인하는 것 입니다.

        JpaPagingItemReader의 Javadoc

        각 Reader와 Writer의 Javadoc에 항상 저 thread-safe 문구가 있는지 확인해보셔야 합니다.
        만약 없는 경우엔 thread-safe가 지원되는 Reader 와 Writer를 선택해주셔야하며, 꼭 그 Reader를 써야한다면 SynchronizedItemStreamReader 등을 이용해 thread-safe로 변환해서 사용해볼 수 있습니다.

        그리고 또 하나 주의할 것은 멀티 쓰레드로 각 Chunk들이 개별로 진행되다보니 Spring Batch의 큰 장점중 하나인 실패 지점에서 재시작하는 것은 불가능 합니다. 이유는 간단합니다.
        단일 쓰레드로 순차적으로 실행할때는 10번째 Chunk가 실패한다면 9번째까지의 Chunk가 성공했음이 보장되지만, 멀티쓰레드의 경우 1~10개의 Chunk가 동시에 실행되다보니 10번째 Chunk가 실패했다고 해서 1~9개까지의 Chunk가 다 성공된 상태임이 보장되지 않습니다.

        그래서 일반적으로는 ItemReader의 saveState 옵션을 false 로 설정하고 사용합니다.

        이건 예제 코드에서 설정을 보여드리겠습니다.

        자 그럼 실제로 하나씩 코드를 작성하면서 실습해보겠습니다.

        2. PagingItemReader 예제

        가장 먼저 알아볼 것은 PagingItemReader를 사용할때 입니다.
        이때는 걱정할 게 없습니다.
        PagingItemReader는 Thread Safe 하기 때문입니다.

        멀티 쓰레드로 실행할 배치가 필요하시다면 웬만하면 PagingItemReader로 사용하길 추천드립니다.

        예제 코드는 JpaPagingItemReader로 작성하였습니다.

        @Slf4j
        @RequiredArgsConstructor
        @Configuration
        public class MultiThreadPagingConfiguration {
            public static final String JOB_NAME = "multiThreadPagingBatch";
        
            private final JobBuilderFactory jobBuilderFactory;
            private final StepBuilderFactory stepBuilderFactory;
            private final EntityManagerFactory entityManagerFactory;
        
            private int chunkSize;
        
            @Value("${chunkSize:1000}")
            public void setChunkSize(int chunkSize) {
                this.chunkSize = chunkSize;
            }
        
            private int poolSize;
        
            @Value("${poolSize:10}") // (1)
            public void setPoolSize(int poolSize) {
                this.poolSize = poolSize;
            }
        
            @Bean(name = JOB_NAME+"taskPool")
            public TaskExecutor executor() {
                ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); // (2)
                executor.setCorePoolSize(poolSize);
                executor.setMaxPoolSize(poolSize);
                executor.setThreadNamePrefix("multi-thread-");
                executor.setWaitForTasksToCompleteOnShutdown(Boolean.TRUE);
                executor.initialize();
                return executor;
            }
        
            @Bean(name = JOB_NAME)
            public Job job() {
                return jobBuilderFactory.get(JOB_NAME)
                        .start(step())
                        .preventRestart()
                        .build();
            }
        
            @Bean(name = JOB_NAME +"_step")
            @JobScope
            public Step step() {
                return stepBuilderFactory.get(JOB_NAME +"_step")
                        .<Product, ProductBackup>chunk(chunkSize)
                        .reader(reader(null))
                        .processor(processor())
                        .writer(writer())
                        .taskExecutor(executor()) // (2)
                        .throttleLimit(poolSize) // (3)
                        .build();
            }
        
        
            @Bean(name = JOB_NAME +"_reader")
            @StepScope
            public JpaPagingItemReader<Product> reader(@Value("#{jobParameters[createDate]}") String createDate) {
        
                Map<String, Object> params = new HashMap<>();
                params.put("createDate", LocalDate.parse(createDate, DateTimeFormatter.ofPattern("yyyy-MM-dd")));
        
                return new JpaPagingItemReaderBuilder<Product>()
                        .name(JOB_NAME +"_reader")
                        .entityManagerFactory(entityManagerFactory)
                        .pageSize(chunkSize)
                        .queryString("SELECT p FROM Product p WHERE p.createDate =:createDate")
                        .parameterValues(params)
                        .saveState(false) // (4)
                        .build();
            }
        
            private ItemProcessor<Product, ProductBackup> processor() {
                return ProductBackup::new;
            }
        
            @Bean(name = JOB_NAME +"_writer")
            @StepScope
            public JpaItemWriter<ProductBackup> writer() {
                return new JpaItemWriterBuilder<ProductBackup>()
                        .entityManagerFactory(entityManagerFactory)
                        .build();
            }
        }

        (1) @Value("${poolSize:10}")

        • 생성할 쓰레드 풀의 쓰레드 수를 환경변수로 받아서 사용합니다.
        • ${poolSize:10} 에서 10은 앞에 선언된 변수 poolSize가 없을 경우 10을 사용한다는 기본값으로 보시면 됩니다.
        • 배치 실행시 PoolSize를 조정하는 이유는 실행 환경에 맞게 유동적으로 쓰레드풀을 관리하기 위함입니다.
          • 개발 환경에서는 1개의 쓰레드로, 운영에선 10개의 쓰레드로 실행할 수도 있습니다.
          • 혹은 같은 시간대에 수행되는 다른 배치들로 인해서 갑자기 쓰레드 개수를 줄여야 할 수도 있습니다.
          • 언제든 유동적으로 배치 실행시점에 몇개의 쓰레드를 생성할지 결정할 수 있으니 웬만하면 외부에 받아서 사용하는 방식을 선호합니다.
        • Field가 아닌 Setter로 받는 이유는 Spring Context가 없이 테스트 코드를 작성할때 PoolSize, ChunkSize등을 입력할 방법이 없기 때문입니다.

        (2) ThreadPoolTaskExecutor

        • 쓰레드 풀을 이용한 쓰레드 관리 방식입니다.
        • 옵션
          • corePoolSize: Pool의 기본 사이즈
          • maxPoolSize: Pool의 최대 사이즈
        • 이외에도 SimpleAsyncTaskExecutor 가 있는데, 이를 사용할 경우 매 요청시마다 쓰레드를 생성하게 됩니다.
          • 이때 계속 생성하다가 concurrency limit 을 초과할 경우 이후 요청을 막게되는 현상까지 있어, 운영 환경에선 잘 사용하진 않습니다.
        • 좀 더 자세한 설명은 링크 참고하시면 더 좋습니다

        (3) throttleLimit(poolSize)

        • 기본값은 4 입니다.
        • 생성된 쓰레드 중 몇개를 실제 작업에 사용할지를 결정합니다.
        • 만약 10개의 쓰레드를 생성하고 throttleLimit을 4로 두었다면, 10개 쓰레드 중 4개만 배치에서 사용하게 됨을 의미합니다.
        • 일반적으로 corePoolSize, maximumPoolSize, throttleLimit 를 모두 같은 값으로 맞춥니다.

        (4) .saveState(false)

        • 앞에서도 설명드린것처럼, 멀티쓰레드 환경에서 사용할 경우 필수적으로 사용해야할 옵션이 saveState = false 입니다.
        • 해당 옵션을 끄게 되면 (false) Reader 가 실패한 지점을 저장하지 못하게해, 다음 실행시에도 무조건 처음부터 다시 읽도록 합니다.
        • 이 옵션을 켜놓으면 오히려 더 큰 문제가 발생할 수 있습니다.
          • 8번째 Chunk 에서 실패했는데, 사실은 4번째 Chunk도 실패했다면 8번째가 기록되어 다음 재실행시 8번째부터 실행될수 있기 때문입니다.
          • 실패하면 무조건 처음부터 다시 실행될 수 있도록 해당 옵션은 false로 두는 것을 추천합니다.
        • 비슷한 기능으로 Job 옵션에 있는 .preventRestart()가 있는데, 해당 옵션은 Job이 같은 파라미터로 재실행되는것을 금지합니다.
          • .saveState(false)는 Reader가 실패난 지점을 기록하지 못하게 하는 옵션이라 엄밀히 말하면 둘은 서로 다른 옵션이긴 합니다.
          • Step 재실행을 막는다정도로 봐주시면 됩니다.

        자 그럼 이제 이 코드가 실제로 멀티쓰레드로 잘 작동하는지 테스트 코드로 검증해보겠습니다.

        테스트 코드

        모든 테스트 코드는 JUnit5를 사용합니다.
        Spring Batch에서 테스트 코드 작성이 처음이신분들은 앞에 작성된 포스팅을 먼저 참고해주세요.

        @ExtendWith(SpringExtension.class)
        @SpringBatchTest
        @SpringBootTest(classes={MultiThreadPagingConfiguration.class, TestBatchConfig.class})
        @TestPropertySource(properties = {"chunkSize=1", "poolSize=2"}) // (1)
        public class MultiThreadPagingConfigurationTest {
        
            @Autowired
            private ProductRepository productRepository;
        
            @Autowired
            private ProductBackupRepository productBackupRepository;
        
            @Autowired
            private JobLauncherTestUtils jobLauncherTestUtils;
        
            @AfterEach
            void after() {
                productRepository.deleteAll();
                productBackupRepository.deleteAll();
            }
        
            @Test
            public void 페이징_분산처리_된다() throws Exception {
                //given
                LocalDate createDate = LocalDate.of(2020,4,13);
                ProductStatus status = ProductStatus.APPROVE;
                long price = 1000L;
                for (int i = 0; i < 10; i++) {
                    productRepository.save(Product.builder()
                            .price(i * price)
                            .createDate(createDate)
                            .status(status)
                            .build());
                }
        
                JobParameters jobParameters = new JobParametersBuilder()
                        .addString("createDate", createDate.toString())
                        .addString("status", status.name())
                        .toJobParameters();
                //when
                JobExecution jobExecution = jobLauncherTestUtils.launchJob(jobParameters);
        
                //then
                assertThat(jobExecution.getStatus()).isEqualTo(BatchStatus.COMPLETED);
                List<ProductBackup> backups = productBackupRepository.findAll();
                backups.sort(Comparator.comparingLong(ProductBackup::getPrice));
        
                assertThat(backups).hasSize(10);
                assertThat(backups.get(0).getPrice()).isEqualTo(0L);
                assertThat(backups.get(9).getPrice()).isEqualTo(9000L);
            }
        
        }

        (1) properties = {"chunkSize=1", "poolSize=2"}

        • 각 옵션은 다음과 같은 의미를 가집니다.
          • chunkSize=1: 하나의 Chunk가 처리할 데이터가 1건을 의미합니다.
          • poolSize=2: 생성될 쓰레드 풀의 쓰레드 개수를 2개로 합니다.
        • 이렇게 할 경우 10개의 데이터를 처리할때 2개의 쓰레드가 각 5회씩 처리됩니다.
          • 물론 1개의 쓰레드에서 오랜 시간 동안 처리하게 된다면 다른 1개가 더 많은 건수를 처리할 수도 있습니다.

        위 테스트 코드를 한번 실행해보면?
        아래 그림처럼 2개의 쓰레드가 각자 페이지를 Read하고 Write 하는것을 확인할 수 있습니다.

        이전과 같이 단일 쓰레드 모델이였다면 어떻게 될까요?
        그럼 아래와 같이 1개페이지에 대해 읽기와 쓰기가 모두 끝난 후에야 다음 페이지를 진행하게 됩니다.

        JpaPagingItemReader를 예시로 보여드렸지만, 그외 나머지 PagingItemReader들 역시 동일하게 사용하시면 됩니다

        (JdbcPagingItemReader)

        비교적 편하게 작동되는 PagingItemReader들은 쓰레드풀만 지정하면 됩니다.
        자 그럼 ThreadSafe 하지 않는 Cursor 기반의 Reader들은 어떻게 할지 알아보겠습니다.

        3. CursorItemReader

        JdbcCursorItemReader를 비롯하여 JDBC ResultSet를 사용하여 데이터를 읽는 CursorItemReader는 Thread Safe하지 않습니다.

        (Javadoc어디에도 Thread Safe 단어를 찾을 수가 없습니다.)

        이와 같이 Thread Safe 하지 않는 Reader들을 Thread Safe하게 변경하기 위해서는 데이터를 읽는 read()에 synchronized 를 걸어야만 합니다.

        다만 이렇게 하게 되면 Reader는 멀티 쓰레드로 작동하지 않고, 순차적으로 데이터를 읽게 될텐데요.
        Reader가 동기화 방식이 된다하더라도, Processor/Writer는 멀티 쓰레드로 작동이 됩니다.

        일반적으로 배치 과정에서는 Write 단계에서 더 많은 자원과 시간을 소모합니다.
        그래서 Bulk Insert 등의 방법에 대해서 많이 얘기가 나옵니다.

        이미 구현체가 있는 JdbcCursorItemReader나 HibernateCursorItemReader에 synchronized 를 추가하려면 어떻게 해야할까요?

        JpaCursorItemReader는 Spring Batch 4.3에 추가될 예정입니다.

        가장 쉬운 방법은 Spring Batch 4.0부터 추가된 SynchronizedItemStreamReader로 Wrapping 하는 것입니다.

        자 그럼 예제 코드로 실제로 CursorItemReader가 Thread Safe 하지 않는지 확인후, 이를 고치는 과정으로 살펴보겠습니다.

        3-1. Not Thread Safety 코드

        먼저 멀티쓰레드 환경에서 바로 JdbcCursorItemReader를 사용할 경우 입니다.

        @Slf4j
        @RequiredArgsConstructor
        @Configuration
        public class MultiThreadCursorConfiguration {
            public static final String JOB_NAME = "multiThreadCursorBatch";
        
            private final JobBuilderFactory jobBuilderFactory;
            private final StepBuilderFactory stepBuilderFactory;
            private final EntityManagerFactory entityManagerFactory;
            private final DataSource dataSource;
        
            private int chunkSize;
        
            @Value("${chunkSize:1000}")
            public void setChunkSize(int chunkSize) {
                this.chunkSize = chunkSize;
            }
        
            private int poolSize;
        
            @Value("${poolSize:10}")
            public void setPoolSize(int poolSize) {
                this.poolSize = poolSize;
            }
        
            @Bean(name = JOB_NAME+"taskPool")
            public TaskExecutor executor() {
                ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
                executor.setCorePoolSize(poolSize);
                executor.setMaxPoolSize(poolSize);
                executor.setThreadNamePrefix("multi-thread-");
                executor.setWaitForTasksToCompleteOnShutdown(Boolean.TRUE);
                executor.initialize();
                return executor;
            }
        
            @Bean(name = JOB_NAME)
            public Job job() {
                return jobBuilderFactory.get(JOB_NAME)
                        .start(step())
                        .preventRestart()
                        .build();
            }
        
            @Bean(name = JOB_NAME +"_step")
            @JobScope
            public Step step() {
                return stepBuilderFactory.get(JOB_NAME +"_step")
                        .<Product, ProductBackup>chunk(chunkSize)
                        .reader(reader(null))
                        .listener(new CursorItemReaderListener()) // (1)
                        .processor(processor())
                        .writer(writer())
                        .taskExecutor(executor())
                        .throttleLimit(poolSize)
                        .build();
            }
        
            @Bean(name = JOB_NAME +"_reader")
            @StepScope
            public JdbcCursorItemReader<Product> reader(@Value("#{jobParameters[createDate]}") String createDate) {
                String sql = "SELECT id, name, price, create_date, status FROM product WHERE create_date=':createDate'"
                        .replace(":createDate", createDate);
        
                return new JdbcCursorItemReaderBuilder<Product>() // (2)
                        .fetchSize(chunkSize)
                        .dataSource(dataSource)
                        .rowMapper(new BeanPropertyRowMapper<>(Product.class))
                        .sql(sql)
                        .name(JOB_NAME +"_reader")
                        .build();
            }
        
            private ItemProcessor<Product, ProductBackup> processor() {
                return item -> {
                    log.info("Processing Start Item id={}", item.getId());
                    Thread.sleep(1000); // (3)
                    log.info("Processing End Item id={}", item.getId());
                    return new ProductBackup(item);
                };
            }
        
            @Bean(name = JOB_NAME +"_writer")
            @StepScope
            public JpaItemWriter<ProductBackup> writer() {
                return new JpaItemWriterBuilder<ProductBackup>()
                        .entityManagerFactory(entityManagerFactory)
                        .build();
            }
        }

        (1) .listener(new CursorItemReaderListener())

        • JdbcCursorItemReader는 별도로 데이터 읽기 수행시 별도의 로그를 남기지 않습니다.
        • 멀티쓰레드로 데이터를 읽고 있음을 쉽게 확인하기 위해 리스너를 추가합니다.

        (2) JdbcCursorItemReaderBuilder

        • JpaPagingItemReader 코드와 딱 Reader 영역만 교체하여 사용합니다.

        (3) Thread.sleep(1000);

        • 멀티쓰레드가 진행되는지 명확하게 구분하기 위해 각 Thread의 Processor 단계에서 1초간 Sleep이 발생하도록 합니다.
        • 너무 고속으로 처리될 경우 멀티쓰레드와 단일쓰레드의 차이가 구분이 거의 힘들기 때문에 의도적으로 지연 현상을 발생시킨 것입니다.

        자 그럼 위 코드를 테스트 코드로 한번 검증해보겠습니다.

        @ExtendWith(SpringExtension.class)
        @SpringBatchTest
        @SpringBootTest(classes={MultiThreadCursorConfiguration.class, TestBatchConfig.class})
        @TestPropertySource(properties = {"chunkSize=1", "poolSize=5"})
        public class MultiThreadCursorConfigurationTest {
        
            @Autowired
            private ProductRepository productRepository;
        
            @Autowired
            private ProductBackupRepository productBackupRepository;
        
            @Autowired
            private JobLauncherTestUtils jobLauncherTestUtils;
        
            @AfterEach
            void after() {
                productRepository.deleteAll();
                productBackupRepository.deleteAll();
            }
        
            @Test
            public void Cursor_분산처리_된다() throws Exception {
                //given
                LocalDate createDate = LocalDate.of(2020,4,13);
                ProductStatus status = ProductStatus.APPROVE;
                long price = 1000L;
                for (int i = 0; i < 10; i++) {
                    productRepository.save(Product.builder()
                            .price(i * price)
                            .createDate(createDate)
                            .status(status)
                            .build());
                }
        
                JobParameters jobParameters = new JobParametersBuilder()
                        .addString("createDate", createDate.toString())
                        .toJobParameters();
                //when
                JobExecution jobExecution = jobLauncherTestUtils.launchJob(jobParameters);
        
                //then
                assertThat(jobExecution.getStatus()).isEqualTo(BatchStatus.COMPLETED);
                List<ProductBackup> backups = productBackupRepository.findAll();
                backups.sort(Comparator.comparingLong(ProductBackup::getPrice));
        
                assertThat(backups).hasSize(10);
                assertThat(backups.get(0).getPrice()).isEqualTo(0L);
                assertThat(backups.get(9).getPrice()).isEqualTo(9000L);
            }
        
        }

        10개의 데이터가 정상적으로 백업 테이블로 이관되었는지를 검증합니다.
        여기서 Thread Safe하지 않으면 10개가 아닌 다른 개수가 있겠죠?

        위 테스트를 한번 실행해보면!

        역시 10개가 아닌 다른 건수가 들어가 있습니다.

        저장된 12개의 데이터를 확인해보면 이처럼 똑같은 데이터가 여러개 저장되어 있음을 알 수 있습니다.

        등록한 리스너를 통해 쓰레드들 (지정된 Pool Size는 5) 이 모두 같은 ID를 가진 데이터를 읽기 시작한 것도 확인할 수 있습니다.

        현재 코드에 문제가 있는것이 확인되었으니, 바로 코드를 수정해보겠습니다.

        3-3. Thread Safety 코드

        Thread Safety 코드는 Reader 영역을 SynchronizedItemStreamReader로 감싸기만 하면 됩니다.

        @Bean(name = JOB_NAME +"_reader")
        @StepScope
        public SynchronizedItemStreamReader<Product> reader(@Value("#{jobParameters[createDate]}") String createDate) {
        String sql = "SELECT id, name, price, create_date, status FROM product WHERE create_date=':createDate'"
                .replace(":createDate", createDate);
        
        JdbcCursorItemReader<Product> itemReader = new JdbcCursorItemReaderBuilder<Product>()
                .fetchSize(chunkSize)
                .dataSource(dataSource)
                .rowMapper(new BeanPropertyRowMapper<>(Product.class))
                .sql(sql)
                .name(JOB_NAME + "_reader")
                .build();
        
        return new SynchronizedItemStreamReaderBuilder<Product>() 
                .delegate(itemReader) // (1)
                .build();
        }

        (1) .delegate(itemReader)

        • delegate 에 감싸고 싶은 ItemReader 객체를 등록 합니다.
        • 감싸진 객체는 아래 사진에 나온것처럼 synchronized 메소드에서 감싸져 호출되어 동기화된 읽기가 가능하게 됩니다.

        SynchronizedItemStreamReader는 Spring Batch 4.0 부터 지원됩니다.
        그 이하 버전을 사용하시는 분들이라면 SynchronizedItemStreamReader 클래스 코드를 복사하여 프로젝트에 추가하시면 됩니다.

        SynchronizedItemStreamReader 로 변경후 다시 테스트를 돌려보면?
        테스트가 성공적으로 통과하는 것을 확인할 수 있습니다.

        실제로 실행 로그에서도 멀티쓰레드 환경에서 잘 작동되었음을 확인할 수 있습니다.

        정상적으로 Cursor 기반의 멀티쓰레드 Step을 확인하였습니다.

        마무리

        이제 느린 Batch 작업들은 멀티쓰레드로 해결하면 되는 것일까요!?
        그렇지는 않습니다.
        이미 네트워크/DISK IO/CPU/Memory 등 서버 자원이 이미 단일 쓰레드에서도 리소스 사용량이 한계치에 달했다면 멀티쓰레드로 진행한다고 해서 성능 향상을 기대할 순 없습니다.

        멀티 쓰레드는 여러가지 고려사항이 많습니다.
        그래서 실제 운영 환경에 적용하실때는 Spring 공식문서 를 숙지하시고, 충분히 테스트를 해보신뒤 실행해보시길 권장합니다.

        참고 : https://jojoldu.tistory.com/493

        bulk insert

        개발을 하다보면 API 호출 시 1000건이상의 데이터를 삽입해야하는 상황이 있습니다.

        1000건의 데이터를 insert하는데 걸리는 시간은 약 5초였으며 시간을 줄일 수 있는 방안을 생각해봤는데요. 

        mybatis에서 대량의 데이터를 한번에 삽입할 수 있도록 insert foreach문(DBMS마다 지원하는 쿼리가 다름)을 지원하는 것을 알게 되었습니다.

         

        수정 전 SQL

        	<insert id="add" parameterType="com.dto.Receiver$Info">
        		insert into tb_receiver(receiver_key, token, user_id, amount, is_received, receiver_date) 
        		values(#{receiver_key}, #{token}, #{user_id}, #{amount}, #{is_received}, #{receiver_date})
        	</insert>
        

         

        수정 후 SQL

        	
        	<insert id="addList" parameterType="com.dto.Receiver$Info">
        		insert into tb_receiver(receiver_key, token, user_id, amount, is_received, receiver_date)
        		values
        		<foreach collection="list" index="index" item="receiver" separator=",">
        		(
        			#{receiver.receiver_key},
        			#{receiver.token},
        			#{receiver.user_id},
        			#{receiver.amount},		
        			#{receiver.is_received},		
        			#{receiver.receiver_date}		
        		)		
        		</foreach>
        	</insert>	

         

        기존방식에서 bulk insert 방식으로 변경 시 성능향상이 얼마만큼 되는지 JUnit 테스트를 진행해봤습니다.

        테스트코드

        	@Autowired
        	ReceiverDao receiverDao;
        	
        	@Autowired
        	SpreadService spreadService;
        	
        	private final int peopleCnt = 7000;
        	@Test
        	public void 벌크_인석트_테스트() {
        
        		long amount = 1000000;
        		
        		long[] amounts = spreadService.distribute(amount, peopleCnt);
        		List<Receiver.Info> listReceiver = new ArrayList<Receiver.Info>();
        				
        		//뿌리기 받을사람 정보 추가
        		for(int i=0; i<amounts.length; i++) {
        			Receiver.Info receiver = new Receiver.Info();
        			receiver.setToken("AsD");			
        			receiver.setAmount(amounts[i]);
        			receiver.setReceiver_key(SeedUtil.createUUID());
        			receiver.setIs_received("0");
        			receiver.setReceiver_date(new Date());
        			listReceiver.add(receiver);
        		}
        		
        		int result = receiverDao.addList(listReceiver);
        				
        		assertThat(result, is(peopleCnt));
        	}
        	
        	@Test
        	public void 단일_인석트_테스트() {
        
        		long amount = 1000000;
        		int sum = 0, result = 0;
        		long[] amounts = spreadService.distribute(amount, peopleCnt);
        				
        		//뿌리기 받을사람 정보 추가
        		for(int i=0; i<amounts.length; i++) {
        			Receiver.Info receiver = new Receiver.Info();
        			receiver.setToken("Asq");			
        			receiver.setAmount(amounts[i]);
        			receiver.setReceiver_key(SeedUtil.createUUID());
        			receiver.setIs_received("0");
        			receiver.setReceiver_date(new Date());
        			result = receiverDao.add(receiver);
        			sum += result;
        		}
        						
        		assertThat(sum, is(peopleCnt));
        	}

         

        테스트는 다음과 같이 진행했습니다. 삽입하려는 데이터의 건수는 1,000건부터 8,000건까지 1,000단위로 테스트 진행

        건수 single insert bulk insert
        1,000건 약 5초 약 0.7초
        2,000건 약 7초 약 1초
        3,000건 약 9초 약 1초
        4,000건 약 12초 약 1.5초
        5,000건 약 14초 약 1.5초
        6,000건 약 17초 약 2초
        7,000건 약 20초 약 2.2초
        8,000건 약 25초 약 2.2초

        Spring Batch ItemWriter 성능 향상

        대규모 데이터를 처리하는 Spring Batch 에서 배치 성능은 중요한 요소입니다.
        배치 성능에 있어서 튜닝 요소는 크게 2가지로 정리 될 수 있습니다.

        • Reader를 통한 데이터 조회
        • Writer를 통한 데이터 등록/수정

        Reader의 경우엔 Select Query 튜닝을 통한 개선 이야기가 많이 공유되어있습니다.

        Querydsl을 통한 Paging, No Offset 조회 방법은 이전 포스팅 을 참고하시면 됩니다.

        반면 Writer의 경우에는 Reader에 비해서는 공유된 내용이 많지 않습니다.
        그래서 이번 시간에는 Spring Batch JPA를 사용하는 경우에 어떻게 개선할 수 있을지 실제 비교를 해가며 정리하였습니다.

        Merge vs Persist

        JPA에서 Merge는 Insert에서 비효율적으로 작동을 합니다.

        Merge는 Entity의 persistent 상태를 알 수 없거나 이미 저장된 것을 변경하는데 유용합니다.

        다만, Spring Batch에서는 JpaItemWriter를 통한 write 작업이 신규 생성되는 Entity를 저장하는 기능과 기존 Entity의 값 변경 2영역에 모두 대응해야되어 Merge 를 기본 Mode로 구현하였습니다.

        그러던 중, Spring Batch 4.2 버전 선택적으로 Persist 모드를 선택할 수 있도록 개편되었습니다.

        Spring Boot 2.2.8 부터 사용 가능합니다.

        자 그럼 2가지 Mode에 대한 비교를 해보겠습니다.

        1-1. Non Auto Increment

        먼저 비교해볼 것은 테이블의 ID 생성 전략이 없는 경우 입니다. (즉, Auto Increment가 없는 상태) 이는 Id 채번을 애플리케이션에서 하는 경우입니다. 테스트 할 Entity 는 다음과 같습니다.

        @Getter
        @NoArgsConstructor
        @Entity
        public class Person2 {
        
            @Id
            private Long id;
        
            private String name;
        
            public Person2(Long id, String name) {
                this.id = id;
                this.name = name;
            }
        }

        보시다시피 @Id 외에 @GeneratedValue(strategy) 를 선언하지 않은 상태입니다. 해당 Entity에 대한 Merge 테스트 코드는 다음과 같습니다.

        @Test
        public void non_auto_increment_test_merge() throws Exception {
            // given
            JpaItemWriter<Person2> writer = new JpaItemWriterBuilder<Person2>()
                    .entityManagerFactory(this.entityManagerFactory)
                    .build();
        
            writer.afterPropertiesSet();
            List<Person2> items = new ArrayList<>();
            for (long i = 0; i < TEST_COUNT; i++) {
                items.add(new Person2(i, "foo" + i));
            }
        
            // when
            writer.write(items);
        }

        Reader로 인한 조회 성능 차이나 그 밖에 Spring Batch의 여러 요소들로 인해 방해되는 것을 막기 위해 순수 Writer만 가지고 테스트를 진행합니다.

        위 테스트 코드를 수행해보면?

        Select쿼리와 Insert쿼리가 함께 수행 되는 것을 볼 수 있는데, 이는 Hibernate의 Merge 작동 방식때문인데요. 

        기존에 해당 Id로 저장된 Entity가 있을 경우 Update를, 없을 경우엔 Insert를 실행하기 위하여 저장하는 Entity 개수만큼 Select 쿼리가 발생합니다.

        반대로 Persist에선 어떻게 작동할까요?

        아래와 같이 테스트 코드를 작성하여 실행해봅니다.

        @Test
        public void non_auto_increment_test_persist() throws Exception {
            // given
            JpaItemWriter<Person2> writer = new JpaItemWriterBuilder<Person2>()
                    .usePersist(true) // (1)
                    .entityManagerFactory(this.entityManagerFactory)
                    .build();
        
            writer.afterPropertiesSet();
            List<Person2> items = new ArrayList<>();
            for (long i = 0; i < TEST_COUNT; i++) {
                items.add(new Person2(i, "foo" + i));
            }
        
            // when
            writer.write(items);
        }

        (1) .usePersist(true)

        • 글 상단에서 언급한것처럼 Spring Batch 4.2에서 도입된 persist 모드를 활성화하는 옵션입니다.

        테스트 로그를 보면?

        Merge와 달리 Insert쿼리만 발생한 것을 확인할 수 있습니다.

        자 그럼 이 둘의 실제 성능 차이는 얼마나 발생할까요?

        Non Auto Increment 성능 비교

        1만건의 Entity를 AWS RDS Aurora (r5.large) 에 밀어넣어보면서 비교해보겠습니다.

        1. Merge

        2. Persist

        테스트 결과 약 2배 (merge: 2m 16s, persist: 1m 9s) 의 성능 차이가 발생하는 것을 확인할 수 있습니다.

        Id 생성 전략이 별도로 없을 경우 Persist가 좋다는 것을 확인할 수 있습니다.

        1-2. Auto Increment

        자 그럼 반대로 Auto Increment가 선언 된 경우엔 어떻게 될까요?

        테스트에 사용될 Entity는 다음과 같습니다.

        @Getter
        @NoArgsConstructor
        @Entity
        public class Person {
        
            @Id
            @GeneratedValue(strategy = GenerationType.IDENTITY)
            private Long id;
        
            private String name;
        
            public Person(String name) {
                this.name = name;
            }
        }

        가장 먼저 Merge에 대한 테스트 코드입니다.

        @Test
        public void auto_increment_test_merge() throws Exception {
            // given
            JpaItemWriter<Person> writer = new JpaItemWriterBuilder<Person>()
                    .entityManagerFactory(this.entityManagerFactory)
                    .build();
        
            writer.afterPropertiesSet();
            List<Person> items = new ArrayList<>();
            for (long i = 0; i < TEST_COUNT; i++) {
                items.add(new Person( "foo" + i));
            }
        
            // when
            writer.write(items);
        
            // then
            assertThat(personRepository.count()).isEqualTo(TEST_COUNT);
        }

        이번 테스트부터는 Id 생성을 Auto Increment에 맡기기 때문에 직접 생성하지 않도록 하였습니다.

        위 코드를 실행해보면?

        추가 Select 쿼리 없이 Insert만 수행되는 것을 확인할 수 있습니다.
        지정된 Id가 없으니 명확하게 새로운 Entity 임을 알 수 있기 때문에 별도의 Select 쿼리가 발생하지 않았습니다.

        그럼 Persist는 어떻게 될까요?

        @Test
        public void auto_increment_test_persist() throws Exception {
            // given
            JpaItemWriter<Person> writer = new JpaItemWriterBuilder<Person>()
                    .usePersist(true)
                    .entityManagerFactory(this.entityManagerFactory)
                    .build();
        
            writer.afterPropertiesSet();
            List<Person> items = new ArrayList<>();
            for (long i = 0; i < TEST_COUNT; i++) {
                items.add(new Person( "foo" + i));
            }
        
            // when
            writer.write(items);
        
            // then
            assertThat(personRepository.count()).isEqualTo(TEST_COUNT);
        }

        Persist 테스트 코드를 수행해보면?

        마찬가지로 Insert쿼리만 수행되는 것을 확인할 수 있습니다.

        Persist의 경우 항상 새로운 객체를 저장할 때만 사용해야 합니다.
        Auto Increment에서 ID가 있는 Entity를 저장할 경우 에러가 발생합니다.

        둘 사이에 쿼리가 차이가 없으니 실제로 성능비교를 한번 해보겠습니다.

        Auto Increment 성능 비교

        실제 발생하는 쿼리가 동일하니 성능 역시 비슷하게 나옵니다.

        1. Merge

        2. Persist

        둘의 수행속도가 비슷하니 Auto Increment인 경우에 써도 되지 않을까? 싶으실텐데요.

        실제 Merge는 Entity 복사를 매번 수행합니다.
        PersistenceContext에 존재하는 것을 반환하거나 Entity의 새 인스턴스를 만듭니다.
        어쨌든 제공된 Entity에서 상태를 복사하고 관리되는 복사본을 반환합니다.
        (전달한 인스턴스는 관리되지 않습니다.)

        그래서 성능이 비슷하다 하더라도 신규 Entity를 생성할때는 Persist를 사용하는 것이 좋습니다.

        2. Jdbc Batch Insert

        위 테스트들을 거치면서 한가지 의문이 있으셨을 것입니다.

        • JpaItemWriter는 왜 Batch Insert (혹은 Bulk Insert) 로 처리하지 않는 것이지?

        일반적으로 Batch Insert라 하면 아래와 같은 쿼리를 이야기 합니다.

        INSERT INTO person (name) VALUES
        ('name1'),
        ('name2'),
        ('name3');

        이렇게 할 경우 MySQL 매커니즘으로 인해서 고성능으로 대량의 데이터를 처리할 수 있는데요.

        실제 성능 비교를 아래에서 진행합니다.

        JPA (정확히는 Hibernate) 에서는 Auto Increment 일 경우엔 이 방식을 지원하지 않습니다.

        물론 Auto Increment가 아닐 경우엔 아래와 같은 옵션으로 values 사이즈를 조절하여 Batch Insert를 사용할 수 있습니다.

        spring.jpa.properties.hibernate.jdbc.batch_size=개수

        이는 Hibernate의 매커니즘상 Entity의 Id를 알 수 없는 경우 Transactional write behind(트랜잭션을 지원하는 쓰기 지연: 트랜잭션이 커밋 될때까지 내부 쿼리저장소에 모아뒀다가 한번에 실행하는 방식)과 충돌이 발생하기 때문입니다.

        예를 들어, OneToMany의 Entity를 insert할 경우

        1) 부모 Entity를 insert 하고 생성된 Id 반환
        2) 자식 Entity에선 1) 에서 생성된 부모 Id를 FK 값으로 채워서 insert

        위 과정를 진행하는 쿼리를 모아서 실행하는게 Hibernate의 방식인데, 이때 Batch Insert과 같은 대량 등록의 경우엔 이 방식을 사용할 수가 없습니다.
        (부모 Entity를 한번에 대량 등록하게 되면, 어느 자식 Entity가 어느 부모 Entity에 매핑되어야하는지 알 수 없겠죠?)

        그럼 ID 생성 전략을 Auto Increment가 아닌 Table (Sequence)를 선택하면 되지 않을까 생각하게 되는데요.
        아래 글에서 자세하게 설명하고 있지만, 성능상 이슈 Dead Lock에 대한 이슈로 Auto Increment를 강력하게 추천합니다.

        그래서 이 포스팅에서도 Auto Increment와 직접 생성 방식에 대해서만 성능 비교를 진행해보겠습니다.

        혹시나 MySQL에서 실행중인 쿼리를 확인했을때 Insert 쿼리 합치기가 안된다면 Spring Boot의 Jdbc-url 설정에 rewriteBatchedStatements 옵션 (기본값이 false) 이 true 인지 확인해보시면 좋습니다.
        적용방법: jdbc:mysql:://DB주소:포트/스키마?rewriteBatchedStatements=true

        3-1. Non Auto Increment 성능

        먼저 Auto Increment 가 아닐 경우의 성능을 확인해보겠습니다.

        테스트할 코드는 아래와 같습니다.

        @Test
        public void non_auto_increment_test_jdbc() throws Exception {
            //given
            JdbcBatchItemWriter<Person2> writer = new JdbcBatchItemWriterBuilder<Person2>()
                    .dataSource(dataSource)
                    .sql("insert into person(id, name) values (:id, :name)")
                    .beanMapped()
                    .build();
        
            writer.afterPropertiesSet();
            List<Person2> items = new ArrayList<>();
            for (long i = 0; i < TEST_COUNT; i++) {
                items.add(new Person2(i, "foo" + i));
            }
        
            // when
            writer.write(items);
        }

        1만건을 요청하는 위 코드를 직접 MySQL에 요청을 해보면?

        0.586초 라는 JpaItemWriter에 비해 압도적인 성능을 보여줍니다.

        3-2. Auto Increment 성능

        그럼 Auto Increment일 경우엔 어떻게 될까요?

        @Test
        public void auto_increment_test_jdbc() throws Exception {
            //given
            JdbcBatchItemWriter<Person> writer = new JdbcBatchItemWriterBuilder<Person>()
                    .dataSource(dataSource)
                    .sql("insert into person(name) values (:name)")
                    .beanMapped()
                    .build();
        
            writer.afterPropertiesSet();
            List<Person> items = new ArrayList<>();
            for (long i = 0; i < TEST_COUNT; i++) {
                items.add(new Person( "foo" + i));
            }
        
            // when
            writer.write(items);
        }

        동일하게 1만건을 요청할 경우에도 마찬가지로 0.561초라는 결과를 보여줍니다.

        순수하게 단일 테이블의 등록면에 있어서는 Jdbc Batch Insert의 성능이 비교가 안될 정도로 좋다는 것을 알 수 있습니다.

        다만 무조건 많은 양의 row를 한번에 요청하는게 빠른 방법은 아닙니다.
        한번에 몇개의 insert value를 만들지 MySQL의 max_allowed_packet, Buffer Size, bulk_insert_buffer_size 등 여러 옵션들에 따라 상이하니 적절한 성능 테스트를 통해 값을 찾아야 합니다.

        3. 최종 비교

        최종적으로 Spring Batch ItemWriter들의 성능을 비교하면 다음과 같습니다.

        ItemWriter ModeNon Auto Increment (10,000 row)Auto Increment (10,000 row)

        ItemWriter Mode Non Auto Increment Auto Increament
        Jpa.Merge 2m 16s 1m 1s
        Jpa.Persist 1m 9s 1m 2s
        Jdbc Batch Insert 0.586s 0.586s

        순수하게 단일 테이블에 대량으로 등록할 경우 Jdbc의 Batch Insert 방식이 압도적인 성능을 보여줍니다.
        다만, 무조건 Jdbc Batch Insert 방식을 사용하기엔 아래와 같은 단점들이 있습니다.

        • OneToMany, ManyToMany와 같이 복잡한 Entity 관계가 insert가 필요할 경우 직접 구현해야할 부분이 너무나 많이 존재
        • 컴파일체크, 타입힌트, 자동완성등 유지보수가 어려운 개발 환경

        그래서 다음과 같이 혼합 방식을 선택하기도 합니다.

        이를 테면 OneToMany의 관계가 등록이 필요할 경우

        • 부모 Entity는 JpaItemWriter를 이용하여 ChunkSize별로 저장하여 PK값과 Entity를 확보
        • PK가 확보된 부모 Entity를 통해 자식 Entity들을 생성 (부모 ID값을 갖고 생성)
        • 자식 Entity들은 JdbcItemWriter를 통해 Jdbc Batch Insert

        와 같이 구현해서 처리하기도 합니다.

         

        (참고 : https://jojoldu.tistory.com/473)

        목차

          Chunk ? 

          Spring Batch에서의 Chunk란 데이터 덩어리로 작업 할 때 각 커밋 사이에 처리되는 row 수를 얘기합니다. 즉, Chunk 지향 처리란 한 번에 하나씩 데이터를 읽어 Chunk라는 덩어리를 만든 뒤, Chunk 단위로 트랜잭션을 다루는 것을 의미합니다. 여기서 트랜잭션이라는게 중요한데요. Chunk 단위로 트랜잭션을 수행하기 때문에 실패할 경우엔 해당 Chunk 만큼만 롤백이 되고, 이전에 커밋된 트랜잭션 범위까지는 반영이 된다는 것입니다. Chunk 지향 처리가 결국 Chunk 단위로 데이터를 처리한다는 의미이기 때문에 그림으로 표현하면 아래와 같습니다.

          Chunk 지향 프로세싱은 1000개의 데이터에 대해 배치 로직을 실행한다고 가정하면, Chunk 단위로 나누지 않았을 경우에는 한개만 실패해도 성공한 999개의 데이터가 롤백된다. Chunk 단위를 10으로 한다면, 작업 중에 다른 Chunk는 영향을 받지 않는다.
          • Reader에서 데이터 하나를 읽어 온다.(item 단위)
          • 읽어온 데이터를 Processor에서 가공한다.(item 단위)\
          • 가공된 데이터들을 별도의 공간에 모은 뒤 Chunk 단위만큼 쌓이게 되면 Writer에 전달하고 Writer는 일괄 저장한다. (Chunk = items)

          Reader와 Processor에서는 1건씩 다뤄지고, Writer에선 Chunk 단위로 처리된다는 것만 기억하시면 됩니다.

          Chunk 지향 처리를 Java 코드로 표현하면 아래처럼 될 것 같습니다.

          여기선 Reader, Processor에서는 1건씩 다뤄지고, Writer에서는 Chunk 단위로 처리된다는 것을 기억하면 된다.

          Chunk-oriented processing의 장점

          위에서 알아본 청크지향 프로세싱을 사용하지 않는다 하더라도 개발자가 충분히 비슷한 로직으로 구현을 할 수도 있습니다. 하지만 청크지향 프로세싱은 단순히 청크단위의 트랜잭션만 제공해주는것은 아닙니다.

          Spring batch 청크지향 프로세싱의 가장 큰 장점이라고 하면, 내결함성 (Falut tolernat)를 위한 다양한 기능들을 제공하고 있다는 것 입니다.

          멀티 스레드 환경에서 chunk 지향 불가

          각 Reader와 Writer의 Javadoc에 항상 저 thread-safe 문구가 있는지 확인해보셔야 합니다.
          만약 없는 경우엔 thread-safe가 지원되는 Reader 와 Writer를 선택해주셔야하며, 꼭 그 Reader를 써야한다면 SynchronizedItemStreamReader 등을 이용해 thread-safe로 변환해서 사용해볼 수 있습니다.

          그리고 또 하나 주의할 것은 멀티 쓰레드로 각 Chunk들이 개별로 진행되다보니 Spring Batch의 큰 장점중 하나인 실패 지점에서 재시작하는 것은 불가능 합니다. 이유는 간단합니다. 단일 쓰레드로 순차적으로 실행할때는 10번째 Chunk가 실패한다면 9번째까지의 Chunk가 성공했음이 보장되지만, 멀티쓰레드의 경우 1~10개의 Chunk가 동시에 실행되다보니 10번째 Chunk가 실패했다고 해서 1~9개까지의Chunk가 다 성공된 상태임이 보장되지 않습니다. 그래서 일반적으로는 ItemReader의 saveState 옵션을 false 로 설정하고 사용합니다.

          거래명세서 생성

          Step1 Step2 Step3 Step4
          고객 데이터 가져오기 거래 정보 데이터 가져오기 현재 잔액 계산하기 월별 고객 거래명세서 생성

          고객 데이터 가져오기

          • 스프링 배치는 스텝 내에서 사용 하는 여러 레코드 형식을 처리할 수 있는 기능을 제공한다.
          • 데이터를 읽을 수 있다면, 쓰기 처라할 때 오류를 최소하도록 ItemProcessor를 사용해 데이터 유효성을 검증한다.
          • 다음 적절한 ItemWriter 구현체를 사용해 레코드 유형에 따라 적절하게 데이터를 갱신한다.

          거래 정보 데이터 가져오기

          고객 데이터를 가져온 후 거래 데이터를 가져온다. 스프링 배치는 강력한 ItemReader 및 ItemWriter 구현체를 제공하므로, 이 스텝에서는 XML을 읽은 뒤 데이터베이스에 기록하는 구현체를 사용할 수 있다. 

          다음은 거래 XML 파일의 입력 예다.

          <?xml version="1.0" encoding="UTF-8" ?>
          <transaction>
          	<transaction>
              	<transactionId>1</transactionId/>
                  <accoutId>15</accoutId>
                  <credit>5.62</credit>
                  <debit>1.95</debit>
                  <timestamp>2017-07-12 12:05:21 </timestamp>
               </transaction>
               <transaction>
               	<transactionId>2</transactionId/>
                  <accoutId>68</accoutId>
                  <credit>5.27</credit>
                  <debit>6.26</debit>
                  <timestamp>2017-07-12 16:28:37 </timestamp>
               </transaction>
            .
            .
            .

          현재 잔액 계산하기

          거래 데이터를 가져온 뒤 계좌 테이블의 잔액도 갱신해야한다.

           

          월별 고객 거래명세서 생성

          • 먼저 ItemReader를 사용해 데이터베이스에서 고객 데이터를 읽어 온다.
          • 그 뒤 각 거래명세서 작성에 필요한 모든 데이터를 추가하는 일을 담당하는 ItemProcessor에게 해당 고객 데이터를 보낸다.
          • 그리고 최종적으로 필요한 모든 데이터의 추가가 완료된 아이템을 파일 기반 ItemWriter에게 전달한다.

           

           

          웹 프레임워크에는 MVC 패턴이라는 것이 있고 특정 MVC 프레임워크를 이해하고 나면 다른 MVC 프레임워크를 사용하더라도 서로 다른 문법적 차이만 이해하면 된다. 그러나 배치 프레임워크는 종류가 많지 않다. 그래서 배치 프레임워크는 분야가 조금 생소할 수 있다. 
          job(잡)이나 Step(스텝)이 무엇인지?
          ItemReader와 ItemWriter가 어떻게 연관돼 있는지
          Tasklet이 무엇인지?
          2장에서는 위의 질문에 대한 답을 할 것이다.


          잡과 스텝

          배치 잡

          ex) 은행업무
          Step1 : 다른 시스템에서 수신한 거래 정보 파일을 읽어와 데이터베이스에 저장한다.
          Step2 : 모든 입금 정보를 계좌에 반영한다.
          Step3 : 모든 출금정보를 계좌에 반영한다.

           

          각 스텝은 잡을 구성하는 독립된 작업의 단위라는 것을 알 수 있다.

          스탭

          • 테스크릿(tasklet) 기반 스텝
          • 청크(chunk) 기반 스텝

          테스크릿(tasklet) 기반 스텝

          스텝이 중지될 때까지 execute 메서드가 반속해서(execute 메서드를 호출할 때마다 독립적인 트랜잭션이 얻어짐) 수행 된다. 

          • 초기화
          • 저장 프로시저 실행 
          • 알람 전송

          청크(chunk) 기반 스텝

          약간 스텝의 구조가 복잡하며, 아이템 기반의 처리가 사용된다. 

          • ItemReader(필수)
          • ItemProcessor(선택)
          • ItemWriter(필수)

          ItemReader와 ItemWrier만으로 구성해 스텝을 진행 할 수도 있다.(이러한 스텝은 데이터 마이그레이션 잡에 일반적으로 사용된다.)

           

          배치 잡을 구성하는 인터페이스

          인터페이스 설명
          org.springframework.batch.core.Job ApplicationContext 내에 구성되는 잡 객체
          org.springframework.batch.core.Step ApplicationContext 내에 구성되는 스텝을 나타내는 객체
          org.springframework.batch.core.step.tasklet.Tasklet 트랜잭션 내에서 로직이 실행 될 수 있는 기능을 제공하는 전략(Strategy) 인터페이스
          org.springframework.batch.item.itemReader<T> 스텝 내에서 입력을 제공하는 전략 인터페이스
          org.springframework.batch.item.itemProccesor<T> 스텝 내에서 제공받은 개별 아이템(item)에 업무 로직, 검증 등을 적용하는 역할을 하는 인터페이스
          org.springframework.batch.item.itemWriter<T> 스텝 내에서 아이템을 저장하는 전략 인터페이스

           

          잡 실행

          JobRepository

          JobLauncher

          개발자가 구현하기에 따라 역할이 달라진다.

          • Job.execute메서드를 호출하는 역할 
          • 잡의 재실행 가능 여부 검증(모든 잡을 재시작할 수 있는 것은 아님)
          • 잡의 실행 방법(현재 스레드에서 수행할지 스레드 풀을 통해 실행할지 등)
          • 파라미터 유효성 검증

          잡을 실행하면, 해당 잡은 각 스텝을 실행한다. 각 스텝이 실행되면 JobRepository는 현재 상태로 갱신된다. 즉, 실행된 스텝, 현재 상태, 읽은 아이템 및 처리된 아이템 수 등이 모두 JobRepository에 저장된다.

           

          병렬화

          가장 단순한 방법은 배치 처리 아키텍처는 잡 내의 스텝을 처읍부터 끝까지 순서대로 단일 스레드에서 진행하는 것이다. 하지만 스프링 배치는 실 사례에 필요한 벙렬화 방버을 제공한다. 

          • 다중 스레드 스텝을 통한 작업 분할
          • 전체 스텝의 병렬 실행
          • 비동기 ItemProcessor/ItemWriter 구성
          • 원격 청킹
          • 파티셔닝

          package io.spring.batch.helloworld;
          
          import org.springframework.batch.core.Job;
          import org.springframework.batch.core.Step;
          import org.springframework.batch.core.StepContribution;
          import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
          import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
          import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
          import org.springframework.batch.core.scope.context.ChunkContext;
          import org.springframework.batch.core.step.tasklet.Tasklet;
          import org.springframework.batch.repeat.RepeatStatus;
          import org.springframework.beans.factory.annotation.Autowired;
          import org.springframework.boot.SpringApplication;
          import org.springframework.boot.autoconfigure.SpringBootApplication;
          import org.springframework.context.annotation.Bean;
          
          //새롭게 추가1
          @EnableBatchProcessing
          @SpringBootApplication
          public class HelloWorldApplication {
          //새롭게 추가2
          	@Autowired
          	private JobBuilderFactory jobBuilderFactory;
          //새롭게 추가3
          	@Autowired
          	private StepBuilderFactory stepBuilderFactory;
          
          	@Bean
          	public Step step() {
          		return this.stepBuilderFactory.get("step1")
          				.tasklet(new Tasklet() {
          					@Override
          					public RepeatStatus execute(StepContribution contribution,
          							ChunkContext chunkContext) {
          						System.out.println("Hello, World!");
          						return RepeatStatus.FINISHED;
          					}
          				}).build();
          	}
          
          	@Bean
          	public Job job() {
          		return this.jobBuilderFactory.get("job")
          				.start(step())
          				.build();
          	}
          
          	public static void main(String[] args) {
          		SpringApplication.run(HelloWorldApplication.class, args);
          	}
          }

           

          @EnableBatchProcessing 에너테이션의 역할

          이 에너테이션은 배치 인프라스트럭처를 위한 대부분의 스프링 빈 정의를 제공하므로 다음과 같은 컴포넌트를 직접 포함시킬 필요는 없다

          컴포넌트 설명
          JobRepository 실행 중인 잡의 상태를 기록하는 데 사용됨
          JobLauncher 잡을 구동하는 데 사용됨
          JobExplorer JobRepository를 사용해 읽기 전용 작업을 수행하는 데 사용
          JobRegistry 특정한 런처 구현체를 사용할 때 잡을 찾는 용도로 사용됨
          PlatformTransactionManager 잡 진행 과정에서 트랜잭션을 다루는 데 사용됨
          JobBuilderFactory 잡을 생성하는 빌더
          StepBuilderFactory 스텝을 생성하는 빌더

           

          @SpringBootApplication 에너테이션의 역할

          @componenetScan과 @EnableAutoConfiguration을 결합한 메타 에너테이션이다. 이 에너텡션은 데이터 소스뿐만 아니라 스프링 부트 기반의 적절한 자동 구성을 만들어 준다. 

           

          public Step step()

          이 잡은 단일 스텝으로 구성되므로 간단하게 스텝이름만 지정한다. 스텝은 스프링 빈으로 구성됐으며, 이 간단한 예제에서는 두가지 요인인 이름 및 테스크릿만 필요하다.  System.out.println("Hello, World")을 호출한 다음에 RepeatStatus.FINISHED를 반환한다.

          RepeatStatus  
          RepeatStatus.FINISHED 테스크릿(Tasklet)이 완료됐음을 뜻함.
          RepeatStatus.CONTINUEBLE 테스크릿(Tasklet)을 다시 호출

          JobBuilderFactory

          잡 이름과 해당 잡에서 시작할 스텝을 구성한다.

           


          잡 실행하기

          잡은 첫 번째 스텝을 실행했다. 이때 트랜잭션이 시작하며, Tasklet이 실행됐고, 결과가 JobRepository에 갱신됐다. 

           

          배치처리 예시

          • 쇼핑몰에서 쿠폰이 포함된 이메일
          • 아마존과 같은 사이트에서 연관 상품을 추천하느 데이터 고하ㅏㄱ 모델 
          • 빅데이터 작업을 조정하는 일
          • 은행에서 생성되는 거래명세서와 퇴직연금명세서

          배치처리하면 좋은 이유

          필요한 모든 정보를 원하는 즉시 받아 볼 수는 없다.

          • 고객의 마음을 바꿔 주문을 취소할 때, 아직 배송하기 전이라면 훨씬 더 저렴한 비용으로 취소할 수 있다. 고객에게 몇 시간을 더 주면서 배치로 배송을 처러하면 소매업자는 많은 돈을 절약할 수 있다.(주문/거래/배송->정산)

          왜 자바로 배치를 처리하는 가?

          배치 처리 개발에 자바 및 오픈소를 사용해야 하는 이유 6가지

          1. 유지 보수성
          2. 유연성
          3. 확장성
          4. 개발 리소스
          5. 지원
          6. 비용

          스프링 배치 프레임워크 구조

          스프링 배치는 레이어(Layer) 구조로 조립된 세 개의 티어(Tier)로 이뤄져 있다. 

          어플리케이션 레이어
          [ 코어 / 인프라스트럭처 레이어 ]

          어플리케이션 레이어

          가장 바깥쪽에 위치하며, 배치 처리 구축에 사용되는 '모든 사용자 코드' 및 '구성'이 포함된다. '업무 로직 서비스' 등은 물론 '잡 구조'와 관련된 구성도 포함한다.

          가장 최상위에 있는 것이 아니라 코어/인프라스트럭처를 감싸고 있음에 주목하자. (그 이유는 개발자가 개발하는 대부분의 코드가 '코어 레이어'와 함께 동작하는 어플리케이션레어 레이어 이지만 때로는 커스텀 리더(Reader)나 커스텀 라이터(Writer)와 같이 인프라스트럭처의 일부를 만들기도 하기 때문이다.)

          이해가 안가지만 대충 어플리케이션 레이어는 코어 레이어랑 많이 노는데 가끔 인프라스트럭처랑도 논다. 이런 느낌

           

          무중단 처리 또는 상시 데이터 처리

          스프링 프레임 워크를 사용하면 큐(queue)에서 메시지를 읽은 뒤, 청크 단위로 배치 처리를 수행하는 과정을 끝없이 반복할 수 있다. 따라서 이러한 솔루션을 처음부터 개발하는 복잡한 상황을 이해할 필요 없이(Queue가 알아서해주기 때문에) 대량 데이터 처리량을 늘릴수 있다.

           

          Chunk(청크) 처리란?
          Spring Batch에서의 Chunk란 데이터를 덩어리로 작업 할 때 각 커밋 사이에 처리되는 row의 수 이다.
          즉, Chunk 지향 처리란 한 번에 하나씩 데이터를 읽어 Chunk라는 덩어리르 만든 뒤, Chunk 단위로 트랜잭션을 다루는 것을 의미한다.

          - Reader에서 데이터를 하나 읽어옴
          - 읽어온 데이터를 Processor에서 가공
          - 가공된 데이터를 별도의 공간에 모은 뒤, Chunk 단위만큼 쌓이게 되면 Writer에 전달하고 Writer는 일괄 저장
          즉, Reader와 Processor에서는 1건씩 다뤄지고 Writer에서는 Chunk 단위로 처리된다.

          스프링으로 잡 정의하기

          1. 잡은 중단이나 상호작용 없이 처음부터 끝까지 실행되는 처리이다.
          2. 잡은 여러 개의 스텝이 모여 이뤄질 수 있다.
          3. 스텝이 실패했을 때 반복 실행할 수도 있고 못할 수도 있다.
          4. 잡의 플로우(flow)는 조건부일 수 있다. (예를들어 수익을 계산하는 스텝이 $1,000,000 이상의 수익을 반환할 때만 보너스 계산 스텝을 실행하는 경우)
          //AccountTasklet : 커스텀 컨포넌트
          @Bean
          public AccountTasklet accountTasklet(){
          	return new AccountTasklet();
          }
          
          @Bean
          public Job accountJob(){
          //AccountTasklet을 감싸는 스텝하나를 생성
          	Step accountStep =
              	this.stepBuilderFactory
                  	.get("accountStep")
                      .task(accountTasklet());
                      .build();
          //스텝을 감싸는 잡을 생성
               return this.jobBuilderFactory
               			.get("accountJob")
                          .start("accountStep")
                          .build();
          }

           

          AccountTasklet

          • 첫 번째 Bean은 AccountTasklet이다. 커스텀 컴포넌트이다. 스텝이 동작하는 동안에 비즈니스 로직을 수행한다. 스프링배치는 AccountTasklet이 완료될 때까지 단일 메서드(execute 메서드)를 반복해서 호출하는데, 이때 각각은 새 트렌잭션으로 호출된다.
          • 두 번째 Bean은 스프링 배치 잡이다. 이 Bean 정의 내에서는 팩토리가 제공하는 빌더를 사용해, 조금 전 정의했던 AccountTasklet을 감싸는 스텝하나를 생성한다. 그런  다음에 잡 빌더를 사용해 스텝을 감싸는 잡을 생성한다. 스프링 부트(Spring Boot는 애플리케이션 기동 시에 이 잡을 찾아내 자동으로 실행시킨다.

          'Spring > Spring Batch' 카테고리의 다른 글

          스프링배치_완벽가이드_3장_거래명세서 생성  (0) 2021.07.15
          스프링배치_완벽가이드_2장 스프링 배치  (0) 2021.07.14
          Spring Batch - 0 개념  (0) 2021.01.27
          Spring Batch - 2 구조  (0) 2021.01.25
          Spring Batch - 1  (0) 2021.01.19

          + Recent posts