Intro
Spark 공부를 하면서 업무적으로 기회를 얻으려고 진행한 자격증이였다. 원래 자격증은 한번에 붙는 나였는데 이 자격증은 계속 떨어졌다.
팀장님의 계속적인 바우처 도움이 아니였으면 애초에 포기했었을 자격증이였다. 시험과 답이 모두 영어로 나오기 때문에 영어를 못하는 본인으로써는 조금 난해했던 자격증이였다. 역시 본인이 제일 공부해야하는 언어는 영어 일듯 싶다 ^^
그래도 계속 도전해보면 언젠가 따겠지 하는 마인드로 밀어붙였더니 고득점으로 합격했다
뭐든지 계속 하다보면 안되는 건 없다 계속 떨어져서 눈치는 많이 봤지만 그래도 합격하게 되면 앞선 과정들은 아무것도 아니라고 생각했다
준비기간 ( 약 2달 )
이번 자격증은 자격증이 주가 아니였고, 업무를 녹여내게끔 하는 것이 주였다. 따라서 실제로 코딩하면서 진행했기 때문에 시간이 다소 소요됐다.
- Databricks Learning에서 dbc를 복사하여 실제 Lab 과정 수행
- Databricks Learning에서 나오는 연습 시험 2회독
- Udemy에 있는 Databricks Certified Data Engineer Associate Practice Exams 5회 풀기
- Examtopics에 나와있는 Databricks 20문제 풀기
- Nothion에 오답노트 & 개념 정리
결과
앞서 떨어졌던 과정들이 무색하게나마 높은 고득점으로 합격했다. 실제로 자격증을 딴 이후로는, 자격증보다 프로젝트나 PoC에 참여해서 업무하는 것이 베스트라고 생각한다.
키워드 정리
아래의 키워드들은 공부하면서 정리한 개념적인 부분들과 답을 유추하게끔 하는 부분이다. 참고하세요 :)
External vs manage table
- external → drop table 할 시, 메타스토어에 있는 테이블만 삭제되며 델타로그, 히스토리는 스토리지에 남아있음
- manage → drop table 할 시, 메타스토어,델타로그,히스토리 모두 삭제
DDL 구문에서 소유자 식별하며 테이블 속성 채우는 구문
Create table inventory(id int, units float)
TBLPROPERTIES (business_owner = 'supply chain')
Ater table inventory
SET TBLPROPERTIES (business_owner, 'operations')
Struct Json
- From_json(json 값, json 스키마)
Python shell에서 python 변수 전달
order_date = dbuitls.widget.get("widget_order_date")
spark.sql(f"select * from sales where orderDate = '{order_date}'")
Autoloader의 성능을 높이는 방법
- maxFilesPerTrigger의 옵션을 높은 수로 늘린다. 기본값은 1000이며 높은 숫자로 늘릴수록 큰 컴퓨팅이 필요하다
Bronze Layer
- 수집된 데이터의 원시 복사본
- 기존 데이터 레이크 대체
- 처리되지 않은 전체 데이터 기록의 효율적인 저장 및 쿼리 제공
- 스키마가 적용되지 않음
- 타임스탬프를 사람이 읽을 수 있는 형식으로 구문 분석하여 데이터를 보강하는 작업
Silver layer
- 데이터 품질검사, 손상된 데이터 격리
- 데이터 스토리지 복잡성, 대기시간 및 중복성 감소
- 집계없이 원래 데이터의 입자 유지
- 중복 기록 제거
- 스키마 진화 적용
- 정리된 데이터를 집계하여 표준 요약 통계를 생성하는 작업
Continuous pipeline & Production mode
- 모든 데이터 세트는 지속적으로 업데이트 되며, 파이프라인은 종료되지 않습니다. 컴퓨팅 리소스는 파이프라인과 함께 유지됩니다.
단일 노트북에서 각각의 매개변수를 받아 데이터를 처리하는 방법
- Task는 키-값 쌍의 매개변수로 진행하고 모두 병렬로 실행할 수 있게 작업에 종속되지 않고 매개변수에 값을 전달하는 태스크를 생성한다
SQL 쿼리 대시보드 자동화
- 데이터브릭스 SQL의 쿼리 페이지에서 쿼리가 1일마다 새롭게 고쳐지도록 예약할 수 있다
Unity Catalog 기능을 사용하려면 데이터브릭스 작업영역에서 수행해야 하는 단계
- 작업 공간 관리/외부테이블/뷰의 개체를 통합 카탈로그로 마이그레이션/업그레이드
Vaccum table retain 0
- 데이터의 모든 기록 버전이 손실됨
Lakehouse = DataLake + DataWareHouse
- 기존 데이터 웨어하우스는 스토리지와 컴퓨팅이 결합되어있었으나, Lakehouse는 분리되어있음
COLLECT SET
- 모든 행의 열 값을 고유한 목록으로 나눠주는 일종의 집계 함수
[1,100,200,300]
[1,250,300]
=> [[1,100,200,300],[1,250,300]]
ARRAY_UNION
- 중복 항목을 결합하고 제거하는 함수
[[1,100,200,300],[1,250,300]]
=> [1,100,200,250,300]
FILTER
- 식을 기반으로 배열을 필터링하는데 사용할 수 있는 함수
[99.00,99.00,100.10,100.9] > 100.0
=> [100.10,100.9]
SIZE
- 배열의 크기를 얻을 수 있는 함수
[100.10,100.9]
=> 2
ARRAY_DISTINCT
- 모든 중복 값이 제거된 입력 인수와 동일한 유형의 배열을 반환
[1,100,200,300,1,250,300]
=> [1,100,200,250,300]
TRANSFORM
- expr 함수를 사용하여 배열의 요소를 변환하는 함수
Spark stream process mode
Complete
→ 현재까지 수신된 모든 데이터의 처리 결과를 출력. 즉, 이전 배치의 결과와 현재 배치의 결과를 모두 출력overwrite
→ 기존에 쓰여진 결과를 삭제하고 새로운 결과를 출력. 즉, 이전에 쓰여진 결과를 삭제하고 새로운 결과를 출력append
→ 스트리밍 데이터를 실시간 db에 추가하거나, 스트리밍 처리의 결과를 기존 출력에 추가하여 새 레코드를 추가하는 모드
Spark stream write mode
append
→ 새 행만 출력.complete
→ 모든 트리거 후 전체 결과 테이블이 출력. 즉, 각 배치에 대해 대상 테이블을 덮어 씀update
→ 업데이트 된 결과 테이블의 행만 출력.
ETL Spark stream
(spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "json")
**.option("cloudFiles.schemaLocation"**, checkpoint_path)
.load(file_path)
.select("*", current_timestamp().alias("processing_time"))
.writeStream
**.option("checkpointLocation",** checkpoint_path)
.option("mergeSchema", "true")
.trigger(availableNow=True)
.toTable(table_name))
각 작업 실행의 비용을 추적
- 작업 생성 중에 태그를 사용
사용자 액세스를 기반으로 세분화된 액세스 제어 구현할 수 있는 기술
- Dynamic View Function
Grant / Revoke list
- Select : 읽기 액세스 권한 부여
- Create : 스키마의 테이블 생성하는 기능 제공
- Modify : 데이터를 추가, 삭제, 수정할 수 있는 기능 제공
- Usage : 어떤 기능도 제공하지 않지만 스키마에 대한 작업을 수행하기 위한 추가 요구 사항
- All Privileges : 모든 권한 부여
Show vs Describe
Show table | database의 모든 테이블 반환 |
---|---|
Describe table | External or managed table check |
Databricks Repo
- Databricks Repo를 사용하면 주석을 추가하고 커밋하려는 변경 사항을 선택 할 수 있습니다.
Auto loader (정수열 포함해야하는데 문자열로 출력되는 이유)
- 스키마 힌트를 제공하면 해결
- autoloader는 체크포인트를 사용하여 로드되지 않은 데이터를 자동으로 재처리
spark.readStream \
.format("cloudFiles") \
.option("cloudFiles.format", "csv") \
.option("header", "true") \
.option("cloudFiles.schemaLocation", schema_location) \
**.option("cloudFiles.schemaHints", "id int, description string")**
.load(raw_data_location)
.writeStream \
**.option("checkpointLocation", checkpoint_location) \**
.start(target_delta_table_location)
View / Temp View / Global View
View | Temp view | Global view |
---|---|---|
여러 사용자나 세션에서 공유 가능 (명시적으로 삭제 전까지 사용) | 다른 노트북에 액세스 불가 | 클러스터의 모든 노트북에서 사용 |
데이터베이스에 저장된 테이블과 관련된 논리적인 가상 테이블이며 실제 데이터를 저장하지 않고 원본 테이블에 대한 참조만 저장 | 노트북이 분리되었다가 다시 연결되면 로컬 임시 보기가 손실 | 노트북이 분리되었다가 다시 연결되더라도 여전히 액세스할 수 있지만 클러스터가 다시 시작되면 전역 임시 보기가 손실 |
두 테이블에서 데이터를 가져와서 관계형 개체를 생성 → 물리적 저장 X |
Data plane vs Control plane
Data palne | Control palne |
---|---|
driver node | web application |
worker node | job / pipelines / queris |
jdbc data source | notebooks |
databricks file system | cluster management |
SQL
클러스터를 사용하지 않을 때 → Auto Stop On
피크 시간때 사용자 수가 증가함, 병렬적으로 실행, → extend Scale range
반응형