apex-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From vro...@apache.org
Subject apex-core git commit: APEXCORE-709 Refactor code changes made through APEXCORE-575
Date Fri, 21 Apr 2017 14:10:15 GMT
Repository: apex-core
Updated Branches:
  refs/heads/master 25e4c4c51 -> 6cb3e3510


APEXCORE-709 Refactor code changes made through APEXCORE-575


Project: http://git-wip-us.apache.org/repos/asf/apex-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-core/commit/6cb3e351
Tree: http://git-wip-us.apache.org/repos/asf/apex-core/tree/6cb3e351
Diff: http://git-wip-us.apache.org/repos/asf/apex-core/diff/6cb3e351

Branch: refs/heads/master
Commit: 6cb3e3510060e23f1519d2f91a629d8df38e4431
Parents: 25e4c4c
Author: Tushar R. Gosavi <tushar@apache.org>
Authored: Mon Apr 17 13:33:19 2017 +0530
Committer: Tushar R. Gosavi <tushar@apache.org>
Committed: Fri Apr 21 11:13:48 2017 +0530

----------------------------------------------------------------------
 .../common/util/AsyncFSStorageAgent.java        |   2 +-
 .../datatorrent/common/util/FSStorageAgent.java |  15 +-
 .../apex/common/util/AsyncStorageAgent.java     |   2 +-
 .../apex/common/util/CascadeStorageAgent.java   | 202 ------------------
 .../common/util/CascadeStorageAgentTest.java    | 116 -----------
 .../stram/StreamingContainerManager.java        |   2 +-
 .../java/com/datatorrent/stram/engine/Node.java |   4 +-
 .../stram/plan/physical/PhysicalPlan.java       |   2 +-
 .../apex/engine/util/CascadeStorageAgent.java   | 208 +++++++++++++++++++
 .../datatorrent/stram/StramRecoveryTest.java    |   2 +-
 .../stram/StreamingContainerManagerTest.java    |  20 --
 .../engine/util/CascadeStorageAgentTest.java    | 116 +++++++++++
 12 files changed, 337 insertions(+), 354 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/apex-core/blob/6cb3e351/common/src/main/java/com/datatorrent/common/util/AsyncFSStorageAgent.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/com/datatorrent/common/util/AsyncFSStorageAgent.java b/common/src/main/java/com/datatorrent/common/util/AsyncFSStorageAgent.java
index 0c389a4..c8275ea 100644
--- a/common/src/main/java/com/datatorrent/common/util/AsyncFSStorageAgent.java
+++ b/common/src/main/java/com/datatorrent/common/util/AsyncFSStorageAgent.java
@@ -147,7 +147,7 @@ public class AsyncFSStorageAgent extends FSStorageAgent implements AsyncStorageA
   }
 
   @Override
-  public void finalize(int operatorId, long windowId) throws IOException
+  public void flush(int operatorId, long windowId) throws IOException
   {
     // Checkpoint already present in HDFS during save, when syncCheckpoint is true.
     if (isSyncCheckpoint()) {

http://git-wip-us.apache.org/repos/asf/apex-core/blob/6cb3e351/common/src/main/java/com/datatorrent/common/util/FSStorageAgent.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/com/datatorrent/common/util/FSStorageAgent.java b/common/src/main/java/com/datatorrent/common/util/FSStorageAgent.java
index b5a43fe..4b71761 100644
--- a/common/src/main/java/com/datatorrent/common/util/FSStorageAgent.java
+++ b/common/src/main/java/com/datatorrent/common/util/FSStorageAgent.java
@@ -41,6 +41,7 @@ import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Options;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.fs.permission.FsPermission;
 
 import com.esotericsoftware.kryo.Kryo;
 import com.esotericsoftware.kryo.io.Input;
@@ -157,27 +158,23 @@ public class FSStorageAgent implements StorageAgent, Serializable
     try {
       FileStatus status = fileContext.getFileStatus(lPath);
       if (!status.isDirectory()) {
-        throw new IOException("Checkpoint location is not a directory ");
+        throw new RuntimeException("Checkpoint location is not a directory");
       }
     } catch (FileNotFoundException ex) {
-      // During initialization this directory may not exists.
-      // return an empty array.
-      return new long[0];
+      // During initialization checkpoint directory may not exists.
+      fileContext.mkdir(lPath, FsPermission.getDirDefault(), true);
     }
 
     RemoteIterator<FileStatus> fileStatusRemoteIterator = fileContext.listStatus(lPath);
-    if (!fileStatusRemoteIterator.hasNext()) {
-      throw new IOException("Storage Agent has not saved anything yet!");
-    }
     List<Long> lwindows = new ArrayList<>();
-    do {
+    while (fileStatusRemoteIterator.hasNext()) {
       FileStatus fileStatus = fileStatusRemoteIterator.next();
       String name = fileStatus.getPath().getName();
       if (name.equals(TMP_FILE)) {
         continue;
       }
       lwindows.add(STATELESS_CHECKPOINT_WINDOW_ID.equals(name) ? Stateless.WINDOW_ID : Long.parseLong(name,
16));
-    } while (fileStatusRemoteIterator.hasNext());
+    }
     long[] windowIds = new long[lwindows.size()];
     for (int i = 0; i < windowIds.length; i++) {
       windowIds[i] = lwindows.get(i);

http://git-wip-us.apache.org/repos/asf/apex-core/blob/6cb3e351/common/src/main/java/org/apache/apex/common/util/AsyncStorageAgent.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/apex/common/util/AsyncStorageAgent.java b/common/src/main/java/org/apache/apex/common/util/AsyncStorageAgent.java
index 337ccdd..f797b92 100644
--- a/common/src/main/java/org/apache/apex/common/util/AsyncStorageAgent.java
+++ b/common/src/main/java/org/apache/apex/common/util/AsyncStorageAgent.java
@@ -41,7 +41,7 @@ public interface AsyncStorageAgent extends StorageAgent
    * @param windowId
    * @throws IOException
    */
-  void finalize(int operatorId, long windowId) throws IOException;
+  void flush(int operatorId, long windowId) throws IOException;
 
   /**
    * Check if StorageAgent is configured to take synchronous checkpoints.

http://git-wip-us.apache.org/repos/asf/apex-core/blob/6cb3e351/common/src/main/java/org/apache/apex/common/util/CascadeStorageAgent.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/apex/common/util/CascadeStorageAgent.java b/common/src/main/java/org/apache/apex/common/util/CascadeStorageAgent.java
deleted file mode 100644
index d6fec8e..0000000
--- a/common/src/main/java/org/apache/apex/common/util/CascadeStorageAgent.java
+++ /dev/null
@@ -1,202 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.apex.common.util;
-
-import java.io.IOException;
-import java.io.ObjectStreamException;
-import java.io.Serializable;
-import java.util.Arrays;
-import java.util.Map;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.collect.Maps;
-
-import com.datatorrent.api.StorageAgent;
-
-/**
- * A StorageAgent which chains two StorageAgent. It use the current storage-agent to store
- * the checkpoint, and use the parent agent to read old checkpoints. For application having
- * large number of physical operators, the size and number of files to be copied could be
- * large impacting application restart time. This storage-agent is used during application
- * restart to avoiding copying checkpoints from old application directory to improve application
- * restart time.
- */
-public class CascadeStorageAgent implements StorageAgent, AsyncStorageAgent, Serializable
-{
-  private static final Logger logger = LoggerFactory.getLogger(CascadeStorageAgent.class);
-  private final StorageAgent parent;
-  private final StorageAgent current;
-  private final transient Map<Integer, long[]> oldOperatorToWindowIdsMap;
-
-  public CascadeStorageAgent(StorageAgent parent, StorageAgent current)
-  {
-    this.parent = parent;
-    this.current = current;
-    oldOperatorToWindowIdsMap = Maps.newConcurrentMap();
-  }
-
-  /**
-   * does the checkpoint belong to parent
-   */
-  private boolean isCheckpointFromParent(int operatorId, long wid) throws IOException
-  {
-    long[] wids = getParentWindowIds(operatorId);
-    if (wids.length != 0) {
-      return (wid <= wids[wids.length - 1]);
-    }
-    return false;
-  }
-
-  /**
-   * Return window-id of checkpoints available in old storage agent. This function
-   * will call getWindowIds of old storage agent only once for the fist time, and
-   * return cached data for next calls for same operator.
-   *
-   * @param operatorId
-   * @return
-   * @throws IOException
-   */
-  private long[] getParentWindowIds(int operatorId) throws IOException
-  {
-    long[] oldWindowIds = oldOperatorToWindowIdsMap.get(operatorId);
-    if (oldWindowIds == null) {
-      oldWindowIds = parent.getWindowIds(operatorId);
-      if (oldWindowIds == null) {
-        oldWindowIds = new long[0];
-      }
-      Arrays.sort(oldWindowIds);
-      oldOperatorToWindowIdsMap.put(operatorId, oldWindowIds);
-      logger.debug("CascadeStorageAgent window ids from old storage agent op {} wids {}",
operatorId, Arrays.toString(oldWindowIds));
-    }
-    return oldWindowIds;
-  }
-
-  /**
-   * Save object in current storage agent. This should not modify old storage agent
-   * in any way.
-   *
-   * @param object - The operator whose state needs to be saved.
-   * @param operatorId - Identifier of the operator.
-   * @param windowId - Identifier for the specific state of the operator.
-   * @throws IOException
-   */
-  @Override
-  public void save(Object object, int operatorId, long windowId) throws IOException
-  {
-    current.save(object, operatorId, windowId);
-  }
-
-  /**
-   * Delete old checkpoints from the storage agent.
-   *
-   * The checkpoints are deleted from current directory if it is present in current
-   * storage agent. and cached state for old storage agent is removed.
-   *
-   * @param operatorId
-   * @param windowId
-   * @throws IOException
-   */
-  @Override
-  public void delete(int operatorId, long windowId) throws IOException
-  {
-    if (!isCheckpointFromParent(operatorId, windowId)) {
-      current.delete(operatorId, windowId);
-    }
-  }
-
-  /**
-   * Load checkpoint from storage agents. Do a basic comparision of windowIds
-   * to check the storage agent which has the checkpoint.
-   *
-   * @param operatorId Id for which the object was previously saved
-   * @param windowId WindowId for which the object was previously saved
-   * @return
-   * @throws IOException
-   */
-  @Override
-  public Object load(int operatorId, long windowId) throws IOException
-  {
-    long[] oldWindowIds = getParentWindowIds(operatorId);
-    if (oldWindowIds.length >= 1 && windowId <= oldWindowIds[oldWindowIds.length
- 1]) {
-      return parent.load(operatorId, windowId);
-    }
-    return current.load(operatorId, windowId);
-  }
-
-  @Override
-  public long[] getWindowIds(int operatorId) throws IOException
-  {
-    long[] currentIds = current.getWindowIds(operatorId);
-    long[] oldWindowIds = getParentWindowIds(operatorId);
-    return merge(currentIds, oldWindowIds);
-  }
-
-  private static final long[] EMPTY_LONG_ARRAY = new long[0];
-  private long[] merge(long[] currentIds, long[] oldWindowIds)
-  {
-    if (currentIds == null && oldWindowIds == null) {
-      return EMPTY_LONG_ARRAY;
-    }
-    if (currentIds == null) {
-      return oldWindowIds;
-    }
-    if (oldWindowIds == null) {
-      return currentIds;
-    }
-    long[] mergedArray = new long[currentIds.length + oldWindowIds.length];
-    System.arraycopy(currentIds, 0, mergedArray, 0, currentIds.length);
-    System.arraycopy(oldWindowIds, 0, mergedArray, currentIds.length, oldWindowIds.length);
-    Arrays.sort(mergedArray);
-    return mergedArray;
-  }
-
-  @Override
-  public void finalize(int operatorId, long windowId) throws IOException
-  {
-    if (current instanceof AsyncStorageAgent) {
-      ((AsyncStorageAgent)current).finalize(operatorId, windowId);
-    }
-  }
-
-  @Override
-  public boolean isSyncCheckpoint()
-  {
-    if (parent instanceof AsyncStorageAgent) {
-      return ((AsyncStorageAgent)parent).isSyncCheckpoint();
-    }
-    return true;
-  }
-
-  public Object readResolve() throws ObjectStreamException
-  {
-    return new CascadeStorageAgent(parent, current);
-  }
-
-  public StorageAgent getCurrentStorageAgent()
-  {
-    return current;
-  }
-
-  public StorageAgent getParentStorageAgent()
-  {
-    return parent;
-  }
-}

http://git-wip-us.apache.org/repos/asf/apex-core/blob/6cb3e351/common/src/test/java/com/datatorrent/common/util/CascadeStorageAgentTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/com/datatorrent/common/util/CascadeStorageAgentTest.java
b/common/src/test/java/com/datatorrent/common/util/CascadeStorageAgentTest.java
deleted file mode 100644
index 40f24f0..0000000
--- a/common/src/test/java/com/datatorrent/common/util/CascadeStorageAgentTest.java
+++ /dev/null
@@ -1,116 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.common.util;
-
-import java.io.File;
-import java.io.IOException;
-
-import org.junit.Assert;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TestWatcher;
-import org.junit.runner.Description;
-
-import org.apache.apex.common.util.CascadeStorageAgent;
-import org.apache.commons.io.FileUtils;
-import org.apache.hadoop.fs.FileContext;
-import org.apache.hadoop.fs.Path;
-
-import com.datatorrent.api.Attribute;
-import com.datatorrent.api.DAG;
-
-public class CascadeStorageAgentTest
-{
-
-  static class TestMeta extends TestWatcher
-  {
-    String applicationPath;
-
-    @Override
-    protected void starting(Description description)
-    {
-      super.starting(description);
-      applicationPath = "target/" + description.getClassName() + "/" + description.getMethodName();
-      try {
-        FileUtils.forceMkdir(new File("target/" + description.getClassName()));
-      } catch (IOException e) {
-        throw new RuntimeException(e);
-      }
-      Attribute.AttributeMap.DefaultAttributeMap attributes = new Attribute.AttributeMap.DefaultAttributeMap();
-      attributes.put(DAG.APPLICATION_PATH, applicationPath);
-    }
-
-    @Override
-    protected void finished(Description description)
-    {
-      try {
-        FileUtils.deleteDirectory(new File("target/" + description.getClassName()));
-      } catch (IOException e) {
-        throw new RuntimeException(e);
-      }
-    }
-  }
-
-  @Rule
-  public TestMeta testMeta = new TestMeta();
-
-  @Test
-  public void testSingleIndirection() throws IOException
-  {
-    String oldAppPath = testMeta.applicationPath;
-    FSStorageAgent storageAgent = new FSStorageAgent(oldAppPath, null);
-    storageAgent.save("1", 1, 1);
-    storageAgent.save("2", 1, 2);
-    storageAgent.save("3", 2, 1);
-
-    String newAppPath = oldAppPath + ".new";
-    CascadeStorageAgent cascade = new CascadeStorageAgent(storageAgent, new FSStorageAgent(newAppPath,
null));
-    long[] operatorIds = cascade.getWindowIds(1);
-    Assert.assertArrayEquals("Returned window ids ", operatorIds, new long[]{1L, 2L});
-
-    operatorIds = cascade.getWindowIds(2);
-    Assert.assertArrayEquals("Returned window ids ", operatorIds, new long[]{1L});
-
-    /* save should happen to new location */
-    cascade.save("4", 1, 4);
-    FileContext fileContext = FileContext.getFileContext();
-    Assert.assertFalse("operator 1 window 4 file does not exists in old directory", fileContext.util().exists(new
Path(oldAppPath + "/" + 1 + "/" + 4)));
-    Assert.assertTrue("operator 1 window 4 file exists in new directory", fileContext.util().exists(new
Path(newAppPath + "/" + 1 + "/" + 4)));
-
-    // check for delete,
-    // delete for old checkpoint should be ignored
-    cascade.save("5", 1, 5);
-    cascade.delete(1, 2L);
-    Assert.assertTrue("operator 1 window 2 file exists in old directory", fileContext.util().exists(new
Path(oldAppPath + "/" + 1 + "/" + 2)));
-    cascade.delete(1, 4L);
-    Assert.assertFalse("operator 1 window 4 file does not exists in old directory", fileContext.util().exists(new
Path(newAppPath + "/" + 1 + "/" + 4)));
-
-    /* chaining of storage agent */
-    String latestAppPath = oldAppPath + ".latest";
-    cascade = new CascadeStorageAgent(storageAgent, new FSStorageAgent(newAppPath, null));
-    CascadeStorageAgent latest = new CascadeStorageAgent(cascade, new FSStorageAgent(latestAppPath,
null));
-    operatorIds = latest.getWindowIds(1);
-    Assert.assertArrayEquals("Window ids ", operatorIds, new long[] {1,2,5});
-
-    latest.save("6", 1, 6);
-    Assert.assertFalse("operator 1 window 6 file does not exists in old directory", fileContext.util().exists(new
Path(oldAppPath + "/" + 1 + "/" + 6)));
-    Assert.assertFalse("operator 1 window 6 file does not exists in old directory", fileContext.util().exists(new
Path(newAppPath + "/" + 1 + "/" + 6)));
-    Assert.assertTrue("operator 1 window 6 file exists in new directory", fileContext.util().exists(new
Path(latestAppPath + "/" + 1 + "/" + 6)));
-  }
-}

http://git-wip-us.apache.org/repos/asf/apex-core/blob/6cb3e351/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java b/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
index 92fce54..ca2bd88 100644
--- a/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
+++ b/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
@@ -65,9 +65,9 @@ import org.codehaus.jettison.json.JSONObject;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.apex.common.util.CascadeStorageAgent;
 import org.apache.apex.engine.plugin.ApexPluginDispatcher;
 import org.apache.apex.engine.plugin.NoOpApexPluginDispatcher;
+import org.apache.apex.engine.util.CascadeStorageAgent;
 import org.apache.commons.io.IOUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.lang3.builder.ToStringBuilder;

http://git-wip-us.apache.org/repos/asf/apex-core/blob/6cb3e351/engine/src/main/java/com/datatorrent/stram/engine/Node.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/engine/Node.java b/engine/src/main/java/com/datatorrent/stram/engine/Node.java
index c84a249..88b002f 100644
--- a/engine/src/main/java/com/datatorrent/stram/engine/Node.java
+++ b/engine/src/main/java/com/datatorrent/stram/engine/Node.java
@@ -539,7 +539,7 @@ public abstract class Node<OPERATOR extends Operator> implements
Component<Opera
                 checkpointStats = null;
                 return;
               } else {
-                asyncStorageAgent.finalize(id, windowId);
+                asyncStorageAgent.flush(id, windowId);
               }
             }
           }
@@ -688,7 +688,7 @@ public abstract class Node<OPERATOR extends Operator> implements
Component<Opera
     @Override
     public Stats.CheckpointStats call() throws Exception
     {
-      agent.finalize(id, windowId);
+      agent.flush(id, windowId);
       stats.checkpointTime = System.currentTimeMillis() - stats.checkpointStartTime;
       return stats;
     }

http://git-wip-us.apache.org/repos/asf/apex-core/blob/6cb3e351/engine/src/main/java/com/datatorrent/stram/plan/physical/PhysicalPlan.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/plan/physical/PhysicalPlan.java b/engine/src/main/java/com/datatorrent/stram/plan/physical/PhysicalPlan.java
index f4e2100..ecc010c 100644
--- a/engine/src/main/java/com/datatorrent/stram/plan/physical/PhysicalPlan.java
+++ b/engine/src/main/java/com/datatorrent/stram/plan/physical/PhysicalPlan.java
@@ -1227,7 +1227,7 @@ public class PhysicalPlan implements Serializable
       StorageAgent agent = oper.operatorMeta.getValue(OperatorContext.STORAGE_AGENT);
       agent.save(oo, oper.id, windowId);
       if (agent instanceof AsyncStorageAgent) {
-        ((AsyncStorageAgent)agent).finalize(oper.id, windowId);
+        ((AsyncStorageAgent)agent).flush(oper.id, windowId);
       }
     } catch (IOException e) {
       // inconsistent state, no recovery option, requires shutdown

http://git-wip-us.apache.org/repos/asf/apex-core/blob/6cb3e351/engine/src/main/java/org/apache/apex/engine/util/CascadeStorageAgent.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/org/apache/apex/engine/util/CascadeStorageAgent.java b/engine/src/main/java/org/apache/apex/engine/util/CascadeStorageAgent.java
new file mode 100644
index 0000000..9903010
--- /dev/null
+++ b/engine/src/main/java/org/apache/apex/engine/util/CascadeStorageAgent.java
@@ -0,0 +1,208 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.engine.util;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.Map;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.apex.common.util.AsyncStorageAgent;
+import org.apache.hadoop.classification.InterfaceStability;
+
+import com.google.common.collect.Maps;
+
+import com.datatorrent.api.StorageAgent;
+
+/**
+ * A StorageAgent which chains two StorageAgent. It use the current storage-agent to store
+ * the checkpoint, and use the parent agent to read old checkpoints. For application having
+ * large number of physical operators, the size and number of files to be copied could be
+ * large impacting application restart time. This storage-agent is used during application
+ * restart to avoiding copying checkpoints from old application directory to improve application
+ * restart time.
+ */
+@InterfaceStability.Evolving
+public class CascadeStorageAgent implements StorageAgent, AsyncStorageAgent, Serializable
+{
+  private static final long serialVersionUID = 985557590735264920L;
+  private static final Logger logger = LoggerFactory.getLogger(CascadeStorageAgent.class);
+  private final StorageAgent parent;
+  private final StorageAgent current;
+  private transient Map<Integer, long[]> oldOperatorToWindowIdsMap;
+
+  public CascadeStorageAgent(StorageAgent parent, StorageAgent current)
+  {
+    this.parent = parent;
+    this.current = current;
+    oldOperatorToWindowIdsMap = Maps.newConcurrentMap();
+  }
+
+  /**
+   * does the checkpoint belong to parent
+   */
+  private boolean isCheckpointFromParent(int operatorId, long wid) throws IOException
+  {
+    long[] wids = getParentWindowIds(operatorId);
+    if (wids.length != 0) {
+      return (wid <= wids[wids.length - 1]);
+    }
+    return false;
+  }
+
+  /**
+   * Return window-id of checkpoints available in old storage agent. This function
+   * will call getWindowIds of old storage agent only once for the fist time, and
+   * return cached data for next calls for same operator.
+   *
+   * @param operatorId
+   * @return
+   * @throws IOException
+   */
+  private long[] getParentWindowIds(int operatorId) throws IOException
+  {
+    long[] oldWindowIds = oldOperatorToWindowIdsMap.get(operatorId);
+    if (oldWindowIds == null) {
+      oldWindowIds = parent.getWindowIds(operatorId);
+      if (oldWindowIds == null) {
+        oldWindowIds = new long[0];
+      }
+      Arrays.sort(oldWindowIds);
+      oldOperatorToWindowIdsMap.put(operatorId, oldWindowIds);
+      logger.debug("CascadeStorageAgent window ids from old storage agent op {} wids {}",
operatorId, Arrays.toString(oldWindowIds));
+    }
+    return oldWindowIds;
+  }
+
+  /**
+   * Save object in current storage agent. This should not modify old storage agent
+   * in any way.
+   *
+   * @param object - The operator whose state needs to be saved.
+   * @param operatorId - Identifier of the operator.
+   * @param windowId - Identifier for the specific state of the operator.
+   * @throws IOException
+   */
+  @Override
+  public void save(Object object, int operatorId, long windowId) throws IOException
+  {
+    current.save(object, operatorId, windowId);
+  }
+
+  /**
+   * Delete old checkpoints from the storage agent.
+   *
+   * The checkpoints are deleted from current directory if it is present in current
+   * storage agent. and cached state for old storage agent is removed.
+   *
+   * @param operatorId
+   * @param windowId
+   * @throws IOException
+   */
+  @Override
+  public void delete(int operatorId, long windowId) throws IOException
+  {
+    if (!isCheckpointFromParent(operatorId, windowId)) {
+      current.delete(operatorId, windowId);
+    }
+  }
+
+  /**
+   * Load checkpoint from storage agents. Do a basic comparision of windowIds
+   * to check the storage agent which has the checkpoint.
+   *
+   * @param operatorId Id for which the object was previously saved
+   * @param windowId WindowId for which the object was previously saved
+   * @return
+   * @throws IOException
+   */
+  @Override
+  public Object load(int operatorId, long windowId) throws IOException
+  {
+    long[] oldWindowIds = getParentWindowIds(operatorId);
+    if (oldWindowIds.length >= 1 && windowId <= oldWindowIds[oldWindowIds.length
- 1]) {
+      return parent.load(operatorId, windowId);
+    }
+    return current.load(operatorId, windowId);
+  }
+
+  @Override
+  public long[] getWindowIds(int operatorId) throws IOException
+  {
+    long[] currentIds = current.getWindowIds(operatorId);
+    long[] oldWindowIds = getParentWindowIds(operatorId);
+    return merge(currentIds, oldWindowIds);
+  }
+
+  private static final long[] EMPTY_LONG_ARRAY = new long[0];
+  private long[] merge(long[] currentIds, long[] oldWindowIds)
+  {
+    if (currentIds == null && oldWindowIds == null) {
+      return EMPTY_LONG_ARRAY;
+    }
+    if (currentIds == null) {
+      return oldWindowIds;
+    }
+    if (oldWindowIds == null) {
+      return currentIds;
+    }
+    long[] mergedArray = new long[currentIds.length + oldWindowIds.length];
+    System.arraycopy(currentIds, 0, mergedArray, 0, currentIds.length);
+    System.arraycopy(oldWindowIds, 0, mergedArray, currentIds.length, oldWindowIds.length);
+    Arrays.sort(mergedArray);
+    return mergedArray;
+  }
+
+  @Override
+  public void flush(int operatorId, long windowId) throws IOException
+  {
+    if (current instanceof AsyncStorageAgent) {
+      ((AsyncStorageAgent)current).flush(operatorId, windowId);
+    }
+  }
+
+  @Override
+  public boolean isSyncCheckpoint()
+  {
+    if (parent instanceof AsyncStorageAgent) {
+      return ((AsyncStorageAgent)parent).isSyncCheckpoint();
+    }
+    return true;
+  }
+
+  private void readObject(ObjectInputStream input) throws IOException, ClassNotFoundException
+  {
+    input.defaultReadObject();
+    oldOperatorToWindowIdsMap = Maps.newConcurrentMap();
+  }
+
+  public StorageAgent getCurrentStorageAgent()
+  {
+    return current;
+  }
+
+  public StorageAgent getParentStorageAgent()
+  {
+    return parent;
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-core/blob/6cb3e351/engine/src/test/java/com/datatorrent/stram/StramRecoveryTest.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/StramRecoveryTest.java b/engine/src/test/java/com/datatorrent/stram/StramRecoveryTest.java
index 2f46049..177e3fa 100644
--- a/engine/src/test/java/com/datatorrent/stram/StramRecoveryTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/StramRecoveryTest.java
@@ -44,7 +44,7 @@ import org.mockito.stubbing.Answer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.apex.common.util.CascadeStorageAgent;
+import org.apache.apex.engine.util.CascadeStorageAgent;
 import org.apache.commons.lang.mutable.MutableBoolean;
 import org.apache.commons.lang.mutable.MutableInt;
 import org.apache.hadoop.conf.Configuration;

http://git-wip-us.apache.org/repos/asf/apex-core/blob/6cb3e351/engine/src/test/java/com/datatorrent/stram/StreamingContainerManagerTest.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/StreamingContainerManagerTest.java
b/engine/src/test/java/com/datatorrent/stram/StreamingContainerManagerTest.java
index cb2d760..53f18f9 100644
--- a/engine/src/test/java/com/datatorrent/stram/StreamingContainerManagerTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/StreamingContainerManagerTest.java
@@ -468,16 +468,6 @@ public class StreamingContainerManagerTest
     long[] windowsIds = sa.getWindowIds(1);
     Arrays.sort(windowsIds);
     Assert.assertArrayEquals("Saved windowIds", windowIds, windowsIds);
-
-    for (long windowId : windowIds) {
-      sa.delete(1, windowId);
-    }
-    try {
-      sa.getWindowIds(1);
-      Assert.fail("There should not be any most recently saved windowId!");
-    } catch (IOException io) {
-      Assert.assertTrue("No State Saved", true);
-    }
   }
 
   @Test
@@ -495,16 +485,6 @@ public class StreamingContainerManagerTest
     long[] windowsIds = sa.getWindowIds(1);
     Arrays.sort(windowsIds);
     Assert.assertArrayEquals("Saved windowIds", windowIds, windowsIds);
-
-    for (long windowId : windowIds) {
-      sa.delete(1, windowId);
-    }
-    try {
-      sa.getWindowIds(1);
-      Assert.fail("There should not be any most recently saved windowId!");
-    } catch (IOException io) {
-      Assert.assertTrue("No State Saved", true);
-    }
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/apex-core/blob/6cb3e351/engine/src/test/java/org/apache/apex/engine/util/CascadeStorageAgentTest.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/org/apache/apex/engine/util/CascadeStorageAgentTest.java b/engine/src/test/java/org/apache/apex/engine/util/CascadeStorageAgentTest.java
new file mode 100644
index 0000000..43b5636
--- /dev/null
+++ b/engine/src/test/java/org/apache/apex/engine/util/CascadeStorageAgentTest.java
@@ -0,0 +1,116 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.engine.util;
+
+import java.io.File;
+import java.io.IOException;
+
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestWatcher;
+import org.junit.runner.Description;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.Path;
+
+import com.datatorrent.api.Attribute;
+import com.datatorrent.api.DAG;
+import com.datatorrent.common.util.FSStorageAgent;
+
+public class CascadeStorageAgentTest
+{
+
+  static class TestMeta extends TestWatcher
+  {
+    String applicationPath;
+
+    @Override
+    protected void starting(Description description)
+    {
+      super.starting(description);
+      applicationPath = "target/" + description.getClassName() + "/" + description.getMethodName();
+      try {
+        FileUtils.forceMkdir(new File("target/" + description.getClassName()));
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+      Attribute.AttributeMap.DefaultAttributeMap attributes = new Attribute.AttributeMap.DefaultAttributeMap();
+      attributes.put(DAG.APPLICATION_PATH, applicationPath);
+    }
+
+    @Override
+    protected void finished(Description description)
+    {
+      try {
+        FileUtils.deleteDirectory(new File("target/" + description.getClassName()));
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+    }
+  }
+
+  @Rule
+  public TestMeta testMeta = new TestMeta();
+
+  @Test
+  public void testSingleIndirection() throws IOException
+  {
+    String oldAppPath = testMeta.applicationPath;
+    FSStorageAgent storageAgent = new FSStorageAgent(oldAppPath, null);
+    storageAgent.save("1", 1, 1);
+    storageAgent.save("2", 1, 2);
+    storageAgent.save("3", 2, 1);
+
+    String newAppPath = oldAppPath + ".new";
+    CascadeStorageAgent cascade = new CascadeStorageAgent(storageAgent, new FSStorageAgent(newAppPath, null));
+    long[] operatorIds = cascade.getWindowIds(1);
+    Assert.assertArrayEquals("Returned window ids ", operatorIds, new long[]{1L, 2L});
+
+    operatorIds = cascade.getWindowIds(2);
+    Assert.assertArrayEquals("Returned window ids ", operatorIds, new long[]{1L});
+
+    /* save should happen to new location */
+    cascade.save("4", 1, 4);
+    FileContext fileContext = FileContext.getFileContext();
+    Assert.assertFalse("operator 1 window 4 file does not exists in old directory", fileContext.util().exists(new Path(oldAppPath + "/" + 1 + "/" + 4)));
+    Assert.assertTrue("operator 1 window 4 file exists in new directory", fileContext.util().exists(new Path(newAppPath + "/" + 1 + "/" + 4)));
+
+    // check for delete,
+    // delete for old checkpoint should be ignored
+    cascade.save("5", 1, 5);
+    cascade.delete(1, 2L);
+    Assert.assertTrue("operator 1 window 2 file exists in old directory", fileContext.util().exists(new Path(oldAppPath + "/" + 1 + "/" + 2)));
+    cascade.delete(1, 4L);
+    Assert.assertFalse("operator 1 window 4 file does not exists in old directory", fileContext.util().exists(new Path(newAppPath + "/" + 1 + "/" + 4)));
+
+    /* chaining of storage agent */
+    String latestAppPath = oldAppPath + ".latest";
+    cascade = new CascadeStorageAgent(storageAgent, new FSStorageAgent(newAppPath, null));
+    CascadeStorageAgent latest = new CascadeStorageAgent(cascade, new FSStorageAgent(latestAppPath, null));
+    operatorIds = latest.getWindowIds(1);
+    Assert.assertArrayEquals("Window ids ", operatorIds, new long[] {1,2,5});
+
+    latest.save("6", 1, 6);
+    Assert.assertFalse("operator 1 window 6 file does not exists in old directory", fileContext.util().exists(new Path(oldAppPath + "/" + 1 + "/" + 6)));
+    Assert.assertFalse("operator 1 window 6 file does not exists in old directory", fileContext.util().exists(new Path(newAppPath + "/" + 1 + "/" + 6)));
+    Assert.assertTrue("operator 1 window 6 file exists in new directory", fileContext.util().exists(new Path(latestAppPath + "/" + 1 + "/" + 6)));
+  }
+}


Mime
View raw message