apex-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From pra...@apache.org
Subject [1/2] apex-core git commit: APEXCORE-575 Improve application restart time.
Date Mon, 10 Apr 2017 22:27:00 GMT
Repository: apex-core
Updated Branches:
  refs/heads/master 412a3bd81 -> 88bf33627


APEXCORE-575 Improve application restart time.


Project: http://git-wip-us.apache.org/repos/asf/apex-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-core/commit/8825f5fa
Tree: http://git-wip-us.apache.org/repos/asf/apex-core/tree/8825f5fa
Diff: http://git-wip-us.apache.org/repos/asf/apex-core/diff/8825f5fa

Branch: refs/heads/master
Commit: 8825f5fa3e22beaf360f111f37ec0c4dba24ad1c
Parents: 9054fd2
Author: Tushar R. Gosavi <tushar@apache.org>
Authored: Mon Feb 27 17:23:20 2017 +0530
Committer: Tushar R. Gosavi <tushar@apache.org>
Committed: Tue Apr 4 01:03:21 2017 +0530

----------------------------------------------------------------------
 .../common/util/AsyncFSStorageAgent.java        |  14 +-
 .../datatorrent/common/util/FSStorageAgent.java |  11 +
 .../apex/common/util/AsyncStorageAgent.java     |  54 +++++
 .../apex/common/util/CascadeStorageAgent.java   | 202 +++++++++++++++++++
 .../common/util/CascadeStorageAgentTest.java    | 116 +++++++++++
 .../java/com/datatorrent/stram/StramClient.java |  12 +-
 .../stram/StreamingContainerManager.java        |  49 +++--
 .../java/com/datatorrent/stram/engine/Node.java |  17 +-
 .../stram/plan/physical/PhysicalPlan.java       |   9 +-
 .../datatorrent/stram/StramRecoveryTest.java    |  45 ++++-
 10 files changed, 493 insertions(+), 36 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/apex-core/blob/8825f5fa/common/src/main/java/com/datatorrent/common/util/AsyncFSStorageAgent.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/com/datatorrent/common/util/AsyncFSStorageAgent.java b/common/src/main/java/com/datatorrent/common/util/AsyncFSStorageAgent.java
index 24d850e..0c389a4 100644
--- a/common/src/main/java/com/datatorrent/common/util/AsyncFSStorageAgent.java
+++ b/common/src/main/java/com/datatorrent/common/util/AsyncFSStorageAgent.java
@@ -30,6 +30,7 @@ import java.util.EnumSet;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.apex.common.util.AsyncStorageAgent;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CreateFlag;
 import org.apache.hadoop.fs.FSDataOutputStream;
@@ -45,7 +46,7 @@ import com.google.common.base.Throwables;
  *
  * @since 3.1.0
  */
-public class AsyncFSStorageAgent extends FSStorageAgent
+public class AsyncFSStorageAgent extends FSStorageAgent implements AsyncStorageAgent
 {
   private final transient Configuration conf;
   private transient volatile String localBasePath;
@@ -146,6 +147,16 @@ public class AsyncFSStorageAgent extends FSStorageAgent
   }
 
   @Override
+  public void finalize(int operatorId, long windowId) throws IOException
+  {
+    // Checkpoint already present in HDFS during save, when syncCheckpoint is true.
+    if (isSyncCheckpoint()) {
+      return;
+    }
+    copyToHDFS(operatorId, windowId);
+  }
+
+  @Override
   public Object readResolve() throws ObjectStreamException
   {
     AsyncFSStorageAgent asyncFSStorageAgent = new AsyncFSStorageAgent(this.path, null);
@@ -153,6 +164,7 @@ public class AsyncFSStorageAgent extends FSStorageAgent
     return asyncFSStorageAgent;
   }
 
+  @Override
   public boolean isSyncCheckpoint()
   {
     return syncCheckpoint;

http://git-wip-us.apache.org/repos/asf/apex-core/blob/8825f5fa/common/src/main/java/com/datatorrent/common/util/FSStorageAgent.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/com/datatorrent/common/util/FSStorageAgent.java b/common/src/main/java/com/datatorrent/common/util/FSStorageAgent.java
index fe90b86..b5a43fe 100644
--- a/common/src/main/java/com/datatorrent/common/util/FSStorageAgent.java
+++ b/common/src/main/java/com/datatorrent/common/util/FSStorageAgent.java
@@ -18,6 +18,7 @@
  */
 package com.datatorrent.common.util;
 
+import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.ObjectStreamException;
@@ -153,6 +154,16 @@ public class FSStorageAgent implements StorageAgent, Serializable
   public long[] getWindowIds(int operatorId) throws IOException
   {
     Path lPath = new Path(path + Path.SEPARATOR + String.valueOf(operatorId));
+    try {
+      FileStatus status = fileContext.getFileStatus(lPath);
+      if (!status.isDirectory()) {
+        throw new IOException("Checkpoint location is not a directory ");
+      }
+    } catch (FileNotFoundException ex) {
+      // During initialization this directory may not exists.
+      // return an empty array.
+      return new long[0];
+    }
 
     RemoteIterator<FileStatus> fileStatusRemoteIterator = fileContext.listStatus(lPath);
     if (!fileStatusRemoteIterator.hasNext()) {

http://git-wip-us.apache.org/repos/asf/apex-core/blob/8825f5fa/common/src/main/java/org/apache/apex/common/util/AsyncStorageAgent.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/apex/common/util/AsyncStorageAgent.java b/common/src/main/java/org/apache/apex/common/util/AsyncStorageAgent.java
new file mode 100644
index 0000000..632a7f2
--- /dev/null
+++ b/common/src/main/java/org/apache/apex/common/util/AsyncStorageAgent.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.common.util;
+
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceStability;
+
+import com.datatorrent.api.StorageAgent;
+
+/**
+ * Storage agent which can take checkpoints asynchronously.
+ * An AsyncStorageAgent enables quick checkpoints by taking local snapshot of an operator
+ * and unblocking the operator to process more data, while storage engine is pushing local
snapshot to
+ * the distributed or globally accessible location for recovery.
+ */
+@InterfaceStability.Evolving
+public interface AsyncStorageAgent extends StorageAgent
+{
+  /**
+   * Make checkpoint for given windowID final. i.e after this method returns,
+   * the checkpoint is accessible for recovery.
+   *
+   * @param operatorId
+   * @param windowId
+   * @throws IOException
+   */
+  public void finalize(int operatorId, long windowId) throws IOException;
+
+  /**
+   * Check if StorageAgent is configured to take synchronous checkpoints.
+   *
+   * @return true if StorageAgent is configured to take synchronous checkpoints.
+   * @return false otherwise.
+   */
+  public boolean isSyncCheckpoint();
+
+}

http://git-wip-us.apache.org/repos/asf/apex-core/blob/8825f5fa/common/src/main/java/org/apache/apex/common/util/CascadeStorageAgent.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/apex/common/util/CascadeStorageAgent.java b/common/src/main/java/org/apache/apex/common/util/CascadeStorageAgent.java
new file mode 100644
index 0000000..d6fec8e
--- /dev/null
+++ b/common/src/main/java/org/apache/apex/common/util/CascadeStorageAgent.java
@@ -0,0 +1,202 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.common.util;
+
+import java.io.IOException;
+import java.io.ObjectStreamException;
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.Map;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Maps;
+
+import com.datatorrent.api.StorageAgent;
+
+/**
+ * A StorageAgent which chains two StorageAgent. It use the current storage-agent to store
+ * the checkpoint, and use the parent agent to read old checkpoints. For application having
+ * large number of physical operators, the size and number of files to be copied could be
+ * large impacting application restart time. This storage-agent is used during application
+ * restart to avoiding copying checkpoints from old application directory to improve application
+ * restart time.
+ */
+public class CascadeStorageAgent implements StorageAgent, AsyncStorageAgent, Serializable
+{
+  private static final Logger logger = LoggerFactory.getLogger(CascadeStorageAgent.class);
+  private final StorageAgent parent;
+  private final StorageAgent current;
+  private final transient Map<Integer, long[]> oldOperatorToWindowIdsMap;
+
+  public CascadeStorageAgent(StorageAgent parent, StorageAgent current)
+  {
+    this.parent = parent;
+    this.current = current;
+    oldOperatorToWindowIdsMap = Maps.newConcurrentMap();
+  }
+
+  /**
+   * does the checkpoint belong to parent
+   */
+  private boolean isCheckpointFromParent(int operatorId, long wid) throws IOException
+  {
+    long[] wids = getParentWindowIds(operatorId);
+    if (wids.length != 0) {
+      return (wid <= wids[wids.length - 1]);
+    }
+    return false;
+  }
+
+  /**
+   * Return window-id of checkpoints available in old storage agent. This function
+   * will call getWindowIds of old storage agent only once for the fist time, and
+   * return cached data for next calls for same operator.
+   *
+   * @param operatorId
+   * @return
+   * @throws IOException
+   */
+  private long[] getParentWindowIds(int operatorId) throws IOException
+  {
+    long[] oldWindowIds = oldOperatorToWindowIdsMap.get(operatorId);
+    if (oldWindowIds == null) {
+      oldWindowIds = parent.getWindowIds(operatorId);
+      if (oldWindowIds == null) {
+        oldWindowIds = new long[0];
+      }
+      Arrays.sort(oldWindowIds);
+      oldOperatorToWindowIdsMap.put(operatorId, oldWindowIds);
+      logger.debug("CascadeStorageAgent window ids from old storage agent op {} wids {}",
operatorId, Arrays.toString(oldWindowIds));
+    }
+    return oldWindowIds;
+  }
+
+  /**
+   * Save object in current storage agent. This should not modify old storage agent
+   * in any way.
+   *
+   * @param object - The operator whose state needs to be saved.
+   * @param operatorId - Identifier of the operator.
+   * @param windowId - Identifier for the specific state of the operator.
+   * @throws IOException
+   */
+  @Override
+  public void save(Object object, int operatorId, long windowId) throws IOException
+  {
+    current.save(object, operatorId, windowId);
+  }
+
+  /**
+   * Delete old checkpoints from the storage agent.
+   *
+   * The checkpoints are deleted from current directory if it is present in current
+   * storage agent. and cached state for old storage agent is removed.
+   *
+   * @param operatorId
+   * @param windowId
+   * @throws IOException
+   */
+  @Override
+  public void delete(int operatorId, long windowId) throws IOException
+  {
+    if (!isCheckpointFromParent(operatorId, windowId)) {
+      current.delete(operatorId, windowId);
+    }
+  }
+
+  /**
+   * Load checkpoint from storage agents. Do a basic comparision of windowIds
+   * to check the storage agent which has the checkpoint.
+   *
+   * @param operatorId Id for which the object was previously saved
+   * @param windowId WindowId for which the object was previously saved
+   * @return
+   * @throws IOException
+   */
+  @Override
+  public Object load(int operatorId, long windowId) throws IOException
+  {
+    long[] oldWindowIds = getParentWindowIds(operatorId);
+    if (oldWindowIds.length >= 1 && windowId <= oldWindowIds[oldWindowIds.length
- 1]) {
+      return parent.load(operatorId, windowId);
+    }
+    return current.load(operatorId, windowId);
+  }
+
+  @Override
+  public long[] getWindowIds(int operatorId) throws IOException
+  {
+    long[] currentIds = current.getWindowIds(operatorId);
+    long[] oldWindowIds = getParentWindowIds(operatorId);
+    return merge(currentIds, oldWindowIds);
+  }
+
+  private static final long[] EMPTY_LONG_ARRAY = new long[0];
+  private long[] merge(long[] currentIds, long[] oldWindowIds)
+  {
+    if (currentIds == null && oldWindowIds == null) {
+      return EMPTY_LONG_ARRAY;
+    }
+    if (currentIds == null) {
+      return oldWindowIds;
+    }
+    if (oldWindowIds == null) {
+      return currentIds;
+    }
+    long[] mergedArray = new long[currentIds.length + oldWindowIds.length];
+    System.arraycopy(currentIds, 0, mergedArray, 0, currentIds.length);
+    System.arraycopy(oldWindowIds, 0, mergedArray, currentIds.length, oldWindowIds.length);
+    Arrays.sort(mergedArray);
+    return mergedArray;
+  }
+
+  @Override
+  public void finalize(int operatorId, long windowId) throws IOException
+  {
+    if (current instanceof AsyncStorageAgent) {
+      ((AsyncStorageAgent)current).finalize(operatorId, windowId);
+    }
+  }
+
+  @Override
+  public boolean isSyncCheckpoint()
+  {
+    if (parent instanceof AsyncStorageAgent) {
+      return ((AsyncStorageAgent)parent).isSyncCheckpoint();
+    }
+    return true;
+  }
+
+  public Object readResolve() throws ObjectStreamException
+  {
+    return new CascadeStorageAgent(parent, current);
+  }
+
+  public StorageAgent getCurrentStorageAgent()
+  {
+    return current;
+  }
+
+  public StorageAgent getParentStorageAgent()
+  {
+    return parent;
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-core/blob/8825f5fa/common/src/test/java/com/datatorrent/common/util/CascadeStorageAgentTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/com/datatorrent/common/util/CascadeStorageAgentTest.java
b/common/src/test/java/com/datatorrent/common/util/CascadeStorageAgentTest.java
new file mode 100644
index 0000000..40f24f0
--- /dev/null
+++ b/common/src/test/java/com/datatorrent/common/util/CascadeStorageAgentTest.java
@@ -0,0 +1,116 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.common.util;
+
+import java.io.File;
+import java.io.IOException;
+
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestWatcher;
+import org.junit.runner.Description;
+
+import org.apache.apex.common.util.CascadeStorageAgent;
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.Path;
+
+import com.datatorrent.api.Attribute;
+import com.datatorrent.api.DAG;
+
+public class CascadeStorageAgentTest
+{
+
+  static class TestMeta extends TestWatcher
+  {
+    String applicationPath;
+
+    @Override
+    protected void starting(Description description)
+    {
+      super.starting(description);
+      applicationPath = "target/" + description.getClassName() + "/" + description.getMethodName();
+      try {
+        FileUtils.forceMkdir(new File("target/" + description.getClassName()));
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+      Attribute.AttributeMap.DefaultAttributeMap attributes = new Attribute.AttributeMap.DefaultAttributeMap();
+      attributes.put(DAG.APPLICATION_PATH, applicationPath);
+    }
+
+    @Override
+    protected void finished(Description description)
+    {
+      try {
+        FileUtils.deleteDirectory(new File("target/" + description.getClassName()));
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+    }
+  }
+
+  @Rule
+  public TestMeta testMeta = new TestMeta();
+
+  @Test
+  public void testSingleIndirection() throws IOException
+  {
+    String oldAppPath = testMeta.applicationPath;
+    FSStorageAgent storageAgent = new FSStorageAgent(oldAppPath, null);
+    storageAgent.save("1", 1, 1);
+    storageAgent.save("2", 1, 2);
+    storageAgent.save("3", 2, 1);
+
+    String newAppPath = oldAppPath + ".new";
+    CascadeStorageAgent cascade = new CascadeStorageAgent(storageAgent, new FSStorageAgent(newAppPath,
null));
+    long[] operatorIds = cascade.getWindowIds(1);
+    Assert.assertArrayEquals("Returned window ids ", operatorIds, new long[]{1L, 2L});
+
+    operatorIds = cascade.getWindowIds(2);
+    Assert.assertArrayEquals("Returned window ids ", operatorIds, new long[]{1L});
+
+    /* save should happen to new location */
+    cascade.save("4", 1, 4);
+    FileContext fileContext = FileContext.getFileContext();
+    Assert.assertFalse("operator 1 window 4 file does not exists in old directory", fileContext.util().exists(new
Path(oldAppPath + "/" + 1 + "/" + 4)));
+    Assert.assertTrue("operator 1 window 4 file exists in new directory", fileContext.util().exists(new
Path(newAppPath + "/" + 1 + "/" + 4)));
+
+    // check for delete,
+    // delete for old checkpoint should be ignored
+    cascade.save("5", 1, 5);
+    cascade.delete(1, 2L);
+    Assert.assertTrue("operator 1 window 2 file exists in old directory", fileContext.util().exists(new
Path(oldAppPath + "/" + 1 + "/" + 2)));
+    cascade.delete(1, 4L);
+    Assert.assertFalse("operator 1 window 4 file does not exists in old directory", fileContext.util().exists(new
Path(newAppPath + "/" + 1 + "/" + 4)));
+
+    /* chaining of storage agent */
+    String latestAppPath = oldAppPath + ".latest";
+    cascade = new CascadeStorageAgent(storageAgent, new FSStorageAgent(newAppPath, null));
+    CascadeStorageAgent latest = new CascadeStorageAgent(cascade, new FSStorageAgent(latestAppPath,
null));
+    operatorIds = latest.getWindowIds(1);
+    Assert.assertArrayEquals("Window ids ", operatorIds, new long[] {1,2,5});
+
+    latest.save("6", 1, 6);
+    Assert.assertFalse("operator 1 window 6 file does not exists in old directory", fileContext.util().exists(new
Path(oldAppPath + "/" + 1 + "/" + 6)));
+    Assert.assertFalse("operator 1 window 6 file does not exists in old directory", fileContext.util().exists(new
Path(newAppPath + "/" + 1 + "/" + 6)));
+    Assert.assertTrue("operator 1 window 6 file exists in new directory", fileContext.util().exists(new
Path(latestAppPath + "/" + 1 + "/" + 6)));
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-core/blob/8825f5fa/engine/src/main/java/com/datatorrent/stram/StramClient.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/StramClient.java b/engine/src/main/java/com/datatorrent/stram/StramClient.java
index dad42e3..b280aad 100644
--- a/engine/src/main/java/com/datatorrent/stram/StramClient.java
+++ b/engine/src/main/java/com/datatorrent/stram/StramClient.java
@@ -257,6 +257,7 @@ public class StramClient
   public void copyInitialState(Path origAppDir) throws IOException
   {
     // locate previous snapshot
+    long copyStart = System.currentTimeMillis();
     String newAppDir = this.dag.assertAppPath();
 
     FSRecoveryHandler recoveryHandler = new FSRecoveryHandler(origAppDir.toString(), conf);
@@ -284,6 +285,7 @@ public class StramClient
     logOs.close();
     logIs.close();
 
+    List<String> excludeDirs = Arrays.asList(LogicalPlan.SUBDIR_CHECKPOINTS, LogicalPlan.SUBDIR_EVENTS,
LogicalPlan.SUBDIR_STATS);
     // copy sub directories that are not present in target
     FileStatus[] lFiles = fs.listStatus(origAppDir);
 
@@ -298,19 +300,19 @@ public class StramClient
     String newAppDirPath = Path.getPathWithoutSchemeAndAuthority(new Path(newAppDir)).toString();
 
     for (FileStatus f : lFiles) {
-      if (f.isDirectory()) {
+      if (f.isDirectory() && !excludeDirs.contains(f.getPath().getName())) {
         String targetPath = f.getPath().toString().replace(origAppDirPath, newAppDirPath);
         if (!fs.exists(new Path(targetPath))) {
-          LOG.debug("Copying {} to {}", f.getPath(), targetPath);
+          LOG.debug("Copying {} size {} to {}", f.getPath(), f.getLen(), targetPath);
+          long start = System.currentTimeMillis();
           FileUtil.copy(fs, f.getPath(), fs, new Path(targetPath), false, conf);
-          //FSUtil.copy(fs, f, fs, new Path(targetPath), false, false, conf);
+          LOG.debug("Copying {} to {} took {} ms", f.getPath(), f.getLen(), targetPath, System.currentTimeMillis()
- start);
         } else {
           LOG.debug("Ignoring {} as it already exists under {}", f.getPath(), targetPath);
-          //FSUtil.setPermission(fs, new Path(targetPath), new FsPermission((short)0777));
         }
       }
     }
-
+    LOG.info("Copying initial state took {} ms", System.currentTimeMillis() - copyStart);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/apex-core/blob/8825f5fa/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java b/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
index c68df14..51e85f7 100644
--- a/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
+++ b/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
@@ -65,6 +65,7 @@ import org.codehaus.jettison.json.JSONObject;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.apex.common.util.CascadeStorageAgent;
 import org.apache.apex.engine.plugin.ApexPluginDispatcher;
 import org.apache.apex.engine.plugin.NoOpApexPluginDispatcher;
 import org.apache.commons.io.IOUtils;
@@ -3238,23 +3239,43 @@ public class StreamingContainerManager implements PlanContext
 
       this.finals = new FinalVars(finals, lp);
       StorageAgent sa = lp.getValue(OperatorContext.STORAGE_AGENT);
-      if (sa instanceof AsyncFSStorageAgent) {
-        // replace the default storage agent, if present
-        AsyncFSStorageAgent fssa = (AsyncFSStorageAgent)sa;
-        if (fssa.path.contains(oldAppId)) {
-          fssa = new AsyncFSStorageAgent(fssa.path.replace(oldAppId, appId), conf);
-          lp.setAttribute(OperatorContext.STORAGE_AGENT, fssa);
-        }
-      } else if (sa instanceof FSStorageAgent) {
-        // replace the default storage agent, if present
-        FSStorageAgent fssa = (FSStorageAgent)sa;
-        if (fssa.path.contains(oldAppId)) {
-          fssa = new FSStorageAgent(fssa.path.replace(oldAppId, appId), conf);
-          lp.setAttribute(OperatorContext.STORAGE_AGENT, fssa);
-        }
+      lp.setAttribute(OperatorContext.STORAGE_AGENT, updateStorageAgent(sa, oldAppId, appId,
conf));
+    }
+  }
+
+  private static StorageAgent updateStorageAgent(StorageAgent sa, String oldAppId, String
appId, Configuration conf)
+  {
+    if (sa instanceof AsyncFSStorageAgent || sa instanceof FSStorageAgent) {
+      FSStorageAgent newAgent = (FSStorageAgent)updateFSStorageAgent(sa, oldAppId, appId,
conf);
+      if (newAgent != sa) {
+        return new CascadeStorageAgent(sa, newAgent);
       }
+    } else if (sa instanceof CascadeStorageAgent) {
+      CascadeStorageAgent csa = (CascadeStorageAgent)sa;
+      StorageAgent currentStorageAgent = csa.getCurrentStorageAgent();
+      return new CascadeStorageAgent(csa, updateFSStorageAgent(currentStorageAgent, oldAppId,
appId, conf));
     }
+    return sa;
+  }
 
+  /**
+   * Return updated FileSystem based storage agent. Storage agent is updated only when
+   * they use application directory to store the checkpoints.
+   */
+  private static StorageAgent updateFSStorageAgent(StorageAgent sa, String oldAppId, String
appId, Configuration conf)
+  {
+    if (sa instanceof AsyncFSStorageAgent) {
+      AsyncFSStorageAgent fssa = (AsyncFSStorageAgent)sa;
+      if (fssa.path.contains(oldAppId)) {
+        return new AsyncFSStorageAgent(fssa.path.replace(oldAppId, appId), conf);
+      }
+    } else if (sa instanceof FSStorageAgent) {
+      FSStorageAgent fssa = (FSStorageAgent)sa;
+      if (fssa.path.contains(oldAppId)) {
+        return new FSStorageAgent(fssa.path.replace(oldAppId, appId), conf);
+      }
+    }
+    return sa;
   }
 
   public interface RecoveryHandler

http://git-wip-us.apache.org/repos/asf/apex-core/blob/8825f5fa/engine/src/main/java/com/datatorrent/stram/engine/Node.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/engine/Node.java b/engine/src/main/java/com/datatorrent/stram/engine/Node.java
index d779afe..c84a249 100644
--- a/engine/src/main/java/com/datatorrent/stram/engine/Node.java
+++ b/engine/src/main/java/com/datatorrent/stram/engine/Node.java
@@ -48,6 +48,7 @@ import java.util.concurrent.TimeUnit;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.apex.common.util.AsyncStorageAgent;
 import org.apache.hadoop.util.ReflectionUtils;
 
 import com.google.common.base.Throwables;
@@ -70,7 +71,6 @@ import com.datatorrent.api.StatsListener;
 import com.datatorrent.api.StatsListener.OperatorRequest;
 import com.datatorrent.api.StorageAgent;
 import com.datatorrent.bufferserver.util.Codec;
-import com.datatorrent.common.util.AsyncFSStorageAgent;
 import com.datatorrent.common.util.Pair;
 import com.datatorrent.stram.api.Checkpoint;
 import com.datatorrent.stram.api.OperatorDeployInfo;
@@ -519,16 +519,16 @@ public abstract class Node<OPERATOR extends Operator> implements
Component<Opera
           checkpointStats = new Stats.CheckpointStats();
           checkpointStats.checkpointStartTime = System.currentTimeMillis();
           ba.save(operator, id, windowId);
-          if (ba instanceof AsyncFSStorageAgent) {
-            AsyncFSStorageAgent asyncFSStorageAgent = (AsyncFSStorageAgent)ba;
-            if (!asyncFSStorageAgent.isSyncCheckpoint()) {
+          if (ba instanceof AsyncStorageAgent) {
+            AsyncStorageAgent asyncStorageAgent = (AsyncStorageAgent)ba;
+            if (!asyncStorageAgent.isSyncCheckpoint()) {
               if (PROCESSING_MODE != ProcessingMode.EXACTLY_ONCE) {
                 CheckpointWindowInfo checkpointWindowInfo = new CheckpointWindowInfo();
                 checkpointWindowInfo.windowId = windowId;
                 checkpointWindowInfo.applicationWindowCount = applicationWindowCount;
                 checkpointWindowInfo.checkpointWindowCount = checkpointWindowCount;
                 CheckpointHandler checkpointHandler = new CheckpointHandler();
-                checkpointHandler.agent = asyncFSStorageAgent;
+                checkpointHandler.agent = asyncStorageAgent;
                 checkpointHandler.operatorId = id;
                 checkpointHandler.windowId = windowId;
                 checkpointHandler.stats = checkpointStats;
@@ -539,7 +539,7 @@ public abstract class Node<OPERATOR extends Operator> implements
Component<Opera
                 checkpointStats = null;
                 return;
               } else {
-                asyncFSStorageAgent.copyToHDFS(id, windowId);
+                asyncStorageAgent.finalize(id, windowId);
               }
             }
           }
@@ -680,8 +680,7 @@ public abstract class Node<OPERATOR extends Operator> implements
Component<Opera
 
   private class CheckpointHandler implements Callable<Stats.CheckpointStats>
   {
-
-    public AsyncFSStorageAgent agent;
+    public AsyncStorageAgent agent;
     public int operatorId;
     public long windowId;
     public Stats.CheckpointStats stats;
@@ -689,7 +688,7 @@ public abstract class Node<OPERATOR extends Operator> implements
Component<Opera
     @Override
     public Stats.CheckpointStats call() throws Exception
     {
-      agent.copyToHDFS(id, windowId);
+      agent.finalize(id, windowId);
       stats.checkpointTime = System.currentTimeMillis() - stats.checkpointStartTime;
       return stats;
     }

http://git-wip-us.apache.org/repos/asf/apex-core/blob/8825f5fa/engine/src/main/java/com/datatorrent/stram/plan/physical/PhysicalPlan.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/plan/physical/PhysicalPlan.java b/engine/src/main/java/com/datatorrent/stram/plan/physical/PhysicalPlan.java
index ce22bfd..7547654 100644
--- a/engine/src/main/java/com/datatorrent/stram/plan/physical/PhysicalPlan.java
+++ b/engine/src/main/java/com/datatorrent/stram/plan/physical/PhysicalPlan.java
@@ -41,6 +41,7 @@ import java.util.concurrent.atomic.AtomicInteger;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.apex.common.util.AsyncStorageAgent;
 import org.apache.commons.lang.StringUtils;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -68,7 +69,6 @@ import com.datatorrent.api.StatsListener.OperatorRequest;
 import com.datatorrent.api.StorageAgent;
 import com.datatorrent.api.StreamCodec;
 import com.datatorrent.api.annotation.Stateless;
-import com.datatorrent.common.util.AsyncFSStorageAgent;
 import com.datatorrent.stram.Journal.Recoverable;
 import com.datatorrent.stram.api.Checkpoint;
 import com.datatorrent.stram.api.StramEvent;
@@ -1226,11 +1226,8 @@ public class PhysicalPlan implements Serializable
       long windowId = oper.isOperatorStateLess() ? Stateless.WINDOW_ID : checkpoint.windowId;
       StorageAgent agent = oper.operatorMeta.getValue(OperatorContext.STORAGE_AGENT);
       agent.save(oo, oper.id, windowId);
-      if (agent instanceof AsyncFSStorageAgent) {
-        AsyncFSStorageAgent asyncFSStorageAgent = (AsyncFSStorageAgent)agent;
-        if (!asyncFSStorageAgent.isSyncCheckpoint()) {
-          asyncFSStorageAgent.copyToHDFS(oper.id, windowId);
-        }
+      if (agent instanceof AsyncStorageAgent) {
+        ((AsyncStorageAgent)agent).finalize(oper.id, windowId);
       }
     } catch (IOException e) {
       // inconsistent state, no recovery option, requires shutdown

http://git-wip-us.apache.org/repos/asf/apex-core/blob/8825f5fa/engine/src/test/java/com/datatorrent/stram/StramRecoveryTest.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/StramRecoveryTest.java b/engine/src/test/java/com/datatorrent/stram/StramRecoveryTest.java
index 645598d..2f46049 100644
--- a/engine/src/test/java/com/datatorrent/stram/StramRecoveryTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/StramRecoveryTest.java
@@ -44,6 +44,7 @@ import org.mockito.stubbing.Answer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.apex.common.util.CascadeStorageAgent;
 import org.apache.commons.lang.mutable.MutableBoolean;
 import org.apache.commons.lang.mutable.MutableInt;
 import org.apache.hadoop.conf.Configuration;
@@ -428,6 +429,7 @@ public class StramRecoveryTest
     o1p1.getContainer().setExternalId("cid1");
     scm.writeJournal(o1p1.getContainer().getSetContainerState());
 
+    /* simulate application restart from app1 */
     dag = new LogicalPlan();
     dag.setAttribute(LogicalPlan.APPLICATION_PATH, appPath2);
     dag.setAttribute(LogicalPlan.APPLICATION_ID, appId2);
@@ -447,9 +449,50 @@ public class StramRecoveryTest
     o1p1 = plan.getOperators(dag.getOperatorMeta("o1")).get(0);
     assertEquals("journal copied", "cid1", o1p1.getContainer().getExternalId());
 
-    ids = new FSStorageAgent(appPath2 + "/" + LogicalPlan.SUBDIR_CHECKPOINTS, new Configuration()).getWindowIds(o1p1.getId());
+    CascadeStorageAgent csa = (CascadeStorageAgent)dag.getAttributes().get(OperatorContext.STORAGE_AGENT);
+    Assert.assertEquals("storage agent is replaced by cascade", csa.getClass(), CascadeStorageAgent.class);
+    Assert.assertEquals("current storage agent is of same type", csa.getCurrentStorageAgent().getClass(),
agent.getClass());
+    Assert.assertEquals("parent storage agent is of same type ", csa.getParentStorageAgent().getClass(),
agent.getClass());
+    /* parent and current points to expected location */
+    Assert.assertEquals(true, ((FSStorageAgent)csa.getParentStorageAgent()).path.contains("app1"));
+    Assert.assertEquals(true, ((FSStorageAgent)csa.getCurrentStorageAgent()).path.contains("app2"));
+
+    ids = csa.getWindowIds(o1p1.getId());
     Assert.assertArrayEquals("checkpoints copied", new long[] {o1p1.getRecoveryCheckpoint().getWindowId()},
ids);
 
+
+    /* simulate another application restart from app2 */
+    String appId3 = "app3";
+    String appPath3 = testMeta.getPath() + "/" + appId3;
+    dag = new LogicalPlan();
+    dag.setAttribute(LogicalPlan.APPLICATION_PATH, appPath3);
+    dag.setAttribute(LogicalPlan.APPLICATION_ID, appId3);
+    sc = new StramClient(new Configuration(), dag);
+    try {
+      sc.start();
+      sc.copyInitialState(new Path(appPath2)); // copy state from app2.
+    } finally {
+      sc.stop();
+    }
+    scm = StreamingContainerManager.getInstance(new FSRecoveryHandler(dag.assertAppPath(),
new Configuration(false)), dag, false);
+    plan = scm.getPhysicalPlan();
+    dag = plan.getLogicalPlan();
+
+    csa = (CascadeStorageAgent)dag.getAttributes().get(OperatorContext.STORAGE_AGENT);
+    Assert.assertEquals("storage agent is replaced by cascade", csa.getClass(), CascadeStorageAgent.class);
+    Assert.assertEquals("current storage agent is of same type", csa.getCurrentStorageAgent().getClass(),
agent.getClass());
+    Assert.assertEquals("parent storage agent is of same type ", csa.getParentStorageAgent().getClass(),
CascadeStorageAgent.class);
+
+    CascadeStorageAgent parent = (CascadeStorageAgent)csa.getParentStorageAgent();
+    Assert.assertEquals("current storage agent is of same type ", parent.getCurrentStorageAgent().getClass(),
agent.getClass());
+    Assert.assertEquals("parent storage agent is cascade ", parent.getParentStorageAgent().getClass(),
agent.getClass());
+    /* verify paths */
+    Assert.assertEquals(true, ((FSStorageAgent)parent.getParentStorageAgent()).path.contains("app1"));
+    Assert.assertEquals(true, ((FSStorageAgent)parent.getCurrentStorageAgent()).path.contains("app2"));
+    Assert.assertEquals(true, ((FSStorageAgent)csa.getCurrentStorageAgent()).path.contains("app3"));
+
+    ids = csa.getWindowIds(o1p1.getId());
+    Assert.assertArrayEquals("checkpoints copied", new long[] {o1p1.getRecoveryCheckpoint().getWindowId()},
ids);
   }
 
   @Test


Mime
View raw message