본문 바로가기

Data Engineering/Cloud

[AWS] RDS(MYSQL)에서 DMS를 사용하여 Databricks로 Migration하기

데이터의 변경된 내용을 자동으로 식별하기 위해서 Change Data Capture(CDC)의 기술을 접목할 것이다.  데이터 브릭스로 CDC를 진행할때는 어떤 방식으로 이루어지는 알아보자.

사용한 서비스 : AWS의 DMS, AWS RDS(MYSQL),Databricks(Spark)

1. RDS 생성

DB는 Mysql을 생성해서 사용했다. 비용효율적으로 EC2에 mysql을 설치해서 사용해도 되지만, RDS를 사용해본 적이 없기에, CDC를 통해서 겸사겸사 진행했다. 또한 프리티어를 사용해서 Amazon RDS 단일 AZ(개발자용) db.t2 micro 인스턴스는 750시간 무료로 사용할 수 있다고 하니 클라우드 환경에서 DB를 사용할때 생성하면 좋을 듯 싶다.

주의 : 버스터블 클래스인 t로 선택을 했을때 과금 방지를 위해 스토리지 자동 조정 활성화를 꼭 꺼줘야 한다. 

 

 

RDS를 생성한 뒤 파라미터 그룹을 생성한 뒤 파라미터를 생성하여 RDS에 적용해주어야 한다.

  • character_set_client : utf8mb4
  • character_set_connection : utf8mb4
  • character_set_database : utf8mb4
  • character_set_filesystem : utf8mb4
  • character_set_results : utf8mb4
  • character_set_server : utf8mb4
  • collation_connection : utf8mb4_general_ci
  • collation_server : utf8mb4_general_ci
  • binlog_format : ROW
  • binlog_row_image : full
  • sync_binlog : 1

등등 .. 파라미터를 설정해 준 뒤 RDS에 적용한다.

utf8mb4는 utf8에서 이모지 저장이 가능한 Character Set임

 

2. DMS 생성

데이터 마이그레이션 탭에 있는 복제 인스턴스를 생성

각각의 소스, 대상 엔드포인트 설정 후 데이터 베이스 마이그레이션 태스크를 생성한다.

생성하기 전 IAM role을 사용해야 하는데, 신뢰 정책에 dms.amazonaws.com을 넣어준다.

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "",
            "Effect": "Allow",
            "Principal": {
                "Service": "dms.amazonaws.com"
            },
            "Action": "sts:AssumeRole"
        }
    ]
}

DMS 대상 엔드포인트 설정 시 적용하는 파라미터 값이다.

 

Error 

Task setting의 파라미터인 FailOnNoTablesCaptured 를 false로 변경했는데도 불구하고 계속 stop Reason FATAL_ERROR error Level FATAL이라는 에러로 CDC가 진행이 되지 않았다. 그래서 Cloudwatch를 활성화 한 뒤 로그를 뜯어보니 Binary Logging이 활성화 되지 않았다는 에러를 발견했다.

공식문서를 찾아보니 CDC를 사용하려면 바이너리 로깅이 활성화되어야 했고, 이는 RDS에서 자동백업을 활성화 한 상태여야 바이너리 로깅이 활성화 된다고 한다. 바이너리 로그란, Mysql 서버 인스턴스의 데이터 변경사항들에 대한 정보를 포함하는 로그파일들이다.

 

3. 데이터브릭스 CDC설정

브릭스에서 CDC를 하기위한 스파크 옵션들을 켜준 뒤, Auto loader를 사용해서 CDC를 진행했다. 

Mysql은 DBeaver를 사용해서 실시간으로 변경된 내역이 데이터브릭스에도 잘 적용이 되는지 확인했다.

schema = "Op string, pk int, name string, hobby string"

(spark.readStream
                .format("cloudFiles")
                .option("cloudFiles.format", "parquet")
                .schema(schema)
                .load("/mnt/migration/jennydb/raw/")
                .writeStream
                .format("delta")
                .outputMode("append")
                .trigger(once=True)
                .option("checkpointLocation", "dbfs:/user/***/jenny_cdc_df")
                .table("migration.jenny_cdc_df"))

jennydb/raw에는 cdc의 로그들이 쌓이고 이를 델타 테이블로 말아서 저장한 형태이다.

Merge into 문으로 foreachbatch인 배치형 함수를 사용했으며, foreachbatch는 2개의 인수인 df와 Long을 인수로 가진다.

 

DMS의 Transaction Log를 사용해서 Op를 확인했을때, Pk의 11은 Insert이며, 1은 Delete이다.

따라서 데이터브릭스에서 Auto loader를 사용해서 CDC까지 진행했음을 마무리한다.

반응형