Spark Join 방식
Data Engineering

Spark Join 방식

728x90
반응형

Spark 조인 수행 방식이 어떻게 진행되는 지 살펴보자. 

우선, Spark 조인 수행 방식에 대해선 실행에 필요한 두 가지 핵심 전략을 알아야 한다.

  • 노드간 네트워크 통신 전략이 어떤 방식인지 
  • 노드별 연산 전략이 어떤 식으로 진행되는 지 

 

네트워크 통신 전략 

  • 스파크는 조인 연산 수행 시, 두 가지 클러스터 통신 방식을 활용한다. 
  • Shuffle Join (셔플 조인) -> 전체 노드간 통신을 유발
  • Broadcast Join (브로드 캐스트 조인) -> 전체 노드 간 통신을 유발 하지 않음. 
참고로, 이후 스파크에서 최적화 기술은 비용 기반 옵티마이저(CBO) 가 개선된다면  더 나은 통신 전략이 도입되어 바뀔 수 있다 .

 

만약 조인하고자 하는 두 테이블의 크기를 가정해보고 어떤 식으로 조인이 수행되는 지 확인해보자. 

밑에서 설명하는 테이블은 곧 데이터프레임이다. 

 

큰 테이블(데이터프레임)과 큰 테이블(데이터프레임)의 조인 

 

  • 두 큰 테이블 간 조인은 셔플 조인 발생
    • 두 데이터프레임이 커서, 전체 조인 프로세스가 진행되는 동안 (데이터 파티셔닝 없이) 모든 워커 노드(그리고 모든 파티션)에서 통신이 발생한다.
  • 조인에 사용한 특정 키, 키 집합을 어떤 노드가 가졌는가에 따라 해당 노드와 데이터를 공유
    • 이런 통신 방식으로 인해 네트워크는 복잡 및 리소스 과다 사용 (파티셔닝이 잘 안되어있다면 더 심함)

큰 테이블과 작은 테이블의 조인 

  • 테이블이 단일 워커 노드의 메모리 크기에 적합한 정도로 작은 경우 조인 연산을 최적화 할 수 있다.
  • 이 때는 브로드캐스트 조인이 훨씬 효율적이다.
    • 해당 방법은 결국 작은 데이터프레임을 클러스터 전체 워커 노드에 복제하는 것을 의미한다.
    • 이를 통해 조인 프로세스 내내 전체 노드가 통신하는 현상을 방지할 수 있다.

  • 해당 방식을 통해, 시작 시에만 단 한 번의 복제가 수행되며
    그 이후로는 개별 워커가 다른 워커 노드를 기다리거나 통신할 필요없이 작업 진행
  • 물론 브로드캐스트 조인은 이전 셔플 조인 방식과 마찬가지로 대규모 노드 간 통신이 발생하지만,
    그 이후로는 노드 사이에 추가적인 통신이 발생하지 않는다.
  • 따라서 모든 단일 노드에서 개별적으로 조인이 수행되므로 CPU 만 가장 큰 병목 구간

 

다음 코드를 통해 스파크가 자동으로 데이터셋을 브로드캐스트 조인으로 설정한 것을 알 수 있다.

val joinExpr = person.col("graduate_program") === graduateProgram.col("id")

person.join(graduateProgram, joinExpr).explain()

 

DataFrame APi 를 통해 옵티마이저에게 브로드캐스트 조인할 수 있도록 힌트 제공도 가능하다.

  • 방법: broadcast() 함수에 작은 크기의 DF 를 인수로 전달
  • 하지만, 힌트 제공이 옵티마이저에게 강제성을 부여하는 것은 아니기 때문에, 늘 동일한 실행 계획을 세우는 것은 아니다.
import org.apache.spark.sql.functions.broadcast 

val joinExpr = person.col("graduate_program") === graduateProgram.col("id")

person.join(broadcast(graduateProgram), joinExpr).explain()

 

아주 작은 테이블 사이의 조인 

아주 작은 테이블 사이의 조인의 경우에는 스파크가 알아서 조인을 결정하도록 내버려두는 것이 제일 좋다. 

 

 

 

ref :

728x90
반응형