데이터 이관했을 당시 Source Table과 Target Table에 대한 데이터 검증을 경험한 바가 있다.
데이터 엔지니어라 함은 데이터 검증을 피할 수가 없는데, 보통은 count로 이를 확인하고 더 나아가서 Min, Max, Sum을 추출하여 데이터 검증을 진행한다.
컬럼의 수가 많을 수록, 또한 데이터가 클 수록 이에 대한 검증 시간은 비례하다.
기존에는 검증에 대한 로직이 없었으며 작업을 수행해야하는 시간이 다소 짧았었다. 빠르게 검증 로직에 대한 템플릿을 만들었고 데이터가 큰 테이블들은 시간이 오래 걸렸음에도 흐린눈 하며 검증 코드 고도화를 미루다가 하나의 테이블이 생각보다(...) 너~무 오래걸리는 이슈로 인해 로직에 대한 고도화를 진행해야만 했다.
(고도화 작업을 진행할때 기존 코드를 보다가 내가 짰지만 내가 짠 코드에 자책감은 이루 말할 수 없었던 .. 다신 이런 실수를 하지 않기 위해 블로그에 기록하는 용도입니다ㅠ )
기존의 검증에 대한 로직 순서는 이러했다.
1. 특정 테이블에 대한 소스의 메타 정보를 가져오기
2. (sum이 안되는 타입이 있으므로 각 데이터 형식에 맞춰) 각 컬럼에 대한 최소값, 최대값, 합계 계산 (date, timestamp, decimal, string,int etc)
min = tmp.select(min(column)).collect()[0][0]
3. 결과 Dataframe 스키마 정의
4. 결과 df에 최소값, 최대값, 합계 결과 추가
5. monotonically_increasing_id()를 추가하여 인덱스 추가
6~10.타겟에 대한 로직 순서도 위와 동일했다.
11. 마지막으로 소스와 타겟에 대한 최소값,최대값,합계를 구한 뒤에 인덱스 순서에 맞춰 이에 대한 True/ False까지 하는 과정을 테이블마다 반복했다.
검증 코드를 다시 돌려보니, 시간이 오래 걸릴 수 밖에 이유를 발견했다.
1. .collect()[0][0]은 한 번에 하나의 집계 연산만을 수행하는데 이와 같은 연산을 여러 번 반복하여 수행했다.
collect() 메서드는 데이터를 클러스터의 각 노드에서 드라이버로 가져오기 때문에 I/O가 많이 발생한다고 한다.
기존 로직에서 select() 메서드를 호출하여 데이터를 읽고, collect() 메서드를 통해 클라이언트로 데이터를 전송했다. 이 과정은 각 컬럼마다 반복됐고 따라서, 컬럼이 많으면 많을 수록 데이터 스캔 및 수집 작업이 반복되어 시간이 오래 걸렸다는 점이 성능 저하를 유발한 것 같다.
2. 각 컬럼에 대해 최소값, 최대값, 합계를 각각 개별적으로 계산했다.
문제점 : 코드를 이해하기 쉽게 만들어서 사용했던 게 가장 큰 문제였고, 성능 측면에서 위와 같은 문제를 고려하지 않은 점이 가장 큰 문제점이라고 생각했다.
검증고도화에 대한 로직 순서
1. 특정 테이블에 대한 소스, 타겟의 메타 정보를 함께 가져오기
2. 데이터 형식에 맞춰 min(column),max(column),sum(column)의 동적 쿼리를 생성
query= min(a), max(a), sum(a), min(b), max(b), ...
3. 집계연산들을 spark.sql(query)로 던져 최소값, 최대값, 합계를 한번에 구하기
4. 결과 Dataframe 스키마 정의
5. 결과 df에 최소값, 최대값, 합계 결과 추가
6. 각각의 최소값,최대값,합계에 대한 True/ False 구하기
결론 ( 개선한 점 )
1. collect() 호출 감소
2. 개별 집계연산 (하나의 컬럼에 min : end :
하나의 컬럼에 max : end : 여러번 스캔) → 다수 집계 연산으로 변경 (하나의 컬럼에 min,max,sum : end : 한번 스캔)
min_a = tmp.select(min(col("a"))).collect()[0][0]
max_a = tmp.select(max(col("a"))).collect()[0][0]
sum_a = tmp.select(sum(col("a"))).collect()[0][0]
agg_results = tmp.agg(
min(col("a")).alias("min_a"),
max(col("a")).alias("max_a"),
sum(col("a")).alias("sum_a")
).collect()[0]
min_a = agg_results["min_a"]
max_a = agg_results["max_a"]
sum_a = agg_results["sum_a"]
3. 이전은 템플릿 코드로만 작성하여 검증 노트북을 들어가 테이블에 대한 이름들을 줘야했다면,
고도화 작업에 대해선 각각의 과정들을 UDF들로 만들어, 각각의 소스와 타겟에 대한 테이블 이름만 주면 user_id에 맞춰 (본인이 작업할 테이블에 대한 메타 정보에 'y'로 작업을 한 뒤) 검증하여 테이블로 append해서 저장까지 해주는 파이프라인을 만들었다.
결과는 12분 걸렸던 테이블이 약 54초로 시간 단축이 기하급수적으로 줄어든 현상을 볼 수 있었다.
고도화 작업을 하면서 너무 비효율적인 코드를 짠 것 같아 너무 자괴감이 밀려왔다,,ㅋ,, 좀 더 열심히 해야겠다,, ㅎㅎ,,