apex-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From csi...@apache.org
Subject [1/2] incubator-apex-malhar git commit: Created a separate line by line file input reader operator
Date Mon, 04 Apr 2016 20:42:45 GMT
Repository: incubator-apex-malhar
Updated Branches:
  refs/heads/master 723189b4f -> b4fd6a60d


Created a separate line by line file input reader operator


Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/1e979927
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/1e979927
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/1e979927

Branch: refs/heads/master
Commit: 1e979927a72fdd2707b68f8fe732c9bb93afd50b
Parents: 264f629
Author: Pramod Immaneni <pramod@datatorrent.com>
Authored: Thu Mar 24 18:24:15 2016 -0700
Committer: Pramod Immaneni <pramod@datatorrent.com>
Committed: Mon Apr 4 11:25:58 2016 -0700

----------------------------------------------------------------------
 library/pom.xml                                 |  2 +-
 .../lib/io/fs/AbstractFileInputOperator.java    | 44 +++++++++---
 .../malhar/fs/LineByLineFileInputOperator.java  | 75 ++++++++++++++++++++
 .../io/fs/AbstractFileInputOperatorTest.java    | 62 ++++++++--------
 4 files changed, 145 insertions(+), 38 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/1e979927/library/pom.xml
----------------------------------------------------------------------
diff --git a/library/pom.xml b/library/pom.xml
index 5ff8842..d5511c4 100644
--- a/library/pom.xml
+++ b/library/pom.xml
@@ -186,7 +186,7 @@
         <groupId>org.apache.maven.plugins</groupId>
         <artifactId>maven-checkstyle-plugin</artifactId>
         <configuration>
-          <maxAllowedViolations>15966</maxAllowedViolations>
+          <maxAllowedViolations>15959</maxAllowedViolations>
           <consoleOutput>false</consoleOutput>
         </configuration>
       </plugin>

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/1e979927/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileInputOperator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileInputOperator.java b/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileInputOperator.java
index 0bcf956..ef27cf1 100644
--- a/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileInputOperator.java
+++ b/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileInputOperator.java
@@ -18,30 +18,52 @@
  */
 package com.datatorrent.lib.io.fs;
 
-import java.io.*;
-import java.util.*;
+import java.io.BufferedReader;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedHashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.Set;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
 import javax.validation.constraints.NotNull;
 
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+
+import org.apache.apex.malhar.fs.LineByLineFileInputOperator;
 import org.apache.commons.lang.mutable.MutableLong;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 
-import com.datatorrent.lib.counters.BasicCounters;
-import com.datatorrent.lib.io.IdempotentStorageManager;
-import com.datatorrent.api.*;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+
 import com.datatorrent.api.Context.CountersAggregator;
 import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.DefaultPartition;
+import com.datatorrent.api.InputOperator;
+import com.datatorrent.api.Operator;
+import com.datatorrent.api.Partitioner;
+import com.datatorrent.api.StatsListener;
+
+import com.datatorrent.lib.counters.BasicCounters;
+import com.datatorrent.lib.io.IdempotentStorageManager;
 import com.datatorrent.lib.util.KryoCloneUtils;
 
 /**
@@ -1154,11 +1176,15 @@ public abstract class AbstractFileInputOperator<T> implements InputOperator, Par
   }
 
   /**
+   * This class is deprecated, use {@link LineByLineFileInputOperator}
+   * <p>
    * This is an implementation of the {@link AbstractFileInputOperator} that outputs the lines in a file.&nbsp;
    * Each line is emitted as a separate tuple.&nbsp; It is emitted as a String.
+   * </p>
    * <p>
    * The directory path where to scan and read files from should be specified using the {@link #directory} property.
    * </p>
+   * @deprecated
    * @displayName File Line Input
    * @category Input
    * @tags fs, file, line, lines, input operator

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/1e979927/library/src/main/java/org/apache/apex/malhar/fs/LineByLineFileInputOperator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/fs/LineByLineFileInputOperator.java b/library/src/main/java/org/apache/apex/malhar/fs/LineByLineFileInputOperator.java
new file mode 100644
index 0000000..ad85d01
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/fs/LineByLineFileInputOperator.java
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.fs;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+
+import org.apache.hadoop.fs.Path;
+
+import com.datatorrent.api.DefaultOutputPort;
+
+import com.datatorrent.lib.io.fs.AbstractFileInputOperator;
+
+/**
+ * This is an extension of the {@link AbstractFileInputOperator} that outputs the contents of a file line by line.&nbsp;
+ * Each line is emitted as a separate tuple in string format.
+ * <p>
+ * The directory path where to scan and read files from should be specified using the {@link #directory} property.
+ * </p>
+ * @displayName File Line Input
+ * @category Input
+ * @tags fs, file, line, lines, input operator
+ */
+public class LineByLineFileInputOperator extends AbstractFileInputOperator<String>
+{
+  public final transient DefaultOutputPort<String> output = new DefaultOutputPort<String>();
+
+  protected transient BufferedReader br;
+
+  @Override
+  protected InputStream openFile(Path path) throws IOException
+  {
+    InputStream is = super.openFile(path);
+    br = new BufferedReader(new InputStreamReader(is));
+    return is;
+  }
+
+  @Override
+  protected void closeFile(InputStream is) throws IOException
+  {
+    super.closeFile(is);
+    br.close();
+    br = null;
+  }
+
+  @Override
+  protected String readEntity() throws IOException
+  {
+    return br.readLine();
+  }
+
+  @Override
+  protected void emit(String tuple)
+  {
+    output.emit(tuple);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/1e979927/library/src/test/java/com/datatorrent/lib/io/fs/AbstractFileInputOperatorTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/io/fs/AbstractFileInputOperatorTest.java b/library/src/test/java/com/datatorrent/lib/io/fs/AbstractFileInputOperatorTest.java
index e9cd0d2..8868b9d 100644
--- a/library/src/test/java/com/datatorrent/lib/io/fs/AbstractFileInputOperatorTest.java
+++ b/library/src/test/java/com/datatorrent/lib/io/fs/AbstractFileInputOperatorTest.java
@@ -20,7 +20,13 @@ package com.datatorrent.lib.io.fs;
 
 import java.io.File;
 import java.io.IOException;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
 
 import org.junit.Assert;
 import org.junit.Rule;
@@ -28,6 +34,7 @@ import org.junit.Test;
 import org.junit.rules.TestWatcher;
 import org.junit.runner.Description;
 
+import org.apache.apex.malhar.fs.LineByLineFileInputOperator;
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.conf.Configuration;
@@ -41,20 +48,19 @@ import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 
+import com.datatorrent.api.Attribute;
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DefaultPartition;
+import com.datatorrent.api.Partitioner.Partition;
+import com.datatorrent.api.StatsListener;
+
 import com.datatorrent.lib.helper.OperatorContextTestHelper;
 import com.datatorrent.lib.io.IdempotentStorageManager;
 import com.datatorrent.lib.io.fs.AbstractFileInputOperator.DirectoryScanner;
-import com.datatorrent.lib.io.fs.AbstractFileInputOperator.FileLineInputOperator;
 import com.datatorrent.lib.partitioner.StatelessPartitionerTest.PartitioningContextImpl;
 import com.datatorrent.lib.testbench.CollectorTestSink;
 import com.datatorrent.lib.util.TestUtils;
 
-import com.datatorrent.api.Attribute;
-import com.datatorrent.api.Context;
-import com.datatorrent.api.DefaultPartition;
-import com.datatorrent.api.Partitioner.Partition;
-import com.datatorrent.api.StatsListener;
-
 public class AbstractFileInputOperatorTest
 {
   public static class TestMeta extends TestWatcher
@@ -101,7 +107,7 @@ public class AbstractFileInputOperatorTest
       FileUtils.write(new File(testMeta.dir, "file"+file), StringUtils.join(lines, '\n'));
     }
 
-    FileLineInputOperator oper = new FileLineInputOperator();
+    LineByLineFileInputOperator oper = new LineByLineFileInputOperator();
 
     CollectorTestSink<String> queryResults = new CollectorTestSink<String>();
     @SuppressWarnings({ "unchecked", "rawtypes" })
@@ -151,7 +157,7 @@ public class AbstractFileInputOperatorTest
   @Test
   public void testPartitioning() throws Exception
   {
-    FileLineInputOperator oper = new FileLineInputOperator();
+    LineByLineFileInputOperator oper = new LineByLineFileInputOperator();
     oper.getScanner().setFilePatternRegexp(".*partition([\\d]*)");
     oper.setDirectory(new File(testMeta.dir).getAbsolutePath());
 
@@ -190,12 +196,12 @@ public class AbstractFileInputOperatorTest
   public void testPartitioningStateTransfer() throws Exception
   {
 
-    FileLineInputOperator oper = new FileLineInputOperator();
+    LineByLineFileInputOperator oper = new LineByLineFileInputOperator();
     oper.getScanner().setFilePatternRegexp(".*partition([\\d]*)");
     oper.setDirectory(new File(testMeta.dir).getAbsolutePath());
     oper.setScanIntervalMillis(0);
 
-    FileLineInputOperator initialState = new Kryo().copy(oper);
+    LineByLineFileInputOperator initialState = new Kryo().copy(oper);
 
     // Create 4 files with 3 records each.
     Path path = new Path(new File(testMeta.dir).getAbsolutePath());
@@ -245,7 +251,7 @@ public class AbstractFileInputOperatorTest
     /* Collect all operators in a list */
     List<AbstractFileInputOperator<String>> opers = Lists.newArrayList();
     for (Partition<AbstractFileInputOperator<String>> p : newPartitions) {
-      FileLineInputOperator oi = (FileLineInputOperator)p.getPartitionedInstance();
+      LineByLineFileInputOperator oi = (LineByLineFileInputOperator)p.getPartitionedInstance();
       oi.setup(testMeta.context);
       oi.output.setSink(sink);
       opers.add(oi);
@@ -293,13 +299,13 @@ public class AbstractFileInputOperatorTest
   @Test
   public void testPartitioningStateTransferInterrupted() throws Exception
   {
-    FileLineInputOperator oper = new FileLineInputOperator();
+    LineByLineFileInputOperator oper = new LineByLineFileInputOperator();
     oper.getScanner().setFilePatternRegexp(".*partition([\\d]*)");
     oper.setDirectory(new File(testMeta.dir).getAbsolutePath());
     oper.setScanIntervalMillis(0);
     oper.setEmitBatchSize(2);
 
-    FileLineInputOperator initialState = new Kryo().copy(oper);
+    LineByLineFileInputOperator initialState = new Kryo().copy(oper);
 
     // Create 4 files with 3 records each.
     Path path = new Path(new File(testMeta.dir).getAbsolutePath());
@@ -349,7 +355,7 @@ public class AbstractFileInputOperatorTest
     /* Collect all operators in a list */
     List<AbstractFileInputOperator<String>> opers = Lists.newArrayList();
     for (Partition<AbstractFileInputOperator<String>> p : newPartitions) {
-      FileLineInputOperator oi = (FileLineInputOperator)p.getPartitionedInstance();
+      LineByLineFileInputOperator oi = (LineByLineFileInputOperator)p.getPartitionedInstance();
       oi.setup(testMeta.context);
       oi.output.setSink(sink);
       opers.add(oi);
@@ -378,13 +384,13 @@ public class AbstractFileInputOperatorTest
   @Test
   public void testPartitioningStateTransferFailure() throws Exception
   {
-    FileLineInputOperator oper = new FileLineInputOperator();
+    LineByLineFileInputOperator oper = new LineByLineFileInputOperator();
     oper.getScanner().setFilePatternRegexp(".*partition([\\d]*)");
     oper.setDirectory(new File(testMeta.dir).getAbsolutePath());
     oper.setScanIntervalMillis(0);
     oper.setEmitBatchSize(2);
 
-    FileLineInputOperator initialState = new Kryo().copy(oper);
+    LineByLineFileInputOperator initialState = new Kryo().copy(oper);
 
     // Create 4 files with 3 records each.
     Path path = new Path(new File(testMeta.dir).getAbsolutePath());
@@ -434,7 +440,7 @@ public class AbstractFileInputOperatorTest
     /* Collect all operators in a list */
     List<AbstractFileInputOperator<String>> opers = Lists.newArrayList();
     for (Partition<AbstractFileInputOperator<String>> p : newPartitions) {
-      FileLineInputOperator oi = (FileLineInputOperator)p.getPartitionedInstance();
+      LineByLineFileInputOperator oi = (LineByLineFileInputOperator)p.getPartitionedInstance();
       oi.setup(testMeta.context);
       oi.output.setSink(sink);
       opers.add(oi);
@@ -468,7 +474,7 @@ public class AbstractFileInputOperatorTest
     FileUtils.write(testFile, StringUtils.join(lines, '\n'));
 
 
-    FileLineInputOperator oper = new FileLineInputOperator();
+    LineByLineFileInputOperator oper = new LineByLineFileInputOperator();
     oper.scanner = null;
     oper.failedFiles.add(new AbstractFileInputOperator.FailedFile(testFile.getAbsolutePath(),
1));
 
@@ -503,7 +509,7 @@ public class AbstractFileInputOperatorTest
     File testFile = new File(testMeta.dir, "file0");
     FileUtils.write(testFile, StringUtils.join(lines, '\n'));
 
-    FileLineInputOperator oper = new FileLineInputOperator();
+    LineByLineFileInputOperator oper = new LineByLineFileInputOperator();
     oper.scanner = null;
     oper.unfinishedFiles.add(new AbstractFileInputOperator.FailedFile(testFile.getAbsolutePath(),
2));
 
@@ -538,7 +544,7 @@ public class AbstractFileInputOperatorTest
     File testFile = new File(testMeta.dir, "file0");
     FileUtils.write(testFile, StringUtils.join(lines, '\n'));
 
-    FileLineInputOperator oper = new FileLineInputOperator();
+    LineByLineFileInputOperator oper = new LineByLineFileInputOperator();
     oper.scanner = null;
     oper.pendingFiles.add(testFile.getAbsolutePath());
 
@@ -573,7 +579,7 @@ public class AbstractFileInputOperatorTest
     File testFile = new File(testMeta.dir, "file0");
     FileUtils.write(testFile, StringUtils.join(lines, '\n'));
 
-    FileLineInputOperator oper = new FileLineInputOperator();
+    LineByLineFileInputOperator oper = new LineByLineFileInputOperator();
     oper.scanner = null;
     oper.currentFile = testFile.getAbsolutePath();
     oper.offset = 1;
@@ -611,7 +617,7 @@ public class AbstractFileInputOperatorTest
       FileUtils.write(new File(testMeta.dir, "file" + file), StringUtils.join(lines, '\n'));
     }
 
-    FileLineInputOperator oper = new FileLineInputOperator();
+    LineByLineFileInputOperator oper = new LineByLineFileInputOperator();
     IdempotentStorageManager.FSIdempotentStorageManager manager = new IdempotentStorageManager.FSIdempotentStorageManager();
     manager.setRecoveryPath(testMeta.dir + "/recovery");
 
@@ -660,7 +666,7 @@ public class AbstractFileInputOperatorTest
       FileUtils.write(new File(testMeta.dir, "file" + file), StringUtils.join(lines, '\n'));
     }
 
-    FileLineInputOperator oper = new FileLineInputOperator();
+    LineByLineFileInputOperator oper = new LineByLineFileInputOperator();
     IdempotentStorageManager.FSIdempotentStorageManager manager = new IdempotentStorageManager.FSIdempotentStorageManager();
     manager.setRecoveryPath(testMeta.dir + "/recovery");
 
@@ -703,7 +709,7 @@ public class AbstractFileInputOperatorTest
     }
     FileUtils.write(new File(testMeta.dir, "file0"), StringUtils.join(lines, '\n'));
 
-    FileLineInputOperator oper = new FileLineInputOperator();
+    LineByLineFileInputOperator oper = new LineByLineFileInputOperator();
     IdempotentStorageManager.FSIdempotentStorageManager manager = new IdempotentStorageManager.FSIdempotentStorageManager();
     manager.setRecoveryPath(testMeta.dir + "/recovery");
     oper.setEmitBatchSize(5);
@@ -765,7 +771,7 @@ public class AbstractFileInputOperatorTest
       FileUtils.write(new File(testMeta.dir, "file" + file), StringUtils.join(lines, '\n'));
     }
 
-    FileLineInputOperator oper = new FileLineInputOperator();
+    LineByLineFileInputOperator oper = new LineByLineFileInputOperator();
 
     IdempotentStorageManager.FSIdempotentStorageManager manager = new IdempotentStorageManager.FSIdempotentStorageManager();
     manager.setRecoveryPath(testMeta.dir + "/recovery");
@@ -815,7 +821,7 @@ public class AbstractFileInputOperatorTest
   @Test
   public void testIdempotentStorageManagerPartitioning() throws Exception
   {
-    FileLineInputOperator oper = new FileLineInputOperator();
+    LineByLineFileInputOperator oper = new LineByLineFileInputOperator();
     oper.getScanner().setFilePatternRegexp(".*partition([\\d]*)");
     oper.setDirectory(new File(testMeta.dir).getAbsolutePath());
     oper.setIdempotentStorageManager(new TestStorageManager());


Mime
View raw message