스키마 정의
일단 정적인 스키마를 적용하려면 스키마부터 정의하고 진행해야 한다
또한 작은 파일들을 최적화하기 위해서 spark.conf.set을 적용한다
from pyspark.sql.functions import *
from pyspark.sql.types import *
spark.conf.set("spark.databricks.delta.properties.defaults.autoOptimize.optimizeWrite", "true")
spark.conf.set("spark.databricks.delta.properties.defaults.autoOptimize.autoCompact", "true")
spark.conf.set("spark.databricks.delta.preview.enabled", "true")
화자가 이번에 진행한 프로젝트에서는 json을 파싱하는 것을 주로 진행하였는데 json 안에 json이 들어가있어 이를 모두 정의하고 진행했다. 또한 차례대로 정의하기 때문에, B_schema_a를 정의해준 뒤 B_schema를 정의했다
# 대문자 schema json
A_schema = StructType([
StructField("A" , StringType(), True),
StructField("B" , StringType(), True),
StructField("C" , StringType(), True),
])
# B_schema의 b json
B_schema_a = StructType([StructField('aa', StringType(), True),
StructField('bb', StringType(), True),
StructField('cc', StringType(), True),
StructField('dd', StringType(), True)])
# A_schema안의 json
B_schema = StructType([StructField('a', StringType(), True),
StructField('b', a_schema_a, True)])
Auto Loader
오토로더란, 폴더 내에서 변경 분이 일어났을때 이를 감지해서 자동으로 변경 분이 일어나게 만들어주는 데이터브릭스의 기능 중 하나이다
즉, 클라우드 스토리지에 도착하는 새로운 데이터 파일을 점진적으로 처리해주는 기능이라고 말할 수 있다.
체크포인트 폴더도 같이 주어, 오토 로더의 체크포인트 위치에서 유지하고 오류가 발생하는 경우 오토 로더는 체크포인트 위치에 저장된 정보에 따라 중단된 부분부터 다시 시작하고 Delta Lake에 데이터를 쓴다.
# checkpoint path
checkpoint_path = "s3://{your-s3}/"
# json read
json_stream = (spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "json")
.schema(A_schema)
.option("cloudFiles.schemaLocation", checkpoint_path)
.load("{your-s3}"))
# B_schema parsing
json2_stream = json_stream \
.withColumn("B_schema", from_json(col("b"), B_schema_a)) \
.select("*", ("B_schema.a"),("B_schema.b.*")) \
.drop("B_schema","b")
# 프로시저
def r_w_stream(stream, table_name):
query = stream.writeStream \
.outputMode("append") \
.format('delta') \
.option("checkpointLocation", checkpoint_path) \
.option("path", f"{your-external-path}")\
.toTable(f'{your-table}')
r_w_stream(json2_stream, "your-table")
https://docs.databricks.com/en/ingestion/auto-loader/index.html
델타 포맷으로 s3 방면으로 read하면 df가 보이는 것을 알 수 있다.
df = spark.read.format("delta").load("{your-s3}")
df.display()
또한 업데이트 부분이 일어났어야 하는 이슈가 있었는데, foreachBatch를 사용하면 배치성으로 이를 해결할 수 있다
def foreach_batch_function(df, epoch_id):
# Transform and write batchDF
pass
streamingDF.writeStream.foreachBatch(foreach_batch_function).start()
https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#foreachbatch
기타사항
동적스키마를 적용할때 (inferschema)
from pyspark.sql.functions import col, from_json, schema_of_json
schema_of_json을 적용한다면 스키마를 추론하고 추론된 스키마를 사용하여 json data를 파싱할 수 있다