eagle-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ralp...@apache.org
Subject incubator-eagle git commit: [EAGLE-450]: Alert check meta confliction
Date Fri, 12 Aug 2016 06:45:13 GMT
Repository: incubator-eagle
Updated Branches:
  refs/heads/develop 9a7fedcc9 -> cc1261553


[EAGLE-450]: Alert check meta confliction

Author: Zeng, Bryant(mizeng@ebaysf.com)
Reviewer: ralphsu

This closes #330


Project: http://git-wip-us.apache.org/repos/asf/incubator-eagle/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-eagle/commit/cc126155
Tree: http://git-wip-us.apache.org/repos/asf/incubator-eagle/tree/cc126155
Diff: http://git-wip-us.apache.org/repos/asf/incubator-eagle/diff/cc126155

Branch: refs/heads/develop
Commit: cc12615536626dcbb71e47d37fc1f7df95dcdd20
Parents: 9a7fedc
Author: Ralph, Su <suliangfei@gmail.com>
Authored: Fri Aug 12 14:43:00 2016 +0800
Committer: Ralph, Su <suliangfei@gmail.com>
Committed: Fri Aug 12 14:43:00 2016 +0800

----------------------------------------------------------------------
 .../alert/engine/model/AlertStreamEvent.java    | 10 +--
 .../eagle/alert/engine/model/StreamEvent.java   | 32 +++++--
 .../alert/engine/model/StreamEventBuilder.java  | 13 ++-
 .../eagle/alert/metric/MetricSystemTest.java    | 51 ++++++++---
 .../StreamDefinitionNotFoundException.java      |  4 +
 .../alert/engine/runner/AbstractStreamBolt.java | 32 +++----
 .../eagle/alert/engine/runner/AlertBolt.java    | 95 ++++++++++++++++++--
 .../impl/StreamEventSerializer.java             | 24 +++--
 .../eagle/alert/engine/e2e/Integration1.java    |  4 +-
 .../alert/engine/router/TestAlertBolt.java      | 91 +++++++++++++++++--
 10 files changed, 292 insertions(+), 64 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/cc126155/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/model/AlertStreamEvent.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/model/AlertStreamEvent.java
b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/model/AlertStreamEvent.java
index f36d3cb..a73eccd 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/model/AlertStreamEvent.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/model/AlertStreamEvent.java
@@ -16,14 +16,14 @@
  */
 package org.apache.eagle.alert.engine.model;
 
-import java.util.ArrayList;
-import java.util.List;
-
 import org.apache.commons.lang3.StringUtils;
 import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;
 import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
 import org.apache.eagle.alert.utils.DateTimeUtil;
 
+import java.util.ArrayList;
+import java.util.List;
+
 /**
  * streamId stands for alert type instead of source event streamId
  */
@@ -59,8 +59,8 @@ public class AlertStreamEvent extends StreamEvent {
                 dataStrings.add(null);
             }
         }
-        return String.format("AlertStreamEvent[stream=%S,timestamp=%s,data=[%s], policy=%s,
createdBy=%s]",
-                this.getStreamId(), DateTimeUtil.millisecondsToHumanDateWithMilliseconds(this.getTimestamp()),
StringUtils.join(dataStrings,","),this.getPolicy().getName(),this.getCreatedBy());
+        return String.format("AlertStreamEvent[stream=%S,timestamp=%s,data=[%s], policy=%s,
createdBy=%s, metaVersion=%s]",
+                this.getStreamId(), DateTimeUtil.millisecondsToHumanDateWithMilliseconds(this.getTimestamp()),
StringUtils.join(dataStrings,","),this.getPolicy().getName(),this.getCreatedBy(),this.getMetaVersion());
     }
 
     public String getCreatedBy() {

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/cc126155/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/model/StreamEvent.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/model/StreamEvent.java
b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/model/StreamEvent.java
index 3e4e1df..5f59b1e 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/model/StreamEvent.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/model/StreamEvent.java
@@ -16,17 +16,17 @@
  */
 package org.apache.eagle.alert.engine.model;
 
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.builder.HashCodeBuilder;
+import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
+import org.apache.eagle.alert.utils.DateTimeUtil;
+
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Objects;
 
-import org.apache.commons.lang3.StringUtils;
-import org.apache.commons.lang3.builder.HashCodeBuilder;
-import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
-import org.apache.eagle.alert.utils.DateTimeUtil;
-
 /**
  * @since Apr 5, 2016
  *
@@ -37,6 +37,7 @@ public class StreamEvent implements Serializable {
     private String streamId;
     private Object[] data;
     private long timestamp;
+    private String metaVersion;
 
     public StreamEvent(){}
 
@@ -46,6 +47,13 @@ public class StreamEvent implements Serializable {
         this.setData(data);
     }
 
+    public StreamEvent(String streamId,long timestamp,Object[] data,String metaVersion){
+        this.setStreamId(streamId);
+        this.setTimestamp(timestamp);
+        this.setData(data);
+        this.setMetaVersion(metaVersion);
+    }
+
     public String getStreamId() {
         return streamId;
     }
@@ -70,9 +78,17 @@ public class StreamEvent implements Serializable {
         this.timestamp = timestamp;
     }
 
+    public String getMetaVersion() {
+        return metaVersion;
+    }
+
+    public void setMetaVersion(String metaVersion) {
+        this.metaVersion = metaVersion;
+    }
+
     @Override
     public int hashCode() {
-        return new HashCodeBuilder().append(streamId).append(timestamp).append(data).build();
+        return new HashCodeBuilder().append(streamId).append(timestamp).append(data).append(metaVersion).build();
     }
 
     @Override
@@ -97,7 +113,7 @@ public class StreamEvent implements Serializable {
                 }
             }
         }
-        return String.format("StreamEvent[stream=%S,timestamp=%s,data=[%s]]",this.getStreamId(),
DateTimeUtil.millisecondsToHumanDateWithMilliseconds(this.getTimestamp()), StringUtils.join(dataStrings,","));
+        return String.format("StreamEvent[stream=%S,timestamp=%s,data=[%s],metaVersion=%s]",this.getStreamId(),
DateTimeUtil.millisecondsToHumanDateWithMilliseconds(this.getTimestamp()), StringUtils.join(dataStrings,","),
this.getMetaVersion());
     }
 
     public static StreamEventBuilder Builder(){
@@ -112,6 +128,7 @@ public class StreamEvent implements Serializable {
         newEvent.setTimestamp(this.getTimestamp());
         newEvent.setData(this.getData());
         newEvent.setStreamId(this.getStreamId());
+        newEvent.setMetaVersion(this.getMetaVersion());
         return newEvent;
     }
 
@@ -119,6 +136,7 @@ public class StreamEvent implements Serializable {
         this.setTimestamp(event.getTimestamp());
         this.setData(event.getData());
         this.setStreamId(event.getStreamId());
+        this.setMetaVersion(event.getMetaVersion());
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/cc126155/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/model/StreamEventBuilder.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/model/StreamEventBuilder.java
b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/model/StreamEventBuilder.java
index 136fd8b..1036ba2 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/model/StreamEventBuilder.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/model/StreamEventBuilder.java
@@ -16,15 +16,15 @@
  */
 package org.apache.eagle.alert.engine.model;
 
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-
 import org.apache.eagle.alert.engine.coordinator.StreamColumn;
 import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
 public class StreamEventBuilder{
     private final static Logger LOG = LoggerFactory.getLogger(StreamEventBuilder.class);
 
@@ -74,6 +74,11 @@ public class StreamEventBuilder{
         return this;
     }
 
+    public StreamEventBuilder metaVersion(String metaVersion){
+        instance.setMetaVersion(metaVersion);
+        return this;
+    }
+
     public StreamEvent build(){
         if(instance.getStreamId() == null){
             throw new IllegalArgumentException("streamId is null of event: " + instance);

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/cc126155/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/metric/MetricSystemTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/metric/MetricSystemTest.java
b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/metric/MetricSystemTest.java
index e96ded9..d1fb5f6 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/metric/MetricSystemTest.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/metric/MetricSystemTest.java
@@ -1,18 +1,17 @@
 package org.apache.eagle.alert.metric;
 
-import java.util.HashMap;
-import java.util.Map;
-
+import com.codahale.metrics.Gauge;
+import com.codahale.metrics.JvmAttributeGaugeSet;
+import com.codahale.metrics.MetricRegistry;
+import com.typesafe.config.ConfigFactory;
 import org.apache.eagle.alert.metric.sink.KafkaSink;
 import org.apache.eagle.alert.metric.source.JVMMetricSource;
 import org.apache.eagle.alert.metric.source.MetricSource;
 import org.junit.Ignore;
 import org.junit.Test;
 
-import com.codahale.metrics.Gauge;
-import com.codahale.metrics.JvmAttributeGaugeSet;
-import com.codahale.metrics.MetricRegistry;
-import com.typesafe.config.ConfigFactory;
+import java.util.HashMap;
+import java.util.Map;
 
 /**
  * Licensed to the Apache Software Foundation (ASF) under one or more
@@ -51,6 +50,34 @@ public class MetricSystemTest {
         system.stop();
     }
 
+    @Test @Ignore
+    public void testMetaConflict(){
+        MetricSystem system = MetricSystem.load(ConfigFactory.load());
+        system.register(new MetaConflictMetricSource());
+        system.start();
+        system.report();
+        system.stop();
+    }
+
+    private class MetaConflictMetricSource implements MetricSource {
+        private MetricRegistry registry = new MetricRegistry();
+
+        public MetaConflictMetricSource(){
+            registry.register("meta.conflict", (Gauge<String>) () -> "meta conflict
happening!");
+        }
+
+        @Override
+        public String name() {
+            return "metaConflict";
+        }
+
+        @Override
+        public MetricRegistry registry() {
+            return registry;
+        }
+    }
+
+
     private class SampleMetricSource implements MetricSource {
         private MetricRegistry registry = new MetricRegistry();
 
@@ -58,11 +85,11 @@ public class MetricSystemTest {
             registry.register("sample.long", (Gauge<Long>) System::currentTimeMillis);
             registry.register("sample.map", (Gauge<Map<String, Object>>) () ->
new HashMap<String, Object>(){
                 private static final long serialVersionUID = 3948508906655117683L;
-            {
-                put("int",1234);
-                put("str","text");
-                put("bool",true);
-            }});
+                {
+                    put("int",1234);
+                    put("str","text");
+                    put("bool",true);
+                }});
         }
 
         @Override

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/cc126155/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamDefinitionNotFoundException.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamDefinitionNotFoundException.java
b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamDefinitionNotFoundException.java
index 8b8fef3..5814ac6 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamDefinitionNotFoundException.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamDefinitionNotFoundException.java
@@ -31,4 +31,8 @@ public class StreamDefinitionNotFoundException extends IOException {
     public StreamDefinitionNotFoundException(String streamName, String specVersion) {
         super(String.format("Stream '%s' not found! Current spec version '%s'. Possibly metadata
not loaded or metadata mismatch between upstream and alert bolts yet!", streamName, specVersion));
     }
+
+    public StreamDefinitionNotFoundException(String streamName, String streamMetaVersion,
String specVersion) {
+        super(String.format("Stream '%s' has meta version '%s' which is different from current
spec version '%s'.", streamName, streamMetaVersion, specVersion));
+    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/cc126155/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AbstractStreamBolt.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AbstractStreamBolt.java
b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AbstractStreamBolt.java
index fe896d1..5cb38fc 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AbstractStreamBolt.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AbstractStreamBolt.java
@@ -16,14 +16,14 @@
  */
 package org.apache.eagle.alert.engine.runner;
 
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import backtype.storm.metric.api.MultiCountMetric;
+import backtype.storm.task.OutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.topology.base.BaseRichBolt;
+import backtype.storm.tuple.Fields;
+import com.google.common.base.Preconditions;
+import com.typesafe.config.Config;
 import org.apache.eagle.alert.engine.StreamContext;
-import org.apache.eagle.alert.engine.StreamContextImpl;
 import org.apache.eagle.alert.engine.coordinator.IMetadataChangeNotifyService;
 import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
 import org.apache.eagle.alert.engine.coordinator.StreamDefinitionNotFoundException;
@@ -35,19 +35,20 @@ import org.apache.eagle.alert.utils.AlertConstants;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import backtype.storm.task.OutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.topology.base.BaseRichBolt;
-import backtype.storm.tuple.Fields;
-
-import com.google.common.base.Preconditions;
-import com.typesafe.config.Config;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 
 @SuppressWarnings({"rawtypes", "serial"})
 public abstract class AbstractStreamBolt extends BaseRichBolt implements SerializationMetadataProvider
{
     private static final Logger LOG = LoggerFactory.getLogger(AbstractStreamBolt.class);
     private IMetadataChangeNotifyService changeNotifyService;
+
+    public Config getConfig() {
+        return config;
+    }
+
     private Config config;
     private List<String> outputStreamIds;
     protected OutputCollector collector;
@@ -57,6 +58,7 @@ public abstract class AbstractStreamBolt extends BaseRichBolt implements
Seriali
     protected PartitionedEventSerializer serializer;
     protected volatile Map<String, StreamDefinition> sdf  = new HashMap<String,
StreamDefinition>();
     protected volatile String specVersion = "Not Initialized";
+    protected volatile boolean specVersionOutofdate = false;
     protected StreamContext streamContext;
 
     public AbstractStreamBolt(String boltId, IMetadataChangeNotifyService changeNotifyService,
Config config) {

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/cc126155/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AlertBolt.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AlertBolt.java
b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AlertBolt.java
index 683e571..9cfb59a 100755
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AlertBolt.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AlertBolt.java
@@ -22,34 +22,41 @@ import backtype.storm.task.TopologyContext;
 import backtype.storm.topology.OutputFieldsDeclarer;
 import backtype.storm.tuple.Fields;
 import backtype.storm.tuple.Tuple;
+import com.codahale.metrics.Gauge;
+import com.codahale.metrics.MetricRegistry;
 import com.typesafe.config.Config;
 import org.apache.eagle.alert.coordination.model.AlertBoltSpec;
 import org.apache.eagle.alert.coordination.model.WorkSlot;
 import org.apache.eagle.alert.engine.AlertStreamCollector;
-import org.apache.eagle.alert.engine.StreamContext;
 import org.apache.eagle.alert.engine.StreamContextImpl;
-import org.apache.eagle.alert.engine.coordinator.*;
+import org.apache.eagle.alert.engine.coordinator.IMetadataChangeNotifyService;
+import org.apache.eagle.alert.engine.coordinator.MetadataType;
+import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;
+import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
 import org.apache.eagle.alert.engine.evaluator.PolicyGroupEvaluator;
 import org.apache.eagle.alert.engine.evaluator.impl.AlertBoltOutputCollectorWrapper;
 import org.apache.eagle.alert.engine.evaluator.impl.PolicyGroupEvaluatorImpl;
 import org.apache.eagle.alert.engine.model.PartitionedEvent;
 import org.apache.eagle.alert.engine.router.AlertBoltSpecListener;
-import org.apache.eagle.alert.engine.serialization.PartitionedEventSerializer;
 import org.apache.eagle.alert.engine.serialization.SerializationMetadataProvider;
-import org.apache.eagle.alert.engine.serialization.Serializers;
+import org.apache.eagle.alert.metric.MetricSystem;
+import org.apache.eagle.alert.metric.source.MetricSource;
+import org.apache.eagle.alert.service.IMetadataServiceClient;
+import org.apache.eagle.alert.service.MetadataServiceClientImpl;
 import org.apache.eagle.alert.utils.AlertConstants;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.IOException;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 
 /**
  * Since 5/1/16.
  * This is container for hosting all policies belonging to the same monitoredStream
- * MonitoredStream refers to tuple of {dataSource, streamId, grouopby}
+ * MonitoredStream refers to tuple of {dataSource, streamId, groupby}
  * The container is also called {@link WorkSlot}
  */
 public class AlertBolt extends AbstractStreamBolt implements AlertBoltSpecListener,SerializationMetadataProvider
{
@@ -62,6 +69,7 @@ public class AlertBolt extends AbstractStreamBolt implements AlertBoltSpecListen
     // mapping from policy name to PolicyDefinition
     private volatile Map<String, PolicyDefinition> cachedPolicies = new HashMap<>();
// for one streamGroup, there are multiple policies
 
+    private AlertBoltSpec spec;
 
     public AlertBolt(String boltId, Config config, IMetadataChangeNotifyService changeNotifyService){
         super(boltId, changeNotifyService, config);
@@ -70,11 +78,81 @@ public class AlertBolt extends AbstractStreamBolt implements AlertBoltSpecListen
         // TODO next stage evaluator
     }
 
+    private class MetaConflictMetricSource implements MetricSource {
+        private MetricRegistry registry = new MetricRegistry();
+
+        public MetaConflictMetricSource(){
+            registry.register("meta.conflict", (Gauge<String>) () -> "meta conflict
happening!");
+        }
+
+        public MetaConflictMetricSource(String message){
+            registry.register("meta.conflict", (Gauge<String>) () -> message);
+        }
+
+        @Override
+        public String name() {
+            return "metaConflict";
+        }
+
+        @Override
+        public MetricRegistry registry() {
+            return registry;
+        }
+    }
+
+    private ExecutorService executors = Executors.newFixedThreadPool(5);
+
     @Override
     public void execute(Tuple input) {
         this.streamContext.counter().scope("execute_count").incr();
         try {
-            policyGroupEvaluator.nextEvent(deserialize(input.getValueByField(AlertConstants.FIELD_0)).withAnchor(input));
+            PartitionedEvent pe = deserialize(input.getValueByField(AlertConstants.FIELD_0));
+            String stream_event_version = pe.getEvent().getMetaVersion();
+            if (stream_event_version != null && !stream_event_version.equals(specVersion))
{
+                if (stream_event_version == null){
+                    // if stream event version is null, need to initialize it
+                    pe.getEvent().setMetaVersion(specVersion);
+                }
+                // check if specVersion is older than stream_event_version
+                else if (specVersion != null && stream_event_version != null &&
specVersion.contains("spec_version_") && specVersion.contains("spec_version_")){
+                    Long timestamp_of_specVersion = Long.valueOf(specVersion.split("spec_version_")[1]);
+                    Long timestamp_of_streamEventVersion = Long.valueOf(stream_event_version.split("spec_version_")[1]);
+                    specVersionOutofdate = timestamp_of_specVersion < timestamp_of_streamEventVersion;
+                    if (!specVersionOutofdate){
+                        pe.getEvent().setMetaVersion(specVersion);
+                    }
+                }
+
+                String message = String.format("Spec Version [%s] of AlertBolt is %s Stream
Event Version [%s]!", specVersion, specVersionOutofdate ? "older than":"newer than", stream_event_version);
+                LOG.warn(message);
+
+                // send out metrics for meta conflict
+                MetricSystem system = MetricSystem.load(this.getConfig());
+                system.register(new MetaConflictMetricSource(message));
+                system.start();
+                system.report();
+                system.stop();
+
+                executors.submit(() -> {
+                    // if spec version is out-of-date, need to refresh it
+                    if (specVersionOutofdate){
+                        try{
+                            IMetadataServiceClient client = new MetadataServiceClientImpl(this.getConfig());
+                            String topologyId = spec.getTopologyName();
+                            AlertBoltSpec latestSpec = client.getVersionedSpec().getAlertSpecs().get(topologyId);
+                            if (latestSpec != null){
+                                spec = latestSpec;
+                            }
+                        } catch (Exception e){
+                            LOG.error(e.toString());
+                        }
+
+                    }
+                });
+
+            }
+
+            policyGroupEvaluator.nextEvent(pe.withAnchor(input));
             synchronized (outputLock) {
                 this.collector.ack(input);
             }
@@ -133,6 +211,9 @@ public class AlertBolt extends AbstractStreamBolt implements AlertBoltSpecListen
         cachedPolicies = newPoliciesMap;
         sdf = sds;
         specVersion = spec.getVersion();
+        this.spec = spec;
+
+
     }
 
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/cc126155/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/StreamEventSerializer.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/StreamEventSerializer.java
b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/StreamEventSerializer.java
index 0fb686b..e13b23f 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/StreamEventSerializer.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/StreamEventSerializer.java
@@ -16,11 +16,6 @@
  */
 package org.apache.eagle.alert.engine.serialization.impl;
 
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.util.BitSet;
-
 import org.apache.eagle.alert.engine.coordinator.StreamColumn;
 import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
 import org.apache.eagle.alert.engine.model.StreamEvent;
@@ -28,6 +23,11 @@ import org.apache.eagle.alert.engine.serialization.SerializationMetadataProvider
 import org.apache.eagle.alert.engine.serialization.Serializer;
 import org.apache.eagle.alert.engine.serialization.Serializers;
 
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.BitSet;
+
 /**
  * @see StreamEvent
  */
@@ -55,7 +55,12 @@ public class StreamEventSerializer implements Serializer<StreamEvent>
{
 
     @Override
     public void serialize(StreamEvent event, DataOutput dataOutput) throws IOException {
-        dataOutput.writeUTF(event.getStreamId());
+        // Bryant: here "metaVersion/streamId" writes to dataOutputUTF
+        String metaVersion = event.getMetaVersion();
+        String streamId = event.getStreamId();
+        String metaVersion_streamId = String.format("%s/%s", metaVersion, streamId);
+
+        dataOutput.writeUTF(metaVersion_streamId);
         dataOutput.writeLong(event.getTimestamp());
         if(event.getData() == null || event.getData().length == 0){
             dataOutput.writeInt(0);
@@ -82,7 +87,12 @@ public class StreamEventSerializer implements Serializer<StreamEvent>
{
     @Override
     public StreamEvent deserialize(DataInput dataInput) throws IOException {
         StreamEvent event = new StreamEvent();
-        event.setStreamId(dataInput.readUTF());
+        String metaVersion_streamId = dataInput.readUTF();
+        String streamId = metaVersion_streamId.split("/")[1];
+        String metaVersion = metaVersion_streamId.split("/")[0];
+        event.setStreamId(streamId);
+        event.setMetaVersion(metaVersion);
+
         StreamDefinition definition = serializationMetadataProvider.getStreamDefinition(event.getStreamId());
         event.setTimestamp(dataInput.readLong());
         int isNullBytesLen = dataInput.readInt();

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/cc126155/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/e2e/Integration1.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/e2e/Integration1.java
b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/e2e/Integration1.java
index a00d75f..d05b7ee 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/e2e/Integration1.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/e2e/Integration1.java
@@ -70,7 +70,7 @@ public class Integration1 {
         
         @Override
         public Thread newThread(Runnable r) {
-            Thread t = new Thread();
+            Thread t = new Thread(r);
             t.setDaemon(true);
             return t;
         }
@@ -118,7 +118,7 @@ public class Integration1 {
         System.out.println("loading metadatas done!");
 
         if (args == null) {
-            args = new String[] { "-f", "simple/application-integration.conf" };
+            args = new String[] { "-c", "simple/application-integration.conf" };
         }
 
         executors.submit(() -> SampleClient1.main(args));

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/cc126155/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/router/TestAlertBolt.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/router/TestAlertBolt.java
b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/router/TestAlertBolt.java
index 7dea167..9e82cc7 100755
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/router/TestAlertBolt.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/router/TestAlertBolt.java
@@ -140,6 +140,7 @@ public class TestAlertBolt {
         // construct event with "value1"
         StreamEvent event1 = new StreamEvent();
         event1.setTimestamp(DateTimeUtil.humanDateToSeconds("2016-01-01 00:00:00")*1000);
+        event1.setMetaVersion("version1");
         Object[] data = new Object[]{"value1"};
         event1.setData(data);
         event1.setStreamId(streamId);
@@ -148,6 +149,7 @@ public class TestAlertBolt {
         // construct another event with "value1"
         StreamEvent event2 = new StreamEvent();
         event2.setTimestamp(DateTimeUtil.humanDateToSeconds("2016-01-01 00:00:00")*1000);
+        event2.setMetaVersion("version1");
         data = new Object[]{"value2"};
         event2.setData(data);
         event2.setStreamId(streamId);
@@ -202,7 +204,7 @@ public class TestAlertBolt {
         int taskId = 1;
         when(context.getComponentId(taskId)).thenReturn("comp1");
         when(context.getComponentOutputFields("comp1", "default")).thenReturn(new Fields("f0"));
-        // case 1: bolt prepared but metadata not initialized
+        // case 1: bolt prepared but metadata not initialized (no bolt.onAlertBoltSpecChange)
         PartitionedEvent pe = new PartitionedEvent();
         pe.setPartitionKey(1);
         pe.setPartition(createPartition());
@@ -220,7 +222,7 @@ public class TestAlertBolt {
         failedCount.set(0);
 
         {
-            // case 2: metadata loaded but empty
+            // case 2: metadata loaded but empty (AlertBoltSepc)
             bolt.onAlertBoltSpecChange(new AlertBoltSpec(), new HashMap());
 
             bolt.execute(input);
@@ -232,14 +234,17 @@ public class TestAlertBolt {
         {
             Map<String, StreamDefinition> sds = new HashMap();
             StreamDefinition sdTest = new StreamDefinition();
-            String streamId = "pd-test";
+            String streamId = "pd-test"; // here streamId is different from the one "test-stream"
(StreamEvent)
             sdTest.setStreamId(streamId);
             sds.put(sdTest.getStreamId(), sdTest);
+
             AlertBoltSpec boltSpecs = new AlertBoltSpec();
-            boltSpecs.setVersion("specVersion-"+System.currentTimeMillis());
+            boltSpecs.setVersion("specVersion-" + System.currentTimeMillis());
+
             PolicyDefinition def = new PolicyDefinition();
             def.setName("policy-definition");
             def.setInputStreams(Arrays.asList(streamId));
+
             PolicyDefinition.Definition definition = new PolicyDefinition.Definition();
             definition.setType(PolicyStreamHandlers.NO_DATA_ALERT_ENGINE);
             definition.setValue("PT0M,plain,1,host,host1");
@@ -251,9 +256,85 @@ public class TestAlertBolt {
 
             bolt.execute(input);
             Assert.assertEquals(1, failedCount.get());
+            failedCount.set(0);
         }
     }
 
+    @Test
+    public void testMetaversionConflict() throws Exception {
+        AtomicInteger failedCount = new AtomicInteger();
+        OutputCollector collector = new OutputCollector(new IOutputCollector(){
+            int count = 0;
+            @Override
+            public List<Integer> emit(String streamId, Collection<Tuple> anchors,
List<Object> tuple) {
+                Assert.assertEquals("testAlertStream", tuple.get(0));
+                AlertStreamEvent event = (AlertStreamEvent) tuple.get(1);
+                System.out.println(String.format("collector received: [streamId=[%s], tuple=[%s]
", streamId, tuple));
+                return null;
+            }
+            @Override
+            public void emitDirect(int taskId, String streamId, Collection<Tuple> anchors,
List<Object> tuple) {            }
+            @Override
+            public void ack(Tuple input) {            }
+            @Override
+            public void fail(Tuple input) {      failedCount.incrementAndGet();      }
+            @Override
+            public void reportError(Throwable error) {            }
+        });
+        AlertBolt bolt = createAlertBolt(collector);
+
+        GeneralTopologyContext context = mock(GeneralTopologyContext.class);
+        int taskId = 1;
+        when(context.getComponentId(taskId)).thenReturn("comp1");
+        when(context.getComponentOutputFields("comp1", "default")).thenReturn(new Fields("f0"));
+        // case 1: bolt prepared but metadata not initialized (no bolt.onAlertBoltSpecChange)
+        PartitionedEvent pe = new PartitionedEvent();
+        pe.setPartitionKey(1);
+        pe.setPartition(createPartition());
+        StreamEvent streamEvent = new StreamEvent();
+        streamEvent.setStreamId("test-stream");
+        streamEvent.setTimestamp(System.currentTimeMillis());
+        streamEvent.setMetaVersion("spec_version_"+System.currentTimeMillis());
+        pe.setEvent(streamEvent);
+
+        PartitionedEventSerializerImpl peSer = new PartitionedEventSerializerImpl(bolt);
+        byte[] serializedEvent = peSer.serialize(pe);
+        Tuple input = new TupleImpl(context, Collections.singletonList(serializedEvent),
taskId, "default");
+
+
+        Map<String, StreamDefinition> sds = new HashMap();
+        StreamDefinition sdTest = new StreamDefinition();
+        String streamId = "test-stream";
+        sdTest.setStreamId(streamId);
+        sds.put(sdTest.getStreamId(), sdTest);
+
+        AlertBoltSpec boltSpecs = new AlertBoltSpec();
+        boltSpecs.setVersion("spec_version_" + System.currentTimeMillis());
+        boltSpecs.setTopologyName("alertUnitTopology_1");
+
+        PolicyDefinition def = new PolicyDefinition();
+        def.setName("policy-definition");
+        def.setInputStreams(Arrays.asList(streamId));
+
+        PolicyDefinition.Definition definition = new PolicyDefinition.Definition();
+        definition.setType(PolicyStreamHandlers.NO_DATA_ALERT_ENGINE);
+        definition.setValue("PT0M,plain,1,host,host1");
+        def.setDefinition(definition);
+
+        boltSpecs.getBoltPoliciesMap().put(bolt.getBoltId(), Arrays.asList(def));
+        bolt = createAlertBolt(collector);
+        bolt.onAlertBoltSpecChange(boltSpecs, sds);
+
+        bolt.execute(input);
+
+        // Sleep 10s to wait thread in bolt.execute() to finish works
+        Thread.sleep(10000);
+
+        Assert.assertEquals(0, failedCount.get());
+        failedCount.set(0);
+
+    }
+
     @NotNull
     private StreamPartition createPartition() {
         StreamPartition sp = new StreamPartition();
@@ -261,4 +342,4 @@ public class TestAlertBolt {
         return sp;
     }
 
-}
+}
\ No newline at end of file



Mime
View raw message