From commits-return-15629-archive-asf-public=cust-asf.ponee.io@flink.apache.org Tue Feb 6 20:02:31 2018 Return-Path: X-Original-To: archive-asf-public@eu.ponee.io Delivered-To: archive-asf-public@eu.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by mx-eu-01.ponee.io (Postfix) with ESMTP id A02BD1807B4 for ; Tue, 6 Feb 2018 20:02:29 +0100 (CET) Received: by cust-asf.ponee.io (Postfix) id 98CAE160C5E; Tue, 6 Feb 2018 19:02:29 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 69F2E160C5B for ; Tue, 6 Feb 2018 20:02:28 +0100 (CET) Received: (qmail 82380 invoked by uid 500); 6 Feb 2018 19:02:27 -0000 Mailing-List: contact commits-help@flink.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@flink.apache.org Delivered-To: mailing list commits@flink.apache.org Received: (qmail 81813 invoked by uid 99); 6 Feb 2018 19:02:26 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 06 Feb 2018 19:02:26 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id F204CF217E; Tue, 6 Feb 2018 19:02:25 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: tzulitai@apache.org To: commits@flink.apache.org Date: Tue, 06 Feb 2018 19:02:53 -0000 Message-Id: <3f2e9ab3768e444284632f78911579fa@git.apache.org> In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [29/31] flink git commit: [FLINK-8421] [core] Introduce PostVersionedIOReadableWritable [FLINK-8421] [core] Introduce PostVersionedIOReadableWritable Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ba8ffdd6 Tree: 
http://git-wip-us.apache.org/repos/asf/flink/tree/ba8ffdd6 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ba8ffdd6 Branch: refs/heads/master Commit: ba8ffdd6bc5222b1b3e12aea572417ea3a455d7b Parents: 9659ffd Author: Tzu-Li (Gordon) Tai Authored: Wed Jan 24 18:08:52 2018 +0100 Committer: Tzu-Li (Gordon) Tai Committed: Tue Feb 6 19:58:59 2018 +0100 ---------------------------------------------------------------------- .../io/PostVersionedIOReadableWritable.java | 88 +++++++++++++ .../io/PostVersionedIOReadableWritableTest.java | 132 +++++++++++++++++++ 2 files changed, 220 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/ba8ffdd6/flink-core/src/main/java/org/apache/flink/core/io/PostVersionedIOReadableWritable.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/core/io/PostVersionedIOReadableWritable.java b/flink-core/src/main/java/org/apache/flink/core/io/PostVersionedIOReadableWritable.java new file mode 100644 index 0000000..6edc983 --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/core/io/PostVersionedIOReadableWritable.java @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.core.io;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputView;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.PushbackInputStream;
+import java.util.Arrays;
+
+/**
+ * A {@link VersionedIOReadableWritable} which allows differentiating whether the previous
+ * data was versioned with a {@link VersionedIOReadableWritable}. This can be used if previously
+ * written data was not versioned, and is to be migrated to a versioned format.
+ *
+ * <p>Data written by this class starts with {@link #VERSIONED_IDENTIFIER}, followed by whatever
+ * {@link VersionedIOReadableWritable#write(DataOutputView)} writes. Subclasses that override
+ * {@link #write(DataOutputView)} should call {@code super.write(out)} before writing their own
+ * payload, so that the identifier comes first.
+ */
+@Internal
+public abstract class PostVersionedIOReadableWritable extends VersionedIOReadableWritable {
+
+	/**
+	 * Magic prefix marking data written by this class.
+	 *
+	 * <p>NOTE: CANNOT CHANGE! Data written with a different prefix would no longer be
+	 * recognized as versioned by {@link #read(InputStream)}.
+	 */
+	private static final byte[] VERSIONED_IDENTIFIER = new byte[] {-15, -51, -123, -97};
+
+	/**
+	 * Read from the provided {@link DataInputView in}. A flag {@code wasVersioned} can be
+	 * used to determine whether or not the data to read was previously written
+	 * by a {@link VersionedIOReadableWritable}.
+	 *
+	 * @param in the view to read from. If {@code wasVersioned} is true, the identifier and the
+	 *           version header have already been consumed. Otherwise the view starts at the
+	 *           very first byte of the legacy, non-versioned data.
+	 * @param wasVersioned whether the data started with {@link #VERSIONED_IDENTIFIER}.
+	 */
+	protected abstract void read(DataInputView in, boolean wasVersioned) throws IOException;
+
+	/**
+	 * Writes {@link #VERSIONED_IDENTIFIER} and then delegates to
+	 * {@link VersionedIOReadableWritable#write(DataOutputView)}.
+	 */
+	@Override
+	public void write(DataOutputView out) throws IOException {
+		out.write(VERSIONED_IDENTIFIER);
+		super.write(out);
+	}
+
+	/**
+	 * Reads data that may or may not have been written by this class.
+	 *
+	 * <p>This read first tries to identify whether the input stream starts with the special
+	 * {@link #VERSIONED_IDENTIFIER}, by reading and buffering the first few bytes.
+	 * If the data is identified as versioned, the usual version resolution read path
+	 * in {@link VersionedIOReadableWritable#read(DataInputView)} is invoked.
+	 * Otherwise, we "reset" the input stream by pushing the bytes that were actually read
+	 * back into the stream. {@link #read(DataInputView, boolean)} then sees the
+	 * non-versioned data from its first byte.
+	 *
+	 * <p>Streams shorter than the identifier, including empty streams, are treated as
+	 * non-versioned. Only the bytes that were really consumed are pushed back.
+	 */
+	public final void read(InputStream inputStream) throws IOException {
+		byte[] tmp = new byte[VERSIONED_IDENTIFIER.length];
+		int totalRead = tryReadFully(inputStream, tmp);
+
+		if (totalRead == tmp.length && Arrays.equals(tmp, VERSIONED_IDENTIFIER)) {
+			DataInputView inputView = new DataInputViewStreamWrapper(inputStream);
+
+			super.read(inputView);
+			read(inputView, true);
+		} else {
+			// Push back only what was consumed. Pushing back the whole buffer after a short
+			// read (or EOF) would inject zero bytes that were never part of the data.
+			PushbackInputStream resetStream = new PushbackInputStream(inputStream, VERSIONED_IDENTIFIER.length);
+			resetStream.unread(tmp, 0, totalRead);
+
+			read(new DataInputViewStreamWrapper(resetStream), false);
+		}
+	}
+
+	/**
+	 * Reads from {@code in} until {@code buffer} is full or the end of the stream is reached.
+	 * A single {@link InputStream#read(byte[])} call may return fewer bytes than requested.
+	 *
+	 * @return the number of bytes actually read, between 0 and {@code buffer.length}
+	 */
+	private static int tryReadFully(InputStream in, byte[] buffer) throws IOException {
+		int totalRead = 0;
+		while (totalRead < buffer.length) {
+			int bytesRead = in.read(buffer, totalRead, buffer.length - totalRead);
+			if (bytesRead == -1) {
+				break;
+			}
+			totalRead += bytesRead;
+		}
+		return totalRead;
+	}
+
+	/**
+	 * We do not support reading from a {@link DataInputView}, because it does not
+	 * support pushing back already read bytes. Use {@link #read(InputStream)} instead.
+	 *
+	 * @throws UnsupportedOperationException always
+	 */
+	@Override
+	public final void read(DataInputView in) throws IOException {
+		throw new UnsupportedOperationException("PostVersionedIOReadableWritable cannot read from a DataInputView.");
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/ba8ffdd6/flink-core/src/test/java/org/apache/flink/core/io/PostVersionedIOReadableWritableTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/core/io/PostVersionedIOReadableWritableTest.java b/flink-core/src/test/java/org/apache/flink/core/io/PostVersionedIOReadableWritableTest.java
new file mode 100644
index 0000000..2954536
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/core/io/PostVersionedIOReadableWritableTest.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.core.io;
+
+import org.apache.flink.core.memory.ByteArrayInputStreamWithPos;
+import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.IOException;
+
+/**
+ * Suite of tests for {@link PostVersionedIOReadableWritable}.
+ */
+public class PostVersionedIOReadableWritableTest {
+
+	@Test
+	public void testReadVersioned() throws IOException {
+		String payload = "test-data";
+
+		byte[] serialized = serialize(new TestPostVersionedReadableWritable(payload));
+		TestPostVersionedReadableWritable restored = restore(serialized);
+
+		Assert.assertEquals(payload, restored.getData());
+	}
+
+	@Test
+	public void testReadNonVersioned() throws IOException {
+		int preVersionedPayload = 563;
+
+		byte[] serialized = serialize(new TestNonVersionedReadableWritable(preVersionedPayload));
+		TestPostVersionedReadableWritable restored = restore(serialized);
+
+		Assert.assertEquals(String.valueOf(preVersionedPayload), restored.getData());
+	}
+
+	/** Writes {@code writable} through a {@link DataOutputViewStreamWrapper} and returns the bytes. */
+	private static byte[] serialize(IOReadableWritable writable) throws IOException {
+		try (ByteArrayOutputStreamWithPos out = new ByteArrayOutputStreamWithPos()) {
+			writable.write(new DataOutputViewStreamWrapper(out));
+			return out.toByteArray();
+		}
+	}
+
+	/** Restores a fresh {@link TestPostVersionedReadableWritable} from {@code serialized}. */
+	private static TestPostVersionedReadableWritable restore(byte[] serialized) throws IOException {
+		TestPostVersionedReadableWritable restored = new TestPostVersionedReadableWritable();
+		try (ByteArrayInputStreamWithPos in = new ByteArrayInputStreamWithPos(serialized)) {
+			restored.read(in);
+		}
+		return restored;
+	}
+
+	/**
+	 * Versioned format under test: writes its payload as a UTF string, but can still read
+	 * the legacy format produced by {@link TestNonVersionedReadableWritable}.
+	 */
+	static class TestPostVersionedReadableWritable extends PostVersionedIOReadableWritable {
+
+		private static final int VERSION = 1;
+		private String data;
+
+		TestPostVersionedReadableWritable() {}
+
+		TestPostVersionedReadableWritable(String data) {
+			this.data = data;
+		}
+
+		@Override
+		public int getVersion() {
+			return VERSION;
+		}
+
+		@Override
+		public void write(DataOutputView out) throws IOException {
+			super.write(out);
+			out.writeUTF(data);
+		}
+
+		@Override
+		protected void read(DataInputView in, boolean wasVersioned) throws IOException {
+			if (wasVersioned) {
+				this.data = in.readUTF();
+			} else {
+				// in the previous non-versioned format, we wrote integers instead
+				this.data = String.valueOf(in.readInt());
+			}
+		}
+
+		public String getData() {
+			return data;
+		}
+	}
+
+	/** Legacy, non-versioned format: writes a bare int. */
+	static class TestNonVersionedReadableWritable implements IOReadableWritable {
+
+		private int data;
+
+		TestNonVersionedReadableWritable(int data) {
+			this.data = data;
+		}
+
+		@Override
+		public void write(DataOutputView out) throws IOException {
+			out.writeInt(data);
+		}
+
+		@Override
+		public void read(DataInputView in) throws IOException {
+			this.data = in.readInt();
+		}
+	}
+
+}