flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tzuli...@apache.org
Subject [29/31] flink git commit: [FLINK-8421] [core] Introduce PostVersionedIOReadableWritable
Date Tue, 06 Feb 2018 19:02:53 GMT
[FLINK-8421] [core] Introduce PostVersionedIOReadableWritable


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ba8ffdd6
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ba8ffdd6
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ba8ffdd6

Branch: refs/heads/master
Commit: ba8ffdd6bc5222b1b3e12aea572417ea3a455d7b
Parents: 9659ffd
Author: Tzu-Li (Gordon) Tai <tzulitai@apache.org>
Authored: Wed Jan 24 18:08:52 2018 +0100
Committer: Tzu-Li (Gordon) Tai <tzulitai@apache.org>
Committed: Tue Feb 6 19:58:59 2018 +0100

----------------------------------------------------------------------
 .../io/PostVersionedIOReadableWritable.java     |  88 +++++++++++++
 .../io/PostVersionedIOReadableWritableTest.java | 132 +++++++++++++++++++
 2 files changed, 220 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/ba8ffdd6/flink-core/src/main/java/org/apache/flink/core/io/PostVersionedIOReadableWritable.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/io/PostVersionedIOReadableWritable.java
b/flink-core/src/main/java/org/apache/flink/core/io/PostVersionedIOReadableWritable.java
new file mode 100644
index 0000000..6edc983
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/core/io/PostVersionedIOReadableWritable.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.core.io;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputView;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.PushbackInputStream;
+import java.util.Arrays;
+
+/**
+ * A {@link VersionedIOReadableWritable} which allows to differentiate whether the previous
+ * data was versioned with a {@link VersionedIOReadableWritable}. This can be used if previously
+ * written data was not versioned, and is to be migrated to a versioned format.
+ */
+@Internal
+public abstract class PostVersionedIOReadableWritable extends VersionedIOReadableWritable
{
+
+	/** NOTE: CANNOT CHANGE! */
+	private static final byte[] VERSIONED_IDENTIFIER = new byte[] {-15, -51, -123, -97};
+
+	/**
+	 * Read from the provided {@link DataInputView in}. A flag {@code wasVersioned} can be
+	 * used to determine whether or not the data to read was previously written
+	 * by a {@link VersionedIOReadableWritable}.
+	 */
+	protected abstract void read(DataInputView in, boolean wasVersioned) throws IOException;
+
+	@Override
+	public void write(DataOutputView out) throws IOException {
+		out.write(VERSIONED_IDENTIFIER);
+		super.write(out);
+	}
+
+	/**
+	 * This read attempts to first identify if the input view contains the special
+	 * {@link #VERSIONED_IDENTIFIER} by reading and buffering the first few bytes.
+	 * If identified to be versioned, the usual version resolution read path
+	 * in {@link VersionedIOReadableWritable#read(DataInputView)} is invoked.
+	 * Otherwise, we "reset" the input stream by pushing back the read buffered bytes
+	 * into the stream.
+	 */
+	public final void read(InputStream inputStream) throws IOException {
+		byte[] tmp = new byte[VERSIONED_IDENTIFIER.length];
+		inputStream.read(tmp);
+
+		if (Arrays.equals(tmp, VERSIONED_IDENTIFIER)) {
+			DataInputView inputView = new DataInputViewStreamWrapper(inputStream);
+
+			super.read(inputView);
+			read(inputView, true);
+		} else {
+			PushbackInputStream resetStream = new PushbackInputStream(inputStream, VERSIONED_IDENTIFIER.length);
+			resetStream.unread(tmp);
+
+			read(new DataInputViewStreamWrapper(resetStream), false);
+		}
+	}
+
+	/**
+	 * We do not support reading from a {@link DataInputView}, because it does not
+	 * support pushing back already read bytes.
+	 */
+	@Override
+	public final void read(DataInputView in) throws IOException {
+		throw new UnsupportedOperationException("PostVersionedIOReadableWritable cannot read from
a DataInputView.");
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ba8ffdd6/flink-core/src/test/java/org/apache/flink/core/io/PostVersionedIOReadableWritableTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/core/io/PostVersionedIOReadableWritableTest.java
b/flink-core/src/test/java/org/apache/flink/core/io/PostVersionedIOReadableWritableTest.java
new file mode 100644
index 0000000..2954536
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/core/io/PostVersionedIOReadableWritableTest.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.core.io;
+
+import org.apache.flink.core.memory.ByteArrayInputStreamWithPos;
+import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.IOException;
+
+/**
+ * Suite of tests for {@link PostVersionedIOReadableWritable}.
+ */
+public class PostVersionedIOReadableWritableTest {
+
+	@Test
+	public void testReadVersioned() throws IOException {
+
+		String payload = "test-data";
+		TestPostVersionedReadableWritable versionedReadableWritable = new TestPostVersionedReadableWritable(payload);
+
+		byte[] serialized;
+		try (ByteArrayOutputStreamWithPos out = new ByteArrayOutputStreamWithPos()) {
+			versionedReadableWritable.write(new DataOutputViewStreamWrapper(out));
+			serialized = out.toByteArray();
+		}
+
+		TestPostVersionedReadableWritable restoredVersionedReadableWritable = new TestPostVersionedReadableWritable();
+		try(ByteArrayInputStreamWithPos in = new ByteArrayInputStreamWithPos(serialized)) {
+			restoredVersionedReadableWritable.read(in);
+		}
+
+		Assert.assertEquals(payload, restoredVersionedReadableWritable.getData());
+	}
+
+	@Test
+	public void testReadNonVersioned() throws IOException {
+		int preVersionedPayload = 563;
+
+		TestNonVersionedReadableWritable nonVersionedReadableWritable = new TestNonVersionedReadableWritable(preVersionedPayload);
+
+		byte[] serialized;
+		try (ByteArrayOutputStreamWithPos out = new ByteArrayOutputStreamWithPos()) {
+			nonVersionedReadableWritable.write(new DataOutputViewStreamWrapper(out));
+			serialized = out.toByteArray();
+		}
+
+		TestPostVersionedReadableWritable restoredVersionedReadableWritable = new TestPostVersionedReadableWritable();
+		try(ByteArrayInputStreamWithPos in = new ByteArrayInputStreamWithPos(serialized)) {
+			restoredVersionedReadableWritable.read(in);
+		}
+
+		Assert.assertEquals(String.valueOf(preVersionedPayload), restoredVersionedReadableWritable.getData());
+	}
+
+	static class TestPostVersionedReadableWritable extends PostVersionedIOReadableWritable {
+
+		private static final int VERSION = 1;
+		private String data;
+
+		TestPostVersionedReadableWritable() {}
+
+		TestPostVersionedReadableWritable(String data) {
+			this.data = data;
+		}
+
+		@Override
+		public int getVersion() {
+			return VERSION;
+		}
+
+		@Override
+		public void write(DataOutputView out) throws IOException {
+			super.write(out);
+			out.writeUTF(data);
+		}
+
+		@Override
+		protected void read(DataInputView in, boolean wasVersioned) throws IOException {
+			if (wasVersioned) {
+				this.data = in.readUTF();
+			} else {
+				// in the previous non-versioned format, we wrote integers instead
+				this.data = String.valueOf(in.readInt());
+			}
+		}
+
+		public String getData() {
+			return data;
+		}
+	}
+
+	static class TestNonVersionedReadableWritable implements IOReadableWritable {
+
+		private int data;
+
+		TestNonVersionedReadableWritable(int data) {
+			this.data = data;
+		}
+
+		@Override
+		public void write(DataOutputView out) throws IOException {
+			out.writeInt(data);
+		}
+
+		@Override
+		public void read(DataInputView in) throws IOException {
+			this.data = in.readInt();
+		}
+	}
+
+}


Mime
View raw message