토픽에 데이터를 생성하는 역할
특정 토픽으로 데이터를 publish (전송)
브로커에게 전송하며 처리 실패 / 재시도
라이브러리 명시에 버전을 주의할 것
- 모든 버전의 하위호환성이 완벽하지 않다.
프로듀서 생성 및 send 코드
bootstrap.servers 설정시 이 때 반드시 2개이상의 브로커정보를 넣어주는 것이 장애 발생시 대처가 가능하다.
key 직렬화 - Byte array String, Integer Serializer 설정 가능
이 때 토픽에 메시지를 보내는 방법 2가지
- key 를 포함하여 보내기 - key 값 기준 파티션 선정
- 중간에 파티션이 늘어나게 되면, 키에 대한 매핑이 깨지기 때문에 규칙이 깨진다.
- key 포함하지 않고 보내기 - RR 방식으로 전송
전송이 완료되면 .close()를 통해 닫아주는 것 확실하게
# 프로듀서의 기본 흐름
- send 를 통해 보내진 레코드는 Serializer를 통해 먼저 byte 배열로 변환
- 파티션이 그 후 결정되게 되고
- 카프카 브로커에게 보내지기 이전 배치에 적재 후 쌓아서 보낼 수 있음
- 그 이후 sender는 다른 스레드에서 동작하기 때문에 배치가 찼는지의 여부에 상관없이 전송하는 역할 수행
# 처리량 관련 주요 속성
batch.size : 배치 크기 ( 배치가 다 차면 바로 전송하게 된다)
linger.ms : 전송 대기 시간(기본값 0)
- 대기 시간이 없으면 배치를 바로 전송하게 된다.
- 지연 시간을 설정하게 되면 배치로 대용량을 한 번에 보내주기 때문에 처리량이 증대하는 효과
# 프로듀서의 send가 성공했는지 확인하는 법
1. Future 사용하기
- 물론 확인하게 되면 배치 효과가 떨어지고 , 처리량이 저하될 수 있는 우려 존재
- 처리량이 낮아도 되는 경우에만 사용할 것
Future<BusData> f = producer.send(new ProducerRecord<>("topic","value"));
try {
//blocking
BusData bus = f.get();
}catch(ExecutionException ex){
}
- loop를 돌리면서 코드가 실행되면서 하나 보내고 블로킹 하나 보내고 블로킹 따라서 배치에 쌓이지 않는다.
- 하지만 건별로 확실하게 결과를 확인할 수 있음
2. Callback 사용
producer.send(new ProducerRecord<>("simple","value"),
new Callback(){
@Override
public void onCompletion(BusData busData, Exception ex){
}
});
- 처리량 저하가 없다. (블로킹 방식이 아님)
전송 보장과 ack
ack = 0
- 서버 응답을 기다리지 않는다.
- 전송 보장 X
ack. = 1
- 파티션의 리더에 저장되면 응답 받음
- 리더 장애시 메시지 유실 가능
ack = all (엄격하게 전송이 보장되어야 할 때)
- 모든 레플리카에 저장되면 응답 받음
- 브로커 min.insync.replicas 설정에 따라 달라짐
- min.insync.replicas : 프로듀서 ack 옵션이 all 일 때 저장에 성공했다고 응답할 수 있는 동기화된 레플리카 최소 수
# 재시도 주의 사항
중복 전송 가능성 속성
- enable.idempotence 속성
재시도와 순서
- max.in.flight.requests.per.connection
- 블록킹 없이 한 커넥션에서 전송할 수 있는 최대 전송 중인 요청 개수
- 이 값이 1보다 크면 재시도 시점에 따라 메시지 순서가 바뀔 수 있다.
결과적으로 전송된 순서 2 - 3 - 1
'kafka' 카테고리의 다른 글
Kafka Partition Replication, ISR 그리고 Producer Acks (1) | 2024.01.07 |
---|---|
Kafka Streams (0) | 2022.01.20 |
kafka - Consumer (0) | 2022.01.18 |
Kafka의 구조 (0) | 2022.01.17 |