Date: Fri, 15 Dec 2017 12:56:00 +0000 (UTC)
From: "Aljoscha Krettek (JIRA)"
To: issues@flink.apache.org
Reply-To: dev@flink.apache.org
Mailing-List: contact issues-help@flink.apache.org; run by ezmlm
Subject: [jira] [Closed] (FLINK-8248) RocksDB state backend Checkpointing is not working with KeyedCEP in 1.4
archived-at: Fri, 15 Dec 2017 12:56:05 -0000

     [ https://issues.apache.org/jira/browse/FLINK-8248?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Aljoscha Krettek closed FLINK-8248.
-----------------------------------
    Resolution: Not A Problem

> RocksDB state backend Checkpointing is not working with KeyedCEP in 1.4
> -----------------------------------------------------------------------
>
>                 Key: FLINK-8248
>                 URL: https://issues.apache.org/jira/browse/FLINK-8248
>             Project: Flink
>          Issue Type: Bug
>          Components: CEP, State Backends, Checkpointing
>    Affects Versions: 1.4.0, 1.3.2
>         Environment: linux: 3.10.0-514.el7.x86_64
> flink:
> *  version: 1.4
> *  rocksdb backend state
> *  checkpoint interval 5s
> *  keyed cep
> language: Java8
>            Reporter: jia liu
>
> Here is my exception log:
> {code:java}
> java.lang.RuntimeException: Exception occurred while processing valve output watermark:
>     at org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:291)
>     at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:189)
>     at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:111)
>     at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:189)
>     at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:264)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
>     at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.RuntimeException: Error while adding data to RocksDB
>     at org.apache.flink.contrib.streaming.state.RocksDBValueState.update(RocksDBValueState.java:103)
>     at org.apache.flink.cep.operator.AbstractKeyedCEPPatternOperator.updateNFA(AbstractKeyedCEPPatternOperator.java:309)
>     at org.apache.flink.cep.operator.AbstractKeyedCEPPatternOperator.onEventTime(AbstractKeyedCEPPatternOperator.java:247)
>     at org.apache.flink.streaming.api.operators.HeapInternalTimerService.advanceWatermark(HeapInternalTimerService.java:277)
>     at org.apache.flink.streaming.api.operators.InternalTimeServiceManager.advanceWatermark(InternalTimeServiceManager.java:108)
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:886)
>     at org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:288)
>     ... 7 more
> Caused by: java.lang.IllegalStateException: Could not find id for entry: SharedBufferEntry(ValueTimeWrapper(Behavior{schema='logon', detector='SlidingWindowAnomalyDetector', measure='count', field='activity', dimension='Logoff', description='null', icons=null, startTimestamp=1465297200000, endTimestamp=1465297203600, count=11.0, anomalyScore=100, adHashCode=-1866791453, timeMap={1465297200000=11.0}, user='LMR0049', logQuery=null, group='null'}, 1465300799999, 0), [SharedBufferEdge(null, 199)], 1)
>     at org.apache.flink.util.Preconditions.checkState(Preconditions.java:195)
>     at org.apache.flink.cep.nfa.SharedBuffer$SharedBufferSerializer.serialize(SharedBuffer.java:943)
>     at org.apache.flink.cep.nfa.SharedBuffer$SharedBufferSerializer.serialize(SharedBuffer.java:806)
>     at org.apache.flink.cep.nfa.NFA$NFASerializer.serialize(NFA.java:888)
>     at org.apache.flink.cep.nfa.NFA$NFASerializer.serialize(NFA.java:820)
>     at org.apache.flink.contrib.streaming.state.RocksDBValueState.update(RocksDBValueState.java:100)
>     ... 13 more
> {code}
> Main job code:
> {code:java}
> final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
> env.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE);
> env.setStateBackend(new RocksDBStateBackend(getString("flink.backend-state-dir")));
> // .........
> DataStream<Behavior> behaviorStream = anomalyStream
>         .assignTimestampsAndWatermarks(new AnomalyTimestampExtractor(Time.seconds(0)))
>         .keyBy((KeySelector) value -> value.entity)
>         .window(SlidingEventTimeWindows.of(Time.seconds(getLong("flink.window.window-size")),
>                 Time.seconds(getLong("flink.window.slice-size"))))
>         .apply(new BehaviorBuilderFunction())
>         .filter(new WhitelistFilterFunction())
>         // non-keyed stream will result in pattern operator parallelism equal to 1.
>         .keyBy((KeySelector<Behavior, String>) Behavior::getUser);
> // cep on behavior stream
> List allPatterns = PatternsHolder.getAllPatterns();
> for (Pattern pa : allPatterns) {
>     PatternStream<Behavior> ps = CEP.pattern(behaviorStream, pa);
>     ps.select(new AlertGenerator(pa.getName())).name(pa.getName());
> }
> {code}
> keyed stream event:
> {code:java}
> public class Behavior implements Serializable {
>     private static final long serialVersionUID = 7786674623147772721L;
>     static int ANOMALY_SCORE_THRESHOLD = 40;
>     static int ANOMALY_COUNT_THRESHOLD = 3;
>     public final String schema;
>     public final String detector;
>     private String measure = UEBAConstants.DEFAULT_MEASURE_FIELD;
>     public final String dimension;
>     public final String field; //dim value
>     private String user;
>     public String group;
>     public double count;
>     public int anomalyScore;
>     protected String description;
>     private Icon[] icons;
>     private int adHashCode;
>     private long startTimestamp;
>     private long endTimestamp;
>     private Map<Long, Double> timeMap;
>     public ArrayList> logQuery;
>     public Behavior(String schema, String detector, String field, String dimension, String user,
>                     long fromMillis, long toMillis, double count, int anomalyScore, ArrayList Object>> logQuery) {
>         this.schema = schema;
>         this.detector = detector;
>         this.field = field;
>         this.dimension = dimension;
>         this.user = user;
>         this.startTimestamp = fromMillis;
>         this.endTimestamp = toMillis;
>         this.count = count;
>         this.anomalyScore = anomalyScore;
>         this.logQuery = logQuery;
>         timeMap = new HashMap<>();
>         timeMap.put(fromMillis, count);
>     }
>     public Behavior(String schema, String detector, String field, String dimension,
>                     long fromMillis, long toMillis, double count, int anomalyScore) {
>         this.schema = schema;
>         this.detector = detector;
>         this.field = field;
>         this.dimension = dimension;
>         this.startTimestamp = fromMillis;
>         this.endTimestamp = toMillis;
>         this.count = count;
>         this.anomalyScore = anomalyScore;
>         timeMap = new HashMap<>();
>         timeMap.put(fromMillis, count);
>     }
>     public String getGroup() {
>         return group;
>     }
>     public void setGroup(String group) {
>         this.group = group;
>     }
>     public void setAdHashCode(int hashCode) {
>         this.adHashCode = hashCode;
>     }
>     public void setMeasure(String measure) {
>         this.measure = measure;
>     }
>     public String getMeasure() {
>         return measure;
>     }
>     // anomalyScore is using weighted average, may not be wise.
>     public void add(long fromMillis, long toMillis, double count, int anomalyScore, ArrayList Object>> logQuery) {
>         double sum = this.count * this.anomalyScore + count * anomalyScore;
>         this.count += count;
>         this.anomalyScore = (int) (sum / this.count);
>         if (fromMillis < this.startTimestamp) {
>             this.startTimestamp = fromMillis;
>         }
>         if (toMillis > this.endTimestamp) {
>             this.endTimestamp = toMillis;
>         }
>         if (!timeMap.containsKey(fromMillis)) {
>             timeMap.put(fromMillis, 0.0);
>         }
>         timeMap.put(fromMillis, timeMap.get(fromMillis) + count);
>         if (logQuery != null) {
>             this.logQuery.addAll(logQuery);
>         }
>     }
>     public void add(long fromMillis, long toMillis, double count, int anomalyScore) {
>         double sum = this.count * this.anomalyScore + count * anomalyScore;
>         this.count += count;
>         this.anomalyScore = (int) (sum / this.count);
>         if (fromMillis < this.startTimestamp) {
>             this.startTimestamp = fromMillis;
>         }
>         if (toMillis > this.endTimestamp) {
>             this.endTimestamp = toMillis;
>         }
>         if (!timeMap.containsKey(fromMillis)) {
>             timeMap.put(fromMillis, 0.0);
>         }
>         timeMap.put(fromMillis, timeMap.get(fromMillis) + count);
>     }
>     public Long[] getTimestamps() {
>         return timeMap.keySet().toArray(new Long[timeMap.size()]);
>     }
>     public String dimension() {
>         return dimension;
>     }
>     public long startTimestamp() {
>         return startTimestamp;
>     }
>     public long endTimestamp() {
>         return endTimestamp;
>     }
>     public double count() {
>         return count;
>     }
>     public int anomalyScore() {
>         return anomalyScore;
>     }
>     public boolean isAnomaly() {
>         return anomalyScore() >= ANOMALY_SCORE_THRESHOLD && count() >= ANOMALY_COUNT_THRESHOLD;
>     }
>     public String getUser() {
>         return user;
>     }
>     public void setUser(String user) {
>         this.user = user;
>     }
>     public void describeAs(String description, Icon... icons) {
>         this.description = description;
>         this.icons = icons;
>     }
>     public String setVisualizeInterfaceParameter(String group, long visualizeStartTimestamp, long
>             visualizeEndTimestamp) {
>         String requestParameterString = "/get_alert_visualize?detectorName=" + detector + "&groupField=" + group +
>                 "&user=" + user + "&field=" + field + "&measureField=" + measure + "&schemaName=" + schema +
>                 "&dimensionField=" + dimension + "&visualizeStartTimestamp=" + visualizeStartTimestamp +
>                 "&visualizeEndTimestamp=" + visualizeEndTimestamp;
>         return requestParameterString;
>     }
>     @Override
>     public int hashCode() {
>         int result;
>         long temp;
>         result = schema != null ? schema.hashCode() : 0;
>         result = 31 * result + (detector != null ? detector.hashCode() : 0);
>         result = 31 * result + (measure != null ? measure.hashCode() : 0);
>         result = 31 * result + (field != null ? field.hashCode() : 0);
>         result = 31 * result + (dimension != null ? dimension.hashCode() : 0);
>         result = 31 * result + (description != null ? description.hashCode() : 0);
>         result = 31 * result + Arrays.hashCode(icons);
>         result = 31 * result + (int) (startTimestamp ^ (startTimestamp >>> 32));
>         result = 31 * result + (int) (endTimestamp ^ (endTimestamp >>> 32));
>         temp = Double.doubleToLongBits(count);
>         result = 31 * result + (int) (temp ^ (temp >>> 32));
>         result = 31 * result + anomalyScore;
>         result = 31 * result + adHashCode;
>         result = 31 * result + (timeMap != null ? timeMap.hashCode() : 0);
>         result = 31 * result + (user != null ? user.hashCode() : 0);
>         result = 31 * result + (logQuery != null ? logQuery.hashCode() : 0);
>         result = 31 * result + (group != null ? group.hashCode() : 0);
>         return result;
>     }
>     @Override
>     public boolean equals(Object o) {
>         if (this == o) return true;
>         if (o == null || getClass() != o.getClass()) return false;
>         Behavior behavior = (Behavior) o;
>         if (startTimestamp != behavior.startTimestamp) return false;
>         if (endTimestamp != behavior.endTimestamp) return false;
>         if (Double.compare(behavior.count, count) != 0) return false;
>         if (anomalyScore != behavior.anomalyScore) return false;
>         if (adHashCode != behavior.adHashCode) return false;
>         if (schema != null ? !schema.equals(behavior.schema) : behavior.schema != null)
>             return false;
>         if (detector != null ? !detector.equals(behavior.detector) : behavior.detector != null)
>             return false;
>         if (measure != null ? !measure.equals(behavior.measure) : behavior.measure != null)
>             return false;
>         if (field != null ? !field.equals(behavior.field) : behavior.field != null) return false;
>         if (dimension != null ? !dimension.equals(behavior.dimension) : behavior.dimension != null)
>             return false;
>         if (description != null ? !description.equals(behavior.description) : behavior.description != null)
>             return false;
>         // Probably incorrect - comparing Object[] arrays with Arrays.equals
>         if (!Arrays.equals(icons, behavior.icons)) return false;
>         if (timeMap != null ? !timeMap.equals(behavior.timeMap) : behavior.timeMap != null)
>             return false;
>         if (user != null ? !user.equals(behavior.user) : behavior.user != null) return false;
>         if (logQuery != null ? !logQuery.equals(behavior.logQuery) : behavior.logQuery != null)
>             return false;
>         return group != null ? group.equals(behavior.group) : behavior.group == null;
>     }
>     @Override
>     public String toString() {
>         return "Behavior{" +
>                 "schema='" + schema + '\'' +
>                 ", detector='" + detector + '\'' +
>                 ", measure='" + measure + '\'' +
>                 ", field='" + field + '\'' +
>                 ", dimension='" + dimension + '\'' +
>                 ", description='" + description + '\'' +
>                 ", icons=" + Arrays.toString(icons) +
>                 ", startTimestamp=" + startTimestamp +
>                 ", endTimestamp=" + endTimestamp +
>                 ", count=" + count +
>                 ", anomalyScore=" + anomalyScore +
>                 ", adHashCode=" + adHashCode +
>                 ", timeMap=" + timeMap +
>                 ", user='" + user + '\'' +
>                 ", logQuery=" + logQuery +
>                 ", group='" + group + '\'' +
>                 '}';
>     }
> }
> {code}

--
This message was sent by Atlassian JIRA (v6.4.14#64029)