flink-user mailing list archives

From Stefan Richter <s.rich...@data-artisans.com>
Subject Re: ValueState in RichCoFlatMap, possible 1.2-SNAPSHOT regression
Date Fri, 21 Oct 2016 14:33:49 GMT
Hi,

The problem is this line:

> object FlatMapper extends RichCoFlatMapFunction[Long, String, (Long, String)] {

which should use "class" instead of "object". Otherwise, Flink uses one singleton instance of the
FlatMapper across multiple operator instances, which leads to all of the exceptions you are
seeing.
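
Below is a minimal plain-Scala sketch of the mechanism, with no Flink classes involved. It assumes that the function is Serializable (as Flink's rich functions are) and that each parallel subtask works on its own deserialized copy. `Fn`, `SharedFn`, `PerInstanceFn` and `roundTrip` are hypothetical names, and the Java-serialization round trip only loosely mimics how Flink ships functions. Because Scala resolves a deserialized serializable `object` back to its singleton, every "copy" of `SharedFn` is the same instance, whereas copies of a class instance are independent. This applies within one JVM, for example several subtasks on the same TaskManager.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

import scala.collection.mutable.ArrayBuffer

// Hypothetical stand-in for a user function; Serializable like Flink's rich functions.
trait Fn extends Serializable {
  val buffer: ArrayBuffer[Long] = ArrayBuffer.empty[Long]
}

// Declared like `object FlatMapper`: a single JVM-wide instance.
object SharedFn extends Fn

// Declared like `class FlatMapper`: every instance has its own fields.
class PerInstanceFn extends Fn

object SingletonDemo {
  // Serialize and deserialize a value; a loose stand-in for shipping a function to a subtask.
  def roundTrip[T <: AnyRef](value: T): T = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    try out.writeObject(value) finally out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
    try in.readObject().asInstanceOf[T] finally in.close()
  }

  def main(args: Array[String]): Unit = {
    // Two simulated subtasks, each receiving its own deserialized copy.
    val fromObject = Seq(roundTrip(SharedFn), roundTrip(SharedFn))
    val prototype = new PerInstanceFn
    val fromClass = Seq(roundTrip(prototype), roundTrip(prototype))

    // Only subtask 0 receives an element in each case.
    fromObject(0).buffer += 1L
    fromClass(0).buffer += 1L

    println(fromObject(1).buffer.size) // 1: both "copies" are the singleton, so they share one buffer
    println(fromClass(1).buffer.size)  // 0: independent instances, independent buffers
  }
}
```

In Flink the shared singleton means that every subtask's open() overwrites the same `state` field and all subtasks append to the same `buffer`, which fits the errors described below.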

Best,
Stefan

> On 20.10.2016 at 23:22, Seth Wiesman <swiesman@mediamath.com> wrote:
> 
> Hi all, 
>  
> I was trying to implement a join similar to what was laid out in the Flink Forward talk
> "Joining Infinity: Windowless Stream Processing with Flink" <https://www.youtube.com/watch?v=UrRQYzux5L0&feature=youtu.be&list=PLDX4T_cnKjD2E_lSDcxOXED59GvXHVKR->
> and I have been running into some issues. I am running on 1.2-SNAPSHOT compiled for Scala 2.11
> and suspect this may be a regression, so I am including the dev mailing list. When initializing
> the value state, I receive a NullPointerException:
>  
> java.lang.NullPointerException
>                 at org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getState(StreamingRuntimeContext.java:109)
>                 at com.mediamath.reporting.streaming.FlatMapper$.open(StreamingPipeline.scala:21)
>                 at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
>                 at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:154)
>                 at org.apache.flink.streaming.api.operators.co.CoStreamFlatMap.open(CoStreamFlatMap.java:49)
>                 at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:368)
>                 at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:257)
>                 at org.apache.flink.runtime.taskmanager.Task.run(Task.java:593)
>                 at java.lang.Thread.run(Thread.java:745)
>  
>  
> Below is a minimum failing example:
>  
> import org.apache.flink.api.scala._
> import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
> import org.apache.flink.configuration.Configuration
> import org.apache.flink.runtime.state.memory.MemoryStateBackend
> import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction
> import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
> import org.apache.flink.streaming.api.{CheckpointingMode, TimeCharacteristic}
> import org.apache.flink.util.Collector
> 
> import scala.collection.mutable.ArrayBuffer
> 
> object FlatMapper extends RichCoFlatMapFunction[Long, String, (Long, String)] {
> 
>   val buffer = ArrayBuffer.empty[Long]
> 
>   @transient var state: ValueState[String] = _
> 
>   override def open(parameters: Configuration): Unit = {
>     super.open(parameters)
>     state = getRuntimeContext.getState(new ValueStateDescriptor[String]("state-descriptor", classOf[String], ""))
>   }
> 
>   override def flatMap2(value: String, out: Collector[(Long, String)]): Unit = {
>     state.update(value)
>   }
> 
>   override def flatMap1(value: Long, out: Collector[(Long, String)]): Unit = {
>     buffer += value
> 
>     if (state.value() != "") {
>       for (elem ← buffer) {
>         out.collect((elem, state.value()))
>       }
> 
>       buffer.clear()
>     }
>   }
> }
> 
> object StreamingPipeline {
> 
>   def main(args: Array[String]): Unit = {
> 
>     val env = StreamExecutionEnvironment.getExecutionEnvironment
> 
>     env.enableCheckpointing(30 * 1000, CheckpointingMode.EXACTLY_ONCE)
>     env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
>     env.setStateBackend(new MemoryStateBackend())
> 
>     val pipeline1 = env.generateSequence(0, 1000)
> 
>     val pipeline2 = env.fromElements("even", "odd")
> 
>     pipeline1.connect(pipeline2)
>       .keyBy(
>         elem ⇒ elem % 2 == 0,
>         elem ⇒ elem == "even"
>       ).flatMap(FlatMapper)
>       .print()
> 
>     env.execute("Example")
>   }
> }
> I also attempted retrieving the state each time I needed it:
>  
> import org.apache.flink.api.common.state.ValueStateDescriptor
> import org.apache.flink.api.scala._
> import org.apache.flink.runtime.state.memory.MemoryStateBackend
> import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction
> import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
> import org.apache.flink.streaming.api.{CheckpointingMode, TimeCharacteristic}
> import org.apache.flink.util.Collector
> 
> import scala.collection.mutable.ArrayBuffer
> 
> object FlatMapper extends RichCoFlatMapFunction[Long, String, (Long, String)] {
>   val descriptor = new ValueStateDescriptor[String]("state-descriptor", classOf[String], "")
> 
>   val buffer = ArrayBuffer.empty[Long]
> 
>   override def flatMap2(value: String, out: Collector[(Long, String)]): Unit = {
>     getRuntimeContext.getState(descriptor).update(value)
>   }
> 
>   override def flatMap1(value: Long, out: Collector[(Long, String)]): Unit = {
>     buffer += value
> 
>     val state = getRuntimeContext.getState(descriptor)
>     
>     if (state.value() != "") {
>       for (elem ← buffer) {
>         out.collect((elem, state.value()))
>       }
> 
>       buffer.clear()
>     }
>   }
> }
> 
> object StreamingPipeline {
> 
>   def main(args: Array[String]): Unit = {
> 
>     val env = StreamExecutionEnvironment.getExecutionEnvironment
> 
>     env.enableCheckpointing(30 * 1000, CheckpointingMode.EXACTLY_ONCE)
>     env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
>     env.setStateBackend(new MemoryStateBackend())
> 
>     val pipeline1 = env.generateSequence(0, 1000)
> 
>     val pipeline2 = env.fromElements("even", "odd")
> 
>     pipeline1.connect(pipeline2)
>       .keyBy(
>         elem ⇒ elem % 2 == 0,
>         elem ⇒ elem == "even"
>       ).flatMap(FlatMapper)
>       .print()
> 
>     env.execute("Example")
>   }
> 
> }
>  
> but this results in this precondition <https://github.com/apache/flink/blob/6f0faf9bb35e7cac3a38ed792cdabd6400fc4c79/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapValueState.java#L88>
> failing on updates.
>  
> Seth Wiesman
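
For reference, here is a sketch of the first example above with Stefan's fix applied: `FlatMapper` becomes a class and the pipeline passes `new FlatMapper`, so each parallel subtask works on its own copy rather than a shared singleton. It is an untested sketch written against the 1.2-era Scala API used in the thread; apart from the object-to-class change and the `org.apache.flink.streaming.api.scala._` import, everything is carried over from the original.

```scala
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.runtime.state.memory.MemoryStateBackend
import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.{CheckpointingMode, TimeCharacteristic}
import org.apache.flink.util.Collector

import scala.collection.mutable.ArrayBuffer

// A class, not an object: each parallel subtask now gets its own instance.
class FlatMapper extends RichCoFlatMapFunction[Long, String, (Long, String)] {

  // Plain instance memory: shared by all keys this subtask handles and not checkpointed.
  private val buffer = ArrayBuffer.empty[Long]

  @transient private var state: ValueState[String] = _

  override def open(parameters: Configuration): Unit = {
    super.open(parameters)
    // Keyed state handle; "" is the default value returned before any update.
    state = getRuntimeContext.getState(
      new ValueStateDescriptor[String]("state-descriptor", classOf[String], ""))
  }

  override def flatMap2(value: String, out: Collector[(Long, String)]): Unit =
    state.update(value)

  override def flatMap1(value: Long, out: Collector[(Long, String)]): Unit = {
    buffer += value
    if (state.value() != "") {
      for (elem <- buffer) out.collect((elem, state.value()))
      buffer.clear()
    }
  }
}

object StreamingPipeline {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.enableCheckpointing(30 * 1000, CheckpointingMode.EXACTLY_ONCE)
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    env.setStateBackend(new MemoryStateBackend())

    val pipeline1 = env.generateSequence(0, 1000)
    val pipeline2 = env.fromElements("even", "odd")

    pipeline1.connect(pipeline2)
      .keyBy(elem => elem % 2 == 0, elem => elem == "even")
      .flatMap(new FlatMapper) // a fresh instance instead of the shared singleton
      .print()

    env.execute("Example")
  }
}
```

The same change (declaring `class FlatMapper` and passing `new FlatMapper`) applies to the second variant that looks up the state on every call.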

