CDC란?
데이터 베이스 즉 DB에 대한 변경 내용을 식별하여 데이터에서 일어난 변경된 사실을 자동화로 적용해주는 기술이자 실시간으로 데이터 웨어하우스 및 기타 데이터 저장소에서 일어나는 데이터에 대해서 변경 부분을 캡쳐하는 식을 이야기한다.
Databricks에서 Auto Loader를 사용
Databricks와 S3 Mount
1. Iam에 s3 mount 권한 부여
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"s3:ListBucket"
],
"Resource": [
"arn:aws:s3:::migration"
]
},
{
"Effect": "Allow",
"Action": [
"s3:PutObject",
"s3:GetObject",
"s3:DeleteObject",
"s3:PutObjectAcl",
"s3:ListBucket"
],
"Resource": [
"arn:aws:s3:::migration/*"
]
}
]
}
Spark CDC Option
델타 테이블에서 CDF 기능을 활성화 해야한다. 또한 테이블에 대한 업데이트로 테이블에서 CDF를 활성화할 수도 있다.
# table에서 활성화
create table bronze_df
(pk int, name string, estimate_s string, date string)
using delta
tblproperties (delta.enableChangeDataFeed = true);
set spark.databricks.delta.properties.defaults.autoOptimize.optimizeWrite = true;
set spark.databricks.delta.properties.defaults.autoOptimize.autoCompact = true;
쓰기 중 자동 압축은 델타 테이블 파티션 내의 작은 파일을 결합하여 작은 파일들을 자동으로 줄입니다
set spark.databricks.delta.properties.defaults.enableChangeDataFeed = true;
변경 데이터 피드 옵션을 활성화
https://docs.databricks.com/en/delta/delta-change-data-feed.html#enable-change-data-feed
Merge Into 문으로 CDF 기능 수행
#foreachbatch는 2개의 변수를 가짐 df, long
def merge(df,i):
df._jdf.sparkSession().sql(#'''
merge into migration.bronze_df as b
using migration.cdc_df as c
on b.pk = c.pk
when matched and c.Op = 'D' Then delete
when matched and c.Op = 'U' then update set *
when not matched and c.Op = 'I' then insert *''')
# 제외하기
반응형