flink-issues mailing list archives

From "Jacob Park (JIRA)" <j...@apache.org>
Subject [jira] [Comment Edited] (FLINK-7398) Table API operators/UDFs must not store Logger
Date Mon, 21 Aug 2017 18:29:00 GMT

    [ https://issues.apache.org/jira/browse/FLINK-7398?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16135534#comment-16135534 ]

Jacob Park edited comment on FLINK-7398 at 8/21/17 6:28 PM:
------------------------------------------------------------

[~wheat9] Why not follow Apache Spark's example for this problem?
 https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/internal/Logging.scala

The idea is to have a Logging trait with a transient lazy LOG. If you want safe logging, you
inherit the trait, and existing code that still declares its own LOG will produce a compile-time
error because of the name conflict, which you can use to find and fix the bad logging.
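
A minimal sketch of such a trait, under stated assumptions: the names Logging, SafeMapper, LoggingSketch and roundTrip are hypothetical, this is not a copy of Spark's implementation, and java.util.logging.Logger stands in for the slf4j Logger so that the sketch runs with only the JDK. The stand-in is not Serializable, so storing it in a plain val would fail at serialization time on the client rather than at deserialization time on the cluster, but the remedy is the same.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}
import java.util.logging.Logger

// Hypothetical trait in the spirit of the proposal above.
// java.util.logging.Logger is a stand-in for slf4j (assumption, see lead-in).
trait Logging {
  // @transient: Java serialization skips the field.
  // lazy: the logger is created on first access, also after deserialization.
  @transient protected lazy val LOG: Logger = Logger.getLogger(getClass.getName)
}

// Hypothetical UDF-like class that logs through the trait.
// Declaring its own `val LOG = ...` here as well would be rejected by the
// compiler, because it clashes with the member inherited from Logging.
class SafeMapper(val factor: Int) extends Serializable with Logging {
  def map(x: Int): Int = {
    LOG.fine(s"mapping $x")
    x * factor
  }
}

object LoggingSketch {
  // Round-trips a value through Java serialization, similar to what happens
  // when a function is shipped from the client to the cluster.
  def roundTrip[T <: java.io.Serializable](value: T): T = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    out.writeObject(value)
    out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
    try in.readObject().asInstanceOf[T] finally in.close()
  }

  def main(args: Array[String]): Unit = {
    val mapper = new SafeMapper(3)
    mapper.map(1)                // forces the lazy logger before serialization
    val copy = roundTrip(mapper) // the logger is not part of the byte stream
    println(copy.map(2))         // prints 6; LOG is re-created on the copy
  }
}
```

With this shape, the logger is always resolved against whatever logging backend is present in the JVM that runs the function, not the one that created it.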

Edit: Refer to my Example.png.

!https://issues.apache.org/jira/secure/attachment/12882940/Example.png!

I can take on this task if you want.


was (Author: jparkie):
[~wheat9] Why not follow Apache Spark's example for this problem?
 https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/internal/Logging.scala

The idea is to have a Logging trait with a transient lazy LOG. If you want safe logging, you
inherit the trait, and existing code that still declares its own LOG will produce a compile-time
error because of the name conflict, which you can use to find and fix the bad logging.

Edit: Refer to my Example.png.

!https://issues.apache.org/jira/secure/attachment/12882932/Example.png!

I can take on this task if you want.

> Table API operators/UDFs must not store Logger
> ----------------------------------------------
>
>                 Key: FLINK-7398
>                 URL: https://issues.apache.org/jira/browse/FLINK-7398
>             Project: Flink
>          Issue Type: Bug
>          Components: Table API & SQL
>    Affects Versions: 1.4.0, 1.3.2
>            Reporter: Aljoscha Krettek
>            Assignee: Haohui Mai
>            Priority: Blocker
>             Fix For: 1.4.0, 1.3.3
>
>         Attachments: Example.png
>
>
> Table API operators and UDFs store a reference to the (slf4j) {{Logger}} in an instance
> field (cf. https://github.com/apache/flink/blob/f37988c19adc30d324cde83c54f2fa5d36efb9e7/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/FlatMapRunner.scala#L39).
> This means that the {{Logger}} will be serialised with the UDF and sent to the cluster. This
> in itself does not sound right and leads to problems when the slf4j configuration on the client
> is different from the cluster environment.
> This is an example of a user running into that problem: https://lists.apache.org/thread.html/01dd44007c0122d60c3fd2b2bb04fd2d6d2114bcff1e34d1d2079522@%3Cuser.flink.apache.org%3E.
> Here, they have Logback on the client but the Logback classes are not available on the cluster,
> and so deserialisation of the UDFs fails with a {{ClassNotFoundException}}.
> This is a rough list of the involved classes:
> {code}
> src/main/scala/org/apache/flink/table/catalog/ExternalCatalogSchema.scala:43:  private val LOG: Logger = LoggerFactory.getLogger(this.getClass)
> src/main/scala/org/apache/flink/table/catalog/ExternalTableSourceUtil.scala:45:  private val LOG: Logger = LoggerFactory.getLogger(this.getClass)
> src/main/scala/org/apache/flink/table/codegen/calls/BuiltInMethods.scala:28:  val LOG10 = Types.lookupMethod(classOf[Math], "log10", classOf[Double])
> src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupAggregate.scala:62:  private val LOG = LoggerFactory.getLogger(this.getClass)
> src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala:59:  private val LOG = LoggerFactory.getLogger(this.getClass)
> src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala:51:  private val LOG = LoggerFactory.getLogger(this.getClass)
> src/main/scala/org/apache/flink/table/plan/nodes/FlinkConventions.scala:28:  val LOGICAL: Convention = new Convention.Impl("LOGICAL", classOf[FlinkLogicalRel])
> src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala:38:  val LOGICAL_OPT_RULES: RuleSet = RuleSets.ofList(
> src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateAggFunction.scala:36:  val LOG = LoggerFactory.getLogger(this.getClass)
> src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetAggFunction.scala:43:  val LOG = LoggerFactory.getLogger(this.getClass)
> src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetFinalAggFunction.scala:44:  val LOG = LoggerFactory.getLogger(this.getClass)
> src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetPreAggFunction.scala:44:  val LOG = LoggerFactory.getLogger(this.getClass)
> src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSessionWindowAggregatePreProcessor.scala:55:  val LOG = LoggerFactory.getLogger(this.getClass)
> src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSessionWindowAggReduceGroupFunction.scala:66:  val LOG = LoggerFactory.getLogger(this.getClass)
> src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSlideWindowAggReduceGroupFunction.scala:56:  val LOG = LoggerFactory.getLogger(this.getClass)
> src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSlideTimeWindowAggReduceGroupFunction.scala:64:  val LOG = LoggerFactory.getLogger(this.getClass)
> src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetTumbleCountWindowAggReduceGroupFunction.scala:46:  val LOG = LoggerFactory.getLogger(this.getClass)
> src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetWindowAggMapFunction.scala:52:  val LOG = LoggerFactory.getLogger(this.getClass)
> src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetTumbleTimeWindowAggReduceGroupFunction.scala:55:  val LOG = LoggerFactory.getLogger(this.getClass)
> src/main/scala/org/apache/flink/table/runtime/aggregate/GroupAggProcessFunction.scala:48:  val LOG: Logger = LoggerFactory.getLogger(this.getClass)
> src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeBoundedRowsOver.scala:66:  val LOG = LoggerFactory.getLogger(this.getClass)
> src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeBoundedRangeOver.scala:61:  val LOG = LoggerFactory.getLogger(this.getClass)
> src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeUnboundedOver.scala:47:  val LOG = LoggerFactory.getLogger(this.getClass)
> src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeBoundedRangeOver.scala:67:  val LOG = LoggerFactory.getLogger(this.getClass)
> src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeBoundedRowsOver.scala:72:  val LOG = LoggerFactory.getLogger(this.getClass)
> src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeUnboundedOver.scala:60:  val LOG = LoggerFactory.getLogger(this.getClass)
> src/main/scala/org/apache/flink/table/runtime/CorrelateFlatMapRunner.scala:40:  val LOG: Logger = LoggerFactory.getLogger(this.getClass)
> src/main/scala/org/apache/flink/table/runtime/CRowCorrelateProcessRunner.scala:45:  val LOG: Logger = LoggerFactory.getLogger(this.getClass)
> src/main/scala/org/apache/flink/table/runtime/CRowInputMapRunner.scala:41:  val LOG = LoggerFactory.getLogger(this.getClass)
> src/main/scala/org/apache/flink/table/runtime/CRowInputTupleOutputMapRunner.scala:44:  val LOG = LoggerFactory.getLogger(this.getClass)
> src/main/scala/org/apache/flink/table/runtime/CRowInputTupleOutputMapRunner.scala:77:  val LOG = LoggerFactory.getLogger(this.getClass)
> src/main/scala/org/apache/flink/table/runtime/CRowOutputMapRunner.scala:41:  val LOG = LoggerFactory.getLogger(this.getClass)
> src/main/scala/org/apache/flink/table/runtime/CRowProcessRunner.scala:43:  val LOG = LoggerFactory.getLogger(this.getClass)
> src/main/scala/org/apache/flink/table/runtime/FlatJoinRunner.scala:37:  val LOG = LoggerFactory.getLogger(this.getClass)
> src/main/scala/org/apache/flink/table/runtime/FlatMapRunner.scala:39:  val LOG = LoggerFactory.getLogger(this.getClass)
> src/main/scala/org/apache/flink/table/runtime/io/CRowValuesInputFormat.scala:39:  val LOG = LoggerFactory.getLogger(this.getClass)
> src/main/scala/org/apache/flink/table/runtime/io/ValuesInputFormat.scala:38:  val LOG = LoggerFactory.getLogger(this.getClass)
> src/main/scala/org/apache/flink/table/runtime/MapRunner.scala:36:  val LOG = LoggerFactory.getLogger(this.getClass)
> src/main/scala/org/apache/flink/table/runtime/MapSideJoinRunner.scala:37:  val LOG = LoggerFactory.getLogger(this.getClass)
> {code}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
