kafka -  Consumer
kafka

kafka - Consumer

728x90
반응형

Consuemr는 데이터를 가져가도 kafka 내에서 데이터가 사라지지 않음 

-> 아주 유용한 유연성을 제공 

 

Consumer 의 역할

- topic의 파티션으로부터 데이터 polling

- partition offset 위치 기록(커밋)

- Consumer 그룹을 통해 병렬 처리 

 

 

# 토픽 파티션에서 레코드 조회

 

 

prop - 옵션 설정 변수

group 1 - > 컨슈머 그룹 설정 

consumer.subscribe - 읽을 토픽 설정 

 

polling 루프 

- poll 메서드가 포함된 무한 루프 

poll() 설정한 시간동안 기다리고 데이터를 가져온다 

이 때 데이터가 없다면 그냥 빈값을 반환

 

가장 작은 단위 :  record 

 

# 파티션 - 그룹 단위 할당 

 

파티션의 개수보다 컨슈머의 개수가 많아지면, 컨슈머는 놀게된다 

즉, 컨슈머의 개수가 늘어나게 된다면, 파티션도 늘려야한다.

 

 

컨슈머 그룹이 다르면 데이터를 읽어오는 순서는 보장되는가

각 컨슈머 그룹별로 파티션에 읽어오는 offset의 정보는 나누어 저장되기 때문에 순서가 보장된다. 

 

# 커밋과 오프셋

 

Consumer.poll() 할 때마다 오프셋과 커밋이 바뀌게 되고 그다음 poll 때 해당 오프셋을 기준으로 poll

 

 

 

Producer의 데이터 넣는 속도보다 Consumer가 읽어가는 속도가 더 빠르다면?

 

Consumer Lag 

=  프로듀서가 마지막으로 넣은 offset - Consumer의 마지막 offset 

현재 파이프라인의 프로듀서와 컨슈머의 상태를 모니터링으로 관찰이 가능 

 

토픽내에 여러 파티션이 존재한다면 

파티션 별로 컨슈머 랙은 여러 값이 존재할 수 있다.

이 때 컨슈머 랙 중 가장 높은 숫자의 값 = records-lag-max 

 

 

 

 

그러면 맨처음은 어떻게 해? 

 

컨슈머 설정 값들 

서버 = 브로커 

 

 

 

 

 

offset 정의를 통해 - topic에 partitiong하고 몇번 오프셋까지 커밋햇는지 확인 및 

 

commitSync 를 통해 중복은 제거가 가능 하지만 순서 보장 X

 

비동기기 때문에 바로 실패의 여부를 알 수가 없다 .

 

# 컨슈머 리밸런스 

- 컨슈머 그룹의 파티션 소유권이 변경될 때 일어남

- 리밸런스하는 동안에는 일시적으로 메시지를 가져올 수 없다. 

- 리밸런스 발생시, 데이터 유실 / 중복 발생 가능성 존재

    - commitSync() 또는 추가적인 방법으로 데이터 유실/ 중복 방지

- 리밸런스 발생 상황

    - consumer.close() 혹은 consumer 세션 끊어질 때 

# RebalancedListner 를 통해 

- Rebalanced 하는 상황에서 운영상으로 모니터링이 가능하다. 

 

# 재처리와 순서

  • 동일 메시지 조회 가능성
    • 일시적 커밋 실패, 리밸런스 등에 의해 발생한다. 
  • 컨슈머는 멱등성을 고려해야한다
    • ex) [조회수 1+ -> 좋아요 1+ -> 조회수 1 ] 메시지를 재처리 할 경우
    • 단순 재처리시, 조회수는 2가 아닌 4가 될 수 있다
  • 데이터의 특성에 따라 타임스탬프와 일련 번호 등을 활용하면 좋다. (중복처리를 제거하는 방법)
    • 파티션 내에서만 메시지 순서 보장이 처리가 되기 때문에 
    • 컨슈머가 여러개인 경우에는 저장된 데이터를 timestamp를 통해 order by하면된다. 

 

# 컨슈머 종료 처리

 

정상동작에서의 로직 

이러고 다시 켜면 100번~ 150이 커밋이 안되서 다시 100번부터 시작 

 

 

 

 

 

 

Rutime.getRuntime().addShutDownHook(new Thread(){
	public void run(){
		consumer.wakeup();
        }
   });
   
   
   
   try {
   	while(true){
       	...
        
  	consummer.commitSync();
    }
  } catch(WakeupException e){
  		...
  }finally{
  	consumer.commitSync();
    consumer.close();
    
   }

 

주의 사항 

카프카 컨슈머는 스레드에 안전하지 않다 
  • 여러 스레드에서 동시에 사용하지 않도록 주의
  • wakeup() 메서드는 예외 

# Consumer Thread 전략 

728x90
반응형

'kafka' 카테고리의 다른 글

Kafka Partition Replication, ISR 그리고 Producer Acks  (1) 2024.01.07
Kafka Streams  (0) 2022.01.20
kafka - Producer  (0) 2022.01.18
Kafka의 구조  (0) 2022.01.17