Delta Lake is an open-source storage layer that brings ACID transactions to Apache Spark. A Spark DataFrame can be saved in Delta format simply by specifying the format as “delta”.
Below are some examples of the different modes in which a DataFrame can be saved as a Delta table. I captured them in the Scala REPL, which I started with the command below:
spark-shell --packages io.delta:delta-core_2.11:0.6.0
Create DataFrames
Once in the REPL, I created the test DataFrames below, which the later sections use to show the various modes.
scala> val values1 = List(List("Tom", "10", "12") ,List("Tim", "10", "13"),List("Harry", "11", "14")).map(x =>(x(0), x(1),x(2)))
values1: List[(String, String, String)] = List((Tom,10,12), (Tim,10,13), (Harry,11,14))
scala> val df1 = values1.toDF("name","age","marks")
df1: org.apache.spark.sql.DataFrame = [name: string, age: string ... 1 more field]
scala> val values2 = List(List("Tom", "10", "11") ,List("Jim", "11", "9"),List("Harry", "11", "14")).map(x =>(x(0), x(1),x(2)))
values2: List[(String, String, String)] = List((Tom,10,11), (Jim,11,9), (Harry,11,14))
scala> val df2 = values2.toDF("name","age","marks")
df2: org.apache.spark.sql.DataFrame = [name: string, age: string ... 1 more field]
scala> df1.show()
+-----+---+-----+
| name|age|marks|
+-----+---+-----+
|  Tom| 10|   12|
|  Tim| 10|   13|
|Harry| 11|   14|
+-----+---+-----+
scala> df2.show()
+-----+---+-----+
| name|age|marks|
+-----+---+-----+
|  Tom| 10|   11|
|  Jim| 11|    9|
|Harry| 11|   14|
+-----+---+-----+
Append Mode
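The most basic mode is append. As the name suggests, it simply adds the incoming records to the existing ones. In the example, the first save creates the table. No mode is given there, so the default applies and the write would fail if the path already existed. The second save then appends df2 to it.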
scala> df1.write.format("delta").save("D:/temp/delta/append")
scala> spark.read.format("delta").load("D:/temp/delta/append").show()
+-----+---+-----+
| name|age|marks|
+-----+---+-----+
|Harry| 11|   14|
|  Tim| 10|   13|
|  Tom| 10|   12|
+-----+---+-----+
scala> df2.write.format("delta").mode("append").save("D:/temp/delta/append")
scala> spark.read.format("delta").load("D:/temp/delta/append").show()
+-----+---+-----+
| name|age|marks|
+-----+---+-----+
|Harry| 11|   14|
|Harry| 11|   14|
|  Tom| 10|   11|
|  Tim| 10|   13|
|  Tom| 10|   12|
|  Jim| 11|    9|
+-----+---+-----+
The resulting DataFrame has a total of 6 records: the 3 written by the first save and the other 3 appended from df2. Append does no de-duplication, so the record for “Harry”, which is identical in df1 and df2, appears twice, and “Tom” appears once with each value of marks.
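If the duplicates are not wanted, one option is to filter the incoming DataFrame before appending it. The snippet below is a minimal sketch of that idea, not something Delta does for you: it uses a plain Spark left anti join to keep only the incoming rows that have no exact match in the table. The path is hypothetical, and the code assumes it is pasted into a spark-shell session started as above, so that `spark` is available.

```scala
// Assumes a spark-shell session started with the Delta package, so `spark` exists.
import spark.implicits._

// Hypothetical path, used only for this sketch. The first save fails if it already exists.
val path = "D:/temp/delta/append_dedup"

val first = Seq(("Tom", "10", "12"), ("Tim", "10", "13"), ("Harry", "11", "14"))
  .toDF("name", "age", "marks")
val second = Seq(("Tom", "10", "11"), ("Jim", "11", "9"), ("Harry", "11", "14"))
  .toDF("name", "age", "marks")

first.write.format("delta").save(path)

// Keep only the incoming rows that have no exact match (on all columns) in the table.
val existing = spark.read.format("delta").load(path)
val onlyNew = second.join(existing, Seq("name", "age", "marks"), "left_anti")

onlyNew.write.format("delta").mode("append").save(path)

spark.read.format("delta").load(path).orderBy("name", "marks").show()
```

With this data the table should end up with 5 records instead of 6, because the identical “Harry” row is dropped before the append. The two “Tom” rows differ in marks, so both are kept. Updating “Tom” in place is what merge is for.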
Overwrite Mode
In overwrite mode, the existing data is completely overwritten by the new data.
scala> df1.write.format("delta").save("D:/temp/delta/overwrite")
scala> spark.read.format("delta").load("D:/temp/delta/overwrite").show()
+-----+---+-----+
| name|age|marks|
+-----+---+-----+
|Harry| 11|   14|
|  Tom| 10|   12|
|  Tim| 10|   13|
+-----+---+-----+
scala> df2.write.format("delta").mode("overwrite").save("D:/temp/delta/overwrite")
scala> spark.read.format("delta").load("D:/temp/delta/overwrite").show()
+-----+---+-----+
| name|age|marks|
+-----+---+-----+
|Harry| 11|   14|
|  Tom| 10|   11|
|  Jim| 11|    9|
+-----+---+-----+
The resulting DataFrame here shows the 3 records that made up df2.
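Overwrite does not have to replace the whole table. As a sketch, assuming the table is partitioned (here by “age”, which is my choice for illustration and not part of the examples above), the “replaceWhere” option limits the overwrite to the partitions that match a predicate. The path is hypothetical, and `spark` is assumed to come from the spark-shell session.

```scala
// Assumes a spark-shell session started with the Delta package, so `spark` exists.
import spark.implicits._

// Hypothetical path, used only for this sketch.
val path = "D:/temp/delta/overwrite_partition"

val first = Seq(("Tom", "10", "12"), ("Tim", "10", "13"), ("Harry", "11", "14"))
  .toDF("name", "age", "marks")
val second = Seq(("Tom", "10", "11"), ("Jim", "11", "9"), ("Harry", "11", "14"))
  .toDF("name", "age", "marks")

// Partition by age so that replaceWhere can target a single partition.
first.write.format("delta").partitionBy("age").save(path)

// Overwrite only the age = '11' partition. The rows being written must all
// satisfy the replaceWhere predicate, hence the filter on the incoming data.
second.filter("age = '11'")
  .write.format("delta")
  .mode("overwrite")
  .option("replaceWhere", "age = '11'")
  .save(path)

spark.read.format("delta").load(path).orderBy("age", "name").show()
```

Here the records with age 10 from the first write should be left untouched, while the age 11 partition is replaced by the matching records from the second DataFrame.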
Merge Mode
Merge is the most powerful way of writing data that Delta supports. Unlike append and overwrite, it is not a mode of the DataFrame writer but an operation on the DeltaTable API. In a merge, the incoming records are “merged” into the existing data. For this, we need to specify one or more fields that serve as the key on which the 2 datasets are joined. Depending on whether the key results in a match or not, existing records can be updated or new records inserted.
scala> df1.write.format("delta").save("D:/temp/delta/merge")
scala> spark.read.format("delta").load("D:/temp/delta/merge").show()
+-----+---+-----+
| name|age|marks|
+-----+---+-----+
|Harry| 11|   14|
|  Tim| 10|   13|
|  Tom| 10|   12|
+-----+---+-----+
scala> import io.delta.tables._
import io.delta.tables._
scala> val dt = DeltaTable.forPath(spark, "D:/temp/delta/merge")
dt: io.delta.tables.DeltaTable = io.delta.tables.DeltaTable@5b53abd3
scala> dt.as("existing").merge(df2.as("new"), "existing.name=new.name").whenMatched.updateAll().whenNotMatched.insertAll().execute()
scala> spark.read.format("delta").load("D:/temp/delta/merge").show()
+-----+---+-----+
| name|age|marks|
+-----+---+-----+
|Harry| 11|   14|
|  Tom| 10|   11|
|  Tim| 10|   13|
|  Jim| 11|    9|
+-----+---+-----+
I have used the field “name” as the key in this case. When merging df2, the records for “Tom” and “Harry” in df2 were matched, and the existing records were updated with the values from df2. This changed the marks for “Tom” from 12 to 11, while “Harry” had identical values in both DataFrames and so looks unchanged. “Tim” exists only in df1 and was left as it was. “Jim” was not matched and was therefore inserted as a new record. The result is a total of 4 records: the 3 existing ones, 2 of which were updated, and 1 newly inserted.
The merge also supports predicates, which serve as additional conditions on top of the key matching. In the example above, the marks for “Tom” were updated from 12 to 11. But I do not want to update the marks of an existing record unless the new record has higher marks than the existing one. Below is how I can do that using a condition for the matched records.
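The matched and not-matched clauses do not have to copy every column. As a sketch of the same merge with explicit column mappings, the snippet below uses `updateExpr` and `insertExpr` from the DeltaTable merge builder so that only “marks” is updated on a match. The path is hypothetical, and `spark` is assumed to come from the spark-shell session.

```scala
// Assumes a spark-shell session started with the Delta package, so `spark` exists.
import io.delta.tables._
import spark.implicits._

// Hypothetical path, used only for this sketch.
val path = "D:/temp/delta/merge_expr"

val first = Seq(("Tom", "10", "12"), ("Tim", "10", "13"), ("Harry", "11", "14"))
  .toDF("name", "age", "marks")
val second = Seq(("Tom", "10", "11"), ("Jim", "11", "9"), ("Harry", "11", "14"))
  .toDF("name", "age", "marks")

first.write.format("delta").save(path)

DeltaTable.forPath(spark, path).as("existing")
  .merge(second.as("new"), "existing.name = new.name")
  // On a key match, update only the marks column.
  .whenMatched()
  .updateExpr(Map("marks" -> "new.marks"))
  // On no match, insert the incoming record with an explicit column mapping.
  .whenNotMatched()
  .insertExpr(Map("name" -> "new.name", "age" -> "new.age", "marks" -> "new.marks"))
  .execute()

spark.read.format("delta").load(path).orderBy("name").show()
```

With this data the outcome should match the `updateAll()` and `insertAll()` version, since “age” is the same for the matched records. The explicit form is mainly useful when some columns of the existing record must be preserved.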
scala> df1.write.format("delta").save("D:/temp/delta/merge2")
scala> val dt2 = DeltaTable.forPath(spark, "D:/temp/delta/merge2")
dt2: io.delta.tables.DeltaTable = io.delta.tables.DeltaTable@3832ecba
scala> dt2.as("existing").merge(df2.as("new"), "existing.name=new.name").whenMatched("new.marks > existing.marks").updateAll().whenNotMatched.insertAll().execute()
scala> spark.read.format("delta").load("D:/temp/delta/merge2").show()
+-----+---+-----+
| name|age|marks|
+-----+---+-----+
|Harry| 11|   14|
|  Tim| 10|   13|
|  Tom| 10|   12|
|  Jim| 11|    9|
+-----+---+-----+
This merge behaved like the previous one, with the difference that the marks for “Tom” now remained at 12, because the incoming value of 11 did not satisfy the condition.
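One caveat with this condition: the test DataFrames hold “marks” as strings, so “new.marks > existing.marks” is a string comparison. That happens to give the expected answer for 11 versus 12, but as strings “9” is greater than “12”. The snippet below is a minimal sketch of one way around this, casting both sides to int inside the condition. The data and the path are made up for the illustration, and `spark` is assumed to come from the spark-shell session.

```scala
// Assumes a spark-shell session started with the Delta package, so `spark` exists.
import io.delta.tables._
import spark.implicits._

// Hypothetical path and data, used only for this sketch.
val path = "D:/temp/delta/merge_cast"

val current = Seq(("Tom", "10", "12")).toDF("name", "age", "marks")
val incoming = Seq(("Tom", "10", "9")).toDF("name", "age", "marks")

current.write.format("delta").save(path)

DeltaTable.forPath(spark, path).as("existing")
  .merge(incoming.as("new"), "existing.name = new.name")
  // Compare as numbers: 9 > 12 is false, so the existing record is kept.
  // Without the casts, the string comparison "9" > "12" would be true.
  .whenMatched("cast(new.marks as int) > cast(existing.marks as int)")
  .updateAll()
  .whenNotMatched()
  .insertAll()
  .execute()

spark.read.format("delta").load(path).show()
```

An alternative is to create the DataFrames with numeric columns in the first place, which avoids the casts altogether.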
History
As we append, overwrite and merge data, we can always check the history of the operations that were performed on a given Delta table.
scala> dt2.history.show()
+-------+--------------------+------+--------+---------+--------------------+----+--------+---------+-----------+--------------+-------------+--------------------+
|version|           timestamp|userId|userName|operation| operationParameters| job|notebook|clusterId|readVersion|isolationLevel|isBlindAppend|    operationMetrics|
+-------+--------------------+------+--------+---------+--------------------+----+--------+---------+-----------+--------------+-------------+--------------------+
|      1|2020-05-23 22:36:...|  null|    null|    MERGE|[predicate -> (ex...|null|    null|     null|          0|          null|        false|[numTargetRowsCop...|
|      0|2020-05-23 22:35:...|  null|    null|    WRITE|[mode -> ErrorIfE...|null|    null|     null|       null|          null|         true|[numFiles -> 3, n...|
+-------+--------------------+------+--------+---------+--------------------+----+--------+---------+-----------+--------------+-------------+--------------------+
The history above shows that version 0 was a “WRITE” operation, while version 1 was a “MERGE” operation. It also gives many other useful details about these operations.
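The default `show()` truncates the wider columns. As a small sketch, since `history` returns an ordinary DataFrame, the usual DataFrame methods can be used to pick the columns of interest and print them in full. The snippet assumes the “merge2” table from the example above already exists and that `spark` comes from the spark-shell session.

```scala
// Assumes a spark-shell session started with the Delta package, so `spark` exists,
// and that the merge2 table from the earlier example has already been written.
import io.delta.tables._

val dt = DeltaTable.forPath(spark, "D:/temp/delta/merge2")

// Select a few columns and print them without truncation.
dt.history()
  .select("version", "timestamp", "operation", "operationParameters", "operationMetrics")
  .show(truncate = false)

// history(n) limits the result to the n most recent commits.
dt.history(1).select("version", "operation").show()
```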
We can always read an older version of the data if needed. By default, when reading, we always get the latest copy of the data.
scala> spark.read.format("delta").load("D:/temp/delta/merge2").show()
+-----+---+-----+
| name|age|marks|
+-----+---+-----+
|Harry| 11|   14|
|  Tim| 10|   13|
|  Tom| 10|   12|
|  Jim| 11|    9|
+-----+---+-----+
scala> spark.read.format("delta").option("versionAsOf",0).load("D:/temp/delta/merge2").show()
+-----+---+-----+
| name|age|marks|
+-----+---+-----+
|Harry| 11|   14|
|  Tim| 10|   13|
|  Tom| 10|   12|
+-----+---+-----+
Here, the latest data after our merge shows the newly added record for “Jim”, while version 0 does not have that record.
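Reading two versions side by side also makes it easy to see what changed between them. The snippet below is a minimal sketch that uses the plain Spark `except` method to list the records present in the latest version but not in version 0. It assumes the “merge2” table from the example above already exists and that `spark` comes from the spark-shell session.

```scala
// Assumes a spark-shell session started with the Delta package, so `spark` exists,
// and that the merge2 table from the earlier example has already been written.
val path = "D:/temp/delta/merge2"

val v0 = spark.read.format("delta").option("versionAsOf", 0).load(path)
val latest = spark.read.format("delta").load(path)

// Records that exist in the latest version but not in version 0.
val added = latest.except(v0)
added.show()
```

For the table above this should list only the record for “Jim”, since the conditional merge left the other three records unchanged.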