beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From dhalp...@apache.org
Subject [1/2] beam git commit: Fix HDFSFileSource’s split size estimate
Date Tue, 31 Jan 2017 05:03:08 GMT
Repository: beam
Updated Branches:
  refs/heads/master bcc2806c0 -> 4e425ca1b


Fix HDFSFileSource’s split size estimate


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/086e1674
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/086e1674
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/086e1674

Branch: refs/heads/master
Commit: 086e1674641be37aafd1235f2ac2db2da012376b
Parents: bcc2806
Author: Igor Bernstein <igorbernstein@google.com>
Authored: Sun Jan 29 00:00:02 2017 -0500
Committer: Dan Halperin <dhalperi@google.com>
Committed: Mon Jan 30 21:02:55 2017 -0800

----------------------------------------------------------------------
 .../apache/beam/sdk/io/hdfs/HDFSFileSource.java | 10 +++++++++
 .../beam/sdk/io/hdfs/HDFSFileSourceTest.java    | 23 ++++++++++++++++++++
 2 files changed, 33 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/086e1674/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HDFSFileSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HDFSFileSource.java b/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HDFSFileSource.java
index 61660de..1affb4a 100644
--- a/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HDFSFileSource.java
+++ b/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HDFSFileSource.java
@@ -237,7 +237,14 @@ public class HDFSFileSource<K, V> extends BoundedSource<KV<K, V>> {
   @Override
   public long getEstimatedSizeBytes(PipelineOptions options) {
     long size = 0;
+
     try {
+      // If this source represents a split from splitIntoBundles, then return the size of the split,
+      // rather then the entire input
+      if (serializableSplit != null) {
+        return serializableSplit.getSplit().getLength();
+      }
+
       Job job = Job.getInstance(); // new instance
       for (FileStatus st : listStatus(createFormat(job), job)) {
         size += st.getLen();
@@ -245,6 +252,9 @@ public class HDFSFileSource<K, V> extends BoundedSource<KV<K, V>> {
     } catch (IOException | NoSuchMethodException | InvocationTargetException
         | IllegalAccessException | InstantiationException e) {
       // ignore, and return 0
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      // ignore, and return 0
     }
     return size;
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/086e1674/sdks/java/io/hdfs/src/test/java/org/apache/beam/sdk/io/hdfs/HDFSFileSourceTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hdfs/src/test/java/org/apache/beam/sdk/io/hdfs/HDFSFileSourceTest.java b/sdks/java/io/hdfs/src/test/java/org/apache/beam/sdk/io/hdfs/HDFSFileSourceTest.java
index 6145952..4c3f1ce 100644
--- a/sdks/java/io/hdfs/src/test/java/org/apache/beam/sdk/io/hdfs/HDFSFileSourceTest.java
+++ b/sdks/java/io/hdfs/src/test/java/org/apache/beam/sdk/io/hdfs/HDFSFileSourceTest.java
@@ -151,6 +151,29 @@ public class HDFSFileSourceTest {
     assertTrue(nonEmptySplits > 2);
   }
 
+  @Test
+  public void testSplitEstimatedSize() throws Exception {
+    PipelineOptions options = PipelineOptionsFactory.create();
+
+    List<KV<IntWritable, Text>> expectedResults = createRandomRecords(3, 10000, 0);
+    File file = createFileWithData("tmp.avro", expectedResults);
+
+    HDFSFileSource<IntWritable, Text> source =
+        HDFSFileSource.from(file.toString(), SequenceFileInputFormat.class,
+            IntWritable.class, Text.class);
+
+    long originalSize = source.getEstimatedSizeBytes(options);
+    long splitTotalSize = 0;
+    List<? extends BoundedSource<KV<IntWritable, Text>>> splits = source.splitIntoBundles(
+        SequenceFile.SYNC_INTERVAL, options
+    );
+    for (BoundedSource<KV<IntWritable, Text>> splitSource : splits) {
+      splitTotalSize += splitSource.getEstimatedSizeBytes(options);
+    }
+    // Assert that the estimated size of the whole is the sum of its parts
+    assertEquals(originalSize, splitTotalSize);
+  }
+
   private File createFileWithData(String filename, List<KV<IntWritable, Text>> records)
       throws IOException {
     File tmpFile = tmpFolder.newFile(filename);


Mime
View raw message