[2탄] Consumer가 죽으면 메시지는 어디 있을까
[2탄] Consumer가 죽으면 메시지는 어디 있을까
시리즈 [블로그 프로젝트] 알림 구조 개선 3편
- [1탄] 멀티인스턴스 환경을 고려한 구독 알림 구조 개선
- [2탄] Consumer가 죽으면 메시지는 어디 있을까
- [3탄] SSE 알림, 왜 가끔 안 올까 — Redis Pub/Sub으로 멀티 인스턴스 브로드캐스트 구현
개요
1탄에서는 Redis Streams를 도입해 알림 이벤트를 멀티 인스턴스 환경에서 처리할 수 있도록 구조를 변경했다. 이번 글에서는 Consumer가 처리에 실패했을 때 메시지를 유실하지 않고 다시 처리하는 방법을 정리해보려고 한다.
Consumer가 실패하면 메시지는 어디에 남아있을까?
Redis Streams는 PEL(Pending Entry List)라는 별도의 영역을 제공한다. PEL은 쉽게 말해 Consumer에게 전달됐지만 아직 처리 완료되지 않은 메시지 목록이다.
Consumer가 메시지를 읽는 순간, 해당 메시지는 단순히 Stream에서 사라지는 것이 아니라 PEL에 함께 등록되면서 “처리 중 상태”로 관리된다.
흐름
1
2
3
4
5
6
7
8
9
10
11
12
1. 메시지가 생성되어 Stream에 저장된다
2. Consumer가 메시지를 가져간다
→ 이때 해당 메시지는 "처리 중" 상태로 관리된다 (PEL에 등록)
3. Consumer가 처리를 완료한다
→ "처리 완료" 신호를 보내면 (ACK)
→ PEL에서 제거된다
4. 만약 Consumer가 실패하면
→ 메시지는 처리되지 않은 상태로 PEL에 남는다
※ 메시지는 Stream에는 계속 보관된다
실패한 메시지 재처리하기
PEL에 남은 메시지는 자동적으로 재처리 되지 않기 때문에 재처리 로직을 구현해야 한다. 재처리 로직을 구현 하기 전 두가지를 먼저 정했다.
- 언제 재처리할 것인가?
- 실패 직후 즉시 재시도하면 같은 이유로 또 실패할 가능성이 높음
- 5분 이상 대기 상태인 메시지를 1분마다 확인하는 방식 사용
- 무한 재시도 문제는 어떻게 해결할것인가?
- 데이터 자체에 문제가 있는 경우 반복해도 계속 실패하게 됨
- 최대 3회를 초과하면 Dead Letter 테이블에 저장
구현
Consumer
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
@Override
public void onMessage(MapRecord<String, String, String> message) {
if (isSkippable(message)) {
ack(message.getId());
log.info("처리 대상 아닌 메시지 스킵 - messageId={}, value={}", message.getId(), message.getValue());
return;
}
try {
Long subscriberId = Long.valueOf(message.getValue().get("subscriberId"));
Long targetId = Long.valueOf(message.getValue().get("targetId"));
String messageId = message.getId().getValue();
processor.process(subscriberId, targetId, messageId);
ack(message.getId());
} catch (Exception e) {
log.error("구독 스트림 메시지 처리 실패 - messageId={}", message.getId(), e);
}
}
PEL-재처리 스케쥴러
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
@Scheduled(fixedDelay = 1, timeUnit = TimeUnit.MINUTES)
public void reclaimPendingMessages() {
List<MapRecord<String, Object, Object>> pendingMessages = redisTemplate.opsForStream()
.claim(SubscribedStreamPublisher.STREAM_KEY,
CONSUMER_GROUP,
CONSUMER_NAME,
MIN_IDLE_TIME);
if (pendingMessages.isEmpty()) return;
log.info("PEL 재처리 시작 - pendingCount={}", pendingMessages.size());
for (MapRecord<String, Object, Object> message : pendingMessages) {
processReclaimedMessage(message);
}
}
PEL-재처리
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
private void processReclaimedMessage(MapRecord<String, Object, Object> message) {
long deliveryCount = getDeliveryCount(message.getId());
// 최대 재시도 횟수를 넘으면 Dead Letter 처리
if (deliveryCount > MAX_RETRY_COUNT) {
deadLetter(message, deliveryCount);
return;
}
try {
Long subscriberId = Long.valueOf((String) message.getValue().get("subscriberId"));
Long targetId = Long.valueOf((String) message.getValue().get("targetId"));
String messageId = message.getId().getValue();
processor.process(subscriberId, targetId, messageId);
ack(message.getId());
log.info("PEL 재처리 성공 - messageId={}, deliveryCount={}", message.getId(), deliveryCount);
} catch (Exception e) {
log.error("PEL 재처리 실패 - messageId={}, deliveryCount={}", message.getId(), deliveryCount, e);
}
}
Dead Letter - 처리 불가 메시지 저장
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
private void deadLetter(MapRecord<String, Object, Object> message, long deliveryCount) {
try {
deadLetterRepository.save(StreamDeadLetter.of(
SubscribedStreamPublisher.STREAM_KEY,
message.getId().getValue(),
(String) message.getValue().get("subscriberId"),
(String) message.getValue().get("targetId"),
(int) deliveryCount
));
// Dead Letter 테이블 저장 후 ACK 처리 하여 PEL에서 삭제
ack(message.getId());
log.error("Dead Letter 저장 완료 - messageId={}, deliveryCount={}", message.getId(), deliveryCount);
} catch (Exception e) {
log.error("Dead Letter 저장 실패 - messageId={}", message.getId(), e);
}
}
전체 흐름
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
처리 실패 → ACK 없음 → PEL 잔류
│
▼ (1분 간격 스케줄러)
5분 이상 idle 메시지 claim
│
┌───────┴────────┐
횟수 <= 3 횟수 > 3
│ │
재처리 시도 Dead Letter 저장 → ACK → PEL 제거
┌────┴────┐
성공 실패
│ │
ACK PEL 잔류
│ (다음 스케줄러에서 재시도)
PEL 제거
마무리
Redis Streams는 단순히 메시지를 전달하는 것을 넘어, PEL을 통해 처리 상태까지 관리할 수 있는 구조를 제공한다. 덕분에 Consumer가 실패하더라도 메시지를 유실하지 않고 안전하게 재처리할 수 있었다.
다음 글에서는 아직 남아 있는 문제를 다룰 예정이다. 알림 DB 저장까지는 해결됐지만, SSE 전송은 여전히 멀티 인스턴스 환경에서 제대로 동작하지 않는다. 1탄과 비슷한 문제지만, SSE 전송 특성상 이번엔 Redis Pub/Sub을 사용할 것 같다.
이 기사는 저작권자의 CC BY 4.0 라이센스를 따릅니다.