beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From dhalp...@apache.org
Subject [2/5] beam git commit: [BEAM-1540] Small refactorings for HBaseIO
Date Mon, 27 Feb 2017 21:13:16 GMT
[BEAM-1540] Small refactorings for HBaseIO

Rename result to current + make some attributes final
Refactor SerializableScan to use get()


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/16a2bc0e
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/16a2bc0e
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/16a2bc0e

Branch: refs/heads/master
Commit: 16a2bc0ef860b02f5a8da01b0bd808973310c206
Parents: 715b95a
Author: Ismaël Mejía <iemejia@gmail.com>
Authored: Thu Feb 23 10:23:04 2017 +0100
Committer: Dan Halperin <dhalperi@google.com>
Committed: Mon Feb 27 13:13:03 2017 -0800

----------------------------------------------------------------------
 sdks/java/io/hbase/pom.xml                      |  5 ++-
 .../org/apache/beam/sdk/io/hbase/HBaseIO.java   | 32 ++++++++++----------
 .../beam/sdk/io/hbase/SerializableScan.java     | 10 ++++--
 .../apache/beam/sdk/io/hbase/HBaseIOTest.java   |  2 +-
 4 files changed, 27 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/16a2bc0e/sdks/java/io/hbase/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/io/hbase/pom.xml b/sdks/java/io/hbase/pom.xml
index f4a06a9..dfcca7a 100644
--- a/sdks/java/io/hbase/pom.xml
+++ b/sdks/java/io/hbase/pom.xml
@@ -196,9 +196,8 @@
     </dependency>
 
     <dependency>
-      <groupId>commons-lang</groupId>
-      <artifactId>commons-lang</artifactId>
-      <version>2.6</version>
+      <groupId>org.apache.commons</groupId>
+      <artifactId>commons-lang3</artifactId>
       <scope>test</scope>
     </dependency>
 

http://git-wip-us.apache.org/repos/asf/beam/blob/16a2bc0e/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseIO.java b/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseIO.java
index 3c49db6..ed191cb 100644
--- a/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseIO.java
+++ b/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseIO.java
@@ -217,7 +217,7 @@ public class HBaseIO {
          */
         public Read withFilter(Filter filter) {
             checkNotNull(filter, "filter");
-            return withScan(serializableScan.getScan().setFilter(filter));
+            return withScan(serializableScan.get().setFilter(filter));
         }
 
         /**
@@ -229,7 +229,7 @@ public class HBaseIO {
             checkNotNull(keyRange, "keyRange");
             byte[] startRow = keyRange.getStartKey().getBytes();
             byte[] stopRow = keyRange.getEndKey().getBytes();
-            return withScan(serializableScan.getScan().setStartRow(startRow).setStopRow(stopRow));
+            return withScan(serializableScan.get().setStartRow(startRow).setStopRow(stopRow));
         }
 
         /**
@@ -279,7 +279,7 @@ public class HBaseIO {
             builder.add(DisplayData.item("configuration",
                     serializableConfiguration.get().toString()));
             builder.add(DisplayData.item("tableId", tableId));
-            builder.addIfNotNull(DisplayData.item("scan", serializableScan.getScan().toString()));
+            builder.addIfNotNull(DisplayData.item("scan", serializableScan.get().toString()));
         }
 
         public String getTableId() {
@@ -294,18 +294,18 @@ public class HBaseIO {
          * Returns the range of keys that will be read from the table.
          */
         public ByteKeyRange getKeyRange() {
-            byte[] startRow = serializableScan.getScan().getStartRow();
-            byte[] stopRow = serializableScan.getScan().getStopRow();
+            byte[] startRow = serializableScan.get().getStartRow();
+            byte[] stopRow = serializableScan.get().getStopRow();
             return ByteKeyRange.of(ByteKey.copyFrom(startRow), ByteKey.copyFrom(stopRow));
         }
 
-        private SerializableConfiguration serializableConfiguration;
-        private String tableId;
-        private SerializableScan serializableScan;
+        private final SerializableConfiguration serializableConfiguration;
+        private final String tableId;
+        private final SerializableScan serializableScan;
     }
 
     static class HBaseSource extends BoundedSource<Result> {
-        private Read read;
+        private final Read read;
         @Nullable private Long estimatedSizeBytes;
 
         HBaseSource(Read read, @Nullable Long estimatedSizeBytes) {
@@ -318,7 +318,7 @@ public class HBaseIO {
             if (estimatedSizeBytes == null) {
                 estimatedSizeBytes = estimateSizeBytes();
                 LOG.debug("Estimated size {} bytes for table {} and scan {}", estimatedSizeBytes,
-                        read.tableId, read.serializableScan.getScan());
+                        read.tableId, read.serializableScan.get());
             }
             return estimatedSizeBytes;
         }
@@ -360,7 +360,7 @@ public class HBaseIO {
         }
 
         private List<HRegionLocation> getRegionLocations(Connection connection) throws Exception {
-            final Scan scan = read.serializableScan.getScan();
+            final Scan scan = read.serializableScan.get();
             byte[] startRow = scan.getStartRow();
             byte[] stopRow = scan.getStopRow();
 
@@ -390,7 +390,7 @@ public class HBaseIO {
         private List<HBaseSource>
             splitBasedOnRegions(List<HRegionLocation> regionLocations, int numSplits)
                 throws Exception {
-            final Scan scan = read.serializableScan.getScan();
+            final Scan scan = read.serializableScan.get();
             byte[] startRow = scan.getStartRow();
             byte[] stopRow = scan.getStopRow();
 
@@ -478,7 +478,7 @@ public class HBaseIO {
         private Connection connection;
         private ResultScanner scanner;
         private Iterator<Result> iter;
-        private Result result;
+        private Result current;
         private long recordsReturned;
 
         HBaseReader(HBaseSource source) {
@@ -492,7 +492,7 @@ public class HBaseIO {
             connection = ConnectionFactory.createConnection(configuration);
             TableName tableName = TableName.valueOf(tableId);
             Table table = connection.getTable(tableName);
-            Scan scan = source.read.serializableScan.getScan();
+            Scan scan = source.read.serializableScan.get();
             scanner = table.getScanner(scan);
             iter = scanner.iterator();
             return advance();
@@ -500,14 +500,14 @@ public class HBaseIO {
 
         @Override
         public Result getCurrent() throws NoSuchElementException {
-            return result;
+            return current;
         }
 
         @Override
         public boolean advance() throws IOException {
             boolean hasRecord = iter.hasNext();
             if (hasRecord) {
-                result = iter.next();
+                current = iter.next();
                 ++recordsReturned;
             }
             return hasRecord;

http://git-wip-us.apache.org/repos/asf/beam/blob/16a2bc0e/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/SerializableScan.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/SerializableScan.java b/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/SerializableScan.java
index df575b0..f3bc7ac 100644
--- a/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/SerializableScan.java
+++ b/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/SerializableScan.java
@@ -26,12 +26,18 @@ import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
 
 /**
- * This is just a wrapper class to serialize HBase {@link Scan}.
+ * This is just a wrapper class to serialize HBase {@link Scan} using Protobuf.
  */
 class SerializableScan implements Serializable {
     private transient Scan scan;
 
+    public SerializableScan() {
+    }
+
     public SerializableScan(Scan scan) {
+        if (scan == null) {
+            throw new NullPointerException("Scan must not be null.");
+        }
         this.scan = scan;
     }
 
@@ -43,7 +49,7 @@ class SerializableScan implements Serializable {
         scan = ProtobufUtil.toScan(ClientProtos.Scan.parseDelimitedFrom(in));
     }
 
-    public Scan getScan() {
+    public Scan get() {
         return scan;
     }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/16a2bc0e/sdks/java/io/hbase/src/test/java/org/apache/beam/sdk/io/hbase/HBaseIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hbase/src/test/java/org/apache/beam/sdk/io/hbase/HBaseIOTest.java b/sdks/java/io/hbase/src/test/java/org/apache/beam/sdk/io/hbase/HBaseIOTest.java
index 1d49f9d..774e17e 100644
--- a/sdks/java/io/hbase/src/test/java/org/apache/beam/sdk/io/hbase/HBaseIOTest.java
+++ b/sdks/java/io/hbase/src/test/java/org/apache/beam/sdk/io/hbase/HBaseIOTest.java
@@ -40,7 +40,7 @@ import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
-import org.apache.commons.lang.StringUtils;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HBaseTestingUtility;


Mime
View raw message