Return-Path: X-Original-To: apmail-flink-commits-archive@minotaur.apache.org Delivered-To: apmail-flink-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 7B55C183A8 for ; Mon, 1 Feb 2016 14:57:22 +0000 (UTC) Received: (qmail 3383 invoked by uid 500); 1 Feb 2016 14:57:22 -0000 Delivered-To: apmail-flink-commits-archive@flink.apache.org Received: (qmail 3277 invoked by uid 500); 1 Feb 2016 14:57:22 -0000 Mailing-List: contact commits-help@flink.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@flink.apache.org Delivered-To: mailing list commits@flink.apache.org Received: (qmail 3154 invoked by uid 99); 1 Feb 2016 14:57:22 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 01 Feb 2016 14:57:22 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 2B4C3E03BE; Mon, 1 Feb 2016 14:57:22 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: sewen@apache.org To: commits@flink.apache.org Date: Mon, 01 Feb 2016 14:57:25 -0000 Message-Id: In-Reply-To: <711c397b82634a239bc6b97852651157@git.apache.org> References: <711c397b82634a239bc6b97852651157@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [4/5] flink git commit: [FLINK-3305] [core] Remove limited and inconsistent auto-magic for Joda Time [FLINK-3305] [core] Remove limited and inconsistent auto-magic for Joda Time The auto-magic for Joda Time was limited to very few classes. It was intransparent what cases would be handled. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b6110dc3 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b6110dc3 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b6110dc3 Branch: refs/heads/master Commit: b6110dc35a17340653a39209038041a5e28054b4 Parents: c4bc47a Author: Stephan Ewen Authored: Fri Jan 29 18:51:03 2016 +0100 Committer: Stephan Ewen Committed: Mon Feb 1 14:46:06 2016 +0100 ---------------------------------------------------------------------- flink-java/pom.xml | 30 +++++++------- .../flink/api/java/ExecutionEnvironment.java | 26 ++++++++---- .../typeutils/runtime/kryo/Serializers.java | 42 +++----------------- .../api/operators/TimestampedCollector.java | 12 +++--- .../runtime/KryoGenericTypeSerializerTest.scala | 37 +++++++---------- .../api/scala/runtime/TupleSerializerTest.scala | 29 ++++---------- 6 files changed, 68 insertions(+), 108 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/b6110dc3/flink-java/pom.xml ---------------------------------------------------------------------- diff --git a/flink-java/pom.xml b/flink-java/pom.xml index 8383a4a..a31e89d 100644 --- a/flink-java/pom.xml +++ b/flink-java/pom.xml @@ -70,22 +70,6 @@ under the License. - de.javakaffee - kryo-serializers - 0.27 - - - - joda-time - joda-time - - - - org.joda - joda-convert - - - com.google.guava guava ${guava.version} @@ -104,6 +88,20 @@ under the License. test-jar test + + + joda-time + joda-time + 2.5 + test + + + + org.joda + joda-convert + 1.7 + test + http://git-wip-us.apache.org/repos/asf/flink/blob/b6110dc3/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java index 10cb5e3..253ffa3 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java @@ -18,9 +18,10 @@ package org.apache.flink.api.java; -import java.io.IOException; -import java.io.Serializable; -import java.util.*; +import com.esotericsoftware.kryo.Serializer; + +import com.google.common.base.Joiner; +import com.google.common.base.Preconditions; import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.InvalidProgramException; @@ -46,7 +47,10 @@ import org.apache.flink.api.java.operators.DataSource; import org.apache.flink.api.java.operators.Operator; import org.apache.flink.api.java.operators.OperatorTranslation; import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.api.java.typeutils.*; +import org.apache.flink.api.java.typeutils.PojoTypeInfo; +import org.apache.flink.api.java.typeutils.ResultTypeQueryable; +import org.apache.flink.api.java.typeutils.TypeExtractor; +import org.apache.flink.api.java.typeutils.ValueTypeInfo; import org.apache.flink.api.java.typeutils.runtime.kryo.Serializers; import org.apache.flink.configuration.Configuration; import org.apache.flink.core.fs.Path; @@ -54,14 +58,22 @@ import org.apache.flink.types.StringValue; import org.apache.flink.util.NumberSequenceIterator; import org.apache.flink.util.SplittableIterator; import org.apache.flink.util.Visitor; + import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapreduce.Job; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.esotericsoftware.kryo.Serializer; -import com.google.common.base.Joiner; -import com.google.common.base.Preconditions; +import java.io.IOException; +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Calendar; +import java.util.Collection; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; /** * The ExecutionEnvironment is the context in which a program is executed. A http://git-wip-us.apache.org/repos/asf/flink/blob/b6110dc3/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/Serializers.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/Serializers.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/Serializers.java index 6903d35..8bac729 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/Serializers.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/Serializers.java @@ -23,13 +23,9 @@ import com.esotericsoftware.kryo.Serializer; import com.esotericsoftware.kryo.io.Input; import com.esotericsoftware.kryo.io.Output; import com.esotericsoftware.kryo.serializers.CollectionSerializer; - -import de.javakaffee.kryoserializers.jodatime.JodaDateTimeSerializer; -import de.javakaffee.kryoserializers.jodatime.JodaIntervalSerializer; import org.apache.avro.Schema; import org.apache.avro.generic.GenericData; import org.apache.avro.specific.SpecificRecordBase; - import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeutils.CompositeType; @@ -37,11 +33,13 @@ import org.apache.flink.api.java.Utils; import org.apache.flink.api.java.typeutils.GenericTypeInfo; import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo; import org.apache.flink.api.java.typeutils.TypeExtractor; -import org.joda.time.DateTime; -import org.joda.time.Interval; import java.io.Serializable; -import java.lang.reflect.*; +import java.lang.reflect.Field; +import java.lang.reflect.GenericArrayType; +import java.lang.reflect.Modifier; +import java.lang.reflect.ParameterizedType; +import java.lang.reflect.Type; import java.util.ArrayList; import java.util.Collection; import java.util.List; @@ -165,36 +163,6 @@ public class Serializers { // ClassTag tag = scala.reflect.ClassTag$.MODULE$.apply(avroType); // reg.registerTypeWithKryoSerializer(avroType, com.twitter.chill.avro.AvroSerializer.SpecificRecordSerializer(tag)); } - - /** - * Currently, the following classes of JodaTime are supported: - * - DateTime - * - Interval - * - * The following chronologies are supported: (see {@link de.javakaffee.kryoserializers.jodatime.JodaDateTimeSerializer}) - *
    - *
  • {@link org.joda.time.chrono.ISOChronology}
  • - *
  • {@link org.joda.time.chrono.CopticChronology}
  • - *
  • {@link org.joda.time.chrono.EthiopicChronology}
  • - *
  • {@link org.joda.time.chrono.GregorianChronology}
  • - *
  • {@link org.joda.time.chrono.JulianChronology}
  • - *
  • {@link org.joda.time.chrono.IslamicChronology}
  • - *
  • {@link org.joda.time.chrono.BuddhistChronology}
  • - *
  • {@link org.joda.time.chrono.GJChronology}
  • - *
- */ - public static void registerJodaTime(ExecutionConfig reg) { - reg.registerTypeWithKryoSerializer(DateTime.class, JodaDateTimeSerializer.class); - reg.registerTypeWithKryoSerializer(Interval.class, JodaIntervalSerializer.class); - } - - /** - * Register less frequently used serializers - */ - public static void registerJavaUtils(ExecutionConfig reg) { - // BitSet, Regex is already present through twitter-chill. - } - // -------------------------------------------------------------------------------------------- // Custom Serializers http://git-wip-us.apache.org/repos/asf/flink/blob/b6110dc3/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/TimestampedCollector.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/TimestampedCollector.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/TimestampedCollector.java index 62514fc..5af5109 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/TimestampedCollector.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/TimestampedCollector.java @@ -15,17 +15,18 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.flink.streaming.api.operators; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.util.Collector; -import org.joda.time.Instant; /** * Wrapper around an {@link Output} for user functions that expect a {@link Collector}. * Before giving the {@link TimestampedCollector} to a user function you must set - * the {@link Instant timestamp} that should be attached to emitted elements. Most operators - * would set the {@link Instant timestamp} of the incoming {@link org.apache.flink.streaming.runtime.streamrecord.StreamRecord} here. + * the timestamp that should be attached to emitted elements. Most operators + * would set the timestamp of the incoming + * {@link org.apache.flink.streaming.runtime.streamrecord.StreamRecord} here. * * @param The type of the elments that can be emitted. */ @@ -52,8 +53,9 @@ public class TimestampedCollector implements Collector { } /** - * Sets the {@link Instant timestamp} that is attached to elements that get emitted using - * {@link #collect} + * Sets the timestamp (long milliseconds) that is attached to elements that get emitted using + * {@link #collect(Object)} + * * @param timestamp The timestamp in milliseconds */ public void setTimestamp(long timestamp) { http://git-wip-us.apache.org/repos/asf/flink/blob/b6110dc3/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/KryoGenericTypeSerializerTest.scala ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/KryoGenericTypeSerializerTest.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/KryoGenericTypeSerializerTest.scala index 859ad2d..08a0a96 100644 --- a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/KryoGenericTypeSerializerTest.scala +++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/KryoGenericTypeSerializerTest.scala @@ -17,17 +17,18 @@ */ package org.apache.flink.api.scala.runtime +import com.esotericsoftware.kryo.{Kryo, Serializer} +import com.esotericsoftware.kryo.io.{Input, Output} + import org.apache.flink.api.common.ExecutionConfig import org.apache.flink.api.common.typeutils.SerializerTestInstance import org.apache.flink.api.java.typeutils.GenericTypeInfo -import org.apache.flink.api.java.typeutils.runtime.kryo.Serializers + +import org.joda.time.LocalDate + import org.junit.Test + import scala.reflect._ -import org.joda.time.{DateTime, LocalDate} -import com.esotericsoftware.kryo.Serializer -import com.esotericsoftware.kryo.Kryo -import com.esotericsoftware.kryo.io.Output -import com.esotericsoftware.kryo.io.Input class KryoGenericTypeSerializerTest { @@ -84,56 +85,49 @@ class KryoGenericTypeSerializerTest { } @Test - def testThrowableSerialization: Unit = { + def testThrowableSerialization(): Unit = { val a = List(new RuntimeException("Hello"), new RuntimeException("there")) runTests(a) } @Test - def jodaSerialization: Unit = { - val a = List(new DateTime(1), new DateTime(2)) - - runTests(a) - } - - @Test - def jodaSerialization1: Unit = { + def jodaSerialization(): Unit = { val a = List(new LocalDate(1), new LocalDate(2)) runTests(a) } @Test - def testScalaListSerialization: Unit = { + def testScalaListSerialization(): Unit = { val a = List(42,1,49,1337) runTests(a) } @Test - def testScalaMutablelistSerialization: Unit = { + def testScalaMutablelistSerialization(): Unit = { val a = scala.collection.mutable.ListBuffer(42,1,49,1337) runTests(a) } @Test - def testScalaMapSerialization: Unit = { + def testScalaMapSerialization(): Unit = { val a = Map(("1" -> 1), ("2" -> 2), ("42" -> 42), ("1337" -> 1337)) runTests(Seq(a)) } @Test - def testMutableMapSerialization: Unit ={ + def testMutableMapSerialization(): Unit ={ val a = scala.collection.mutable.Map((1 -> "1"), (2 -> "2"), (3 -> "3")) runTests(Seq(a)) } @Test - def testScalaListComplexTypeSerialization: Unit = { + def testScalaListComplexTypeSerialization(): Unit = { val a = ComplexType("1234", 42, List(1,2,3,4)) val b = ComplexType("4321", 24, List(4,3,2,1)) val c = ComplexType("1337", 1, List(1)) @@ -143,7 +137,7 @@ class KryoGenericTypeSerializerTest { } @Test - def testHeterogenousScalaList: Unit = { + def testHeterogenousScalaList(): Unit = { val a = new DerivedType("foo", "bar") val b = new BaseType("foobar") val c = new DerivedType2("bar", "foo") @@ -201,7 +195,6 @@ class KryoGenericTypeSerializerTest { // Register the custom Kryo Serializer val conf = new ExecutionConfig conf.registerTypeWithKryoSerializer(classOf[LocalDate], classOf[LocalDateSerializer]) - Serializers.registerJodaTime(conf) val typeInfo = new GenericTypeInfo[T](clsTag.runtimeClass.asInstanceOf[Class[T]]) val serializer = typeInfo.createSerializer(conf) val typeClass = typeInfo.getTypeClass http://git-wip-us.apache.org/repos/asf/flink/blob/b6110dc3/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/TupleSerializerTest.scala ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/TupleSerializerTest.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/TupleSerializerTest.scala index c436d62..368204b 100644 --- a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/TupleSerializerTest.scala +++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/TupleSerializerTest.scala @@ -18,20 +18,21 @@ package org.apache.flink.api.scala.runtime import java.util +import java.util.Random + +import org.apache.flink.api.scala._ import org.apache.flink.api.common.ExecutionConfig -import org.apache.flink.api.java.ExecutionEnvironment import org.apache.flink.api.java.typeutils.TupleTypeInfoBase import org.apache.flink.api.java.typeutils.runtime.AbstractGenericTypeSerializerTest._ import org.apache.flink.api.common.typeinfo.TypeInformation -import org.apache.flink.api.java.typeutils.runtime.kryo.{Serializers, KryoSerializer} import org.apache.flink.util.StringUtils -import org.joda.time.DateTime + import org.joda.time.LocalDate + import org.junit.Assert import org.junit.Test -import org.apache.flink.api.scala._ + import scala.collection.JavaConverters._ -import java.util.Random class TupleSerializerTest { @@ -98,20 +99,6 @@ class TupleSerializerTest { val rnd: Random = new Random(807346528946L) val testTuples = Array( - (StringUtils.getRandomString(rnd, 10, 100), new DateTime(rnd.nextInt)), - (StringUtils.getRandomString(rnd, 10, 100), new DateTime(rnd.nextInt)), - (StringUtils.getRandomString(rnd, 10, 100), new DateTime(rnd.nextInt)), - ("", rnd.nextDouble), - (StringUtils.getRandomString(rnd, 10, 100), new DateTime(rnd.nextInt)), - (StringUtils.getRandomString(rnd, 10, 100), new DateTime(rnd.nextInt))) - runTests(testTuples) - } - - @Test - def testTuple2StringJodaTime2(): Unit = { - val rnd: Random = new Random(807346528946L) - - val testTuples = Array( (StringUtils.getRandomString(rnd, 10, 100), new LocalDate(rnd.nextInt)), (StringUtils.getRandomString(rnd, 10, 100), new LocalDate(rnd.nextInt)), (StringUtils.getRandomString(rnd, 10, 100), new LocalDate(rnd.nextInt)), @@ -208,9 +195,9 @@ class TupleSerializerTest { private final def runTests[T <: Product : TypeInformation](instances: Array[T]) { try { // Register the custom Kryo Serializer - val conf = new ExecutionConfig + val conf = new ExecutionConfig() conf.registerTypeWithKryoSerializer(classOf[LocalDate], classOf[LocalDateSerializer]) - Serializers.registerJodaTime(conf) + val tupleTypeInfo = implicitly[TypeInformation[T]].asInstanceOf[TupleTypeInfoBase[T]] val serializer = tupleTypeInfo.createSerializer(conf) val tupleClass = tupleTypeInfo.getTypeClass