flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From se...@apache.org
Subject [02/38] flink git commit: [FLINK-3303] [core] Move all type utilities to flink-core
Date Tue, 02 Feb 2016 17:23:02 GMT
http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoGenericTypeSerializerTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoGenericTypeSerializerTest.java b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoGenericTypeSerializerTest.java
deleted file mode 100644
index 8ff0b1b..0000000
--- a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoGenericTypeSerializerTest.java
+++ /dev/null
@@ -1,168 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.typeutils.runtime.kryo;
-
-import com.esotericsoftware.kryo.Kryo;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.typeutils.ComparatorTestBase;
-import org.apache.flink.api.java.typeutils.runtime.AbstractGenericTypeSerializerTest;
-import org.apache.flink.api.java.typeutils.runtime.TestDataOutputSerializer;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-
-import org.junit.Test;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashSet;
-import java.util.LinkedList;
-import java.util.Random;
-
-import static org.junit.Assert.*;
-
-@SuppressWarnings("unchecked")
-public class KryoGenericTypeSerializerTest extends AbstractGenericTypeSerializerTest {
-
-	ExecutionConfig ec = new ExecutionConfig();
-	
-	@Test
-	public void testJavaList(){
-		Collection<Integer> a = new ArrayList<>();
-
-		fillCollection(a);
-
-		runTests(a);
-	}
-
-	@Test
-	public void testJavaSet(){
-		Collection<Integer> b = new HashSet<>();
-
-		fillCollection(b);
-
-		runTests(b);
-	}
-
-
-
-	@Test
-	public void testJavaDequeue(){
-		Collection<Integer> c = new LinkedList<>();
-		fillCollection(c);
-		runTests(c);
-	}
-
-	private void fillCollection(Collection<Integer> coll) {
-		coll.add(42);
-		coll.add(1337);
-		coll.add(49);
-		coll.add(1);
-	}
-
-	@Override
-	protected <T> TypeSerializer<T> createSerializer(Class<T> type) {
-		return new KryoSerializer<T>(type, ec);
-	}
-	
-	/**
-	 * Make sure that the kryo serializer forwards EOF exceptions properly when serializing
-	 */
-	@Test
-	public void testForwardEOFExceptionWhileSerializing() {
-		try {
-			// construct a long string
-			String str;
-			{
-				char[] charData = new char[40000];
-				Random rnd = new Random();
-				
-				for (int i = 0; i < charData.length; i++) {
-					charData[i] = (char) rnd.nextInt(10000);
-				}
-				
-				str = new String(charData);
-			}
-			
-			// construct a memory target that is too small for the string
-			TestDataOutputSerializer target = new TestDataOutputSerializer(10000, 30000);
-			KryoSerializer<String> serializer = new KryoSerializer<String>(String.class, new ExecutionConfig());
-			
-			try {
-				serializer.serialize(str, target);
-				fail("should throw a java.io.EOFException");
-			}
-			catch (java.io.EOFException e) {
-				// that is how we like it
-			}
-			catch (Exception e) {
-				fail("throws wrong exception: should throw a java.io.EOFException, has thrown a " + e.getClass().getName());
-			}
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-
-	/**
-	 * Make sure that the kryo serializer forwards EOF exceptions properly when serializing
-	 */
-	@Test
-	public void testForwardEOFExceptionWhileDeserializing() {
-		try {
-			int numElements = 100;
-			// construct a memory target that is too small for the string
-			TestDataOutputSerializer target = new TestDataOutputSerializer(5*numElements, 5*numElements);
-			KryoSerializer<Integer> serializer = new KryoSerializer<>(Integer.class, new ExecutionConfig());
-
-			for(int i = 0; i < numElements; i++){
-				serializer.serialize(i, target);
-			}
-
-			ComparatorTestBase.TestInputView source = new ComparatorTestBase.TestInputView(target.copyByteBuffer());
-
-			for(int i = 0; i < numElements; i++){
-				int value = serializer.deserialize(source);
-				assertEquals(i, value);
-			}
-
-			try {
-				serializer.deserialize(source);
-				fail("should throw a java.io.EOFException");
-			}
-			catch (java.io.EOFException e) {
-				// that is how we like it :-)
-			}
-			catch (Exception e) {
-				fail("throws wrong exception: should throw a java.io.EOFException, has thrown a " + e.getClass().getName());
-			}
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-
-	@Test
-	public void validateReferenceMappingDisabled() {
-		KryoSerializer<String> serializer = new KryoSerializer<>(String.class, new ExecutionConfig());
-		Kryo kryo = serializer.getKryo();
-		assertFalse(kryo.getReferences());
-	}
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoWithCustomSerializersTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoWithCustomSerializersTest.java b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoWithCustomSerializersTest.java
deleted file mode 100644
index d68afd6..0000000
--- a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoWithCustomSerializersTest.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.typeutils.runtime.kryo;
-
-import java.util.Collection;
-import java.util.HashSet;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.java.typeutils.GenericTypeInfo;
-import org.apache.flink.api.java.typeutils.runtime.AbstractGenericTypeSerializerTest;
-import org.joda.time.LocalDate;
-import org.junit.Test;
-
-import com.esotericsoftware.kryo.Kryo;
-import com.esotericsoftware.kryo.Serializer;
-import com.esotericsoftware.kryo.io.Input;
-import com.esotericsoftware.kryo.io.Output;
-
-@SuppressWarnings("unchecked")
-public class KryoWithCustomSerializersTest extends AbstractGenericTypeSerializerTest {
-	
-
-	@Test
-	public void testJodaTime(){
-		Collection<LocalDate> b = new HashSet<LocalDate>();
-
-		b.add(new LocalDate(1L));
-		b.add(new LocalDate(2L));
-
-		runTests(b);
-	}
-
-	@Override
-	protected <T> TypeSerializer<T> createSerializer(Class<T> type) {
-		ExecutionConfig conf = new ExecutionConfig();
-		conf.registerTypeWithKryoSerializer(LocalDate.class, LocalDateSerializer.class);
-		TypeInformation<T> typeInfo = new GenericTypeInfo<T>(type);
-		return typeInfo.createSerializer(conf);
-	}
-	
-	public static final class LocalDateSerializer extends Serializer<LocalDate> implements java.io.Serializable {
-		
-		private static final long serialVersionUID = 1L;
-
-		@Override
-		public void write(Kryo kryo, Output output, LocalDate object) {
-			output.writeInt(object.getYear());
-			output.writeInt(object.getMonthOfYear());
-			output.writeInt(object.getDayOfMonth());
-		}
-		
-		@Override
-		public LocalDate read(Kryo kryo, Input input, Class<LocalDate> type) {
-			return new LocalDate(input.readInt(), input.readInt(), input.readInt());
-		}
-	}
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/SerializersTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/SerializersTest.java b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/SerializersTest.java
deleted file mode 100644
index 7c6d023..0000000
--- a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/SerializersTest.java
+++ /dev/null
@@ -1,103 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.typeutils.runtime.kryo;
-
-import org.apache.flink.api.common.ExecutionConfig;
-
-import org.apache.flink.api.java.typeutils.GenericTypeInfo;
-import org.apache.flink.core.fs.Path;
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.util.ArrayList;
-import java.util.HashSet;
-
-import static org.junit.Assert.assertTrue;
-
-public class SerializersTest {
-
-	// recursive
-	public static class Node {
-		private Node parent;
-	}
-	
-	public static class FromNested {
-		Node recurseMe;
-	}
-	
-	public static class FromGeneric1 {}
-	public static class FromGeneric2 {}
-	
-	public static class Nested1 {
-		private FromNested fromNested;
-		private Path yodaIntervall;
-	}
-
-	public static class ClassWithNested {
-		
-		Nested1 nested;
-		int ab;
-		
-		ArrayList<FromGeneric1> addGenType;
-		FromGeneric2[] genericArrayType;
-	}
-
-	@Test
-	public void testTypeRegistration() {
-		ExecutionConfig conf = new ExecutionConfig();
-		Serializers.recursivelyRegisterType(ClassWithNested.class, conf, new HashSet<Class<?>>());
-		
-		KryoSerializer<String> kryo = new KryoSerializer<>(String.class, conf); // we create Kryo from another type.
-
-		Assert.assertTrue(kryo.getKryo().getRegistration(FromNested.class).getId() > 0);
-		Assert.assertTrue(kryo.getKryo().getRegistration(ClassWithNested.class).getId() > 0);
-		Assert.assertTrue(kryo.getKryo().getRegistration(Path.class).getId() > 0);
-		
-		// check if the generic type from one field is also registered (its very likely that
-		// generic types are also used as fields somewhere.
-		Assert.assertTrue(kryo.getKryo().getRegistration(FromGeneric1.class).getId() > 0);
-		Assert.assertTrue(kryo.getKryo().getRegistration(FromGeneric2.class).getId() > 0);
-		Assert.assertTrue(kryo.getKryo().getRegistration(Node.class).getId() > 0);
-		
-		
-		// register again and make sure classes are still registered
-		ExecutionConfig conf2 = new ExecutionConfig();
-		Serializers.recursivelyRegisterType(ClassWithNested.class, conf2, new HashSet<Class<?>>());
-		KryoSerializer<String> kryo2 = new KryoSerializer<>(String.class, conf);
-		assertTrue(kryo2.getKryo().getRegistration(FromNested.class).getId() > 0);
-	}
-
-	@Test
-	public void testTypeRegistrationFromTypeInfo() {
-		ExecutionConfig conf = new ExecutionConfig();
-		Serializers.recursivelyRegisterType(new GenericTypeInfo<>(ClassWithNested.class), conf, new HashSet<Class<?>>());
-
-		KryoSerializer<String> kryo = new KryoSerializer<>(String.class, conf); // we create Kryo from another type.
-
-		assertTrue(kryo.getKryo().getRegistration(FromNested.class).getId() > 0);
-		assertTrue(kryo.getKryo().getRegistration(ClassWithNested.class).getId() > 0);
-		assertTrue(kryo.getKryo().getRegistration(Path.class).getId() > 0);
-
-		// check if the generic type from one field is also registered (its very likely that
-		// generic types are also used as fields somewhere.
-		assertTrue(kryo.getKryo().getRegistration(FromGeneric1.class).getId() > 0);
-		assertTrue(kryo.getKryo().getRegistration(FromGeneric2.class).getId() > 0);
-		assertTrue(kryo.getKryo().getRegistration(Node.class).getId() > 0);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/tuple/base/TupleComparatorTestBase.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/tuple/base/TupleComparatorTestBase.java b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/tuple/base/TupleComparatorTestBase.java
deleted file mode 100644
index faab26a..0000000
--- a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/tuple/base/TupleComparatorTestBase.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.typeutils.runtime.tuple.base;
-
-import org.apache.flink.api.common.typeutils.ComparatorTestBase;
-import org.apache.flink.api.java.tuple.Tuple;
-import org.apache.flink.api.java.typeutils.runtime.TupleComparator;
-import org.apache.flink.api.java.typeutils.runtime.TupleSerializer;
-
-import static org.junit.Assert.assertEquals;
-
-public abstract class TupleComparatorTestBase<T extends Tuple> extends ComparatorTestBase<T> {
-
-	@Override
-	protected void deepEquals(String message, T should, T is) {
-		for (int x = 0; x < should.getArity(); x++) {
-			assertEquals(should.getField(x), is.getField(x));
-		}
-	}
-
-	@Override
-	protected abstract TupleComparator<T> createComparator(boolean ascending);
-
-	@Override
-	protected abstract TupleSerializer<T> createSerializer();
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/tuple/base/TuplePairComparatorTestBase.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/tuple/base/TuplePairComparatorTestBase.java b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/tuple/base/TuplePairComparatorTestBase.java
deleted file mode 100644
index 1d414d8..0000000
--- a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/tuple/base/TuplePairComparatorTestBase.java
+++ /dev/null
@@ -1,109 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.typeutils.runtime.tuple.base;
-
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-import org.apache.flink.api.common.typeutils.TypePairComparator;
-import org.apache.flink.api.java.tuple.Tuple;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.util.TestLogger;
-import org.junit.Test;
-
-/**
- * Abstract test base for TuplePairComparators.
- *
- * @param <T>
- * @param <R>
- */
-public abstract class TuplePairComparatorTestBase<T extends Tuple, R extends Tuple> extends TestLogger {
-
-	protected abstract TypePairComparator<T, R> createComparator(boolean ascending);
-
-	protected abstract Tuple2<T[], R[]> getSortedTestData();
-
-	@Test
-	public void testEqualityWithReference() {
-		try {
-			TypePairComparator<T, R> comparator = getComparator(true);
-			Tuple2<T[], R[]> data = getSortedData();
-			for (int x = 0; x < data.f0.length; x++) {
-				comparator.setReference(data.f0[x]);
-
-				assertTrue(comparator.equalToReference(data.f1[x]));
-			}
-		} catch (Exception e) {
-			System.err.println(e.getMessage());
-			e.printStackTrace();
-			fail("Exception in test: " + e.getMessage());
-		}
-	}
-
-	@Test
-	public void testInequalityWithReference() {
-		testGreatSmallAscDescWithReference(true);
-		testGreatSmallAscDescWithReference(false);
-	}
-
-	protected void testGreatSmallAscDescWithReference(boolean ascending) {
-		try {
-			Tuple2<T[], R[]> data = getSortedData();
-
-			TypePairComparator<T, R> comparator = getComparator(ascending);
-
-			//compares every element in high with every element in low
-			for (int x = 0; x < data.f0.length - 1; x++) {
-				for (int y = x + 1; y < data.f1.length; y++) {
-					comparator.setReference(data.f0[x]);
-					if (ascending) {
-						assertTrue(comparator.compareToReference(data.f1[y]) > 0);
-					} else {
-						assertTrue(comparator.compareToReference(data.f1[y]) < 0);
-					}
-				}
-			}
-		} catch (Exception e) {
-			System.err.println(e.getMessage());
-			e.printStackTrace();
-			fail("Exception in test: " + e.getMessage());
-		}
-	}
-
-	// --------------------------------------------------------------------------------------------
-	protected TypePairComparator<T, R> getComparator(boolean ascending) {
-		TypePairComparator<T, R> comparator = createComparator(ascending);
-		if (comparator == null) {
-			throw new RuntimeException("Test case corrupt. Returns null as comparator.");
-		}
-		return comparator;
-	}
-
-	protected Tuple2<T[], R[]> getSortedData() {
-		Tuple2<T[], R[]> data = getSortedTestData();
-		if (data == null || data.f0 == null || data.f1 == null) {
-			throw new RuntimeException("Test case corrupt. Returns null as test data.");
-		}
-		if (data.f0.length < 2 || data.f1.length < 2) {
-			throw new RuntimeException("Test case does not provide enough sorted test data.");
-		}
-
-		return data;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java
index 1534ebf..0479c0b 100644
--- a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java
+++ b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java
@@ -29,7 +29,7 @@ import org.apache.flink.api.java.operators.AggregateOperator;
 import org.apache.flink.api.java.operators.CoGroupRawOperator;
 import org.apache.flink.api.java.operators.CrossOperator.DefaultCross;
 import org.apache.flink.api.java.operators.Grouping;
-import org.apache.flink.api.java.operators.Keys;
+import org.apache.flink.api.common.operators.Keys;
 import org.apache.flink.api.java.operators.SortedGrouping;
 import org.apache.flink.api.java.operators.UdfOperator;
 import org.apache.flink.api.java.operators.UnsortedGrouping;

http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/JavaBatchTranslator.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/JavaBatchTranslator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/JavaBatchTranslator.scala
index d25fa9d..22be45a 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/JavaBatchTranslator.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/JavaBatchTranslator.scala
@@ -20,13 +20,14 @@ package org.apache.flink.api.java.table
 
 import java.lang.reflect.Modifier
 
+import org.apache.flink.api.common.operators.Keys
 import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.common.typeutils.CompositeType
 import org.apache.flink.api.java.aggregation.AggregationFunction
 import org.apache.flink.api.java.operators.JoinOperator.EquiJoin
-import org.apache.flink.api.java.operators.Keys.ExpressionKeys
-import org.apache.flink.api.java.operators.{GroupReduceOperator, Keys, MapOperator, UnsortedGrouping}
+import Keys.ExpressionKeys
+import org.apache.flink.api.java.operators.{GroupReduceOperator, MapOperator, UnsortedGrouping}
 import org.apache.flink.api.java.{DataSet => JavaDataSet}
 import org.apache.flink.api.table.expressions.analysis.ExtractEquiJoinFields
 import org.apache.flink.api.table.plan._

http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-scala/src/main/java/org/apache/flink/api/scala/operators/ScalaAggregateOperator.java
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/java/org/apache/flink/api/scala/operators/ScalaAggregateOperator.java b/flink-scala/src/main/java/org/apache/flink/api/scala/operators/ScalaAggregateOperator.java
index a79c843..1e0a2b3 100644
--- a/flink-scala/src/main/java/org/apache/flink/api/scala/operators/ScalaAggregateOperator.java
+++ b/flink-scala/src/main/java/org/apache/flink/api/scala/operators/ScalaAggregateOperator.java
@@ -34,7 +34,7 @@ import org.apache.flink.api.java.aggregation.AggregationFunction;
 import org.apache.flink.api.java.aggregation.AggregationFunctionFactory;
 import org.apache.flink.api.java.aggregation.Aggregations;
 import org.apache.flink.api.java.operators.Grouping;
-import org.apache.flink.api.java.operators.Keys;
+import org.apache.flink.api.common.operators.Keys;
 import org.apache.flink.api.java.operators.SingleInputOperator;
 import org.apache.flink.api.java.typeutils.TupleTypeInfoBase;
 import org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase;

http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-scala/src/main/scala/org/apache/flink/api/scala/CoGroupDataSet.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/CoGroupDataSet.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/CoGroupDataSet.scala
index 3eb6472..a6cce43 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/CoGroupDataSet.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/CoGroupDataSet.scala
@@ -21,10 +21,10 @@ package org.apache.flink.api.scala
 import org.apache.commons.lang3.tuple.{ImmutablePair, Pair}
 import org.apache.flink.api.common.InvalidProgramException
 import org.apache.flink.api.common.functions.{CoGroupFunction, Partitioner, RichCoGroupFunction}
-import org.apache.flink.api.common.operators.Order
+import org.apache.flink.api.common.operators.{Keys, Order}
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.common.typeutils.CompositeType
-import org.apache.flink.api.java.operators.Keys.ExpressionKeys
+import Keys.ExpressionKeys
 import org.apache.flink.api.java.operators._
 import org.apache.flink.util.Collector
 

http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala
index 35f0faf..151e6b3 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala
@@ -22,7 +22,7 @@ import org.apache.flink.api.common.accumulators.SerializedListAccumulator
 import org.apache.flink.api.common.aggregators.Aggregator
 import org.apache.flink.api.common.functions._
 import org.apache.flink.api.common.io.{FileOutputFormat, OutputFormat}
-import org.apache.flink.api.common.operators.Order
+import org.apache.flink.api.common.operators.{Keys, Order}
 import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint
 import org.apache.flink.api.common.operators.base.CrossOperatorBase.CrossHint
 import org.apache.flink.api.common.operators.base.PartitionOperatorBase.PartitionMethod
@@ -31,7 +31,7 @@ import org.apache.flink.api.java.Utils.CountHelper
 import org.apache.flink.api.java.aggregation.Aggregations
 import org.apache.flink.api.java.functions.{FirstReducer, KeySelector}
 import org.apache.flink.api.java.io.{PrintingOutputFormat, TextOutputFormat}
-import org.apache.flink.api.java.operators.Keys.ExpressionKeys
+import Keys.ExpressionKeys
 import org.apache.flink.api.java.operators._
 import org.apache.flink.api.java.operators.join.JoinType
 import org.apache.flink.api.java.{DataSet => JavaDataSet, Utils}
@@ -93,6 +93,7 @@ class DataSet[T: ClassTag](set: JavaDataSet[T]) {
 
   /**
    * Returns the execution environment associated with the current DataSet.
+ *
    * @return associated execution environment
    */
   def getExecutionEnvironment: ExecutionEnvironment =
@@ -515,7 +516,6 @@ class DataSet[T: ClassTag](set: JavaDataSet[T]) {
    * Convenience method to get the count (number of elements) of a DataSet
    *
    * @return A long integer that represents the number of elements in the set
-   *
    * @see org.apache.flink.api.java.Utils.CountHelper
    */
   @throws(classOf[Exception])
@@ -531,7 +531,6 @@ class DataSet[T: ClassTag](set: JavaDataSet[T]) {
    * As DataSet can contain a lot of data, this method should be used with caution.
    *
    * @return A Seq containing the elements of the DataSet
-   *
    * @see org.apache.flink.api.java.Utils.CollectHelper
    */
   @throws(classOf[Exception])
@@ -1369,7 +1368,7 @@ class DataSet[T: ClassTag](set: JavaDataSet[T]) {
   /**
    * Range-partitions a DataSet on the specified fields.
    *
-  '''important:''' This operation requires an extra pass over the DataSet to compute the range
+  *'''important:''' This operation requires an extra pass over the DataSet to compute the range
    * boundaries and shuffles the whole DataSet over the network.
    * This can take significant amount of time.
    */
@@ -1385,7 +1384,7 @@ class DataSet[T: ClassTag](set: JavaDataSet[T]) {
   /**
    * Range-partitions a DataSet using the specified key selector function.
    *
-  '''important:''' This operation requires an extra pass over the DataSet to compute the range
+  *'''important:''' This operation requires an extra pass over the DataSet to compute the range
    * boundaries and shuffles the whole DataSet over the network.
    * This can take significant amount of time.
    */
@@ -1516,6 +1515,7 @@ class DataSet[T: ClassTag](set: JavaDataSet[T]) {
   /**
    * Writes `this` DataSet to the specified location. This uses [[AnyRef.toString]] on
    * each element.
+ *
    * @see org.apache.flink.api.java.DataSet#writeAsText(String)
    */
   def writeAsText(
@@ -1532,6 +1532,7 @@ class DataSet[T: ClassTag](set: JavaDataSet[T]) {
    * Writes `this` DataSet to the specified location as CSV file(s).
    *
    * This only works on Tuple DataSets. For individual tuple fields [[AnyRef.toString]] is used.
+ *
    * @see org.apache.flink.api.java.DataSet#writeAsText(String)
    */
   def writeAsCsv(
@@ -1623,8 +1624,8 @@ class DataSet[T: ClassTag](set: JavaDataSet[T]) {
    * *
    * Writes a DataSet to the standard output stream (stdout) with a sink identifier prefixed.
    * This uses [[AnyRef.toString]] on each element.
-   * @param sinkIdentifier The string to prefix the output with.
-   * 
+ *
+   * @param sinkIdentifier The string to prefix the output with. 
    * @deprecated Use [[printOnTaskManager(String)]] instead.
    */
   @Deprecated
@@ -1636,8 +1637,8 @@ class DataSet[T: ClassTag](set: JavaDataSet[T]) {
   /**
    * Writes a DataSet to the standard error stream (stderr) with a sink identifier prefixed.
    * This uses [[AnyRef.toString]] on each element.
-   * @param sinkIdentifier The string to prefix the output with.
-   * 
+ *
+   * @param sinkIdentifier The string to prefix the output with. 
    * @deprecated Use [[printOnTaskManager(String)]] instead.
    */
   @Deprecated

http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-scala/src/main/scala/org/apache/flink/api/scala/GroupedDataSet.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/GroupedDataSet.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/GroupedDataSet.scala
index 7d5419b..bb8287a 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/GroupedDataSet.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/GroupedDataSet.scala
@@ -19,11 +19,11 @@ package org.apache.flink.api.scala
 
 import org.apache.flink.api.common.InvalidProgramException
 import org.apache.flink.api.common.functions.{GroupCombineFunction, GroupReduceFunction, Partitioner, ReduceFunction}
-import org.apache.flink.api.common.operators.Order
+import org.apache.flink.api.common.operators.{Keys, Order}
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.java.aggregation.Aggregations
 import org.apache.flink.api.java.functions.{FirstReducer, KeySelector}
-import org.apache.flink.api.java.operators.Keys.ExpressionKeys
+import Keys.ExpressionKeys
 import org.apache.flink.api.java.operators._
 import org.apache.flink.api.scala.operators.ScalaAggregateOperator
 import org.apache.flink.util.Collector

http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-scala/src/main/scala/org/apache/flink/api/scala/UnfinishedCoGroupOperation.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/UnfinishedCoGroupOperation.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/UnfinishedCoGroupOperation.scala
index 91f8c85..ace0790 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/UnfinishedCoGroupOperation.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/UnfinishedCoGroupOperation.scala
@@ -19,6 +19,7 @@
 package org.apache.flink.api.scala
 
 import org.apache.flink.api.common.functions.CoGroupFunction
+import org.apache.flink.api.common.operators.Keys
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.java.operators._
 import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo
@@ -36,6 +37,7 @@ import scala.reflect.ClassTag
  *   val right = ...
  *   val coGroupResult = left.coGroup(right).where(...).isEqualTo(...)
  * }}}
+ *
  * @tparam L The type of the left input of the coGroup.
  * @tparam R The type of the right input of the coGroup.
  */

http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-scala/src/main/scala/org/apache/flink/api/scala/joinDataSet.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/joinDataSet.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/joinDataSet.scala
index 49b2701..71f2bfb 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/joinDataSet.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/joinDataSet.scala
@@ -18,6 +18,7 @@
 package org.apache.flink.api.scala
 
 import org.apache.flink.api.common.functions.{FlatJoinFunction, JoinFunction, Partitioner, RichFlatJoinFunction}
+import org.apache.flink.api.common.operators.Keys
 import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.java.operators.JoinOperator.DefaultJoin.WrappingFlatJoinFunction
@@ -256,6 +257,7 @@ private[flink] abstract class UnfinishedJoinOperationBase[L, R, O <: JoinFunctio
  *   val right = ...
  *   val joinResult = left.join(right).where(...).equalTo(...)
  * }}}
+ *
  * @tparam L The type of the left input of the join.
  * @tparam R The type of the right input of the join.
  */
@@ -287,6 +289,7 @@ class UnfinishedJoinOperation[L, R](
  *     (first, second) => ...
  *   }
  * }}}
+ *
  * @tparam L The type of the left input of the join.
  * @tparam R The type of the right input of the join.
  */

http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/CaseClassTypeInfo.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/CaseClassTypeInfo.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/CaseClassTypeInfo.scala
index 5db7a91..e8bc3a4 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/CaseClassTypeInfo.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/CaseClassTypeInfo.scala
@@ -22,11 +22,12 @@ import java.util
 import java.util.regex.{Pattern, Matcher}
 
 import org.apache.flink.api.common.ExecutionConfig
+import org.apache.flink.api.common.operators.Keys
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.common.typeutils.CompositeType.{TypeComparatorBuilder,
 InvalidFieldReferenceException, FlatFieldDescriptor}
 import org.apache.flink.api.common.typeutils._
-import org.apache.flink.api.java.operators.Keys.ExpressionKeys
+import Keys.ExpressionKeys
 import org.apache.flink.api.java.typeutils.TupleTypeInfoBase
 
 import scala.collection.JavaConverters._

http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-scala/src/main/scala/org/apache/flink/api/scala/unfinishedKeyPairOperation.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/unfinishedKeyPairOperation.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/unfinishedKeyPairOperation.scala
index 08d0242..ddb45a4 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/unfinishedKeyPairOperation.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/unfinishedKeyPairOperation.scala
@@ -19,10 +19,10 @@
 package org.apache.flink.api.scala
 
 import org.apache.flink.api.common.InvalidProgramException
+import org.apache.flink.api.common.operators.Keys
 
 import org.apache.flink.api.java.functions.KeySelector
-import org.apache.flink.api.java.operators.Keys
-import org.apache.flink.api.java.operators.Keys.ExpressionKeys
+import Keys.ExpressionKeys
 import org.apache.flink.api.common.typeinfo.TypeInformation
 
 /**

http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
index 254af19..f4b6e7f 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
@@ -37,7 +37,7 @@ import org.apache.flink.api.java.Utils;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.io.CsvOutputFormat;
 import org.apache.flink.api.java.io.TextOutputFormat;
-import org.apache.flink.api.java.operators.Keys;
+import org.apache.flink.api.common.operators.Keys;
 import org.apache.flink.api.java.tuple.Tuple;
 import org.apache.flink.api.java.typeutils.InputTypeConfigurable;
 import org.apache.flink.api.java.typeutils.TypeExtractor;

http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/keys/KeySelectorUtil.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/keys/KeySelectorUtil.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/keys/KeySelectorUtil.java
index afbd8ab..cf40a3b 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/keys/KeySelectorUtil.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/keys/KeySelectorUtil.java
@@ -29,7 +29,7 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.CompositeType;
 import org.apache.flink.api.common.typeutils.TypeComparator;
 import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.api.java.operators.Keys;
+import org.apache.flink.api.common.operators.Keys;
 import org.apache.flink.api.java.tuple.Tuple;
 import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
 import org.apache.flink.api.java.typeutils.TupleTypeInfo;

http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/AggregationFunctionTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/AggregationFunctionTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/AggregationFunctionTest.java
index dd8dec9..d0617d0 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/AggregationFunctionTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/AggregationFunctionTest.java
@@ -29,7 +29,7 @@ import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.functions.ReduceFunction;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.api.java.operators.Keys;
+import org.apache.flink.api.common.operators.Keys;
 import org.apache.flink.api.java.tuple.Tuple;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.tuple.Tuple3;

http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/CoGroupOperatorTest.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/CoGroupOperatorTest.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/CoGroupOperatorTest.scala
index 46059a6..ab5ebf5 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/CoGroupOperatorTest.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/CoGroupOperatorTest.scala
@@ -20,7 +20,8 @@ package org.apache.flink.api.scala.operators
 import java.util
 
 import org.apache.flink.api.common.InvalidProgramException
-import org.apache.flink.api.java.operators.Keys.IncompatibleKeysException
+import org.apache.flink.api.common.operators.Keys
+import Keys.IncompatibleKeysException
 import org.junit.Assert
 import org.junit.Test
 import org.apache.flink.api.scala._

http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/GroupCombineITCase.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/GroupCombineITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/GroupCombineITCase.scala
index 576ecdf..2dabb56 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/GroupCombineITCase.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/GroupCombineITCase.scala
@@ -19,9 +19,9 @@ package org.apache.flink.api.scala.operators
 
 import org.apache.flink.api.common.operators.Order
 import org.apache.flink.api.java.io.DiscardingOutputFormat
-import org.apache.flink.api.scala.{ExecutionEnvironment, _}
+import org.apache.flink.api.scala._
 import org.apache.flink.api.scala.util.CollectionDataSets
-import org.apache.flink.test.javaApiOperators.GroupCombineITCase
+import org.apache.flink.test.javaApiOperators.GroupCombineITCase.ScalaGroupCombineFunctionExample
 import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
 import org.apache.flink.test.util.MultipleProgramsTestBase
 import org.apache.flink.util.Collector
@@ -43,7 +43,7 @@ class GroupCombineITCase(mode: TestExecutionMode) extends MultipleProgramsTestBa
       .map(str => Tuple1(str))
 
     // all methods on DataSet
-    ds.combineGroup(new GroupCombineITCase.ScalaGroupCombineFunctionExample())
+    ds.combineGroup(new ScalaGroupCombineFunctionExample())
       .output(new DiscardingOutputFormat[Tuple1[String]])
 
     ds.combineGroup((in, out: Collector[Tuple1[String]]) => in.toSet foreach (out.collect))
@@ -51,7 +51,7 @@ class GroupCombineITCase(mode: TestExecutionMode) extends MultipleProgramsTestBa
 
     // all methods on UnsortedGrouping
     ds.groupBy(0)
-      .combineGroup(new GroupCombineITCase.ScalaGroupCombineFunctionExample())
+      .combineGroup(new ScalaGroupCombineFunctionExample())
       .output(new DiscardingOutputFormat[Tuple1[String]])
 
     ds.groupBy(0)
@@ -60,7 +60,7 @@ class GroupCombineITCase(mode: TestExecutionMode) extends MultipleProgramsTestBa
 
     // all methods on SortedGrouping
     ds.groupBy(0).sortGroup(0, Order.ASCENDING)
-      .combineGroup(new GroupCombineITCase.ScalaGroupCombineFunctionExample())
+      .combineGroup(new ScalaGroupCombineFunctionExample())
       .output(new DiscardingOutputFormat[Tuple1[String]])
 
     ds.groupBy(0).sortGroup(0, Order.ASCENDING)

http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/JoinOperatorTest.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/JoinOperatorTest.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/JoinOperatorTest.scala
index 0d9ea9e..81a7d7e 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/JoinOperatorTest.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/JoinOperatorTest.scala
@@ -20,7 +20,8 @@ package org.apache.flink.api.scala.operators
 import java.util
 
 import org.apache.flink.api.common.InvalidProgramException
-import org.apache.flink.api.java.operators.Keys.IncompatibleKeysException
+import org.apache.flink.api.common.operators.Keys
+import Keys.IncompatibleKeysException
 import org.apache.flink.api.scala._
 import org.apache.flink.api.scala.util.CollectionDataSets.CustomType
 import org.junit.{Assert, Test}

http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-tests/src/test/scala/org/apache/flink/api/scala/types/TypeInformationGenTest.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/types/TypeInformationGenTest.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/types/TypeInformationGenTest.scala
index 70276cb..46495d0 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/types/TypeInformationGenTest.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/types/TypeInformationGenTest.scala
@@ -19,7 +19,7 @@ package org.apache.flink.api.scala.types
 
 import java.io.{DataInput, DataOutput}
 
-import org.apache.flink.api.java.`type`.extractor.TypeExtractorTest.CustomTuple
+import org.apache.flink.api.java.typeutils.TypeExtractorTest.CustomTuple
 import org.apache.flink.api.java.io.CollectionInputFormat
 import org.apache.hadoop.io.Writable
 import org.junit.{Assert, Test}
@@ -602,7 +602,7 @@ class TypeInformationGenTest {
     // This checks the condition in checkCollection. If this fails with IllegalArgumentException,
     // then things like "env.fromElements((),(),())" won't work.
     import scala.collection.JavaConversions._
-    CollectionInputFormat.checkCollection(Seq((),(),()), (new UnitTypeInfo).getTypeClass)
+    CollectionInputFormat.checkCollection(Seq((),(),()), (new UnitTypeInfo).getTypeClass())
   }
 }
 


Mime
View raw message