flink-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Maximilian Michels <...@apache.org>
Subject Re: ValueState is missing
Date Fri, 12 Aug 2016 09:42:46 GMT
You're clearing the "handState" on "GameEndHistory". I'm assuming this
event comes in before "CommCardHistory" where you check the state.

On Fri, Aug 12, 2016 at 6:59 AM, Dong-iL, Kim <kim.same@gmail.com> wrote:
> in my code, is the config of ExecutionEnv alright?
>
>
>> On Aug 11, 2016, at 8:47 PM, Dong-iL, Kim <kim.same@gmail.com> wrote:
>>
>>
>> my code and log is as below.
>>
>>
>>    val getExecuteEnv: StreamExecutionEnvironment = {
>>        val env = StreamExecutionEnvironment.getExecutionEnvironment.enableCheckpointing(10000)
>>        env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
>>        env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.AT_LEAST_ONCE)
>>        env.getCheckpointConfig.setCheckpointTimeout(60000)
>>        env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
>>        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(30, 30000))
>>        env
>>    }
>>
>> def transform(target: DataStream[(String, String, String, String, Long)]): DataStream[WinLossBase]
=
>>        target.keyBy(_._3).flatMap(new StateOperator)
>>
>> def main(args: Array[String]) {
>>        val env = getExecuteEnv
>>        val source: DataStream[String] = extractFromKafka(env).name("KafkaSource")
>>        val json = deserializeToJsonObj(source).name("ConvertToJson")
>>        val target: DataStream[(String, String, String, String, Long)] = preTransform(json)
>>        val result: DataStream[WinLossBase] = transform(target).name("ToKeyedStream”)
>> …
>> }
>>
>> class StateOperator extends RichFlatMapFunction[(String, String, String, String,
Long), WinLossBase] {
>>        var playerState: ValueState[util.Map[String, PotPlayer]] = _
>>        var handState: ValueState[HandHistoryInfo] = _
>>
>>        override def open(param: Configuration): Unit = {
>>            val playerValueStateDescriptor = new ValueStateDescriptor[util.Map[String,
PotPlayer]]("winloss",
>>                classOf[util.Map[String, PotPlayer]], Maps.newHashMap[String, PotPlayer]())
>>            playerState = getRuntimeContext.getState(playerValueStateDescriptor)
>>            handState = getRuntimeContext.getState(new ValueStateDescriptor("handinfo",
classOf[HandHistoryInfo], null))
>>        }
>>
>>        override def flatMap(in: (String, String, String, String, Long), out: Collector[WinLossBase]):
Unit = {
>>            in._2 match {
>>                case "GameStartHistory" =>
>>                    val players = playerState.value()
>>                    val obj = _convertJsonToRecord(in._4, classOf[GameStartHistoryRecord])
>>                    val record = obj.asInstanceOf[GameStartHistoryRecord]
>>                    val handHistoryInfo: HandHistoryInfo = _setUpHandHistoryInfo(record)
>>                    if (LOG.isInfoEnabled())
>>                        LOG.info("hand start {}", if (handHistoryInfo != null) handHistoryInfo.handHistoryId
else "NULL”)
>>                      ….
>>                    playerState.update(players)
>>                    handState.update(handHistoryInfo)
>>                case "HoleCardHistory" =>
>>                    val players = playerState.value()
>>                    if (players != null) {
>>                       ...
>>                         playerState.update(players)
>>                    } else LOG.warn("there is no player[hole card]. {}", in._4)
>>                case "PlayerStateHistory" =>
>>                    val players = playerState.value()
>>                    if (players != null) {
>>                       ….
>>                        playerState.update(players)
>>                    } else LOG.warn("there is no player[player state]. {}", in._4)
>>                case "CommCardHistory" =>
>>                    val handHistoryInfo = handState.value()
>>                    val commCardHistory: CommCardHistory = commCardState.value()
>>                    if (handHistoryInfo != null) {
>>                       ...
>>                        handState.update(handHistoryInfo)
>>                        commCardState.update(commCardHistory)
>>                    } else LOG.warn("there is no handhistory info[comm card]. {}",
in._4)
>>                case "PlayerActionHistory" =>
>>                    val handHistoryInfo = handState.value()
>>                    val players = playerState.value()
>>
>>                    if (handHistoryInfo != null) {
>>                       ...
>>                    } else LOG.warn("there is no handhistory info[player action].
{}", in._4)
>>                case "PotHistory" =>
>>                    val players = playerState.value()
>>                    val handHistoryInfo = handState.value()
>>                    val commCardHistory: CommCardHistory = commCardState.value()
>>                    if (handHistoryInfo != null && handHistoryInfo.playType
== PlayType.Cash && players != null && players.size > 1) {
>>                        ...
>>                    } else LOG.warn("there is no handhistory info[pot]. {}", in._4)
>>                case "GameEndHistory" =>
>>                    val players = playerState.value()
>>                    val handHistoryInfo = handState.value()
>>                       ...
>>                    if (LOG.isTraceEnabled()) LOG.trace("end {}", record.getHandHistoryId)
>>                    playerState.clear()
>>                    handState.clear()
>>                case _ =>
>>            }
>>        }
>>
>> —— log ——
>> 2016-08-11 11:44:53.258 [ToKeyedStream -> (Map -> Sink: Pot to HBase, Map,
Map -> Sink: winloss to HBase, Map -> Sink: winloss for opponent to HBase) (3/4)] INFO
 com.nsuslab.denma.stream.winloss.flow.Main$  - hand start 5769392597641628595
>>
>> 2016-08-11 11:45:07.555 [ToKeyedStream -> (Map -> Sink: Pot to HBase, Map,
Map -> Sink: winloss to HBase, Map -> Sink: winloss for opponent to HBase) (3/4)] WARN
 com.nsuslab.denma.stream.winloss.flow.Main$  - there is no handhistory info[pot].
>>
>>> On Aug 11, 2016, at 7:01 PM, Ufuk Celebi <uce@apache.org> wrote:
>>>
>>> What do you mean with lost exactly?
>>>
>>> You call value() and it returns a value (!= null/defaultValue) and you
>>> call it again and it returns null/defaultValue for the same key with
>>> no update in between?
>>>
>>> On Thu, Aug 11, 2016 at 11:59 AM, Kostas Kloudas
>>> <k.kloudas@data-artisans.com> wrote:
>>>> Hello,
>>>>
>>>> Could you share the code of the job you are running?
>>>> With only this information I am afraid we cannot help much.
>>>>
>>>> Thanks,
>>>> Kostas
>>>>
>>>>> On Aug 11, 2016, at 11:55 AM, Dong-iL, Kim <kim.same@gmail.com>
wrote:
>>>>>
>>>>> Hi.
>>>>> I’m using flink 1.0.3 on aws EMR.
>>>>> sporadically value of ValueState is lost.
>>>>> what is starting point for solving this problem.
>>>>> Thank you.
>>>>
>>
>

Mime
View raw message