flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From fhueske@apache.org
Subject [1/2] flink git commit: [FLINK-7398] [table] Add Logging trait to prevent serialization of Logger.
Date Tue, 29 Aug 2017 20:12:27 GMT
Repository: flink
Updated Branches:
  refs/heads/release-1.3 51253c7f0 -> eec261e92


[FLINK-7398] [table] Add Logging trait to prevent serialization of Logger.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/3167f72d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/3167f72d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/3167f72d

Branch: refs/heads/release-1.3
Commit: 3167f72d95719ee551e798196fd26b9503e2dfd9
Parents: 51253c7
Author: Haohui Mai <wheat9@apache.org>
Authored: Wed Aug 23 14:13:07 2017 -0700
Committer: Fabian Hueske <fhueske@apache.org>
Committed: Tue Aug 29 17:22:30 2017 +0200

----------------------------------------------------------------------
 .../table/catalog/ExternalCatalogSchema.scala   |  8 +++---
 .../table/catalog/ExternalTableSourceUtil.scala |  6 ++---
 .../datastream/DataStreamGroupAggregate.scala   |  9 +++----
 .../DataStreamGroupWindowAggregate.scala        | 11 ++++----
 .../datastream/DataStreamOverAggregate.scala    | 12 ++++-----
 .../runtime/CRowCorrelateProcessRunner.scala    |  7 +++--
 .../table/runtime/CRowInputMapRunner.scala      |  7 +++--
 .../runtime/CRowInputTupleOutputMapRunner.scala | 15 +++++------
 .../table/runtime/CRowOutputMapRunner.scala     |  7 +++--
 .../flink/table/runtime/CRowProcessRunner.scala |  7 +++--
 .../table/runtime/CorrelateFlatMapRunner.scala  |  7 +++--
 .../flink/table/runtime/FlatJoinRunner.scala    |  9 +++----
 .../flink/table/runtime/FlatMapRunner.scala     |  9 +++----
 .../apache/flink/table/runtime/MapRunner.scala  |  9 +++----
 .../flink/table/runtime/MapSideJoinRunner.scala |  9 +++----
 .../aggregate/AggregateAggFunction.scala        |  5 ++--
 .../runtime/aggregate/DataSetAggFunction.scala  |  5 ++--
 .../aggregate/DataSetFinalAggFunction.scala     |  5 ++--
 .../aggregate/DataSetPreAggFunction.scala       |  6 ++---
 ...SetSessionWindowAggReduceGroupFunction.scala |  8 +++---
 ...aSetSessionWindowAggregatePreProcessor.scala |  8 +++---
 ...tSlideTimeWindowAggReduceGroupFunction.scala |  6 ++---
 ...taSetSlideWindowAggReduceGroupFunction.scala |  6 ++---
 ...umbleCountWindowAggReduceGroupFunction.scala |  6 ++---
 ...TumbleTimeWindowAggReduceGroupFunction.scala |  6 ++---
 .../aggregate/DataSetWindowAggMapFunction.scala |  6 ++---
 .../aggregate/GroupAggProcessFunction.scala     |  7 ++---
 .../aggregate/ProcTimeBoundedRangeOver.scala    |  6 ++---
 .../aggregate/ProcTimeBoundedRowsOver.scala     | 10 +++----
 .../aggregate/ProcTimeUnboundedOver.scala       | 10 +++----
 .../aggregate/RowTimeBoundedRangeOver.scala     |  6 ++---
 .../aggregate/RowTimeBoundedRowsOver.scala      |  6 ++---
 .../aggregate/RowTimeUnboundedOver.scala        |  8 +++---
 .../runtime/io/CRowValuesInputFormat.scala      |  7 +++--
 .../table/runtime/io/ValuesInputFormat.scala    |  9 +++----
 .../org/apache/flink/table/util/Logging.scala   | 28 ++++++++++++++++++++
 36 files changed, 152 insertions(+), 144 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/3167f72d/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogSchema.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogSchema.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogSchema.scala
index ad96e77..2783bca 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogSchema.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogSchema.scala
@@ -18,12 +18,12 @@
 
 package org.apache.flink.table.catalog
 
-import java.util.{Collections => JCollections, Collection => JCollection, LinkedHashSet => JLinkedHashSet, Set => JSet}
+import java.util.{Collection => JCollection, Collections => JCollections, LinkedHashSet => JLinkedHashSet, Set => JSet}
 
 import org.apache.calcite.linq4j.tree.Expression
 import org.apache.calcite.schema._
 import org.apache.flink.table.api.{CatalogNotExistException, TableNotExistException}
-import org.slf4j.{Logger, LoggerFactory}
+import org.apache.flink.table.util.Logging
 
 import scala.collection.JavaConverters._
 
@@ -38,9 +38,7 @@ import scala.collection.JavaConverters._
   */
 class ExternalCatalogSchema(
     catalogIdentifier: String,
-    catalog: ExternalCatalog) extends Schema {
-
-  private val LOG: Logger = LoggerFactory.getLogger(this.getClass)
+    catalog: ExternalCatalog) extends Schema with Logging {
 
   /**
     * Looks up a sub-schema by the given sub-schema name in the external catalog.

http://git-wip-us.apache.org/repos/asf/flink/blob/3167f72d/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalTableSourceUtil.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalTableSourceUtil.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalTableSourceUtil.scala
index ccc2e9e..6bacac1 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalTableSourceUtil.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalTableSourceUtil.scala
@@ -27,9 +27,9 @@ import org.apache.flink.table.api.{AmbiguousTableSourceConverterException, NoMat
 import org.apache.flink.table.plan.schema.{StreamTableSourceTable, TableSourceTable}
 import org.apache.flink.table.plan.stats.FlinkStatistic
 import org.apache.flink.table.sources.{StreamTableSource, TableSource}
+import org.apache.flink.table.util.Logging
 import org.apache.flink.util.InstantiationUtil
 import org.reflections.Reflections
-import org.slf4j.{Logger, LoggerFactory}
 
 import scala.collection.JavaConverters._
 import scala.collection.mutable
@@ -37,13 +37,11 @@ import scala.collection.mutable
 /**
   * The utility class is used to convert ExternalCatalogTable to TableSourceTable.
   */
-object ExternalTableSourceUtil {
+object ExternalTableSourceUtil extends Logging {
 
   // config file to specify scan package to search TableSourceConverter
   private val tableSourceConverterConfigFileName = "tableSourceConverter.properties"
 
-  private val LOG: Logger = LoggerFactory.getLogger(this.getClass)
-
   // registered table type with the TableSourceConverter.
   // Key is table type name, Value is set of converter class.
   private val tableTypeToTableSourceConvertersClazz = {

http://git-wip-us.apache.org/repos/asf/flink/blob/3167f72d/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupAggregate.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupAggregate.scala
index d54c04b..01314ab 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupAggregate.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupAggregate.scala
@@ -18,10 +18,8 @@
 package org.apache.flink.table.plan.nodes.datastream
 
 import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
-import org.apache.calcite.rel.`type`.RelDataType
 import org.apache.calcite.rel.core.AggregateCall
 import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel}
-import org.apache.flink.api.common.time.Time
 import org.apache.flink.api.java.functions.NullByteKeySelector
 import org.apache.flink.streaming.api.datastream.DataStream
 import org.apache.flink.table.api.{StreamQueryConfig, StreamTableEnvironment}
@@ -32,7 +30,7 @@ import org.apache.flink.table.plan.schema.RowSchema
 import org.apache.flink.table.plan.rules.datastream.DataStreamRetractionRules
 import org.apache.flink.table.runtime.aggregate.AggregateUtil.CalcitePair
 import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo}
-import org.slf4j.LoggerFactory
+import org.apache.flink.table.util.Logging
 
 /**
   *
@@ -57,9 +55,8 @@ class DataStreamGroupAggregate(
     groupings: Array[Int])
   extends SingleRel(cluster, traitSet, inputNode)
     with CommonAggregate
-    with DataStreamRel {
-
-  private val LOG = LoggerFactory.getLogger(this.getClass)
+    with DataStreamRel
+    with Logging {
 
   override def deriveRowType() = schema.logicalType
 

http://git-wip-us.apache.org/repos/asf/flink/blob/3167f72d/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala
index d860cbe..4af68cd 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala
@@ -26,7 +26,7 @@ import org.apache.flink.api.java.tuple.Tuple
 import org.apache.flink.streaming.api.datastream.{AllWindowedStream, DataStream, KeyedStream, WindowedStream}
 import org.apache.flink.streaming.api.windowing.assigners._
 import org.apache.flink.streaming.api.windowing.triggers.PurgingTrigger
-import org.apache.flink.streaming.api.windowing.windows.{GlobalWindow, Window => DataStreamWindow}
+import org.apache.flink.streaming.api.windowing.windows.{Window => DataStreamWindow}
 import org.apache.flink.table.api.{StreamQueryConfig, StreamTableEnvironment, TableException}
 import org.apache.flink.table.calcite.FlinkRelBuilder.NamedWindowProperty
 import org.apache.flink.table.calcite.FlinkTypeFactory
@@ -42,7 +42,7 @@ import org.apache.flink.table.runtime.aggregate._
 import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo}
 import org.apache.flink.table.typeutils.TypeCheckUtils.isTimeInterval
 import org.apache.flink.table.runtime.triggers.StateCleaningCountTrigger
-import org.slf4j.LoggerFactory
+import org.apache.flink.table.util.Logging
 
 class DataStreamGroupWindowAggregate(
     window: LogicalWindow,
@@ -54,9 +54,10 @@ class DataStreamGroupWindowAggregate(
     schema: RowSchema,
     inputSchema: RowSchema,
     grouping: Array[Int])
-  extends SingleRel(cluster, traitSet, inputNode) with CommonAggregate with DataStreamRel {
-
-  private val LOG = LoggerFactory.getLogger(this.getClass)
+  extends SingleRel(cluster, traitSet, inputNode)
+    with CommonAggregate
+    with DataStreamRel
+    with Logging {
 
   override def deriveRowType(): RelDataType = schema.logicalType
 

http://git-wip-us.apache.org/repos/asf/flink/blob/3167f72d/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala
index c03dac6..0db30ab 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala
@@ -20,23 +20,23 @@ package org.apache.flink.table.plan.nodes.datastream
 import java.util.{List => JList}
 
 import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
+import org.apache.calcite.rel.RelFieldCollation.Direction.ASCENDING
 import org.apache.calcite.rel.`type`.RelDataType
 import org.apache.calcite.rel.core.Window.Group
 import org.apache.calcite.rel.core.{AggregateCall, Window}
 import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel}
-import org.apache.calcite.rel.RelFieldCollation.Direction.ASCENDING
 import org.apache.flink.streaming.api.datastream.DataStream
 import org.apache.flink.table.api.{StreamQueryConfig, StreamTableEnvironment, TableException}
 import org.apache.flink.table.calcite.FlinkTypeFactory
 import org.apache.flink.table.plan.nodes.OverAggregate
-import org.apache.flink.table.plan.schema.RowSchema
-import org.apache.flink.table.runtime.aggregate._
 import org.apache.flink.api.java.functions.NullByteKeySelector
 import org.apache.flink.table.codegen.CodeGenerator
 import org.apache.flink.table.plan.rules.datastream.DataStreamRetractionRules
+import org.apache.flink.table.plan.schema.RowSchema
 import org.apache.flink.table.runtime.aggregate.AggregateUtil.CalcitePair
+import org.apache.flink.table.runtime.aggregate._
 import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo}
-import org.slf4j.LoggerFactory
+import org.apache.flink.table.util.Logging
 
 class DataStreamOverAggregate(
     logicWindow: Window,
@@ -47,8 +47,8 @@ class DataStreamOverAggregate(
     inputSchema: RowSchema)
   extends SingleRel(cluster, traitSet, inputNode)
   with OverAggregate
-  with DataStreamRel {
-  private val LOG = LoggerFactory.getLogger(this.getClass)
+  with DataStreamRel
+  with Logging {
 
   override def deriveRowType(): RelDataType = schema.logicalType
 

http://git-wip-us.apache.org/repos/asf/flink/blob/3167f72d/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowCorrelateProcessRunner.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowCorrelateProcessRunner.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowCorrelateProcessRunner.scala
index 4f0a785..2553d9c 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowCorrelateProcessRunner.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowCorrelateProcessRunner.scala
@@ -25,9 +25,9 @@ import org.apache.flink.configuration.Configuration
 import org.apache.flink.streaming.api.functions.ProcessFunction
 import org.apache.flink.table.codegen.Compiler
 import org.apache.flink.table.runtime.types.CRow
+import org.apache.flink.table.util.Logging
 import org.apache.flink.types.Row
 import org.apache.flink.util.Collector
-import org.slf4j.{Logger, LoggerFactory}
 
 /**
   * A CorrelateProcessRunner with [[CRow]] input and [[CRow]] output.
@@ -40,9 +40,8 @@ class CRowCorrelateProcessRunner(
     @transient var returnType: TypeInformation[CRow])
   extends ProcessFunction[CRow, CRow]
   with ResultTypeQueryable[CRow]
-  with Compiler[Any] {
-
-  val LOG: Logger = LoggerFactory.getLogger(this.getClass)
+  with Compiler[Any]
+  with Logging {
 
   private var function: ProcessFunction[Row, Row] = _
   private var collector: TableFunctionCollector[_] = _

http://git-wip-us.apache.org/repos/asf/flink/blob/3167f72d/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowInputMapRunner.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowInputMapRunner.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowInputMapRunner.scala
index 109c6e1..f5783e0 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowInputMapRunner.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowInputMapRunner.scala
@@ -24,8 +24,8 @@ import org.apache.flink.api.java.typeutils.ResultTypeQueryable
 import org.apache.flink.configuration.Configuration
 import org.apache.flink.table.codegen.Compiler
 import org.apache.flink.table.runtime.types.CRow
+import org.apache.flink.table.util.Logging
 import org.apache.flink.types.Row
-import org.slf4j.LoggerFactory
 
 /**
   * MapRunner with [[CRow]] input.
@@ -36,9 +36,8 @@ class CRowInputMapRunner[OUT](
     @transient var returnType: TypeInformation[OUT])
   extends RichMapFunction[CRow, OUT]
   with ResultTypeQueryable[OUT]
-  with Compiler[MapFunction[Row, OUT]] {
-
-  val LOG = LoggerFactory.getLogger(this.getClass)
+  with Compiler[MapFunction[Row, OUT]]
+  with Logging {
 
   private var function: MapFunction[Row, OUT] = _
 

http://git-wip-us.apache.org/repos/asf/flink/blob/3167f72d/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowInputTupleOutputMapRunner.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowInputTupleOutputMapRunner.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowInputTupleOutputMapRunner.scala
index 7c96437..8bf340d 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowInputTupleOutputMapRunner.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowInputTupleOutputMapRunner.scala
@@ -22,13 +22,13 @@ import java.lang.{Boolean => JBool}
 
 import org.apache.flink.api.common.functions.{MapFunction, RichMapFunction}
 import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
 import org.apache.flink.api.java.typeutils.ResultTypeQueryable
 import org.apache.flink.configuration.Configuration
 import org.apache.flink.table.codegen.Compiler
 import org.apache.flink.table.runtime.types.CRow
+import org.apache.flink.table.util.Logging
 import org.apache.flink.types.Row
-import org.slf4j.LoggerFactory
-import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
 
 /**
   * Convert [[CRow]] to a [[JTuple2]]
@@ -38,10 +38,9 @@ class CRowInputJavaTupleOutputMapRunner(
     code: String,
     @transient var returnType: TypeInformation[JTuple2[JBool, Any]])
   extends RichMapFunction[CRow, Any]
-          with ResultTypeQueryable[JTuple2[JBool, Any]]
-          with Compiler[MapFunction[Row, Any]] {
-
-  val LOG = LoggerFactory.getLogger(this.getClass)
+    with ResultTypeQueryable[JTuple2[JBool, Any]]
+    with Compiler[MapFunction[Row, Any]]
+    with Logging {
 
   private var function: MapFunction[Row, Any] = _
   private var tupleWrapper: JTuple2[JBool, Any] = _
@@ -72,9 +71,9 @@ class CRowInputScalaTupleOutputMapRunner(
   @transient var returnType: TypeInformation[(Boolean, Any)])
   extends RichMapFunction[CRow, (Boolean, Any)]
     with ResultTypeQueryable[(Boolean, Any)]
-    with Compiler[MapFunction[Row, Any]] {
+    with Compiler[MapFunction[Row, Any]]
+    with Logging {
 
-  val LOG = LoggerFactory.getLogger(this.getClass)
 
   private var function: MapFunction[Row, Any] = _
 

http://git-wip-us.apache.org/repos/asf/flink/blob/3167f72d/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowOutputMapRunner.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowOutputMapRunner.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowOutputMapRunner.scala
index cb8f695..60aaec1 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowOutputMapRunner.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowOutputMapRunner.scala
@@ -24,8 +24,8 @@ import org.apache.flink.api.java.typeutils.ResultTypeQueryable
 import org.apache.flink.configuration.Configuration
 import org.apache.flink.table.codegen.Compiler
 import org.apache.flink.table.runtime.types.CRow
+import org.apache.flink.table.util.Logging
 import org.apache.flink.types.Row
-import org.slf4j.LoggerFactory
 
 /**
   * MapRunner with [[CRow]] output.
@@ -36,9 +36,8 @@ class CRowOutputMapRunner(
     @transient var returnType: TypeInformation[CRow])
   extends RichMapFunction[Any, CRow]
   with ResultTypeQueryable[CRow]
-  with Compiler[MapFunction[Any, Row]] {
-
-  val LOG = LoggerFactory.getLogger(this.getClass)
+  with Compiler[MapFunction[Any, Row]]
+  with Logging {
 
   private var function: MapFunction[Any, Row] = _
   private var outCRow: CRow = _

http://git-wip-us.apache.org/repos/asf/flink/blob/3167f72d/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowProcessRunner.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowProcessRunner.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowProcessRunner.scala
index cef62a5..a7f3d72 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowProcessRunner.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowProcessRunner.scala
@@ -25,9 +25,9 @@ import org.apache.flink.configuration.Configuration
 import org.apache.flink.streaming.api.functions.ProcessFunction
 import org.apache.flink.table.codegen.Compiler
 import org.apache.flink.table.runtime.types.CRow
+import org.apache.flink.table.util.Logging
 import org.apache.flink.types.Row
 import org.apache.flink.util.Collector
-import org.slf4j.LoggerFactory
 
 /**
   * ProcessRunner with [[CRow]] input and [[CRow]] output.
@@ -38,9 +38,8 @@ class CRowProcessRunner(
     @transient var returnType: TypeInformation[CRow])
   extends ProcessFunction[CRow, CRow]
   with ResultTypeQueryable[CRow]
-  with Compiler[ProcessFunction[Row, Row]] {
-
-  val LOG = LoggerFactory.getLogger(this.getClass)
+  with Compiler[ProcessFunction[Row, Row]]
+  with Logging {
 
   private var function: ProcessFunction[Row, Row] = _
   private var cRowWrapper: CRowWrappingCollector = _

http://git-wip-us.apache.org/repos/asf/flink/blob/3167f72d/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CorrelateFlatMapRunner.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CorrelateFlatMapRunner.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CorrelateFlatMapRunner.scala
index 478b6b6..e2f5e61 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CorrelateFlatMapRunner.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CorrelateFlatMapRunner.scala
@@ -24,8 +24,8 @@ import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.java.typeutils.ResultTypeQueryable
 import org.apache.flink.configuration.Configuration
 import org.apache.flink.table.codegen.Compiler
+import org.apache.flink.table.util.Logging
 import org.apache.flink.util.Collector
-import org.slf4j.{Logger, LoggerFactory}
 
 class CorrelateFlatMapRunner[IN, OUT](
     flatMapName: String,
@@ -35,9 +35,8 @@ class CorrelateFlatMapRunner[IN, OUT](
     @transient var returnType: TypeInformation[OUT])
   extends RichFlatMapFunction[IN, OUT]
   with ResultTypeQueryable[OUT]
-  with Compiler[Any] {
-
-  val LOG: Logger = LoggerFactory.getLogger(this.getClass)
+  with Compiler[Any]
+  with Logging {
 
   private var function: FlatMapFunction[IN, OUT] = _
   private var collector: TableFunctionCollector[_] = _

http://git-wip-us.apache.org/repos/asf/flink/blob/3167f72d/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/FlatJoinRunner.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/FlatJoinRunner.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/FlatJoinRunner.scala
index 67acc0b..0bf6569 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/FlatJoinRunner.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/FlatJoinRunner.scala
@@ -21,10 +21,10 @@ package org.apache.flink.table.runtime
 import org.apache.flink.api.common.functions.{FlatJoinFunction, RichFlatJoinFunction}
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.java.typeutils.ResultTypeQueryable
-import org.apache.flink.table.codegen.Compiler
 import org.apache.flink.configuration.Configuration
+import org.apache.flink.table.codegen.Compiler
+import org.apache.flink.table.util.Logging
 import org.apache.flink.util.Collector
-import org.slf4j.LoggerFactory
 
 class FlatJoinRunner[IN1, IN2, OUT](
     name: String,
@@ -32,9 +32,8 @@ class FlatJoinRunner[IN1, IN2, OUT](
     @transient var returnType: TypeInformation[OUT])
   extends RichFlatJoinFunction[IN1, IN2, OUT]
   with ResultTypeQueryable[OUT]
-  with Compiler[FlatJoinFunction[IN1, IN2, OUT]] {
-
-  val LOG = LoggerFactory.getLogger(this.getClass)
+  with Compiler[FlatJoinFunction[IN1, IN2, OUT]]
+  with Logging {
 
   private var function: FlatJoinFunction[IN1, IN2, OUT] = null
 

http://git-wip-us.apache.org/repos/asf/flink/blob/3167f72d/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/FlatMapRunner.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/FlatMapRunner.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/FlatMapRunner.scala
index 938da59..6c1f804 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/FlatMapRunner.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/FlatMapRunner.scala
@@ -22,11 +22,11 @@ import org.apache.flink.api.common.functions.util.FunctionUtils
 import org.apache.flink.api.common.functions.{FlatMapFunction, RichFlatMapFunction}
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.java.typeutils.ResultTypeQueryable
-import org.apache.flink.table.codegen.Compiler
 import org.apache.flink.configuration.Configuration
+import org.apache.flink.table.codegen.Compiler
+import org.apache.flink.table.util.Logging
 import org.apache.flink.types.Row
 import org.apache.flink.util.Collector
-import org.slf4j.LoggerFactory
 
 class FlatMapRunner(
     name: String,
@@ -34,9 +34,8 @@ class FlatMapRunner(
     @transient var returnType: TypeInformation[Row])
   extends RichFlatMapFunction[Row, Row]
   with ResultTypeQueryable[Row]
-  with Compiler[FlatMapFunction[Row, Row]] {
-
-  val LOG = LoggerFactory.getLogger(this.getClass)
+  with Compiler[FlatMapFunction[Row, Row]]
+  with Logging {
 
   private var function: FlatMapFunction[Row, Row] = _
 

http://git-wip-us.apache.org/repos/asf/flink/blob/3167f72d/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/MapRunner.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/MapRunner.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/MapRunner.scala
index 14eeecf..00d18ec 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/MapRunner.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/MapRunner.scala
@@ -21,9 +21,9 @@ package org.apache.flink.table.runtime
 import org.apache.flink.api.common.functions.{MapFunction, RichMapFunction}
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.java.typeutils.ResultTypeQueryable
-import org.apache.flink.table.codegen.Compiler
 import org.apache.flink.configuration.Configuration
-import org.slf4j.LoggerFactory
+import org.apache.flink.table.codegen.Compiler
+import org.apache.flink.table.util.Logging
 
 class MapRunner[IN, OUT](
     name: String,
@@ -31,9 +31,8 @@ class MapRunner[IN, OUT](
     @transient var returnType: TypeInformation[OUT])
   extends RichMapFunction[IN, OUT]
   with ResultTypeQueryable[OUT]
-  with Compiler[MapFunction[IN, OUT]] {
-
-  val LOG = LoggerFactory.getLogger(this.getClass)
+  with Compiler[MapFunction[IN, OUT]]
+  with Logging {
 
   private var function: MapFunction[IN, OUT] = _
 

http://git-wip-us.apache.org/repos/asf/flink/blob/3167f72d/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/MapSideJoinRunner.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/MapSideJoinRunner.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/MapSideJoinRunner.scala
index 00b7b8e..5f5a2cc 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/MapSideJoinRunner.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/MapSideJoinRunner.scala
@@ -21,9 +21,9 @@ package org.apache.flink.table.runtime
 import org.apache.flink.api.common.functions.{FlatJoinFunction, RichFlatMapFunction}
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.java.typeutils.ResultTypeQueryable
-import org.apache.flink.table.codegen.Compiler
 import org.apache.flink.configuration.Configuration
-import org.slf4j.LoggerFactory
+import org.apache.flink.table.codegen.Compiler
+import org.apache.flink.table.util.Logging
 
 abstract class MapSideJoinRunner[IN1, IN2, SINGLE_IN, MULTI_IN, OUT](
     name: String,
@@ -32,9 +32,8 @@ abstract class MapSideJoinRunner[IN1, IN2, SINGLE_IN, MULTI_IN, OUT](
     broadcastSetName: String)
   extends RichFlatMapFunction[MULTI_IN, OUT]
     with ResultTypeQueryable[OUT]
-    with Compiler[FlatJoinFunction[IN1, IN2, OUT]] {
-
-  val LOG = LoggerFactory.getLogger(this.getClass)
+    with Compiler[FlatJoinFunction[IN1, IN2, OUT]]
+    with Logging {
 
   protected var function: FlatJoinFunction[IN1, IN2, OUT] = _
   protected var broadcastSet: Option[SINGLE_IN] = _

http://git-wip-us.apache.org/repos/asf/flink/blob/3167f72d/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateAggFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateAggFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateAggFunction.scala
index dd9c015..d3bffda 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateAggFunction.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateAggFunction.scala
@@ -21,8 +21,8 @@ package org.apache.flink.table.runtime.aggregate
 import org.apache.flink.api.common.functions.AggregateFunction
 import org.apache.flink.table.codegen.{Compiler, GeneratedAggregationsFunction}
 import org.apache.flink.table.runtime.types.CRow
+import org.apache.flink.table.util.Logging
 import org.apache.flink.types.Row
-import org.slf4j.LoggerFactory
 
 /**
   * Aggregate Function used for the aggregate operator in
@@ -31,9 +31,8 @@ import org.slf4j.LoggerFactory
   * @param genAggregations Generated aggregate helper function
   */
 class AggregateAggFunction(genAggregations: GeneratedAggregationsFunction)
-  extends AggregateFunction[CRow, Row, Row] with Compiler[GeneratedAggregations] {
+  extends AggregateFunction[CRow, Row, Row] with Compiler[GeneratedAggregations] with Logging {
 
-  val LOG = LoggerFactory.getLogger(this.getClass)
   private var function: GeneratedAggregations = _
 
   override def createAccumulator(): Row = {

http://git-wip-us.apache.org/repos/asf/flink/blob/3167f72d/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetAggFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetAggFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetAggFunction.scala
index 5f459f9..83e1b13 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetAggFunction.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetAggFunction.scala
@@ -22,9 +22,9 @@ import java.lang.Iterable
 import org.apache.flink.api.common.functions.RichGroupReduceFunction
 import org.apache.flink.configuration.Configuration
 import org.apache.flink.table.codegen.{Compiler, GeneratedAggregationsFunction}
+import org.apache.flink.table.util.Logging
 import org.apache.flink.types.Row
 import org.apache.flink.util.Collector
-import org.slf4j.LoggerFactory
 
 /**
   * [[RichGroupReduceFunction]] to compute aggregates that do not support pre-aggregation for batch
@@ -35,12 +35,11 @@ import org.slf4j.LoggerFactory
 class DataSetAggFunction(
     private val genAggregations: GeneratedAggregationsFunction)
   extends RichGroupReduceFunction[Row, Row]
-    with Compiler[GeneratedAggregations] {
+    with Compiler[GeneratedAggregations] with Logging {
 
   private var output: Row = _
   private var accumulators: Row = _
 
-  val LOG = LoggerFactory.getLogger(this.getClass)
   private var function: GeneratedAggregations = _
 
   override def open(config: Configuration) {

http://git-wip-us.apache.org/repos/asf/flink/blob/3167f72d/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetFinalAggFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetFinalAggFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetFinalAggFunction.scala
index 9b81992..5276271 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetFinalAggFunction.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetFinalAggFunction.scala
@@ -23,9 +23,9 @@ import java.lang.Iterable
 import org.apache.flink.api.common.functions.RichGroupReduceFunction
 import org.apache.flink.configuration.Configuration
 import org.apache.flink.table.codegen.{Compiler, GeneratedAggregationsFunction}
+import org.apache.flink.table.util.Logging
 import org.apache.flink.types.Row
 import org.apache.flink.util.Collector
-import org.slf4j.LoggerFactory
 
 /**
   * [[RichGroupReduceFunction]] to compute the final result of a pre-aggregated aggregation
@@ -36,12 +36,11 @@ import org.slf4j.LoggerFactory
 class DataSetFinalAggFunction(
     private val genAggregations: GeneratedAggregationsFunction)
   extends RichGroupReduceFunction[Row, Row]
-    with Compiler[GeneratedAggregations] {
+    with Compiler[GeneratedAggregations] with Logging {
 
   private var output: Row = _
   private var accumulators: Row = _
 
-  val LOG = LoggerFactory.getLogger(this.getClass)
   private var function: GeneratedAggregations = _
 
   override def open(config: Configuration) {

http://git-wip-us.apache.org/repos/asf/flink/blob/3167f72d/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetPreAggFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetPreAggFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetPreAggFunction.scala
index 8febe3e..fc3366b 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetPreAggFunction.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetPreAggFunction.scala
@@ -22,9 +22,9 @@ import java.lang.Iterable
 import org.apache.flink.api.common.functions._
 import org.apache.flink.configuration.Configuration
 import org.apache.flink.table.codegen.{Compiler, GeneratedAggregationsFunction}
+import org.apache.flink.table.util.Logging
 import org.apache.flink.types.Row
 import org.apache.flink.util.Collector
-import org.slf4j.LoggerFactory
 
 /**
   * [[GroupCombineFunction]] and [[MapPartitionFunction]] to compute pre-aggregates for batch
@@ -36,12 +36,12 @@ class DataSetPreAggFunction(genAggregations: GeneratedAggregationsFunction)
   extends AbstractRichFunction
   with GroupCombineFunction[Row, Row]
   with MapPartitionFunction[Row, Row]
-  with Compiler[GeneratedAggregations] {
+  with Compiler[GeneratedAggregations]
+  with Logging {
 
   private var output: Row = _
   private var accumulators: Row = _
 
-  val LOG = LoggerFactory.getLogger(this.getClass)
   private var function: GeneratedAggregations = _
 
   override def open(config: Configuration) {

http://git-wip-us.apache.org/repos/asf/flink/blob/3167f72d/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSessionWindowAggReduceGroupFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSessionWindowAggReduceGroupFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSessionWindowAggReduceGroupFunction.scala
index fabf200..0c6ed92 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSessionWindowAggReduceGroupFunction.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSessionWindowAggReduceGroupFunction.scala
@@ -20,11 +20,11 @@ package org.apache.flink.table.runtime.aggregate
 import java.lang.Iterable
 
 import org.apache.flink.api.common.functions.RichGroupReduceFunction
-import org.apache.flink.types.Row
 import org.apache.flink.configuration.Configuration
 import org.apache.flink.table.codegen.{Compiler, GeneratedAggregationsFunction}
+import org.apache.flink.table.util.Logging
+import org.apache.flink.types.Row
 import org.apache.flink.util.Collector
-import org.slf4j.LoggerFactory
 
 /**
   * It wraps the aggregate logic inside of
@@ -54,7 +54,8 @@ class DataSetSessionWindowAggReduceGroupFunction(
     gap: Long,
     isInputCombined: Boolean)
   extends RichGroupReduceFunction[Row, Row]
-    with Compiler[GeneratedAggregations] {
+    with Compiler[GeneratedAggregations]
+    with Logging {
 
   private var collector: RowTimeWindowPropertyCollector = _
   private val intermediateRowWindowStartPos = keysAndAggregatesArity
@@ -63,7 +64,6 @@ class DataSetSessionWindowAggReduceGroupFunction(
   private var output: Row = _
   private var accumulators: Row = _
 
-  val LOG = LoggerFactory.getLogger(this.getClass)
   private var function: GeneratedAggregations = _
 
   override def open(config: Configuration) {

http://git-wip-us.apache.org/repos/asf/flink/blob/3167f72d/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSessionWindowAggregatePreProcessor.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSessionWindowAggregatePreProcessor.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSessionWindowAggregatePreProcessor.scala
index 9bcac30..666bfee 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSessionWindowAggregatePreProcessor.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSessionWindowAggregatePreProcessor.scala
@@ -22,11 +22,11 @@ import java.lang.Iterable
 import org.apache.flink.api.common.functions.{AbstractRichFunction, GroupCombineFunction, MapPartitionFunction}
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.java.typeutils.ResultTypeQueryable
-import org.apache.flink.types.Row
 import org.apache.flink.configuration.Configuration
 import org.apache.flink.table.codegen.{Compiler, GeneratedAggregationsFunction}
+import org.apache.flink.table.util.Logging
+import org.apache.flink.types.Row
 import org.apache.flink.util.Collector
-import org.slf4j.LoggerFactory
 
 /**
   * This wraps the aggregate logic inside of
@@ -46,13 +46,13 @@ class DataSetSessionWindowAggregatePreProcessor(
   with MapPartitionFunction[Row,Row]
   with GroupCombineFunction[Row,Row]
   with ResultTypeQueryable[Row]
-  with Compiler[GeneratedAggregations] {
+  with Compiler[GeneratedAggregations]
+  with Logging {
 
   private var output: Row = _
   private val rowTimeFieldPos = keysAndAggregatesArity
   private var accumulators: Row = _
 
-  val LOG = LoggerFactory.getLogger(this.getClass)
   private var function: GeneratedAggregations = _
 
   override def open(config: Configuration) {

http://git-wip-us.apache.org/repos/asf/flink/blob/3167f72d/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSlideTimeWindowAggReduceGroupFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSlideTimeWindowAggReduceGroupFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSlideTimeWindowAggReduceGroupFunction.scala
index b3a19a4..3af7969 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSlideTimeWindowAggReduceGroupFunction.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSlideTimeWindowAggReduceGroupFunction.scala
@@ -25,9 +25,9 @@ import org.apache.flink.api.java.typeutils.ResultTypeQueryable
 import org.apache.flink.configuration.Configuration
 import org.apache.flink.streaming.api.windowing.windows.TimeWindow
 import org.apache.flink.table.codegen.{Compiler, GeneratedAggregationsFunction}
+import org.apache.flink.table.util.Logging
 import org.apache.flink.types.Row
 import org.apache.flink.util.Collector
-import org.slf4j.LoggerFactory
 
 /**
   * It is used for sliding windows on batch for time-windows. It takes a prepared input row (with
@@ -53,7 +53,8 @@ class DataSetSlideTimeWindowAggReduceGroupFunction(
   extends RichGroupReduceFunction[Row, Row]
   with CombineFunction[Row, Row]
   with ResultTypeQueryable[Row]
-  with Compiler[GeneratedAggregations] {
+  with Compiler[GeneratedAggregations]
+  with Logging {
 
   private val timeFieldPos = returnType.getArity - 1
   private val intermediateWindowStartPos = keysAndAggregatesArity
@@ -61,7 +62,6 @@ class DataSetSlideTimeWindowAggReduceGroupFunction(
   protected var intermediateRow: Row = _
   private var accumulators: Row = _
 
-  val LOG = LoggerFactory.getLogger(this.getClass)
   private var function: GeneratedAggregations = _
 
   override def open(config: Configuration) {

http://git-wip-us.apache.org/repos/asf/flink/blob/3167f72d/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSlideWindowAggReduceGroupFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSlideWindowAggReduceGroupFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSlideWindowAggReduceGroupFunction.scala
index 56ed08a..9b74bc6 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSlideWindowAggReduceGroupFunction.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSlideWindowAggReduceGroupFunction.scala
@@ -22,9 +22,9 @@ import java.lang.Iterable
 import org.apache.flink.api.common.functions.RichGroupReduceFunction
 import org.apache.flink.configuration.Configuration
 import org.apache.flink.table.codegen.{Compiler, GeneratedAggregationsFunction}
+import org.apache.flink.table.util.Logging
 import org.apache.flink.types.Row
 import org.apache.flink.util.Collector
-import org.slf4j.LoggerFactory
 
 /**
   * It wraps the aggregate logic inside of
@@ -45,7 +45,8 @@ class DataSetSlideWindowAggReduceGroupFunction(
     finalRowWindowEndPos: Option[Int],
     windowSize: Long)
   extends RichGroupReduceFunction[Row, Row]
-    with Compiler[GeneratedAggregations] {
+    with Compiler[GeneratedAggregations]
+    with Logging {
 
   private var collector: RowTimeWindowPropertyCollector = _
   protected val windowStartPos: Int = keysAndAggregatesArity
@@ -53,7 +54,6 @@ class DataSetSlideWindowAggReduceGroupFunction(
   private var output: Row = _
   protected var accumulators: Row = _
 
-  val LOG = LoggerFactory.getLogger(this.getClass)
   protected var function: GeneratedAggregations = _
 
   override def open(config: Configuration) {

http://git-wip-us.apache.org/repos/asf/flink/blob/3167f72d/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetTumbleCountWindowAggReduceGroupFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetTumbleCountWindowAggReduceGroupFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetTumbleCountWindowAggReduceGroupFunction.scala
index 0e73f7b..22fe389 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetTumbleCountWindowAggReduceGroupFunction.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetTumbleCountWindowAggReduceGroupFunction.scala
@@ -22,9 +22,9 @@ import java.lang.Iterable
 import org.apache.flink.api.common.functions.RichGroupReduceFunction
 import org.apache.flink.configuration.Configuration
 import org.apache.flink.table.codegen.{Compiler, GeneratedAggregationsFunction}
+import org.apache.flink.table.util.Logging
 import org.apache.flink.types.Row
 import org.apache.flink.util.Collector
-import org.slf4j.LoggerFactory
 
 /**
   * It wraps the aggregate logic inside of
@@ -38,12 +38,12 @@ class DataSetTumbleCountWindowAggReduceGroupFunction(
     private val genAggregations: GeneratedAggregationsFunction,
     private val windowSize: Long)
   extends RichGroupReduceFunction[Row, Row]
-    with Compiler[GeneratedAggregations] {
+    with Compiler[GeneratedAggregations]
+    with Logging {
 
   private var output: Row = _
   private var accumulators: Row = _
 
-  val LOG = LoggerFactory.getLogger(this.getClass)
   private var function: GeneratedAggregations = _
 
   override def open(config: Configuration) {

http://git-wip-us.apache.org/repos/asf/flink/blob/3167f72d/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetTumbleTimeWindowAggReduceGroupFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetTumbleTimeWindowAggReduceGroupFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetTumbleTimeWindowAggReduceGroupFunction.scala
index 8af2c2e..c963239 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetTumbleTimeWindowAggReduceGroupFunction.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetTumbleTimeWindowAggReduceGroupFunction.scala
@@ -22,9 +22,9 @@ import java.lang.Iterable
 import org.apache.flink.api.common.functions.RichGroupReduceFunction
 import org.apache.flink.configuration.Configuration
 import org.apache.flink.table.codegen.{Compiler, GeneratedAggregationsFunction}
+import org.apache.flink.table.util.Logging
 import org.apache.flink.types.Row
 import org.apache.flink.util.Collector
-import org.slf4j.LoggerFactory
 
 /**
   * It wraps the aggregate logic inside of
@@ -44,7 +44,8 @@ class DataSetTumbleTimeWindowAggReduceGroupFunction(
     windowEndPos: Option[Int],
     keysAndAggregatesArity: Int)
   extends RichGroupReduceFunction[Row, Row]
-    with Compiler[GeneratedAggregations] {
+    with Compiler[GeneratedAggregations]
+    with Logging {
 
   private var collector: RowTimeWindowPropertyCollector = _
   protected var aggregateBuffer: Row = new Row(keysAndAggregatesArity + 1)
@@ -52,7 +53,6 @@ class DataSetTumbleTimeWindowAggReduceGroupFunction(
   private var output: Row = _
   protected var accumulators: Row = _
 
-  val LOG = LoggerFactory.getLogger(this.getClass)
   protected var function: GeneratedAggregations = _
 
   override def open(config: Configuration) {

http://git-wip-us.apache.org/repos/asf/flink/blob/3167f72d/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetWindowAggMapFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetWindowAggMapFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetWindowAggMapFunction.scala
index d49ed0e..324784f 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetWindowAggMapFunction.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetWindowAggMapFunction.scala
@@ -26,8 +26,8 @@ import org.apache.flink.api.java.typeutils.ResultTypeQueryable
 import org.apache.flink.configuration.Configuration
 import org.apache.flink.streaming.api.windowing.windows.TimeWindow
 import org.apache.flink.table.codegen.{Compiler, GeneratedAggregationsFunction}
+import org.apache.flink.table.util.Logging
 import org.apache.flink.types.Row
-import org.slf4j.LoggerFactory
 
 /**
   * This map function only works for windows on batch tables.
@@ -44,12 +44,12 @@ class DataSetWindowAggMapFunction(
     @transient private val returnType: TypeInformation[Row])
   extends RichMapFunction[Row, Row]
     with ResultTypeQueryable[Row]
-    with Compiler[GeneratedAggregations] {
+    with Compiler[GeneratedAggregations]
+    with Logging {
 
   private var accs: Row = _
   private var output: Row = _
 
-  val LOG = LoggerFactory.getLogger(this.getClass)
   private var function: GeneratedAggregations = _
 
   override def open(config: Configuration) {

http://git-wip-us.apache.org/repos/asf/flink/blob/3167f72d/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/GroupAggProcessFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/GroupAggProcessFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/GroupAggProcessFunction.scala
index 57ea86e..385778c 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/GroupAggProcessFunction.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/GroupAggProcessFunction.scala
@@ -26,10 +26,11 @@ import org.apache.flink.util.Collector
 import org.apache.flink.api.common.state.ValueStateDescriptor
 import org.apache.flink.api.java.typeutils.RowTypeInfo
 import org.apache.flink.api.common.state.ValueState
+
 import org.apache.flink.table.api.{StreamQueryConfig, Types}
 import org.apache.flink.table.codegen.{Compiler, GeneratedAggregationsFunction}
-import org.slf4j.{Logger, LoggerFactory}
 import org.apache.flink.table.runtime.types.CRow
+import org.apache.flink.table.util.Logging
 
 /**
   * Aggregate Function used for the groupby (without window) aggregate
@@ -43,9 +44,9 @@ class GroupAggProcessFunction(
     private val generateRetraction: Boolean,
     private val queryConfig: StreamQueryConfig)
   extends ProcessFunctionWithCleanupState[CRow, CRow](queryConfig)
-    with Compiler[GeneratedAggregations] {
+    with Compiler[GeneratedAggregations]
+    with Logging {
 
-  val LOG: Logger = LoggerFactory.getLogger(this.getClass)
   private var function: GeneratedAggregations = _
 
   private var newRow: CRow = _

http://git-wip-us.apache.org/repos/asf/flink/blob/3167f72d/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeBoundedRangeOver.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeBoundedRangeOver.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeBoundedRangeOver.scala
index 8f2ec98..1424644 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeBoundedRangeOver.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeBoundedRangeOver.scala
@@ -34,7 +34,7 @@ import org.apache.flink.api.common.typeinfo.BasicTypeInfo
 import org.apache.flink.table.api.StreamQueryConfig
 import org.apache.flink.table.codegen.{Compiler, GeneratedAggregationsFunction}
 import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo}
-import org.slf4j.LoggerFactory
+import org.apache.flink.table.util.Logging
 
 /**
   * Process Function used for the aggregate in bounded proc-time OVER window
@@ -52,13 +52,13 @@ class ProcTimeBoundedRangeOver(
     inputType: TypeInformation[CRow],
     queryConfig: StreamQueryConfig)
   extends ProcessFunctionWithCleanupState[CRow, CRow](queryConfig)
-    with Compiler[GeneratedAggregations] {
+    with Compiler[GeneratedAggregations]
+    with Logging {
 
   private var output: CRow = _
   private var accumulatorState: ValueState[Row] = _
   private var rowMapState: MapState[Long, JList[Row]] = _
 
-  val LOG = LoggerFactory.getLogger(this.getClass)
   private var function: GeneratedAggregations = _
 
   override def open(config: Configuration) {

http://git-wip-us.apache.org/repos/asf/flink/blob/3167f72d/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeBoundedRowsOver.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeBoundedRowsOver.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeBoundedRowsOver.scala
index ccc4b46..9eaed04 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeBoundedRowsOver.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeBoundedRowsOver.scala
@@ -21,8 +21,6 @@ import java.util
 
 import org.apache.flink.configuration.Configuration
 import org.apache.flink.streaming.api.functions.ProcessFunction
-import org.apache.flink.types.Row
-import org.apache.flink.util.{Collector, Preconditions}
 import org.apache.flink.api.common.state.ValueStateDescriptor
 import org.apache.flink.api.java.typeutils.RowTypeInfo
 import org.apache.flink.api.common.state.ValueState
@@ -36,7 +34,9 @@ import org.apache.flink.api.common.typeinfo.BasicTypeInfo
 import org.apache.flink.table.api.StreamQueryConfig
 import org.apache.flink.table.codegen.{Compiler, GeneratedAggregationsFunction}
 import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo}
-import org.slf4j.LoggerFactory
+import org.apache.flink.table.util.Logging
+import org.apache.flink.types.Row
+import org.apache.flink.util.{Collector, Preconditions}
 
 /**
   * Process Function for ROW clause processing-time bounded OVER window
@@ -53,7 +53,8 @@ class ProcTimeBoundedRowsOver(
     inputType: TypeInformation[CRow],
     queryConfig: StreamQueryConfig)
   extends ProcessFunctionWithCleanupState[CRow, CRow](queryConfig)
-    with Compiler[GeneratedAggregations] {
+    with Compiler[GeneratedAggregations]
+    with Logging {
 
   Preconditions.checkArgument(precedingOffset > 0)
 
@@ -63,7 +64,6 @@ class ProcTimeBoundedRowsOver(
   private var counterState: ValueState[Long] = _
   private var smallestTsState: ValueState[Long] = _
 
-  val LOG = LoggerFactory.getLogger(this.getClass)
   private var function: GeneratedAggregations = _
 
   override def open(config: Configuration) {

http://git-wip-us.apache.org/repos/asf/flink/blob/3167f72d/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeUnboundedOver.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeUnboundedOver.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeUnboundedOver.scala
index 7a7b44d..d96ee7e 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeUnboundedOver.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeUnboundedOver.scala
@@ -19,15 +19,15 @@ package org.apache.flink.table.runtime.aggregate
 
 import org.apache.flink.configuration.Configuration
 import org.apache.flink.streaming.api.functions.ProcessFunction
-import org.apache.flink.types.Row
-import org.apache.flink.util.Collector
 import org.apache.flink.api.common.state.ValueStateDescriptor
 import org.apache.flink.api.java.typeutils.RowTypeInfo
 import org.apache.flink.api.common.state.ValueState
 import org.apache.flink.table.api.StreamQueryConfig
 import org.apache.flink.table.codegen.{Compiler, GeneratedAggregationsFunction}
 import org.apache.flink.table.runtime.types.CRow
-import org.slf4j.LoggerFactory
+import org.apache.flink.table.util.Logging
+import org.apache.flink.types.Row
+import org.apache.flink.util.Collector
 
 /**
   * Process Function for processing-time unbounded OVER window
@@ -40,11 +40,11 @@ class ProcTimeUnboundedOver(
     aggregationStateType: RowTypeInfo,
     queryConfig: StreamQueryConfig)
   extends ProcessFunctionWithCleanupState[CRow, CRow](queryConfig)
-    with Compiler[GeneratedAggregations] {
+    with Compiler[GeneratedAggregations]
+    with Logging {
 
   private var output: CRow = _
   private var state: ValueState[Row] = _
-  val LOG = LoggerFactory.getLogger(this.getClass)
   private var function: GeneratedAggregations = _
 
   override def open(config: Configuration) {

http://git-wip-us.apache.org/repos/asf/flink/blob/3167f72d/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeBoundedRangeOver.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeBoundedRangeOver.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeBoundedRangeOver.scala
index 1a207bb..e183b43 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeBoundedRangeOver.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeBoundedRangeOver.scala
@@ -27,9 +27,9 @@ import org.apache.flink.streaming.api.functions.ProcessFunction
 import org.apache.flink.table.api.StreamQueryConfig
 import org.apache.flink.table.codegen.{Compiler, GeneratedAggregationsFunction}
 import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo}
+import org.apache.flink.table.util.Logging
 import org.apache.flink.types.Row
 import org.apache.flink.util.{Collector, Preconditions}
-import org.slf4j.LoggerFactory
 
 /**
  * Process Function for RANGE clause event-time bounded OVER window
@@ -46,7 +46,8 @@ class RowTimeBoundedRangeOver(
     precedingOffset: Long,
     queryConfig: StreamQueryConfig)
   extends ProcessFunctionWithCleanupState[CRow, CRow](queryConfig)
-    with Compiler[GeneratedAggregations] {
+    with Compiler[GeneratedAggregations]
+    with Logging {
   Preconditions.checkNotNull(aggregationStateType)
   Preconditions.checkNotNull(precedingOffset)
 
@@ -64,7 +65,6 @@ class RowTimeBoundedRangeOver(
   // to this time stamp.
   private var dataState: MapState[Long, JList[Row]] = _
 
-  val LOG = LoggerFactory.getLogger(this.getClass)
   private var function: GeneratedAggregations = _
 
   override def open(config: Configuration) {

http://git-wip-us.apache.org/repos/asf/flink/blob/3167f72d/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeBoundedRowsOver.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeBoundedRowsOver.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeBoundedRowsOver.scala
index a4b1076..b519554 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeBoundedRowsOver.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeBoundedRowsOver.scala
@@ -30,7 +30,7 @@ import org.apache.flink.types.Row
 import org.apache.flink.util.{Collector, Preconditions}
 import org.apache.flink.table.codegen.{Compiler, GeneratedAggregationsFunction}
 import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo}
-import org.slf4j.LoggerFactory
+import org.apache.flink.table.util.Logging
 
 /**
  * Process Function for ROWS clause event-time bounded OVER window
@@ -47,7 +47,8 @@ class RowTimeBoundedRowsOver(
     precedingOffset: Long,
     queryConfig: StreamQueryConfig)
   extends ProcessFunctionWithCleanupState[CRow, CRow](queryConfig)
-    with Compiler[GeneratedAggregations] {
+    with Compiler[GeneratedAggregations]
+    with Logging {
 
   Preconditions.checkNotNull(aggregationStateType)
   Preconditions.checkNotNull(precedingOffset)
@@ -69,7 +70,6 @@ class RowTimeBoundedRowsOver(
   // to this time stamp.
   private var dataState: MapState[Long, JList[Row]] = _
 
-  val LOG = LoggerFactory.getLogger(this.getClass)
   private var function: GeneratedAggregations = _
 
   override def open(config: Configuration) {

http://git-wip-us.apache.org/repos/asf/flink/blob/3167f72d/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeUnboundedOver.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeUnboundedOver.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeUnboundedOver.scala
index f38ba93..d4b5d23a 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeUnboundedOver.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeUnboundedOver.scala
@@ -22,7 +22,6 @@ import java.util.{List => JList}
 
 import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
 import org.apache.flink.configuration.Configuration
-import org.apache.flink.types.Row
 import org.apache.flink.streaming.api.functions.ProcessFunction
 import org.apache.flink.util.{Collector, Preconditions}
 import org.apache.flink.api.common.state._
@@ -31,7 +30,8 @@ import org.apache.flink.streaming.api.operators.TimestampedCollector
 import org.apache.flink.table.api.StreamQueryConfig
 import org.apache.flink.table.codegen.{Compiler, GeneratedAggregationsFunction}
 import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo}
-import org.slf4j.LoggerFactory
+import org.apache.flink.table.util.Logging
+import org.apache.flink.types.Row
 
 
 /**
@@ -47,7 +47,8 @@ abstract class RowTimeUnboundedOver(
     inputType: TypeInformation[CRow],
     queryConfig: StreamQueryConfig)
   extends ProcessFunctionWithCleanupState[CRow, CRow](queryConfig)
-    with Compiler[GeneratedAggregations] {
+    with Compiler[GeneratedAggregations]
+    with Logging {
 
   protected var output: CRow = _
   // state to hold the accumulators of the aggregations
@@ -57,7 +58,6 @@ abstract class RowTimeUnboundedOver(
   // list to sort timestamps to access rows in timestamp order
   private var sortedTimestamps: util.LinkedList[Long] = _
 
-  val LOG = LoggerFactory.getLogger(this.getClass)
   protected var function: GeneratedAggregations = _
 
   override def open(config: Configuration) {

http://git-wip-us.apache.org/repos/asf/flink/blob/3167f72d/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/io/CRowValuesInputFormat.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/io/CRowValuesInputFormat.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/io/CRowValuesInputFormat.scala
index 1cb3a6e..fff5924 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/io/CRowValuesInputFormat.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/io/CRowValuesInputFormat.scala
@@ -24,8 +24,8 @@ import org.apache.flink.api.java.typeutils.ResultTypeQueryable
 import org.apache.flink.core.io.GenericInputSplit
 import org.apache.flink.table.codegen.Compiler
 import org.apache.flink.table.runtime.types.CRow
+import org.apache.flink.table.util.Logging
 import org.apache.flink.types.Row
-import org.slf4j.LoggerFactory
 
 class CRowValuesInputFormat(
     name: String,
@@ -34,9 +34,8 @@ class CRowValuesInputFormat(
   extends GenericInputFormat[CRow]
   with NonParallelInput
   with ResultTypeQueryable[CRow]
-  with Compiler[GenericInputFormat[Row]] {
-
-  val LOG = LoggerFactory.getLogger(this.getClass)
+  with Compiler[GenericInputFormat[Row]]
+  with Logging {
 
   private var format: GenericInputFormat[Row] = _
 

http://git-wip-us.apache.org/repos/asf/flink/blob/3167f72d/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/io/ValuesInputFormat.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/io/ValuesInputFormat.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/io/ValuesInputFormat.scala
index 43ce605..858146a 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/io/ValuesInputFormat.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/io/ValuesInputFormat.scala
@@ -21,10 +21,10 @@ package org.apache.flink.table.runtime.io
 import org.apache.flink.api.common.io.{GenericInputFormat, NonParallelInput}
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.java.typeutils.ResultTypeQueryable
-import org.apache.flink.table.codegen.Compiler
 import org.apache.flink.core.io.GenericInputSplit
+import org.apache.flink.table.codegen.Compiler
+import org.apache.flink.table.util.Logging
 import org.apache.flink.types.Row
-import org.slf4j.LoggerFactory
 
 class ValuesInputFormat(
     name: String,
@@ -33,9 +33,8 @@ class ValuesInputFormat(
   extends GenericInputFormat[Row]
   with NonParallelInput
   with ResultTypeQueryable[Row]
-  with Compiler[GenericInputFormat[Row]] {
-
-  val LOG = LoggerFactory.getLogger(this.getClass)
+  with Compiler[GenericInputFormat[Row]]
+  with Logging {
 
   private var format: GenericInputFormat[Row] = _
 

http://git-wip-us.apache.org/repos/asf/flink/blob/3167f72d/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/util/Logging.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/util/Logging.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/util/Logging.scala
new file mode 100644
index 0000000..b6be99e
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/util/Logging.scala
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.util
+
+import org.slf4j.{Logger, LoggerFactory}
+
+/**
+  * Helper trait to ensure the logger is never serialized.
+  */
+trait Logging {
+  @transient lazy val LOG: Logger = LoggerFactory.getLogger(getClass)
+}


Mime
View raw message