데이터 이관했을 당시 Source Table과 Target Table에 대한 데이터 검증을 경험한 바가 있다.

데이터 엔지니어라 함은 데이터 검증을 피할 수가 없는데, 보통은 count로 이를 확인하고 더 나아가서 Min, Max, Sum을 추출하여 데이터 검증을 진행한다.

컬럼의 수가 많을 수록, 또한 데이터가 클 수록 이에 대한 검증 시간은 비례하다.

기존에는 검증에 대한 로직이 없었으며 작업을 수행해야하는 시간이 다소 짧았었다. 빠르게 검증 로직에 대한 템플릿을 만들었고 데이터가 큰 테이블들은 시간이 오래 걸렸음에도 흐린눈 하며 검증 코드 고도화를 미루다가 하나의 테이블이 생각보다(...) 너~무 오래걸리는 이슈로 인해 로직에 대한 고도화를 진행해야만 했다.

(고도화 작업을 진행할때 기존 코드를 보다가 내가 짰지만 내가 짠 코드에 자책감은 이루 말할 수 없었던 .. 다신 이런 실수를 하지 않기 위해 블로그에 기록하는 용도입니다ㅠ )


기존의 검증에 대한 로직 순서는 이러했다.

1. 특정 테이블에 대한 소스의 메타 정보를 가져오기

2. (sum이 안되는 타입이 있으므로 각 데이터 형식에 맞춰) 각 컬럼에 대한 최소값, 최대값, 합계 계산 (date, timestamp, decimal, string,int etc)

min = tmp.select(min(column)).collect()[0][0]

3. 결과 Dataframe 스키마 정의

4. 결과 df에 최소값, 최대값, 합계 결과 추가

5. monotonically_increasing_id()를 추가하여 인덱스 추가

6~10.타겟에 대한 로직 순서도 위와 동일했다.

11. 마지막으로 소스와 타겟에 대한 최소값,최대값,합계를 구한 뒤에 인덱스 순서에 맞춰 이에 대한 True/ False까지 하는 과정을 테이블마다 반복했다.


검증 코드를 다시 돌려보니, 시간이 오래 걸릴 수 밖에 이유를 발견했다.

1. .collect()[0][0]은 한 번에 하나의 집계 연산만을 수행하는데 이와 같은 연산을 여러 번 반복하여 수행했다.

collect() 메서드는 데이터를 클러스터의 각 노드에서 드라이버로 가져오기 때문에 I/O가 많이 발생한다고 한다.

기존 로직에서 select() 메서드를 호출하여 데이터를 읽고, collect() 메서드를 통해 클라이언트로 데이터를 전송했다. 이 과정은 각 컬럼마다 반복됐고 따라서, 컬럼이 많으면 많을 수록 데이터 스캔 및 수집 작업이 반복되어 시간이 오래 걸렸다는 점이 성능 저하를 유발한 것 같다.

2. 각 컬럼에 대해 최소값, 최대값, 합계를 각각 개별적으로 계산했다. 

 

문제점 : 코드를 이해하기 쉽게 만들어서 사용했던 게 가장 큰 문제였고, 성능 측면에서 위와 같은 문제를 고려하지 않은 점이 가장 큰 문제점이라고 생각했다. 


검증고도화에 대한 로직 순서

1. 특정 테이블에 대한 소스, 타겟의 메타 정보를 함께 가져오기

2. 데이터 형식에 맞춰 min(column),max(column),sum(column)의 동적 쿼리를 생성

query= min(a), max(a), sum(a), min(b), max(b), ...

3. 집계연산들을 spark.sql(query)로 던져 최소값, 최대값, 합계를 한번에 구하기

4. 결과 Dataframe 스키마 정의

5. 결과 df에 최소값, 최대값, 합계 결과 추가

6. 각각의 최소값,최대값,합계에 대한 True/ False 구하기


결론 ( 개선한 점 )

1. collect() 호출 감소

2. 개별 집계연산 (하나의 컬럼에 min : end :

                           하나의 컬럼에 max : end : 여러번 스캔) 다수 집계 연산으로 변경 (하나의 컬럼에 min,max,sum : end : 한번 스캔)

min_a = tmp.select(min(col("a"))).collect()[0][0]
max_a = tmp.select(max(col("a"))).collect()[0][0]
sum_a = tmp.select(sum(col("a"))).collect()[0][0]

agg_results = tmp.agg(
    min(col("a")).alias("min_a"),
    max(col("a")).alias("max_a"),
    sum(col("a")).alias("sum_a")
).collect()[0]

min_a = agg_results["min_a"]
max_a = agg_results["max_a"]
sum_a = agg_results["sum_a"]

 

3. 이전은 템플릿 코드로만 작성하여 검증 노트북을 들어가 테이블에 대한 이름들을 줘야했다면,

고도화 작업에 대해선 각각의 과정들을 UDF들로 만들어, 각각의 소스와 타겟에 대한 테이블 이름만 주면 user_id에 맞춰 (본인이 작업할 테이블에 대한 메타 정보에 'y'로 작업을 한 뒤) 검증하여 테이블로 append해서 저장까지 해주는 파이프라인을 만들었다.

as-is

 

to-be


결과는 12분 걸렸던 테이블이 약 54초로 시간 단축이 기하급수적으로 줄어든 현상을 볼 수 있었다.

고도화 작업을 하면서 너무 비효율적인 코드를 짠 것 같아 너무 자괴감이 밀려왔다,,ㅋ,, 좀 더 열심히 해야겠다,, ㅎㅎ,,

반응형
  • 네이버 블러그 공유하기
  • 네이버 밴드에 공유하기
  • 페이스북 공유하기
  • 카카오스토리 공유하기