728x90
반응형
Spark 조인 수행 방식이 어떻게 진행되는 지 살펴보자.
우선, Spark 조인 수행 방식에 대해선 실행에 필요한 두 가지 핵심 전략을 알아야 한다.
- 노드간 네트워크 통신 전략이 어떤 방식인지
- 노드별 연산 전략이 어떤 식으로 진행되는 지
네트워크 통신 전략
- 스파크는 조인 연산 수행 시, 두 가지 클러스터 통신 방식을 활용한다.
- Shuffle Join (셔플 조인) -> 전체 노드간 통신을 유발
- Broadcast Join (브로드 캐스트 조인) -> 전체 노드 간 통신을 유발 하지 않음.
참고로, 이후 스파크에서 최적화 기술은 비용 기반 옵티마이저(CBO) 가 개선된다면 더 나은 통신 전략이 도입되어 바뀔 수 있다 .
만약 조인하고자 하는 두 테이블의 크기를 가정해보고 어떤 식으로 조인이 수행되는 지 확인해보자.
밑에서 설명하는 테이블은 곧 데이터프레임이다.
큰 테이블(데이터프레임)과 큰 테이블(데이터프레임)의 조인
- 두 큰 테이블 간 조인은 셔플 조인 발생
- 두 데이터프레임이 커서, 전체 조인 프로세스가 진행되는 동안 (데이터 파티셔닝 없이) 모든 워커 노드(그리고 모든 파티션)에서 통신이 발생한다.
- 조인에 사용한 특정 키, 키 집합을 어떤 노드가 가졌는가에 따라 해당 노드와 데이터를 공유
- 이런 통신 방식으로 인해 네트워크는 복잡 및 리소스 과다 사용 (파티셔닝이 잘 안되어있다면 더 심함)
큰 테이블과 작은 테이블의 조인
- 테이블이 단일 워커 노드의 메모리 크기에 적합한 정도로 작은 경우 조인 연산을 최적화 할 수 있다.
- 이 때는 브로드캐스트 조인이 훨씬 효율적이다.
- 해당 방법은 결국 작은 데이터프레임을 클러스터 전체 워커 노드에 복제하는 것을 의미한다.
- 이를 통해 조인 프로세스 내내 전체 노드가 통신하는 현상을 방지할 수 있다.
- 해당 방식을 통해, 시작 시에만 단 한 번의 복제가 수행되며
그 이후로는 개별 워커가 다른 워커 노드를 기다리거나 통신할 필요없이 작업 진행 - 물론 브로드캐스트 조인은 이전 셔플 조인 방식과 마찬가지로 대규모 노드 간 통신이 발생하지만,
그 이후로는 노드 사이에 추가적인 통신이 발생하지 않는다. - 따라서 모든 단일 노드에서 개별적으로 조인이 수행되므로 CPU 만 가장 큰 병목 구간
다음 코드를 통해 스파크가 자동으로 데이터셋을 브로드캐스트 조인으로 설정한 것을 알 수 있다.
val joinExpr = person.col("graduate_program") === graduateProgram.col("id")
person.join(graduateProgram, joinExpr).explain()
DataFrame APi 를 통해 옵티마이저에게 브로드캐스트 조인할 수 있도록 힌트 제공도 가능하다.
- 방법: broadcast() 함수에 작은 크기의 DF 를 인수로 전달
- 하지만, 힌트 제공이 옵티마이저에게 강제성을 부여하는 것은 아니기 때문에, 늘 동일한 실행 계획을 세우는 것은 아니다.
import org.apache.spark.sql.functions.broadcast
val joinExpr = person.col("graduate_program") === graduateProgram.col("id")
person.join(broadcast(graduateProgram), joinExpr).explain()
아주 작은 테이블 사이의 조인
아주 작은 테이블 사이의 조인의 경우에는 스파크가 알아서 조인을 결정하도록 내버려두는 것이 제일 좋다.
ref :
728x90
반응형