본문 바로가기

Data Engineering

[Databricks] dbutils.widget을 사용하여 데이터 검증 수행하기

Databricks를 사용하여 테이블 컬럼 별로 검증 수행하기

데이터 엔지니어라 함은 데이터 마이그레이션도 중요하지만, 정합성 검증 또한 중요하다.

이번 블로그에 작성할 내용은 데이터브릭스에서 정합성 검증을 진행했던 경험에 대해서 작성해보려고 한다.

데이터브릭스는 데이터 분석가와 엔지니어, 데이터 싸이언티스트들이 머신러닝 작업이나 ETL작업을 쉽게 공간을 공유해서 사용할 수 있는 데이터 플랫폼이다. 파라미터 변수들을 동적으로 받아와서 진행할 수 있는데, Databricks에서는 dbutils.widgets기능을 제공한다.

사용자가 파라미터에 입력을 해서 데이터베이스와 테이블을 지정하면, 해당 테이블을 자동으로 리프레쉬되어 테이블을 읽는다.

따라서 코드를 전부 작성한 뒤, 위젯에 파라미터를 넣기만 하면 각 테이블이 자동으로 바뀌어 각 테이블에 대해 정합성 검증을 할 수 있다는 말로 이해하면 될 것 같다. 


진행했던 방식은 아래와 같다.

1. date 형식 → replace('-','') → min, max → string 형식
2. decimal & float 형식 → min,max,sum → format(.4f) → string 형식
3. Int & bigint → min,max,sum → string 형식
4. string & varchar → min,max → string 형식
※ string은 sum이 되지 않는다.

 

RDBMS에서는 varchar 데이터 타입을 주로 사용하지만, 데이터브릭스에선 varchar타입을 잘 사용하지 않는다.

데이터브릭스는 hive와 호환이 잘 되기 때문에 마이그레이션 한 브론즈에는 타입을 varchar로 두었으나 실버에서는 타입을 string으로 cast하여 진행했다.

 

  • VARCHAR는 가변 길이 문자열을 저장하며, RDBMS에서 자주 사용된다.
    예: RDBMS(mysql 등)
  • STRING은 길이를 지정하지 않는 문자열 타입으로, 주로 NoSQL이나 빅 데이터 환경에서 사용된다.
    예: Apache Hive, Google BigQuery 등.

 

1. 위젯 생성

먼저, 사용자 입력을 받을 수 있는 위젯을 생성한다. 위젯은 Databricks 노트북의 상단에 텍스트 입력 상자로 나타난다.

# 위젯 생성
dbutils.widgets.text('db', 'A')
dbutils.widgets.text('table_a', 'a')
dbutils.widgets.text('table_b', 'b')

 

2. 사용자 입력값 가져오기

생성된 위젯에서 사용자 입력값을 가져오는 명령어

# 위젯에서 값 가져오기
db = dbutils.widgets.get('db')
a = dbutils.widgets.get('table_a')
b = dbutils.widgets.get('table_b')

3. Spark SQL을 사용하여 테이블 읽기

지정된 데이터베이스와 테이블을 읽어옵니다.

# spark.sql을 이용하여 테이블 읽기
df_a = spark.sql(f"select * from {db}.{a}")
df_b = spark.sql(f"select * from {db}.{b}")

 

4. 데이터 타입 변환

decimal 타입의 컬럼을 string으로 변환하기 전, string으로 변환하고 min,max를 구하면 값이 달라진다.

따라서 decimal은 min,max를 구하고 insert할때 string으로 변환한다.

from pyspark.sql.functions import *

# df_a를 source로 예정
df_a_list = []
for column in df_a.columns:
# datatype이 numeric type일때 min,max를 구하는 방법
    if isinstance(df_a.schema[column].dataType, NumericType):
        min_v = df_a.select(spark_min(col(column))).collect()[0][0]
        min_v = str(min_v)
        max_v = df_a.select(spark_max(col(column))).collect()[0][0]
        max_v = str(max_v)
        sum_v = df_a.select(spark_sum(col(column))).collect()[0][0]
        sum_v = str(sum_v)
    ...

    df_a_list.append((column, min_v, max_v, sum_v))

 

이처럼 schema의 데이터 타입이 numerictype, datetype, stringtype인지에 따라서 데이터 형식에 맞춰 min, max, sum을 계산할 수 있다. target도 위와 같은 방식으로 진행한 후, source와 target을 마지막으로 합쳐 최종으로 데이터프레임을 생성한다

combined_df.withColumn("min_t_f", when((col("df_a_min") == col("df_b_min")) | 
                                  ((col("df_a_min") == " ") & (col("df_b_min").isNull())) |
                                  ((col("df_a_min").isNull()) & (col("df_b_min") == " ")) | 
                                  (col("df_a_min").isNull() & col("df_b_min").isNull()), True)
                                   .otherwise(False))\
				...

 

마지막으로 withcolumn을 사용하여 min_tf 컬럼을 생성한다.

이때 각각의 테이블에 대해서 Null값과 공백은 값이 없다 할지라도 같지 않기 때문에 false로 지정된다.

브론즈에서 실버로 간 테이블들끼리 비교를 했기 때문에 브론즈에 공백일지라도 실버에서도 값이 없는 null이 들어가있는 것이 맞아 공백과 Null은 같다라는 조건을 주어 true로 진행했다.


isinstance type으로 확인하는 방법이 아닌 sql describe를 이용한 방법

※ sql 명령어로 describe로 읽어 테이블의 datatype 확인한 뒤, for문으로 데이터 타입에 맞춰 startwith으로 진행하는 방법도 있음
df_a_list = []
for row in df_a.collect():
    col_name = col_row['row_1']
    col_type = col_row['row_2']

    if col_name in df_a.columns:
    # 날짜 형식 처리
        if col_type.startswith("date"):
            min_v = df_a.select(min(col_name)).collect()[0][0]
            min_v = str(min_v)
    ...

Null도 true로 되어있는 것을 확인할 수 있다.

반응형