beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jbono...@apache.org
Subject [1/2] beam git commit: upgrade to version 2.1.0-SNAPSHOT
Date Thu, 08 Jun 2017 11:44:25 GMT
Repository: beam
Updated Branches:
  refs/heads/DSL_SQL 4c5b7584a -> 5c1f2cbc6


upgrade to version 2.1.0-SNAPSHOT


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/03a913a9
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/03a913a9
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/03a913a9

Branch: refs/heads/DSL_SQL
Commit: 03a913a95c99474841a175b727925ba7c1eed4c9
Parents: 4c5b758
Author: mingmxu <mingmxu@ebay.com>
Authored: Wed Jun 7 19:27:32 2017 -0700
Committer: mingmxu <mingmxu@ebay.com>
Committed: Wed Jun 7 19:27:32 2017 -0700

----------------------------------------------------------------------
 dsls/pom.xml                                    |  2 +-
 dsls/sql/pom.xml                                | 43 +++-----------
 .../beam/dsls/sql/schema/BeamSqlRowCoder.java   | 59 ++++++++++----------
 .../dsls/sql/schema/kafka/BeamKafkaTable.java   | 20 +++++--
 .../dsls/sql/schema/text/BeamTextCSVTable.java  |  2 +-
 .../schema/text/BeamTextCSVTableIOWriter.java   |  2 +-
 6 files changed, 52 insertions(+), 76 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/03a913a9/dsls/pom.xml
----------------------------------------------------------------------
diff --git a/dsls/pom.xml b/dsls/pom.xml
index 6f9d635..a741563 100644
--- a/dsls/pom.xml
+++ b/dsls/pom.xml
@@ -22,7 +22,7 @@
   <parent>
     <groupId>org.apache.beam</groupId>
     <artifactId>beam-parent</artifactId>
-    <version>0.7.0-SNAPSHOT</version>
+    <version>2.1.0-SNAPSHOT</version>
     <relativePath>../pom.xml</relativePath>
   </parent>
 

http://git-wip-us.apache.org/repos/asf/beam/blob/03a913a9/dsls/sql/pom.xml
----------------------------------------------------------------------
diff --git a/dsls/sql/pom.xml b/dsls/sql/pom.xml
index bc658e6..39e32c4 100644
--- a/dsls/sql/pom.xml
+++ b/dsls/sql/pom.xml
@@ -22,7 +22,7 @@
   <parent>
     <groupId>org.apache.beam</groupId>
     <artifactId>beam-dsls-parent</artifactId>
-    <version>0.7.0-SNAPSHOT</version>
+    <version>2.1.0-SNAPSHOT</version>
   </parent>
 
   <artifactId>beam-dsls-sql</artifactId>
@@ -117,41 +117,6 @@
     </plugins>
   </build>
 
-  <dependencyManagement>
-    <dependencies>
-      <dependency>
-        <groupId>org.apache.beam</groupId>
-        <artifactId>beam-sdks-java-core</artifactId>
-        <version>0.6.0</version>
-      </dependency>
-      <dependency>
-        <groupId>org.apache.beam</groupId>
-        <artifactId>beam-runners-direct-java</artifactId>
-        <version>0.6.0</version>
-      </dependency>
-      <dependency>
-        <groupId>org.apache.beam</groupId>
-        <artifactId>beam-sdks-java-io-kafka</artifactId>
-        <version>0.6.0</version>
-      </dependency>
-      <dependency>
-        <groupId>org.apache.beam</groupId>
-        <artifactId>beam-runners-core-java</artifactId>
-        <version>0.6.0</version>
-      </dependency>
-      <dependency>
-        <groupId>org.apache.beam</groupId>
-        <artifactId>beam-sdks-common-runner-api</artifactId>
-        <version>0.6.0</version>
-      </dependency>
-      <dependency>
-        <groupId>org.apache.beam</groupId>
-        <artifactId>beam-runners-core-construction-java</artifactId>
-        <version>0.6.0</version>
-      </dependency>
-    </dependencies>
-  </dependencyManagement>
-
   <dependencies>
     <dependency>
       <groupId>junit</groupId>
@@ -213,5 +178,11 @@
       <groupId>joda-time</groupId>
       <artifactId>joda-time</artifactId>
     </dependency>
+    <dependency>
+      <groupId>org.apache.kafka</groupId>
+      <artifactId>kafka-clients</artifactId>
+      <version>0.10.1.0</version>
+      <scope>provided</scope>
+    </dependency>
   </dependencies>
 </project>

http://git-wip-us.apache.org/repos/asf/beam/blob/03a913a9/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRowCoder.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRowCoder.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRowCoder.java
index f161d27..14a0f31 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRowCoder.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRowCoder.java
@@ -54,9 +54,8 @@ public class BeamSqlRowCoder extends CustomCoder<BeamSQLRow> {
   }
 
   @Override
-  public void encode(BeamSQLRow value, OutputStream outStream,
-      org.apache.beam.sdk.coders.Coder.Context context) throws CoderException, IOException
{
-    listCoder.encode(value.getNullFields(), outStream, context.nested());
+  public void encode(BeamSQLRow value, OutputStream outStream) throws CoderException, IOException
{
+    listCoder.encode(value.getNullFields(), outStream);
 
     for (int idx = 0; idx < value.size(); ++idx) {
       if (value.getNullFields().contains(idx)) {
@@ -65,36 +64,35 @@ public class BeamSqlRowCoder extends CustomCoder<BeamSQLRow> {
 
       switch (value.getDataType().getFieldsType().get(idx)) {
         case INTEGER:
-          intCoder.encode(value.getInteger(idx), outStream, context.nested());
+          intCoder.encode(value.getInteger(idx), outStream);
           break;
         case SMALLINT:
-          intCoder.encode((int) value.getShort(idx), outStream, context.nested());
+          intCoder.encode((int) value.getShort(idx), outStream);
           break;
         case TINYINT:
-          intCoder.encode((int) value.getByte(idx), outStream, context.nested());
+          intCoder.encode((int) value.getByte(idx), outStream);
           break;
         case DOUBLE:
-          doubleCoder.encode(value.getDouble(idx), outStream, context.nested());
+          doubleCoder.encode(value.getDouble(idx), outStream);
           break;
         case FLOAT:
-          doubleCoder.encode((double) value.getFloat(idx), outStream, context.nested());
+          doubleCoder.encode((double) value.getFloat(idx), outStream);
           break;
         case DECIMAL:
-          bigDecimalCoder.encode(value.getBigDecimal(idx), outStream, context.nested());
+          bigDecimalCoder.encode(value.getBigDecimal(idx), outStream);
           break;
         case BIGINT:
-          longCoder.encode(value.getLong(idx), outStream, context.nested());
+          longCoder.encode(value.getLong(idx), outStream);
           break;
         case VARCHAR:
         case CHAR:
-          stringCoder.encode(value.getString(idx), outStream, context.nested());
+          stringCoder.encode(value.getString(idx), outStream);
           break;
         case TIME:
-          longCoder.encode(value.getGregorianCalendar(idx).getTime().getTime(),
-              outStream, context.nested());
+          longCoder.encode(value.getGregorianCalendar(idx).getTime().getTime(), outStream);
           break;
         case TIMESTAMP:
-          longCoder.encode(value.getDate(idx).getTime(), outStream, context.nested());
+          longCoder.encode(value.getDate(idx).getTime(), outStream);
           break;
 
         default:
@@ -102,14 +100,13 @@ public class BeamSqlRowCoder extends CustomCoder<BeamSQLRow> {
       }
     }
 
-    instantCoder.encode(value.getWindowStart(), outStream, context.nested());
-    instantCoder.encode(value.getWindowEnd(), outStream, context);
+    instantCoder.encode(value.getWindowStart(), outStream);
+    instantCoder.encode(value.getWindowEnd(), outStream);
   }
 
   @Override
-  public BeamSQLRow decode(InputStream inStream, org.apache.beam.sdk.coders.Coder.Context
context)
-      throws CoderException, IOException {
-    List<Integer> nullFields = listCoder.decode(inStream, context.nested());
+  public BeamSQLRow decode(InputStream inStream) throws CoderException, IOException {
+    List<Integer> nullFields = listCoder.decode(inStream);
 
     BeamSQLRow record = new BeamSQLRow(tableSchema);
     record.setNullFields(nullFields);
@@ -121,37 +118,37 @@ public class BeamSqlRowCoder extends CustomCoder<BeamSQLRow> {
 
       switch (tableSchema.getFieldsType().get(idx)) {
         case INTEGER:
-          record.addField(idx, intCoder.decode(inStream, context.nested()));
+          record.addField(idx, intCoder.decode(inStream));
           break;
         case SMALLINT:
-          record.addField(idx, intCoder.decode(inStream, context.nested()).shortValue());
+          record.addField(idx, intCoder.decode(inStream).shortValue());
           break;
         case TINYINT:
-          record.addField(idx, intCoder.decode(inStream, context.nested()).byteValue());
+          record.addField(idx, intCoder.decode(inStream).byteValue());
           break;
         case DOUBLE:
-          record.addField(idx, doubleCoder.decode(inStream, context.nested()));
+          record.addField(idx, doubleCoder.decode(inStream));
           break;
         case FLOAT:
-          record.addField(idx, doubleCoder.decode(inStream, context.nested()).floatValue());
+          record.addField(idx, doubleCoder.decode(inStream).floatValue());
           break;
         case BIGINT:
-          record.addField(idx, longCoder.decode(inStream, context.nested()));
+          record.addField(idx, longCoder.decode(inStream));
           break;
         case DECIMAL:
-          record.addField(idx, bigDecimalCoder.decode(inStream, context.nested()));
+          record.addField(idx, bigDecimalCoder.decode(inStream));
           break;
         case VARCHAR:
         case CHAR:
-          record.addField(idx, stringCoder.decode(inStream, context.nested()));
+          record.addField(idx, stringCoder.decode(inStream));
           break;
         case TIME:
           GregorianCalendar calendar = new GregorianCalendar();
-          calendar.setTime(new Date(longCoder.decode(inStream, context.nested())));
+          calendar.setTime(new Date(longCoder.decode(inStream)));
           record.addField(idx, calendar);
           break;
         case TIMESTAMP:
-          record.addField(idx, new Date(longCoder.decode(inStream, context.nested())));
+          record.addField(idx, new Date(longCoder.decode(inStream)));
           break;
 
         default:
@@ -159,8 +156,8 @@ public class BeamSqlRowCoder extends CustomCoder<BeamSQLRow> {
       }
     }
 
-    record.setWindowStart(instantCoder.decode(inStream, context.nested()));
-    record.setWindowEnd(instantCoder.decode(inStream, context));
+    record.setWindowStart(instantCoder.decode(inStream));
+    record.setWindowEnd(instantCoder.decode(inStream));
 
     return record;
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/03a913a9/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/kafka/BeamKafkaTable.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/kafka/BeamKafkaTable.java
b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/kafka/BeamKafkaTable.java
index 7342cee..aa7cf3a 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/kafka/BeamKafkaTable.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/kafka/BeamKafkaTable.java
@@ -33,6 +33,8 @@ import org.apache.beam.sdk.values.PBegin;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PDone;
 import org.apache.calcite.rel.type.RelProtoDataType;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
 
 /**
  * {@code BeamKafkaTable} represent a Kafka topic, as source or target. Need to
@@ -75,9 +77,13 @@ public abstract class BeamKafkaTable extends BaseBeamTable implements Serializab
   @Override
   public PCollection<BeamSQLRow> buildIOReader(Pipeline pipeline) {
     return PBegin.in(pipeline).apply("read",
-            KafkaIO.<byte[], byte[]>read().withBootstrapServers(bootstrapServers).withTopics(topics)
-                .updateConsumerProperties(configUpdates).withKeyCoder(ByteArrayCoder.of())
-                .withValueCoder(ByteArrayCoder.of()).withoutMetadata())
+            KafkaIO.<byte[], byte[]>read()
+                .withBootstrapServers(bootstrapServers)
+                .withTopics(topics)
+                .updateConsumerProperties(configUpdates)
+                .withKeyDeserializerAndCoder(ByteArrayDeserializer.class, ByteArrayCoder.of())
+                .withValueDeserializerAndCoder(ByteArrayDeserializer.class, ByteArrayCoder.of())
+                .withoutMetadata())
             .apply("in_format", getPTransformForInput());
   }
 
@@ -90,9 +96,11 @@ public abstract class BeamKafkaTable extends BaseBeamTable implements Serializab
       @Override
       public PDone expand(PCollection<BeamSQLRow> input) {
         return input.apply("out_reformat", getPTransformForOutput()).apply("persistent",
-            KafkaIO.<byte[], byte[]>write().withBootstrapServers(bootstrapServers)
-                .withTopic(topics.get(0)).withKeyCoder(ByteArrayCoder.of())
-                .withValueCoder(ByteArrayCoder.of()));
+            KafkaIO.<byte[], byte[]>write()
+                .withBootstrapServers(bootstrapServers)
+                .withTopic(topics.get(0))
+                .withKeySerializer(ByteArraySerializer.class)
+                .withValueSerializer(ByteArraySerializer.class));
       }
     };
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/03a913a9/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/text/BeamTextCSVTable.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/text/BeamTextCSVTable.java
b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/text/BeamTextCSVTable.java
index 6b21289..41742c7 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/text/BeamTextCSVTable.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/text/BeamTextCSVTable.java
@@ -58,7 +58,7 @@ public class BeamTextCSVTable extends BeamTextTable {
 
   @Override
   public PCollection<BeamSQLRow> buildIOReader(Pipeline pipeline) {
-    return PBegin.in(pipeline).apply("decodeRecord", TextIO.Read.from(filePattern))
+    return PBegin.in(pipeline).apply("decodeRecord", TextIO.read().from(filePattern))
         .apply("parseCSVLine",
             new BeamTextCSVTableIOReader(beamSqlRecordType, filePattern, csvFormat));
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/03a913a9/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/text/BeamTextCSVTableIOWriter.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/text/BeamTextCSVTableIOWriter.java
b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/text/BeamTextCSVTableIOWriter.java
index eade842..9b9cbd2 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/text/BeamTextCSVTableIOWriter.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/text/BeamTextCSVTableIOWriter.java
@@ -55,6 +55,6 @@ public class BeamTextCSVTableIOWriter extends PTransform<PCollection<BeamSQLRow>
         BeamSQLRow row = ctx.element();
         ctx.output(beamSQLRow2CsvLine(row, csvFormat));
       }
-    })).apply(TextIO.Write.to(filePattern));
+    })).apply(TextIO.write().to(filePattern));
   }
 }


Mime
View raw message