Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id B747E200C53 for ; Tue, 11 Apr 2017 18:58:58 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id B5E17160B9B; Tue, 11 Apr 2017 16:58:58 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 88612160B7D for ; Tue, 11 Apr 2017 18:58:57 +0200 (CEST) Received: (qmail 34102 invoked by uid 500); 11 Apr 2017 16:58:56 -0000 Mailing-List: contact commits-help@beam.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@beam.apache.org Delivered-To: mailing list commits@beam.apache.org Received: (qmail 34092 invoked by uid 99); 11 Apr 2017 16:58:56 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 11 Apr 2017 16:58:56 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 11DEDDFF9F; Tue, 11 Apr 2017 16:58:56 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: tgroh@apache.org To: commits@beam.apache.org Date: Tue, 11 Apr 2017 16:58:56 -0000 Message-Id: <036143ee9a2b47f7bc3262f95df91aa6@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [1/2] beam git commit: Add Coder utilities for Proto conversions archived-at: Tue, 11 Apr 2017 16:58:58 -0000 Repository: beam Updated Branches: refs/heads/master 986fcefca -> 8beea73c1 Add Coder utilities for Proto conversions Include Known Coders Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/01e5a8d5 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/01e5a8d5 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/01e5a8d5 Branch: refs/heads/master Commit: 01e5a8d5c7517f511d5d7bfc524319a02e6d2e21 Parents: 986fcef Author: Thomas Groh Authored: Fri Apr 7 11:46:24 2017 -0700 Committer: Thomas Groh Committed: Tue Apr 11 09:58:37 2017 -0700 ---------------------------------------------------------------------- .../beam/runners/core/construction/Coders.java | 162 +++++++++++++++++++ .../core/construction/SdkComponents.java | 5 +- .../runners/core/construction/CodersTest.java | 99 ++++++++++++ .../core/construction/SdkComponentsTest.java | 25 ++- 4 files changed, 288 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/01e5a8d5/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/Coders.java ---------------------------------------------------------------------- diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/Coders.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/Coders.java new file mode 100644 index 0000000..d890de7 --- /dev/null +++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/Coders.java @@ -0,0 +1,162 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.runners.core.construction; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.BiMap; +import com.google.common.collect.ImmutableBiMap; +import com.google.protobuf.Any; +import com.google.protobuf.ByteString; +import com.google.protobuf.BytesValue; +import java.io.IOException; +import java.util.ArrayList; +import java.util.LinkedList; +import java.util.List; +import org.apache.beam.sdk.coders.ByteArrayCoder; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.IterableCoder; +import org.apache.beam.sdk.coders.KvCoder; +import org.apache.beam.sdk.coders.VarIntCoder; +import org.apache.beam.sdk.coders.VarLongCoder; +import org.apache.beam.sdk.common.runner.v1.RunnerApi; +import org.apache.beam.sdk.common.runner.v1.RunnerApi.Components; +import org.apache.beam.sdk.common.runner.v1.RunnerApi.FunctionSpec; +import org.apache.beam.sdk.common.runner.v1.RunnerApi.SdkFunctionSpec; +import org.apache.beam.sdk.transforms.windowing.GlobalWindow; +import org.apache.beam.sdk.transforms.windowing.IntervalWindow.IntervalWindowCoder; +import org.apache.beam.sdk.util.CloudObject; +import org.apache.beam.sdk.util.Serializer; +import org.apache.beam.sdk.util.WindowedValue; +import org.apache.beam.sdk.util.WindowedValue.FullWindowedValueCoder; + +/** Converts to and from Beam Runner API representations of {@link Coder Coders}. */ +public class Coders { + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + + // This URN says that the coder is just a UDF blob this SDK understands + // TODO: standardize such things + public static final String CUSTOM_CODER_URN = "urn:beam:coders:javasdk:0.1"; + + // The URNs for coders which are shared across languages + private static final BiMap, String> KNOWN_CODER_URNS = + ImmutableBiMap., String>builder() + .put(ByteArrayCoder.class, "urn:beam:coders:bytes:0.1") + .put(KvCoder.class, "urn:beam:coders:kv:0.1") + .put(VarIntCoder.class, "urn:beam:coders:varint:0.1") + .put(IntervalWindowCoder.class, "urn:beam:coders:interval_window:0.1") + .put(IterableCoder.class, "urn:beam:coders:stream:0.1") + .put(GlobalWindow.Coder.class, "urn:beam:coders:global_window:0.1") + .put(FullWindowedValueCoder.class, "urn:beam:coders:windowed_value:0.1") + .build(); + + public static RunnerApi.Coder toProto( + Coder coder, @SuppressWarnings("unused") SdkComponents components) throws IOException { + if (KNOWN_CODER_URNS.containsKey(coder.getClass())) { + return toKnownCoder(coder, components); + } + return toCustomCoder(coder); + } + + private static RunnerApi.Coder toKnownCoder(Coder coder, SdkComponents components) + throws IOException { + List componentIds = new ArrayList<>(); + for (Coder componentCoder : coder.getCoderArguments()) { + componentIds.add(components.registerCoder(componentCoder)); + } + return RunnerApi.Coder.newBuilder() + .addAllComponentCoderIds(componentIds) + .setSpec( + SdkFunctionSpec.newBuilder() + .setSpec(FunctionSpec.newBuilder().setUrn(KNOWN_CODER_URNS.get(coder.getClass())))) + .build(); + } + + private static RunnerApi.Coder toCustomCoder(Coder coder) throws IOException { + RunnerApi.Coder.Builder coderBuilder = RunnerApi.Coder.newBuilder(); + return coderBuilder + .setSpec( + SdkFunctionSpec.newBuilder() + .setSpec( + FunctionSpec.newBuilder() + .setUrn(CUSTOM_CODER_URN) + .setParameter( + Any.pack( + BytesValue.newBuilder() + .setValue( + ByteString.copyFrom( + OBJECT_MAPPER.writeValueAsBytes(coder.asCloudObject()))) + .build())))) + .build(); + } + + public static Coder fromProto(RunnerApi.Coder protoCoder, Components components) + throws IOException { + String coderSpecUrn = protoCoder.getSpec().getSpec().getUrn(); + if (coderSpecUrn.equals(CUSTOM_CODER_URN)) { + return fromCustomCoder(protoCoder, components); + } + return fromKnownCoder(protoCoder, components); + } + + private static Coder fromKnownCoder(RunnerApi.Coder coder, Components components) + throws IOException { + String coderUrn = coder.getSpec().getSpec().getUrn(); + List> coderComponents = new LinkedList<>(); + for (String componentId : coder.getComponentCoderIdsList()) { + Coder innerCoder = fromProto(components.getCodersOrThrow(componentId), components); + coderComponents.add(innerCoder); + } + switch (coderUrn) { + case "urn:beam:coders:bytes:0.1": + return ByteArrayCoder.of(); + case "urn:beam:coders:kv:0.1": + return KvCoder.of(coderComponents); + case "urn:beam:coders:varint:0.1": + return VarLongCoder.of(); + case "urn:beam:coders:interval_window:0.1": + return IntervalWindowCoder.of(); + case "urn:beam:coders:stream:0.1": + return IterableCoder.of(coderComponents); + case "urn:beam:coders:global_window:0.1": + return GlobalWindow.Coder.INSTANCE; + case "urn:beam:coders:windowed_value:0.1": + return WindowedValue.FullWindowedValueCoder.of(coderComponents); + default: + throw new IllegalStateException( + String.format( + "Unknown coder URN %s. Known URNs: %s", coderUrn, KNOWN_CODER_URNS.values())); + } + } + + private static Coder fromCustomCoder( + RunnerApi.Coder protoCoder, @SuppressWarnings("unused") Components components) + throws IOException { + CloudObject coderCloudObject = + OBJECT_MAPPER.readValue( + protoCoder + .getSpec() + .getSpec() + .getParameter() + .unpack(BytesValue.class) + .getValue() + .toByteArray(), + CloudObject.class); + return Serializer.deserialize(coderCloudObject, Coder.class); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/01e5a8d5/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SdkComponents.java ---------------------------------------------------------------------- diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SdkComponents.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SdkComponents.java index c4b8cf1..5cb0a00 100644 --- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SdkComponents.java +++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SdkComponents.java @@ -21,6 +21,7 @@ package org.apache.beam.runners.core.construction; import com.google.common.base.Equivalence; import com.google.common.collect.BiMap; import com.google.common.collect.HashBiMap; +import java.io.IOException; import java.util.Set; import org.apache.beam.sdk.annotations.Experimental; import org.apache.beam.sdk.coders.Coder; @@ -119,7 +120,7 @@ class SdkComponents { * #equals(Object)} and {@link #hashCode()} but incompatible binary formats are not considered the * same coder. */ - String registerCoder(Coder coder) { + String registerCoder(Coder coder) throws IOException { String existing = coderIds.get(Equivalence.identity().wrap(coder)); if (existing != null) { return existing; @@ -127,6 +128,8 @@ class SdkComponents { String baseName = NameUtils.approximateSimpleName(coder); String name = uniqify(baseName, coderIds.values()); coderIds.put(Equivalence.identity().wrap(coder), name); + RunnerApi.Coder coderProto = Coders.toProto(coder, this); + componentsBuilder.putCoders(name, coderProto); return name; } http://git-wip-us.apache.org/repos/asf/beam/blob/01e5a8d5/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/CodersTest.java ---------------------------------------------------------------------- diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/CodersTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/CodersTest.java new file mode 100644 index 0000000..1a657b2 --- /dev/null +++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/CodersTest.java @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.runners.core.construction; + +import static org.junit.Assert.assertThat; + +import com.google.common.collect.ImmutableList; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.io.Serializable; +import org.apache.beam.sdk.coders.AvroCoder; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.CoderException; +import org.apache.beam.sdk.coders.CustomCoder; +import org.apache.beam.sdk.coders.IterableCoder; +import org.apache.beam.sdk.coders.KvCoder; +import org.apache.beam.sdk.coders.ListCoder; +import org.apache.beam.sdk.coders.SerializableCoder; +import org.apache.beam.sdk.coders.StringUtf8Coder; +import org.apache.beam.sdk.coders.VarLongCoder; +import org.apache.beam.sdk.common.runner.v1.RunnerApi; +import org.apache.beam.sdk.common.runner.v1.RunnerApi.Components; +import org.hamcrest.Matchers; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameter; +import org.junit.runners.Parameterized.Parameters; + +/** + * Tests for {@link Coders}. + */ +@RunWith(Parameterized.class) +public class CodersTest { + @Parameters(name = "{index}: {0}") + public static Iterable> data() { + return ImmutableList.>of( + StringUtf8Coder.of(), + IterableCoder.of(VarLongCoder.of()), + KvCoder.of(StringUtf8Coder.of(), ListCoder.of(VarLongCoder.of())), + SerializableCoder.of(Record.class), + new RecordCoder(), + KvCoder.of(new RecordCoder(), AvroCoder.of(Record.class))); + } + + @Parameter(0) + public Coder coder; + + @Test + public void toAndFromProto() throws Exception { + SdkComponents componentsBuilder = SdkComponents.create(); + RunnerApi.Coder coderProto = Coders.toProto(coder, componentsBuilder); + + Components encodedComponents = componentsBuilder.toComponents(); + Coder decodedCoder = Coders.fromProto(coderProto, encodedComponents); + assertThat(decodedCoder, Matchers.>equalTo(coder)); + } + + static class Record implements Serializable { + } + + private static class RecordCoder extends CustomCoder { + @Override + public void encode(Record value, OutputStream outStream, Context context) + throws CoderException, IOException {} + + @Override + public Record decode(InputStream inStream, Context context) throws CoderException, IOException { + return new Record(); + } + + @Override + public boolean equals(Object other) { + return other != null && getClass().equals(other.getClass()); + } + + @Override + public int hashCode() { + return getClass().hashCode(); + } + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/01e5a8d5/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/SdkComponentsTest.java ---------------------------------------------------------------------- diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/SdkComponentsTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/SdkComponentsTest.java index c96e57c..28b4911 100644 --- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/SdkComponentsTest.java +++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/SdkComponentsTest.java @@ -24,6 +24,7 @@ import static org.hamcrest.Matchers.isEmptyOrNullString; import static org.hamcrest.Matchers.not; import static org.junit.Assert.assertThat; +import java.io.IOException; import org.apache.beam.sdk.coders.ByteArrayCoder; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.IterableCoder; @@ -39,6 +40,7 @@ import org.apache.beam.sdk.util.WindowingStrategy; import org.apache.beam.sdk.util.WindowingStrategy.AccumulationMode; import org.apache.beam.sdk.values.PBegin; import org.apache.beam.sdk.values.PCollection; +import org.hamcrest.Matchers; import org.junit.Rule; import org.junit.Test; import org.junit.rules.ExpectedException; @@ -56,13 +58,32 @@ public class SdkComponentsTest { private SdkComponents components = SdkComponents.create(); @Test - public void registerCoder() { + public void registerCoder() throws IOException { Coder coder = KvCoder.of(StringUtf8Coder.of(), IterableCoder.of(SetCoder.of(ByteArrayCoder.of()))); String id = components.registerCoder(coder); assertThat(components.registerCoder(coder), equalTo(id)); assertThat(id, not(isEmptyOrNullString())); - assertThat(components.registerCoder(VarLongCoder.of()), not(equalTo(id))); + VarLongCoder otherCoder = VarLongCoder.of(); + assertThat(components.registerCoder(otherCoder), not(equalTo(id))); + + components.toComponents().getCodersOrThrow(id); + components.toComponents().getCodersOrThrow(components.registerCoder(otherCoder)); + } + + @Test + public void registerCoderEqualsNotSame() throws IOException { + Coder coder = + KvCoder.of(StringUtf8Coder.of(), IterableCoder.of(SetCoder.of(ByteArrayCoder.of()))); + Coder otherCoder = + KvCoder.of(StringUtf8Coder.of(), IterableCoder.of(SetCoder.of(ByteArrayCoder.of()))); + assertThat(coder, Matchers.>equalTo(otherCoder)); + String id = components.registerCoder(coder); + String otherId = components.registerCoder(otherCoder); + assertThat(otherId, not(equalTo(id))); + + components.toComponents().getCodersOrThrow(id); + components.toComponents().getCodersOrThrow(otherId); } @Test