본문 바로가기

Data Engineering

[Spark/Databricks] Change of Capture / Auto loader

스키마 정의

일단 정적인 스키마를 적용하려면 스키마부터 정의하고 진행해야 한다

또한 작은 파일들을 최적화하기 위해서 spark.conf.set을 적용한다

from pyspark.sql.functions import *
from pyspark.sql.types import *

spark.conf.set("spark.databricks.delta.properties.defaults.autoOptimize.optimizeWrite", "true")
spark.conf.set("spark.databricks.delta.properties.defaults.autoOptimize.autoCompact", "true")
spark.conf.set("spark.databricks.delta.preview.enabled", "true")

 

화자가 이번에 진행한 프로젝트에서는 json을 파싱하는 것을 주로 진행하였는데 json 안에 json이 들어가있어 이를 모두 정의하고 진행했다. 또한 차례대로 정의하기 때문에, B_schema_a를 정의해준 뒤 B_schema를 정의했다

# 대문자 schema json
A_schema = StructType([
    StructField("A" , StringType(), True),
    StructField("B" , StringType(), True),
    StructField("C" , StringType(), True),
])

# B_schema의 b json
B_schema_a = StructType([StructField('aa', StringType(), True), 
                         StructField('bb', StringType(), True), 
                         StructField('cc', StringType(), True), 
                         StructField('dd', StringType(), True)])

# A_schema안의 json
B_schema = StructType([StructField('a', StringType(), True), 
                       StructField('b', a_schema_a, True)])

 

Auto Loader

오토로더란, 폴더 내에서 변경 분이 일어났을때 이를 감지해서 자동으로 변경 분이 일어나게 만들어주는 데이터브릭스의 기능 중 하나이다

즉, 클라우드 스토리지에 도착하는 새로운 데이터 파일을 점진적으로 처리해주는 기능이라고 말할 수 있다.

체크포인트 폴더도 같이 주어, 오토 로더의 체크포인트 위치에서 유지하고 오류가 발생하는 경우 오토 로더는 체크포인트 위치에 저장된 정보에 따라 중단된 부분부터 다시 시작하고 Delta Lake에 데이터를 쓴다. 

# checkpoint path
checkpoint_path = "s3://{your-s3}/"

# json read
json_stream = (spark.readStream
          .format("cloudFiles")
          .option("cloudFiles.format", "json")
          .schema(A_schema)
          .option("cloudFiles.schemaLocation", checkpoint_path)
          .load("{your-s3}"))

# B_schema parsing                       
json2_stream = json_stream \
    .withColumn("B_schema", from_json(col("b"), B_schema_a)) \
    .select("*", ("B_schema.a"),("B_schema.b.*")) \
    .drop("B_schema","b")

# 프로시저
def r_w_stream(stream, table_name):
    query = stream.writeStream \
            .outputMode("append") \
            .format('delta') \
            .option("checkpointLocation", checkpoint_path) \
            .option("path", f"{your-external-path}")\
            .toTable(f'{your-table}')

r_w_stream(json2_stream, "your-table")

 

https://docs.databricks.com/en/ingestion/auto-loader/index.html

 

Databricks documentation

 

docs.databricks.com

델타 포맷으로 s3 방면으로 read하면 df가 보이는 것을 알 수 있다.

df = spark.read.format("delta").load("{your-s3}")
df.display()

 

또한 업데이트 부분이 일어났어야 하는 이슈가 있었는데, foreachBatch를 사용하면 배치성으로 이를 해결할 수 있다

def foreach_batch_function(df, epoch_id):
    # Transform and write batchDF
    pass

streamingDF.writeStream.foreachBatch(foreach_batch_function).start()

 

https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#foreachbatch

 

Structured Streaming Programming Guide - Spark 3.5.0 Documentation

Structured Streaming Programming Guide Overview Structured Streaming is a scalable and fault-tolerant stream processing engine built on the Spark SQL engine. You can express your streaming computation the same way you would express a batch computation on s

spark.apache.org

기타사항

동적스키마를 적용할때 (inferschema)

from pyspark.sql.functions import col, from_json, schema_of_json

 

schema_of_json을 적용한다면 스키마를 추론하고 추론된 스키마를 사용하여 json data를 파싱할 수 있다

반응형