Notice
Recent Posts
Recent Comments
Link
반응형
«   2025/04   »
1 2 3 4 5
6 7 8 9 10 11 12
13 14 15 16 17 18 19
20 21 22 23 24 25 26
27 28 29 30
Tags more
Archives
Today
Total
관리 메뉴

David의 블로그

[Spring Boot]대량의 데이터를 한번에 처리하는 Spring Batch. 세번째 본문

프로그래밍/Spring Boot

[Spring Boot]대량의 데이터를 한번에 처리하는 Spring Batch. 세번째

David 리 2025. 4. 16. 17:08
반응형

저번 시간에는 직접 Spring Batch 스케줄링 컨피그 파일을 직접 만들었다.

2025.04.15 - [프로그래밍/Spring Boot] - [Spring Boot] 대량의 데이터를 한번에 처리하는 Spring Batch. 두번째

 

[Spring Boot] 대량의 데이터를 한번에 처리하는 Spring Batch. 두번째

2025.04.15 - [프로그래밍/Spring Boot] - [Spring Boot] 대량의 데이터를 한번에 처리하는 Spring Batch. 첫번째 [Spring Boot] 대량의 데이터를 한번에 처리하는 Spring Batch. 첫번째"Spring Batch"는 대량의 데이터를 효

78alswo.tistory.com

 

 

이번시간에는 Batch 컨피그 파일을 만들어 볼 것이다.

이전에 나는 Job 2개를 만들었다.

requestCrawlerJob은 Http통신 요청을 행하는 Job이고, batchMstCRUDJob은 최종테이블에 CRUD작업을 하는 Job이다.

 

먼저 Batch컨피그 파일을 생성한다.

```FirstBatch```

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
 
@Configuration
@EnableBatchProcessing
@Slf4j
public class FirstBatch {
    private final JobRepository jobRepository;
    private final PlatformTransactionManager platformTransactionManager;
 
    private final JobMstRepository jobMstRepository;
    private final BatchJobMstRepository batchJobMstRepository;
    
    public FirstBatch(JobRepository jobRepository, PlatformTransactionManager platformTransactionManager,
            JobMstRepository jobMstRepository, BatchJobMstRepository batchJobMstRepository) {
 
        this.jobRepository = jobRepository;
        this.platformTransactionManager = platformTransactionManager;
        this.jobMstRepository = jobMstRepository;
        this.batchJobMstRepository = batchJobMstRepository;
    }
    
    // OkHttpClient 연결/리드/읽기 타임아웃 1분으로 지정.
    private final OkHttpClient client = new OkHttpClient.Builder()
            .connectTimeout(60, TimeUnit.SECONDS) // 연결 타임아웃
            .readTimeout(60, TimeUnit.SECONDS)    // 읽기 타임아웃
            .writeTimeout(60, TimeUnit.SECONDS)   // 쓰기 타임아웃
            .build();
    
    // Job_mst Http통신요청부터 배치 시작!
    @Bean
    public Job requestCrawlerJob() {
        return new JobBuilder("requestCrawlerJob")
                .repository(jobRepository)
                .incrementer(new RunIdIncrementer())
                .start(requestStep())
                .build();
    }
 
    @Bean
    public Step requestStep() {
        return new StepBuilder("requestStep")
                .repository(jobRepository)
                .transactionManager(platformTransactionManager)
                .<String, Response>chunk(1// URL 하나씩 처리
                .reader(urlReader())
                .processor(httpRequestProcessor())
                .writer(httpResponseWriter())
                .build();
    }
 
    @Bean
    public ItemReader<String> urlReader() {
        List<String> urls = List.of(
            "yourHttpRequestUrl1.do",
            "yourHttpRequestUrl2.do",
            
        );
 
        return new ListItemReader<>(urls);
    }
 
    @Bean
    public ItemProcessor<String, Response> httpRequestProcessor() {
        return new ItemProcessor<String, Response>() {
            @Override
            public Response process(String url) throws Exception {
                // HTTP 요청을 수행하는 로직
                Response response = performHttpRequest(url);
                return response;
            }
 
            private Response performHttpRequest(String url) throws IOException {
                // HTTP 요청을 수행하고 응답을 반환하는 로직 구현
                // 예: HttpClient를 사용하여 GET 요청 수행
                // HttpResponse response = httpClient.execute(new HttpGet(url));
                // return response;
                
                Request request = new Request.Builder()
                        .url(url)
                        .build();
 
                try (Response response = client.newCall(request).execute()) {
                    if (!response.isSuccessful()) throw new IOException("Unexpected code " + response);
                    // 응답 처리
                } catch (IOException e) {
                    log.info("httpRequestProcessor Request통신중에 에러가 발생했습니다. url = {} , message = {}", url, e.getMessage());
//                    System.err.println("Error fetching data from: " + url + " - " + e.getMessage());
                }
                return client.newCall(request).execute();
                 // 실제 응답 객체로 대체
            }
        };
    }
 
    @Bean
    public ItemWriter<Response> httpResponseWriter() {
        return new ItemWriter<Response>() {
            @Override
            public void write(List<extends Response> items) throws Exception {
                for (Response response : items) {
                    // 응답을 처리하는 로직
                    // 예: 데이터베이스에 저장하거나 로그에 기록
                    log.info("Response: {}", response);
//                    System.out.println("Response: " + response);
                }
            }
        };
    }
    ///////////////// requestCrawlerJob step여기까지.
    
    
    
    @Bean
    public Job batchMstCRUDJob() {
        return new JobBuilder("batchMstCRUDJob")
                .repository(jobRepository)
                //.incrementer(new RunIdIncrementer())                     // RunIdIncrementer 추가
                .start(batchMstDelete())
                .next(firstStep())
                .build();
    }
    
    @Bean
    public Step batchMstDelete() {
        return new StepBuilder("batchMstDelete")
                .repository(jobRepository)
                .transactionManager(platformTransactionManager)            // 트랜잭션 매니저 설정
                .<Batch_output_job_mst, Batch_output_job_mst> chunk(10)
                .reader(bathchMstReader())
                .writer(bathchMstWriter())
                .build();    
    }
    
    @Bean
    public RepositoryItemReader<Batch_output_job_mst> bathchMstReader() {
        return new RepositoryItemReaderBuilder<Batch_output_job_mst>()
                .name("beforeReader")
                .pageSize(10)
                .methodName("findAll")
                .repository(batchJobMstRepository)
                .sorts(Map.of("id", Sort.Direction.ASC))
                .build();
    }
    
    @Bean
    public ItemWriter<Batch_output_job_mst> bathchMstWriter() {
        return new ItemWriter<Batch_output_job_mst>() {
 
            @Override
            public void write(List<extends Batch_output_job_mst> items) throws Exception {
                // TODO Auto-generated method stub
                 batchJobMstRepository.deleteAll();
            }
        };
    }
    
    @Bean
    public Step firstStep() {
        return new StepBuilder("firstStep")
                .repository(jobRepository)
                .transactionManager(platformTransactionManager)            // 트랜잭션 매니저 설정
                .<Job_mst, Batch_output_job_mst> chunk(10)
                .reader(beforeReader())
                .processor(middleProcessor())
                .writer(afterWriter())
                .build();    
    }
    
    
    @Bean
    public RepositoryItemReader<Job_mst> beforeReader() {
        return new RepositoryItemReaderBuilder<Job_mst>()
                .name("beforeReader")
                .pageSize(10)
                .methodName("findAll")
                .repository(jobMstRepository)
                .sorts(Map.of("id", Sort.Direction.ASC))
                .build();
    }
    
    @Bean
    public ItemProcessor<Job_mst, Batch_output_job_mst> middleProcessor() {
        // 중복된 annoId를 추적하기 위한 Set
        Set<Long> processedAnnoIds = new HashSet<>();
        
        return new ItemProcessor<Job_mst, Batch_output_job_mst>() {    
            @Override
            public Batch_output_job_mst process(Job_mst item) throws Exception {
                
                // 현재 item의 annoId가 이미 처리된 것인지 확인
                if (processedAnnoIds.contains(item.getAnnoId())) {
                    // 중복된 경우 null 반환하여 후속 처리에서 건너뛰게 함
                    return null;
                }
 
                // 중복이 아닌 경우, annoId를 Set에 추가
                processedAnnoIds.add(item.getAnnoId());
                
                Batch_output_job_mst afterEntity = new Batch_output_job_mst();
                afterEntity.setId(item.getId());
                afterEntity.setCompanyCd(item.getCompanyCd());
                afterEntity.setAnnoId(item.getAnnoId());
                afterEntity.setClassCdNm(item.getClassCdNm());
                afterEntity.setEmpTypeCdNm(item.getEmpTypeCdNm());
                afterEntity.setAnnoSubject(item.getAnnoSubject());
                    
                afterEntity.setSubJobCdNm(item.getSubJobCdNm());
                afterEntity.setSysCompanyCdNm(item.getSysCompanyCdNm());
                afterEntity.setJobDetailLink(item.getJobDetailLink());
                afterEntity.setWorkplace(item.getWorkplace());
                afterEntity.setBatchDate(getTodayDate());
                
                return afterEntity;
            }
        };
    }
    
    @Bean
    public RepositoryItemWriter<Batch_output_job_mst> afterWriter() {
 
        return new RepositoryItemWriterBuilder<Batch_output_job_mst>()
                .repository(batchJobMstRepository)
                .methodName("save")
                .build();
    }
    
    /**
     * <p>Batch_output_job_mst 엔터티에 데이터 삽입시 batchDate는 현재시간을 추출해서 set한다.</p>
     * @author JaCob LEE
     * */
    private String getTodayDate() {
        LocalDateTime today = LocalDateTime.now(); // 현재 날짜와 시간으로 설정
        // 포맷 정의
        DateTimeFormatter dateFormat = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
        return today.format(dateFormat);
    }
    
    
}
 
cs

상단줄 @EnableBatchProcessing은 Batch작업을 설정하고 구성하는데 필요한 여러 기능을 활성화하는 어노테이션이다.

22 ~ 26번줄은 default 타임아웃이 존재해 1분으로 설정한것을 확인할 수 있다.

 

43번줄 chunk에 1을 할당했는데, 데이터 처리 단위를 의미하며, 한번에 처리할 데이터의 수를 의미한다.

예를들어 총 데이터가 100개이고 chuuk를 10으로 설정했다면, 10개씩 10번 진행하겠다는 것이다.

나는URL당 1나씩 처리하기위해 1로 설정했다.

설정은 마음대로 설정할 수 있으나,  너무적게하면 오버헤드가 발생하고, 크게 설정하면 메모리 가득차니 

생각하고 설정하면 되겠다.

 

62번줄 ItemProcessor<String, Response>의미는 input을 ItemReader()에 읽어들인 String타입으로 받고,
Response로 output을 하겠다는 뜻이다.

그리고 쓰기애 해당하는 ItemWriter<Response>에서 output한 Response데이터를 input으로 받고있다.

 

이렇게 Batch 컨피그 파일까지 완성했다.

스케줄러 크론식으로 매일 오후 6시로 설정했으니, 해당 시간에 배치가 실행되는지 확인해보면 되겠다.

 

 

뭔가 훅훅 지나간 느낌이 들지만, 

궁금한점이나 정보가 부족한 점이 있다면 댓글에 한번 남겨주길 바란다.

 

정보가 유익하셨다면 좋아요와 댓글 부탁 드립니다. 감사합니다.

반응형