apex-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From t..@apache.org
Subject incubator-apex-core git commit: APEXCORE-445 - Race condition in AsyncFSStorageAgent.save()
Date Wed, 27 Apr 2016 02:22:27 GMT
Repository: incubator-apex-core
Updated Branches:
  refs/heads/release-3.2 a9ab4d373 -> f2fa255e1


APEXCORE-445 - Race condition in AsyncFSStorageAgent.save()


Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/commit/f2fa255e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/tree/f2fa255e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/diff/f2fa255e

Branch: refs/heads/release-3.2
Commit: f2fa255e1def44472721ed61aea536d8224a0abd
Parents: a9ab4d3
Author: Vlad Rozov <v.rozov@datatorrent.com>
Authored: Tue Apr 26 15:04:20 2016 -0700
Committer: Vlad Rozov <v.rozov@datatorrent.com>
Committed: Tue Apr 26 18:09:55 2016 -0700

----------------------------------------------------------------------
 .../common/util/AsyncFSStorageAgent.java            | 16 ++++++++++------
 1 file changed, 10 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/f2fa255e/common/src/main/java/com/datatorrent/common/util/AsyncFSStorageAgent.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/com/datatorrent/common/util/AsyncFSStorageAgent.java b/common/src/main/java/com/datatorrent/common/util/AsyncFSStorageAgent.java
index d0f00fa..a584dfe 100644
--- a/common/src/main/java/com/datatorrent/common/util/AsyncFSStorageAgent.java
+++ b/common/src/main/java/com/datatorrent/common/util/AsyncFSStorageAgent.java
@@ -47,7 +47,6 @@ public class AsyncFSStorageAgent extends FSStorageAgent
   {
     super();
     conf = null;
-    localBasePath = null;
   }
 
   public AsyncFSStorageAgent(String path, Configuration conf)
@@ -63,20 +62,25 @@ public class AsyncFSStorageAgent extends FSStorageAgent
   public AsyncFSStorageAgent(String localBasePath, String path, Configuration conf)
   {
     this(path, conf);
+    this.localBasePath = localBasePath;
   }
 
   @Override
   public void save(final Object object, final int operatorId, final long windowId) throws IOException
   {
-    // save() is only called by one thread in the worker container so the following is okay
-    if (this.localBasePath == null) {
-      this.localBasePath = Files.createTempDirectory("chkp").toString();
-      logger.info("using {} as the basepath for checkpointing.", this.localBasePath);
-    }
     if (syncCheckpoint) {
       super.save(object, operatorId, windowId);
       return;
     }
+
+    if (localBasePath == null) {
+      synchronized (this) {
+        if (localBasePath == null) {
+          localBasePath = Files.createTempDirectory("chkp").toString();
+          logger.info("using {} as the basepath for checkpointing.", localBasePath);
+        }
+      }
+    }
     String operatorIdStr = String.valueOf(operatorId);
     File directory = new File(localBasePath, operatorIdStr);
     if (!directory.exists()) {


Mime
View raw message