flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From twal...@apache.org
Subject [1/5] flink git commit: [FLINK-7337] [table] Efficient handling of rowtime timestamps
Date Wed, 23 Aug 2017 08:15:12 GMT
Repository: flink
Updated Branches:
  refs/heads/master 6ed5815e8 -> 47944b1bb


http://git-wip-us.apache.org/repos/asf/flink/blob/47944b1b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeUnboundedOver.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeUnboundedOver.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeUnboundedOver.scala
index 04b63a1..9210c00 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeUnboundedOver.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeUnboundedOver.scala
@@ -17,23 +17,21 @@
  */
 package org.apache.flink.table.runtime.aggregate
 
-import java.sql.Timestamp
 import java.util
 import java.util.{List => JList}
 
-import org.apache.calcite.runtime.SqlFunctions
+import org.apache.flink.api.common.state._
 import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
+import org.apache.flink.api.java.typeutils.ListTypeInfo
 import org.apache.flink.configuration.Configuration
-import org.apache.flink.types.Row
 import org.apache.flink.streaming.api.functions.ProcessFunction
-import org.apache.flink.util.Collector
-import org.apache.flink.api.common.state._
-import org.apache.flink.api.java.typeutils.ListTypeInfo
 import org.apache.flink.streaming.api.operators.TimestampedCollector
 import org.apache.flink.table.api.StreamQueryConfig
 import org.apache.flink.table.codegen.{Compiler, GeneratedAggregationsFunction}
 import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo}
-import org.slf4j.LoggerFactory
+import org.apache.flink.types.Row
+import org.apache.flink.util.Collector
+import org.slf4j.{Logger, LoggerFactory}
 
 
 /**
@@ -52,6 +50,8 @@ abstract class RowTimeUnboundedOver(
   extends ProcessFunctionWithCleanupState[CRow, CRow](queryConfig)
     with Compiler[GeneratedAggregations] {
 
+  val LOG: Logger = LoggerFactory.getLogger(this.getClass)
+
   protected var output: CRow = _
   // state to hold the accumulators of the aggregations
   private var accumulatorState: ValueState[Row] = _
@@ -60,7 +60,6 @@ abstract class RowTimeUnboundedOver(
   // list to sort timestamps to access rows in timestamp order
   private var sortedTimestamps: util.LinkedList[Long] = _
 
-  val LOG = LoggerFactory.getLogger(this.getClass)
   protected var function: GeneratedAggregations = _
 
   override def open(config: Configuration) {
@@ -111,7 +110,7 @@ abstract class RowTimeUnboundedOver(
     // register state-cleanup timer
     registerProcessingCleanupTimer(ctx, ctx.timerService().currentProcessingTime())
 
-    val timestamp = SqlFunctions.toLong(input.getField(rowTimeIdx).asInstanceOf[Timestamp])
+    val timestamp = input.getField(rowTimeIdx).asInstanceOf[Long]
     val curWatermark = ctx.timerService().currentWatermark()
 
     // discard late record

http://git-wip-us.apache.org/repos/asf/flink/blob/47944b1b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/TimeWindowPropertyCollector.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/TimeWindowPropertyCollector.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/TimeWindowPropertyCollector.scala
index 4ec5239..16e4a0b 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/TimeWindowPropertyCollector.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/TimeWindowPropertyCollector.scala
@@ -59,7 +59,7 @@ abstract class TimeWindowPropertyCollector[T](
     if (windowRowtimeOffset.isDefined) {
       output.setField(
         lastFieldPos + windowRowtimeOffset.get,
-        SqlFunctions.internalToTimestamp(windowEnd - 1))
+        windowEnd - 1)
     }
 
     wrappedCollector.collect(record)

http://git-wip-us.apache.org/repos/asf/flink/blob/47944b1b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/conversion/CRowToJavaTupleMapFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/conversion/CRowToJavaTupleMapFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/conversion/CRowToJavaTupleMapFunction.scala
new file mode 100644
index 0000000..6b4f87e
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/conversion/CRowToJavaTupleMapFunction.scala
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.conversion
+
+import org.apache.flink.api.common.functions.MapFunction
+import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
+import org.apache.flink.table.runtime.types.CRow
+import org.apache.flink.types.Row
+
+import _root_.java.lang.{Boolean => JBool}
+
+/**
+  * Convert [[CRow]] to a [[JTuple2]] containing a [[Row]].
+  */
+class CRowToJavaTupleMapFunction extends MapFunction[CRow, JTuple2[JBool, Row]] {
+
+  val out: JTuple2[JBool, Row] = new JTuple2(true.asInstanceOf[JBool], null.asInstanceOf[Row])
+
+  override def map(cRow: CRow): JTuple2[JBool, Row] = {
+    out.f0 = cRow.change
+    out.f1 = cRow.row
+    out
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/47944b1b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/conversion/CRowToJavaTupleMapRunner.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/conversion/CRowToJavaTupleMapRunner.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/conversion/CRowToJavaTupleMapRunner.scala
new file mode 100644
index 0000000..95f304d
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/conversion/CRowToJavaTupleMapRunner.scala
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.conversion
+
+import java.lang.{Boolean => JBool}
+
+import org.apache.flink.api.common.functions.{MapFunction, RichMapFunction}
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.table.codegen.Compiler
+import org.apache.flink.table.runtime.types.CRow
+import org.apache.flink.types.Row
+import org.slf4j.{Logger, LoggerFactory}
+
+/**
+  * Convert [[CRow]] to a [[JTuple2]].
+  */
+class CRowToJavaTupleMapRunner(
+    name: String,
+    code: String,
+    @transient var returnType: TypeInformation[JTuple2[JBool, Any]])
+  extends RichMapFunction[CRow, Any]
+  with ResultTypeQueryable[JTuple2[JBool, Any]]
+  with Compiler[MapFunction[Row, Any]] {
+
+  val LOG: Logger = LoggerFactory.getLogger(this.getClass)
+
+  private var function: MapFunction[Row, Any] = _
+  private var tupleWrapper: JTuple2[JBool, Any] = _
+
+  override def open(parameters: Configuration): Unit = {
+    LOG.debug(s"Compiling MapFunction: $name \n\n Code:\n$code")
+    val clazz = compile(getRuntimeContext.getUserCodeClassLoader, name, code)
+    LOG.debug("Instantiating MapFunction.")
+    function = clazz.newInstance()
+    tupleWrapper = new JTuple2[JBool, Any]()
+  }
+
+  override def map(in: CRow): JTuple2[JBool, Any] = {
+    tupleWrapper.f0 = in.change
+    tupleWrapper.f1 = function.map(in.row)
+    tupleWrapper
+  }
+
+  override def getProducedType: TypeInformation[JTuple2[JBool, Any]] = returnType
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/47944b1b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/conversion/CRowToRowMapFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/conversion/CRowToRowMapFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/conversion/CRowToRowMapFunction.scala
new file mode 100644
index 0000000..050f15f
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/conversion/CRowToRowMapFunction.scala
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.conversion
+
+import org.apache.flink.api.common.functions.MapFunction
+import org.apache.flink.table.runtime.types.CRow
+import org.apache.flink.types.Row
+
+/**
+  * Maps a CRow to a Row.
+  */
+class CRowToRowMapFunction extends MapFunction[CRow, Row] {
+
+  override def map(value: CRow): Row = value.row
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/47944b1b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/conversion/CRowToScalaTupleMapFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/conversion/CRowToScalaTupleMapFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/conversion/CRowToScalaTupleMapFunction.scala
new file mode 100644
index 0000000..6461cc4
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/conversion/CRowToScalaTupleMapFunction.scala
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.conversion
+
+import org.apache.flink.api.common.functions.MapFunction
+import org.apache.flink.table.runtime.types.CRow
+import org.apache.flink.types.Row
+
+/**
+  * Convert [[CRow]] to a [[Tuple2]].
+  */
+class CRowToScalaTupleMapFunction extends MapFunction[CRow, (Boolean, Row)] {
+
+  override def map(cRow: CRow): (Boolean, Row) = {
+    (cRow.change, cRow.row)
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/47944b1b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/conversion/CRowToScalaTupleMapRunner.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/conversion/CRowToScalaTupleMapRunner.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/conversion/CRowToScalaTupleMapRunner.scala
new file mode 100644
index 0000000..c7d71a9
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/conversion/CRowToScalaTupleMapRunner.scala
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.conversion
+
+import org.apache.flink.api.common.functions.{MapFunction, RichMapFunction}
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.table.codegen.Compiler
+import org.apache.flink.table.runtime.types.CRow
+import org.apache.flink.types.Row
+import org.slf4j.{Logger, LoggerFactory}
+
+/**
+  * Convert [[CRow]] to a [[Tuple2]].
+  */
+class CRowToScalaTupleMapRunner(
+  name: String,
+  code: String,
+  @transient var returnType: TypeInformation[(Boolean, Any)])
+  extends RichMapFunction[CRow, (Boolean, Any)]
+  with ResultTypeQueryable[(Boolean, Any)]
+  with Compiler[MapFunction[Row, Any]] {
+
+  val LOG: Logger = LoggerFactory.getLogger(this.getClass)
+
+  private var function: MapFunction[Row, Any] = _
+
+  override def open(parameters: Configuration): Unit = {
+    LOG.debug(s"Compiling MapFunction: $name \n\n Code:\n$code")
+    val clazz = compile(getRuntimeContext.getUserCodeClassLoader, name, code)
+    LOG.debug("Instantiating MapFunction.")
+    function = clazz.newInstance()
+  }
+
+  override def map(in: CRow): (Boolean, Any) =
+    (in.change, function.map(in.row))
+
+  override def getProducedType: TypeInformation[(Boolean, Any)] = returnType
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/47944b1b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/typeutils/TimeIndicatorTypeInfo.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/typeutils/TimeIndicatorTypeInfo.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/typeutils/TimeIndicatorTypeInfo.scala
index e0e054b..824f3fb 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/typeutils/TimeIndicatorTypeInfo.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/typeutils/TimeIndicatorTypeInfo.scala
@@ -20,13 +20,14 @@ package org.apache.flink.table.typeutils
 
 import java.sql.Timestamp
 
+import org.apache.flink.api.common.ExecutionConfig
 import org.apache.flink.api.common.typeinfo.SqlTimeTypeInfo
-import org.apache.flink.api.common.typeutils.TypeComparator
-import org.apache.flink.api.common.typeutils.base.{SqlTimestampComparator, SqlTimestampSerializer}
+import org.apache.flink.api.common.typeutils.{TypeComparator, TypeSerializer}
+import org.apache.flink.api.common.typeutils.base.{LongSerializer, SqlTimestampComparator, SqlTimestampSerializer}
 
 /**
   * Type information for indicating event or processing time. However, it behaves like a
-  * regular SQL timestamp.
+  * regular SQL timestamp, but is serialized as a Long.
   */
 class TimeIndicatorTypeInfo(val isEventTime: Boolean)
   extends SqlTimeTypeInfo[Timestamp](
@@ -34,6 +35,12 @@ class TimeIndicatorTypeInfo(val isEventTime: Boolean)
     SqlTimestampSerializer.INSTANCE,
     classOf[SqlTimestampComparator].asInstanceOf[Class[TypeComparator[Timestamp]]]) {
 
+  // This replaces the effective serializer with a LongSerializer.
+  // It is a hacky but efficient way to keep the object creation overhead low while
+  // remaining compatible with the corresponding SqlTimestampTypeInfo.
+  override def createSerializer(executionConfig: ExecutionConfig): TypeSerializer[Timestamp] =
+    LongSerializer.INSTANCE.asInstanceOf[TypeSerializer[Timestamp]]
+
   override def toString: String =
     s"TimeIndicatorTypeInfo(${if (isEventTime) "rowtime" else "proctime" })"
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/47944b1b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/OverWindowHarnessTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/OverWindowHarnessTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/OverWindowHarnessTest.scala
index ba044be..ba36e18 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/OverWindowHarnessTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/OverWindowHarnessTest.scala
@@ -15,12 +15,12 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.table.runtime.harness
 
 import java.lang.{Long => JLong}
 import java.util.concurrent.ConcurrentLinkedQueue
 
-import org.apache.calcite.runtime.SqlFunctions.{internalToTimestamp => toTS}
 import org.apache.flink.api.common.time.Time
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo
 import org.apache.flink.streaming.api.operators.KeyedProcessOperator
@@ -34,7 +34,7 @@ import org.junit.Test
 
 class OverWindowHarnessTest extends HarnessTestBase{
 
-  protected var queryConfig =
+  protected var queryConfig: StreamQueryConfig =
     new StreamQueryConfig().withIdleStateRetentionTime(Time.seconds(2), Time.seconds(3))
 
   @Test
@@ -60,75 +60,75 @@ class OverWindowHarnessTest extends HarnessTestBase{
     testHarness.setProcessingTime(1)
 
     testHarness.processElement(new StreamRecord(
-      CRow(Row.of(toTS(1), "aaa", 1L: JLong), true)))
+      CRow(Row.of(1L: JLong, "aaa", 1L: JLong), change = true)))
     testHarness.processElement(new StreamRecord(
-      CRow(Row.of(toTS(1), "bbb", 10L: JLong), true)))
+      CRow(Row.of(1L: JLong, "bbb", 10L: JLong), change = true)))
     testHarness.processElement(new StreamRecord(
-      CRow(Row.of(toTS(1), "aaa", 2L: JLong), true)))
+      CRow(Row.of(1L: JLong, "aaa", 2L: JLong), change = true)))
     testHarness.processElement(new StreamRecord(
-      CRow(Row.of(toTS(1), "aaa", 3L: JLong), true)))
+      CRow(Row.of(1L: JLong, "aaa", 3L: JLong), change = true)))
 
     // register cleanup timer with 4100
     testHarness.setProcessingTime(1100)
     testHarness.processElement(new StreamRecord(
-      CRow(Row.of(toTS(1), "bbb", 20L: JLong), true)))
+      CRow(Row.of(1L: JLong, "bbb", 20L: JLong), change = true)))
     testHarness.processElement(new StreamRecord(
-      CRow(Row.of(toTS(1), "aaa", 4L: JLong), true)))
+      CRow(Row.of(1L: JLong, "aaa", 4L: JLong), change = true)))
     testHarness.processElement(new StreamRecord(
-      CRow(Row.of(toTS(1), "aaa", 5L: JLong), true)))
+      CRow(Row.of(1L: JLong, "aaa", 5L: JLong), change = true)))
     testHarness.processElement(new StreamRecord(
-      CRow(Row.of(toTS(1), "aaa", 6L: JLong), true)))
+      CRow(Row.of(1L: JLong, "aaa", 6L: JLong), change = true)))
     testHarness.processElement(new StreamRecord(
-      CRow(Row.of(toTS(1), "bbb", 30L: JLong), true)))
+      CRow(Row.of(1L: JLong, "bbb", 30L: JLong), change = true)))
 
     // register cleanup timer with 6001
     testHarness.setProcessingTime(3001)
     testHarness.processElement(new StreamRecord(
-      CRow(Row.of(toTS(2), "aaa", 7L: JLong), true)))
+      CRow(Row.of(2L: JLong, "aaa", 7L: JLong), change = true)))
     testHarness.processElement(new StreamRecord(
-      CRow(Row.of(toTS(2), "aaa", 8L: JLong), true)))
+      CRow(Row.of(2L: JLong, "aaa", 8L: JLong), change = true)))
     testHarness.processElement(new StreamRecord(
-      CRow(Row.of(toTS(2), "aaa", 9L: JLong), true)))
+      CRow(Row.of(2L: JLong, "aaa", 9L: JLong), change = true)))
 
     // trigger cleanup timer and register cleanup timer with 9002
     testHarness.setProcessingTime(6002)
     testHarness.processElement(new StreamRecord(
-        CRow(Row.of(toTS(2), "aaa", 10L: JLong), true)))
+        CRow(Row.of(2L: JLong, "aaa", 10L: JLong), change = true)))
     testHarness.processElement(new StreamRecord(
-      CRow(Row.of(toTS(2), "bbb", 40L: JLong), true)))
+      CRow(Row.of(2L: JLong, "bbb", 40L: JLong), change = true)))
 
     val result = testHarness.getOutput
 
     val expectedOutput = new ConcurrentLinkedQueue[Object]()
 
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(toTS(1), "aaa", 1L: JLong, 1L: JLong, 1L: JLong), true)))
+      CRow(Row.of(1L: JLong, "aaa", 1L: JLong, 1L: JLong, 1L: JLong), change = true)))
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(toTS(1), "bbb", 10L: JLong, 10L: JLong, 10L: JLong), true)))
+      CRow(Row.of(1L: JLong, "bbb", 10L: JLong, 10L: JLong, 10L: JLong), change = true)))
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(toTS(1), "aaa", 2L: JLong, 1L: JLong, 2L: JLong), true)))
+      CRow(Row.of(1L: JLong, "aaa", 2L: JLong, 1L: JLong, 2L: JLong), change = true)))
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(toTS(1), "aaa", 3L: JLong, 2L: JLong, 3L: JLong), true)))
+      CRow(Row.of(1L: JLong, "aaa", 3L: JLong, 2L: JLong, 3L: JLong), change = true)))
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(toTS(1), "bbb", 20L: JLong, 10L: JLong, 20L: JLong), true)))
+      CRow(Row.of(1L: JLong, "bbb", 20L: JLong, 10L: JLong, 20L: JLong), change = true)))
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(toTS(1), "aaa", 4L: JLong, 3L: JLong, 4L: JLong), true)))
+      CRow(Row.of(1L: JLong, "aaa", 4L: JLong, 3L: JLong, 4L: JLong), change = true)))
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(toTS(1), "aaa", 5L: JLong, 4L: JLong, 5L: JLong), true)))
+      CRow(Row.of(1L: JLong, "aaa", 5L: JLong, 4L: JLong, 5L: JLong), change = true)))
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(toTS(1), "aaa", 6L: JLong, 5L: JLong, 6L: JLong), true)))
+      CRow(Row.of(1L: JLong, "aaa", 6L: JLong, 5L: JLong, 6L: JLong), change = true)))
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(toTS(1), "bbb", 30L: JLong, 20L: JLong, 30L: JLong), true)))
+      CRow(Row.of(1L: JLong, "bbb", 30L: JLong, 20L: JLong, 30L: JLong), change = true)))
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(toTS(2), "aaa", 7L: JLong, 6L: JLong, 7L: JLong), true)))
+      CRow(Row.of(2L: JLong, "aaa", 7L: JLong, 6L: JLong, 7L: JLong), change = true)))
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(toTS(2), "aaa", 8L: JLong, 7L: JLong, 8L: JLong), true)))
+      CRow(Row.of(2L: JLong, "aaa", 8L: JLong, 7L: JLong, 8L: JLong), change = true)))
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(toTS(2), "aaa", 9L: JLong, 8L: JLong, 9L: JLong), true)))
+      CRow(Row.of(2L: JLong, "aaa", 9L: JLong, 8L: JLong, 9L: JLong), change = true)))
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(toTS(2), "aaa", 10L: JLong, 10L: JLong, 10L: JLong), true)))
+      CRow(Row.of(2L: JLong, "aaa", 10L: JLong, 10L: JLong, 10L: JLong), change = true)))
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(toTS(2), "bbb", 40L: JLong, 40L: JLong, 40L: JLong), true)))
+      CRow(Row.of(2L: JLong, "bbb", 40L: JLong, 40L: JLong, 40L: JLong), change = true)))
 
     verify(expectedOutput, result, new RowResultSortComparator())
 
@@ -160,51 +160,51 @@ class OverWindowHarnessTest extends HarnessTestBase{
     // register cleanup timer with 3003
     testHarness.setProcessingTime(3)
     testHarness.processElement(new StreamRecord(
-      CRow(Row.of(toTS(0), "aaa", 1L: JLong), true)))
+      CRow(Row.of(0L: JLong, "aaa", 1L: JLong), change = true)))
     testHarness.processElement(new StreamRecord(
-      CRow(Row.of(toTS(0), "bbb", 10L: JLong), true)))
+      CRow(Row.of(0L: JLong, "bbb", 10L: JLong), change = true)))
 
     testHarness.setProcessingTime(4)
     testHarness.processElement(new StreamRecord(
-      CRow(Row.of(toTS(0), "aaa", 2L: JLong), true)))
+      CRow(Row.of(0L: JLong, "aaa", 2L: JLong), change = true)))
 
     // trigger cleanup timer and register cleanup timer with 6003
     testHarness.setProcessingTime(3003)
     testHarness.processElement(new StreamRecord(
-      CRow(Row.of(toTS(0), "aaa", 3L: JLong), true)))
+      CRow(Row.of(0L: JLong, "aaa", 3L: JLong), change = true)))
     testHarness.processElement(new StreamRecord(
-      CRow(Row.of(toTS(0), "bbb", 20L: JLong), true)))
+      CRow(Row.of(0L: JLong, "bbb", 20L: JLong), change = true)))
 
     testHarness.setProcessingTime(5)
     testHarness.processElement(new StreamRecord(
-      CRow(Row.of(toTS(0), "aaa", 4L: JLong), true)))
+      CRow(Row.of(0L: JLong, "aaa", 4L: JLong), change = true)))
 
     // register cleanup timer with 9002
     testHarness.setProcessingTime(6002)
 
     testHarness.setProcessingTime(7002)
     testHarness.processElement(new StreamRecord(
-      CRow(Row.of(toTS(0), "aaa", 5L: JLong), true)))
+      CRow(Row.of(0L: JLong, "aaa", 5L: JLong), change = true)))
     testHarness.processElement(new StreamRecord(
-      CRow(Row.of(toTS(0), "aaa", 6L: JLong), true)))
+      CRow(Row.of(0L: JLong, "aaa", 6L: JLong), change = true)))
     testHarness.processElement(new StreamRecord(
-      CRow(Row.of(toTS(0), "bbb", 30L: JLong), true)))
+      CRow(Row.of(0L: JLong, "bbb", 30L: JLong), change = true)))
 
     // register cleanup timer with 14002
     testHarness.setProcessingTime(11002)
     testHarness.processElement(new StreamRecord(
-      CRow(Row.of(toTS(0), "aaa", 7L: JLong), true)))
+      CRow(Row.of(0L: JLong, "aaa", 7L: JLong), change = true)))
 
     testHarness.setProcessingTime(11004)
     testHarness.processElement(new StreamRecord(
-      CRow(Row.of(toTS(0), "aaa", 8L: JLong), true)))
+      CRow(Row.of(0L: JLong, "aaa", 8L: JLong), change = true)))
 
     testHarness.processElement(new StreamRecord(
-      CRow(Row.of(toTS(0), "aaa", 9L: JLong), true)))
+      CRow(Row.of(0L: JLong, "aaa", 9L: JLong), change = true)))
     testHarness.processElement(new StreamRecord(
-      CRow(Row.of(toTS(0), "aaa", 10L: JLong), true)))
+      CRow(Row.of(0L: JLong, "aaa", 10L: JLong), change = true)))
     testHarness.processElement(new StreamRecord(
-      CRow(Row.of(toTS(0), "bbb", 40L: JLong), true)))
+      CRow(Row.of(0L: JLong, "bbb", 40L: JLong), change = true)))
 
     testHarness.setProcessingTime(11006)
 
@@ -214,33 +214,33 @@ class OverWindowHarnessTest extends HarnessTestBase{
 
     // all elements at the same proc timestamp have the same value per key
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(toTS(0), "aaa", 1L: JLong, 1L: JLong, 1L: JLong), true)))
+      CRow(Row.of(0L: JLong, "aaa", 1L: JLong, 1L: JLong, 1L: JLong), change = true)))
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(toTS(0), "bbb", 10L: JLong, 10L: JLong, 10L: JLong), true)))
+      CRow(Row.of(0L: JLong, "bbb", 10L: JLong, 10L: JLong, 10L: JLong), change = true)))
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(toTS(0), "aaa", 2L: JLong, 1L: JLong, 2L: JLong), true)))
+      CRow(Row.of(0L: JLong, "aaa", 2L: JLong, 1L: JLong, 2L: JLong), change = true)))
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(toTS(0), "aaa", 3L: JLong, 3L: JLong, 4L: JLong), true)))
+      CRow(Row.of(0L: JLong, "aaa", 3L: JLong, 3L: JLong, 4L: JLong), change = true)))
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(toTS(0), "bbb", 20L: JLong, 20L: JLong, 20L: JLong), true)))
+      CRow(Row.of(0L: JLong, "bbb", 20L: JLong, 20L: JLong, 20L: JLong), change = true)))
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(toTS(0), "aaa", 4L: JLong, 4L: JLong, 4L: JLong), true)))
+      CRow(Row.of(0L: JLong, "aaa", 4L: JLong, 4L: JLong, 4L: JLong), change = true)))
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(toTS(0), "aaa", 5L: JLong, 5L: JLong, 6L: JLong), true)))
+      CRow(Row.of(0L: JLong, "aaa", 5L: JLong, 5L: JLong, 6L: JLong), change = true)))
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(toTS(0), "aaa", 6L: JLong, 5L: JLong, 6L: JLong), true)))
+      CRow(Row.of(0L: JLong, "aaa", 6L: JLong, 5L: JLong, 6L: JLong), change = true)))
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(toTS(0), "bbb", 30L: JLong, 30L: JLong, 30L: JLong), true)))
+      CRow(Row.of(0L: JLong, "bbb", 30L: JLong, 30L: JLong, 30L: JLong), change = true)))
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(toTS(0), "aaa", 7L: JLong, 7L: JLong, 7L: JLong), true)))
+      CRow(Row.of(0L: JLong, "aaa", 7L: JLong, 7L: JLong, 7L: JLong), change = true)))
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(toTS(0), "aaa", 8L: JLong, 7L: JLong, 10L: JLong), true)))
+      CRow(Row.of(0L: JLong, "aaa", 8L: JLong, 7L: JLong, 10L: JLong), change = true)))
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(toTS(0), "aaa", 9L: JLong, 7L: JLong, 10L: JLong), true)))
+      CRow(Row.of(0L: JLong, "aaa", 9L: JLong, 7L: JLong, 10L: JLong), change = true)))
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(toTS(0), "aaa", 10L: JLong, 7L: JLong, 10L: JLong), true)))
+      CRow(Row.of(0L: JLong, "aaa", 10L: JLong, 7L: JLong, 10L: JLong), change = true)))
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(toTS(0), "bbb", 40L: JLong, 40L: JLong, 40L: JLong), true)))
+      CRow(Row.of(0L: JLong, "bbb", 40L: JLong, 40L: JLong, 40L: JLong), change = true)))
 
     verify(expectedOutput, result, new RowResultSortComparator())
 
@@ -268,69 +268,69 @@ class OverWindowHarnessTest extends HarnessTestBase{
     testHarness.setProcessingTime(1003)
 
     testHarness.processElement(new StreamRecord(
-      CRow(Row.of(toTS(0), "aaa", 1L: JLong), true)))
+      CRow(Row.of(0L: JLong, "aaa", 1L: JLong), change = true)))
     testHarness.processElement(new StreamRecord(
-      CRow(Row.of(toTS(0), "bbb", 10L: JLong), true)))
+      CRow(Row.of(0L: JLong, "bbb", 10L: JLong), change = true)))
     testHarness.processElement(new StreamRecord(
-      CRow(Row.of(toTS(0), "aaa", 2L: JLong), true)))
+      CRow(Row.of(0L: JLong, "aaa", 2L: JLong), change = true)))
     testHarness.processElement(new StreamRecord(
-      CRow(Row.of(toTS(0), "aaa", 3L: JLong), true)))
+      CRow(Row.of(0L: JLong, "aaa", 3L: JLong), change = true)))
     testHarness.processElement(new StreamRecord(
-      CRow(Row.of(toTS(0), "bbb", 20L: JLong), true)))
+      CRow(Row.of(0L: JLong, "bbb", 20L: JLong), change = true)))
     testHarness.processElement(new StreamRecord(
-      CRow(Row.of(toTS(0), "aaa", 4L: JLong), true)))
+      CRow(Row.of(0L: JLong, "aaa", 4L: JLong), change = true)))
     testHarness.processElement(new StreamRecord(
-      CRow(Row.of(toTS(0), "aaa", 5L: JLong), true)))
+      CRow(Row.of(0L: JLong, "aaa", 5L: JLong), change = true)))
     testHarness.processElement(new StreamRecord(
-      CRow(Row.of(toTS(0), "aaa", 6L: JLong), true)))
+      CRow(Row.of(0L: JLong, "aaa", 6L: JLong), change = true)))
     testHarness.processElement(new StreamRecord(
-      CRow(Row.of(toTS(0), "bbb", 30L: JLong), true)))
+      CRow(Row.of(0L: JLong, "bbb", 30L: JLong), change = true)))
     testHarness.processElement(new StreamRecord(
-      CRow(Row.of(toTS(0), "aaa", 7L: JLong), true)))
+      CRow(Row.of(0L: JLong, "aaa", 7L: JLong), change = true)))
     testHarness.processElement(new StreamRecord(
-      CRow(Row.of(toTS(0), "aaa", 8L: JLong), true)))
+      CRow(Row.of(0L: JLong, "aaa", 8L: JLong), change = true)))
 
     // trigger cleanup timer and register cleanup timer with 8003
     testHarness.setProcessingTime(5003)
     testHarness.processElement(new StreamRecord(
-      CRow(Row.of(toTS(0), "aaa", 9L: JLong), true)))
+      CRow(Row.of(0L: JLong, "aaa", 9L: JLong), change = true)))
     testHarness.processElement(new StreamRecord(
-      CRow(Row.of(toTS(0), "aaa", 10L: JLong), true)))
+      CRow(Row.of(0L: JLong, "aaa", 10L: JLong), change = true)))
     testHarness.processElement(new StreamRecord(
-      CRow(Row.of(toTS(0), "bbb", 40L: JLong), true)))
+      CRow(Row.of(0L: JLong, "bbb", 40L: JLong), change = true)))
 
     val result = testHarness.getOutput
 
     val expectedOutput = new ConcurrentLinkedQueue[Object]()
 
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(toTS(0), "aaa", 1L: JLong, 1L: JLong, 1L: JLong), true)))
+      CRow(Row.of(0L: JLong, "aaa", 1L: JLong, 1L: JLong, 1L: JLong), change = true)))
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(toTS(0), "bbb", 10L: JLong, 10L: JLong, 10L: JLong), true)))
+      CRow(Row.of(0L: JLong, "bbb", 10L: JLong, 10L: JLong, 10L: JLong), change = true)))
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(toTS(0), "aaa", 2L: JLong, 1L: JLong, 2L: JLong), true)))
+      CRow(Row.of(0L: JLong, "aaa", 2L: JLong, 1L: JLong, 2L: JLong), change = true)))
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(toTS(0), "aaa", 3L: JLong, 1L: JLong, 3L: JLong), true)))
+      CRow(Row.of(0L: JLong, "aaa", 3L: JLong, 1L: JLong, 3L: JLong), change = true)))
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(toTS(0), "bbb", 20L: JLong, 10L: JLong, 20L: JLong), true)))
+      CRow(Row.of(0L: JLong, "bbb", 20L: JLong, 10L: JLong, 20L: JLong), change = true)))
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(toTS(0), "aaa", 4L: JLong, 1L: JLong, 4L: JLong), true)))
+      CRow(Row.of(0L: JLong, "aaa", 4L: JLong, 1L: JLong, 4L: JLong), change = true)))
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(toTS(0), "aaa", 5L: JLong, 1L: JLong, 5L: JLong), true)))
+      CRow(Row.of(0L: JLong, "aaa", 5L: JLong, 1L: JLong, 5L: JLong), change = true)))
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(toTS(0), "aaa", 6L: JLong, 1L: JLong, 6L: JLong), true)))
+      CRow(Row.of(0L: JLong, "aaa", 6L: JLong, 1L: JLong, 6L: JLong), change = true)))
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(toTS(0), "bbb", 30L: JLong, 10L: JLong, 30L: JLong), true)))
+      CRow(Row.of(0L: JLong, "bbb", 30L: JLong, 10L: JLong, 30L: JLong), change = true)))
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(toTS(0), "aaa", 7L: JLong, 1L: JLong, 7L: JLong), true)))
+      CRow(Row.of(0L: JLong, "aaa", 7L: JLong, 1L: JLong, 7L: JLong), change = true)))
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(toTS(0), "aaa", 8L: JLong, 1L: JLong, 8L: JLong), true)))
+      CRow(Row.of(0L: JLong, "aaa", 8L: JLong, 1L: JLong, 8L: JLong), change = true)))
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(toTS(0), "aaa", 9L: JLong, 9L: JLong, 9L: JLong), true)))
+      CRow(Row.of(0L: JLong, "aaa", 9L: JLong, 9L: JLong, 9L: JLong), change = true)))
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(toTS(0), "aaa", 10L: JLong, 9L: JLong, 10L: JLong), true)))
+      CRow(Row.of(0L: JLong, "aaa", 10L: JLong, 9L: JLong, 10L: JLong), change = true)))
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(toTS(0), "bbb", 40L: JLong, 40L: JLong, 40L: JLong), true)))
+      CRow(Row.of(0L: JLong, "bbb", 40L: JLong, 40L: JLong, 40L: JLong), change = true)))
 
     verify(expectedOutput, result, new RowResultSortComparator())
     testHarness.close()
@@ -361,51 +361,51 @@ class OverWindowHarnessTest extends HarnessTestBase{
 
     testHarness.processWatermark(1)
     testHarness.processElement(new StreamRecord(
-      CRow(Row.of(toTS(2), "aaa", 1L: JLong), true)))
+      CRow(Row.of(2L: JLong, "aaa", 1L: JLong), change = true)))
 
     testHarness.processWatermark(2)
     testHarness.processElement(new StreamRecord(
-      CRow(Row.of(toTS(3), "bbb", 10L: JLong), true)))
+      CRow(Row.of(3L: JLong, "bbb", 10L: JLong), change = true)))
 
     testHarness.processWatermark(4000)
     testHarness.processElement(new StreamRecord(
-      CRow(Row.of(toTS(4001), "aaa", 2L: JLong), true)))
+      CRow(Row.of(4001L: JLong, "aaa", 2L: JLong), change = true)))
 
     testHarness.processWatermark(4001)
     testHarness.processElement(new StreamRecord(
-      CRow(Row.of(toTS(4002), "aaa", 3L: JLong), true)))
+      CRow(Row.of(4002L: JLong, "aaa", 3L: JLong), change = true)))
 
     testHarness.processWatermark(4002)
     testHarness.processElement(new StreamRecord(
-      CRow(Row.of(toTS(4003), "aaa", 4L: JLong), true)))
+      CRow(Row.of(4003L: JLong, "aaa", 4L: JLong), change = true)))
 
     testHarness.processWatermark(4800)
     testHarness.processElement(new StreamRecord(
-      CRow(Row.of(toTS(4801), "bbb", 25L: JLong), true)))
+      CRow(Row.of(4801L: JLong, "bbb", 25L: JLong), change = true)))
 
     testHarness.processWatermark(6500)
     testHarness.processElement(new StreamRecord(
-      CRow(Row.of(toTS(6501), "aaa", 5L: JLong), true)))
+      CRow(Row.of(6501L: JLong, "aaa", 5L: JLong), change = true)))
     testHarness.processElement(new StreamRecord(
-      CRow(Row.of(toTS(6501), "aaa", 6L: JLong), true)))
+      CRow(Row.of(6501L: JLong, "aaa", 6L: JLong), change = true)))
     testHarness.processElement(new StreamRecord(
-      CRow(Row.of(toTS(6501), "bbb", 30L: JLong), true)))
+      CRow(Row.of(6501L: JLong, "bbb", 30L: JLong), change = true)))
 
     testHarness.processWatermark(7000)
     testHarness.processElement(new StreamRecord(
-      CRow(Row.of(toTS(7001), "aaa", 7L: JLong), true)))
+      CRow(Row.of(7001L: JLong, "aaa", 7L: JLong), change = true)))
 
     testHarness.processWatermark(8000)
     testHarness.processElement(new StreamRecord(
-      CRow(Row.of(toTS(8001), "aaa", 8L: JLong), true)))
+      CRow(Row.of(8001L: JLong, "aaa", 8L: JLong), change = true)))
 
     testHarness.processWatermark(12000)
     testHarness.processElement(new StreamRecord(
-      CRow(Row.of(toTS(12001), "aaa", 9L: JLong), true)))
+      CRow(Row.of(12001L: JLong, "aaa", 9L: JLong), change = true)))
     testHarness.processElement(new StreamRecord(
-      CRow(Row.of(toTS(12001), "aaa", 10L: JLong), true)))
+      CRow(Row.of(12001L: JLong, "aaa", 10L: JLong), change = true)))
     testHarness.processElement(new StreamRecord(
-      CRow(Row.of(toTS(12001), "bbb", 40L: JLong), true)))
+      CRow(Row.of(12001L: JLong, "bbb", 40L: JLong), change = true)))
 
     testHarness.processWatermark(19000)
 
@@ -415,10 +415,10 @@ class OverWindowHarnessTest extends HarnessTestBase{
 
     // check that state is removed after max retention time
     testHarness.processElement(new StreamRecord(
-      CRow(Row.of(toTS(20001), "ccc", 1L: JLong), true))) // clean-up 3000
+      CRow(Row.of(20001L: JLong, "ccc", 1L: JLong), change = true))) // clean-up 3000
     testHarness.setProcessingTime(2500)
     testHarness.processElement(new StreamRecord(
-      CRow(Row.of(toTS(20002), "ccc", 2L: JLong), true))) // clean-up 4500
+      CRow(Row.of(20002L: JLong, "ccc", 2L: JLong), change = true))) // clean-up 4500
     testHarness.processWatermark(20010) // compute output
 
     assert(testHarness.numKeyedStateEntries() > 0) // check that we have state
@@ -430,7 +430,7 @@ class OverWindowHarnessTest extends HarnessTestBase{
 
     // check that state is only removed if all data was processed
     testHarness.processElement(new StreamRecord(
-      CRow(Row.of(toTS(20011), "ccc", 3L: JLong), true))) // clean-up 6500
+      CRow(Row.of(20011L: JLong, "ccc", 3L: JLong), change = true))) // clean-up 6500
 
     assert(testHarness.numKeyedStateEntries() > 0) // check that we have state
     testHarness.setProcessingTime(6500) // clean-up attempt but rescheduled to 8500
@@ -450,40 +450,40 @@ class OverWindowHarnessTest extends HarnessTestBase{
 
     // all elements at the same row-time have the same value per key
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(toTS(2), "aaa", 1L: JLong, 1L: JLong, 1L: JLong), true)))
+      CRow(Row.of(2L: JLong, "aaa", 1L: JLong, 1L: JLong, 1L: JLong), change = true)))
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(toTS(3), "bbb", 10L: JLong, 10L: JLong, 10L: JLong), true)))
+      CRow(Row.of(3L: JLong, "bbb", 10L: JLong, 10L: JLong, 10L: JLong), change = true)))
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(toTS(4001), "aaa", 2L: JLong, 1L: JLong, 2L: JLong), true)))
+      CRow(Row.of(4001L: JLong, "aaa", 2L: JLong, 1L: JLong, 2L: JLong), change = true)))
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(toTS(4002), "aaa", 3L: JLong, 1L: JLong, 3L: JLong), true)))
+      CRow(Row.of(4002L: JLong, "aaa", 3L: JLong, 1L: JLong, 3L: JLong), change = true)))
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(toTS(4003), "aaa", 4L: JLong, 2L: JLong, 4L: JLong), true)))
+      CRow(Row.of(4003L: JLong, "aaa", 4L: JLong, 2L: JLong, 4L: JLong), change = true)))
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(toTS(4801), "bbb", 25L: JLong, 25L: JLong, 25L: JLong), true)))
+      CRow(Row.of(4801L: JLong, "bbb", 25L: JLong, 25L: JLong, 25L: JLong), change = true)))
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(toTS(6501), "aaa", 5L: JLong, 2L: JLong, 6L: JLong), true)))
+      CRow(Row.of(6501L: JLong, "aaa", 5L: JLong, 2L: JLong, 6L: JLong), change = true)))
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(toTS(6501), "aaa", 6L: JLong, 2L: JLong, 6L: JLong), true)))
+      CRow(Row.of(6501L: JLong, "aaa", 6L: JLong, 2L: JLong, 6L: JLong), change = true)))
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(toTS(7001), "aaa", 7L: JLong, 2L: JLong, 7L: JLong), true)))
+      CRow(Row.of(7001L: JLong, "aaa", 7L: JLong, 2L: JLong, 7L: JLong), change = true)))
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(toTS(8001), "aaa", 8L: JLong, 2L: JLong, 8L: JLong), true)))
+      CRow(Row.of(8001L: JLong, "aaa", 8L: JLong, 2L: JLong, 8L: JLong), change = true)))
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(toTS(6501), "bbb", 30L: JLong, 25L: JLong, 30L: JLong), true)))
+      CRow(Row.of(6501L: JLong, "bbb", 30L: JLong, 25L: JLong, 30L: JLong), change = true)))
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(toTS(12001), "aaa", 9L: JLong, 8L: JLong, 10L: JLong), true)))
+      CRow(Row.of(12001L: JLong, "aaa", 9L: JLong, 8L: JLong, 10L: JLong), change = true)))
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(toTS(12001), "aaa", 10L: JLong, 8L: JLong, 10L: JLong), true)))
+      CRow(Row.of(12001L: JLong, "aaa", 10L: JLong, 8L: JLong, 10L: JLong), change = true)))
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(toTS(12001), "bbb", 40L: JLong, 40L: JLong, 40L: JLong), true)))
+      CRow(Row.of(12001L: JLong, "bbb", 40L: JLong, 40L: JLong, 40L: JLong), change = true)))
 
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(toTS(20001), "ccc", 1L: JLong, 1L: JLong, 1L: JLong), true)))
+      CRow(Row.of(20001L: JLong, "ccc", 1L: JLong, 1L: JLong, 1L: JLong), change = true)))
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(toTS(20002), "ccc", 2L: JLong, 1L: JLong, 2L: JLong), true)))
+      CRow(Row.of(20002L: JLong, "ccc", 2L: JLong, 1L: JLong, 2L: JLong), change = true)))
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(toTS(20011), "ccc", 3L: JLong, 3L: JLong, 3L: JLong), true)))
+      CRow(Row.of(20011L: JLong, "ccc", 3L: JLong, 3L: JLong, 3L: JLong), change = true)))
 
     verify(expectedOutput, result, new RowResultSortComparator())
     testHarness.close()
@@ -511,47 +511,47 @@ class OverWindowHarnessTest extends HarnessTestBase{
 
     testHarness.processWatermark(800)
     testHarness.processElement(new StreamRecord(
-      CRow(Row.of(toTS(801), "aaa", 1L: JLong), true)))
+      CRow(Row.of(801L: JLong, "aaa", 1L: JLong), change = true)))
 
     testHarness.processWatermark(2500)
     testHarness.processElement(new StreamRecord(
-      CRow(Row.of(toTS(2501), "bbb", 10L: JLong), true)))
+      CRow(Row.of(2501L: JLong, "bbb", 10L: JLong), change = true)))
 
     testHarness.processWatermark(4000)
     testHarness.processElement(new StreamRecord(
-      CRow(Row.of(toTS(4001), "aaa", 2L: JLong), true)))
+      CRow(Row.of(4001L: JLong, "aaa", 2L: JLong), change = true)))
     testHarness.processElement(new StreamRecord(
-      CRow(Row.of(toTS(4001), "aaa", 3L: JLong), true)))
+      CRow(Row.of(4001L: JLong, "aaa", 3L: JLong), change = true)))
     testHarness.processElement(new StreamRecord(
-      CRow(Row.of(toTS(4001), "bbb", 20L: JLong), true)))
+      CRow(Row.of(4001L: JLong, "bbb", 20L: JLong), change = true)))
 
     testHarness.processWatermark(4800)
     testHarness.processElement(new StreamRecord(
-      CRow(Row.of(toTS(4801), "aaa", 4L: JLong), true)))
+      CRow(Row.of(4801L: JLong, "aaa", 4L: JLong), change = true)))
 
     testHarness.processWatermark(6500)
     testHarness.processElement(new StreamRecord(
-      CRow(Row.of(toTS(6501), "aaa", 5L: JLong), true)))
+      CRow(Row.of(6501L: JLong, "aaa", 5L: JLong), change = true)))
     testHarness.processElement(new StreamRecord(
-      CRow(Row.of(toTS(6501), "aaa", 6L: JLong), true)))
+      CRow(Row.of(6501L: JLong, "aaa", 6L: JLong), change = true)))
     testHarness.processElement(new StreamRecord(
-      CRow(Row.of(toTS(6501), "bbb", 30L: JLong), true)))
+      CRow(Row.of(6501L: JLong, "bbb", 30L: JLong), change = true)))
 
     testHarness.processWatermark(7000)
     testHarness.processElement(new StreamRecord(
-      CRow(Row.of(toTS(7001), "aaa", 7L: JLong), true)))
+      CRow(Row.of(7001L: JLong, "aaa", 7L: JLong), change = true)))
 
     testHarness.processWatermark(8000)
     testHarness.processElement(new StreamRecord(
-      CRow(Row.of(toTS(8001), "aaa", 8L: JLong), true)))
+      CRow(Row.of(8001L: JLong, "aaa", 8L: JLong), change = true)))
 
     testHarness.processWatermark(12000)
     testHarness.processElement(new StreamRecord(
-      CRow(Row.of(toTS(12001), "aaa", 9L: JLong), true)))
+      CRow(Row.of(12001L: JLong, "aaa", 9L: JLong), change = true)))
     testHarness.processElement(new StreamRecord(
-      CRow(Row.of(toTS(12001), "aaa", 10L: JLong), true)))
+      CRow(Row.of(12001L: JLong, "aaa", 10L: JLong), change = true)))
     testHarness.processElement(new StreamRecord(
-      CRow(Row.of(toTS(12001), "bbb", 40L: JLong), true)))
+      CRow(Row.of(12001L: JLong, "bbb", 40L: JLong), change = true)))
 
     testHarness.processWatermark(19000)
 
@@ -561,10 +561,10 @@ class OverWindowHarnessTest extends HarnessTestBase{
 
     // check that state is removed after max retention time
     testHarness.processElement(new StreamRecord(
-      CRow(Row.of(toTS(20001), "ccc", 1L: JLong), true))) // clean-up 3000
+      CRow(Row.of(20001L: JLong, "ccc", 1L: JLong), change = true))) // clean-up 3000
     testHarness.setProcessingTime(2500)
     testHarness.processElement(new StreamRecord(
-      CRow(Row.of(toTS(20002), "ccc", 2L: JLong), true))) // clean-up 4500
+      CRow(Row.of(20002L: JLong, "ccc", 2L: JLong), change = true))) // clean-up 4500
     testHarness.processWatermark(20010) // compute output
 
     assert(testHarness.numKeyedStateEntries() > 0) // check that we have state
@@ -575,7 +575,7 @@ class OverWindowHarnessTest extends HarnessTestBase{
 
     // check that state is only removed if all data was processed
     testHarness.processElement(new StreamRecord(
-      CRow(Row.of(toTS(20011), "ccc", 3L: JLong), true))) // clean-up 6500
+      CRow(Row.of(20011L: JLong, "ccc", 3L: JLong), change = true))) // clean-up 6500
 
     assert(testHarness.numKeyedStateEntries() > 0) // check that we have state
     testHarness.setProcessingTime(6500) // clean-up attempt but rescheduled to 8500
@@ -595,40 +595,40 @@ class OverWindowHarnessTest extends HarnessTestBase{
     val expectedOutput = new ConcurrentLinkedQueue[Object]()
 
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(toTS(801), "aaa", 1L: JLong, 1L: JLong, 1L: JLong), true)))
+      CRow(Row.of(801L: JLong, "aaa", 1L: JLong, 1L: JLong, 1L: JLong), change = true)))
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(toTS(2501), "bbb", 10L: JLong, 10L: JLong, 10L: JLong), true)))
+      CRow(Row.of(2501L: JLong, "bbb", 10L: JLong, 10L: JLong, 10L: JLong), change = true)))
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(toTS(4001), "aaa", 2L: JLong, 1L: JLong, 2L: JLong), true)))
+      CRow(Row.of(4001L: JLong, "aaa", 2L: JLong, 1L: JLong, 2L: JLong), change = true)))
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(toTS(4001), "aaa", 3L: JLong, 1L: JLong, 3L: JLong), true)))
+      CRow(Row.of(4001L: JLong, "aaa", 3L: JLong, 1L: JLong, 3L: JLong), change = true)))
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(toTS(4001), "bbb", 20L: JLong, 10L: JLong, 20L: JLong), true)))
+      CRow(Row.of(4001L: JLong, "bbb", 20L: JLong, 10L: JLong, 20L: JLong), change = true)))
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(toTS(4801), "aaa", 4L: JLong, 2L: JLong, 4L: JLong), true)))
+      CRow(Row.of(4801L: JLong, "aaa", 4L: JLong, 2L: JLong, 4L: JLong), change = true)))
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(toTS(6501), "aaa", 5L: JLong, 3L: JLong, 5L: JLong), true)))
+      CRow(Row.of(6501L: JLong, "aaa", 5L: JLong, 3L: JLong, 5L: JLong), change = true)))
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(toTS(6501), "aaa", 6L: JLong, 4L: JLong, 6L: JLong), true)))
+      CRow(Row.of(6501L: JLong, "aaa", 6L: JLong, 4L: JLong, 6L: JLong), change = true)))
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(toTS(6501), "bbb", 30L: JLong, 10L: JLong, 30L: JLong), true)))
+      CRow(Row.of(6501L: JLong, "bbb", 30L: JLong, 10L: JLong, 30L: JLong), change = true)))
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(toTS(7001), "aaa", 7L: JLong, 5L: JLong, 7L: JLong), true)))
+      CRow(Row.of(7001L: JLong, "aaa", 7L: JLong, 5L: JLong, 7L: JLong), change = true)))
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(toTS(8001), "aaa", 8L: JLong, 6L: JLong, 8L: JLong), true)))
+      CRow(Row.of(8001L: JLong, "aaa", 8L: JLong, 6L: JLong, 8L: JLong), change = true)))
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(toTS(12001), "aaa", 9L: JLong, 7L: JLong, 9L: JLong), true)))
+      CRow(Row.of(12001L: JLong, "aaa", 9L: JLong, 7L: JLong, 9L: JLong), change = true)))
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(toTS(12001), "aaa", 10L: JLong, 8L: JLong, 10L: JLong), true)))
+      CRow(Row.of(12001L: JLong, "aaa", 10L: JLong, 8L: JLong, 10L: JLong), change = true)))
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(toTS(12001), "bbb", 40L: JLong, 20L: JLong, 40L: JLong), true)))
+      CRow(Row.of(12001L: JLong, "bbb", 40L: JLong, 20L: JLong, 40L: JLong), change = true)))
 
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(toTS(20001), "ccc", 1L: JLong, 1L: JLong, 1L: JLong), true)))
+      CRow(Row.of(20001L: JLong, "ccc", 1L: JLong, 1L: JLong, 1L: JLong), change = true)))
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(toTS(20002), "ccc", 2L: JLong, 1L: JLong, 2L: JLong), true)))
+      CRow(Row.of(20002L: JLong, "ccc", 2L: JLong, 1L: JLong, 2L: JLong), change = true)))
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(toTS(20011), "ccc", 3L: JLong, 3L: JLong, 3L: JLong), true)))
+      CRow(Row.of(20011L: JLong, "ccc", 3L: JLong, 3L: JLong, 3L: JLong), change = true)))
 
     verify(expectedOutput, result, new RowResultSortComparator())
     testHarness.close()
@@ -659,47 +659,47 @@ class OverWindowHarnessTest extends HarnessTestBase{
     testHarness.setProcessingTime(1000)
     testHarness.processWatermark(800)
     testHarness.processElement(new StreamRecord(
-      CRow(Row.of(toTS(801), "aaa", 1L: JLong), true)))
+      CRow(Row.of(801L: JLong, "aaa", 1L: JLong), change = true)))
 
     testHarness.processWatermark(2500)
     testHarness.processElement(new StreamRecord(
-      CRow(Row.of(toTS(2501), "bbb", 10L: JLong), true)))
+      CRow(Row.of(2501L: JLong, "bbb", 10L: JLong), change = true)))
 
     testHarness.processWatermark(4000)
     testHarness.processElement(new StreamRecord(
-      CRow(Row.of(toTS(4001), "aaa", 2L: JLong), true)))
+      CRow(Row.of(4001L: JLong, "aaa", 2L: JLong), change = true)))
     testHarness.processElement(new StreamRecord(
-      CRow(Row.of(toTS(4001), "aaa", 3L: JLong), true)))
+      CRow(Row.of(4001L: JLong, "aaa", 3L: JLong), change = true)))
     testHarness.processElement(new StreamRecord(
-      CRow(Row.of(toTS(4001), "bbb", 20L: JLong), true)))
+      CRow(Row.of(4001L: JLong, "bbb", 20L: JLong), change = true)))
 
     testHarness.processWatermark(4800)
     testHarness.processElement(new StreamRecord(
-      CRow(Row.of(toTS(4801), "aaa", 4L: JLong), true)))
+      CRow(Row.of(4801L: JLong, "aaa", 4L: JLong), change = true)))
 
     testHarness.processWatermark(6500)
     testHarness.processElement(new StreamRecord(
-      CRow(Row.of(toTS(6501), "aaa", 5L: JLong), true)))
+      CRow(Row.of(6501L: JLong, "aaa", 5L: JLong), change = true)))
     testHarness.processElement(new StreamRecord(
-      CRow(Row.of(toTS(6501), "aaa", 6L: JLong), true)))
+      CRow(Row.of(6501L: JLong, "aaa", 6L: JLong), change = true)))
     testHarness.processElement(new StreamRecord(
-      CRow(Row.of(toTS(6501), "bbb", 30L: JLong), true)))
+      CRow(Row.of(6501L: JLong, "bbb", 30L: JLong), change = true)))
 
     testHarness.processWatermark(7000)
     testHarness.processElement(new StreamRecord(
-      CRow(Row.of(toTS(7001), "aaa", 7L: JLong), true)))
+      CRow(Row.of(7001L: JLong, "aaa", 7L: JLong), change = true)))
 
     testHarness.processWatermark(8000)
     testHarness.processElement(new StreamRecord(
-      CRow(Row.of(toTS(8001), "aaa", 8L: JLong), true)))
+      CRow(Row.of(8001L: JLong, "aaa", 8L: JLong), change = true)))
 
     testHarness.processWatermark(12000)
     testHarness.processElement(new StreamRecord(
-      CRow(Row.of(toTS(12001), "aaa", 9L: JLong), true)))
+      CRow(Row.of(12001L: JLong, "aaa", 9L: JLong), change = true)))
     testHarness.processElement(new StreamRecord(
-      CRow(Row.of(toTS(12001), "aaa", 10L: JLong), true)))
+      CRow(Row.of(12001L: JLong, "aaa", 10L: JLong), change = true)))
     testHarness.processElement(new StreamRecord(
-      CRow(Row.of(toTS(12001), "bbb", 40L: JLong), true)))
+      CRow(Row.of(12001L: JLong, "bbb", 40L: JLong), change = true)))
 
     testHarness.processWatermark(19000)
 
@@ -712,10 +712,10 @@ class OverWindowHarnessTest extends HarnessTestBase{
 
     testHarness.processWatermark(20000)
     testHarness.processElement(new StreamRecord(
-      CRow(Row.of(toTS(20001), "ccc", 1L: JLong), true))) // clean-up 5000
+      CRow(Row.of(20001L: JLong, "ccc", 1L: JLong), change = true))) // clean-up 5000
     testHarness.setProcessingTime(2500)
     testHarness.processElement(new StreamRecord(
-      CRow(Row.of(toTS(20002), "ccc", 2L: JLong), true))) // clean-up 5000
+      CRow(Row.of(20002L: JLong, "ccc", 2L: JLong), change = true))) // clean-up 5000
 
     assert(testHarness.numKeyedStateEntries() > 0)
     testHarness.setProcessingTime(5000) // does not clean up, because data left. New timer 7000
@@ -733,38 +733,38 @@ class OverWindowHarnessTest extends HarnessTestBase{
 
     // all elements at the same row-time have the same value per key
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(toTS(801), "aaa", 1L: JLong, 1L: JLong, 1L: JLong), true)))
+      CRow(Row.of(801L: JLong, "aaa", 1L: JLong, 1L: JLong, 1L: JLong), change = true)))
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(toTS(2501), "bbb", 10L: JLong, 10L: JLong, 10L: JLong), true)))
+      CRow(Row.of(2501L: JLong, "bbb", 10L: JLong, 10L: JLong, 10L: JLong), change = true)))
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(toTS(4001), "aaa", 2L: JLong, 1L: JLong, 3L: JLong), true)))
+      CRow(Row.of(4001L: JLong, "aaa", 2L: JLong, 1L: JLong, 3L: JLong), change = true)))
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(toTS(4001), "aaa", 3L: JLong, 1L: JLong, 3L: JLong), true)))
+      CRow(Row.of(4001L: JLong, "aaa", 3L: JLong, 1L: JLong, 3L: JLong), change = true)))
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(toTS(4001), "bbb", 20L: JLong, 10L: JLong, 20L: JLong), true)))
+      CRow(Row.of(4001L: JLong, "bbb", 20L: JLong, 10L: JLong, 20L: JLong), change = true)))
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(toTS(4801), "aaa", 4L: JLong, 1L: JLong, 4L: JLong), true)))
+      CRow(Row.of(4801L: JLong, "aaa", 4L: JLong, 1L: JLong, 4L: JLong), change = true)))
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(toTS(6501), "aaa", 5L: JLong, 1L: JLong, 6L: JLong), true)))
+      CRow(Row.of(6501L: JLong, "aaa", 5L: JLong, 1L: JLong, 6L: JLong), change = true)))
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(toTS(6501), "aaa", 6L: JLong, 1L: JLong, 6L: JLong), true)))
+      CRow(Row.of(6501L: JLong, "aaa", 6L: JLong, 1L: JLong, 6L: JLong), change = true)))
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(toTS(6501), "bbb", 30L: JLong, 10L: JLong, 30L: JLong), true)))
+      CRow(Row.of(6501L: JLong, "bbb", 30L: JLong, 10L: JLong, 30L: JLong), change = true)))
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(toTS(7001), "aaa", 7L: JLong, 1L: JLong, 7L: JLong), true)))
+      CRow(Row.of(7001L: JLong, "aaa", 7L: JLong, 1L: JLong, 7L: JLong), change = true)))
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(toTS(8001), "aaa", 8L: JLong, 1L: JLong, 8L: JLong), true)))
+      CRow(Row.of(8001L: JLong, "aaa", 8L: JLong, 1L: JLong, 8L: JLong), change = true)))
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(toTS(12001), "aaa", 9L: JLong, 1L: JLong, 10L: JLong), true)))
+      CRow(Row.of(12001L: JLong, "aaa", 9L: JLong, 1L: JLong, 10L: JLong), change = true)))
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(toTS(12001), "aaa", 10L: JLong, 1L: JLong, 10L: JLong), true)))
+      CRow(Row.of(12001L: JLong, "aaa", 10L: JLong, 1L: JLong, 10L: JLong), change = true)))
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(toTS(12001), "bbb", 40L: JLong, 10L: JLong, 40L: JLong), true)))
+      CRow(Row.of(12001L: JLong, "bbb", 40L: JLong, 10L: JLong, 40L: JLong), change = true)))
 
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(toTS(20001), "ccc", 1L: JLong, 1L: JLong, 1L: JLong), true)))
+      CRow(Row.of(20001L: JLong, "ccc", 1L: JLong, 1L: JLong, 1L: JLong), change = true)))
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(toTS(20002), "ccc", 2L: JLong, 1L: JLong, 2L: JLong), true)))
+      CRow(Row.of(20002L: JLong, "ccc", 2L: JLong, 1L: JLong, 2L: JLong), change = true)))
 
     verify(expectedOutput, result, new RowResultSortComparator())
     testHarness.close()
@@ -792,47 +792,47 @@ class OverWindowHarnessTest extends HarnessTestBase{
     testHarness.setProcessingTime(1000)
     testHarness.processWatermark(800)
     testHarness.processElement(new StreamRecord(
-      CRow(Row.of(toTS(801), "aaa", 1L: JLong), true)))
+      CRow(Row.of(801L: JLong, "aaa", 1L: JLong), change = true)))
 
     testHarness.processWatermark(2500)
     testHarness.processElement(new StreamRecord(
-      CRow(Row.of(toTS(2501), "bbb", 10L: JLong), true)))
+      CRow(Row.of(2501L: JLong, "bbb", 10L: JLong), change = true)))
 
     testHarness.processWatermark(4000)
     testHarness.processElement(new StreamRecord(
-      CRow(Row.of(toTS(4001), "aaa", 2L: JLong), true)))
+      CRow(Row.of(4001L: JLong, "aaa", 2L: JLong), change = true)))
     testHarness.processElement(new StreamRecord(
-      CRow(Row.of(toTS(4001), "aaa", 3L: JLong), true)))
+      CRow(Row.of(4001L: JLong, "aaa", 3L: JLong), change = true)))
     testHarness.processElement(new StreamRecord(
-      CRow(Row.of(toTS(4001), "bbb", 20L: JLong), true)))
+      CRow(Row.of(4001L: JLong, "bbb", 20L: JLong), change = true)))
 
     testHarness.processWatermark(4800)
     testHarness.processElement(new StreamRecord(
-      CRow(Row.of(toTS(4801), "aaa", 4L: JLong), true)))
+      CRow(Row.of(4801L: JLong, "aaa", 4L: JLong), change = true)))
 
     testHarness.processWatermark(6500)
     testHarness.processElement(new StreamRecord(
-      CRow(Row.of(toTS(6501), "aaa", 5L: JLong), true)))
+      CRow(Row.of(6501L: JLong, "aaa", 5L: JLong), change = true)))
     testHarness.processElement(new StreamRecord(
-      CRow(Row.of(toTS(6501), "aaa", 6L: JLong), true)))
+      CRow(Row.of(6501L: JLong, "aaa", 6L: JLong), change = true)))
     testHarness.processElement(new StreamRecord(
-      CRow(Row.of(toTS(6501), "bbb", 30L: JLong), true)))
+      CRow(Row.of(6501L: JLong, "bbb", 30L: JLong), change = true)))
 
     testHarness.processWatermark(7000)
     testHarness.processElement(new StreamRecord(
-      CRow(Row.of(toTS(7001), "aaa", 7L: JLong), true)))
+      CRow(Row.of(7001L: JLong, "aaa", 7L: JLong), change = true)))
 
     testHarness.processWatermark(8000)
     testHarness.processElement(new StreamRecord(
-      CRow(Row.of(toTS(8001), "aaa", 8L: JLong), true)))
+      CRow(Row.of(8001L: JLong, "aaa", 8L: JLong), change = true)))
 
     testHarness.processWatermark(12000)
     testHarness.processElement(new StreamRecord(
-      CRow(Row.of(toTS(12001), "aaa", 9L: JLong), true)))
+      CRow(Row.of(12001L: JLong, "aaa", 9L: JLong), change = true)))
     testHarness.processElement(new StreamRecord(
-      CRow(Row.of(toTS(12001), "aaa", 10L: JLong), true)))
+      CRow(Row.of(12001L: JLong, "aaa", 10L: JLong), change = true)))
     testHarness.processElement(new StreamRecord(
-      CRow(Row.of(toTS(12001), "bbb", 40L: JLong), true)))
+      CRow(Row.of(12001L: JLong, "bbb", 40L: JLong), change = true)))
 
     testHarness.processWatermark(19000)
 
@@ -845,10 +845,10 @@ class OverWindowHarnessTest extends HarnessTestBase{
 
     testHarness.processWatermark(20000)
     testHarness.processElement(new StreamRecord(
-      CRow(Row.of(toTS(20001), "ccc", 1L: JLong), true))) // clean-up 5000
+      CRow(Row.of(20001L: JLong, "ccc", 1L: JLong), change = true))) // clean-up 5000
     testHarness.setProcessingTime(2500)
     testHarness.processElement(new StreamRecord(
-      CRow(Row.of(toTS(20002), "ccc", 2L: JLong), true))) // clean-up 5000
+      CRow(Row.of(20002L: JLong, "ccc", 2L: JLong), change = true))) // clean-up 5000
 
     assert(testHarness.numKeyedStateEntries() > 0)
     testHarness.setProcessingTime(5000) // does not clean up, because data left. New timer 7000
@@ -865,38 +865,38 @@ class OverWindowHarnessTest extends HarnessTestBase{
     val expectedOutput = new ConcurrentLinkedQueue[Object]()
 
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(toTS(801), "aaa", 1L: JLong, 1L: JLong, 1L: JLong), true)))
+      CRow(Row.of(801L: JLong, "aaa", 1L: JLong, 1L: JLong, 1L: JLong), change = true)))
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(toTS(2501), "bbb", 10L: JLong, 10L: JLong, 10L: JLong), true)))
+      CRow(Row.of(2501L: JLong, "bbb", 10L: JLong, 10L: JLong, 10L: JLong), change = true)))
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(toTS(4001), "aaa", 2L: JLong, 1L: JLong, 2L: JLong), true)))
+      CRow(Row.of(4001L: JLong, "aaa", 2L: JLong, 1L: JLong, 2L: JLong), change = true)))
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(toTS(4001), "aaa", 3L: JLong, 1L: JLong, 3L: JLong), true)))
+      CRow(Row.of(4001L: JLong, "aaa", 3L: JLong, 1L: JLong, 3L: JLong), change = true)))
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(toTS(4001), "bbb", 20L: JLong, 10L: JLong, 20L: JLong), true)))
+      CRow(Row.of(4001L: JLong, "bbb", 20L: JLong, 10L: JLong, 20L: JLong), change = true)))
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(toTS(4801), "aaa", 4L: JLong, 1L: JLong, 4L: JLong), true)))
+      CRow(Row.of(4801L: JLong, "aaa", 4L: JLong, 1L: JLong, 4L: JLong), change = true)))
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(toTS(6501), "aaa", 5L: JLong, 1L: JLong, 5L: JLong), true)))
+      CRow(Row.of(6501L: JLong, "aaa", 5L: JLong, 1L: JLong, 5L: JLong), change = true)))
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(toTS(6501), "aaa", 6L: JLong, 1L: JLong, 6L: JLong), true)))
+      CRow(Row.of(6501L: JLong, "aaa", 6L: JLong, 1L: JLong, 6L: JLong), change = true)))
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(toTS(6501), "bbb", 30L: JLong, 10L: JLong, 30L: JLong), true)))
+      CRow(Row.of(6501L: JLong, "bbb", 30L: JLong, 10L: JLong, 30L: JLong), change = true)))
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(toTS(7001), "aaa", 7L: JLong, 1L: JLong, 7L: JLong), true)))
+      CRow(Row.of(7001L: JLong, "aaa", 7L: JLong, 1L: JLong, 7L: JLong), change = true)))
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(toTS(8001), "aaa", 8L: JLong, 1L: JLong, 8L: JLong), true)))
+      CRow(Row.of(8001L: JLong, "aaa", 8L: JLong, 1L: JLong, 8L: JLong), change = true)))
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(toTS(12001), "aaa", 9L: JLong, 1L: JLong, 9L: JLong), true)))
+      CRow(Row.of(12001L: JLong, "aaa", 9L: JLong, 1L: JLong, 9L: JLong), change = true)))
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(toTS(12001), "aaa", 10L: JLong, 1L: JLong, 10L: JLong), true)))
+      CRow(Row.of(12001L: JLong, "aaa", 10L: JLong, 1L: JLong, 10L: JLong), change = true)))
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(toTS(12001), "bbb", 40L: JLong, 10L: JLong, 40L: JLong), true)))
+      CRow(Row.of(12001L: JLong, "bbb", 40L: JLong, 10L: JLong, 40L: JLong), change = true)))
 
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(toTS(20001), "ccc", 1L: JLong, 1L: JLong, 1L: JLong), true)))
+      CRow(Row.of(20001L: JLong, "ccc", 1L: JLong, 1L: JLong, 1L: JLong), change = true)))
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(toTS(20002), "ccc", 2L: JLong, 1L: JLong, 2L: JLong), true)))
+      CRow(Row.of(20002L: JLong, "ccc", 2L: JLong, 1L: JLong, 2L: JLong), change = true)))
 
     verify(expectedOutput, result, new RowResultSortComparator())
     testHarness.close()

http://git-wip-us.apache.org/repos/asf/flink/blob/47944b1b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/SortProcessFunctionHarnessTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/SortProcessFunctionHarnessTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/SortProcessFunctionHarnessTest.scala
index 18ba6bb..9490039 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/SortProcessFunctionHarnessTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/SortProcessFunctionHarnessTest.scala
@@ -21,7 +21,6 @@ package org.apache.flink.table.runtime.harness
 import java.lang.{Integer => JInt, Long => JLong}
 import java.util.concurrent.ConcurrentLinkedQueue
 
-import org.apache.calcite.runtime.SqlFunctions.{internalToTimestamp => toTS}
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo._
 import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
 import org.apache.flink.api.common.typeutils.{TypeComparator, TypeSerializer}
@@ -189,35 +188,35 @@ class SortProcessFunctionHarnessTest {
 
       // timestamp is ignored in processing time
     testHarness.processElement(new StreamRecord(new CRow(
-      Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", toTS(1001)), true)))
+      Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 1001L: JLong), true)))
     testHarness.processElement(new StreamRecord(new CRow(
-      Row.of(1: JInt, 12L: JLong, 1: JInt, "aaa", toTS(2002)), true)))
+      Row.of(1: JInt, 12L: JLong, 1: JInt, "aaa", 2002L: JLong), true)))
     testHarness.processElement(new StreamRecord(new CRow(
-      Row.of(1: JInt, 13L: JLong, 2: JInt, "aaa", toTS(2002)), true)))
+      Row.of(1: JInt, 13L: JLong, 2: JInt, "aaa", 2002L: JLong), true)))
     testHarness.processElement(new StreamRecord(new CRow(
-      Row.of(1: JInt, 12L: JLong, 3: JInt, "aaa", toTS(2002)), true)))
+      Row.of(1: JInt, 12L: JLong, 3: JInt, "aaa", 2002L: JLong), true)))
     testHarness.processElement(new StreamRecord(new CRow(
-      Row.of(1: JInt, 14L: JLong, 0: JInt, "aaa", toTS(2002)), true)))
+      Row.of(1: JInt, 14L: JLong, 0: JInt, "aaa", 2002L: JLong), true)))
     testHarness.processElement(new StreamRecord(new CRow(
-      Row.of(1: JInt, 12L: JLong, 3: JInt, "aaa", toTS(2004)), true)))
+      Row.of(1: JInt, 12L: JLong, 3: JInt, "aaa", 2004L: JLong), true)))
     testHarness.processElement(new StreamRecord(new CRow(
-      Row.of(1: JInt, 10L: JLong, 0: JInt, "aaa", toTS(2006)), true)))
+      Row.of(1: JInt, 10L: JLong, 0: JInt, "aaa", 2006L: JLong), true)))
 
     // move watermark forward
     testHarness.processWatermark(2007)
 
     testHarness.processElement(new StreamRecord(new CRow(
-      Row.of(1: JInt, 20L: JLong, 1: JInt, "aaa", toTS(2008)), true)))
+      Row.of(1: JInt, 20L: JLong, 1: JInt, "aaa", 2008L: JLong), true)))
     testHarness.processElement(new StreamRecord(new CRow(
-      Row.of(1: JInt, 14L: JLong, 0: JInt, "aaa", toTS(2002)), true))) // too late
+      Row.of(1: JInt, 14L: JLong, 0: JInt, "aaa", 2002L: JLong), true))) // too late
     testHarness.processElement(new StreamRecord(new CRow(
-      Row.of(1: JInt, 12L: JLong, 3: JInt, "aaa", toTS(2019)), true))) // too early
+      Row.of(1: JInt, 12L: JLong, 3: JInt, "aaa", 2019L: JLong), true))) // too early
     testHarness.processElement(new StreamRecord(new CRow(
-      Row.of(1: JInt, 20L: JLong, 2: JInt, "aaa", toTS(2008)), true)))
+      Row.of(1: JInt, 20L: JLong, 2: JInt, "aaa", 2008L: JLong), true)))
     testHarness.processElement(new StreamRecord(new CRow(
-      Row.of(1: JInt, 10L: JLong, 0: JInt, "aaa", toTS(2010)), true)))
+      Row.of(1: JInt, 10L: JLong, 0: JInt, "aaa", 2010L: JLong), true)))
     testHarness.processElement(new StreamRecord(new CRow(
-      Row.of(1: JInt, 19L: JLong, 0: JInt, "aaa", toTS(2008)), true)))
+      Row.of(1: JInt, 19L: JLong, 0: JInt, "aaa", 2008L: JLong), true)))
 
     // move watermark forward
     testHarness.processWatermark(2012)
@@ -231,29 +230,29 @@ class SortProcessFunctionHarnessTest {
     // (10,0) (11,1) (12,2) (12,1) (12,0)
     expectedOutput.add(new Watermark(3))
     expectedOutput.add(new StreamRecord(new CRow(
-      Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", toTS(1001)), true)))
+      Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 1001L: JLong), true)))
     expectedOutput.add(new StreamRecord(new CRow(
-      Row.of(1: JInt, 12L: JLong, 3: JInt, "aaa", toTS(2002)), true)))
+      Row.of(1: JInt, 12L: JLong, 3: JInt, "aaa", 2002L: JLong), true)))
     expectedOutput.add(new StreamRecord(new CRow(
-      Row.of(1: JInt, 12L: JLong, 1: JInt, "aaa", toTS(2002)), true)))
+      Row.of(1: JInt, 12L: JLong, 1: JInt, "aaa", 2002L: JLong), true)))
     expectedOutput.add(new StreamRecord(new CRow(
-      Row.of(1: JInt, 13L: JLong, 2: JInt, "aaa", toTS(2002)), true)))
+      Row.of(1: JInt, 13L: JLong, 2: JInt, "aaa", 2002L: JLong), true)))
     expectedOutput.add(new StreamRecord(new CRow(
-      Row.of(1: JInt, 14L: JLong, 0: JInt, "aaa", toTS(2002)), true)))
+      Row.of(1: JInt, 14L: JLong, 0: JInt, "aaa", 2002L: JLong), true)))
     expectedOutput.add(new StreamRecord(new CRow(
-      Row.of(1: JInt, 12L: JLong, 3: JInt, "aaa", toTS(2004)), true)))
+      Row.of(1: JInt, 12L: JLong, 3: JInt, "aaa", 2004L: JLong), true)))
     expectedOutput.add(new StreamRecord(new CRow(
-      Row.of(1: JInt, 10L: JLong, 0: JInt, "aaa", toTS(2006)), true)))
+      Row.of(1: JInt, 10L: JLong, 0: JInt, "aaa", 2006L: JLong), true)))
     expectedOutput.add(new Watermark(2007))
 
     expectedOutput.add(new StreamRecord(new CRow(
-      Row.of(1: JInt, 19L: JLong, 0: JInt, "aaa", toTS(2008)), true)))
+      Row.of(1: JInt, 19L: JLong, 0: JInt, "aaa", 2008L: JLong), true)))
     expectedOutput.add(new StreamRecord(new CRow(
-      Row.of(1: JInt, 20L: JLong, 2: JInt, "aaa", toTS(2008)), true)))
+      Row.of(1: JInt, 20L: JLong, 2: JInt, "aaa", 2008L: JLong), true)))
     expectedOutput.add(new StreamRecord(new CRow(
-      Row.of(1: JInt, 20L: JLong, 1: JInt, "aaa", toTS(2008)), true)))
+      Row.of(1: JInt, 20L: JLong, 1: JInt, "aaa", 2008L: JLong), true)))
     expectedOutput.add(new StreamRecord(new CRow(
-      Row.of(1: JInt, 10L: JLong, 0: JInt, "aaa", toTS(2010)), true)))
+      Row.of(1: JInt, 10L: JLong, 0: JInt, "aaa", 2010L: JLong), true)))
 
     expectedOutput.add(new Watermark(2012))
 

http://git-wip-us.apache.org/repos/asf/flink/blob/47944b1b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/TimeAttributesITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/TimeAttributesITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/TimeAttributesITCase.scala
index 4c478de..24d8695 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/TimeAttributesITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/TimeAttributesITCase.scala
@@ -63,7 +63,8 @@ class TimeAttributesITCase extends StreamingMultipleProgramsTestBase {
     val stream = env
       .fromCollection(data)
       .assignTimestampsAndWatermarks(new TimestampWithEqualWatermark())
-    val table = stream.toTable(tEnv, 'rowtime.rowtime, 'int, 'double, 'float, 'bigdec, 'string)
+    val table = stream.toTable(
+      tEnv, 'rowtime.rowtime, 'int, 'double, 'float, 'bigdec, 'string, 'proctime.proctime)
 
     val t = table.select('rowtime.cast(Types.STRING))
 
@@ -123,6 +124,13 @@ class TimeAttributesITCase extends StreamingMultipleProgramsTestBase {
       tEnv, 'rowtime.rowtime, 'int, 'double, 'float, 'bigdec, 'string, 'proctime.proctime)
     val func = new TableFunc
 
+    // we test if this can be executed without any exceptions
+    table.join(func('proctime, 'proctime, 'string) as 's).toAppendStream[Row]
+
+    // we test if this can be executed without any exceptions
+    table.join(func('rowtime, 'rowtime, 'string) as 's).toAppendStream[Row]
+
+    // we can only test rowtime, not proctime
     val t = table.join(func('rowtime, 'proctime, 'string) as 's).select('rowtime, 's)
 
     val results = t.toAppendStream[Row]

http://git-wip-us.apache.org/repos/asf/flink/blob/47944b1b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/TableSinkITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/TableSinkITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/TableSinkITCase.scala
index 82ed81c..830359f 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/TableSinkITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/TableSinkITCase.scala
@@ -421,8 +421,8 @@ class TableSinkITCase extends StreamingMultipleProgramsTestBase {
           ctx: ProcessFunction[(Boolean, Row), Row]#Context,
           out: Collector[Row]): Unit = {
 
-          val rowTS: Long = row._2.getField(2).asInstanceOf[Long]
-          if (ctx.timestamp() == rowTS) {
+          val rowTs = row._2.getField(2).asInstanceOf[Long]
+          if (ctx.timestamp() == rowTs) {
             out.collect(row._2)
           }
         }


Mime
View raw message