Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 77134200BBD for ; Tue, 8 Nov 2016 17:03:03 +0100 (CET) Received: by cust-asf.ponee.io (Postfix) id 75B6D160B0A; Tue, 8 Nov 2016 16:03:03 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 7246C160AD0 for ; Tue, 8 Nov 2016 17:03:02 +0100 (CET) Received: (qmail 86410 invoked by uid 500); 8 Nov 2016 16:03:01 -0000 Mailing-List: contact commits-help@beam.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@beam.incubator.apache.org Delivered-To: mailing list commits@beam.incubator.apache.org Received: (qmail 86401 invoked by uid 99); 8 Nov 2016 16:03:01 -0000 Received: from pnap-us-west-generic-nat.apache.org (HELO spamd2-us-west.apache.org) (209.188.14.142) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 08 Nov 2016 16:03:01 +0000 Received: from localhost (localhost [127.0.0.1]) by spamd2-us-west.apache.org (ASF Mail Server at spamd2-us-west.apache.org) with ESMTP id 3C01C1A00D0 for ; Tue, 8 Nov 2016 16:03:01 +0000 (UTC) X-Virus-Scanned: Debian amavisd-new at spamd2-us-west.apache.org X-Spam-Flag: NO X-Spam-Score: -6.219 X-Spam-Level: X-Spam-Status: No, score=-6.219 tagged_above=-999 required=6.31 tests=[KAM_ASCII_DIVIDERS=0.8, KAM_LAZY_DOMAIN_SECURITY=1, RCVD_IN_DNSWL_HI=-5, RCVD_IN_MSPIKE_H3=-0.01, RCVD_IN_MSPIKE_WL=-0.01, RP_MATCHES_RCVD=-2.999] autolearn=disabled Received: from mx1-lw-us.apache.org ([10.40.0.8]) by localhost (spamd2-us-west.apache.org [10.40.0.9]) (amavisd-new, port 10024) with ESMTP id e8Ump_xlSsp1 for ; Tue, 8 Nov 2016 16:02:59 +0000 (UTC) Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx1-lw-us.apache.org (ASF Mail Server at mx1-lw-us.apache.org) with SMTP id 9245B5FC5F for ; Tue, 8 Nov 2016 16:02:58 +0000 (UTC) Received: (qmail 84585 invoked by uid 99); 8 Nov 2016 16:02:23 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 08 Nov 2016 16:02:23 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id C0134E01F4; Tue, 8 Nov 2016 16:02:23 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: lcwik@apache.org To: commits@beam.incubator.apache.org Date: Tue, 08 Nov 2016 16:02:23 -0000 Message-Id: <4e2b9e2e89e147b7a413e2864e036f78@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [1/3] incubator-beam git commit: Changes in AvroCoder serialization so it can serialize in Kryo archived-at: Tue, 08 Nov 2016 16:03:03 -0000 Repository: incubator-beam Updated Branches: refs/heads/master afa0c31bd -> bfc527d63 Changes in AvroCoder serialization so it can serialize in Kryo Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/06c18468 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/06c18468 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/06c18468 Branch: refs/heads/master Commit: 06c1846860176cc2bd971f8ad7037c97594af866 Parents: afa0c31 Author: Aviem Zur Authored: Thu Sep 8 11:21:41 2016 +0300 Committer: Luke Cwik Committed: Tue Nov 8 07:47:34 2016 -0800 ---------------------------------------------------------------------- sdks/java/core/pom.xml | 7 ++ .../org/apache/beam/sdk/coders/AvroCoder.java | 126 +++++++++++-------- .../apache/beam/sdk/coders/AvroCoderTest.java | 33 +++++ 3 files changed, 112 insertions(+), 54 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/06c18468/sdks/java/core/pom.xml ---------------------------------------------------------------------- diff --git a/sdks/java/core/pom.xml b/sdks/java/core/pom.xml index 17ef193..c7b46d8 100644 --- a/sdks/java/core/pom.xml +++ b/sdks/java/core/pom.xml @@ -473,5 +473,12 @@ google-cloud-dataflow-java-proto-library-all test + + + com.esotericsoftware.kryo + kryo + 2.21 + test + http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/06c18468/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/AvroCoder.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/AvroCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/AvroCoder.java index 7894d14..4f0239e 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/AvroCoder.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/AvroCoder.java @@ -24,7 +24,6 @@ import com.fasterxml.jackson.annotation.JsonProperty; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; -import java.io.Serializable; import java.lang.reflect.Field; import java.util.ArrayList; import java.util.Collection; @@ -164,7 +163,9 @@ public class AvroCoder extends StandardCoder { }; private final Class type; - private final transient Schema schema; + private transient Schema schema; + + private final String schemaStr; private final List nonDeterministicReasons; @@ -174,36 +175,16 @@ public class AvroCoder extends StandardCoder { // Cache the old encoder/decoder and let the factories reuse them when possible. To be threadsafe, // these are ThreadLocal. This code does not need to be re-entrant as AvroCoder does not use // an inner coder. - private final transient ThreadLocal decoder; - private final transient ThreadLocal encoder; - private final transient ThreadLocal> writer; - private final transient ThreadLocal> reader; + private transient ThreadLocal memoizedDecoder; + private transient ThreadLocal memoizedEncoder; + private transient ThreadLocal> memoizedWriter; + private transient ThreadLocal> memoizedReader; protected AvroCoder(Class type, Schema schema) { this.type = type; this.schema = schema; - + this.schemaStr = schema.toString(); nonDeterministicReasons = new AvroDeterminismChecker().check(TypeDescriptor.of(type), schema); - - // Decoder and Encoder start off null for each thread. They are allocated and potentially - // reused inside encode/decode. - this.decoder = new ThreadLocal<>(); - this.encoder = new ThreadLocal<>(); - - // Reader and writer are allocated once per thread and are "final" for thread-local Coder - // instance. - this.reader = new ThreadLocal>() { - @Override - public DatumReader initialValue() { - return createDatumReader(); - } - }; - this.writer = new ThreadLocal>() { - @Override - public DatumWriter initialValue() { - return createDatumWriter(); - } - }; } /** @@ -246,33 +227,29 @@ public class AvroCoder extends StandardCoder { return type; } - private Object writeReplace() { - // When serialized by Java, instances of AvroCoder should be replaced by - // a SerializedAvroCoderProxy. - return new SerializedAvroCoderProxy<>(type, schema.toString()); - } - @Override public void encode(T value, OutputStream outStream, Context context) throws IOException { // Get a BinaryEncoder instance from the ThreadLocal cache and attempt to reuse it. + ThreadLocal encoder = getEncoder(); BinaryEncoder encoderInstance = ENCODER_FACTORY.directBinaryEncoder(outStream, encoder.get()); // Save the potentially-new instance for reuse later. encoder.set(encoderInstance); - writer.get().write(value, encoderInstance); + getWriter().get().write(value, encoderInstance); // Direct binary encoder does not buffer any data and need not be flushed. } @Override public T decode(InputStream inStream, Context context) throws IOException { // Get a BinaryDecoder instance from the ThreadLocal cache and attempt to reuse it. + ThreadLocal decoder = getDecoder(); BinaryDecoder decoderInstance = DECODER_FACTORY.directBinaryDecoder(inStream, decoder.get()); // Save the potentially-new instance for later. decoder.set(decoderInstance); - return reader.get().read(null, decoderInstance); + return getReader().get().read(null, decoderInstance); } @Override - public List> getCoderArguments() { + public List> getCoderArguments() { return null; } @@ -280,7 +257,7 @@ public class AvroCoder extends StandardCoder { public CloudObject asCloudObject() { CloudObject result = super.asCloudObject(); addString(result, "type", type.getName()); - addString(result, "schema", schema.toString()); + addString(result, "schema", getSchema().toString()); return result; } @@ -306,9 +283,9 @@ public class AvroCoder extends StandardCoder { @Deprecated public DatumReader createDatumReader() { if (type.equals(GenericRecord.class)) { - return new GenericDatumReader<>(schema); + return new GenericDatumReader<>(getSchema()); } else { - return new ReflectDatumReader<>(schema); + return new ReflectDatumReader<>(getSchema()); } } @@ -321,9 +298,9 @@ public class AvroCoder extends StandardCoder { @Deprecated public DatumWriter createDatumWriter() { if (type.equals(GenericRecord.class)) { - return new GenericDatumWriter<>(schema); + return new GenericDatumWriter<>(getSchema()); } else { - return new ReflectDatumWriter<>(schema); + return new ReflectDatumWriter<>(getSchema()); } } @@ -331,28 +308,69 @@ public class AvroCoder extends StandardCoder { * Returns the schema used by this coder. */ public Schema getSchema() { - return schema; + return getMemoizedSchema(); + } + + /** + * Get the memoized {@link BinaryDecoder}, possibly initializing it lazily. + */ + private ThreadLocal getDecoder() { + if (memoizedDecoder == null) { + memoizedDecoder = new ThreadLocal<>(); + } + return memoizedDecoder; + } + + /** + * Get the memoized {@link BinaryEncoder}, possibly initializing it lazily. + */ + private ThreadLocal getEncoder() { + if (memoizedEncoder == null) { + memoizedEncoder = new ThreadLocal<>(); + } + return memoizedEncoder; } /** - * Proxy to use in place of serializing the {@link AvroCoder}. This allows the fields - * to remain final. + * Get the memoized {@link DatumReader}, possibly initializing it lazily. */ - private static class SerializedAvroCoderProxy implements Serializable { - private final Class type; - private final String schemaStr; + private ThreadLocal> getReader() { + if (memoizedReader == null) { + memoizedReader = new ThreadLocal>() { + @Override + public DatumReader initialValue() { + return createDatumReader(); + } + }; + } + return memoizedReader; + } - public SerializedAvroCoderProxy(Class type, String schemaStr) { - this.type = type; - this.schemaStr = schemaStr; + /** + * Get the memoized {@link DatumWriter}, possibly initializing it lazily. + */ + private ThreadLocal> getWriter() { + if (memoizedWriter == null) { + memoizedWriter = new ThreadLocal>() { + @Override + public DatumWriter initialValue() { + return createDatumWriter(); + } + }; } + return memoizedWriter; + } - private Object readResolve() { - // When deserialized, instances of this object should be replaced by - // constructing an AvroCoder. + /** + * Get the {@link Schema}, possibly initializing it lazily by parsing {@link + * AvroCoder#schemaStr}. + */ + private Schema getMemoizedSchema() { + if (schema == null) { Schema.Parser parser = new Schema.Parser(); - return new AvroCoder(type, parser.parse(schemaStr)); + schema = parser.parse(schemaStr); } + return schema; } /** http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/06c18468/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/AvroCoderTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/AvroCoderTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/AvroCoderTest.java index f6329a0..f2373d1 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/AvroCoderTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/AvroCoderTest.java @@ -39,6 +39,10 @@ import java.util.SortedMap; import java.util.SortedSet; import java.util.TreeMap; import java.util.TreeSet; + +import com.esotericsoftware.kryo.Kryo; +import com.esotericsoftware.kryo.io.Input; +import com.esotericsoftware.kryo.io.Output; import org.apache.avro.AvroTypeException; import org.apache.avro.Schema; import org.apache.avro.SchemaBuilder; @@ -172,6 +176,35 @@ public class AvroCoderTest { CoderProperties.coderDecodeEncodeEqual(copied, value); } + /** + * Confirm that we can serialize and deserialize an AvroCoder object using Kryo. + * (BEAM-626). + * + * @throws Exception + */ + @Test + public void testKryoSerialization() throws Exception { + Pojo value = new Pojo("Hello", 42); + AvroCoder coder = AvroCoder.of(Pojo.class); + + //Kryo instantiation + Kryo kryo = new Kryo(); + kryo.setInstantiatorStrategy(new org.objenesis.strategy.StdInstantiatorStrategy()); + + //Serialization of object + ByteArrayOutputStream bos = new ByteArrayOutputStream(); + Output output = new Output(bos); + kryo.writeObject(output, coder); + output.close(); + + //De-serialization of object + ByteArrayInputStream bis = new ByteArrayInputStream(bos.toByteArray()); + Input input = new Input(bis); + AvroCoder copied = (AvroCoder) kryo.readObject(input, AvroCoder.class); + + CoderProperties.coderDecodeEncodeEqual(copied, value); + } + @Test public void testPojoEncoding() throws Exception { Pojo value = new Pojo("Hello", 42);