Consuemr는 데이터를 가져가도 kafka 내에서 데이터가 사라지지 않음
-> 아주 유용한 유연성을 제공
Consumer 의 역할
- topic의 파티션으로부터 데이터 polling
- partition offset 위치 기록(커밋)
- Consumer 그룹을 통해 병렬 처리
# 토픽 파티션에서 레코드 조회
prop - 옵션 설정 변수
group 1 - > 컨슈머 그룹 설정
consumer.subscribe - 읽을 토픽 설정
polling 루프
- poll 메서드가 포함된 무한 루프
poll() 설정한 시간동안 기다리고 데이터를 가져온다
이 때 데이터가 없다면 그냥 빈값을 반환
가장 작은 단위 : record
# 파티션 - 그룹 단위 할당
파티션의 개수보다 컨슈머의 개수가 많아지면, 컨슈머는 놀게된다
즉, 컨슈머의 개수가 늘어나게 된다면, 파티션도 늘려야한다.
컨슈머 그룹이 다르면 데이터를 읽어오는 순서는 보장되는가
각 컨슈머 그룹별로 파티션에 읽어오는 offset의 정보는 나누어 저장되기 때문에 순서가 보장된다.
# 커밋과 오프셋
Consumer.poll() 할 때마다 오프셋과 커밋이 바뀌게 되고 그다음 poll 때 해당 오프셋을 기준으로 poll
Producer의 데이터 넣는 속도보다 Consumer가 읽어가는 속도가 더 빠르다면?
Consumer Lag
= 프로듀서가 마지막으로 넣은 offset - Consumer의 마지막 offset
현재 파이프라인의 프로듀서와 컨슈머의 상태를 모니터링으로 관찰이 가능
토픽내에 여러 파티션이 존재한다면
파티션 별로 컨슈머 랙은 여러 값이 존재할 수 있다.
이 때 컨슈머 랙 중 가장 높은 숫자의 값 = records-lag-max
그러면 맨처음은 어떻게 해?
컨슈머 설정 값들
서버 = 브로커
offset 정의를 통해 - topic에 partitiong하고 몇번 오프셋까지 커밋햇는지 확인 및
commitSync 를 통해 중복은 제거가 가능 하지만 순서 보장 X
비동기기 때문에 바로 실패의 여부를 알 수가 없다 .
# 컨슈머 리밸런스
- 컨슈머 그룹의 파티션 소유권이 변경될 때 일어남
- 리밸런스하는 동안에는 일시적으로 메시지를 가져올 수 없다.
- 리밸런스 발생시, 데이터 유실 / 중복 발생 가능성 존재
- commitSync() 또는 추가적인 방법으로 데이터 유실/ 중복 방지
- 리밸런스 발생 상황
- consumer.close() 혹은 consumer 세션 끊어질 때
# RebalancedListner 를 통해
- Rebalanced 하는 상황에서 운영상으로 모니터링이 가능하다.
# 재처리와 순서
- 동일 메시지 조회 가능성
- 일시적 커밋 실패, 리밸런스 등에 의해 발생한다.
- 컨슈머는 멱등성을 고려해야한다
- ex) [조회수 1+ -> 좋아요 1+ -> 조회수 1 ] 메시지를 재처리 할 경우
- 단순 재처리시, 조회수는 2가 아닌 4가 될 수 있다
- 데이터의 특성에 따라 타임스탬프와 일련 번호 등을 활용하면 좋다. (중복처리를 제거하는 방법)
- 파티션 내에서만 메시지 순서 보장이 처리가 되기 때문에
- 컨슈머가 여러개인 경우에는 저장된 데이터를 timestamp를 통해 order by하면된다.
# 컨슈머 종료 처리
정상동작에서의 로직
이러고 다시 켜면 100번~ 150이 커밋이 안되서 다시 100번부터 시작
Rutime.getRuntime().addShutDownHook(new Thread(){
public void run(){
consumer.wakeup();
}
});
try {
while(true){
...
consummer.commitSync();
}
} catch(WakeupException e){
...
}finally{
consumer.commitSync();
consumer.close();
}
주의 사항
카프카 컨슈머는 스레드에 안전하지 않다
- 여러 스레드에서 동시에 사용하지 않도록 주의
- wakeup() 메서드는 예외
# Consumer Thread 전략
'kafka' 카테고리의 다른 글
Kafka Partition Replication, ISR 그리고 Producer Acks (1) | 2024.01.07 |
---|---|
Kafka Streams (0) | 2022.01.20 |
kafka - Producer (0) | 2022.01.18 |
Kafka의 구조 (0) | 2022.01.17 |