728x90
반응형
kafka streams - kafka 에서 공식적으로 지원하는 라이브러리
토픽에있는 데이터를 낮은 지연률로 빠른 속도로 데이터 처리 가능
장점
1. 카프카와의 호환성
- 카프카의 버전정보에 대해 다른 툴들은 완벽한 호환성을 제공하지 않지만 카프카 스트림즈는 카프카 호환성을 걱정하지 않아도 된다.
- 유실과 중복 처리 방지에 대한 완벽한 기능을 제공하고 있다.
2. 스케줄링 도구가 필요없다
- 스파크 스트리밍과 연동해서 사용한다면 마이크로 배치 서비스를 구축 할 수 있지만 이를 위해 클러스터 관리자, 리소스 관리자가 필요하고 또한 서버들도 필요합니다. 하지만 Kafka Streams는 다른 것 필요없이 스트림즈 어플리케이션만 가지고 사용할 수 있습니다.
3. 스트림즈 DSL , 프로세서 API 제공
- 많은 기능들을 이미 스트림즈 DSL에서 재공하고 있기 때문에 필요에 의해 새로운 로직이 필요하다면 프로세서 API를 사용하면된다.
- Kstream , KTable , GlobalTable
4. 자체적으로 로컬 상태 저장소를 사용한다.
비 상태기반 처리 - 데이터가 들어올 때마다 처리해주고 넘기면 유실과 중복에 대한 걱정이 적다.
상태기반 처리 - window , join 등을 통해 메모리에 저장한 후에 다음 데이터를 참조후에 처리하기 때문에 굉장히 허들이 높다.
이러한 허들을 로컬에 rocksdb를 사용하여 상태기반에 대한 처리를 좀 더 편하게 처리할 수 있도록 도와준다.
payment 라는 토픽에 매핑해준 후 ,
들어온 데이터에 대해서 key값이 unknown 이라면 스트림을 통해 unknown-payment 로 변환해주는 로직
private static Properties getKafkaStreamsProperties(String appId) {
// streams config
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, appId);
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "카프카 브로커 IP");
//메시지의 키, 값의 직렬화/역직렬화를 위한 설정
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
return props;
}
private static void topic1Stream() {
//스트림 토폴로지 정의
//StreamsBuilder를 build 하면 Topology 반환
StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> stream = builder.stream("streamTopic");
stream
.mapValues(String::trim)
.filter((String v1, String v2) -> (v1 + v2).length() > 5)
.mapValues((String v1) -> v1 + " 토픽1 처리완료 ")
.to("streamTopic2");
//스트림 생성
KafkaStreams streams = new KafkaStreams(builder.build(), getKafkaStreamsProperties("SecondId"));
streams.start();
}
private static void topic2Stream() {
StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> stream = builder.stream("streamTopic2");
stream
.filter((key, value) -> value.contains("3"))
.mapValues((String v1) -> v1 + " 토픽2 처리완료 ")
.to("streamTopic3");
KafkaStreams streams = new KafkaStreams(builder.build(), getKafkaStreamsProperties("LastId"));
streams.start();
}
private static void producerTest() {
//1. Properties 만들기
Properties kafkaProducerProperties = new Properties();
kafkaProducerProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "카프카 브로커 ID");
kafkaProducerProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
kafkaProducerProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
kafkaProducerProperties.put(ProducerConfig.ACKS_CONFIG, "-1");
// 2. Producer 생성
Producer<String, String> producer = new KafkaProducer<>(kafkaProducerProperties);
// 3. 전송
for (int index = 0; index < 3; index++) {
// 해당 토픽은 stream1에 이어지도록 설정
ProducerRecord<String, String> producerRecord = new ProducerRecord<>("streamTopic", "stream", "스트림테스트 : " + index);
producer.send(producerRecord);
}
// 4. 닫기
producer.close();
}
Producer에서 kafka에 send 하기 이전에 전처리 해줘야하는 사항들에 있어서 사용하기에 적합
다양한 스트림 설정을 통해 연속된 처리의 파이프라인 구축 가능
stream.filter와 같은 메서드 들이 Streams DSL
728x90
반응형
'kafka' 카테고리의 다른 글
Kafka Partition Replication, ISR 그리고 Producer Acks (1) | 2024.01.07 |
---|---|
kafka - Consumer (0) | 2022.01.18 |
kafka - Producer (0) | 2022.01.18 |
Kafka의 구조 (0) | 2022.01.17 |