flink-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "Dong-iL, Kim" <kim.s...@gmail.com>
Subject Re: ValueState is missing
Date Fri, 12 Aug 2016 04:59:56 GMT
in my code, is the config of ExecutionEnv alright?


> On Aug 11, 2016, at 8:47 PM, Dong-iL, Kim <kim.same@gmail.com> wrote:
> 
> 
> my code and log is as below.
> 
> 
>    val getExecuteEnv: StreamExecutionEnvironment = {
>        val env = StreamExecutionEnvironment.getExecutionEnvironment.enableCheckpointing(10000)
>        env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
>        env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.AT_LEAST_ONCE)
>        env.getCheckpointConfig.setCheckpointTimeout(60000)
>        env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
>        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(30, 30000))
>        env
>    }
> 
> def transform(target: DataStream[(String, String, String, String, Long)]): DataStream[WinLossBase]
=
>        target.keyBy(_._3).flatMap(new StateOperator)
> 
> def main(args: Array[String]) {
>        val env = getExecuteEnv
>        val source: DataStream[String] = extractFromKafka(env).name("KafkaSource")
>        val json = deserializeToJsonObj(source).name("ConvertToJson")
>        val target: DataStream[(String, String, String, String, Long)] = preTransform(json)
>        val result: DataStream[WinLossBase] = transform(target).name("ToKeyedStream”)
> …
> }
> 
> class StateOperator extends RichFlatMapFunction[(String, String, String, String, Long),
WinLossBase] {
>        var playerState: ValueState[util.Map[String, PotPlayer]] = _
>        var handState: ValueState[HandHistoryInfo] = _
> 
>        override def open(param: Configuration): Unit = {
>            val playerValueStateDescriptor = new ValueStateDescriptor[util.Map[String,
PotPlayer]]("winloss",
>                classOf[util.Map[String, PotPlayer]], Maps.newHashMap[String, PotPlayer]())
>            playerState = getRuntimeContext.getState(playerValueStateDescriptor)
>            handState = getRuntimeContext.getState(new ValueStateDescriptor("handinfo",
classOf[HandHistoryInfo], null))
>        }
> 
>        override def flatMap(in: (String, String, String, String, Long), out: Collector[WinLossBase]):
Unit = {
>            in._2 match {
>                case "GameStartHistory" =>
>                    val players = playerState.value()
>                    val obj = _convertJsonToRecord(in._4, classOf[GameStartHistoryRecord])
>                    val record = obj.asInstanceOf[GameStartHistoryRecord]
>                    val handHistoryInfo: HandHistoryInfo = _setUpHandHistoryInfo(record)
>                    if (LOG.isInfoEnabled())
>                        LOG.info("hand start {}", if (handHistoryInfo != null) handHistoryInfo.handHistoryId
else "NULL”)
> 		       ….
>                    playerState.update(players)
>                    handState.update(handHistoryInfo)
>                case "HoleCardHistory" =>
>                    val players = playerState.value()
>                    if (players != null) {
> 			...
>                         playerState.update(players)
>                    } else LOG.warn("there is no player[hole card]. {}", in._4)
>                case "PlayerStateHistory" =>
>                    val players = playerState.value()
>                    if (players != null) {
> 			….
>                        playerState.update(players)
>                    } else LOG.warn("there is no player[player state]. {}", in._4)
>                case "CommCardHistory" =>
>                    val handHistoryInfo = handState.value()
>                    val commCardHistory: CommCardHistory = commCardState.value()
>                    if (handHistoryInfo != null) {
> 			...
>                        handState.update(handHistoryInfo)
>                        commCardState.update(commCardHistory)
>                    } else LOG.warn("there is no handhistory info[comm card]. {}", in._4)
>                case "PlayerActionHistory" =>
>                    val handHistoryInfo = handState.value()
>                    val players = playerState.value()
> 
>                    if (handHistoryInfo != null) {
> 			...
>                    } else LOG.warn("there is no handhistory info[player action]. {}",
in._4)
>                case "PotHistory" =>
>                    val players = playerState.value()
>                    val handHistoryInfo = handState.value()
>                    val commCardHistory: CommCardHistory = commCardState.value()
>                    if (handHistoryInfo != null && handHistoryInfo.playType ==
PlayType.Cash && players != null && players.size > 1) {
>                        ...
>                    } else LOG.warn("there is no handhistory info[pot]. {}", in._4)
>                case "GameEndHistory" =>
>                    val players = playerState.value()
>                    val handHistoryInfo = handState.value()
> 			...
>                    if (LOG.isTraceEnabled()) LOG.trace("end {}", record.getHandHistoryId)
>                    playerState.clear()
>                    handState.clear()
>                case _ =>
>            }
>        }
> 
> —— log —— 
> 2016-08-11 11:44:53.258 [ToKeyedStream -> (Map -> Sink: Pot to HBase, Map, Map
-> Sink: winloss to HBase, Map -> Sink: winloss for opponent to HBase) (3/4)] INFO 
com.nsuslab.denma.stream.winloss.flow.Main$  - hand start 5769392597641628595
> 
> 2016-08-11 11:45:07.555 [ToKeyedStream -> (Map -> Sink: Pot to HBase, Map, Map
-> Sink: winloss to HBase, Map -> Sink: winloss for opponent to HBase) (3/4)] WARN 
com.nsuslab.denma.stream.winloss.flow.Main$  - there is no handhistory info[pot]. 
> 
>> On Aug 11, 2016, at 7:01 PM, Ufuk Celebi <uce@apache.org> wrote:
>> 
>> What do you mean with lost exactly?
>> 
>> You call value() and it returns a value (!= null/defaultValue) and you
>> call it again and it returns null/defaultValue for the same key with
>> no update in between?
>> 
>> On Thu, Aug 11, 2016 at 11:59 AM, Kostas Kloudas
>> <k.kloudas@data-artisans.com> wrote:
>>> Hello,
>>> 
>>> Could you share the code of the job you are running?
>>> With only this information I am afraid we cannot help much.
>>> 
>>> Thanks,
>>> Kostas
>>> 
>>>> On Aug 11, 2016, at 11:55 AM, Dong-iL, Kim <kim.same@gmail.com> wrote:
>>>> 
>>>> Hi.
>>>> I’m using flink 1.0.3 on aws EMR.
>>>> sporadically value of ValueState is lost.
>>>> what is starting point for solving this problem.
>>>> Thank you.
>>> 
> 


Mime
View raw message