Return-Path: X-Original-To: apmail-flink-commits-archive@minotaur.apache.org Delivered-To: apmail-flink-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 6F70817685 for ; Tue, 29 Sep 2015 12:08:33 +0000 (UTC) Received: (qmail 98198 invoked by uid 500); 29 Sep 2015 12:08:33 -0000 Delivered-To: apmail-flink-commits-archive@flink.apache.org Received: (qmail 98102 invoked by uid 500); 29 Sep 2015 12:08:33 -0000 Mailing-List: contact commits-help@flink.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@flink.apache.org Delivered-To: mailing list commits@flink.apache.org Received: (qmail 97881 invoked by uid 99); 29 Sep 2015 12:08:33 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 29 Sep 2015 12:08:33 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id EAC1DE00DC; Tue, 29 Sep 2015 12:08:32 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 8bit From: sewen@apache.org To: commits@flink.apache.org Date: Tue, 29 Sep 2015 12:08:36 -0000 Message-Id: In-Reply-To: <7a1e887c1d01450abe64a5b5fc92d177@git.apache.org> References: <7a1e887c1d01450abe64a5b5fc92d177@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [5/8] flink git commit: [FLINK-2723] [core] CopyableValue method to copy into new instance [FLINK-2723] [core] CopyableValue method to copy into new instance This closes #1169 Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e727355e Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e727355e Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e727355e Branch: refs/heads/master Commit: e727355e42bd0ad7d403aee703aaf33a68a839d2 Parents: 40cbf7e Author: Greg Hogan Authored: Mon Sep 21 15:14:09 2015 -0400 Committer: Stephan Ewen Committed: Tue Sep 29 12:18:07 2015 +0200 ---------------------------------------------------------------------- .../org/apache/flink/types/BooleanValue.java | 6 +- .../java/org/apache/flink/types/ByteValue.java | 7 +- .../java/org/apache/flink/types/CharValue.java | 7 +- .../org/apache/flink/types/CopyableValue.java | 30 ++++- .../org/apache/flink/types/DoubleValue.java | 7 +- .../java/org/apache/flink/types/FloatValue.java | 7 +- .../java/org/apache/flink/types/IntValue.java | 9 +- .../java/org/apache/flink/types/LongValue.java | 7 +- .../java/org/apache/flink/types/NullValue.java | 7 +- .../java/org/apache/flink/types/Record.java | 5 + .../java/org/apache/flink/types/ShortValue.java | 7 +- .../org/apache/flink/types/StringValue.java | 7 +- .../apache/flink/types/CopyableValueTest.java | 109 +++++++++++++++++++ 13 files changed, 203 insertions(+), 12 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/e727355e/flink-core/src/main/java/org/apache/flink/types/BooleanValue.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/types/BooleanValue.java b/flink-core/src/main/java/org/apache/flink/types/BooleanValue.java index 071e650..e034648 100644 --- a/flink-core/src/main/java/org/apache/flink/types/BooleanValue.java +++ b/flink-core/src/main/java/org/apache/flink/types/BooleanValue.java @@ -48,7 +48,6 @@ public class BooleanValue implements NormalizableKey, ResettableVa this.value = value; } - public boolean get() { return value; } @@ -118,6 +117,11 @@ public class BooleanValue implements NormalizableKey, ResettableVa } @Override + public BooleanValue copy() { + return new BooleanValue(this.value); + } + + @Override public void copy(DataInputView source, DataOutputView target) throws IOException { target.write(source, 1); } http://git-wip-us.apache.org/repos/asf/flink/blob/e727355e/flink-core/src/main/java/org/apache/flink/types/ByteValue.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/types/ByteValue.java b/flink-core/src/main/java/org/apache/flink/types/ByteValue.java index c2f1f10..40ed1ad 100644 --- a/flink-core/src/main/java/org/apache/flink/types/ByteValue.java +++ b/flink-core/src/main/java/org/apache/flink/types/ByteValue.java @@ -147,13 +147,18 @@ public class ByteValue implements NormalizableKey, ResettableValue, ResettableValue extends Value { */ int getBinaryLength(); + /** + * Performs a deep copy of this object into the {@code target} instance. + * + * @param target Object to copy into. + */ void copyTo(T target); - + + /** + * Performs a deep copy of this object into a new instance. + * + * This method is useful for generic user-defined functions to clone a + * {@link CopyableValue} when storing multiple objects. With object reuse + * a deep copy must be created and type erasure prevents calling new. + * + * @return New object with copied fields. + */ + T copy(); + + /** + * Copies the next serialized instance from {@code source} to {@code target}. + * + * This method is equivalent to calling {@code IOReadableWritable.read(DataInputView)} + * followed by {@code IOReadableWritable.write(DataOutputView)} but does not require + * intermediate deserialization. + * + * @param source Data source for serialized instance. + * @param target Data target for serialized instance. + * + * @see org.apache.flink.core.io.IOReadableWritable + */ void copy(DataInputView source, DataOutputView target) throws IOException; } http://git-wip-us.apache.org/repos/asf/flink/blob/e727355e/flink-core/src/main/java/org/apache/flink/types/DoubleValue.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/types/DoubleValue.java b/flink-core/src/main/java/org/apache/flink/types/DoubleValue.java index abcbcf5..3158e40 100644 --- a/flink-core/src/main/java/org/apache/flink/types/DoubleValue.java +++ b/flink-core/src/main/java/org/apache/flink/types/DoubleValue.java @@ -122,13 +122,18 @@ public class DoubleValue implements Key, ResettableValue, ResettableValue, public int getBinaryLength() { return 4; } - + @Override public void copyTo(FloatValue target) { target.value = this.value; } @Override + public FloatValue copy() { + return new FloatValue(this.value); + } + + @Override public void copy(DataInputView source, DataOutputView target) throws IOException { target.write(source, 4); } http://git-wip-us.apache.org/repos/asf/flink/blob/e727355e/flink-core/src/main/java/org/apache/flink/types/IntValue.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/types/IntValue.java b/flink-core/src/main/java/org/apache/flink/types/IntValue.java index 423c8c1..1b893f0 100644 --- a/flink-core/src/main/java/org/apache/flink/types/IntValue.java +++ b/flink-core/src/main/java/org/apache/flink/types/IntValue.java @@ -139,7 +139,7 @@ public class IntValue implements NormalizableKey, ResettableValue, ResettableValue, ResettableValue, CopyableValu public int getBinaryLength() { return 1; } - + @Override public void copyTo(NullValue target) { } @Override + public NullValue copy() { + return NullValue.getInstance(); + } + + @Override public void copy(DataInputView source, DataOutputView target) throws IOException { source.readBoolean(); target.writeBoolean(false); http://git-wip-us.apache.org/repos/asf/flink/blob/e727355e/flink-core/src/main/java/org/apache/flink/types/Record.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/types/Record.java b/flink-core/src/main/java/org/apache/flink/types/Record.java index 218d6ce..24ff979 100644 --- a/flink-core/src/main/java/org/apache/flink/types/Record.java +++ b/flink-core/src/main/java/org/apache/flink/types/Record.java @@ -763,6 +763,11 @@ public final class Record implements Value, CopyableValue { } @Override + public Record copy() { + return createCopy(); + } + + @Override public void copy(DataInputView source, DataOutputView target) throws IOException { int val = source.readUnsignedByte(); target.writeByte(val); http://git-wip-us.apache.org/repos/asf/flink/blob/e727355e/flink-core/src/main/java/org/apache/flink/types/ShortValue.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/types/ShortValue.java b/flink-core/src/main/java/org/apache/flink/types/ShortValue.java index ec03497..f18ce7f 100644 --- a/flink-core/src/main/java/org/apache/flink/types/ShortValue.java +++ b/flink-core/src/main/java/org/apache/flink/types/ShortValue.java @@ -154,13 +154,18 @@ public class ShortValue implements NormalizableKey, ResettableValue< public int getBinaryLength() { return 2; } - + @Override public void copyTo(ShortValue target) { target.value = this.value; } @Override + public ShortValue copy() { + return new ShortValue(this.value); + } + + @Override public void copy(DataInputView source, DataOutputView target) throws IOException { target.write(source, 2); } http://git-wip-us.apache.org/repos/asf/flink/blob/e727355e/flink-core/src/main/java/org/apache/flink/types/StringValue.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/types/StringValue.java b/flink-core/src/main/java/org/apache/flink/types/StringValue.java index db0f184..2249019 100644 --- a/flink-core/src/main/java/org/apache/flink/types/StringValue.java +++ b/flink-core/src/main/java/org/apache/flink/types/StringValue.java @@ -682,7 +682,12 @@ public class StringValue implements NormalizableKey, CharSequence, target.ensureSize(this.len); System.arraycopy(this.value, 0, target.value, 0, this.len); } - + + @Override + public StringValue copy() { + return new StringValue(this); + } + @Override public void copy(DataInputView in, DataOutputView target) throws IOException { int len = in.readUnsignedByte(); http://git-wip-us.apache.org/repos/asf/flink/blob/e727355e/flink-core/src/test/java/org/apache/flink/types/CopyableValueTest.java ---------------------------------------------------------------------- diff --git a/flink-core/src/test/java/org/apache/flink/types/CopyableValueTest.java b/flink-core/src/test/java/org/apache/flink/types/CopyableValueTest.java new file mode 100644 index 0000000..76bdece --- /dev/null +++ b/flink-core/src/test/java/org/apache/flink/types/CopyableValueTest.java @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.types; + +import static org.junit.Assert.*; + +import org.junit.Test; + +public class CopyableValueTest { + + @Test + public void testCopy() { + CopyableValue[] value_types = new CopyableValue[] { + new BooleanValue(true), + new ByteValue((byte) 42), + new CharValue('q'), + new DoubleValue(3.1415926535897932), + new FloatValue((float) 3.14159265), + new IntValue(42), + new LongValue(42l), + new NullValue(), + new ShortValue((short) 42), + new StringValue("QED") + }; + + for (CopyableValue type : value_types) { + assertEquals(type, type.copy()); + } + } + + @Test + public void testCopyTo() { + BooleanValue boolean_from = new BooleanValue(true); + BooleanValue boolean_to = new BooleanValue(false); + + boolean_from.copyTo(boolean_to); + assertEquals(boolean_from, boolean_to); + + ByteValue byte_from = new ByteValue((byte) 3); + ByteValue byte_to = new ByteValue((byte) 7); + + byte_from.copyTo(byte_to); + assertEquals(byte_from, byte_to); + + CharValue char_from = new CharValue('α'); + CharValue char_to = new CharValue('ω'); + + char_from.copyTo(char_to); + assertEquals(char_from, char_to); + + DoubleValue double_from = new DoubleValue(2.7182818284590451); + DoubleValue double_to = new DoubleValue(0); + + double_from.copyTo(double_to); + assertEquals(double_from, double_to); + + FloatValue float_from = new FloatValue((float) 2.71828182); + FloatValue float_to = new FloatValue((float) 1.41421356); + + float_from.copyTo(float_to); + assertEquals(float_from, float_to); + + IntValue int_from = new IntValue(8191); + IntValue int_to = new IntValue(131071); + + int_from.copyTo(int_to); + assertEquals(int_from, int_to); + + LongValue long_from = new LongValue(524287); + LongValue long_to = new LongValue(2147483647); + + long_from.copyTo(long_to); + assertEquals(long_from, long_to); + + NullValue null_from = new NullValue(); + NullValue null_to = new NullValue(); + + null_from.copyTo(null_to); + assertEquals(null_from, null_to); + + ShortValue short_from = new ShortValue((short) 31); + ShortValue short_to = new ShortValue((short) 127); + + short_from.copyTo(short_to); + assertEquals(short_from, short_to); + + StringValue string_from = new StringValue("2305843009213693951"); + StringValue string_to = new StringValue("618970019642690137449562111"); + + string_from.copyTo(string_to); + assertEquals(string_from, string_to); + } +}