본문 바로가기

Data Engineering

[Spark/Databricks] Change data capture (CDC) Option

CDC란?

데이터 베이스 즉 DB에 대한 변경 내용을 식별하여 데이터에서 일어난 변경된 사실을 자동화로 적용해주는 기술이자 실시간으로 데이터 웨어하우스 및 기타 데이터 저장소에서 일어나는 데이터에 대해서 변경 부분을 캡쳐하는 식을 이야기한다.

Databricks에서 Auto Loader를 사용

Databricks와 S3 Mount

1. Iam에 s3 mount 권한 부여

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "s3:ListBucket"
            ],
            "Resource": [
                "arn:aws:s3:::migration"
            ]
        },
        {
            "Effect": "Allow",
            "Action": [
                "s3:PutObject",
                "s3:GetObject",
                "s3:DeleteObject",
                "s3:PutObjectAcl",
                "s3:ListBucket"
            ],
            "Resource": [
                "arn:aws:s3:::migration/*"
            ]
        }
    ]
}

Spark CDC Option

델타 테이블에서 CDF 기능을 활성화 해야한다. 또한 테이블에 대한 업데이트로 테이블에서 CDF를 활성화할 수도 있다.
# table에서 활성화
create table bronze_df
	(pk int, name string, estimate_s string, date string)
    using delta
    tblproperties (delta.enableChangeDataFeed = true);

set spark.databricks.delta.properties.defaults.autoOptimize.optimizeWrite = true;
set spark.databricks.delta.properties.defaults.autoOptimize.autoCompact = true;

쓰기 중 자동 압축은 델타 테이블 파티션 내의 작은 파일을 결합하여 작은 파일들을 자동으로 줄입니다

set spark.databricks.delta.properties.defaults.enableChangeDataFeed = true;

변경 데이터 피드 옵션을 활성화

https://docs.databricks.com/en/delta/delta-change-data-feed.html#enable-change-data-feed

 

Databricks documentation

 

docs.databricks.com

Merge Into 문으로 CDF 기능 수행

#foreachbatch는 2개의 변수를 가짐 df, long
def merge(df,i):
    df._jdf.sparkSession().sql(#'''
    merge into migration.bronze_df as b
    using migration.cdc_df as c
    on b.pk = c.pk
    when matched and c.Op = 'D' Then delete
    when matched and c.Op = 'U' then update set *
    when not matched and c.Op = 'I' then insert *''')

# 제외하기

반응형