Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 84BA2200C8E for ; Thu, 8 Jun 2017 13:44:27 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 83497160BD5; Thu, 8 Jun 2017 11:44:27 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 9FA2E160BCA for ; Thu, 8 Jun 2017 13:44:26 +0200 (CEST) Received: (qmail 14357 invoked by uid 500); 8 Jun 2017 11:44:25 -0000 Mailing-List: contact commits-help@beam.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@beam.apache.org Delivered-To: mailing list commits@beam.apache.org Received: (qmail 14348 invoked by uid 99); 8 Jun 2017 11:44:25 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 08 Jun 2017 11:44:25 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id B78A2DFE5C; Thu, 8 Jun 2017 11:44:25 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: jbonofre@apache.org To: commits@beam.apache.org Date: Thu, 08 Jun 2017 11:44:25 -0000 Message-Id: X-Mailer: ASF-Git Admin Mailer Subject: [1/2] beam git commit: upgrade to version 2.1.0-SNAPSHOT archived-at: Thu, 08 Jun 2017 11:44:27 -0000 Repository: beam Updated Branches: refs/heads/DSL_SQL 4c5b7584a -> 5c1f2cbc6 upgrade to version 2.1.0-SNAPSHOT Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/03a913a9 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/03a913a9 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/03a913a9 Branch: refs/heads/DSL_SQL Commit: 03a913a95c99474841a175b727925ba7c1eed4c9 Parents: 4c5b758 Author: mingmxu Authored: Wed Jun 7 19:27:32 2017 -0700 Committer: mingmxu Committed: Wed Jun 7 19:27:32 2017 -0700 ---------------------------------------------------------------------- dsls/pom.xml | 2 +- dsls/sql/pom.xml | 43 +++----------- .../beam/dsls/sql/schema/BeamSqlRowCoder.java | 59 ++++++++++---------- .../dsls/sql/schema/kafka/BeamKafkaTable.java | 20 +++++-- .../dsls/sql/schema/text/BeamTextCSVTable.java | 2 +- .../schema/text/BeamTextCSVTableIOWriter.java | 2 +- 6 files changed, 52 insertions(+), 76 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/03a913a9/dsls/pom.xml ---------------------------------------------------------------------- diff --git a/dsls/pom.xml b/dsls/pom.xml index 6f9d635..a741563 100644 --- a/dsls/pom.xml +++ b/dsls/pom.xml @@ -22,7 +22,7 @@ org.apache.beam beam-parent - 0.7.0-SNAPSHOT + 2.1.0-SNAPSHOT ../pom.xml http://git-wip-us.apache.org/repos/asf/beam/blob/03a913a9/dsls/sql/pom.xml ---------------------------------------------------------------------- diff --git a/dsls/sql/pom.xml b/dsls/sql/pom.xml index bc658e6..39e32c4 100644 --- a/dsls/sql/pom.xml +++ b/dsls/sql/pom.xml @@ -22,7 +22,7 @@ org.apache.beam beam-dsls-parent - 0.7.0-SNAPSHOT + 2.1.0-SNAPSHOT beam-dsls-sql @@ -117,41 +117,6 @@ - - - - org.apache.beam - beam-sdks-java-core - 0.6.0 - - - org.apache.beam - beam-runners-direct-java - 0.6.0 - - - org.apache.beam - beam-sdks-java-io-kafka - 0.6.0 - - - org.apache.beam - beam-runners-core-java - 0.6.0 - - - org.apache.beam - beam-sdks-common-runner-api - 0.6.0 - - - org.apache.beam - beam-runners-core-construction-java - 0.6.0 - - - - junit @@ -213,5 +178,11 @@ joda-time joda-time + + org.apache.kafka + kafka-clients + 0.10.1.0 + provided + http://git-wip-us.apache.org/repos/asf/beam/blob/03a913a9/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRowCoder.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRowCoder.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRowCoder.java index f161d27..14a0f31 100644 --- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRowCoder.java +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRowCoder.java @@ -54,9 +54,8 @@ public class BeamSqlRowCoder extends CustomCoder { } @Override - public void encode(BeamSQLRow value, OutputStream outStream, - org.apache.beam.sdk.coders.Coder.Context context) throws CoderException, IOException { - listCoder.encode(value.getNullFields(), outStream, context.nested()); + public void encode(BeamSQLRow value, OutputStream outStream) throws CoderException, IOException { + listCoder.encode(value.getNullFields(), outStream); for (int idx = 0; idx < value.size(); ++idx) { if (value.getNullFields().contains(idx)) { @@ -65,36 +64,35 @@ public class BeamSqlRowCoder extends CustomCoder { switch (value.getDataType().getFieldsType().get(idx)) { case INTEGER: - intCoder.encode(value.getInteger(idx), outStream, context.nested()); + intCoder.encode(value.getInteger(idx), outStream); break; case SMALLINT: - intCoder.encode((int) value.getShort(idx), outStream, context.nested()); + intCoder.encode((int) value.getShort(idx), outStream); break; case TINYINT: - intCoder.encode((int) value.getByte(idx), outStream, context.nested()); + intCoder.encode((int) value.getByte(idx), outStream); break; case DOUBLE: - doubleCoder.encode(value.getDouble(idx), outStream, context.nested()); + doubleCoder.encode(value.getDouble(idx), outStream); break; case FLOAT: - doubleCoder.encode((double) value.getFloat(idx), outStream, context.nested()); + doubleCoder.encode((double) value.getFloat(idx), outStream); break; case DECIMAL: - bigDecimalCoder.encode(value.getBigDecimal(idx), outStream, context.nested()); + bigDecimalCoder.encode(value.getBigDecimal(idx), outStream); break; case BIGINT: - longCoder.encode(value.getLong(idx), outStream, context.nested()); + longCoder.encode(value.getLong(idx), outStream); break; case VARCHAR: case CHAR: - stringCoder.encode(value.getString(idx), outStream, context.nested()); + stringCoder.encode(value.getString(idx), outStream); break; case TIME: - longCoder.encode(value.getGregorianCalendar(idx).getTime().getTime(), - outStream, context.nested()); + longCoder.encode(value.getGregorianCalendar(idx).getTime().getTime(), outStream); break; case TIMESTAMP: - longCoder.encode(value.getDate(idx).getTime(), outStream, context.nested()); + longCoder.encode(value.getDate(idx).getTime(), outStream); break; default: @@ -102,14 +100,13 @@ public class BeamSqlRowCoder extends CustomCoder { } } - instantCoder.encode(value.getWindowStart(), outStream, context.nested()); - instantCoder.encode(value.getWindowEnd(), outStream, context); + instantCoder.encode(value.getWindowStart(), outStream); + instantCoder.encode(value.getWindowEnd(), outStream); } @Override - public BeamSQLRow decode(InputStream inStream, org.apache.beam.sdk.coders.Coder.Context context) - throws CoderException, IOException { - List nullFields = listCoder.decode(inStream, context.nested()); + public BeamSQLRow decode(InputStream inStream) throws CoderException, IOException { + List nullFields = listCoder.decode(inStream); BeamSQLRow record = new BeamSQLRow(tableSchema); record.setNullFields(nullFields); @@ -121,37 +118,37 @@ public class BeamSqlRowCoder extends CustomCoder { switch (tableSchema.getFieldsType().get(idx)) { case INTEGER: - record.addField(idx, intCoder.decode(inStream, context.nested())); + record.addField(idx, intCoder.decode(inStream)); break; case SMALLINT: - record.addField(idx, intCoder.decode(inStream, context.nested()).shortValue()); + record.addField(idx, intCoder.decode(inStream).shortValue()); break; case TINYINT: - record.addField(idx, intCoder.decode(inStream, context.nested()).byteValue()); + record.addField(idx, intCoder.decode(inStream).byteValue()); break; case DOUBLE: - record.addField(idx, doubleCoder.decode(inStream, context.nested())); + record.addField(idx, doubleCoder.decode(inStream)); break; case FLOAT: - record.addField(idx, doubleCoder.decode(inStream, context.nested()).floatValue()); + record.addField(idx, doubleCoder.decode(inStream).floatValue()); break; case BIGINT: - record.addField(idx, longCoder.decode(inStream, context.nested())); + record.addField(idx, longCoder.decode(inStream)); break; case DECIMAL: - record.addField(idx, bigDecimalCoder.decode(inStream, context.nested())); + record.addField(idx, bigDecimalCoder.decode(inStream)); break; case VARCHAR: case CHAR: - record.addField(idx, stringCoder.decode(inStream, context.nested())); + record.addField(idx, stringCoder.decode(inStream)); break; case TIME: GregorianCalendar calendar = new GregorianCalendar(); - calendar.setTime(new Date(longCoder.decode(inStream, context.nested()))); + calendar.setTime(new Date(longCoder.decode(inStream))); record.addField(idx, calendar); break; case TIMESTAMP: - record.addField(idx, new Date(longCoder.decode(inStream, context.nested()))); + record.addField(idx, new Date(longCoder.decode(inStream))); break; default: @@ -159,8 +156,8 @@ public class BeamSqlRowCoder extends CustomCoder { } } - record.setWindowStart(instantCoder.decode(inStream, context.nested())); - record.setWindowEnd(instantCoder.decode(inStream, context)); + record.setWindowStart(instantCoder.decode(inStream)); + record.setWindowEnd(instantCoder.decode(inStream)); return record; } http://git-wip-us.apache.org/repos/asf/beam/blob/03a913a9/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/kafka/BeamKafkaTable.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/kafka/BeamKafkaTable.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/kafka/BeamKafkaTable.java index 7342cee..aa7cf3a 100644 --- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/kafka/BeamKafkaTable.java +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/kafka/BeamKafkaTable.java @@ -33,6 +33,8 @@ import org.apache.beam.sdk.values.PBegin; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PDone; import org.apache.calcite.rel.type.RelProtoDataType; +import org.apache.kafka.common.serialization.ByteArrayDeserializer; +import org.apache.kafka.common.serialization.ByteArraySerializer; /** * {@code BeamKafkaTable} represent a Kafka topic, as source or target. Need to @@ -75,9 +77,13 @@ public abstract class BeamKafkaTable extends BaseBeamTable implements Serializab @Override public PCollection buildIOReader(Pipeline pipeline) { return PBegin.in(pipeline).apply("read", - KafkaIO.read().withBootstrapServers(bootstrapServers).withTopics(topics) - .updateConsumerProperties(configUpdates).withKeyCoder(ByteArrayCoder.of()) - .withValueCoder(ByteArrayCoder.of()).withoutMetadata()) + KafkaIO.read() + .withBootstrapServers(bootstrapServers) + .withTopics(topics) + .updateConsumerProperties(configUpdates) + .withKeyDeserializerAndCoder(ByteArrayDeserializer.class, ByteArrayCoder.of()) + .withValueDeserializerAndCoder(ByteArrayDeserializer.class, ByteArrayCoder.of()) + .withoutMetadata()) .apply("in_format", getPTransformForInput()); } @@ -90,9 +96,11 @@ public abstract class BeamKafkaTable extends BaseBeamTable implements Serializab @Override public PDone expand(PCollection input) { return input.apply("out_reformat", getPTransformForOutput()).apply("persistent", - KafkaIO.write().withBootstrapServers(bootstrapServers) - .withTopic(topics.get(0)).withKeyCoder(ByteArrayCoder.of()) - .withValueCoder(ByteArrayCoder.of())); + KafkaIO.write() + .withBootstrapServers(bootstrapServers) + .withTopic(topics.get(0)) + .withKeySerializer(ByteArraySerializer.class) + .withValueSerializer(ByteArraySerializer.class)); } }; } http://git-wip-us.apache.org/repos/asf/beam/blob/03a913a9/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/text/BeamTextCSVTable.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/text/BeamTextCSVTable.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/text/BeamTextCSVTable.java index 6b21289..41742c7 100644 --- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/text/BeamTextCSVTable.java +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/text/BeamTextCSVTable.java @@ -58,7 +58,7 @@ public class BeamTextCSVTable extends BeamTextTable { @Override public PCollection buildIOReader(Pipeline pipeline) { - return PBegin.in(pipeline).apply("decodeRecord", TextIO.Read.from(filePattern)) + return PBegin.in(pipeline).apply("decodeRecord", TextIO.read().from(filePattern)) .apply("parseCSVLine", new BeamTextCSVTableIOReader(beamSqlRecordType, filePattern, csvFormat)); } http://git-wip-us.apache.org/repos/asf/beam/blob/03a913a9/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/text/BeamTextCSVTableIOWriter.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/text/BeamTextCSVTableIOWriter.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/text/BeamTextCSVTableIOWriter.java index eade842..9b9cbd2 100644 --- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/text/BeamTextCSVTableIOWriter.java +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/text/BeamTextCSVTableIOWriter.java @@ -55,6 +55,6 @@ public class BeamTextCSVTableIOWriter extends PTransform BeamSQLRow row = ctx.element(); ctx.output(beamSQLRow2CsvLine(row, csvFormat)); } - })).apply(TextIO.Write.to(filePattern)); + })).apply(TextIO.write().to(filePattern)); } }