apex-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From t..@apache.org
Subject incubator-apex-malhar git commit: 1. Marking BytesFileOutputOperator as @evolving 2. Added japicmp exclusion for this class 3. Added Converter field for plugging-in custom conversion to byte[] 4. Provided default converters for no-op, String 5. Removed in
Date Fri, 03 Jun 2016 20:27:35 GMT
Repository: incubator-apex-malhar
Updated Branches:
  refs/heads/master 422f5d946 -> 682c7ba59


1. Marking BytesFileOutputOperator as @evolving
2. Added japicmp exclusion for this class
3. Added Converter field for plugging-in custom conversion to byte[]
4. Provided default converters for no-op, String
5. Removed input port stringInput

6. Added javadocs

7. Renamed BytesFileOutputOperator -> GenericFileOutputOperator

8. Introduced BytesFileOutputOperator, String FileOutputOperator


Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/682c7ba5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/682c7ba5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/682c7ba5

Branch: refs/heads/master
Commit: 682c7ba59105671415e7e115333ae41a4f2f0942
Parents: 422f5d9
Author: yogidevendra <devendra@datatorrent.com>
Authored: Thu Jun 2 16:24:12 2016 +0530
Committer: yogidevendra <devendra@datatorrent.com>
Committed: Fri Jun 3 15:48:51 2016 +0530

----------------------------------------------------------------------
 .../malhar/lib/fs/BytesFileOutputOperator.java  | 295 ----------------
 .../lib/fs/GenericFileOutputOperator.java       | 340 +++++++++++++++++++
 .../lib/fs/BytesFileOutputOperatorTest.java     | 151 --------
 .../lib/fs/GenericFileOutputOperatorTest.java   | 153 +++++++++
 pom.xml                                         |   1 +
 5 files changed, 494 insertions(+), 446 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/682c7ba5/library/src/main/java/org/apache/apex/malhar/lib/fs/BytesFileOutputOperator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/fs/BytesFileOutputOperator.java
b/library/src/main/java/org/apache/apex/malhar/lib/fs/BytesFileOutputOperator.java
deleted file mode 100644
index 9567872..0000000
--- a/library/src/main/java/org/apache/apex/malhar/lib/fs/BytesFileOutputOperator.java
+++ /dev/null
@@ -1,295 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.apex.malhar.lib.fs;
-
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.util.concurrent.ExecutionException;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.datatorrent.api.AutoMetric;
-import com.datatorrent.api.DefaultInputPort;
-import com.datatorrent.api.StreamCodec;
-import com.datatorrent.lib.io.fs.AbstractSingleFileOutputOperator;
-import com.datatorrent.netlet.util.DTThrowable;
-
-/**
- * This class is responsible for writing tuples to HDFS. All tuples are written
- * to the same file. Rolling file based on size, no. of tuples, idle windows,
- * elapsed windows is supported.
- *
- * @since 3.4.0
- */
-
-public class BytesFileOutputOperator extends AbstractSingleFileOutputOperator<byte[]>
-{
-
-  /**
-   * Flag to mark if new data in current application window
-   */
-  private transient boolean isNewDataInCurrentWindow;
-
-  /**
-   * Separator between the tuples
-   */
-  private String tupleSeparator;
-
-  /**
-   * byte[] representation of tupleSeparator
-   */
-  private transient byte[] tupleSeparatorBytes;
-
-  /**
-   * No. of bytes received in current application window
-   */
-  @AutoMetric
-  private long byteCount;
-
-  /**
-   * No. of tuples present in current part for file
-   */
-  private long currentPartTupleCount;
-
-  /**
-   * Max. number of tuples allowed per part. Part file will be finalized after
-   * these many tuples
-   */
-  private long maxTupleCount = Long.MAX_VALUE;
-
-  /**
-   * No. of windows since last new data received
-   */
-  private long currentPartIdleWindows;
-
-  /**
-   * Max number of idle windows for which no new data is added to current part
-   * file. Part file will be finalized after these many idle windows after last
-   * new data.
-   */
-  private long maxIdleWindows = Long.MAX_VALUE;
-
-  /**
-   * Stream codec for string input port
-   */
-  protected StreamCodec<String> stringStreamCodec;
-
-  /**
-   * Default value for stream expiry
-   */
-  private static final long DEFAULT_STREAM_EXPIRY_ACCESS_MILL = 60 * 60 * 1000L; //1 hour
-
-  /**
-   * Default value for rotation windows
-   */
-  private static final int DEFAULT_ROTATION_WINDOWS = 2 * 60 * 10; //10 min  
-
-  /**
-   * Initializing default values for tuple separator, stream expiry, rotation
-   * windows
-   */
-  public BytesFileOutputOperator()
-  {
-    setTupleSeparator(System.getProperty("line.separator"));
-    setExpireStreamAfterAccessMillis(DEFAULT_STREAM_EXPIRY_ACCESS_MILL);
-    setRotationWindows(DEFAULT_ROTATION_WINDOWS);
-  }
-
-  /**
-   * Input port for receiving string tuples.
-   */
-  public final transient DefaultInputPort<String> stringInput = new DefaultInputPort<String>()
-  {
-    @Override
-    public void process(String tuple)
-    {
-      processTuple(tuple.getBytes());
-    }
-
-    @Override
-    public StreamCodec<String> getStreamCodec()
-    {
-      if (BytesFileOutputOperator.this.stringStreamCodec == null) {
-        return super.getStreamCodec();
-      } else {
-        return stringStreamCodec;
-      }
-    }
-  };
-
-  /**
-   * {@inheritDoc}
-   * 
-   * @return byte[] representation of the given tuple. if input tuple is of type
-   *         byte[] then it is returned as it is. for any other type toString()
-   *         representation is used to generate byte[].
-   */
-  @Override
-  protected byte[] getBytesForTuple(byte[] tuple)
-  {
-    ByteArrayOutputStream bytesOutStream = new ByteArrayOutputStream();
-
-    try {
-      bytesOutStream.write(tuple);
-      bytesOutStream.write(tupleSeparatorBytes);
-      byteCount += bytesOutStream.size();
-      return bytesOutStream.toByteArray();
-    } catch (IOException e) {
-      throw new RuntimeException(e);
-    } finally {
-      try {
-        bytesOutStream.close();
-      } catch (IOException e) {
-        throw new RuntimeException(e);
-      }
-    }
-  }
-
-  /**
-   * Initializing per window level fields {@inheritDoc}
-   */
-  @Override
-  public void beginWindow(long windowId)
-  {
-    super.beginWindow(windowId);
-    byteCount = 0;
-    isNewDataInCurrentWindow = false;
-  }
-
-  /**
-   * {@inheritDoc} Does additional state maintenance for rollover
-   */
-  @Override
-  protected void processTuple(byte[] tuple)
-  {
-    super.processTuple(tuple);
-    isNewDataInCurrentWindow = true;
-
-    if (++currentPartTupleCount == maxTupleCount) {
-      rotateCall(getPartitionedFileName());
-    }
-  }
-
-  /**
-   * {@inheritDoc} Does additional checks if file should be rolled over for this
-   * window.
-   */
-  @Override
-  public void endWindow()
-  {
-    super.endWindow();
-
-    if (!isNewDataInCurrentWindow) {
-      ++currentPartIdleWindows;
-    } else {
-      currentPartIdleWindows = 0;
-    }
-
-    if (checkEndWindowFinalization()) {
-      rotateCall(getPartitionedFileName());
-    }
-  }
-
-  /**
-   * Rollover check at the endWindow
-   */
-  private boolean checkEndWindowFinalization()
-  {
-    if ((currentPartIdleWindows == maxIdleWindows)) {
-      return true;
-    }
-    return false;
-  }
-
-  /**
-   * {@inheritDoc} Handles file rotation along with exception handling
-   * 
-   * @param lastFile
-   */
-  protected void rotateCall(String lastFile)
-  {
-    try {
-      this.rotate(lastFile);
-      currentPartIdleWindows = 0;
-      currentPartTupleCount = 0;
-    } catch (IOException ex) {
-      LOG.error("Exception in file rotation", ex);
-      DTThrowable.rethrow(ex);
-    } catch (ExecutionException ex) {
-      LOG.error("Exception in file rotation", ex);
-      DTThrowable.rethrow(ex);
-    }
-  }
-
-
-  /**
-   * @return Separator between the tuples
-   */
-  public String getTupleSeparator()
-  {
-    return tupleSeparator;
-  }
-
-  /**
-   * @param separator
-   *          Separator between the tuples
-   */
-  public void setTupleSeparator(String separator)
-  {
-    this.tupleSeparator = separator;
-    this.tupleSeparatorBytes = separator.getBytes();
-  }
-
-  /**
-   * @return max tuples in a part file
-   */
-  public long getMaxTupleCount()
-  {
-    return maxTupleCount;
-  }
-
-  /**
-   * @param maxTupleCount
-   *          max tuples in a part file
-   */
-  public void setMaxTupleCount(long maxTupleCount)
-  {
-    this.maxTupleCount = maxTupleCount;
-  }
-
-  /**
-   * @return max number of idle windows for rollover
-   */
-  public long getMaxIdleWindows()
-  {
-    return maxIdleWindows;
-  }
-
-  /**
-   * @param maxIdleWindows max number of idle windows for rollover
-   */
-  public void setMaxIdleWindows(long maxIdleWindows)
-  {
-    this.maxIdleWindows = maxIdleWindows;
-  }
-
-  private static final Logger LOG = LoggerFactory.getLogger(BytesFileOutputOperator.class);
-}

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/682c7ba5/library/src/main/java/org/apache/apex/malhar/lib/fs/GenericFileOutputOperator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/fs/GenericFileOutputOperator.java
b/library/src/main/java/org/apache/apex/malhar/lib/fs/GenericFileOutputOperator.java
new file mode 100644
index 0000000..017a890
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/fs/GenericFileOutputOperator.java
@@ -0,0 +1,340 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.apex.malhar.lib.fs;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.concurrent.ExecutionException;
+
+import javax.validation.constraints.NotNull;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.datatorrent.api.AutoMetric;
+import com.datatorrent.api.StreamCodec;
+import com.datatorrent.lib.converter.Converter;
+import com.datatorrent.lib.io.fs.AbstractSingleFileOutputOperator;
+import com.datatorrent.netlet.util.DTThrowable;
+
+/**
+ * This class is responsible for writing tuples to HDFS. All tuples are written
+ * to the same file. Rolling file based on size, no. of tuples, idle windows,
+ * elapsed windows is supported.
+ *
+ * @since 3.4.0
+ */
+@org.apache.hadoop.classification.InterfaceStability.Evolving
+public abstract class GenericFileOutputOperator<INPUT> extends AbstractSingleFileOutputOperator<INPUT>
+{
+
+  /**
+   * Flag to mark if new data in current application window
+   */
+  private transient boolean isNewDataInCurrentWindow;
+
+  /**
+   * Separator between the tuples
+   */
+  private String tupleSeparator;
+
+  /**
+   * byte[] representation of tupleSeparator
+   */
+  private transient byte[] tupleSeparatorBytes;
+
+  /**
+   * No. of bytes received in current application window
+   */
+  @AutoMetric
+  private long byteCount;
+
+  /**
+   * No. of tuples present in current part for file
+   */
+  private long currentPartTupleCount;
+
+  /**
+   * Max. number of tuples allowed per part. Part file will be finalized after
+   * these many tuples
+   */
+  private long maxTupleCount = Long.MAX_VALUE;
+
+  /**
+   * No. of windows since last new data received
+   */
+  private long currentPartIdleWindows;
+
+  /**
+   * Converter for conversion of input tuples to byte[]
+   */
+  @NotNull
+  private Converter<INPUT, byte[]> converter;
+
+  /**
+   * Max number of idle windows for which no new data is added to current part
+   * file. Part file will be finalized after these many idle windows after last
+   * new data.
+   */
+  private long maxIdleWindows = Long.MAX_VALUE;
+
+  /**
+   * Stream codec for string input port
+   */
+  protected StreamCodec<String> stringStreamCodec;
+
+  /**
+   * Default value for stream expiry
+   */
+  private static final long DEFAULT_STREAM_EXPIRY_ACCESS_MILL = 60 * 60 * 1000L; //1 hour
+
+  /**
+   * Default value for rotation windows
+   */
+  private static final int DEFAULT_ROTATION_WINDOWS = 2 * 60 * 10; //10 min  
+
+  /**
+   * Initializing default values for tuple separator, stream expiry, rotation
+   * windows
+   */
+  public GenericFileOutputOperator()
+  {
+    setTupleSeparator(System.getProperty("line.separator"));
+    setExpireStreamAfterAccessMillis(DEFAULT_STREAM_EXPIRY_ACCESS_MILL);
+    setRotationWindows(DEFAULT_ROTATION_WINDOWS);
+  }
+
+  /**
+   * {@inheritDoc}
+   * 
+   * @return byte[] representation of the given tuple. if input tuple is of type
+   *         byte[] then it is returned as it is. for any other type toString()
+   *         representation is used to generate byte[].
+   */
+  @Override
+  protected byte[] getBytesForTuple(INPUT tuple)
+  {
+    ByteArrayOutputStream bytesOutStream = new ByteArrayOutputStream();
+
+    try {
+      bytesOutStream.write(converter.convert(tuple));
+      bytesOutStream.write(tupleSeparatorBytes);
+      byteCount += bytesOutStream.size();
+      return bytesOutStream.toByteArray();
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    } finally {
+      try {
+        bytesOutStream.close();
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+    }
+  }
+
+  /**
+   * Initializing per window level fields {@inheritDoc}
+   */
+  @Override
+  public void beginWindow(long windowId)
+  {
+    super.beginWindow(windowId);
+    byteCount = 0;
+    isNewDataInCurrentWindow = false;
+  }
+
+  /**
+   * {@inheritDoc} Does additional state maintenance for rollover
+   */
+  @Override
+  protected void processTuple(INPUT tuple)
+  {
+    super.processTuple(tuple);
+    isNewDataInCurrentWindow = true;
+
+    if (++currentPartTupleCount == maxTupleCount) {
+      rotateCall(getPartitionedFileName());
+    }
+  }
+
+  /**
+   * {@inheritDoc} Does additional checks if file should be rolled over for this
+   * window.
+   */
+  @Override
+  public void endWindow()
+  {
+    super.endWindow();
+
+    if (!isNewDataInCurrentWindow) {
+      ++currentPartIdleWindows;
+    } else {
+      currentPartIdleWindows = 0;
+    }
+
+    if (checkEndWindowFinalization()) {
+      rotateCall(getPartitionedFileName());
+    }
+  }
+
+  /**
+   * Rollover check at the endWindow
+   */
+  private boolean checkEndWindowFinalization()
+  {
+    if ((currentPartIdleWindows == maxIdleWindows)) {
+      return true;
+    }
+    return false;
+  }
+
+  /**
+   * {@inheritDoc} Handles file rotation along with exception handling
+   * 
+   * @param lastFile
+   */
+  protected void rotateCall(String lastFile)
+  {
+    try {
+      this.rotate(lastFile);
+      currentPartIdleWindows = 0;
+      currentPartTupleCount = 0;
+    } catch (IOException ex) {
+      LOG.error("Exception in file rotation", ex);
+      DTThrowable.rethrow(ex);
+    } catch (ExecutionException ex) {
+      LOG.error("Exception in file rotation", ex);
+      DTThrowable.rethrow(ex);
+    }
+  }
+
+  /**
+   * @return Separator between the tuples
+   */
+  public String getTupleSeparator()
+  {
+    return tupleSeparator;
+  }
+
+  /**
+   * @param separator
+   *          Separator between the tuples
+   */
+  public void setTupleSeparator(String separator)
+  {
+    this.tupleSeparator = separator;
+    this.tupleSeparatorBytes = separator.getBytes();
+  }
+
+  /**
+   * @return max tuples in a part file
+   */
+  public long getMaxTupleCount()
+  {
+    return maxTupleCount;
+  }
+
+  /**
+   * @param maxTupleCount
+   *          max tuples in a part file
+   */
+  public void setMaxTupleCount(long maxTupleCount)
+  {
+    this.maxTupleCount = maxTupleCount;
+  }
+
+  /**
+   * @return max number of idle windows for rollover
+   */
+  public long getMaxIdleWindows()
+  {
+    return maxIdleWindows;
+  }
+
+  /**
+   * @param maxIdleWindows
+   *          max number of idle windows for rollover
+   */
+  public void setMaxIdleWindows(long maxIdleWindows)
+  {
+    this.maxIdleWindows = maxIdleWindows;
+  }
+
+  /**
+   * Converter for conversion of input tuples to byte[]
+   * @return converter
+   */
+  public Converter<INPUT, byte[]> getConverter()
+  {
+    return converter;
+  }
+
+  /**
+   * Converter for conversion of input tuples to byte[]
+   * @param converter
+   */
+  public void setConverter(Converter<INPUT, byte[]> converter)
+  {
+    this.converter = converter;
+  }
+  
+  /**
+   * Converter returning input tuples as byte[] without any conversion
+   */
+  public static class NoOpConverter implements Converter<byte[], byte[]>
+  {
+    @Override
+    public byte[] convert(byte[] tuple)
+    {
+      return tuple;
+    }
+  }
+
+  public static class BytesFileOutputOperator extends GenericFileOutputOperator<byte[]>
+  {
+    
+    public BytesFileOutputOperator()
+    {
+      setConverter(new NoOpConverter());
+    }
+  }
+  
+  /**
+   * Converter returning byte[] conversion of the input String.
+   */
+  public static class StringToBytesConverter implements Converter<String, byte[]>
+  {
+    @Override
+    public byte[] convert(String tuple)
+    {
+      return ((String)tuple).getBytes();
+    }
+  }
+  
+  public static class StringFileOutputOperator extends GenericFileOutputOperator<String>
+  {
+    public StringFileOutputOperator()
+    {
+      setConverter(new StringToBytesConverter());
+    }
+  }
+
+  private static final Logger LOG = LoggerFactory.getLogger(GenericFileOutputOperator.class);
+}

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/682c7ba5/library/src/test/java/org/apache/apex/malhar/lib/fs/BytesFileOutputOperatorTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/org/apache/apex/malhar/lib/fs/BytesFileOutputOperatorTest.java
b/library/src/test/java/org/apache/apex/malhar/lib/fs/BytesFileOutputOperatorTest.java
deleted file mode 100644
index 1ea7352..0000000
--- a/library/src/test/java/org/apache/apex/malhar/lib/fs/BytesFileOutputOperatorTest.java
+++ /dev/null
@@ -1,151 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.apex.malhar.lib.fs;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.Collection;
-
-import org.junit.Assert;
-import org.junit.Test;
-
-import org.apache.commons.io.FileUtils;
-
-import com.datatorrent.lib.io.fs.AbstractFileOutputOperatorTest;
-import com.datatorrent.netlet.util.DTThrowable;
-
-public class BytesFileOutputOperatorTest extends AbstractFileOutputOperatorTest
-{
-
-  /**
-   * Test file rollover in case of idle windows
-   * 
-   * @throws IOException
-   */
-  @Test
-  public void testIdleWindowsFinalize() throws IOException
-  {
-    BytesFileOutputOperator writer = new BytesFileOutputOperator();
-    writer.setOutputFileName("output.txt");
-    writer.setFilePath(testMeta.getDir());
-    writer.setAlwaysWriteToTmp(true);
-    writer.setMaxIdleWindows(5);
-    writer.setup(testMeta.testOperatorContext);
-
-    String[][] tuples = {{"0a", "0b" }, {"1a", "1b" }, {}, {}, {}, {}, {"6a", "6b" }, {"7a",
"7b" }, {}, {}, {},
-        {}, {}, {"13a", "13b" }, {"14a", "14b" }, {}, {}, {}, {"18a", "18b" }, {"19a", "19b"
}, {}, {}, {}, {}, {},
-        {}, {"26a", "26b"} };
-
-    for (int i = 0; i <= 12; i++) {
-      writer.beginWindow(i);
-      for (String t : tuples[i]) {
-        writer.stringInput.put(t);
-      }
-      writer.endWindow();
-    }
-    writer.committed(10);
-
-    for (int i = 13; i <= 26; i++) {
-      writer.beginWindow(i);
-      for (String t : tuples[i]) {
-        writer.stringInput.put(t);
-      }
-      writer.endWindow();
-    }
-    writer.committed(20);
-    writer.committed(26);
-
-    String[] expected = {"0a\n0b\n1a\n1b\n6a\n6b\n7a\n7b\n", "13a\n13b\n14a\n14b\n18a\n18b\n19a\n19b\n",
-        "26a\n26b\n" };
-
-    for (int i = 0; i < expected.length; i++) {
-      checkOutput(i, testMeta.getDir() + "/output.txt_0", expected[i], true);
-    }
-  }
-
-  /**
-   * Test file rollover for tuple count
-   * 
-   * @throws IOException
-   */
-  @Test
-  public void testTupleCountFinalize() throws IOException
-  {
-    BytesFileOutputOperator writer = new BytesFileOutputOperator();
-    writer.setOutputFileName("output.txt");
-    writer.setFilePath(testMeta.getDir());
-    writer.setAlwaysWriteToTmp(true);
-    writer.setMaxTupleCount(10);
-    writer.setup(testMeta.testOperatorContext);
-
-    String[][] tuples = {{"0a", "0b" }, {"1a", "1b" }, {}, {"3a", "3b" }, {"4a", "4b" },
{}, {"6a", "6b" },
-        {"7a", "7b" }, {}, {}, {"9a" }, {"10a", "10b" }, {}, {"12a" }, {"13a", "13b"}, {"14a",
"14b" }, {}, {},
-        {}, {"18a", "18b" }, {"19a", "19b" }, {"20a" }, {"21a" }, {"22a" }};
-
-    for (int i = 0; i < tuples.length; i++) {
-      writer.beginWindow(i);
-      for (String t : tuples[i]) {
-        writer.stringInput.put(t);
-      }
-      writer.endWindow();
-      if (i % 10 == 0) {
-        writer.committed(10);
-      }
-    }
-    writer.committed(tuples.length);
-
-    String[] expected = {"0a\n0b\n1a\n1b\n3a\n3b\n4a\n4b\n6a\n6b\n", "7a\n7b\n9a\n10a\n10b\n12a\n13a\n13b\n14a\n14b\n",
-        "18a\n18b\n19a\n19b\n20a\n21a\n22a\n" };
-
-    for (int i = 0; i < expected.length; i++) {
-      checkOutput(i, testMeta.getDir() + "/output.txt_0", expected[i], true);
-    }
-  }
-
-  public static void checkOutput(int fileCount, String baseFilePath, String expectedOutput,
boolean checkTmp)
-  {
-    if (fileCount >= 0) {
-      baseFilePath += "." + fileCount;
-    }
-
-    File file = new File(baseFilePath);
-
-    if (!file.exists()) {
-      String[] extensions = {"tmp"};
-      Collection<File> tmpFiles = FileUtils.listFiles(file.getParentFile(), extensions,
false);
-      for (File tmpFile : tmpFiles) {
-        if (file.getPath().startsWith(baseFilePath)) {
-          file = tmpFile;
-          break;
-        }
-      }
-    }
-
-    String fileContents = null;
-
-    try {
-      fileContents = FileUtils.readFileToString(file);
-    } catch (IOException ex) {
-      DTThrowable.rethrow(ex);
-    }
-
-    Assert.assertEquals("Single file " + fileCount + " output contents", expectedOutput,
fileContents);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/682c7ba5/library/src/test/java/org/apache/apex/malhar/lib/fs/GenericFileOutputOperatorTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/org/apache/apex/malhar/lib/fs/GenericFileOutputOperatorTest.java
b/library/src/test/java/org/apache/apex/malhar/lib/fs/GenericFileOutputOperatorTest.java
new file mode 100644
index 0000000..52d5c5a
--- /dev/null
+++ b/library/src/test/java/org/apache/apex/malhar/lib/fs/GenericFileOutputOperatorTest.java
@@ -0,0 +1,153 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.apex.malhar.lib.fs;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Collection;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.apex.malhar.lib.fs.GenericFileOutputOperator.BytesFileOutputOperator;
+import org.apache.apex.malhar.lib.fs.GenericFileOutputOperator.StringFileOutputOperator;
+import org.apache.commons.io.FileUtils;
+
+import com.datatorrent.lib.io.fs.AbstractFileOutputOperatorTest;
+import com.datatorrent.netlet.util.DTThrowable;
+
+public class GenericFileOutputOperatorTest extends AbstractFileOutputOperatorTest
+{
+
+  /**
+   * Test file rollover in case of idle windows
+   * 
+   * @throws IOException
+   */
+  @Test
+  public void testIdleWindowsFinalize() throws IOException
+  {
+    StringFileOutputOperator writer = new StringFileOutputOperator();
+    writer.setOutputFileName("output.txt");
+    writer.setFilePath(testMeta.getDir());
+    writer.setAlwaysWriteToTmp(true);
+    writer.setMaxIdleWindows(5);
+    writer.setup(testMeta.testOperatorContext);
+
+    String[][] tuples = {{"0a", "0b" }, {"1a", "1b" }, {}, {}, {}, {}, {"6a", "6b" }, {"7a",
"7b" }, {}, {}, {},
+        {}, {}, {"13a", "13b" }, {"14a", "14b" }, {}, {}, {}, {"18a", "18b" }, {"19a", "19b"
}, {}, {}, {}, {}, {},
+        {}, {"26a", "26b"} };
+
+    for (int i = 0; i <= 12; i++) {
+      writer.beginWindow(i);
+      for (String t : tuples[i]) {
+        writer.input.put(t);
+      }
+      writer.endWindow();
+    }
+    writer.committed(10);
+
+    for (int i = 13; i <= 26; i++) {
+      writer.beginWindow(i);
+      for (String t : tuples[i]) {
+        writer.input.put(t);
+      }
+      writer.endWindow();
+    }
+    writer.committed(20);
+    writer.committed(26);
+
+    String[] expected = {"0a\n0b\n1a\n1b\n6a\n6b\n7a\n7b\n", "13a\n13b\n14a\n14b\n18a\n18b\n19a\n19b\n",
+        "26a\n26b\n" };
+
+    for (int i = 0; i < expected.length; i++) {
+      checkOutput(i, testMeta.getDir() + "/output.txt_0", expected[i], true);
+    }
+  }
+
+  /**
+   * Test file rollover for tuple count
+   * 
+   * @throws IOException
+   */
+  @Test
+  public void testTupleCountFinalize() throws IOException
+  {
+    BytesFileOutputOperator writer = new BytesFileOutputOperator();
+    writer.setOutputFileName("output.txt");
+    writer.setFilePath(testMeta.getDir());
+    writer.setAlwaysWriteToTmp(true);
+    writer.setMaxTupleCount(10);
+    writer.setup(testMeta.testOperatorContext);
+
+    String[][] tuples = {{"0a", "0b" }, {"1a", "1b" }, {}, {"3a", "3b" }, {"4a", "4b" },
{}, {"6a", "6b" },
+        {"7a", "7b" }, {}, {}, {"9a" }, {"10a", "10b" }, {}, {"12a" }, {"13a", "13b"}, {"14a",
"14b" }, {}, {},
+        {}, {"18a", "18b" }, {"19a", "19b" }, {"20a" }, {"21a" }, {"22a" }};
+
+    for (int i = 0; i < tuples.length; i++) {
+      writer.beginWindow(i);
+      for (String t : tuples[i]) {
+        writer.input.put(t.getBytes());
+      }
+      writer.endWindow();
+      if (i % 10 == 0) {
+        writer.committed(10);
+      }
+    }
+    writer.committed(tuples.length);
+
+    String[] expected = {"0a\n0b\n1a\n1b\n3a\n3b\n4a\n4b\n6a\n6b\n", "7a\n7b\n9a\n10a\n10b\n12a\n13a\n13b\n14a\n14b\n",
+        "18a\n18b\n19a\n19b\n20a\n21a\n22a\n" };
+
+    for (int i = 0; i < expected.length; i++) {
+      checkOutput(i, testMeta.getDir() + "/output.txt_0", expected[i], true);
+    }
+  }
+  
+  public static void checkOutput(int fileCount, String baseFilePath, String expectedOutput,
boolean checkTmp)
+  {
+    if (fileCount >= 0) {
+      baseFilePath += "." + fileCount;
+    }
+
+    File file = new File(baseFilePath);
+
+    if (!file.exists()) {
+      String[] extensions = {"tmp"};
+      Collection<File> tmpFiles = FileUtils.listFiles(file.getParentFile(), extensions,
false);
+      for (File tmpFile : tmpFiles) {
+        if (file.getPath().startsWith(baseFilePath)) {
+          file = tmpFile;
+          break;
+        }
+      }
+    }
+
+    String fileContents = null;
+
+    try {
+      fileContents = FileUtils.readFileToString(file);
+    } catch (IOException ex) {
+      DTThrowable.rethrow(ex);
+    }
+
+    Assert.assertEquals("Single file " + fileCount + " output contents", expectedOutput,
fileContents);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/682c7ba5/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 707db06..3f1b6ee 100644
--- a/pom.xml
+++ b/pom.xml
@@ -134,6 +134,7 @@
               <excludes>
                 <exclude>@org.apache.hadoop.classification.InterfaceStability$Evolving</exclude>
                 <exclude>@org.apache.hadoop.classification.InterfaceStability$Unstable</exclude>
+                <exclude>org.apache.apex.malhar.lib.fs.BytesFileOutputOperator</exclude>
               </excludes>
             </parameter>
             <skip>${semver.plugin.skip}</skip>


Mime
View raw message