flink-issues mailing list archives

From "Chesnay Schepler (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-8248) RocksDB state backend Checkpointing is not working with KeyedCEP in 1.4
Date Wed, 13 Dec 2017 10:13:00 GMT

    [ https://issues.apache.org/jira/browse/FLINK-8248?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16289021#comment-16289021 ]

Chesnay Schepler commented on FLINK-8248:
-----------------------------------------

Did you actually experience this issue in 1.3.2? (The title and description only refer to 1.4.)

> RocksDB state backend Checkpointing is not working with KeyedCEP in 1.4
> -----------------------------------------------------------------------
>
>                 Key: FLINK-8248
>                 URL: https://issues.apache.org/jira/browse/FLINK-8248
>             Project: Flink
>          Issue Type: Bug
>          Components: CEP, State Backends, Checkpointing
>    Affects Versions: 1.4.0, 1.3.2
>         Environment: linux: 3.10.0-514.el7.x86_64
> flink: 
> *  version: 1.4
> *  rocksdb backend state
> *  checkpoint interval 5s
> *  keyed cep
> language: Java8
>            Reporter: jia liu
>
> Here is my exception log:
> {code:java}
> java.lang.RuntimeException: Exception occurred while processing valve output watermark:
> 	at org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:291)
> 	at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:189)
> 	at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:111)
> 	at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:189)
> 	at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
> 	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:264)
> 	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
> 	at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.RuntimeException: Error while adding data to RocksDB
> 	at org.apache.flink.contrib.streaming.state.RocksDBValueState.update(RocksDBValueState.java:103)
> 	at org.apache.flink.cep.operator.AbstractKeyedCEPPatternOperator.updateNFA(AbstractKeyedCEPPatternOperator.java:309)
> 	at org.apache.flink.cep.operator.AbstractKeyedCEPPatternOperator.onEventTime(AbstractKeyedCEPPatternOperator.java:247)
> 	at org.apache.flink.streaming.api.operators.HeapInternalTimerService.advanceWatermark(HeapInternalTimerService.java:277)
> 	at org.apache.flink.streaming.api.operators.InternalTimeServiceManager.advanceWatermark(InternalTimeServiceManager.java:108)
> 	at org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:886)
> 	at org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:288)
> 	... 7 more
> Caused by: java.lang.IllegalStateException: Could not find id for entry: SharedBufferEntry(ValueTimeWrapper(Behavior{schema='logon', detector='SlidingWindowAnomalyDetector', measure='count', field='activity', dimension='Logoff', description='null', icons=null, startTimestamp=1465297200000, endTimestamp=1465297203600, count=11.0, anomalyScore=100, adHashCode=-1866791453, timeMap={1465297200000=11.0}, user='LMR0049', logQuery=null, group='null'}, 1465300799999, 0), [SharedBufferEdge(null, 199)], 1)
> 	at org.apache.flink.util.Preconditions.checkState(Preconditions.java:195)
> 	at org.apache.flink.cep.nfa.SharedBuffer$SharedBufferSerializer.serialize(SharedBuffer.java:943)
> 	at org.apache.flink.cep.nfa.SharedBuffer$SharedBufferSerializer.serialize(SharedBuffer.java:806)
> 	at org.apache.flink.cep.nfa.NFA$NFASerializer.serialize(NFA.java:888)
> 	at org.apache.flink.cep.nfa.NFA$NFASerializer.serialize(NFA.java:820)
> 	at org.apache.flink.contrib.streaming.state.RocksDBValueState.update(RocksDBValueState.java:100)
> 	... 13 more
> {code}
> Main job code:
> {code:java}
> final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
> env.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE);
> env.setStateBackend(new RocksDBStateBackend(getString("flink.backend-state-dir")));
> // .........
> DataStream<Behavior> behaviorStream = anomalyStream
>                 .assignTimestampsAndWatermarks(new AnomalyTimestampExtractor(Time.seconds(0)))
>                 .keyBy((KeySelector<AnomalySlice, String>) value -> value.entity)
>                 .window(SlidingEventTimeWindows.of(Time.seconds(getLong("flink.window.window-size")),
>                         Time.seconds(getLong("flink.window.slice-size"))))
>                 .apply(new BehaviorBuilderFunction())
>                 .filter(new WhitelistFilterFunction())
>                 // non-keyed stream will result in pattern operator parallelism equal to 1.
>                 .keyBy((KeySelector<Behavior, String>) Behavior::getUser);
>         // cep on behavior stream
>         List<Pattern> allPatterns = PatternsHolder.getAllPatterns();
>         for (Pattern pa : allPatterns) {
>             PatternStream<Behavior> ps = CEP.pattern(behaviorStream, pa);
>             ps.select(new AlertGenerator(pa.getName())).name(pa.getName());
>         }
> {code}
> keyed stream event:
> {code:java}
> public class Behavior implements Serializable {
>     private static final long serialVersionUID = 7786674623147772721L;
>     static int ANOMALY_SCORE_THRESHOLD = 40;
>     static int ANOMALY_COUNT_THRESHOLD = 3;
>     public final String schema;
>     public final String detector;
>     private String measure = UEBAConstants.DEFAULT_MEASURE_FIELD;
>     public final String dimension;
>     public final String field; //dim value
>     private String user;
>     public String group;
>     public double count;
>     public int anomalyScore;
>     protected String description;
>     private Icon[] icons;
>     private int adHashCode;
>     private long startTimestamp;
>     private long endTimestamp;
>     private Map<Long, Double> timeMap;
>     public ArrayList<HashMap<String, Object>> logQuery;
>     public Behavior(String schema, String detector, String field, String dimension, String user,
>                     long fromMillis, long toMillis, double count, int anomalyScore,
>                     ArrayList<HashMap<String, Object>> logQuery) {
>         this.schema = schema;
>         this.detector = detector;
>         this.field = field;
>         this.dimension = dimension;
>         this.user = user;
>         this.startTimestamp = fromMillis;
>         this.endTimestamp = toMillis;
>         this.count = count;
>         this.anomalyScore = anomalyScore;
>         this.logQuery = logQuery;
>         timeMap = new HashMap<>();
>         timeMap.put(fromMillis, count);
>     }
>     public Behavior(String schema, String detector, String field, String dimension,
>                     long fromMillis, long toMillis, double count, int anomalyScore) {
>         this.schema = schema;
>         this.detector = detector;
>         this.field = field;
>         this.dimension = dimension;
>         this.startTimestamp = fromMillis;
>         this.endTimestamp = toMillis;
>         this.count = count;
>         this.anomalyScore = anomalyScore;
>         timeMap = new HashMap<>();
>         timeMap.put(fromMillis, count);
>     }
>     public String getGroup() {
>         return group;
>     }
>     public void setGroup(String group) {
>         this.group = group;
>     }
>     public void setAdHashCode(int hashCode) {
>         this.adHashCode = hashCode;
>     }
>     public void setMeasure(String measure) {
>         this.measure = measure;
>     }
>     public String getMeasure() {
>         return measure;
>     }
>     // anomalyScore is using weighted average, may not be wise.
>     public void add(long fromMillis, long toMillis, double count, int anomalyScore,
>                     ArrayList<HashMap<String, Object>> logQuery) {
>         double sum = this.count * this.anomalyScore + count * anomalyScore;
>         this.count += count;
>         this.anomalyScore = (int) (sum / this.count);
>         if (fromMillis < this.startTimestamp) {
>             this.startTimestamp = fromMillis;
>         }
>         if (toMillis > this.endTimestamp) {
>             this.endTimestamp = toMillis;
>         }
>         if (!timeMap.containsKey(fromMillis)) {
>             timeMap.put(fromMillis, 0.0);
>         }
>         timeMap.put(fromMillis, timeMap.get(fromMillis) + count);
>         if (logQuery != null) {
>             this.logQuery.addAll(logQuery);
>         }
>     }
>     public void add(long fromMillis, long toMillis, double count, int anomalyScore) {
>         double sum = this.count * this.anomalyScore + count * anomalyScore;
>         this.count += count;
>         this.anomalyScore = (int) (sum / this.count);
>         if (fromMillis < this.startTimestamp) {
>             this.startTimestamp = fromMillis;
>         }
>         if (toMillis > this.endTimestamp) {
>             this.endTimestamp = toMillis;
>         }
>         if (!timeMap.containsKey(fromMillis)) {
>             timeMap.put(fromMillis, 0.0);
>         }
>         timeMap.put(fromMillis, timeMap.get(fromMillis) + count);
>     }
>     public Long[] getTimestamps() {
>         return timeMap.keySet().toArray(new Long[timeMap.size()]);
>     }
>     public String dimension() {
>         return dimension;
>     }
>     public long startTimestamp() {
>         return startTimestamp;
>     }
>     public long endTimestamp() {
>         return endTimestamp;
>     }
>     public double count() {
>         return count;
>     }
>     public int anomalyScore() {
>         return anomalyScore;
>     }
>     public boolean isAnomaly() {
>         return anomalyScore() >= ANOMALY_SCORE_THRESHOLD && count() >= ANOMALY_COUNT_THRESHOLD;
>     }
>     public String getUser() {
>         return user;
>     }
>     public void setUser(String user) {
>         this.user = user;
>     }
>     public void describeAs(String description, Icon... icons) {
>         this.description = description;
>         this.icons = icons;
>     }
>     public String setVisualizeInterfaceParameter(String group, long visualizeStartTimestamp,
>             long visualizeEndTimestamp) {
>         String requestParameterString = "/get_alert_visualize?detectorName=" + detector + "&groupField=" + group +
>                 "&user=" + user + "&field=" + field + "&measureField=" + measure + "&schemaName=" + schema +
>                 "&dimensionField=" + dimension + "&visualizeStartTimestamp=" + visualizeStartTimestamp +
>                 "&visualizeEndTimestamp=" + visualizeEndTimestamp;
>         return requestParameterString;
>     }
>     @Override
>     public int hashCode() {
>         int result;
>         long temp;
>         result = schema != null ? schema.hashCode() : 0;
>         result = 31 * result + (detector != null ? detector.hashCode() : 0);
>         result = 31 * result + (measure != null ? measure.hashCode() : 0);
>         result = 31 * result + (field != null ? field.hashCode() : 0);
>         result = 31 * result + (dimension != null ? dimension.hashCode() : 0);
>         result = 31 * result + (description != null ? description.hashCode() : 0);
>         result = 31 * result + Arrays.hashCode(icons);
>         result = 31 * result + (int) (startTimestamp ^ (startTimestamp >>> 32));
>         result = 31 * result + (int) (endTimestamp ^ (endTimestamp >>> 32));
>         temp = Double.doubleToLongBits(count);
>         result = 31 * result + (int) (temp ^ (temp >>> 32));
>         result = 31 * result + anomalyScore;
>         result = 31 * result + adHashCode;
>         result = 31 * result + (timeMap != null ? timeMap.hashCode() : 0);
>         result = 31 * result + (user != null ? user.hashCode() : 0);
>         result = 31 * result + (logQuery != null ? logQuery.hashCode() : 0);
>         result = 31 * result + (group != null ? group.hashCode() : 0);
>         return result;
>     }
>     @Override
>     public boolean equals(Object o) {
>         if (this == o) return true;
>         if (o == null || getClass() != o.getClass()) return false;
>         Behavior behavior = (Behavior) o;
>         if (startTimestamp != behavior.startTimestamp) return false;
>         if (endTimestamp != behavior.endTimestamp) return false;
>         if (Double.compare(behavior.count, count) != 0) return false;
>         if (anomalyScore != behavior.anomalyScore) return false;
>         if (adHashCode != behavior.adHashCode) return false;
>         if (schema != null ? !schema.equals(behavior.schema) : behavior.schema != null)
>             return false;
>         if (detector != null ? !detector.equals(behavior.detector) : behavior.detector != null)
>             return false;
>         if (measure != null ? !measure.equals(behavior.measure) : behavior.measure != null)
>             return false;
>         if (field != null ? !field.equals(behavior.field) : behavior.field != null) return false;
>         if (dimension != null ? !dimension.equals(behavior.dimension) : behavior.dimension != null)
>             return false;
>         if (description != null ? !description.equals(behavior.description) : behavior.description != null)
>             return false;
>         // Probably incorrect - comparing Object[] arrays with Arrays.equals
>         if (!Arrays.equals(icons, behavior.icons)) return false;
>         if (timeMap != null ? !timeMap.equals(behavior.timeMap) : behavior.timeMap != null)
>             return false;
>         if (user != null ? !user.equals(behavior.user) : behavior.user != null) return false;
>         if (logQuery != null ? !logQuery.equals(behavior.logQuery) : behavior.logQuery != null)
>             return false;
>         return group != null ? group.equals(behavior.group) : behavior.group == null;
>     }
>     @Override
>     public String toString() {
>         return "Behavior{" +
>                 "schema='" + schema + '\'' +
>                 ", detector='" + detector + '\'' +
>                 ", measure='" + measure + '\'' +
>                 ", field='" + field + '\'' +
>                 ", dimension='" + dimension + '\'' +
>                 ", description='" + description + '\'' +
>                 ", icons=" + Arrays.toString(icons) +
>                 ", startTimestamp=" + startTimestamp +
>                 ", endTimestamp=" + endTimestamp +
>                 ", count=" + count +
>                 ", anomalyScore=" + anomalyScore +
>                 ", adHashCode=" + adHashCode +
>                 ", timeMap=" + timeMap +
>                 ", user='" + user + '\'' +
>                 ", logQuery=" + logQuery +
>                 ", group='" + group + '\'' +
>                 '}';
>     }
> }
> {code}
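An aside on the quoted Behavior class: its hashCode() mixes in mutable fields (count, anomalyScore, timeMap) that add() mutates in place. If the CEP shared buffer looks entries up in a hash-based map, mutating an event after it is stored could make its entry unfindable, which would be consistent with the "Could not find id for entry" precondition failure above. Whether that is the actual root cause here is an assumption; the following minimal sketch (plain Java, hypothetical class names, not Flink code) only illustrates the general hazard of hash-relevant mutation:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical demo: a map key whose hashCode() depends on a mutable
// field, mirroring how Behavior.hashCode() folds in count/timeMap
// while add() mutates them.
public class MutableKeyDemo {
    static final class Key {
        double count;
        Key(double count) { this.count = count; }
        @Override public int hashCode() { return Double.hashCode(count); }
        @Override public boolean equals(Object o) {
            return o instanceof Key && ((Key) o).count == count;
        }
    }

    public static void main(String[] args) {
        Map<Key, Integer> ids = new HashMap<>();
        Key k = new Key(1.0);
        ids.put(k, 42);   // stored in the bucket for hash(count=1.0)
        k.count = 2.0;    // mutation moves the key's hash to another bucket
        // Lookup now misses even though the very same object is in the map.
        if (ids.get(k) != null) {
            throw new AssertionError("expected lookup miss after mutation");
        }
    }
}
```

The usual remedies are to compute hashCode()/equals() only from immutable fields, or never to mutate an event once it has been handed to keyed state.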



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
