beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jbono...@apache.org
Subject [1/2] beam git commit: proposal for new UDF
Date Mon, 17 Jul 2017 15:14:24 GMT
Repository: beam
Updated Branches:
  refs/heads/DSL_SQL a452b8020 -> bed209e41


proposal for new UDF


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/a49e4783
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/a49e4783
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/a49e4783

Branch: refs/heads/DSL_SQL
Commit: a49e47830e5689b9b392d23813626b1cf9636ca6
Parents: 5fea746
Author: mingmxu <mingmxu@ebay.com>
Authored: Thu Jul 13 23:22:14 2017 -0700
Committer: mingmxu <mingmxu@ebay.com>
Committed: Thu Jul 13 23:22:14 2017 -0700

----------------------------------------------------------------------
 .../java/org/apache/beam/dsls/sql/BeamSql.java  |  9 +++--
 .../org/apache/beam/dsls/sql/BeamSqlEnv.java    |  5 ++-
 .../apache/beam/dsls/sql/schema/BeamSqlUdf.java | 41 ++++++++++++++++++++
 .../beam/dsls/sql/BeamSqlDslUdfUdafTest.java    |  9 +++--
 4 files changed, 54 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/a49e4783/dsls/sql/src/main/java/org/apache/beam/dsls/sql/BeamSql.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/BeamSql.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/BeamSql.java
index ec3799c..d902f42 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/BeamSql.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/BeamSql.java
@@ -23,6 +23,7 @@ import org.apache.beam.dsls.sql.schema.BeamPCollectionTable;
 import org.apache.beam.dsls.sql.schema.BeamSqlRow;
 import org.apache.beam.dsls.sql.schema.BeamSqlRowCoder;
 import org.apache.beam.dsls.sql.schema.BeamSqlUdaf;
+import org.apache.beam.dsls.sql.schema.BeamSqlUdf;
 import org.apache.beam.sdk.annotations.Experimental;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.values.PCollection;
@@ -128,8 +129,8 @@ public class BeamSql {
     /**
      * register a UDF function used in this query.
      */
-     public QueryTransform withUdf(String functionName, Class<?> clazz, String methodName){
-       getSqlEnv().registerUdf(functionName, clazz, methodName);
+     public QueryTransform withUdf(String functionName, Class<? extends BeamSqlUdf> clazz){
+       getSqlEnv().registerUdf(functionName, clazz);
        return this;
      }
 
@@ -196,8 +197,8 @@ public class BeamSql {
     /**
      * register a UDF function used in this query.
      */
-     public SimpleQueryTransform withUdf(String functionName, Class<?> clazz, String methodName){
-       getSqlEnv().registerUdf(functionName, clazz, methodName);
+     public SimpleQueryTransform withUdf(String functionName, Class<? extends BeamSqlUdf> clazz){
+       getSqlEnv().registerUdf(functionName, clazz);
        return this;
      }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/a49e4783/dsls/sql/src/main/java/org/apache/beam/dsls/sql/BeamSqlEnv.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/BeamSqlEnv.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/BeamSqlEnv.java
index 61f0355..e8c8c97 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/BeamSqlEnv.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/BeamSqlEnv.java
@@ -23,6 +23,7 @@ import org.apache.beam.dsls.sql.planner.BeamQueryPlanner;
 import org.apache.beam.dsls.sql.schema.BaseBeamTable;
 import org.apache.beam.dsls.sql.schema.BeamSqlRecordType;
 import org.apache.beam.dsls.sql.schema.BeamSqlUdaf;
+import org.apache.beam.dsls.sql.schema.BeamSqlUdf;
 import org.apache.beam.dsls.sql.utils.CalciteUtils;
 import org.apache.calcite.DataContext;
 import org.apache.calcite.linq4j.Enumerable;
@@ -55,8 +56,8 @@ public class BeamSqlEnv implements Serializable{
   /**
    * Register a UDF function which can be used in SQL expression.
    */
-  public void registerUdf(String functionName, Class<?> clazz, String methodName) {
-    schema.add(functionName, ScalarFunctionImpl.create(clazz, methodName));
+  public void registerUdf(String functionName, Class<? extends BeamSqlUdf> clazz) {
+    schema.add(functionName, ScalarFunctionImpl.create(clazz, BeamSqlUdf.UDF_METHOD));
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/beam/blob/a49e4783/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlUdf.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlUdf.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlUdf.java
new file mode 100644
index 0000000..2066353
--- /dev/null
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlUdf.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.dsls.sql.schema;
+
+import java.io.Serializable;
+
+/**
+ * Interface to create a UDF in Beam SQL.
+ *
+ * <p>A static method {@code eval} is required. Here is an example:
+ *
+ * <blockquote><pre>
+ * public static class MyLeftFunction implements BeamSqlUdf {
+ *   public static String eval(
+ *       &#64;Parameter(name = "s") String s,
+ *       &#64;Parameter(name = "n", optional = true) Integer n) {
+ *     return s.substring(0, n == null ? 1 : n);
+ *   }
+ * }</pre></blockquote>
+ *
+ * <p>The first parameter is named "s" and is mandatory,
+ * and the second parameter is named "n" and is optional.
+ */
+public interface BeamSqlUdf extends Serializable {
+  String UDF_METHOD = "eval";
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/a49e4783/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslUdfUdafTest.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslUdfUdafTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslUdfUdafTest.java
index ba3e87e..332a273 100644
--- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslUdfUdafTest.java
+++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslUdfUdafTest.java
@@ -23,6 +23,7 @@ import java.util.Iterator;
 import org.apache.beam.dsls.sql.schema.BeamSqlRecordType;
 import org.apache.beam.dsls.sql.schema.BeamSqlRow;
 import org.apache.beam.dsls.sql.schema.BeamSqlUdaf;
+import org.apache.beam.dsls.sql.schema.BeamSqlUdf;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionTuple;
@@ -78,14 +79,14 @@ public class BeamSqlDslUdfUdafTest extends BeamSqlDslBase {
     String sql1 = "SELECT f_int, cubic1(f_int) as cubicvalue FROM PCOLLECTION WHERE f_int = 2";
     PCollection<BeamSqlRow> result1 =
         boundedInput1.apply("testUdf1",
-            BeamSql.simpleQuery(sql1).withUdf("cubic1", CubicInteger.class, "cubic"));
+            BeamSql.simpleQuery(sql1).withUdf("cubic1", CubicInteger.class));
     PAssert.that(result1).containsInAnyOrder(record);
 
     String sql2 = "SELECT f_int, cubic2(f_int) as cubicvalue FROM PCOLLECTION WHERE f_int = 2";
     PCollection<BeamSqlRow> result2 =
         PCollectionTuple.of(new TupleTag<BeamSqlRow>("PCOLLECTION"), boundedInput1)
         .apply("testUdf2",
-            BeamSql.query(sql2).withUdf("cubic2", CubicInteger.class, "cubic"));
+            BeamSql.query(sql2).withUdf("cubic2", CubicInteger.class));
     PAssert.that(result2).containsInAnyOrder(record);
 
     pipeline.run().waitUntilFinish();
@@ -129,8 +130,8 @@ public class BeamSqlDslUdfUdafTest extends BeamSqlDslBase {
   /**
    * A example UDF for test.
    */
-  public static class CubicInteger{
-    public static Integer cubic(Integer input){
+  public static class CubicInteger implements BeamSqlUdf{
+    public static Integer eval(Integer input){
       return input * input * input;
     }
   }


Mime
View raw message