flink-user mailing list archives

From "Dong-iL, Kim" <kim.s...@gmail.com>
Subject Re: ValueState is missing
Date Mon, 15 Aug 2016 08:28:18 GMT
Hi Stephan,

Do you mean using a map in local execution?
I’ve tested it, but it does not work at all.
Thanks.

> On Aug 15, 2016, at 4:56 PM, Dong-iL, Kim <kim.same@gmail.com> wrote:
> 
> Hi.
> I've tested the program with a window function (keyBy -> window -> collect). It has no problem.
> 
> My old program uses keyBy -> state processing. Can a single key be processed by multiple threads?
> 
> Thank you.
> 
>> On Aug 12, 2016, at 8:27 PM, Stephan Ewen <sewen@apache.org> wrote:
>> 
>> Hi!
>> 
>> So far we are not aware of a state loss bug in Flink. My guess is that it is some subtlety in the program.
>> 
>> The condition that logs the warning also contains other checks, like "handHistoryInfo.playType == PlayType.Cash" and "players.size > 1". Is one of them maybe the problem?
>> 
>> 
>> To debug this, you can try and do the following:
>> 
>> Rather than using Flink's key/value state, simply use your own Java/Scala map in the RichFlatMapFunction.
>> That is not fault-tolerant by default, but you can use it to see whether the error occurs in the same way.
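
The suggestion above could be tried along these lines. This is only a minimal sketch in plain Scala: `LocalHandStore` and `handle` are hypothetical names, the payload is kept as a raw string, and only the event names are taken from the posted code. The map is not checkpointed, so it is useful for debugging only.

```scala
import scala.collection.mutable

// Hypothetical debugging stand-in for ValueState: one entry per key, kept in a
// plain in-memory map. Nothing here is checkpointed or restored by Flink.
final class LocalHandStore {
  private val hands = mutable.HashMap.empty[String, String]

  // Returns Some(warning) when an event looks for state that is not there.
  def handle(key: String, eventType: String, payload: String): Option[String] =
    eventType match {
      case "GameStartHistory" =>
        hands.update(key, payload) // create the state for this key
        None
      case "GameEndHistory" =>
        hands.remove(key) // mirrors handState.clear() in the posted code
        None
      case other =>
        if (hands.contains(key)) None
        else Some(s"no hand info for key=$key on $other")
    }
}
```

In the job, an instance would presumably be created in open() of the RichFlatMapFunction and called from flatMap as `handle(in._3, in._2, in._4)`. Because keyBy routes all records of one key to the same parallel instance, the map should see the same per-key events that the ValueState sees, so a warning here would point at the program or the event order rather than at Flink's state.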
>> 
>> Greetings,
>> Stephan
>> 
>> 
>> 
>> 
>> On Fri, Aug 12, 2016 at 12:46 PM, Dong-iL, Kim <kim.same@gmail.com> wrote:
>> Hi.
>> I checked the order of the data, and it is all right.
>> Are there any other possibilities?
>> Thank you.
>> 
>>> On Aug 12, 2016, at 7:09 PM, Stephan Ewen <sewen@apache.org> wrote:
>>> 
>>> Hi!
>>> 
>>> It's not that easy to say at first glance.
>>> 
>>> One thing that is important to bear in mind is which ordering guarantees Flink gives, and where it gives none.
>>> When you use keyBy() or redistribute(), order is preserved per parallel source/target pair only.
>>> 
>>> Have a look here:
>>> https://ci.apache.org/projects/flink/flink-docs-master/concepts/concepts.html#parallel-dataflows
>>> 
>>> 
>>> Could it be that the events simply arrive in a different order in the functions, so that a later event that looks for state comes before an earlier event that creates the state?
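
To illustrate the question above, here is a small sketch that replays events for a single key and collects the lookups that find no state. `OrderingDemo` and `replay` are hypothetical names and the model is deliberately simplified (one optional hand id per key); only the event names come from the posted code.

```scala
// Replays (eventType, handId) pairs for one key and returns a warning for
// every event that looks for state while none exists. Illustrative model only.
object OrderingDemo {
  def replay(events: Seq[(String, String)]): Seq[String] = {
    val start = (Option.empty[String], Vector.empty[String])
    val (_, warnings) = events.foldLeft(start) {
      case ((_, warns), ("GameStartHistory", id)) => (Some(id), warns) // state created
      case ((_, warns), ("GameEndHistory", _))    => (None, warns)     // state cleared
      case ((state, warns), (kind, id)) =>
        if (state.isDefined) (state, warns)
        else (state, warns :+ s"no hand info [$kind] for $id")
    }
    warnings
  }
}
```

With the sequence start, pot, end, the result is empty. If the end event overtakes the pot event, or the pot event overtakes the start event, the same lookup yields a warning even though no state was actually lost.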
>>> 
>>> Greetings,
>>> Stephan
>>> 
>>> On Fri, Aug 12, 2016 at 12:04 PM, Dong-iL, Kim <kim.same@gmail.com> wrote:
>>> Nope.
>>> I added a log statement in End,
>>> but the same log appears.
>>> Is there any fault in my code?
>>> 
>>> thank you.
>>> 
>>> 
>>> > On Aug 12, 2016, at 6:42 PM, Maximilian Michels <mxm@apache.org> wrote:
>>> >
>>> > You're clearing the "handState" on "GameEndHistory". I'm assuming this
>>> > event comes in before "CommCardHistory" where you check the state.
>>> >
>>> > On Fri, Aug 12, 2016 at 6:59 AM, Dong-iL, Kim <kim.same@gmail.com> wrote:
>>> >> In my code, is the configuration of the ExecutionEnv all right?
>>> >>
>>> >>
>>> >>> On Aug 11, 2016, at 8:47 PM, Dong-iL, Kim <kim.same@gmail.com> wrote:
>>> >>>
>>> >>>
>>> >>> My code and log are below.
>>> >>>
>>> >>>
>>> >>>   val getExecuteEnv: StreamExecutionEnvironment = {
>>> >>>       val env = StreamExecutionEnvironment.getExecutionEnvironment.enableCheckpointing(10000)
>>> >>>       env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
>>> >>>       env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.AT_LEAST_ONCE)
>>> >>>       env.getCheckpointConfig.setCheckpointTimeout(60000)
>>> >>>       env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
>>> >>>       env.setRestartStrategy(RestartStrategies.fixedDelayRestart(30, 30000))
>>> >>>       env
>>> >>>   }
>>> >>>
>>> >>> def transform(target: DataStream[(String, String, String, String, Long)]): DataStream[WinLossBase] =
>>> >>>       target.keyBy(_._3).flatMap(new StateOperator)
>>> >>>
>>> >>> def main(args: Array[String]) {
>>> >>>       val env = getExecuteEnv
>>> >>>       val source: DataStream[String] = extractFromKafka(env).name("KafkaSource")
>>> >>>       val json = deserializeToJsonObj(source).name("ConvertToJson")
>>> >>>       val target: DataStream[(String, String, String, String, Long)] = preTransform(json)
>>> >>>       val result: DataStream[WinLossBase] = transform(target).name("ToKeyedStream")
>>> >>> …
>>> >>> }
>>> >>>
>>> >>> class StateOperator extends RichFlatMapFunction[(String, String, String, String, Long), WinLossBase] {
>>> >>>       var playerState: ValueState[util.Map[String, PotPlayer]] = _
>>> >>>       var handState: ValueState[HandHistoryInfo] = _
>>> >>>
>>> >>>       override def open(param: Configuration): Unit = {
>>> >>>           val playerValueStateDescriptor = new ValueStateDescriptor[util.Map[String, PotPlayer]]("winloss",
>>> >>>               classOf[util.Map[String, PotPlayer]], Maps.newHashMap[String, PotPlayer]())
>>> >>>           playerState = getRuntimeContext.getState(playerValueStateDescriptor)
>>> >>>           handState = getRuntimeContext.getState(new ValueStateDescriptor("handinfo", classOf[HandHistoryInfo], null))
>>> >>>       }
>>> >>>
>>> >>>       override def flatMap(in: (String, String, String, String, Long), out: Collector[WinLossBase]): Unit = {
>>> >>>           in._2 match {
>>> >>>               case "GameStartHistory" =>
>>> >>>                   val players = playerState.value()
>>> >>>                   val obj = _convertJsonToRecord(in._4, classOf[GameStartHistoryRecord])
>>> >>>                   val record = obj.asInstanceOf[GameStartHistoryRecord]
>>> >>>                   val handHistoryInfo: HandHistoryInfo = _setUpHandHistoryInfo(record)
>>> >>>                   if (LOG.isInfoEnabled())
>>> >>>                       LOG.info("hand start {}", if (handHistoryInfo != null) handHistoryInfo.handHistoryId else "NULL")
>>> >>>                     ….
>>> >>>                   playerState.update(players)
>>> >>>                   handState.update(handHistoryInfo)
>>> >>>               case "HoleCardHistory" =>
>>> >>>                   val players = playerState.value()
>>> >>>                   if (players != null) {
>>> >>>                      ...
>>> >>>                        playerState.update(players)
>>> >>>                   } else LOG.warn("there is no player[hole card]. {}", in._4)
>>> >>>               case "PlayerStateHistory" =>
>>> >>>                   val players = playerState.value()
>>> >>>                   if (players != null) {
>>> >>>                      ….
>>> >>>                       playerState.update(players)
>>> >>>                   } else LOG.warn("there is no player[player state]. {}", in._4)
>>> >>>               case "CommCardHistory" =>
>>> >>>                   val handHistoryInfo = handState.value()
>>> >>>                   val commCardHistory: CommCardHistory = commCardState.value()
>>> >>>                   if (handHistoryInfo != null) {
>>> >>>                      ...
>>> >>>                       handState.update(handHistoryInfo)
>>> >>>                       commCardState.update(commCardHistory)
>>> >>>                   } else LOG.warn("there is no handhistory info[comm card]. {}", in._4)
>>> >>>               case "PlayerActionHistory" =>
>>> >>>                   val handHistoryInfo = handState.value()
>>> >>>                   val players = playerState.value()
>>> >>>
>>> >>>                   if (handHistoryInfo != null) {
>>> >>>                      ...
>>> >>>                   } else LOG.warn("there is no handhistory info[player action]. {}", in._4)
>>> >>>               case "PotHistory" =>
>>> >>>                   val players = playerState.value()
>>> >>>                   val handHistoryInfo = handState.value()
>>> >>>                   val commCardHistory: CommCardHistory = commCardState.value()
>>> >>>                   if (handHistoryInfo != null && handHistoryInfo.playType == PlayType.Cash && players != null && players.size > 1) {
>>> >>>                       ...
>>> >>>                   } else LOG.warn("there is no handhistory info[pot]. {}", in._4)
>>> >>>               case "GameEndHistory" =>
>>> >>>                   val players = playerState.value()
>>> >>>                   val handHistoryInfo = handState.value()
>>> >>>                      ...
>>> >>>                   if (LOG.isTraceEnabled()) LOG.trace("end {}", record.getHandHistoryId)
>>> >>>                   playerState.clear()
>>> >>>                   handState.clear()
>>> >>>               case _ =>
>>> >>>           }
>>> >>>       }
>>> >>> }
>>> >>>
>>> >>> —— log ——
>>> >>> 2016-08-11 11:44:53.258 [ToKeyedStream -> (Map -> Sink: Pot to HBase, Map, Map -> Sink: winloss to HBase, Map -> Sink: winloss for opponent to HBase) (3/4)] INFO  com.nsuslab.denma.stream.winloss.flow.Main$  - hand start 5769392597641628595
>>> >>>
>>> >>> 2016-08-11 11:45:07.555 [ToKeyedStream -> (Map -> Sink: Pot to HBase, Map, Map -> Sink: winloss to HBase, Map -> Sink: winloss for opponent to HBase) (3/4)] WARN  com.nsuslab.denma.stream.winloss.flow.Main$  - there is no handhistory info[pot].
>>> >>>
>>> >>>> On Aug 11, 2016, at 7:01 PM, Ufuk Celebi <uce@apache.org> wrote:
>>> >>>>
>>> >>>> What exactly do you mean by lost?
>>> >>>>
>>> >>>> You call value() and it returns a value (!= null/defaultValue), and then you
>>> >>>> call it again and it returns null/defaultValue for the same key with
>>> >>>> no update in between?
>>> >>>>
>>> >>>> On Thu, Aug 11, 2016 at 11:59 AM, Kostas Kloudas
>>> >>>> <k.kloudas@data-artisans.com> wrote:
>>> >>>>> Hello,
>>> >>>>>
>>> >>>>> Could you share the code of the job you are running?
>>> >>>>> With only this information I am afraid we cannot help much.
>>> >>>>>
>>> >>>>> Thanks,
>>> >>>>> Kostas
>>> >>>>>
>>> >>>>>> On Aug 11, 2016, at 11:55 AM, Dong-iL, Kim <kim.same@gmail.com> wrote:
>>> >>>>>>
>>> >>>>>> Hi.
>>> >>>>>> I’m using Flink 1.0.3 on AWS EMR.
>>> >>>>>> Sporadically, the value of a ValueState is lost.
>>> >>>>>> What is a good starting point for solving this problem?
>>> >>>>>> Thank you.
>>> >>>>>
>>> >>>
>>> >>
>>> 
>>> 
>> 
>> 
> 

