eagle-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ralp...@apache.org
Subject incubator-eagle git commit: EAGLE-394 : fix NPE and broken test
Date Tue, 26 Jul 2016 00:37:38 GMT
Repository: incubator-eagle
Updated Branches:
  refs/heads/develop fabfc25b9 -> f1a93b1bb


EAGLE-394 : fix NPE and broken test

1. avoid mismatch metadata caused serialize/deserialize NPE issue
2. fix broken test in remote branch after merge

Author: ralphsu
Reviewer: ralphsu

Closes #273


Project: http://git-wip-us.apache.org/repos/asf/incubator-eagle/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-eagle/commit/f1a93b1b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-eagle/tree/f1a93b1b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-eagle/diff/f1a93b1b

Branch: refs/heads/develop
Commit: f1a93b1bbc29aadbda5f09ddf2622252e8893b0b
Parents: fabfc25
Author: Ralph, Su <suliangfei@gmail.com>
Authored: Tue Jul 26 08:32:04 2016 +0800
Committer: Ralph, Su <suliangfei@gmail.com>
Committed: Tue Jul 26 08:32:04 2016 +0800

----------------------------------------------------------------------
 .../src/test/resources/streamDef.json           |  14 +-
 .../StreamDefinitionNotFoundException.java      |  15 +-
 .../engine/evaluator/PolicyStreamHandlers.java  |   6 +-
 .../eagle/alert/engine/runner/AlertBolt.java    |  53 ++++---
 .../SerializationMetadataProvider.java          |   8 +-
 .../alert/engine/router/TestAlertBolt.java      | 153 +++++++++++++++----
 .../eagle/app/TestApplicationTestSuite.java     |   1 +
 7 files changed, 179 insertions(+), 71 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/f1a93b1b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/resources/streamDef.json
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/resources/streamDef.json
b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/resources/streamDef.json
index 8c7c33a..205905b 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/resources/streamDef.json
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/resources/streamDef.json
@@ -17,14 +17,14 @@
 			"defaultValue": 0,
 			"required":true
 		},{
-			"name": "metric",
-			"type" : "string",
-			"defaultValue": "perfmon_cpu",
+			"name": "floatField",
+			"type" : "float",
+			"defaultValue": "1.2",
 			"required": true
 		},{
-			"name": "pool",
-			"type" : "string",
-			"defaultValue": "raptor_general",
+			"name": "intField",
+			"type" : "int",
+			"defaultValue": "3",
 			"required":true
 		},{
 			"name": "value",
@@ -33,7 +33,7 @@
 			"required":true
 		},
 		{
-			"name": "colo",
+			"name": "boolField",
 			"type" : "bool",
 			"defaultValue": true,
 			"required":true

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/f1a93b1b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamDefinitionNotFoundException.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamDefinitionNotFoundException.java
b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamDefinitionNotFoundException.java
index 927eb8c..8b8fef3 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamDefinitionNotFoundException.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamDefinitionNotFoundException.java
@@ -1,5 +1,7 @@
 package org.apache.eagle.alert.engine.coordinator;
 
+import java.io.IOException;
+
 /**
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
@@ -16,10 +18,17 @@ package org.apache.eagle.alert.engine.coordinator;
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-public class StreamDefinitionNotFoundException extends Exception {
+public class StreamDefinitionNotFoundException extends IOException {
     private static final long serialVersionUID = 6027811718016485808L;
 
-    public StreamDefinitionNotFoundException(String streamId){
-        super("Stream definition not found: "+streamId);
+    public StreamDefinitionNotFoundException() {
+    }
+
+    public StreamDefinitionNotFoundException(String streamId) {
+        super("Stream definition not found: " + streamId);
+    }
+
+    public StreamDefinitionNotFoundException(String streamName, String specVersion) {
+        super(String.format("Stream '%s' not found! Current spec version '%s'. Possibly metadata
not loaded or metadata mismatch between upstream and alert bolts yet!", streamName, specVersion));
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/f1a93b1b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/PolicyStreamHandlers.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/PolicyStreamHandlers.java
b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/PolicyStreamHandlers.java
index 638b240..fec8ff6 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/PolicyStreamHandlers.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/PolicyStreamHandlers.java
@@ -16,12 +16,12 @@
  */
 package org.apache.eagle.alert.engine.evaluator;
 
-import java.util.Map;
-
 import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
 import org.apache.eagle.alert.engine.evaluator.absence.AbsencePolicyHandler;
-import org.apache.eagle.alert.engine.evaluator.nodata.NoDataPolicyHandler;
 import org.apache.eagle.alert.engine.evaluator.impl.SiddhiPolicyHandler;
+import org.apache.eagle.alert.engine.evaluator.nodata.NoDataPolicyHandler;
+
+import java.util.Map;
 
 public class PolicyStreamHandlers {
     public static final String SIDDHI_ENGINE ="siddhi";

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/f1a93b1b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AlertBolt.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AlertBolt.java
b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AlertBolt.java
index 140fbff..5daa236 100755
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AlertBolt.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AlertBolt.java
@@ -16,20 +16,19 @@
  */
 package org.apache.eagle.alert.engine.runner;
 
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
+import backtype.storm.metric.api.MultiCountMetric;
+import backtype.storm.task.OutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.Tuple;
+import com.typesafe.config.Config;
 import org.apache.eagle.alert.coordination.model.AlertBoltSpec;
 import org.apache.eagle.alert.coordination.model.WorkSlot;
 import org.apache.eagle.alert.engine.AlertStreamCollector;
 import org.apache.eagle.alert.engine.StreamContext;
 import org.apache.eagle.alert.engine.StreamContextImpl;
-import org.apache.eagle.alert.engine.coordinator.IMetadataChangeNotifyService;
-import org.apache.eagle.alert.engine.coordinator.MetadataType;
-import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;
-import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
+import org.apache.eagle.alert.engine.coordinator.*;
 import org.apache.eagle.alert.engine.evaluator.PolicyGroupEvaluator;
 import org.apache.eagle.alert.engine.evaluator.impl.AlertBoltOutputCollectorWrapper;
 import org.apache.eagle.alert.engine.model.PartitionedEvent;
@@ -41,14 +40,10 @@ import org.apache.eagle.alert.utils.AlertConstants;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import backtype.storm.metric.api.MultiCountMetric;
-import backtype.storm.task.OutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.tuple.Fields;
-import backtype.storm.tuple.Tuple;
-
-import com.typesafe.config.Config;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 
 /**
  * Since 5/1/16.
@@ -69,6 +64,7 @@ public class AlertBolt extends AbstractStreamBolt implements AlertBoltSpecListen
     private StreamContext streamContext;
     private volatile Map<String, StreamDefinition> sdf  = new HashMap<String, StreamDefinition>();
     private PartitionedEventSerializer serializer;
+    private volatile String specVersion = "Not Initialized";
 
     public AlertBolt(String boltId, PolicyGroupEvaluator policyGroupEvaluator, Config config,
IMetadataChangeNotifyService changeNotifyService){
         super(changeNotifyService, config);
@@ -80,8 +76,10 @@ public class AlertBolt extends AbstractStreamBolt implements AlertBoltSpecListen
         // byte[] in higher priority
         if(object instanceof byte[]) {
             return serializer.deserialize((byte[]) object);
-        } else {
+        } else if (object instanceof PartitionedEvent){
             return (PartitionedEvent) object;
+        } else {
+            throw new IllegalStateException(String.format("Unsupported event class '%s',
expect byte array or PartitionedEvent!", object == null ? null : object.getClass().getCanonicalName()));
         }
     }
 
@@ -94,8 +92,8 @@ public class AlertBolt extends AbstractStreamBolt implements AlertBoltSpecListen
                 this.collector.ack(input);
             }
             this.streamContext.counter().scope("ack_count").incr();
-        }catch (Exception ex) {
-            LOG.error(ex.getMessage(),ex);
+        } catch (Exception ex) {
+            LOG.error(ex.getMessage(), ex);
             synchronized (outputLock) {
                 this.streamContext.counter().scope("fail_count").incr();
                 this.collector.fail(input);
@@ -131,7 +129,7 @@ public class AlertBolt extends AbstractStreamBolt implements AlertBoltSpecListen
     }
 
     @Override
-    public void onAlertBoltSpecChange(AlertBoltSpec spec, Map<String, StreamDefinition>
sds) {
+    public synchronized void onAlertBoltSpecChange(AlertBoltSpec spec, Map<String, StreamDefinition>
sds) {
         List<PolicyDefinition> newPolicies = spec.getBoltPoliciesMap().get(boltId);
         if(newPolicies == null) {
             LOG.info("no new policy with AlertBoltSpec {} for this bolt {}", spec, boltId);
@@ -148,10 +146,19 @@ public class AlertBolt extends AbstractStreamBolt implements AlertBoltSpecListen
         // switch
         cachedPolicies = newPoliciesMap;
         sdf = sds;
+        specVersion = spec.getVersion();
     }
 
     @Override
-    public StreamDefinition getStreamDefinition(String streamId) {
-        return sdf.get(streamId);
+    public StreamDefinition getStreamDefinition(String streamId) throws StreamDefinitionNotFoundException
{
+        if (sdf.containsKey(streamId)) {
+            return sdf.get(streamId);
+        } else {
+            throw new StreamDefinitionNotFoundException(streamId, specVersion);
+        }
+    }
+
+    public String getBoltId() {
+        return boltId;
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/f1a93b1b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/SerializationMetadataProvider.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/SerializationMetadataProvider.java
b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/SerializationMetadataProvider.java
index 71b274d..69bb695 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/SerializationMetadataProvider.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/SerializationMetadataProvider.java
@@ -17,6 +17,9 @@
 package org.apache.eagle.alert.engine.serialization;
 
 import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
+import org.apache.eagle.alert.engine.coordinator.StreamDefinitionNotFoundException;
+
+import java.io.IOException;
 
 /**
  * Integration interface to provide stream definition for serializer
@@ -26,5 +29,6 @@ public interface SerializationMetadataProvider {
      * @param streamId
      * @return StreamDefinition or null if not exist
      */
-    StreamDefinition getStreamDefinition(String streamId);
-}
\ No newline at end of file
+    StreamDefinition getStreamDefinition(String streamId) throws StreamDefinitionNotFoundException;
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/f1a93b1b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/router/TestAlertBolt.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/router/TestAlertBolt.java
b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/router/TestAlertBolt.java
index e322099..3bd7b9b 100755
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/router/TestAlertBolt.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/router/TestAlertBolt.java
@@ -16,21 +16,16 @@
  */
 package org.apache.eagle.alert.engine.router;
 
-import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.Semaphore;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-
+import backtype.storm.metric.api.MultiCountMetric;
+import backtype.storm.task.GeneralTopologyContext;
+import backtype.storm.task.IOutputCollector;
+import backtype.storm.task.OutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.Tuple;
+import backtype.storm.tuple.TupleImpl;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
 import org.apache.eagle.alert.coordination.model.AlertBoltSpec;
 import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;
 import org.apache.eagle.alert.engine.coordinator.StreamColumn;
@@ -44,21 +39,20 @@ import org.apache.eagle.alert.engine.model.PartitionedEvent;
 import org.apache.eagle.alert.engine.model.StreamEvent;
 import org.apache.eagle.alert.engine.runner.AlertBolt;
 import org.apache.eagle.alert.engine.runner.TestStreamRouterBolt;
+import org.apache.eagle.alert.engine.serialization.impl.PartitionedEventSerializerImpl;
 import org.apache.eagle.alert.utils.DateTimeUtil;
+import org.jetbrains.annotations.NotNull;
 import org.junit.Assert;
 import org.junit.Test;
 
-import backtype.storm.metric.api.MultiCountMetric;
-import backtype.storm.task.GeneralTopologyContext;
-import backtype.storm.task.IOutputCollector;
-import backtype.storm.task.OutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.tuple.Fields;
-import backtype.storm.tuple.Tuple;
-import backtype.storm.tuple.TupleImpl;
+import java.util.*;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigFactory;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 
 /**
  * Since 5/2/16.
@@ -82,10 +76,6 @@ public class TestAlertBolt {
     public void testAlertBolt() throws Exception{
         final AtomicInteger alertCount = new AtomicInteger();
         final Semaphore mutex = new Semaphore(0);
-        Config config = ConfigFactory.load();
-        PolicyGroupEvaluator policyGroupEvaluator = new PolicyGroupEvaluatorImpl("testPolicyGroupEvaluatorImpl");
-        TestStreamRouterBolt.MockChangeService mockChangeService = new TestStreamRouterBolt.MockChangeService();
-        AlertBolt bolt = new AlertBolt("alertBolt1", policyGroupEvaluator, config, mockChangeService);
         OutputCollector collector = new OutputCollector(new IOutputCollector(){
             int count = 0;
             @Override
@@ -106,10 +96,7 @@ public class TestAlertBolt {
             @Override
             public void reportError(Throwable error) {            }
         });
-        Map stormConf = new HashMap<>();
-        TopologyContext topologyContext = mock(TopologyContext.class);
-        when(topologyContext.registerMetric(any(String.class), any(MultiCountMetric.class),
any(int.class))).thenReturn(new MultiCountMetric());
-        bolt.prepare(stormConf, topologyContext, collector);
+        AlertBolt bolt = createAlertBolt(collector);
 
         String streamId = "cpuUsageStream";
 
@@ -174,4 +161,104 @@ public class TestAlertBolt {
         Assert.assertEquals(2, alertCount.get());
         bolt.cleanup();
     }
+
+    @NotNull
+    private AlertBolt createAlertBolt(OutputCollector collector) {
+        Config config = ConfigFactory.load();
+        PolicyGroupEvaluator policyGroupEvaluator = new PolicyGroupEvaluatorImpl("testPolicyGroupEvaluatorImpl");
+        TestStreamRouterBolt.MockChangeService mockChangeService = new TestStreamRouterBolt.MockChangeService();
+        AlertBolt bolt = new AlertBolt("alertBolt1", policyGroupEvaluator, config, mockChangeService);
+        Map stormConf = new HashMap<>();
+        TopologyContext topologyContext = mock(TopologyContext.class);
+        when(topologyContext.registerMetric(any(String.class), any(MultiCountMetric.class),
any(int.class))).thenReturn(new MultiCountMetric());
+        bolt.prepare(stormConf, topologyContext, collector);
+        return bolt;
+    }
+
+    @Test
+    public void testMetadataMismatch() throws Exception {
+        AtomicInteger failedCount = new AtomicInteger();
+        OutputCollector collector = new OutputCollector(new IOutputCollector(){
+            int count = 0;
+            @Override
+            public List<Integer> emit(String streamId, Collection<Tuple> anchors,
List<Object> tuple) {
+                Assert.assertEquals("testAlertStream", tuple.get(0));
+                AlertStreamEvent event = (AlertStreamEvent) tuple.get(1);
+                System.out.println(String.format("collector received: [streamId=[%s], tuple=[%s]
", streamId, tuple));
+                return null;
+            }
+            @Override
+            public void emitDirect(int taskId, String streamId, Collection<Tuple> anchors,
List<Object> tuple) {            }
+            @Override
+            public void ack(Tuple input) {            }
+            @Override
+            public void fail(Tuple input) {      failedCount.incrementAndGet();      }
+            @Override
+            public void reportError(Throwable error) {            }
+        });
+        AlertBolt bolt = createAlertBolt(collector);
+
+        GeneralTopologyContext context = mock(GeneralTopologyContext.class);
+        int taskId = 1;
+        when(context.getComponentId(taskId)).thenReturn("comp1");
+        when(context.getComponentOutputFields("comp1", "default")).thenReturn(new Fields("f0"));
+        // case 1: bolt prepared but metadata not initialized
+        PartitionedEvent pe = new PartitionedEvent();
+        pe.setPartitionKey(1);
+        pe.setPartition(createPartition());
+        StreamEvent streamEvent = new StreamEvent();
+        streamEvent.setStreamId("test-stream");
+        streamEvent.setTimestamp(System.currentTimeMillis());
+        pe.setEvent(streamEvent);
+
+        PartitionedEventSerializerImpl peSer = new PartitionedEventSerializerImpl(bolt);
+        byte[] serializedEvent = peSer.serialize(pe);
+        Tuple input = new TupleImpl(context, Collections.singletonList(serializedEvent),
taskId, "default");
+        bolt.execute(input);
+
+        Assert.assertEquals(1, failedCount.get());
+        failedCount.set(0);
+
+        {
+            // case 2: metadata loaded but empty
+            bolt.onAlertBoltSpecChange(new AlertBoltSpec(), new HashMap());
+
+            bolt.execute(input);
+            Assert.assertEquals(1, failedCount.get());
+            failedCount.set(0);
+        }
+
+        // case 3: metadata loaded but mismatched
+        {
+            Map<String, StreamDefinition> sds = new HashMap();
+            StreamDefinition sdTest = new StreamDefinition();
+            String streamId = "pd-test";
+            sdTest.setStreamId(streamId);
+            sds.put(sdTest.getStreamId(), sdTest);
+            AlertBoltSpec boltSpecs = new AlertBoltSpec();
+            boltSpecs.setVersion("specVersion-"+System.currentTimeMillis());
+            PolicyDefinition def = new PolicyDefinition();
+            def.setName("policy-definition");
+            def.setInputStreams(Arrays.asList(streamId));
+            PolicyDefinition.Definition definition = new PolicyDefinition.Definition();
+            definition.setType(PolicyStreamHandlers.NO_DATA_ALERT_ENGINE);
+            definition.setValue("PT0M,plain,1,host,host1");
+            def.setDefinition(definition);
+
+            boltSpecs.getBoltPoliciesMap().put(bolt.getBoltId(), Arrays.asList(def));
+
+            bolt.onAlertBoltSpecChange(boltSpecs, sds);
+
+            bolt.execute(input);
+            Assert.assertEquals(1, failedCount.get());
+        }
+    }
+
+    @NotNull
+    private StreamPartition createPartition() {
+        StreamPartition sp = new StreamPartition();
+        sp.setType(StreamPartition.Type.GROUPBY);
+        return sp;
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/f1a93b1b/eagle-core/eagle-app/eagle-app-base/src/test/java/org/apache/eagle/app/TestApplicationTestSuite.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/test/java/org/apache/eagle/app/TestApplicationTestSuite.java
b/eagle-core/eagle-app/eagle-app-base/src/test/java/org/apache/eagle/app/TestApplicationTestSuite.java
index 10fcffc..3014b16 100644
--- a/eagle-core/eagle-app/eagle-app-base/src/test/java/org/apache/eagle/app/TestApplicationTestSuite.java
+++ b/eagle-core/eagle-app/eagle-app-base/src/test/java/org/apache/eagle/app/TestApplicationTestSuite.java
@@ -26,6 +26,7 @@ import org.apache.eagle.metadata.model.ApplicationEntity;
 import org.apache.eagle.metadata.model.SiteEntity;
 import org.apache.eagle.metadata.resource.SiteResource;
 import org.junit.Assert;
+import org.junit.Ignore;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 


Mime
View raw message