Schema Evolution With Delta

Delta Lake is an open source storage layer that provides ACID transactions. A Spark DataFrame can be saved in Delta format simply by specifying the format as “delta”, and a saved Delta table can be read back the same way.

The longer we use Delta, the more likely we are to run into a scenario where the incoming data has a schema that is slightly different from the target Delta table's schema. Schemas evolving over time is a very common scenario.

Delta supports schema evolution starting with version 0.6.0, through the Spark config setting “spark.databricks.delta.schema.autoMerge.enabled”. The default value is false, so it must be explicitly enabled to use this feature.

scala> spark.conf.set("spark.databricks.delta.schema.autoMerge.enabled", "true")
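
The same flag can also be set when the session is built rather than at runtime. Below is a minimal sketch for a standalone application, assuming Delta is on the classpath. The app name and local master are placeholders, and the two extension settings are the ones the Delta documentation recommends, not something this walkthrough depends on.

```scala
import org.apache.spark.sql.SparkSession

// Placeholder app name and master; adjust for your environment.
val spark = SparkSession.builder()
  .appName("delta-schema-evolution")
  .master("local[*]")
  // Recommended by the Delta docs for SQL support; optional for the path-based API used here.
  .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
  .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
  // Off by default; turns on schema evolution for this session.
  .config("spark.databricks.delta.schema.autoMerge.enabled", "true")
  .getOrCreate()

// Confirm the setting is visible on the session.
println(spark.conf.get("spark.databricks.delta.schema.autoMerge.enabled"))
```

When using spark-shell, passing --conf spark.databricks.delta.schema.autoMerge.enabled=true on the command line should have the same effect.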

Below is an example of how this works in code.

Starting the REPL

I am using Scala 2.12, Spark 3.1.2 and its compatible Delta version 1.0.0. I started the REPL using the command –

spark-shell --packages io.delta:delta-core_2.12:1.0.0

Create DataFrames

Once in the REPL, let's create our test DataFrames.

scala> val values1 = List(List("Tom", "10", "12") ,List("Tim", "10", "13"),List("Harry", "11", "14")).map(x =>(x(0), x(1),x(2)))
values1: List[(String, String, String)] = List((Tom,10,12), (Tim,10,13), (Harry,11,14))

scala> val df1 = values1.toDF("name","age","marks")
df1: org.apache.spark.sql.DataFrame = [name: string, age: string ... 1 more field]

scala> val values2 = List(List("Tom", "10", "11", "existing") ,List("Jim", "11", "9", "new"),List("Harry", "11", "14", "existing")).map(x =>(x(0), x(1),x(2), x(3)))
values2: List[(String, String, String, String)] = List((Tom,10,11,existing), (Jim,11,9,new), (Harry,11,14,existing))

scala> val df2 = values2.toDF("name","age","marks", "status")
df2: org.apache.spark.sql.DataFrame = [name: string, age: string ... 2 more fields]

scala> df1.show
+-----+---+-----+
| name|age|marks|
+-----+---+-----+
|  Tom| 10|   12|
|  Tim| 10|   13|
|Harry| 11|   14|
+-----+---+-----+


scala> df2.show
+-----+---+-----+--------+
| name|age|marks|  status|
+-----+---+-----+--------+
|  Tom| 10|   11|existing|
|  Jim| 11|    9|     new|
|Harry| 11|   14|existing|
+-----+---+-----+--------+

scala> df1.write.format("delta").save("/home/temp/delta/schematest")

In the above code snippet, I have created a df1 with 3 columns – name, age and marks, and saved it as Delta. I have also created df2, which has an added column status.
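
As an aside, the same DataFrames can be built more directly from a Seq of tuples, which avoids indexing into a List of Lists. This is only an alternative sketch of the setup above, written as a standalone snippet. In spark-shell, spark and its implicits already exist, so the first two statements can be dropped.

```scala
import org.apache.spark.sql.SparkSession

// In spark-shell this returns the session that is already running.
val spark = SparkSession.builder().master("local[*]").getOrCreate()
import spark.implicits._

// Tuples carry the column count in their type, so no x(0), x(1), ... mapping is needed.
val df1 = Seq(("Tom", "10", "12"), ("Tim", "10", "13"), ("Harry", "11", "14"))
  .toDF("name", "age", "marks")

val df2 = Seq(
  ("Tom", "10", "11", "existing"),
  ("Jim", "11", "9", "new"),
  ("Harry", "11", "14", "existing")
).toDF("name", "age", "marks", "status")

df1.printSchema()
df2.printSchema()
```

Multi-line chains like these are easiest to run in the Scala 2.12 REPL through :paste mode.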

Merge without Schema Evolution

Let's merge df2 into our Delta Table, which was originally created from df1 above.

scala> import io.delta.tables._
import io.delta.tables._

scala> val dt = DeltaTable.forPath(spark, "/home/temp/delta/schematest")
dt: io.delta.tables.DeltaTable = io.delta.tables.DeltaTable@4836c558


scala> dt.as("existing").merge(df2.as("new"), "existing.name=new.name").whenMatched.updateAll().whenNotMatched.insertAll().execute()

scala> spark.read.format("delta").load("/home/temp/delta/schematest").show
+-----+---+-----+
| name|age|marks|
+-----+---+-----+
|Harry| 11|   14|
|  Tim| 10|   13|
|  Tom| 10|   11|
|  Jim| 11|    9|
+-----+---+-----+

Above, I merged df2 (which has an added column status) into our Delta Table (which has 3 columns). When I read back the merged data and show the loaded DF, I see that the rows were merged as expected – marks for Tom got updated (12 -> 11), rows for Harry and Tim remained unchanged, and a new row for Jim got inserted.

The columns did not change though, and we lost the status column from the df2 that was merged in. While this may be desirable in some cases, in most cases we would want to update the schema and retain the schema changes, a new column in this case.
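
Because the merge above drops the extra column without any error, it can help to compare column names before merging. The sketch below uses a hypothetical helper, columnsMissingFromTarget, which is not part of Spark or Delta. It compares names case-sensitively, which is a simplification, and it runs on plain DataFrames, so no Delta table is needed to try it.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

// In spark-shell this returns the session that is already running.
val spark = SparkSession.builder().master("local[*]").getOrCreate()
import spark.implicits._

// Hypothetical helper: incoming columns that the target does not have.
def columnsMissingFromTarget(incoming: DataFrame, target: DataFrame): Seq[String] =
  incoming.columns.toSeq.diff(target.columns.toSeq)

// Small stand-ins for the Delta table and the incoming data.
val target = Seq(("Tom", "10", "12")).toDF("name", "age", "marks")
val incoming = Seq(("Jim", "11", "9", "new")).toDF("name", "age", "marks", "status")

val dropped = columnsMissingFromTarget(incoming, target)
println(s"Columns a merge without schema evolution would ignore: ${dropped.mkString(", ")}")
```

For a real table, the target would be the DataFrame returned by DeltaTable.forPath(...).toDF.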

Merge with Schema Evolution – added column

Let's save df1 to a different location, and this time merge df2 with Schema Evolution enabled.

scala> spark.conf.set("spark.databricks.delta.schema.autoMerge.enabled", "true")

scala> df1.write.format("delta").save("/home/temp/delta/schematest2")

scala> val dt2 = DeltaTable.forPath(spark, "/home/temp/delta/schematest2")
dt2: io.delta.tables.DeltaTable = io.delta.tables.DeltaTable@7b4b6695

scala> dt2.as("existing").merge(df2.as("new"), "existing.name=new.name").whenMatched.updateAll().whenNotMatched.insertAll().execute()

scala> spark.read.format("delta").load("/home/temp/delta/schematest2").show
+-----+---+-----+--------+
| name|age|marks|  status|
+-----+---+-----+--------+
|Harry| 11|   14|existing|
|  Tom| 10|   11|existing|
|  Jim| 11|    9|     new|
|  Tim| 10|   13|    null|
+-----+---+-----+--------+

Above, I enabled the schema merge option on my Spark Session, saved df1 to a different location than in the previous case, and merged df2 into the Delta Table created at the new location.

On reading data from this new location, I can see the newly added status column.

Note that while the 3 incoming rows have a value for the status column, the existing row for Tim, which had no value for the new column, was saved with a null in that column.
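
To see the schema change itself, the table can be read as of its first version and compared with the current snapshot. This sketch assumes the table at /home/temp/delta/schematest2 was written as in this walkthrough, so that version 0 is the initial save of df1. It uses Delta's versionAsOf read option and DeltaTable.history().

```scala
import io.delta.tables.DeltaTable
import org.apache.spark.sql.SparkSession

// In spark-shell this returns the session that is already running.
val spark = SparkSession.builder().master("local[*]").getOrCreate()
val path = "/home/temp/delta/schematest2" // table written earlier in this walkthrough

// Version 0 is the initial write of df1, before the merge evolved the schema.
val beforeMerge = spark.read.format("delta").option("versionAsOf", 0).load(path)
beforeMerge.printSchema()

// The current snapshot carries the evolved schema.
val current = spark.read.format("delta").load(path)
current.printSchema()

// One row per commit, showing which operation produced each version.
DeltaTable.forPath(spark, path).history().select("version", "operation").show(false)
```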

Merge with Schema Evolution – removed column

In the above example, we added a new column. Let's see what happens when a column is missing from the incoming data.

scala> val values3 = List(List("Bill", "12","new") ,List("Harry", "15","updated")).map(x =>(x(0), x(1),x(2)))
values3: List[(String, String, String)] = List((Bill,12,new), (Harry,15,updated))

scala> val df3 = values3.toDF("name","marks", "status")
df3: org.apache.spark.sql.DataFrame = [name: string, marks: string ... 1 more field]

scala> df3.show
+-----+-----+-------+
| name|marks| status|
+-----+-----+-------+
| Bill|   12|    new|
|Harry|   15|updated|
+-----+-----+-------+

scala> val dt2 = DeltaTable.forPath(spark, "/home/temp/delta/schematest2")
dt2: io.delta.tables.DeltaTable = io.delta.tables.DeltaTable@6e3cafc7

scala> dt2.as("existing").merge(df3.as("new"), "existing.name=new.name").whenMatched.updateAll().whenNotMatched.insertAll().execute()
                                                                                
scala> spark.read.format("delta").load("/home/temp/delta/schematest2").show
+-----+----+-----+--------+
| name| age|marks|  status|
+-----+----+-----+--------+
|Harry|  11|   15| updated|
|  Tom|  10|   11|existing|
|  Jim|  11|    9|     new|
| Bill|null|   12|     new|
|  Tim|  10|   13|    null|
+-----+----+-----+--------+

Above, I have created a new DataFrame df3, where the age column is missing. I reload the Delta Table (needed to get the latest schema) and merge df3 to it. Our merge schema option in this Spark Session is still enabled.

On reading back the merged data, I still see the age column that was missing from df3. This is desirable, as we do not want to lose existing data. Harry's existing age was left untouched, but note that the new row added for Bill has a null in that column.
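
If a null is not acceptable for inserted rows, one option is to spell out the column mapping instead of using updateAll and insertAll. The sketch below uses DeltaTable's updateExpr and insertExpr with a placeholder value for age. The temporary path, the cut-down data and the 'unknown' default are all assumptions made for illustration. Since the target already has every column involved, no schema evolution takes place here.

```scala
import java.nio.file.Files
import io.delta.tables.DeltaTable
import org.apache.spark.sql.SparkSession

// In spark-shell this returns the session that is already running.
val spark = SparkSession.builder().master("local[*]").getOrCreate()
import spark.implicits._

// Throwaway location so the walkthrough's tables are left untouched.
val path = Files.createTempDirectory("delta-demo").resolve("explicit_merge").toString

Seq(("Harry", "11", "14", "existing"))
  .toDF("name", "age", "marks", "status")
  .write.format("delta").save(path)

// Incoming data without the age column, like df3.
val incoming = Seq(("Bill", "12", "new"), ("Harry", "15", "updated"))
  .toDF("name", "marks", "status")

DeltaTable.forPath(spark, path).as("existing")
  .merge(incoming.as("new"), "existing.name = new.name")
  // Update only the columns the source actually provides.
  .whenMatched().updateExpr(Map("marks" -> "new.marks", "status" -> "new.status"))
  // Supply a placeholder for age instead of leaving it null.
  .whenNotMatched().insertExpr(Map(
    "name" -> "new.name",
    "age" -> "'unknown'",
    "marks" -> "new.marks",
    "status" -> "new.status"))
  .execute()

val result = spark.read.format("delta").load(path)
result.show()
```

The trade-off is that every new column has to be added to the mapping by hand, which is exactly the bookkeeping that schema evolution with updateAll and insertAll avoids.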

Append with Schema Evolution

The above examples focused on the merge operation, which is the most commonly used. Overwrite is the simplest case, since the table's contents are replaced with the incoming data and nothing has to be reconciled with existing rows. As far as I know, though, Delta keeps the existing table schema on overwrite unless the overwriteSchema option is set.
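
For completeness, here is a minimal sketch of an overwrite that also replaces the schema, using Delta's overwriteSchema write option. The temporary path and the sample rows are assumptions made for illustration.

```scala
import java.nio.file.Files
import org.apache.spark.sql.SparkSession

// In spark-shell this returns the session that is already running.
val spark = SparkSession.builder().master("local[*]").getOrCreate()
import spark.implicits._

// Throwaway location so the walkthrough's tables are left untouched.
val path = Files.createTempDirectory("delta-demo").resolve("overwrite").toString

Seq(("Tom", "10", "12")).toDF("name", "age", "marks")
  .write.format("delta").save(path)

// New data with a different set of columns.
Seq(("John", "12", "USA")).toDF("name", "marks", "country")
  .write.format("delta")
  .mode("overwrite")
  .option("overwriteSchema", "true") // replace the table schema along with the data
  .save(path)

val result = spark.read.format("delta").load(path)
result.printSchema()
```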

Append works similarly to merge. Let's see an example.

scala> val values4 = List(List("John", "12","new","USA") ).map(x =>(x(0), x(1),x(2),x(3)))
values4: List[(String, String, String, String)] = List((John,12,new,USA))

scala> val df4 = values4.toDF("name","marks", "status","country")
df4: org.apache.spark.sql.DataFrame = [name: string, marks: string ... 2 more fields]

scala> df4.show
+----+-----+------+-------+
|name|marks|status|country|
+----+-----+------+-------+
|John|   12|   new|    USA|
+----+-----+------+-------+


scala> val dt3 = DeltaTable.forPath(spark, "/home/temp/delta/schematest")
dt3: io.delta.tables.DeltaTable = io.delta.tables.DeltaTable@534400d

scala> dt3.toDF.show
+-----+---+-----+
| name|age|marks|
+-----+---+-----+
|Harry| 11|   14|
|  Tim| 10|   13|
|  Tom| 10|   11|
|  Jim| 11|    9|
+-----+---+-----+

scala> df4.write.format("delta").mode("append").save("/home/temp/delta/schematest")

scala> spark.read.format("delta").load("/home/temp/delta/schematest")
res29: org.apache.spark.sql.DataFrame = [name: string, age: string ... 3 more fields]

scala> spark.read.format("delta").load("/home/temp/delta/schematest").show
+-----+----+-----+------+-------+
| name| age|marks|status|country|
+-----+----+-----+------+-------+
| John|null|   12|   new|    USA|
|Harry|  11|   14|  null|   null|
|  Tim|  10|   13|  null|   null|
|  Tom|  10|   11|  null|   null|
|  Jim|  11|    9|  null|   null|
+-----+----+-----+------+-------+

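In the example above, I have created df4 with 4 columns: name, marks, status and country. Our original Delta Table has the columns name, age and marks. The schema merge option is still enabled in this Spark Session, which is what allows the append to add columns.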

On appending df4 to the Delta Table and reading back the output, we see that 2 new columns now exist: status and country. All the existing rows show null for the newly added columns. Also note that the age column, which was missing from df4, is null for the newly added row.
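
The append above relied on the session-wide setting. Delta also accepts a per-write mergeSchema option, which limits schema evolution to a single write. Below is a minimal sketch of that variant. The temporary path and the cut-down data are assumptions made for illustration.

```scala
import java.nio.file.Files
import org.apache.spark.sql.SparkSession

// In spark-shell this returns the session that is already running.
val spark = SparkSession.builder().master("local[*]").getOrCreate()
import spark.implicits._

// Throwaway location so the walkthrough's tables are left untouched.
val path = Files.createTempDirectory("delta-demo").resolve("append").toString

Seq(("Tom", "10", "12")).toDF("name", "age", "marks")
  .write.format("delta").save(path)

// Same shape as df4: no age column, two new columns.
Seq(("John", "12", "new", "USA")).toDF("name", "marks", "status", "country")
  .write.format("delta")
  .mode("append")
  .option("mergeSchema", "true") // evolve the schema for this write only
  .save(path)

val result = spark.read.format("delta").load(path)
result.show()
```

Scoping the option to one write keeps other writes in the same session strict about schema mismatches.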
