Delta Table Vacuum

Once we start appending, overwriting, and merging data into Delta tables, the number of parquet files in the target location keeps increasing. It is good practice to keep the number of files in check, as too many files will soon start affecting read performance.

Delta Lake deals with this through the "vacuum" operation. Vacuum accepts a retention threshold in hours and deletes files that are no longer referenced by the table and are older than that threshold. By default, the threshold is 7 days, or 168 hours.
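For reference, both call forms look like this. This is only a sketch: the 200-hour value is an illustrative threshold, and dt is the DeltaTable handle created in the next section.

// Assuming dt is a DeltaTable handle (see the next section for how it is created)
dt.vacuum()      // uses the default retention of 168 hours (7 days)
dt.vacuum(200)   // deletes unreferenced files older than 200 hours (illustrative value)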

Default Vacuum

scala> import io.delta.tables._
import io.delta.tables._

scala> val dt = DeltaTable.forPath(spark, "D:/temp/delta/d1")
dt: io.delta.tables.DeltaTable = io.delta.tables.DeltaTable@45f4a0ee

scala> dt.history.toDF.show()
+-------+--------------------+------+--------+---------+--------------------+----+--------+---------+-----------+--------------+-------------+--------------------+
|version|           timestamp|userId|userName|operation| operationParameters| job|notebook|clusterId|readVersion|isolationLevel|isBlindAppend|    operationMetrics|
+-------+--------------------+------+--------+---------+--------------------+----+--------+---------+-----------+--------------+-------------+--------------------+
|      1|2020-05-28 16:56:...|  null|    null|    WRITE|[mode -> Overwrit...|null|    null|     null|          0|          null|        false|[numFiles -> 1, n...|
|      0|2020-05-14 12:29:...|  null|    null|    WRITE|[mode -> ErrorIfE...|null|    null|     null|       null|          null|         true|                null|
+-------+--------------------+------+--------+---------+--------------------+----+--------+---------+-----------+--------------+-------------+--------------------+

scala> dt.vacuum()
20/05/28 18:59:12 WARN Utils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.debug.maxToStringFields' in SparkEnv.conf.
Deleted 0 files and directories in a total of 1 directories.
res6: org.apache.spark.sql.DataFrame = []

As shown above, I ran vacuum without passing a value for the number of hours. By default, this tried to delete files older than 7 days. In this case, 0 files were deleted, as all my files were created more recently and there was nothing to remove.
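Since vacuum only prints a one-line summary, one way to double-check its effect is to count the parquet data files in the table directory before and after the call. This is a minimal sketch, assuming a non-partitioned table on the local filesystem like the path used above.

// Count top-level parquet data files in the table directory (ignores _delta_log).
def parquetFileCount(path: String): Int = {
  val files = new java.io.File(path).listFiles()
  if (files == null) 0 else files.count(_.getName.endsWith(".parquet"))
}

val before = parquetFileCount("D:/temp/delta/d1")
dt.vacuum()
val after = parquetFileCount("D:/temp/delta/d1")
println(s"parquet files: before=$before, after=$after")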

Vacuum files less than 7 days old

While 7 days is a sensible value and works well for most cases, there might be a need to delete files that are less than 7 days old. In some cases, we might even want to delete every file that is no longer referenced by the most recent version.

scala> dt.vacuum(0)
java.lang.IllegalArgumentException: requirement failed: Are you sure you would like to vacuum files with such a low retention period? If you have
writers that are currently writing to this table, there is a risk that you may corrupt the
state of your Delta table.

If you are certain that there are no operations being performed on this table, such as
insert/upsert/delete/optimize, then you may turn off this check by setting:
spark.databricks.delta.retentionDurationCheck.enabled = false

If you are not sure, please use a value not less than "168 hours".

  at scala.Predef$.require(Predef.scala:224)
  at org.apache.spark.sql.delta.commands.VacuumCommand$.checkRetentionPeriodSafety(VacuumCommand.scala:63)
  at org.apache.spark.sql.delta.commands.VacuumCommand$$anonfun$gc$1.apply(VacuumCommand.scala:112)
  at org.apache.spark.sql.delta.commands.VacuumCommand$$anonfun$gc$1.apply(VacuumCommand.scala:98)
  at com.databricks.spark.util.DatabricksLogging$class.recordOperation(DatabricksLogging.scala:77)
  at org.apache.spark.sql.delta.commands.VacuumCommand$.recordOperation(VacuumCommand.scala:46)
  at org.apache.spark.sql.delta.metering.DeltaLogging$class.recordDeltaOperation(DeltaLogging.scala:103)
  at org.apache.spark.sql.delta.commands.VacuumCommand$.recordDeltaOperation(VacuumCommand.scala:46)
  at org.apache.spark.sql.delta.commands.VacuumCommand$.gc(VacuumCommand.scala:98)
  at io.delta.tables.execution.DeltaTableOperations$class.executeVacuum(DeltaTableOperations.scala:109)
  at io.delta.tables.DeltaTable.executeVacuum(DeltaTable.scala:42)
  at io.delta.tables.DeltaTable.vacuum(DeltaTable.scala:91)
  ... 51 elided

The code above shows that I tried to vacuum all the files not referenced by the most recent version by passing 0 for the number of hours to the vacuum operation. That resulted in an error, with a message stating that, by default, I was not allowed to do this. Delta guards such operations with a retention-duration check to make sure they are not run by mistake. We can turn this check off if we know we really need to delete recent files.

Below, I explicitly turned off that check and was then able to delete more recent files. In this case, 1 file was deleted as a result of this operation.

scala> spark.conf.set("spark.databricks.delta.retentionDurationCheck.enabled", "false")

scala> dt.vacuum(0)
Deleted 1 files and directories in a total of 1 directories.
res5: org.apache.spark.sql.DataFrame = []
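If the check was only disabled for this one cleanup, it is worth turning it back on afterwards so that accidental low-retention vacuums stay blocked.

// Re-enable the safety check once the low-retention vacuum is done.
spark.conf.set("spark.databricks.delta.retentionDurationCheck.enabled", "true")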

Accessing deleted files

While vacuum deletes the data files, it does not adjust the history of the Delta table. The history still shows all the versions, even though the files backing a particular version may already have been vacuumed. Reading such a version then fails with a FileNotFoundException, as shown below when I try to access version 0, whose files were vacuumed in the previous section. A small guarded-read sketch follows the stack trace.

scala> spark.read.format("delta").option("versionAsOf",0).load("D:/temp/delta/d1").show()
20/05/28 19:18:32 WARN Utils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.debug.maxToStringFields' in SparkEnv.conf.
20/05/28 19:18:36 ERROR Executor: Exception in task 0.0 in stage 12.0 (TID 211)
java.io.FileNotFoundException: File file:/D:/temp/delta/d1/part-00000-b6534dbb-8487-4666-b5a4-644e7a9e8540-c000.snappy.parquet does not exist
It is possible the underlying files have been updated. You can explicitly invalidate the cache in Spark by running 'REFRESH TABLE tableName' command in SQL or by recreating the Dataset/DataFrame involved.
        at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.org$apache$spark$sql$execution$datasources$FileScanRDD$$anon$$readCurrentFile(FileScanRDD.scala:127)
        at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:177)
        at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:101)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.scan_nextBatch_0$(Unknown Source)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
        at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
        at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:636)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:255)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:247)
        at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:858)
        at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:858)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
        at org.apache.spark.scheduler.Task.run(Task.scala:123)
        at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:748)
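If a job needs to read an older version but cannot be sure its files still exist, one option is to wrap the time-travel read and fall back to the latest version. This is only a sketch; the version number and the fallback policy are assumptions, not part of the Delta API.

import scala.util.Try

val path = "D:/temp/delta/d1"

// The failure surfaces lazily, when an action runs, so the action itself
// has to be inside the Try.
val versionZero = Try {
  val df = spark.read.format("delta").option("versionAsOf", 0).load(path)
  df.count()   // forces the read; throws if the underlying files were vacuumed
  df
}

// Fall back to the latest version if version 0 can no longer be read.
val df = versionZero.getOrElse(spark.read.format("delta").load(path))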
