apex-commits mailing list archives

From: t..@apache.org
Subject: incubator-apex-malhar git commit: APEXMALHAR-2064 Move WindowDataManager to org.apache.apex.malhar.lib.wal
Date: Fri, 22 Apr 2016 23:19:12 GMT
Repository: incubator-apex-malhar
Updated Branches:
  refs/heads/master 26237a2fb -> 4ef0700ad


APEXMALHAR-2064 Move WindowDataManager to org.apache.apex.malhar.lib.wal


Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/4ef0700a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/4ef0700a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/4ef0700a

Branch: refs/heads/master
Commit: 4ef0700ad83afd423cdfdbd0c84b7daeb27e827f
Parents: 26237a2
Author: MalharJenkins <jenkins@datatorrent.com>
Authored: Fri Apr 22 18:28:26 2016 +0530
Committer: Chandni Singh <csingh@apache.org>
Committed: Fri Apr 22 15:30:32 2016 -0700

----------------------------------------------------------------------
 apps/logstream/pom.xml                          |   3 +-
 benchmark/pom.xml                               |   3 +-
 .../contrib/nifi/AbstractNiFiInputOperator.java |   2 +-
 .../nifi/AbstractNiFiOutputOperator.java        |   2 +-
 .../AbstractNiFiSinglePortInputOperator.java    |   2 +-
 .../nifi/NiFiSinglePortInputOperator.java       |   3 +-
 .../nifi/NiFiSinglePortOutputOperator.java      |   2 +-
 .../nifi/NiFiSinglePortInputOperatorTest.java   |   3 +-
 .../nifi/NiFiSinglePortOutputOperatorTest.java  |   3 +-
 .../nifi/demo/TestNiFiInputApplication.java     |   2 +-
 .../nifi/demo/TestNiFiOutputApplication.java    |   2 +-
 .../kafka/AbstractKafkaInputOperator.java       |   2 +-
 .../malhar/kafka/KafkaInputOperatorTest.java    |   2 +-
 .../lib/io/IdempotentStorageManager.java        |   9 +-
 .../datatorrent/lib/util/WindowDataManager.java | 440 -------------------
 .../managed/IncrementalCheckpointManager.java   |   3 +-
 .../apex/malhar/lib/wal/WindowDataManager.java  | 440 +++++++++++++++++++
 .../lib/util/FSWindowDataManagerTest.java       | 216 ---------
 .../malhar/lib/wal/FSWindowDataManagerTest.java | 217 +++++++++
 19 files changed, 681 insertions(+), 675 deletions(-)
----------------------------------------------------------------------
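
Since the 440-line WindowDataManager source and its test move essentially unchanged (only the package declarations differ), downstream code keeps the same API and needs only an import update. A minimal sketch of the migration, using a hypothetical operator class for illustration:

    // before this commit
    // import com.datatorrent.lib.util.WindowDataManager;

    // after this commit
    import org.apache.apex.malhar.lib.wal.WindowDataManager;

    // hypothetical operator, for illustration only
    public class MyOperator
    {
      private WindowDataManager windowDataManager = new WindowDataManager.FSWindowDataManager();
    }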


http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/4ef0700a/apps/logstream/pom.xml
----------------------------------------------------------------------
diff --git a/apps/logstream/pom.xml b/apps/logstream/pom.xml
index dbd3e58..6020c30 100644
--- a/apps/logstream/pom.xml
+++ b/apps/logstream/pom.xml
@@ -35,6 +35,7 @@
     <maven.deploy.skip>true</maven.deploy.skip>
     <skipTests>false</skipTests>
     <semver.plugin.skip>true</semver.plugin.skip>
+    <checkstyle.console>false</checkstyle.console>
   </properties>
 
   <name>Apache Apex Malhar (incubating) Logstream Application</name>
@@ -71,7 +72,7 @@
          <artifactId>maven-checkstyle-plugin</artifactId>
          <configuration>
            <maxAllowedViolations>104</maxAllowedViolations>
-           <consoleOutput>false</consoleOutput>
+           <logViolationsToConsole>${checkstyle.console}</logViolationsToConsole>
          </configuration>
        </plugin>
     </plugins>
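
A note on the hunk above (repeated for the benchmark module below): the hard-coded consoleOutput flag is replaced by the plugin's logViolationsToConsole parameter, wired to a new checkstyle.console property that defaults to false. Because Maven command-line properties override POM defaults, console logging of checkstyle violations can presumably be re-enabled per build without editing the POM, e.g. mvn verify -Dcheckstyle.console=true.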

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/4ef0700a/benchmark/pom.xml
----------------------------------------------------------------------
diff --git a/benchmark/pom.xml b/benchmark/pom.xml
index 47d6db4..f94f258 100644
--- a/benchmark/pom.xml
+++ b/benchmark/pom.xml
@@ -40,6 +40,7 @@
     <maven.deploy.skip>true</maven.deploy.skip>
     <skipTests>true</skipTests>
     <semver.plugin.skip>true</semver.plugin.skip>
+    <checkstyle.console>false</checkstyle.console>
   </properties>
 
   <build>
@@ -147,7 +148,7 @@
          <artifactId>maven-checkstyle-plugin</artifactId>
          <configuration>
            <maxAllowedViolations>281</maxAllowedViolations>
-           <consoleOutput>false</consoleOutput>
+           <logViolationsToConsole>${checkstyle.console}</logViolationsToConsole>
          </configuration>
        </plugin>
     </plugins>

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/4ef0700a/contrib/src/main/java/com/datatorrent/contrib/nifi/AbstractNiFiInputOperator.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/nifi/AbstractNiFiInputOperator.java b/contrib/src/main/java/com/datatorrent/contrib/nifi/AbstractNiFiInputOperator.java
index d0130f6..c1ac4a8 100644
--- a/contrib/src/main/java/com/datatorrent/contrib/nifi/AbstractNiFiInputOperator.java
+++ b/contrib/src/main/java/com/datatorrent/contrib/nifi/AbstractNiFiInputOperator.java
@@ -25,6 +25,7 @@ import java.util.List;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.apex.malhar.lib.wal.WindowDataManager;
 import org.apache.nifi.remote.Transaction;
 import org.apache.nifi.remote.TransferDirection;
 import org.apache.nifi.remote.client.SiteToSiteClient;
@@ -32,7 +33,6 @@ import org.apache.nifi.remote.protocol.DataPacket;
 
 import com.datatorrent.api.Context;
 import com.datatorrent.api.InputOperator;
-import com.datatorrent.lib.util.WindowDataManager;
 import com.datatorrent.netlet.util.DTThrowable;
 
 /**

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/4ef0700a/contrib/src/main/java/com/datatorrent/contrib/nifi/AbstractNiFiOutputOperator.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/nifi/AbstractNiFiOutputOperator.java b/contrib/src/main/java/com/datatorrent/contrib/nifi/AbstractNiFiOutputOperator.java
index e4e61fb..29e02bf 100644
--- a/contrib/src/main/java/com/datatorrent/contrib/nifi/AbstractNiFiOutputOperator.java
+++ b/contrib/src/main/java/com/datatorrent/contrib/nifi/AbstractNiFiOutputOperator.java
@@ -24,13 +24,13 @@ import java.util.List;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.apex.malhar.lib.wal.WindowDataManager;
 import org.apache.nifi.remote.Transaction;
 import org.apache.nifi.remote.TransferDirection;
 import org.apache.nifi.remote.client.SiteToSiteClient;
 
 import com.datatorrent.api.Context;
 import com.datatorrent.common.util.BaseOperator;
-import com.datatorrent.lib.util.WindowDataManager;
 import com.datatorrent.netlet.util.DTThrowable;
 
 /**

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/4ef0700a/contrib/src/main/java/com/datatorrent/contrib/nifi/AbstractNiFiSinglePortInputOperator.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/nifi/AbstractNiFiSinglePortInputOperator.java b/contrib/src/main/java/com/datatorrent/contrib/nifi/AbstractNiFiSinglePortInputOperator.java
index d874be0..22ae063 100644
--- a/contrib/src/main/java/com/datatorrent/contrib/nifi/AbstractNiFiSinglePortInputOperator.java
+++ b/contrib/src/main/java/com/datatorrent/contrib/nifi/AbstractNiFiSinglePortInputOperator.java
@@ -20,10 +20,10 @@ package com.datatorrent.contrib.nifi;
 
 import java.util.List;
 
+import org.apache.apex.malhar.lib.wal.WindowDataManager;
 import org.apache.nifi.remote.client.SiteToSiteClient;
 
 import com.datatorrent.api.DefaultOutputPort;
-import com.datatorrent.lib.util.WindowDataManager;
 
 /**
  * This is the base implementation of a NiFi input operator with a single output port.&nbsp;

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/4ef0700a/contrib/src/main/java/com/datatorrent/contrib/nifi/NiFiSinglePortInputOperator.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/nifi/NiFiSinglePortInputOperator.java b/contrib/src/main/java/com/datatorrent/contrib/nifi/NiFiSinglePortInputOperator.java
index f80386d..60696e7 100644
--- a/contrib/src/main/java/com/datatorrent/contrib/nifi/NiFiSinglePortInputOperator.java
+++ b/contrib/src/main/java/com/datatorrent/contrib/nifi/NiFiSinglePortInputOperator.java
@@ -22,12 +22,11 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.util.Map;
 
+import org.apache.apex.malhar.lib.wal.WindowDataManager;
 import org.apache.nifi.remote.client.SiteToSiteClient;
 import org.apache.nifi.remote.protocol.DataPacket;
 import org.apache.nifi.stream.io.StreamUtils;
 
-import com.datatorrent.lib.util.WindowDataManager;
-
 /**
  * Input adapter operator which consumes data from NiFi and produces NiFiDataPackets
  * where each NiFiDataPacket contains a byte array of content and a Map of attributes.

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/4ef0700a/contrib/src/main/java/com/datatorrent/contrib/nifi/NiFiSinglePortOutputOperator.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/nifi/NiFiSinglePortOutputOperator.java b/contrib/src/main/java/com/datatorrent/contrib/nifi/NiFiSinglePortOutputOperator.java
index 2692034..db417e8 100644
--- a/contrib/src/main/java/com/datatorrent/contrib/nifi/NiFiSinglePortOutputOperator.java
+++ b/contrib/src/main/java/com/datatorrent/contrib/nifi/NiFiSinglePortOutputOperator.java
@@ -21,10 +21,10 @@ package com.datatorrent.contrib.nifi;
 import java.util.ArrayList;
 import java.util.List;
 
+import org.apache.apex.malhar.lib.wal.WindowDataManager;
 import org.apache.nifi.remote.client.SiteToSiteClient;
 
 import com.datatorrent.api.DefaultInputPort;
-import com.datatorrent.lib.util.WindowDataManager;
 
 /**
  * NiFi output adapter operator with a single input port. Clients should provide a NiFiDataPacketBuilder implementation

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/4ef0700a/contrib/src/test/java/com/datatorrent/contrib/nifi/NiFiSinglePortInputOperatorTest.java
----------------------------------------------------------------------
diff --git a/contrib/src/test/java/com/datatorrent/contrib/nifi/NiFiSinglePortInputOperatorTest.java b/contrib/src/test/java/com/datatorrent/contrib/nifi/NiFiSinglePortInputOperatorTest.java
index a25497b..f36f1f2 100644
--- a/contrib/src/test/java/com/datatorrent/contrib/nifi/NiFiSinglePortInputOperatorTest.java
+++ b/contrib/src/test/java/com/datatorrent/contrib/nifi/NiFiSinglePortInputOperatorTest.java
@@ -33,7 +33,6 @@ import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
-import org.apache.nifi.remote.client.SiteToSiteClient;
 import org.apache.nifi.remote.protocol.DataPacket;
 import org.apache.nifi.stream.io.ByteArrayInputStream;
 import org.apache.nifi.util.file.FileUtils;
@@ -45,7 +44,7 @@ import com.datatorrent.contrib.nifi.mock.MockDataPacket;
 import com.datatorrent.contrib.nifi.mock.MockSiteToSiteClient;
 import com.datatorrent.lib.helper.OperatorContextTestHelper;
 import com.datatorrent.lib.testbench.CollectorTestSink;
-import com.datatorrent.lib.util.WindowDataManager;
+import org.apache.apex.malhar.lib.wal.WindowDataManager;
 
 public class NiFiSinglePortInputOperatorTest
 {

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/4ef0700a/contrib/src/test/java/com/datatorrent/contrib/nifi/NiFiSinglePortOutputOperatorTest.java
----------------------------------------------------------------------
diff --git a/contrib/src/test/java/com/datatorrent/contrib/nifi/NiFiSinglePortOutputOperatorTest.java b/contrib/src/test/java/com/datatorrent/contrib/nifi/NiFiSinglePortOutputOperatorTest.java
index 5b58ae0..e8aa982 100644
--- a/contrib/src/test/java/com/datatorrent/contrib/nifi/NiFiSinglePortOutputOperatorTest.java
+++ b/contrib/src/test/java/com/datatorrent/contrib/nifi/NiFiSinglePortOutputOperatorTest.java
@@ -30,6 +30,7 @@ import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
+import org.apache.apex.malhar.lib.wal.WindowDataManager;
 import org.apache.commons.io.IOUtils;
 import org.apache.nifi.remote.protocol.DataPacket;
 import org.apache.nifi.util.file.FileUtils;
@@ -40,8 +41,6 @@ import com.datatorrent.api.DAG;
 import com.datatorrent.contrib.nifi.mock.MockSiteToSiteClient;
 import com.datatorrent.contrib.nifi.mock.MockTransaction;
 import com.datatorrent.lib.helper.OperatorContextTestHelper;
-import com.datatorrent.lib.testbench.CollectorTestSink;
-import com.datatorrent.lib.util.WindowDataManager;
 
 public class NiFiSinglePortOutputOperatorTest
 {

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/4ef0700a/contrib/src/test/java/com/datatorrent/contrib/nifi/demo/TestNiFiInputApplication.java
----------------------------------------------------------------------
diff --git a/contrib/src/test/java/com/datatorrent/contrib/nifi/demo/TestNiFiInputApplication.java b/contrib/src/test/java/com/datatorrent/contrib/nifi/demo/TestNiFiInputApplication.java
index ebe8a0c..906fa31 100644
--- a/contrib/src/test/java/com/datatorrent/contrib/nifi/demo/TestNiFiInputApplication.java
+++ b/contrib/src/test/java/com/datatorrent/contrib/nifi/demo/TestNiFiInputApplication.java
@@ -18,6 +18,7 @@
  */
 package com.datatorrent.contrib.nifi.demo;
 
+import org.apache.apex.malhar.lib.wal.WindowDataManager;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.nifi.remote.client.SiteToSiteClient;
 import org.apache.nifi.remote.client.SiteToSiteClientConfig;
@@ -27,7 +28,6 @@ import com.datatorrent.api.LocalMode;
 import com.datatorrent.api.StreamingApplication;
 import com.datatorrent.contrib.nifi.NiFiSinglePortInputOperator;
 import com.datatorrent.lib.io.ConsoleOutputOperator;
-import com.datatorrent.lib.util.WindowDataManager;
 
 /**
  * A sample application that shows how to receive data to a NiFi Output Port.

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/4ef0700a/contrib/src/test/java/com/datatorrent/contrib/nifi/demo/TestNiFiOutputApplication.java
----------------------------------------------------------------------
diff --git a/contrib/src/test/java/com/datatorrent/contrib/nifi/demo/TestNiFiOutputApplication.java b/contrib/src/test/java/com/datatorrent/contrib/nifi/demo/TestNiFiOutputApplication.java
index f5399e7..046c25e 100644
--- a/contrib/src/test/java/com/datatorrent/contrib/nifi/demo/TestNiFiOutputApplication.java
+++ b/contrib/src/test/java/com/datatorrent/contrib/nifi/demo/TestNiFiOutputApplication.java
@@ -33,7 +33,7 @@ import com.datatorrent.contrib.nifi.NiFiDataPacketBuilder;
 import com.datatorrent.contrib.nifi.NiFiSinglePortOutputOperator;
 import com.datatorrent.contrib.nifi.StandardNiFiDataPacket;
 import com.datatorrent.lib.testbench.RandomEventGenerator;
-import com.datatorrent.lib.util.WindowDataManager;
+import org.apache.apex.malhar.lib.wal.WindowDataManager;
 
 /**
  * A sample application that shows how to send data to a NiFi Input Port.

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/4ef0700a/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaInputOperator.java
----------------------------------------------------------------------
diff --git a/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaInputOperator.java b/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaInputOperator.java
index 06cd470..3e709eb 100644
--- a/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaInputOperator.java
+++ b/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaInputOperator.java
@@ -34,6 +34,7 @@ import javax.validation.constraints.NotNull;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.apex.malhar.lib.wal.WindowDataManager;
 import org.apache.commons.lang3.tuple.MutablePair;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.hadoop.classification.InterfaceStability;
@@ -52,7 +53,6 @@ import com.datatorrent.api.InputOperator;
 import com.datatorrent.api.Operator;
 import com.datatorrent.api.Partitioner;
 import com.datatorrent.api.StatsListener;
-import com.datatorrent.lib.util.WindowDataManager;
 import com.datatorrent.netlet.util.DTThrowable;
 
 /**

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/4ef0700a/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaInputOperatorTest.java
----------------------------------------------------------------------
diff --git a/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaInputOperatorTest.java b/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaInputOperatorTest.java
index 9c5d5dc..ede7f38 100644
--- a/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaInputOperatorTest.java
+++ b/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaInputOperatorTest.java
@@ -40,6 +40,7 @@ import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 import org.slf4j.LoggerFactory;
 
+import org.apache.apex.malhar.lib.wal.WindowDataManager;
 import org.apache.commons.io.FileUtils;
 
 import com.datatorrent.api.Context;
@@ -48,7 +49,6 @@ import com.datatorrent.api.DAG.Locality;
 import com.datatorrent.api.DefaultInputPort;
 import com.datatorrent.api.LocalMode;
 import com.datatorrent.common.util.BaseOperator;
-import com.datatorrent.lib.util.WindowDataManager;
 import com.datatorrent.stram.StramLocalCluster;
 
 /**

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/4ef0700a/library/src/main/java/com/datatorrent/lib/io/IdempotentStorageManager.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/io/IdempotentStorageManager.java b/library/src/main/java/com/datatorrent/lib/io/IdempotentStorageManager.java
index 4eac924..65bda89 100644
--- a/library/src/main/java/com/datatorrent/lib/io/IdempotentStorageManager.java
+++ b/library/src/main/java/com/datatorrent/lib/io/IdempotentStorageManager.java
@@ -19,10 +19,15 @@
 package com.datatorrent.lib.io;
 
 import java.io.IOException;
-import java.util.*;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
 
 import javax.validation.constraints.NotNull;
 
+import org.apache.apex.malhar.lib.wal.WindowDataManager;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
@@ -52,7 +57,7 @@ import com.datatorrent.lib.io.fs.AbstractFileInputOperator;
  * application window boundaries.
  *
  * @since 2.0.0
- * @deprecated use {@link com.datatorrent.lib.util.WindowDataManager}
+ * @deprecated use {@link WindowDataManager}
  */
 @Deprecated
 public interface IdempotentStorageManager extends StorageAgent, Component<Context.OperatorContext>
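
With the import swap above, the @deprecated javadoc tag on IdempotentStorageManager now points at the relocated org.apache.apex.malhar.lib.wal.WindowDataManager rather than the removed com.datatorrent.lib.util class, so the deprecation path stays intact after the move.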

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/4ef0700a/library/src/main/java/com/datatorrent/lib/util/WindowDataManager.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/util/WindowDataManager.java b/library/src/main/java/com/datatorrent/lib/util/WindowDataManager.java
deleted file mode 100644
index 9930d7e..0000000
--- a/library/src/main/java/com/datatorrent/lib/util/WindowDataManager.java
+++ /dev/null
@@ -1,440 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.lib.util;
-
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.Set;
-
-import javax.validation.constraints.NotNull;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
-import com.google.common.collect.TreeMultimap;
-
-import com.datatorrent.api.Component;
-import com.datatorrent.api.Context;
-import com.datatorrent.api.DAG;
-import com.datatorrent.api.StorageAgent;
-import com.datatorrent.api.annotation.Stateless;
-import com.datatorrent.common.util.FSStorageAgent;
-import com.datatorrent.lib.io.fs.AbstractFileInputOperator;
-
-/**
- * An idempotent storage manager allows an operator to emit the same tuples in every replayed application window.
- * An idempotent agent cannot make any guarantees about the tuples emitted in the application window during which a failure occurs.
- *
- * The order of tuples is guaranteed for ordered input sources.
- *
- * <b>Important:</b> In order for an idempotent storage manager to function correctly it cannot allow
- * checkpoints to occur within an application window and checkpoints must be aligned with
- * application window boundaries.
- *
- * @since 2.0.0
- */
-public interface WindowDataManager extends StorageAgent, Component<Context.OperatorContext>
-{
-  /**
-   * Gets the largest window for which there is recovery data.
-   * @return the window id
-   */
-  long getLargestRecoveryWindow();
-
-  /**
-   * When an operator can partition itself dynamically, there is no guarantee that an input state which was being
-   * handled by one instance previously will be handled by the same instance after partitioning. <br/>
-   * For example, an {@link AbstractFileInputOperator} instance which reads a file X till offset l (not check-pointed) may no
-   * longer be the instance that handles file X after repartitioning, as the number of instances may have changed and file X
-   * is re-hashed to another instance. <br/>
-   * The new instance wouldn't know from what point to read file X unless it reads the idempotent storage of all the
-   * operators for the window being replayed and fixes its state.
-   *
-   * @param windowId window id.
-   * @return mapping of operator id to the corresponding state
-   * @throws IOException
-   */
-  Map<Integer, Object> load(long windowId) throws IOException;
-
-  /**
-   * Delete the artifacts of the operator for windows <= windowId.
-   *
-   * @param operatorId operator id
-   * @param windowId   window id
-   * @throws IOException
-   */
-  void deleteUpTo(int operatorId, long windowId) throws IOException;
-
-  /**
-   * This informs the idempotent storage manager that the operator is partitioned so that it can set properties and
-   * distribute state.
-   *
-   * @param newManagers        all the new idempotent storage managers.
-   * @param removedOperatorIds set of operator ids which were removed after partitioning.
-   */
-  void partitioned(Collection<WindowDataManager> newManagers, Set<Integer> removedOperatorIds);
-
-  /**
-   * A {@link WindowDataManager} that uses a file system to persist state.
-   */
-  class FSWindowDataManager implements WindowDataManager
-  {
-    private static final String DEF_RECOVERY_PATH = "idempotentState";
-
-    protected transient FSStorageAgent storageAgent;
-
-    /**
-     * Recovery path relative to app path where state is saved.
-     */
-    @NotNull
-    private String recoveryPath;
-
-    private boolean isRecoveryPathRelativeToAppPath = true;
-
-    /**
-     * largest window for which there is recovery data across all physical operator instances.
-     */
-    protected transient long largestRecoveryWindow;
-
-    /**
-     * This is non-null for only one physical instance.<br/>
-     * It consists of ids of operators which have been removed but still have some state that can be replayed.
-     * Only one of the instances would be handling (modifying) the files that belong to this state.
-     */
-    protected Set<Integer> deletedOperators;
-
-    /**
-     * Sorted mapping from window id to all the operators that have state to replay for that window.
-     */
-    protected final transient TreeMultimap<Long, Integer> replayState;
-
-    protected transient FileSystem fs;
-    protected transient Path appPath;
-
-    public FSWindowDataManager()
-    {
-      replayState = TreeMultimap.create();
-      largestRecoveryWindow = Stateless.WINDOW_ID;
-      recoveryPath = DEF_RECOVERY_PATH;
-    }
-
-    @Override
-    public void setup(Context.OperatorContext context)
-    {
-      Configuration configuration = new Configuration();
-      if (isRecoveryPathRelativeToAppPath) {
-        appPath = new Path(context.getValue(DAG.APPLICATION_PATH) + Path.SEPARATOR + recoveryPath);
-      } else {
-        appPath = new Path(recoveryPath);
-      }
-
-      try {
-        storageAgent = new FSStorageAgent(appPath.toString(), configuration);
-
-        fs = FileSystem.newInstance(appPath.toUri(), configuration);
-
-        if (fs.exists(appPath)) {
-          FileStatus[] fileStatuses = fs.listStatus(appPath);
-
-          for (FileStatus operatorDirStatus : fileStatuses) {
-            int operatorId = Integer.parseInt(operatorDirStatus.getPath().getName());
-
-            for (FileStatus status : fs.listStatus(operatorDirStatus.getPath())) {
-              String fileName = status.getPath().getName();
-              if (fileName.endsWith(FSStorageAgent.TMP_FILE)) {
-                continue;
-              }
-              long windowId = Long.parseLong(fileName, 16);
-              replayState.put(windowId, operatorId);
-              if (windowId > largestRecoveryWindow) {
-                largestRecoveryWindow = windowId;
-              }
-            }
-          }
-        }
-      } catch (IOException e) {
-        throw new RuntimeException(e);
-      }
-    }
-
-    @Override
-    public void save(Object object, int operatorId, long windowId) throws IOException
-    {
-      storageAgent.save(object, operatorId, windowId);
-    }
-
-    @Override
-    public Object load(int operatorId, long windowId) throws IOException
-    {
-      Set<Integer> operators = replayState.get(windowId);
-      if (operators == null || !operators.contains(operatorId)) {
-        return null;
-      }
-      return storageAgent.load(operatorId, windowId);
-    }
-
-    @Override
-    public void delete(int operatorId, long windowId) throws IOException
-    {
-      storageAgent.delete(operatorId, windowId);
-    }
-
-    @Override
-    public Map<Integer, Object> load(long windowId) throws IOException
-    {
-      Set<Integer> operators = replayState.get(windowId);
-      if (operators == null) {
-        return null;
-      }
-      Map<Integer, Object> data = Maps.newHashMap();
-      for (int operatorId : operators) {
-        data.put(operatorId, load(operatorId, windowId));
-      }
-      return data;
-    }
-
-    @Override
-    public long[] getWindowIds(int operatorId) throws IOException
-    {
-      Path operatorPath = new Path(appPath, String.valueOf(operatorId));
-      if (!fs.exists(operatorPath) || fs.listStatus(operatorPath).length == 0) {
-        return null;
-      }
-      return storageAgent.getWindowIds(operatorId);
-    }
-
-    /**
-     * This deletes all the recovery files of window ids <= windowId.
-     *
-     * @param operatorId operator id.
-     * @param windowId   the largest window id for which the states will be deleted.
-     * @throws IOException
-     */
-    @Override
-    public void deleteUpTo(int operatorId, long windowId) throws IOException
-    {
-      //deleting the replay state
-      if (windowId <= largestRecoveryWindow && deletedOperators != null && !deletedOperators.isEmpty()) {
-        Iterator<Map.Entry<Long, Collection<Integer>>> iterator = replayState.asMap().entrySet().iterator();
-        while (iterator.hasNext()) {
-          Map.Entry<Long, Collection<Integer>> windowEntry = iterator.next();
-          long lwindow = windowEntry.getKey();
-          if (lwindow > windowId) {
-            break;
-          }
-          for (Integer loperator : windowEntry.getValue()) {
-
-            if (deletedOperators.contains(loperator)) {
-              storageAgent.delete(loperator, lwindow);
-
-              Path loperatorPath = new Path(appPath, Integer.toString(loperator));
-              if (fs.listStatus(loperatorPath).length == 0) {
-                //The operator was deleted and it has nothing to replay.
-                deletedOperators.remove(loperator);
-                fs.delete(loperatorPath, true);
-              }
-            } else if (loperator == operatorId) {
-              storageAgent.delete(loperator, lwindow);
-            }
-          }
-          iterator.remove();
-        }
-      }
-
-      if (fs.listStatus(new Path(appPath, Integer.toString(operatorId))).length > 0) {
-        long[] windowsAfterReplay = storageAgent.getWindowIds(operatorId);
-        Arrays.sort(windowsAfterReplay);
-        for (long lwindow : windowsAfterReplay) {
-          if (lwindow <= windowId) {
-            storageAgent.delete(operatorId, lwindow);
-          }
-        }
-      }
-    }
-
-    @Override
-    public long getLargestRecoveryWindow()
-    {
-      return largestRecoveryWindow;
-    }
-
-    @Override
-    public void partitioned(Collection<WindowDataManager> newManagers, Set<Integer> removedOperatorIds)
-    {
-      Preconditions.checkArgument(newManagers != null && !newManagers.isEmpty(),
-          "there has to be one idempotent storage manager");
-      FSWindowDataManager deletedOperatorsManager = null;
-
-      if (removedOperatorIds != null && !removedOperatorIds.isEmpty()) {
-        if (this.deletedOperators == null) {
-          this.deletedOperators = Sets.newHashSet();
-        }
-        this.deletedOperators.addAll(removedOperatorIds);
-      }
-
-      for (WindowDataManager storageManager : newManagers) {
-
-        FSWindowDataManager lmanager = (FSWindowDataManager)storageManager;
-        lmanager.recoveryPath = this.recoveryPath;
-        lmanager.storageAgent = this.storageAgent;
-
-        if (lmanager.deletedOperators != null) {
-          deletedOperatorsManager = lmanager;
-        }
-        //only one physical instance can manage deleted operators, so clear this field for the rest of the instances.
-        if (lmanager != deletedOperatorsManager) {
-          lmanager.deletedOperators = null;
-        }
-      }
-
-      if (removedOperatorIds == null || removedOperatorIds.isEmpty()) {
-        //Nothing to do
-        return;
-      }
-      if (this.deletedOperators != null) {
-
-        /*If some operators were removed then there needs to be a manager which can clean their state when it is
-        no longer needed.*/
-        if (deletedOperatorsManager == null) {
-          //None of the managers were handling deleted operators data.
-          deletedOperatorsManager = (FSWindowDataManager)newManagers.iterator().next();
-          deletedOperatorsManager.deletedOperators = Sets.newHashSet();
-        }
-
-        deletedOperatorsManager.deletedOperators.addAll(removedOperatorIds);
-      }
-    }
-
-    @Override
-    public void teardown()
-    {
-      try {
-        fs.close();
-      } catch (IOException e) {
-        throw new RuntimeException(e);
-      }
-    }
-
-    /**
-     * @return recovery path
-     */
-    public String getRecoveryPath()
-    {
-      return recoveryPath;
-    }
-
-    /**
-     * Sets the recovery path. If {@link #isRecoveryPathRelativeToAppPath} is true then this path is handled relative
-     * to the application path; otherwise it is handled as an absolute path.
-     *
-     * @param recoveryPath recovery path
-     */
-    public void setRecoveryPath(String recoveryPath)
-    {
-      this.recoveryPath = recoveryPath;
-    }
-
-    /**
-     * @return true if recovery path is relative to app path; false otherwise.
-     */
-    public boolean isRecoveryPathRelativeToAppPath()
-    {
-      return isRecoveryPathRelativeToAppPath;
-    }
-
-    /**
-     * Specifies whether the recovery path is relative to application path.
-     *
-     * @param recoveryPathRelativeToAppPath true if recovery path is relative to application path; false otherwise.
-     */
-    public void setRecoveryPathRelativeToAppPath(boolean recoveryPathRelativeToAppPath)
-    {
-      isRecoveryPathRelativeToAppPath = recoveryPathRelativeToAppPath;
-    }
-  }
-
-  /**
-   * This {@link WindowDataManager} never performs recovery. It is a convenience class so that operators
-   * can use the same code path whether or not they maintain idempotency.
-   */
-  class NoopWindowDataManager implements WindowDataManager
-  {
-    @Override
-    public long getLargestRecoveryWindow()
-    {
-      return Stateless.WINDOW_ID;
-    }
-
-    @Override
-    public Map<Integer, Object> load(long windowId) throws IOException
-    {
-      return null;
-    }
-
-    @Override
-    public void partitioned(Collection<WindowDataManager> newManagers, Set<Integer> removedOperatorIds)
-    {
-    }
-
-    @Override
-    public void setup(Context.OperatorContext context)
-    {
-    }
-
-    @Override
-    public void teardown()
-    {
-    }
-
-    @Override
-    public void save(Object object, int operatorId, long windowId) throws IOException
-    {
-    }
-
-    @Override
-    public Object load(int operatorId, long windowId) throws IOException
-    {
-      return null;
-    }
-
-    @Override
-    public void delete(int operatorId, long windowId) throws IOException
-    {
-    }
-
-    @Override
-    public void deleteUpTo(int operatorId, long windowId) throws IOException
-    {
-    }
-
-    @Override
-    public long[] getWindowIds(int operatorId) throws IOException
-    {
-      return new long[0];
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/4ef0700a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/IncrementalCheckpointManager.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/IncrementalCheckpointManager.java b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/IncrementalCheckpointManager.java
index c7ae1c1..02fd6ec 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/IncrementalCheckpointManager.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/IncrementalCheckpointManager.java
@@ -32,6 +32,8 @@ import javax.validation.constraints.NotNull;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.apex.malhar.lib.wal.WindowDataManager;
+
 import com.google.common.base.Preconditions;
 import com.google.common.base.Throwables;
 import com.google.common.collect.Queues;
@@ -39,7 +41,6 @@ import com.google.common.collect.Queues;
 import com.datatorrent.api.Context;
 import com.datatorrent.api.annotation.Stateless;
 import com.datatorrent.common.util.NameableThreadFactory;
-import com.datatorrent.lib.util.WindowDataManager;
 import com.datatorrent.netlet.util.Slice;
 
 /**

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/4ef0700a/library/src/main/java/org/apache/apex/malhar/lib/wal/WindowDataManager.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/wal/WindowDataManager.java b/library/src/main/java/org/apache/apex/malhar/lib/wal/WindowDataManager.java
new file mode 100644
index 0000000..fd7948a
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/wal/WindowDataManager.java
@@ -0,0 +1,440 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.wal;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+
+import javax.validation.constraints.NotNull;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import com.google.common.collect.TreeMultimap;
+
+import com.datatorrent.api.Component;
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.StorageAgent;
+import com.datatorrent.api.annotation.Stateless;
+import com.datatorrent.common.util.FSStorageAgent;
+import com.datatorrent.lib.io.fs.AbstractFileInputOperator;
+
+/**
+ * An idempotent storage manager allows an operator to emit the same tuples in every replayed application window.
+ * An idempotent agent cannot make any guarantees about the tuples emitted in the application window during which a failure occurs.
+ *
+ * The order of tuples is guaranteed for ordered input sources.
+ *
+ * <b>Important:</b> In order for an idempotent storage manager to function correctly it cannot allow
+ * checkpoints to occur within an application window and checkpoints must be aligned with
+ * application window boundaries.
+ *
+ * @since 2.0.0
+ */
+public interface WindowDataManager extends StorageAgent, Component<Context.OperatorContext>
+{
+  /**
+   * Gets the largest window for which there is recovery data.
+   * @return the window id
+   */
+  long getLargestRecoveryWindow();
+
+  /**
+   * When an operator can partition itself dynamically, there is no guarantee that an input state which was being
+   * handled by one instance previously will be handled by the same instance after partitioning. <br/>
+   * For example, an {@link AbstractFileInputOperator} instance which reads a file X till offset l (not check-pointed) may no
+   * longer be the instance that handles file X after repartitioning, as the number of instances may have changed and file X
+   * is re-hashed to another instance. <br/>
+   * The new instance wouldn't know from what point to read file X unless it reads the idempotent storage of all the
+   * operators for the window being replayed and fixes its state.
+   *
+   * @param windowId window id.
+   * @return mapping of operator id to the corresponding state
+   * @throws IOException
+   */
+  Map<Integer, Object> load(long windowId) throws IOException;
+
+  /**
+   * Delete the artifacts of the operator for windows <= windowId.
+   *
+   * @param operatorId operator id
+   * @param windowId   window id
+   * @throws IOException
+   */
+  void deleteUpTo(int operatorId, long windowId) throws IOException;
+
+  /**
+   * This informs the idempotent storage manager that the operator is partitioned so that it can set properties and
+   * distribute state.
+   *
+   * @param newManagers        all the new idempotent storage managers.
+   * @param removedOperatorIds set of operator ids which were removed after partitioning.
+   */
+  void partitioned(Collection<WindowDataManager> newManagers, Set<Integer> removedOperatorIds);
+
+  /**
+   * A {@link WindowDataManager} that uses a file system to persist state.
+   */
+  class FSWindowDataManager implements WindowDataManager
+  {
+    private static final String DEF_RECOVERY_PATH = "idempotentState";
+
+    protected transient FSStorageAgent storageAgent;
+
+    /**
+     * Recovery path relative to app path where state is saved.
+     */
+    @NotNull
+    private String recoveryPath;
+
+    private boolean isRecoveryPathRelativeToAppPath = true;
+
+    /**
+     * largest window for which there is recovery data across all physical operator instances.
+     */
+    protected transient long largestRecoveryWindow;
+
+    /**
+     * This is non-null for only one physical instance.<br/>
+     * It consists of ids of operators which have been removed but still have some state that can be replayed.
+     * Only one of the instances would be handling (modifying) the files that belong to this state.
+     */
+    protected Set<Integer> deletedOperators;
+
+    /**
+     * Sorted mapping from window id to all the operators that have state to replay for that window.
+     */
+    protected final transient TreeMultimap<Long, Integer> replayState;
+
+    protected transient FileSystem fs;
+    protected transient Path appPath;
+
+    public FSWindowDataManager()
+    {
+      replayState = TreeMultimap.create();
+      largestRecoveryWindow = Stateless.WINDOW_ID;
+      recoveryPath = DEF_RECOVERY_PATH;
+    }
+
+    @Override
+    public void setup(Context.OperatorContext context)
+    {
+      Configuration configuration = new Configuration();
+      if (isRecoveryPathRelativeToAppPath) {
+        appPath = new Path(context.getValue(DAG.APPLICATION_PATH) + Path.SEPARATOR + recoveryPath);
+      } else {
+        appPath = new Path(recoveryPath);
+      }
+
+      try {
+        storageAgent = new FSStorageAgent(appPath.toString(), configuration);
+
+        fs = FileSystem.newInstance(appPath.toUri(), configuration);
+
+        if (fs.exists(appPath)) {
+          FileStatus[] fileStatuses = fs.listStatus(appPath);
+
+          for (FileStatus operatorDirStatus : fileStatuses) {
+            int operatorId = Integer.parseInt(operatorDirStatus.getPath().getName());
+
+            for (FileStatus status : fs.listStatus(operatorDirStatus.getPath())) {
+              String fileName = status.getPath().getName();
+              if (fileName.endsWith(FSStorageAgent.TMP_FILE)) {
+                continue;
+              }
+              long windowId = Long.parseLong(fileName, 16);
+              replayState.put(windowId, operatorId);
+              if (windowId > largestRecoveryWindow) {
+                largestRecoveryWindow = windowId;
+              }
+            }
+          }
+        }
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+    }
+
+    @Override
+    public void save(Object object, int operatorId, long windowId) throws IOException
+    {
+      storageAgent.save(object, operatorId, windowId);
+    }
+
+    @Override
+    public Object load(int operatorId, long windowId) throws IOException
+    {
+      Set<Integer> operators = replayState.get(windowId);
+      if (operators == null || !operators.contains(operatorId)) {
+        return null;
+      }
+      return storageAgent.load(operatorId, windowId);
+    }
+
+    @Override
+    public void delete(int operatorId, long windowId) throws IOException
+    {
+      storageAgent.delete(operatorId, windowId);
+    }
+
+    @Override
+    public Map<Integer, Object> load(long windowId) throws IOException
+    {
+      Set<Integer> operators = replayState.get(windowId);
+      if (operators == null) {
+        return null;
+      }
+      Map<Integer, Object> data = Maps.newHashMap();
+      for (int operatorId : operators) {
+        data.put(operatorId, load(operatorId, windowId));
+      }
+      return data;
+    }
+
+    @Override
+    public long[] getWindowIds(int operatorId) throws IOException
+    {
+      Path operatorPath = new Path(appPath, String.valueOf(operatorId));
+      if (!fs.exists(operatorPath) || fs.listStatus(operatorPath).length == 0) {
+        return null;
+      }
+      return storageAgent.getWindowIds(operatorId);
+    }
+
+    /**
+     * This deletes all the recovery files of window ids <= windowId.
+     *
+     * @param operatorId operator id.
+     * @param windowId   the largest window id for which the states will be deleted.
+     * @throws IOException
+     */
+    @Override
+    public void deleteUpTo(int operatorId, long windowId) throws IOException
+    {
+      //deleting the replay state
+      if (windowId <= largestRecoveryWindow && deletedOperators != null && !deletedOperators.isEmpty()) {
+        Iterator<Map.Entry<Long, Collection<Integer>>> iterator = replayState.asMap().entrySet().iterator();
+        while (iterator.hasNext()) {
+          Map.Entry<Long, Collection<Integer>> windowEntry = iterator.next();
+          long lwindow = windowEntry.getKey();
+          if (lwindow > windowId) {
+            break;
+          }
+          for (Integer loperator : windowEntry.getValue()) {
+
+            if (deletedOperators.contains(loperator)) {
+              storageAgent.delete(loperator, lwindow);
+
+              Path loperatorPath = new Path(appPath, Integer.toString(loperator));
+              if (fs.listStatus(loperatorPath).length == 0) {
+                //The operator was deleted and it has nothing to replay.
+                deletedOperators.remove(loperator);
+                fs.delete(loperatorPath, true);
+              }
+            } else if (loperator == operatorId) {
+              storageAgent.delete(loperator, lwindow);
+            }
+          }
+          iterator.remove();
+        }
+      }
+
+      if (fs.listStatus(new Path(appPath, Integer.toString(operatorId))).length > 0) {
+        long[] windowsAfterReplay = storageAgent.getWindowIds(operatorId);
+        Arrays.sort(windowsAfterReplay);
+        for (long lwindow : windowsAfterReplay) {
+          if (lwindow <= windowId) {
+            storageAgent.delete(operatorId, lwindow);
+          }
+        }
+      }
+    }
+
+    @Override
+    public long getLargestRecoveryWindow()
+    {
+      return largestRecoveryWindow;
+    }
+
+    @Override
+    public void partitioned(Collection<WindowDataManager> newManagers, Set<Integer> removedOperatorIds)
+    {
+      Preconditions.checkArgument(newManagers != null && !newManagers.isEmpty(),
+          "there has to be one idempotent storage manager");
+      FSWindowDataManager deletedOperatorsManager = null;
+
+      if (removedOperatorIds != null && !removedOperatorIds.isEmpty()) {
+        if (this.deletedOperators == null) {
+          this.deletedOperators = Sets.newHashSet();
+        }
+        this.deletedOperators.addAll(removedOperatorIds);
+      }
+
+      for (WindowDataManager storageManager : newManagers) {
+
+        FSWindowDataManager lmanager = (FSWindowDataManager)storageManager;
+        lmanager.recoveryPath = this.recoveryPath;
+        lmanager.storageAgent = this.storageAgent;
+
+        if (lmanager.deletedOperators != null) {
+          deletedOperatorsManager = lmanager;
+        }
+        //only one physical instance can manage deleted operators, so clear this field for the rest of the instances.
+        if (lmanager != deletedOperatorsManager) {
+          lmanager.deletedOperators = null;
+        }
+      }
+
+      if (removedOperatorIds == null || removedOperatorIds.isEmpty()) {
+        //Nothing to do
+        return;
+      }
+      if (this.deletedOperators != null) {
+
+        /*If some operators were removed then there needs to be a manager which can clean their state when it is
+        no longer needed.*/
+        if (deletedOperatorsManager == null) {
+          //None of the managers were handling deleted operators data.
+          deletedOperatorsManager = (FSWindowDataManager)newManagers.iterator().next();
+          deletedOperatorsManager.deletedOperators = Sets.newHashSet();
+        }
+
+        deletedOperatorsManager.deletedOperators.addAll(removedOperatorIds);
+      }
+    }
+
+    @Override
+    public void teardown()
+    {
+      try {
+        fs.close();
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+    }
+
+    /**
+     * @return recovery path
+     */
+    public String getRecoveryPath()
+    {
+      return recoveryPath;
+    }
+
+    /**
+     * Sets the recovery path. If {@link #isRecoveryPathRelativeToAppPath} is true then this path is handled relative
+     * to the application path; otherwise it is handled as an absolute path.
+     *
+     * @param recoveryPath recovery path
+     */
+    public void setRecoveryPath(String recoveryPath)
+    {
+      this.recoveryPath = recoveryPath;
+    }
+
+    /**
+     * @return true if recovery path is relative to app path; false otherwise.
+     */
+    public boolean isRecoveryPathRelativeToAppPath()
+    {
+      return isRecoveryPathRelativeToAppPath;
+    }
+
+    /**
+     * Specifies whether the recovery path is relative to application path.
+     *
+     * @param recoveryPathRelativeToAppPath true if recovery path is relative to application path; false otherwise.
+     */
+    public void setRecoveryPathRelativeToAppPath(boolean recoveryPathRelativeToAppPath)
+    {
+      isRecoveryPathRelativeToAppPath = recoveryPathRelativeToAppPath;
+    }
+  }
+
+  /**
+   * This {@link WindowDataManager} never performs recovery. It is a convenience class so that operators
+   * can use the same code path whether or not they maintain idempotency.
+   */
+  class NoopWindowDataManager implements WindowDataManager
+  {
+    @Override
+    public long getLargestRecoveryWindow()
+    {
+      return Stateless.WINDOW_ID;
+    }
+
+    @Override
+    public Map<Integer, Object> load(long windowId) throws IOException
+    {
+      return null;
+    }
+
+    @Override
+    public void partitioned(Collection<WindowDataManager> newManagers, Set<Integer> removedOperatorIds)
+    {
+    }
+
+    @Override
+    public void setup(Context.OperatorContext context)
+    {
+    }
+
+    @Override
+    public void teardown()
+    {
+    }
+
+    @Override
+    public void save(Object object, int operatorId, long windowId) throws IOException
+    {
+    }
+
+    @Override
+    public Object load(int operatorId, long windowId) throws IOException
+    {
+      return null;
+    }
+
+    @Override
+    public void delete(int operatorId, long windowId) throws IOException
+    {
+    }
+
+    @Override
+    public void deleteUpTo(int operatorId, long windowId) throws IOException
+    {
+    }
+
+    @Override
+    public long[] getWindowIds(int operatorId) throws IOException
+    {
+      return new long[0];
+    }
+  }
+}
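
To make the relocated interface's contract concrete, here is a minimal usage sketch (not part of this commit; the operator and its counting state are hypothetical): replay windows up to getLargestRecoveryWindow() from saved state, and save fresh state at each subsequent window boundary. This is the pattern the NiFi and Kafka operators touched above rely on, which is why each of them needed only an import change.

    import java.io.IOException;

    import org.apache.apex.malhar.lib.wal.WindowDataManager;

    import com.datatorrent.api.Context;
    import com.datatorrent.api.InputOperator;
    import com.datatorrent.common.util.BaseOperator;

    // Hypothetical operator, for illustration only.
    public class IdempotentCounter extends BaseOperator implements InputOperator
    {
      private final WindowDataManager windowDataManager = new WindowDataManager.FSWindowDataManager();

      private transient int operatorId;
      private transient long currentWindowId;
      private long count;

      @Override
      public void setup(Context.OperatorContext context)
      {
        operatorId = context.getId();
        windowDataManager.setup(context);
      }

      @Override
      public void beginWindow(long windowId)
      {
        currentWindowId = windowId;
        if (windowId <= windowDataManager.getLargestRecoveryWindow()) {
          try {
            // Replaying: restore the state recorded for this window instead of consuming new input.
            Object state = windowDataManager.load(operatorId, windowId);
            if (state != null) {
              count = (Long)state;
            }
          } catch (IOException e) {
            throw new RuntimeException(e);
          }
        }
      }

      @Override
      public void emitTuples()
      {
        // Consume new input only once past the replay boundary.
        if (currentWindowId > windowDataManager.getLargestRecoveryWindow()) {
          count++;
        }
      }

      @Override
      public void endWindow()
      {
        if (currentWindowId > windowDataManager.getLargestRecoveryWindow()) {
          try {
            windowDataManager.save(count, operatorId, currentWindowId);
          } catch (IOException e) {
            throw new RuntimeException(e);
          }
        }
      }

      @Override
      public void teardown()
      {
        windowDataManager.teardown();
      }
    }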

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/4ef0700a/library/src/test/java/com/datatorrent/lib/util/FSWindowDataManagerTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/util/FSWindowDataManagerTest.java b/library/src/test/java/com/datatorrent/lib/util/FSWindowDataManagerTest.java
deleted file mode 100644
index 26996e7..0000000
--- a/library/src/test/java/com/datatorrent/lib/util/FSWindowDataManagerTest.java
+++ /dev/null
@@ -1,216 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.lib.util;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.Map;
-import java.util.TreeSet;
-
-import org.junit.Assert;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TestWatcher;
-import org.junit.runner.Description;
-
-import org.apache.commons.io.FileUtils;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
-
-import com.datatorrent.api.Attribute;
-import com.datatorrent.api.Context;
-import com.datatorrent.api.DAG;
-import com.datatorrent.api.annotation.Stateless;
-import com.datatorrent.lib.helper.OperatorContextTestHelper;
-
-/**
- * Tests for {@link WindowDataManager}
- */
-public class FSWindowDataManagerTest
-{
-  private static class TestMeta extends TestWatcher
-  {
-
-    String applicationPath;
-    WindowDataManager.FSWindowDataManager storageManager;
-    Context.OperatorContext context;
-
-    @Override
-    protected void starting(Description description)
-    {
-      TestUtils.deleteTargetTestClassFolder(description);
-      super.starting(description);
-      storageManager = new WindowDataManager.FSWindowDataManager();
-      applicationPath = "target/" + description.getClassName() + "/" + description.getMethodName();
-
-      Attribute.AttributeMap.DefaultAttributeMap attributes = new Attribute.AttributeMap.DefaultAttributeMap();
-      attributes.put(DAG.APPLICATION_PATH, applicationPath);
-      context = new OperatorContextTestHelper.TestIdOperatorContext(1, attributes);
-    }
-
-    @Override
-    protected void finished(Description description)
-    {
-      TestUtils.deleteTargetTestClassFolder(description);
-    }
-  }
-
-  @Rule
-  public TestMeta testMeta = new TestMeta();
-
-  @Test
-  public void testLargestRecoveryWindow()
-  {
-    testMeta.storageManager.setup(testMeta.context);
-    Assert.assertEquals("largest recovery", Stateless.WINDOW_ID, testMeta.storageManager.getLargestRecoveryWindow());
-    testMeta.storageManager.teardown();
-  }
-
-  @Test
-  public void testSave() throws IOException
-  {
-    testMeta.storageManager.setup(testMeta.context);
-    Map<Integer, String> data = Maps.newHashMap();
-    data.put(1, "one");
-    data.put(2, "two");
-    data.put(3, "three");
-    testMeta.storageManager.save(data, 1, 1);
-    testMeta.storageManager.setup(testMeta.context);
-    @SuppressWarnings("unchecked")
-    Map<Integer, String> decoded = (Map<Integer, String>)testMeta.storageManager.load(1, 1);
-    Assert.assertEquals("dataOf1", data, decoded);
-    testMeta.storageManager.teardown();
-  }
-
-  @Test
-  public void testLoad() throws IOException
-  {
-    testMeta.storageManager.setup(testMeta.context);
-    Map<Integer, String> dataOf1 = Maps.newHashMap();
-    dataOf1.put(1, "one");
-    dataOf1.put(2, "two");
-    dataOf1.put(3, "three");
-
-    Map<Integer, String> dataOf2 = Maps.newHashMap();
-    dataOf2.put(4, "four");
-    dataOf2.put(5, "five");
-    dataOf2.put(6, "six");
-
-    testMeta.storageManager.save(dataOf1, 1, 1);
-    testMeta.storageManager.save(dataOf2, 2, 1);
-    testMeta.storageManager.setup(testMeta.context);
-    Map<Integer, Object> decodedStates = testMeta.storageManager.load(1);
-    Assert.assertEquals("no of states", 2, decodedStates.size());
-    for (Integer operatorId : decodedStates.keySet()) {
-      if (operatorId == 1) {
-        Assert.assertEquals("data of 1", dataOf1, decodedStates.get(1));
-      } else {
-        Assert.assertEquals("data of 2", dataOf2, decodedStates.get(2));
-      }
-    }
-    testMeta.storageManager.teardown();
-  }
-
-  @Test
-  public void testRecovery() throws IOException
-  {
-    testMeta.storageManager.setup(testMeta.context);
-    Map<Integer, String> dataOf1 = Maps.newHashMap();
-    dataOf1.put(1, "one");
-    dataOf1.put(2, "two");
-    dataOf1.put(3, "three");
-
-    Map<Integer, String> dataOf2 = Maps.newHashMap();
-    dataOf2.put(4, "four");
-    dataOf2.put(5, "five");
-    dataOf2.put(6, "six");
-
-    testMeta.storageManager.save(dataOf1, 1, 1);
-    testMeta.storageManager.save(dataOf2, 2, 2);
-
-    testMeta.storageManager.setup(testMeta.context);
-    Assert.assertEquals("largest recovery window", 2, testMeta.storageManager.getLargestRecoveryWindow());
-    testMeta.storageManager.teardown();
-  }
-
-  @Test
-  public void testDelete() throws IOException
-  {
-    testMeta.storageManager.setup(testMeta.context);
-    Map<Integer, String> dataOf1 = Maps.newHashMap();
-    dataOf1.put(1, "one");
-    dataOf1.put(2, "two");
-    dataOf1.put(3, "three");
-
-    Map<Integer, String> dataOf2 = Maps.newHashMap();
-    dataOf2.put(4, "four");
-    dataOf2.put(5, "five");
-    dataOf2.put(6, "six");
-
-    Map<Integer, String> dataOf3 = Maps.newHashMap();
-    dataOf3.put(7, "seven");
-    dataOf3.put(8, "eight");
-    dataOf3.put(9, "nine");
-
-    for (int i = 1; i <= 9; ++i) {
-      testMeta.storageManager.save(dataOf1, 1, i);
-    }
-
-    testMeta.storageManager.save(dataOf2, 2, 1);
-    testMeta.storageManager.save(dataOf3, 3, 1);
-
-    testMeta.storageManager.partitioned(Lists.<WindowDataManager>newArrayList(testMeta.storageManager),
-        Sets.newHashSet(2, 3));
-    testMeta.storageManager.setup(testMeta.context);
-    testMeta.storageManager.deleteUpTo(1, 6);
-
-    Path appPath = new Path(testMeta.applicationPath + '/' + testMeta.storageManager.getRecoveryPath());
-    FileSystem fs = FileSystem.newInstance(appPath.toUri(), new Configuration());
-    FileStatus[] fileStatuses = fs.listStatus(new Path(appPath, Integer.toString(1)));
-    Assert.assertEquals("number of windows for 1", 3, fileStatuses.length);
-    TreeSet<String> windows = Sets.newTreeSet();
-    for (FileStatus fileStatus : fileStatuses) {
-      windows.add(fileStatus.getPath().getName());
-    }
-    Assert.assertEquals("window list for 1", Sets.newTreeSet(Arrays.asList("7", "8", "9")), windows);
-    Assert.assertEquals("no data for 2", false, fs.exists(new Path(appPath, Integer.toString(2))));
-    Assert.assertEquals("no data for 3", false, fs.exists(new Path(appPath, Integer.toString(3))));
-    testMeta.storageManager.teardown();
-  }
-
-  @Test
-  public void testAbsoluteRecoveryPath() throws IOException
-  {
-    testMeta.storageManager.setRecoveryPathRelativeToAppPath(false);
-    long time = System.currentTimeMillis();
-    testMeta.storageManager.setRecoveryPath("target/" + time);
-    testSave();
-    File recoveryDir = new File("target/" + time);
-    Assert.assertTrue("recover path exist", recoveryDir.isDirectory());
-    FileUtils.deleteDirectory(recoveryDir);
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/4ef0700a/library/src/test/java/org/apache/apex/malhar/lib/wal/FSWindowDataManagerTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/org/apache/apex/malhar/lib/wal/FSWindowDataManagerTest.java b/library/src/test/java/org/apache/apex/malhar/lib/wal/FSWindowDataManagerTest.java
new file mode 100644
index 0000000..dff061a
--- /dev/null
+++ b/library/src/test/java/org/apache/apex/malhar/lib/wal/FSWindowDataManagerTest.java
@@ -0,0 +1,217 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.wal;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Map;
+import java.util.TreeSet;
+
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestWatcher;
+import org.junit.runner.Description;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+
+import com.datatorrent.api.Attribute;
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.annotation.Stateless;
+import com.datatorrent.lib.helper.OperatorContextTestHelper;
+import com.datatorrent.lib.util.TestUtils;
+
+/**
+ * Tests for {@link WindowDataManager.FSWindowDataManager}, the file-system
+ * backed implementation of {@link WindowDataManager}.
+ */
+public class FSWindowDataManagerTest
+{
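+  /**
+   * Per-test fixture: creates a fresh {@link WindowDataManager.FSWindowDataManager}
+   * and an operator context whose application path points under target/, and
+   * cleans up the test class folder before and after each test method.
+   */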
+  private static class TestMeta extends TestWatcher
+  {
+
+    String applicationPath;
+    WindowDataManager.FSWindowDataManager storageManager;
+    Context.OperatorContext context;
+
+    @Override
+    protected void starting(Description description)
+    {
+      TestUtils.deleteTargetTestClassFolder(description);
+      super.starting(description);
+      storageManager = new WindowDataManager.FSWindowDataManager();
+      applicationPath = "target/" + description.getClassName() + "/" + description.getMethodName();
+
+      Attribute.AttributeMap.DefaultAttributeMap attributes = new Attribute.AttributeMap.DefaultAttributeMap();
+      attributes.put(DAG.APPLICATION_PATH, applicationPath);
+      context = new OperatorContextTestHelper.TestIdOperatorContext(1, attributes);
+    }
+
+    @Override
+    protected void finished(Description description)
+    {
+      TestUtils.deleteTargetTestClassFolder(description);
+    }
+  }
+
+  @Rule
+  public TestMeta testMeta = new TestMeta();
+
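+  // A freshly set-up manager with no saved artifacts reports
+  // Stateless.WINDOW_ID as the largest recovery window.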
+  @Test
+  public void testLargestRecoveryWindow()
+  {
+    testMeta.storageManager.setup(testMeta.context);
+    Assert.assertEquals("largest recovery", Stateless.WINDOW_ID, testMeta.storageManager.getLargestRecoveryWindow());
+    testMeta.storageManager.teardown();
+  }
+
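+  // Saves an artifact for operator 1 in window 1, then verifies it loads
+  // back intact after setup() runs again.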
+  @Test
+  public void testSave() throws IOException
+  {
+    testMeta.storageManager.setup(testMeta.context);
+    Map<Integer, String> data = Maps.newHashMap();
+    data.put(1, "one");
+    data.put(2, "two");
+    data.put(3, "three");
+    testMeta.storageManager.save(data, 1, 1);
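+    // Calling setup() again simulates an operator restart before recovery.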
+    testMeta.storageManager.setup(testMeta.context);
+    @SuppressWarnings("unchecked")
+    Map<Integer, String> decoded = (Map<Integer, String>)testMeta.storageManager.load(1, 1);
+    Assert.assertEquals("dataOf1", data, decoded);
+    testMeta.storageManager.teardown();
+  }
+
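+  // Saves artifacts for operators 1 and 2 in the same window and verifies
+  // that load(windowId) returns both states keyed by operator id.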
+  @Test
+  public void testLoad() throws IOException
+  {
+    testMeta.storageManager.setup(testMeta.context);
+    Map<Integer, String> dataOf1 = Maps.newHashMap();
+    dataOf1.put(1, "one");
+    dataOf1.put(2, "two");
+    dataOf1.put(3, "three");
+
+    Map<Integer, String> dataOf2 = Maps.newHashMap();
+    dataOf2.put(4, "four");
+    dataOf2.put(5, "five");
+    dataOf2.put(6, "six");
+
+    testMeta.storageManager.save(dataOf1, 1, 1);
+    testMeta.storageManager.save(dataOf2, 2, 1);
+    testMeta.storageManager.setup(testMeta.context);
+    Map<Integer, Object> decodedStates = testMeta.storageManager.load(1);
+    Assert.assertEquals("no of states", 2, decodedStates.size());
+    for (Integer operatorId : decodedStates.keySet()) {
+      if (operatorId == 1) {
+        Assert.assertEquals("data of 1", dataOf1, decodedStates.get(1));
+      } else {
+        Assert.assertEquals("data of 2", dataOf2, decodedStates.get(2));
+      }
+    }
+    testMeta.storageManager.teardown();
+  }
+
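+  // Saves artifacts for windows 1 and 2; on restart the largest recovery
+  // window should be 2.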
+  @Test
+  public void testRecovery() throws IOException
+  {
+    testMeta.storageManager.setup(testMeta.context);
+    Map<Integer, String> dataOf1 = Maps.newHashMap();
+    dataOf1.put(1, "one");
+    dataOf1.put(2, "two");
+    dataOf1.put(3, "three");
+
+    Map<Integer, String> dataOf2 = Maps.newHashMap();
+    dataOf2.put(4, "four");
+    dataOf2.put(5, "five");
+    dataOf2.put(6, "six");
+
+    testMeta.storageManager.save(dataOf1, 1, 1);
+    testMeta.storageManager.save(dataOf2, 2, 2);
+
+    testMeta.storageManager.setup(testMeta.context);
+    Assert.assertEquals("largest recovery window", 2, testMeta.storageManager.getLargestRecoveryWindow());
+    testMeta.storageManager.teardown();
+  }
+
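+  // Saves windows 1-9 for operator 1 and one window each for operators 2
+  // and 3, signals 2 and 3 as removed partitions, and verifies that
+  // deleteUpTo(1, 6) leaves only windows 7-9 for operator 1 and no data
+  // for operators 2 and 3.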
+  @Test
+  public void testDelete() throws IOException
+  {
+    testMeta.storageManager.setup(testMeta.context);
+    Map<Integer, String> dataOf1 = Maps.newHashMap();
+    dataOf1.put(1, "one");
+    dataOf1.put(2, "two");
+    dataOf1.put(3, "three");
+
+    Map<Integer, String> dataOf2 = Maps.newHashMap();
+    dataOf2.put(4, "four");
+    dataOf2.put(5, "five");
+    dataOf2.put(6, "six");
+
+    Map<Integer, String> dataOf3 = Maps.newHashMap();
+    dataOf3.put(7, "seven");
+    dataOf3.put(8, "eight");
+    dataOf3.put(9, "nine");
+
+    for (int i = 1; i <= 9; ++i) {
+      testMeta.storageManager.save(dataOf1, 1, i);
+    }
+
+    testMeta.storageManager.save(dataOf2, 2, 1);
+    testMeta.storageManager.save(dataOf3, 3, 1);
+
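+    // Signal that operators 2 and 3 were removed during repartitioning;
+    // the assertions below verify their saved state is purged.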
+    testMeta.storageManager.partitioned(Lists.<WindowDataManager>newArrayList(testMeta.storageManager),
+        Sets.newHashSet(2, 3));
+    testMeta.storageManager.setup(testMeta.context);
+    testMeta.storageManager.deleteUpTo(1, 6);
+
+    Path appPath = new Path(testMeta.applicationPath + '/' + testMeta.storageManager.getRecoveryPath());
+    FileSystem fs = FileSystem.newInstance(appPath.toUri(), new Configuration());
+    FileStatus[] fileStatuses = fs.listStatus(new Path(appPath, Integer.toString(1)));
+    Assert.assertEquals("number of windows for 1", 3, fileStatuses.length);
+    TreeSet<String> windows = Sets.newTreeSet();
+    for (FileStatus fileStatus : fileStatuses) {
+      windows.add(fileStatus.getPath().getName());
+    }
+    Assert.assertEquals("window list for 1", Sets.newTreeSet(Arrays.asList("7", "8", "9")), windows);
+    Assert.assertEquals("no data for 2", false, fs.exists(new Path(appPath, Integer.toString(2))));
+    Assert.assertEquals("no data for 3", false, fs.exists(new Path(appPath, Integer.toString(3))));
+    testMeta.storageManager.teardown();
+  }
+
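+  // With setRecoveryPathRelativeToAppPath(false), the recovery path is used
+  // as given instead of being resolved under the application path.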
+  @Test
+  public void testAbsoluteRecoveryPath() throws IOException
+  {
+    testMeta.storageManager.setRecoveryPathRelativeToAppPath(false);
+    long time = System.currentTimeMillis();
+    testMeta.storageManager.setRecoveryPath("target/" + time);
+    testSave();
+    File recoveryDir = new File("target/" + time);
+    Assert.assertTrue("recovery path exists", recoveryDir.isDirectory());
+    FileUtils.deleteDirectory(recoveryDir);
+  }
+
+}

