flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From fhue...@apache.org
Subject flink git commit: [FLINK-7763] [table] Fix testing RowSink for enabled object reuse.
Date Mon, 16 Oct 2017 08:28:30 GMT
Repository: flink
Updated Branches:
  refs/heads/release-1.3 664a04c89 -> 80c23de70


[FLINK-7763] [table] Fix testing RowSink for enabled object reuse.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/80c23de7
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/80c23de7
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/80c23de7

Branch: refs/heads/release-1.3
Commit: 80c23de709a56f9ea3b5ecd04876d95a1df7db96
Parents: 664a04c
Author: Fabian Hueske <fhueske@apache.org>
Authored: Fri Oct 13 11:30:39 2017 +0200
Committer: Fabian Hueske <fhueske@apache.org>
Committed: Mon Oct 16 09:59:25 2017 +0200

----------------------------------------------------------------------
 .../flink/table/sinks/StreamTableSinksITCase.scala    | 14 +++++++++++++-
 1 file changed, 13 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/80c23de7/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/sinks/StreamTableSinksITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/sinks/StreamTableSinksITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/sinks/StreamTableSinksITCase.scala
index 47c55f1..723b603 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/sinks/StreamTableSinksITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/sinks/StreamTableSinksITCase.scala
@@ -44,6 +44,7 @@ class StreamTableSinksITCase extends StreamingMultipleProgramsTestBase {
   @Test(expected = classOf[TableException])
   def testAppendSinkOnUpdatingTable(): Unit = {
     val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.getConfig.enableObjectReuse()
     val tEnv = TableEnvironment.getTableEnvironment(env)
 
     val t = StreamTestData.get3TupleDataStream(env).toTable(tEnv, 'id, 'num, 'text)
@@ -59,6 +60,7 @@ class StreamTableSinksITCase extends StreamingMultipleProgramsTestBase {
   @Test
   def testAppendSinkOnAppendTable(): Unit = {
     val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.getConfig.enableObjectReuse()
     env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
     val tEnv = TableEnvironment.getTableEnvironment(env)
 
@@ -87,6 +89,7 @@ class StreamTableSinksITCase extends StreamingMultipleProgramsTestBase {
   @Test
   def testRetractSinkOnUpdatingTable(): Unit = {
     val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.getConfig.enableObjectReuse()
     env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
     val tEnv = TableEnvironment.getTableEnvironment(env)
 
@@ -118,6 +121,7 @@ class StreamTableSinksITCase extends StreamingMultipleProgramsTestBase {
   @Test
   def testRetractSinkOnAppendTable(): Unit = {
     val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.getConfig.enableObjectReuse()
     env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
     val tEnv = TableEnvironment.getTableEnvironment(env)
 
@@ -152,6 +156,7 @@ class StreamTableSinksITCase extends StreamingMultipleProgramsTestBase {
   @Test
   def testUpsertSinkOnUpdatingTableWithFullKey(): Unit = {
     val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.getConfig.enableObjectReuse()
     env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
     val tEnv = TableEnvironment.getTableEnvironment(env)
 
@@ -205,6 +210,7 @@ class StreamTableSinksITCase extends StreamingMultipleProgramsTestBase {
   @Test
   def testUpsertSinkOnAppendingTableWithFullKey1(): Unit = {
     val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.getConfig.enableObjectReuse()
     env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
     val tEnv = TableEnvironment.getTableEnvironment(env)
 
@@ -242,6 +248,7 @@ class StreamTableSinksITCase extends StreamingMultipleProgramsTestBase {
   @Test
   def testUpsertSinkOnAppendingTableWithFullKey2(): Unit = {
     val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.getConfig.enableObjectReuse()
     env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
     val tEnv = TableEnvironment.getTableEnvironment(env)
 
@@ -279,6 +286,7 @@ class StreamTableSinksITCase extends StreamingMultipleProgramsTestBase {
   @Test
   def testUpsertSinkOnAppendingTableWithoutFullKey1(): Unit = {
     val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.getConfig.enableObjectReuse()
     env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
     val tEnv = TableEnvironment.getTableEnvironment(env)
 
@@ -316,6 +324,7 @@ class StreamTableSinksITCase extends StreamingMultipleProgramsTestBase {
   @Test
   def testUpsertSinkOnAppendingTableWithoutFullKey2(): Unit = {
     val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.getConfig.enableObjectReuse()
     env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
     val tEnv = TableEnvironment.getTableEnvironment(env)
 
@@ -497,8 +506,11 @@ object RowCollector {
     new mutable.ArrayBuffer[JTuple2[JBool, Row]]()
 
   def addValue(value: JTuple2[JBool, Row]): Unit = {
+
+    // make a copy
+    val copy = new JTuple2[JBool, Row](value.f0, Row.copy(value.f1))
     sink.synchronized {
-      sink += value
+      sink += copy
     }
   }
 


Mime
View raw message