spark-commits mailing list archives

From zsxw...@apache.org
Subject spark git commit: [SPARK-17624][SQL][STREAMING][TEST] Fixed flaky StateStoreSuite.maintenance
Date Tue, 25 Oct 2016 00:21:22 GMT
Repository: spark
Updated Branches:
  refs/heads/master 81d6933e7 -> 407c3cedf


[SPARK-17624][SQL][STREAMING][TEST] Fixed flaky StateStoreSuite.maintenance

## What changes were proposed in this pull request?

The reason for the flakiness was as follows. The test starts the maintenance background thread,
and then writes 20 versions of the state store. The maintenance thread is expected to create
snapshots in the middle, and clean up old files that are no longer needed. The earliest
delta file (1.delta) is expected to be deleted, because snapshots ensure that the earliest
deltas are no longer needed.

However, the default configuration for the maintenance thread is to retain files such that
the last 2 versions can be recovered, and to delete the rest. While the test is generating the
versions, the maintenance thread can kick in and create snapshots anywhere between versions 10
and 20 (at least 10 deltas are needed before a snapshot is taken). Later it will choose to
retain only versions 19 and 20 (the last 2). There are two cases.

- Common case: One of the versions between 10 and 19 gets snapshotted. Recovering versions
19 and 20 then only needs that snapshot and the deltas after it, so 1.delta gets deleted.

- Uncommon case (reason for flakiness): Only version 20 gets snapshotted. Then recovering
version 20 requires only 20.snapshot, but recovering version 19 requires all the previous
delta files 19.delta ... 1.delta. So 1.delta does not get deleted (see the sketch below).
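
To make the two cases concrete, here is a minimal, self-contained sketch (illustration only,
not the `HDFSBackedStateStoreProvider` code) of which files are needed to recover a version:
the most recent snapshot at or below that version, plus every delta after it.

```scala
// Illustration only: which files are needed to recover a given version,
// assuming a snapshot exists for each version in `snapshots`.
def filesNeeded(version: Int, snapshots: Set[Int]): Seq[String] =
  (1 to version).filter(snapshots.contains).lastOption match {
    case Some(s) => s"$s.snapshot" +: ((s + 1) to version).map(v => s"$v.delta")
    case None    => (1 to version).map(v => s"$v.delta")
  }

// Common case: some version in [10, 19] was snapshotted (say 15).
// Neither of the last 2 versions needs 1.delta, so it can be deleted.
filesNeeded(20, Set(15))  // 15.snapshot, 16.delta, ..., 20.delta
filesNeeded(19, Set(15))  // 15.snapshot, 16.delta, ..., 19.delta

// Uncommon (flaky) case: only version 20 was snapshotted.
// Version 19 still needs every delta down to 1.delta, so 1.delta survives.
filesNeeded(19, Set(20))  // 1.delta, 2.delta, ..., 19.delta
```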

This PR rearranges the checks so that the test creates 20 versions, then waits until there is
at least one snapshot, and then creates another 20, as sketched below. This ensures that the
latest 2 versions cannot require anything older than the first snapshot generated, and therefore
1.delta will be deleted.
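
In outline (a condensed paraphrase of the StateStoreSuite changes in the diff below, not the
literal test code), the maintenance test now proceeds like this:

```scala
generateStoreVersions()                 // write the first 20 versions

eventually(timeout(10 seconds)) {
  // wait until the maintenance thread has produced at least one snapshot
  val snapshotVersions = (1 to latestStoreVersion).filter { v =>
    fileExists(provider, v, isSnapshot = true)
  }
  assert(snapshotVersions.nonEmpty, "no snapshot file found")
}

generateStoreVersions()                 // 20 more versions, all newer than that snapshot

eventually(timeout(10 seconds)) {
  // the last 2 versions can no longer depend on 1.delta, so it must get cleaned up
  assert(!fileExists(provider, 1, isSnapshot = false), "earliest file not deleted")
}
```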

In addition, I have added more logging, and comments that should help with future debugging
and with understanding what is going on.

## How was this patch tested?

Ran the StateStoreSuite > 6K times on a heavily loaded machine (10 instances of the tests running
in parallel). No failures.

Author: Tathagata Das <tathagata.das1565@gmail.com>

Closes #15592 from tdas/SPARK-17624.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/407c3ced
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/407c3ced
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/407c3ced

Branch: refs/heads/master
Commit: 407c3cedf29a4413339dcde758295dc3225a0054
Parents: 81d6933
Author: Tathagata Das <tathagata.das1565@gmail.com>
Authored: Mon Oct 24 17:21:16 2016 -0700
Committer: Shixiong Zhu <shixiong@databricks.com>
Committed: Mon Oct 24 17:21:16 2016 -0700

----------------------------------------------------------------------
 .../state/HDFSBackedStateStoreProvider.scala    | 18 ++++---
 .../streaming/state/StateStoreCoordinator.scala | 18 +++++--
 .../streaming/state/StateStoreSuite.scala       | 49 +++++++++++++-------
 3 files changed, 57 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/407c3ced/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
index 7d71f52..f1e7f1d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
@@ -159,7 +159,7 @@ private[state] class HDFSBackedStateStoreProvider(
       } catch {
         case NonFatal(e) =>
           throw new IllegalStateException(
-            s"Error committing version $newVersion into ${HDFSBackedStateStoreProvider.this}", e)
+            s"Error committing version $newVersion into $this", e)
       }
     }
 
@@ -205,6 +205,10 @@ private[state] class HDFSBackedStateStoreProvider(
     override private[state] def hasCommitted: Boolean = {
       state == COMMITTED
     }
+
+    override def toString(): String = {
+      s"HDFSStateStore[id = (op=${id.operatorId}, part=${id.partitionId}), dir = $baseDir]"
+    }
   }
 
   /** Get the state store for making updates to create a new `version` of the store. */
@@ -215,7 +219,7 @@ private[state] class HDFSBackedStateStoreProvider(
       newMap.putAll(loadMap(version))
     }
     val store = new HDFSBackedStateStore(version, newMap)
-    logInfo(s"Retrieved version $version of $this for update")
+    logInfo(s"Retrieved version $version of ${HDFSBackedStateStoreProvider.this} for update")
     store
   }
 
@@ -231,7 +235,7 @@ private[state] class HDFSBackedStateStoreProvider(
   }
 
   override def toString(): String = {
-    s"StateStore[id = (op=${id.operatorId}, part=${id.partitionId}), dir = $baseDir]"
+    s"HDFSStateStoreProvider[id = (op=${id.operatorId}, part=${id.partitionId}), dir = $baseDir]"
   }
 
   /* Internal classes and methods */
@@ -493,10 +497,12 @@ private[state] class HDFSBackedStateStoreProvider(
             val mapsToRemove = loadedMaps.keys.filter(_ < earliestVersionToRetain).toSeq
             mapsToRemove.foreach(loadedMaps.remove)
           }
-          files.filter(_.version < earliestFileToRetain.version).foreach { f =>
+          val filesToDelete = files.filter(_.version < earliestFileToRetain.version)
+          filesToDelete.foreach { f =>
             fs.delete(f.path, true)
           }
-          logInfo(s"Deleted files older than ${earliestFileToRetain.version} for $this")
+          logInfo(s"Deleted files older than ${earliestFileToRetain.version} for $this: " +
+            filesToDelete.mkString(", "))
         }
       }
     } catch {
@@ -560,7 +566,7 @@ private[state] class HDFSBackedStateStoreProvider(
       }
     }
     val storeFiles = versionToFiles.values.toSeq.sortBy(_.version)
-    logDebug(s"Current set of files for $this: $storeFiles")
+    logDebug(s"Current set of files for $this: ${storeFiles.mkString(", ")}")
     storeFiles
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/407c3ced/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreCoordinator.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreCoordinator.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreCoordinator.scala
index d945d7a..267d176 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreCoordinator.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreCoordinator.scala
@@ -38,7 +38,7 @@ private case class VerifyIfInstanceActive(storeId: StateStoreId, executorId: Str
 private case class GetLocation(storeId: StateStoreId)
   extends StateStoreCoordinatorMessage
 
-private case class DeactivateInstances(storeRootLocation: String)
+private case class DeactivateInstances(checkpointLocation: String)
   extends StateStoreCoordinatorMessage
 
 private object StopCoordinator
@@ -111,11 +111,13 @@ class StateStoreCoordinatorRef private(rpcEndpointRef: RpcEndpointRef) {
  * Class for coordinating instances of [[StateStore]]s loaded in executors across the cluster,
  * and get their locations for job scheduling.
  */
-private class StateStoreCoordinator(override val rpcEnv: RpcEnv) extends ThreadSafeRpcEndpoint {
+private class StateStoreCoordinator(override val rpcEnv: RpcEnv)
+    extends ThreadSafeRpcEndpoint with Logging {
   private val instances = new mutable.HashMap[StateStoreId, ExecutorCacheTaskLocation]
 
   override def receive: PartialFunction[Any, Unit] = {
     case ReportActiveInstance(id, host, executorId) =>
+      logDebug(s"Reported state store $id is active at $executorId")
       instances.put(id, ExecutorCacheTaskLocation(host, executorId))
   }
 
@@ -125,19 +127,25 @@ private class StateStoreCoordinator(override val rpcEnv: RpcEnv) extends ThreadS
         case Some(location) => location.executorId == execId
         case None => false
       }
+      logDebug(s"Verified that state store $id is active: $response")
       context.reply(response)
 
     case GetLocation(id) =>
-      context.reply(instances.get(id).map(_.toString))
+      val executorId = instances.get(id).map(_.toString)
+      logDebug(s"Got location of the state store $id: $executorId")
+      context.reply(executorId)
 
-    case DeactivateInstances(loc) =>
+    case DeactivateInstances(checkpointLocation) =>
       val storeIdsToRemove =
-        instances.keys.filter(_.checkpointLocation == loc).toSeq
+        instances.keys.filter(_.checkpointLocation == checkpointLocation).toSeq
       instances --= storeIdsToRemove
+      logDebug(s"Deactivating instances related to checkpoint location $checkpointLocation: " +
+        storeIdsToRemove.mkString(", "))
       context.reply(true)
 
     case StopCoordinator =>
       stop() // Stop before replying to ensure that endpoint name has been deregistered
+      logInfo("StateStoreCoordinator stopped")
       context.reply(true)
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/407c3ced/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala
index 06f1bd6..fcf300b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala
@@ -367,7 +367,10 @@ class StateStoreSuite extends SparkFunSuite with BeforeAndAfter with PrivateMeth
     val conf = new SparkConf()
       .setMaster("local")
       .setAppName("test")
+      // Make maintenance thread do snapshots and cleanups very fast
       .set(StateStore.MAINTENANCE_INTERVAL_CONFIG, "10ms")
+      // Make sure that when SparkContext stops, the StateStore maintenance thread 'quickly'
+      // fails to talk to the StateStoreCoordinator and unloads all the StateStores
       .set("spark.rpc.numRetries", "1")
     val opId = 0
     val dir = Utils.createDirectory(tempDir, Random.nextString(5)).toString
@@ -377,37 +380,49 @@ class StateStoreSuite extends SparkFunSuite with BeforeAndAfter with PrivateMeth
     val provider = new HDFSBackedStateStoreProvider(
       storeId, keySchema, valueSchema, storeConf, hadoopConf)
 
+    var latestStoreVersion = 0
+
+    def generateStoreVersions() {
+      for (i <- 1 to 20) {
+        val store = StateStore.get(
+          storeId, keySchema, valueSchema, latestStoreVersion, storeConf, hadoopConf)
+        put(store, "a", i)
+        store.commit()
+        latestStoreVersion += 1
+      }
+    }
 
     quietly {
       withSpark(new SparkContext(conf)) { sc =>
         withCoordinatorRef(sc) { coordinatorRef =>
           require(!StateStore.isMaintenanceRunning, "StateStore is unexpectedly running")
 
-          for (i <- 1 to 20) {
-            val store = StateStore.get(
-              storeId, keySchema, valueSchema, i - 1, storeConf, hadoopConf)
-            put(store, "a", i)
-            store.commit()
-          }
+          // Generate sufficient versions of store for snapshots
+          generateStoreVersions()
 
           eventually(timeout(10 seconds)) {
+            // Store should have been reported to the coordinator
             assert(coordinatorRef.getLocation(storeId).nonEmpty, "active instance was not reported")
-          }
 
-          // Background maintenance should clean up and generate snapshots
-          assert(StateStore.isMaintenanceRunning, "Maintenance task is not running")
-
-          eventually(timeout(10 seconds)) {
-            // Earliest delta file should get cleaned up
-            assert(!fileExists(provider, 1, isSnapshot = false), "earliest file not deleted")
+            // Background maintenance should clean up and generate snapshots
+            assert(StateStore.isMaintenanceRunning, "Maintenance task is not running")
 
             // Some snapshots should have been generated
-            val snapshotVersions = (0 to 20).filter { version =>
+            val snapshotVersions = (1 to latestStoreVersion).filter { version =>
               fileExists(provider, version, isSnapshot = true)
             }
             assert(snapshotVersions.nonEmpty, "no snapshot file found")
           }
 
+          // Generate more versions such that there is another snapshot and
+          // the earliest delta file will be cleaned up
+          generateStoreVersions()
+
+          // Earliest delta file should get cleaned up
+          eventually(timeout(10 seconds)) {
+            assert(!fileExists(provider, 1, isSnapshot = false), "earliest file not deleted")
+          }
+
           // If driver decides to deactivate all instances of the store, then this instance
           // should be unloaded
           coordinatorRef.deactivateInstances(dir)
@@ -416,7 +431,7 @@ class StateStoreSuite extends SparkFunSuite with BeforeAndAfter with PrivateMeth
           }
 
           // Reload the store and verify
-          StateStore.get(storeId, keySchema, valueSchema, 20, storeConf, hadoopConf)
+          StateStore.get(storeId, keySchema, valueSchema, latestStoreVersion, storeConf, hadoopConf)
           assert(StateStore.isLoaded(storeId))
 
           // If some other executor loads the store, then this instance should be unloaded
@@ -426,14 +441,14 @@ class StateStoreSuite extends SparkFunSuite with BeforeAndAfter with PrivateMeth
           }
 
           // Reload the store and verify
-          StateStore.get(storeId, keySchema, valueSchema, 20, storeConf, hadoopConf)
+          StateStore.get(storeId, keySchema, valueSchema, latestStoreVersion, storeConf, hadoopConf)
           assert(StateStore.isLoaded(storeId))
         }
       }
 
       // Verify if instance is unloaded if SparkContext is stopped
-      require(SparkEnv.get === null)
       eventually(timeout(10 seconds)) {
+        require(SparkEnv.get === null)
         assert(!StateStore.isLoaded(storeId))
         assert(!StateStore.isMaintenanceRunning)
       }



