flink-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "Mateusz Zakarczemny (JIRA)" <j...@apache.org>
Subject [jira] [Created] (FLINK-6298) Local execution is not setting RuntimeContext for RichOutputFormat
Date Tue, 11 Apr 2017 18:02:42 GMT
Mateusz Zakarczemny created FLINK-6298:
------------------------------------------

             Summary: Local execution is not setting RuntimeContext for RichOutputFormat
                 Key: FLINK-6298
                 URL: https://issues.apache.org/jira/browse/FLINK-6298
             Project: Flink
          Issue Type: Bug
          Components: DataStream API
    Affects Versions: 1.2.0, 1.1.0
            Reporter: Mateusz Zakarczemny


RuntimeContext is never set in RichOutputFormat. I tested it in local execution. RichMapFunction
is setup correctly. 

Following code will never print "//////Context set in RichOutputFormat"
{code}

import org.apache.flink.api.common.functions.{RichMapFunction, RuntimeContext}
import org.apache.flink.api.common.io.RichOutputFormat
import org.apache.flink.api.scala._
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

object Startup {
  def main(args: Array[String]): Unit = {

    val mapFunction = new RichMapFunction[String, String] {
      def open(taskNumber: Int, numTasks: Int) { getRuntimeContext }
      def map(event: String) = { event }
      override def setRuntimeContext(t: RuntimeContext) = {
        println("//////Context set in RichMapFunction")
        super.setRuntimeContext(t)
      }
    }

    val outputFormat = new RichOutputFormat[String] {
      override def setRuntimeContext(t: RuntimeContext) = {
        println("//////Context set in RichOutputFormat")
        super.setRuntimeContext(t)
      }
      def open(taskNumber: Int, numTasks: Int) {}
      def writeRecord(event: String) {
        println(event)
      }
      def configure(parameters: Configuration): Unit = {}
      def close(): Unit = {}
    }

    val see = StreamExecutionEnvironment.getExecutionEnvironment
    val eventsStream = see.fromElements[String]("A", "B", "C").map(mapFunction)
    eventsStream.writeUsingOutputFormat(outputFormat)
    see.execute("test-job")
  }
}
{code}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

Mime
View raw message