flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From va...@apache.org
Subject flink git commit: [FLINK-3002] Add Either type, EitherTypeInfo, and EitherSerializer to the Java API
Date Thu, 19 Nov 2015 13:23:14 GMT
Repository: flink
Updated Branches:
  refs/heads/master 6888c9cf9 -> 6b253d9f8


[FLINK-3002] Add Either type, EitherTypeInfo, and EitherSerializer to the Java API

This closes #1371


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6b253d9f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6b253d9f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6b253d9f

Branch: refs/heads/master
Commit: 6b253d9f8e3d3332784d87cd8b7b0d536e56caa7
Parents: 6888c9c
Author: vasia <vasia@apache.org>
Authored: Fri Nov 13 14:48:49 2015 +0100
Committer: vasia <vasia@apache.org>
Committed: Thu Nov 19 14:22:39 2015 +0100

----------------------------------------------------------------------
 docs/apis/programming_guide.md                  |   7 +
 .../apache/flink/api/java/typeutils/Either.java | 147 ++++++++++++++
 .../api/java/typeutils/EitherTypeInfo.java      | 111 +++++++++++
 .../typeutils/runtime/EitherSerializer.java     | 190 +++++++++++++++++++
 .../api/java/typeutils/EitherTypeInfoTest.java  |  58 ++++++
 .../typeutils/runtime/EitherSerializerTest.java | 111 +++++++++++
 6 files changed, 624 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/6b253d9f/docs/apis/programming_guide.md
----------------------------------------------------------------------
diff --git a/docs/apis/programming_guide.md b/docs/apis/programming_guide.md
index 5fbe27c..8248f5d 100644
--- a/docs/apis/programming_guide.md
+++ b/docs/apis/programming_guide.md
@@ -1493,6 +1493,7 @@ There are six different categories of data types:
 4. **Regular Classes**
 5. **Values**
 6. **Hadoop Writables**
+7. **Special Types**
 
 #### Tuples and Case Classes
 
@@ -1651,6 +1652,12 @@ be altered, allowing programmers to reuse objects and take pressure
off the garb
 You can use types that implement the `org.apache.hadoop.Writable` interface. The serialization
logic
 defined in the `write()`and `readFields()` methods will be used for serialization.
 
+#### Special Types
+
+You can use special types, including Scala's `Either`, `Option`, and `Try`.
+The Java API has its own custom implementation of `Either`.
+Similarly to Scala's `Either`, it represents a value of one of two possible types, *Left* or *Right*.
+`Either` can be useful for error handling or operators that need to output two different
types of records.
 
 #### Type Erasure & Type Inference
 

http://git-wip-us.apache.org/repos/asf/flink/blob/6b253d9f/flink-java/src/main/java/org/apache/flink/api/java/typeutils/Either.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/Either.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/Either.java
new file mode 100644
index 0000000..ba446a1
--- /dev/null
+++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/Either.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils;
+
+/**
+ * This type represents a value of one of two possible types, Left or Right
+ * (a disjoint union), inspired by Scala's Either type.
+ *
+ * @param <L> the type of Left
+ * @param <R> the type of Right
+ */
+public abstract class Either<L, R> {
+
+	/**
+	 * Create a Left value of Either
+	 */
+	public static <L, R> Either<L, R> left(L value) {
+		return new Left<L, R>(value);
+	}
+
+	/**
+	 * Create a Right value of Either
+	 */
+	public static <L, R> Either<L, R> right(R value) {
+		return new Right<L, R>(value);
+	}
+
+	/**
+	 * Retrieve the Left value of Either.
+	 * @return the Left value
+	 * @throws IllegalStateException if called on a Right
+	 */
+	public abstract L left() throws IllegalStateException;
+
+	/**
+	 * Retrieve the Right value of Either.
+	 * @return the Right value
+	 * @throws IllegalStateException if called on a Left
+	 */
+	public abstract R right() throws IllegalStateException;
+
+	/**
+	 * 
+	 * @return true if this is a Left value, false if this is a Right value
+	 */
+	public final boolean isLeft() {
+		return getClass() == Left.class;
+	}
+
+	/**
+	 * 
+	 * @return true if this is a Right value, false if this is a Left value
+	 */
+	public final boolean isRight() {
+		return getClass() == Right.class;
+	}
+
+	private static class Left<L, R> extends Either<L, R> {
+		private final L value;
+
+		public Left(L value) {
+			this.value = java.util.Objects.requireNonNull(value);
+		}
+
+		@Override
+		public L left() {
+			return value;
+		}
+
+		@Override
+		public R right() {
+			throw new IllegalStateException("Cannot retrieve Right value on a Left");
+		}
+
+		@Override
+		public boolean equals(Object object) {
+			if (object instanceof Left<?, ?>) {
+				final Left<?, ?> other = (Left<?, ?>) object;
+				return value.equals(other.value);
+			}
+			return false;
+		}
+
+		@Override
+		public int hashCode() {
+			return value.hashCode();
+		}
+
+		@Override
+		public String toString() {
+			return "Left(" + value.toString() + ")";
+		}
+	}
+
+	private static class Right<L, R> extends Either<L, R> {
+		private final R value;
+
+		public Right(R value) {
+			this.value = java.util.Objects.requireNonNull(value);
+		}
+
+		@Override
+		public L left() {
+			throw new IllegalStateException("Cannot retrieve Left value on a Right");
+		}
+
+		@Override
+		public R right() {
+			return value;
+		}
+
+		@Override
+		public boolean equals(Object object) {
+			if (object instanceof Right<?, ?>) {
+				final Right<?, ?> other = (Right<?, ?>) object;
+				return value.equals(other.value);
+			}
+			return false;
+		}
+
+		@Override
+		public int hashCode() {
+			return value.hashCode();
+		}
+
+		@Override
+		public String toString() {
+			return "Right(" + value.toString() + ")";
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/6b253d9f/flink-java/src/main/java/org/apache/flink/api/java/typeutils/EitherTypeInfo.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/EitherTypeInfo.java
b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/EitherTypeInfo.java
new file mode 100644
index 0000000..40ed0c0
--- /dev/null
+++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/EitherTypeInfo.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.typeutils.runtime.EitherSerializer;
+
+/**
+ * A {@link TypeInformation} for the {@link Either} type of the Java API.
+ *
+ * @param <L> the Left value type
+ * @param <R> the Right value type
+ */
+public class EitherTypeInfo<L, R> extends TypeInformation<Either<L, R>>
{
+
+	private static final long serialVersionUID = 1L;
+
+	private final TypeInformation<L> leftType;
+
+	private final TypeInformation<R> rightType;
+
+	public EitherTypeInfo(TypeInformation<L> leftType,TypeInformation<R> rightType)
{
+		this.leftType = leftType;
+		this.rightType = rightType;
+	}
+
+	@Override
+	public boolean isBasicType() {
+		return false;
+	}
+
+	@Override
+	public boolean isTupleType() {
+		return false;
+	}
+
+	@Override
+	public int getArity() {
+		return 1;
+	}
+
+	@Override
+	public int getTotalFields() {
+		return 1;
+	}
+
+	@SuppressWarnings("unchecked")
+	@Override
+	public Class<Either<L, R>> getTypeClass() {
+		return (Class<Either<L, R>>) (Class<?>) Either.class;
+	}
+
+	@Override
+	public boolean isKeyType() {
+		return false;
+	}
+
+	@Override
+	public TypeSerializer<Either<L, R>> createSerializer(ExecutionConfig config)
{
+		return new EitherSerializer<L, R>(leftType.createSerializer(config),
+				rightType.createSerializer(config));
+	}
+
+	@Override
+	public String toString() {
+		return "Either <" + leftType.toString() + ", " + rightType.toString() + ">";
+	}
+
+	@SuppressWarnings("unchecked")
+	@Override
+	public boolean equals(Object obj) {
+		if (obj instanceof EitherTypeInfo) {
+			EitherTypeInfo<L, R> other = (EitherTypeInfo<L, R>) obj;
+
+			return other.canEqual(this) &&
+				leftType.equals(other.leftType) &&
+				rightType.equals(other.rightType);
+		} else {
+			return false;
+		}
+	}
+
+	@Override
+	public int hashCode() {
+		return 17 * leftType.hashCode() + rightType.hashCode();
+	}
+
+	@Override
+	public boolean canEqual(Object obj) {
+		return obj instanceof EitherTypeInfo;
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/6b253d9f/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/EitherSerializer.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/EitherSerializer.java
b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/EitherSerializer.java
new file mode 100644
index 0000000..cfd1b5b
--- /dev/null
+++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/EitherSerializer.java
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils.runtime;
+
+import java.io.IOException;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.typeutils.Either;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+
+/**
+ * A {@link TypeSerializer} for the {@link Either} type of the Java API.
+ *
+ * @param <L> the Left value type
+ * @param <R> the Right value type
+ */
+public class EitherSerializer<L, R> extends TypeSerializer<Either<L, R>>
{
+
+	private static final long serialVersionUID = 1L;
+
+	private final TypeSerializer<L> leftSerializer;
+
+	private final TypeSerializer<R> rightSerializer;
+
+	public EitherSerializer(TypeSerializer<L> leftSerializer, TypeSerializer<R>
rightSerializer) {
+		this.leftSerializer = leftSerializer;
+		this.rightSerializer = rightSerializer;
+	}
+
+	@Override
+	public boolean isImmutableType() {
+		return leftSerializer.isImmutableType() && rightSerializer.isImmutableType();
+	}
+
+	@Override
+	public TypeSerializer<Either<L, R>> duplicate() {
+		TypeSerializer<L> duplicateLeft = leftSerializer.duplicate();
+		TypeSerializer<R> duplicateRight = rightSerializer.duplicate();
+
+		if ((leftSerializer != duplicateLeft) || (rightSerializer != duplicateRight)) {
+			// stateful
+			return new EitherSerializer<L, R>(duplicateLeft, duplicateRight);
+		}
+		else {
+			return this;
+		}
+	}
+
+
+	@Override
+	public Either<L, R> createInstance() {
+		// We arbitrarily always create a Right value instance.
+		return Either.right(rightSerializer.createInstance());
+	}
+
+	@Override
+	public Either<L, R> copy(Either<L, R> from) {
+		if (from.isLeft()) {
+			L left = from.left();
+			L copyLeft = leftSerializer.copy(left);
+			return Either.left(copyLeft);
+		}
+		else {
+			R right = from.right();
+			R copyRight = rightSerializer.copy(right);
+			return Either.right(copyRight);
+		}
+	}
+
+	@Override
+	public Either<L, R> copy(Either<L, R> from, Either<L, R> reuse) {
+		if (from.isRight()) {
+			final R right = from.right();
+			if (reuse.isRight()) {
+				R copyRight = rightSerializer.copy(right, reuse.right());
+				return Either.right(copyRight);
+			}
+			else {
+				// if the reuse record isn't a right value, we cannot reuse
+				R copyRight = rightSerializer.copy(right);
+				return Either.right(copyRight);
+			}
+		}
+		else {
+			L left = from.left();
+			// reuse record is never a left value because we always create a right instance
+			L copyLeft = leftSerializer.copy(left);
+			return Either.left(copyLeft);
+		}
+	}
+
+	@Override
+	public int getLength() {
+		return -1;
+	}
+
+	@Override
+	public void serialize(Either<L, R> record, DataOutputView target) throws IOException
{
+		if (record.isLeft()) {
+			target.writeBoolean(true);
+			leftSerializer.serialize(record.left(), target);
+		}
+		else {
+			target.writeBoolean(false);
+			rightSerializer.serialize(record.right(), target);
+		}
+	}
+
+	@Override
+	public Either<L, R> deserialize(DataInputView source) throws IOException {
+		boolean isLeft = source.readBoolean();
+		if (isLeft) {
+			return Either.left(leftSerializer.deserialize(source));
+		}
+		else {
+			return Either.right(rightSerializer.deserialize(source));
+		}
+	}
+
+	@Override
+	public Either<L, R> deserialize(Either<L, R> reuse, DataInputView source) throws
IOException {
+		boolean isLeft = source.readBoolean();
+		if (!isLeft) {
+			if (reuse.isRight()) {
+				return Either.right(rightSerializer.deserialize(reuse.right(), source));
+			}
+			else {
+				// if the reuse record isn't a right value, we cannot reuse
+				return Either.right(rightSerializer.deserialize(source));
+			}
+		}
+		else {
+			// reuse record is never a left value because we always create a right instance
+			return Either.left(leftSerializer.deserialize(source));
+		}
+	}
+
+	@Override
+	public void copy(DataInputView source, DataOutputView target) throws IOException {
+		boolean isLeft = source.readBoolean();
+		target.writeBoolean(isLeft);
+		if (isLeft) {
+			leftSerializer.copy(source, target);
+		}
+		else {
+			rightSerializer.copy(source, target);
+		}
+	}
+
+	@SuppressWarnings("unchecked")
+	@Override
+	public boolean equals(Object obj) {
+		if (obj instanceof EitherSerializer) {
+			EitherSerializer<L, R> other = (EitherSerializer<L, R>) obj;
+
+			return other.canEqual(this) &&
+				leftSerializer.equals(other.leftSerializer) &&
+				rightSerializer.equals(other.rightSerializer);
+		} else {
+			return false;
+		}
+	}
+
+	@Override
+	public boolean canEqual(Object obj) {
+		return obj instanceof EitherSerializer;
+	}
+
+	@Override
+	public int hashCode() {
+		return 17 * leftSerializer.hashCode() + rightSerializer.hashCode();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/6b253d9f/flink-java/src/test/java/org/apache/flink/api/java/typeutils/EitherTypeInfoTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/EitherTypeInfoTest.java
b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/EitherTypeInfoTest.java
new file mode 100644
index 0000000..b255136
--- /dev/null
+++ b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/EitherTypeInfoTest.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils;
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.util.TestLogger;
+import org.junit.Test;
+
+import static org.junit.Assert.*;
+
+public class EitherTypeInfoTest extends TestLogger {
+
+	Either<Integer, String> intEither = Either.left(1);
+	Either<Integer, String> stringEither = Either.right("boo");
+	Either<Integer, Tuple2<Double, Long>> tuple2Either = Either.right(new Tuple2<Double,
Long>(42.0, 2l));
+
+	@Test
+	public void testEitherTypeEquality() {
+		EitherTypeInfo<Integer, String> eitherInfo1 = new EitherTypeInfo<Integer, String>(
+				BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
+
+		EitherTypeInfo<Integer, String> eitherInfo2 = new EitherTypeInfo<Integer, String>(
+				BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
+
+		assertEquals(eitherInfo1, eitherInfo2);
+		assertEquals(eitherInfo1.hashCode(), eitherInfo2.hashCode());
+	}
+
+	@Test
+	public void testEitherTypeInEquality() {
+		EitherTypeInfo<Integer, String> eitherInfo1 = new EitherTypeInfo<Integer, String>(
+				BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
+
+		EitherTypeInfo<Integer, Tuple2<Double, Long>> eitherInfo2 = new EitherTypeInfo<Integer,
Tuple2<Double, Long>>(
+				BasicTypeInfo.INT_TYPE_INFO, new TupleTypeInfo<Tuple2<Double, Long>>(
+				TypeExtractor.getForClass(Double.class), TypeExtractor.getForClass(String.class)));
+
+		assertNotEquals(eitherInfo1, eitherInfo2);
+		assertNotEquals(eitherInfo1.hashCode(), eitherInfo2.hashCode());
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/6b253d9f/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/EitherSerializerTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/EitherSerializerTest.java
b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/EitherSerializerTest.java
new file mode 100644
index 0000000..198f641
--- /dev/null
+++ b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/EitherSerializerTest.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils.runtime;
+
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.fail;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeutils.SerializerTestInstance;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.Either;
+import org.apache.flink.api.java.typeutils.EitherTypeInfo;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.junit.Test;
+
+public class EitherSerializerTest {
+
+	@SuppressWarnings("unchecked")
+	@Test
+	public void testStringDoubleEither() {
+
+	Either<String, Double>[] testData = new Either[] {
+			Either.left("banana"),
+			Either.left(""),
+			Either.right(32.0),
+			Either.right(Double.MIN_VALUE),
+			Either.right(Double.MAX_VALUE)};
+
+	EitherTypeInfo<String, Double> eitherTypeInfo = (EitherTypeInfo<String, Double>)
new EitherTypeInfo<String, Double>(
+			BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.DOUBLE_TYPE_INFO);
+	EitherSerializer<String, Double> eitherSerializer =
+			(EitherSerializer<String, Double>) eitherTypeInfo.createSerializer(new ExecutionConfig());
+	SerializerTestInstance<Either<String, Double>> testInstance =
+			new EitherSerializerTestInstance<Either<String, Double>>(eitherSerializer,
eitherTypeInfo.getTypeClass(), -1, testData);
+	testInstance.testAll();
+	}
+
+	@SuppressWarnings("unchecked")
+	@Test
+	public void testEitherWithTuple() {
+
+	Either<Tuple2<Long, Long>, Double>[] testData = new Either[] {
+			Either.left(new Tuple2<>(2l, 9l)),
+			Either.left(new Tuple2<>(Long.MIN_VALUE, Long.MAX_VALUE)),
+			Either.right(32.0),
+			Either.right(Double.MIN_VALUE),
+			Either.right(Double.MAX_VALUE)};
+
+	EitherTypeInfo<Tuple2<Long, Long>, Double> eitherTypeInfo = (EitherTypeInfo<Tuple2<Long,
Long>, Double>)
+			new EitherTypeInfo<Tuple2<Long, Long>, Double>(
+			new TupleTypeInfo<Tuple2<Long, Long>>(BasicTypeInfo.LONG_TYPE_INFO, BasicTypeInfo.LONG_TYPE_INFO),
+			BasicTypeInfo.DOUBLE_TYPE_INFO);
+	EitherSerializer<Tuple2<Long, Long>, Double> eitherSerializer =
+			(EitherSerializer<Tuple2<Long, Long>, Double>) eitherTypeInfo.createSerializer(new
ExecutionConfig());
+	SerializerTestInstance<Either<Tuple2<Long, Long>, Double>> testInstance
=
+			new EitherSerializerTestInstance<Either<Tuple2<Long, Long>, Double>>(
+					eitherSerializer, eitherTypeInfo.getTypeClass(), -1, testData);
+	testInstance.testAll();
+	}
+
+	/**
+	 * {@link org.apache.flink.api.common.typeutils.SerializerTestBase#testInstantiate()}
+	 * checks that the type of the created instance is the same as the type class parameter.
+	 * Since we arbitrarily always create a Right instance we override this test.
+	 */
+	private class EitherSerializerTestInstance<T> extends SerializerTestInstance<T>
{
+
+		public EitherSerializerTestInstance(TypeSerializer<T> serializer,
+				Class<T> typeClass, int length, T[] testData) {
+			super(serializer, typeClass, length, testData);
+		}
+
+		@Override
+		@Test
+		public void testInstantiate() {
+			try {
+				TypeSerializer<T> serializer = getSerializer();
+
+				T instance = serializer.createInstance();
+				assertNotNull("The created instance must not be null.", instance);
+				
+				Class<T> type = getTypeClass();
+				assertNotNull("The test is corrupt: type class is null.", type);
+			}
+			catch (Exception e) {
+				System.err.println(e.getMessage());
+				e.printStackTrace();
+				fail("Exception in test: " + e.getMessage());
+			}
+		}
+		
+	}
+}


Mime
View raw message