728x90
반응형
레디스 큐에서 작업을 하나씩 처리하던 구조를 전용 실행 풀(ThreadPool) 기반 병렬 파이프라인으로 바꿔 처리량과 지연을 동시에 개선한 사례를 정리해보려고 합니다.
배경 / 문제
기존 구조
- @Scheduled로 1개 스레드가 주기적으로 Redis 큐에서 한 건씩 꺼내 처리
문제점
- 트래픽 상승 시 큐 적체 증가 → 응답/완료 지연
- 스케줄러 스레드가 오래 점유되면 다음 틱 지연 (fixedDelay라도 실질 주기 늘어남)
- 처리량, 동시성, 백프레셔(backpressure) 제어가 어려움
위와 같은 배경과 문제로 저는 아래와 같이 목표를 정했습니다.
- 처리량 확장(Throughput↑)과 대기시간 감소(Queue wait↓)
- 안전한 백프레셔(과부하 시 자연 감속 or 재큐잉)
- 장애 시 at-least-once 보장(손실 없이 재처리)
- 운영 가시성: 처리량/실패율/대기시간/동시 처리 수 모니터링
구현
1) 실행 풀 설정 (전용 ThreadPool)
AbortPolicy사용으로 거부 시 예외를 던지므로 호출부에서 큐에 되돌리기(at-least-once) 처리가 수월합니다. 만약 감속이 필요하다면CallerRunsPlicy로 변경하여 사용해도 괜찮습니다.(스케줄러 스레드가 직접 실행 → 다음 dequeue가 늦어져 과도한 투입 방지)
@Configuration
public class ExecutorConfig {
@Value("${image.executor.thread-name-prefix:image-}")
private String prefix; // 모니터링/디버깅용 명명
@Value("${image.executor.core:2}")
private int core; // 최소 스레드
@Value("${image.executor.max:4}")
private int max; // 최대 스레드(피크 흡수)
@Value("${image.executor.queue:300}")
private int queue; // 대기 큐 용량
@Value("${image.executor.keep-alive-seconds:30}")
private int keepAliveSeconds; // core 초과 유휴 스레드 생존 시간
/**
* 전용 스레드풀
* - AbortPolicy: 거부 시 예외 → 호출부에서 재큐잉(손실 방지)
* - 종료 시 현재 작업 완료까지 대기
*/
@Bean("imageExecutor")
public ThreadPoolTaskExecutor imageExecutor() {
ThreadPoolTaskExecutor ex = new ThreadPoolTaskExecutor();
ex.setThreadNamePrefix(prefix);
ex.setCorePoolSize(core);
ex.setMaxPoolSize(max);
ex.setQueueCapacity(queue);
ex.setKeepAliveSeconds(keepAliveSeconds);
ex.setRejectedExecutionHandler(new java.util.concurrent.ThreadPoolExecutor.AbortPolicy());
ex.setWaitForTasksToCompleteOnShutdown(true);
ex.initialize();
return ex;
}
}
2) 워커(스케줄러 → 전용 풀 제출)
@Component
@RequiredArgsConstructor
public class QueueWorker {
private final StringRedisTemplate redis;
private final ThreadPoolTaskExecutor imageExecutor;
private static final int BATCH_SIZE = 50; // 틱당 최대 제출 수
@Scheduled(fixedDelay = 200)
public void processQueue() {
var pool = imageExecutor.getThreadPoolExecutor();
int remainingQueue = pool.getQueue().remainingCapacity();
int freeThreads = pool.getMaximumPoolSize() - pool.getActiveCount();
// 풀/큐 여유 내에서만 제출(백프레셔)
int slots = Math.max(0, Math.min(BATCH_SIZE, remainingQueue + Math.max(0, freeThreads)));
if (slots == 0) return;
for (int i = 0; i < slots; i++) {
String payload = redis.opsForList().leftPop("QUEUE_KEY");
if (payload == null) {
break;
}
try {
imageExecutor.execute(() -> {
try {
// 실제 작업 처리 (예: 이미지 합성)
process(payload);
} catch (Exception e) {
// 실패 집계 및 재시도 정책은 도메인에 맞게
// e.g. mark fail, status=FAIL, DLQ 등
}
});
} catch (Exception rejected) {
// 경쟁으로 거부되면 되돌려놓고 종료(손실 방지)
redis.opsForList().leftPush("QUEUE_KEY", payload);
break;
}
}
}
private void process(String payload) {
// 도메인 로직: 멱등성 보장(DONE skip), 예외는 상위에서 카운팅
}
}
기존 모니터링툴로 grafana 사용중이며, k6 부하테스트 도구를 사용하여 테스트 진행했을 때 똑같은 가상 유저를 두고 진행했을 때 아래와 같이 개선효과를 볼 수 있었습니다. (큐 대기시간 약 8초 → 약 0.05초)
단일 폴링 스케줄러 (전)

ThreadPool 을 사용한 병렬 처리 (후)

728x90
반응형
'개발 > Spring' 카테고리의 다른 글
| Spring Boot + Redis Queue 비동기 작업을 FIFO로 처리하기 (0) | 2025.09.04 |
|---|---|
| Micrometor & Metrics (2) | 2025.06.05 |
| Spring Boot Actuator (2) | 2025.06.05 |
| @Transactional 내부에서 어떻게 동작할까? (0) | 2025.06.04 |
| [MyBatis] if - else 사용하기 (choose) (0) | 2021.10.12 |