beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From k...@apache.org
Subject [01/50] beam git commit: Return a valid Coder for any subtype of Mutation on HBaseCoderProviderRegistrar
Date Fri, 23 Jun 2017 03:04:27 GMT
Repository: beam
Updated Branches:
  refs/heads/gearpump-runner 559e3c341 -> 99f4f8b1b


Return a valid Coder for any subtype of Mutation on HBaseCoderProviderRegistrar


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/6b6d20d9
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/6b6d20d9
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/6b6d20d9

Branch: refs/heads/gearpump-runner
Commit: 6b6d20d9dc5afa0c1d8520cf6dbc98e6488a58a2
Parents: a40d11c
Author: Ismaël Mejía <iemejia@apache.org>
Authored: Wed Jun 21 01:04:18 2017 +0200
Committer: Ismaël Mejía <iemejia@apache.org>
Committed: Wed Jun 21 07:29:51 2017 +0200

----------------------------------------------------------------------
 .../io/hbase/HBaseCoderProviderRegistrar.java   | 11 +----
 .../beam/sdk/io/hbase/HBaseMutationCoder.java   | 42 ++++++++++++++++++++
 .../hbase/HBaseCoderProviderRegistrarTest.java  |  4 ++
 3 files changed, 47 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/6b6d20d9/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseCoderProviderRegistrar.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseCoderProviderRegistrar.java
b/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseCoderProviderRegistrar.java
index dee3c70..2973d1b 100644
--- a/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseCoderProviderRegistrar.java
+++ b/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseCoderProviderRegistrar.java
@@ -24,11 +24,6 @@ import org.apache.beam.sdk.coders.CoderProvider;
 import org.apache.beam.sdk.coders.CoderProviderRegistrar;
 import org.apache.beam.sdk.coders.CoderProviders;
 import org.apache.beam.sdk.values.TypeDescriptor;
-import org.apache.hadoop.hbase.client.Append;
-import org.apache.hadoop.hbase.client.Delete;
-import org.apache.hadoop.hbase.client.Increment;
-import org.apache.hadoop.hbase.client.Mutation;
-import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Result;
 
 /**
@@ -39,11 +34,7 @@ public class HBaseCoderProviderRegistrar implements CoderProviderRegistrar
{
   @Override
   public List<CoderProvider> getCoderProviders() {
     return ImmutableList.of(
-      CoderProviders.forCoder(TypeDescriptor.of(Append.class), HBaseMutationCoder.of()),
-      CoderProviders.forCoder(TypeDescriptor.of(Delete.class), HBaseMutationCoder.of()),
-      CoderProviders.forCoder(TypeDescriptor.of(Increment.class), HBaseMutationCoder.of()),
-      CoderProviders.forCoder(TypeDescriptor.of(Mutation.class), HBaseMutationCoder.of()),
-      CoderProviders.forCoder(TypeDescriptor.of(Put.class), HBaseMutationCoder.of()),
+      HBaseMutationCoder.getCoderProvider(),
       CoderProviders.forCoder(TypeDescriptor.of(Result.class), HBaseResultCoder.of()));
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/6b6d20d9/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseMutationCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseMutationCoder.java
b/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseMutationCoder.java
index 501fe09..ee83114 100644
--- a/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseMutationCoder.java
+++ b/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseMutationCoder.java
@@ -21,8 +21,12 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.io.Serializable;
+import java.util.List;
 import org.apache.beam.sdk.coders.AtomicCoder;
+import org.apache.beam.sdk.coders.CannotProvideCoderException;
 import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderProvider;
+import org.apache.beam.sdk.values.TypeDescriptor;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.client.Put;
@@ -65,4 +69,42 @@ class HBaseMutationCoder extends AtomicCoder<Mutation> implements
Serializable {
       throw new IllegalArgumentException("Only Put and Delete are supported");
     }
   }
+
+  /**
+   * Returns a {@link CoderProvider} which uses the {@link HBaseMutationCoder} for
+   * {@link Mutation mutations}.
+   */
+  static CoderProvider getCoderProvider() {
+    return HBASE_MUTATION_CODER_PROVIDER;
+  }
+
+  private static final CoderProvider HBASE_MUTATION_CODER_PROVIDER =
+    new HBaseMutationCoderProvider();
+
+  /**
+   * A {@link CoderProvider} for {@link Mutation mutations}.
+   */
+  private static class HBaseMutationCoderProvider extends CoderProvider {
+    @Override
+    public <T> Coder<T> coderFor(TypeDescriptor<T> typeDescriptor,
+        List<? extends Coder<?>> componentCoders) throws CannotProvideCoderException
{
+      if (!typeDescriptor.isSubtypeOf(HBASE_MUTATION_TYPE_DESCRIPTOR)) {
+        throw new CannotProvideCoderException(
+          String.format(
+            "Cannot provide %s because %s is not a subclass of %s",
+            HBaseMutationCoder.class.getSimpleName(),
+            typeDescriptor,
+            Mutation.class.getName()));
+      }
+
+      try {
+        return (Coder<T>) HBaseMutationCoder.of();
+      } catch (IllegalArgumentException e) {
+        throw new CannotProvideCoderException(e);
+      }
+    }
+  }
+
+  private static final TypeDescriptor<Mutation> HBASE_MUTATION_TYPE_DESCRIPTOR =
+    new TypeDescriptor<Mutation>() {};
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/6b6d20d9/sdks/java/io/hbase/src/test/java/org/apache/beam/sdk/io/hbase/HBaseCoderProviderRegistrarTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hbase/src/test/java/org/apache/beam/sdk/io/hbase/HBaseCoderProviderRegistrarTest.java
b/sdks/java/io/hbase/src/test/java/org/apache/beam/sdk/io/hbase/HBaseCoderProviderRegistrarTest.java
index ac81e8a..5b2e138 100644
--- a/sdks/java/io/hbase/src/test/java/org/apache/beam/sdk/io/hbase/HBaseCoderProviderRegistrarTest.java
+++ b/sdks/java/io/hbase/src/test/java/org/apache/beam/sdk/io/hbase/HBaseCoderProviderRegistrarTest.java
@@ -18,7 +18,9 @@
 package org.apache.beam.sdk.io.hbase;
 
 import org.apache.beam.sdk.coders.CoderRegistry;
+import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Result;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -37,5 +39,7 @@ public class HBaseCoderProviderRegistrarTest {
   @Test
   public void testMutationCoderIsRegistered() throws Exception {
     CoderRegistry.createDefault().getCoder(Mutation.class);
+    CoderRegistry.createDefault().getCoder(Put.class);
+    CoderRegistry.createDefault().getCoder(Delete.class);
   }
 }


Mime
View raw message