본문 바로가기

개발/Spring

단일 폴링 스케줄러를 전용 ThreadPool로 고도화하기

728x90
반응형

레디스 큐에서 작업을 하나씩 처리하던 구조를 전용 실행 풀(ThreadPool) 기반 병렬 파이프라인으로 바꿔 처리량과 지연을 동시에 개선한 사례를 정리해보려고 합니다.

배경 / 문제

기존 구조

  • @Scheduled 1개 스레드가 주기적으로 Redis 큐에서 한 건씩 꺼내 처리

문제점

  • 트래픽 상승 시 큐 적체 증가 → 응답/완료 지연
  • 스케줄러 스레드가 오래 점유되면 다음 틱 지연 (fixedDelay라도 실질 주기 늘어남)
  • 처리량, 동시성, 백프레셔(backpressure) 제어가 어려움

위와 같은 배경과 문제로 저는 아래와 같이 목표를 정했습니다.

  • 처리량 확장(Throughput↑)과 대기시간 감소(Queue wait↓)
  • 안전한 백프레셔(과부하 시 자연 감속 or 재큐잉)
  • 장애 시 at-least-once 보장(손실 없이 재처리)
  • 운영 가시성: 처리량/실패율/대기시간/동시 처리 수 모니터링

구현

1) 실행 풀 설정 (전용 ThreadPool)

  • AbortPolicy 사용으로 거부 시 예외를 던지므로 호출부에서 큐에 되돌리기(at-least-once) 처리가 수월합니다. 만약 감속이 필요하다면 CallerRunsPlicy 로 변경하여 사용해도 괜찮습니다.(스케줄러 스레드가 직접 실행 → 다음 dequeue가 늦어져 과도한 투입 방지)
@Configuration
public class ExecutorConfig {

  @Value("${image.executor.thread-name-prefix:image-}")
  private String prefix; // 모니터링/디버깅용 명명
  @Value("${image.executor.core:2}")
  private int core; // 최소 스레드
  @Value("${image.executor.max:4}")
  private int max; // 최대 스레드(피크 흡수)
  @Value("${image.executor.queue:300}")
  private int queue; // 대기 큐 용량
  @Value("${image.executor.keep-alive-seconds:30}")
  private int keepAliveSeconds; // core 초과 유휴 스레드 생존 시간

  /**
   * 전용 스레드풀
   * - AbortPolicy: 거부 시 예외 → 호출부에서 재큐잉(손실 방지)
   * - 종료 시 현재 작업 완료까지 대기
   */
  @Bean("imageExecutor")
  public ThreadPoolTaskExecutor imageExecutor() {
    ThreadPoolTaskExecutor ex = new ThreadPoolTaskExecutor();
    ex.setThreadNamePrefix(prefix);
    ex.setCorePoolSize(core);
    ex.setMaxPoolSize(max);
    ex.setQueueCapacity(queue);
    ex.setKeepAliveSeconds(keepAliveSeconds);
    ex.setRejectedExecutionHandler(new java.util.concurrent.ThreadPoolExecutor.AbortPolicy());
    ex.setWaitForTasksToCompleteOnShutdown(true);
    ex.initialize();
    return ex;
  }
}

2) 워커(스케줄러 → 전용 풀 제출)

@Component
@RequiredArgsConstructor
public class QueueWorker {

  private final StringRedisTemplate redis;
  private final ThreadPoolTaskExecutor imageExecutor;
  private static final int BATCH_SIZE = 50; // 틱당 최대 제출 수

  @Scheduled(fixedDelay = 200)
  public void processQueue() {
    var pool = imageExecutor.getThreadPoolExecutor();
    int remainingQueue = pool.getQueue().remainingCapacity();
    int freeThreads    = pool.getMaximumPoolSize() - pool.getActiveCount();

    // 풀/큐 여유 내에서만 제출(백프레셔)
    int slots = Math.max(0, Math.min(BATCH_SIZE, remainingQueue + Math.max(0, freeThreads)));
    if (slots == 0) return;

    for (int i = 0; i < slots; i++) {         
      String payload = redis.opsForList().leftPop("QUEUE_KEY");
      if (payload == null) {
          break; 
      } 

      try {
        imageExecutor.execute(() -> {
          try {
            // 실제 작업 처리 (예: 이미지 합성)
            process(payload);
          } catch (Exception e) {
            // 실패 집계 및 재시도 정책은 도메인에 맞게
            // e.g. mark fail, status=FAIL, DLQ 등
          }
        });
      } catch (Exception rejected) {
        // 경쟁으로 거부되면 되돌려놓고 종료(손실 방지)
        redis.opsForList().leftPush("QUEUE_KEY", payload);
        break;
      }
    }
  }

  private void process(String payload) {
    // 도메인 로직: 멱등성 보장(DONE skip), 예외는 상위에서 카운팅
  }
}

기존 모니터링툴로 grafana 사용중이며, k6 부하테스트 도구를 사용하여 테스트 진행했을 때 똑같은 가상 유저를 두고 진행했을 때 아래와 같이 개선효과를 볼 수 있었습니다. (큐 대기시간 약 8초 → 약 0.05초)

 

단일 폴링 스케줄러 (전)

 

ThreadPool 을 사용한 병렬 처리 (후)

728x90
반응형