eagle-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ralp...@apache.org
Subject [1/2] incubator-eagle git commit: EAGLE-440: Alert mongodb storage refine
Date Thu, 11 Aug 2016 06:55:13 GMT
Repository: incubator-eagle
Updated Branches:
  refs/heads/develop 08abde513 -> a772a0556


EAGLE-440: Alert mongodb storage refine


Project: http://git-wip-us.apache.org/repos/asf/incubator-eagle/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-eagle/commit/678437aa
Tree: http://git-wip-us.apache.org/repos/asf/incubator-eagle/tree/678437aa
Diff: http://git-wip-us.apache.org/repos/asf/incubator-eagle/diff/678437aa

Branch: refs/heads/develop
Commit: 678437aadb5ca6e5c7256fabd40b4966e4227911
Parents: e2532a1
Author: mizeng <mizeng@ebaysf.com>
Authored: Wed Aug 10 16:04:16 2016 +0800
Committer: mizeng <mizeng@ebaysf.com>
Committed: Thu Aug 11 10:52:38 2016 +0800

----------------------------------------------------------------------
 .../model/internal/ScheduleStateBase.java       |  86 ++++++
 .../metadata/impl/MongoMetadataDaoImpl.java     | 298 +++++++++++++++----
 .../alert/resource/impl/MongoImplTest.java      | 121 ++++++--
 3 files changed, 434 insertions(+), 71 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/678437aa/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/internal/ScheduleStateBase.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/internal/ScheduleStateBase.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/internal/ScheduleStateBase.java
new file mode 100644
index 0000000..a1efbf9
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/internal/ScheduleStateBase.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.coordination.model.internal;
+
+/**
+ *
+ * This is the Base part of ScheduleState, only contains version/generateTime/code/message/scheduleTimeMillis
+ *
+ *
+ * @since Aug 10, 2016
+ *
+ */
+public class ScheduleStateBase {
+    private String version;
+    // FIXME : should be date, can not make it simple in mongo..
+    private String generateTime;
+    private int code = 200;
+    private String message = "OK";
+    private int scheduleTimeMillis;
+
+    public ScheduleStateBase(String version, String generateTime, int code, String message, int scheduleTimeMillis) {
+        this.version = version;
+        this.generateTime = generateTime;
+        this.code = code;
+        this.message = message;
+        this.scheduleTimeMillis = scheduleTimeMillis;
+    }
+
+    public int getScheduleTimeMillis() {
+        return scheduleTimeMillis;
+    }
+
+    public void setScheduleTimeMillis(int scheduleTimeMillis) {
+        this.scheduleTimeMillis = scheduleTimeMillis;
+    }
+
+    public String getVersion() {
+        return version;
+    }
+
+    public void setVersion(String version) {
+        this.version = version;
+    }
+
+    public String getGenerateTime() {
+        return generateTime;
+    }
+
+    public void setGenerateTime(String generateTime) {
+        this.generateTime = generateTime;
+    }
+
+    public int getCode() {
+        return code;
+    }
+
+    public void setCode(int code) {
+        this.code = code;
+    }
+
+    public String getMessage() {
+        return message;
+    }
+
+    public void setMessage(String message) {
+        this.message = message;
+    }
+
+
+
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/678437aa/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/MongoMetadataDaoImpl.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/MongoMetadataDaoImpl.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/MongoMetadataDaoImpl.java
index a990b13..d68dc6a 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/MongoMetadataDaoImpl.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/MongoMetadataDaoImpl.java
@@ -16,20 +16,26 @@
  */
 package org.apache.eagle.alert.metadata.impl;
 
-import java.io.IOException;
-import java.util.LinkedList;
-import java.util.List;
-
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.inject.Inject;
-import org.apache.eagle.alert.coordination.model.Kafka2TupleMetadata;
-import org.apache.eagle.alert.coordination.model.ScheduleState;
+import com.mongodb.Block;
+import com.mongodb.Function;
+import com.mongodb.MongoClient;
+import com.mongodb.MongoClientURI;
+import com.mongodb.client.MongoCollection;
+import com.mongodb.client.MongoDatabase;
+import com.mongodb.client.model.IndexOptions;
+import com.mongodb.client.model.UpdateOptions;
+import com.mongodb.client.result.DeleteResult;
+import com.mongodb.client.result.UpdateResult;
+import com.typesafe.config.Config;
+import org.apache.eagle.alert.coordination.model.*;
+import org.apache.eagle.alert.coordination.model.internal.MonitoredStream;
 import org.apache.eagle.alert.coordination.model.internal.PolicyAssignment;
+import org.apache.eagle.alert.coordination.model.internal.ScheduleStateBase;
 import org.apache.eagle.alert.coordination.model.internal.Topology;
-import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;
-import org.apache.eagle.alert.engine.coordinator.Publishment;
-import org.apache.eagle.alert.engine.coordinator.PublishmentType;
-import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
-import org.apache.eagle.alert.engine.coordinator.StreamingCluster;
+import org.apache.eagle.alert.engine.coordinator.*;
 import org.apache.eagle.alert.metadata.IMetadataDao;
 import org.apache.eagle.alert.metadata.MetadataUtils;
 import org.apache.eagle.alert.metadata.resource.Models;
@@ -41,18 +47,11 @@ import org.bson.Document;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.fasterxml.jackson.databind.DeserializationFeature;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.mongodb.Function;
-import com.mongodb.MongoClient;
-import com.mongodb.MongoClientURI;
-import com.mongodb.client.MongoCollection;
-import com.mongodb.client.MongoDatabase;
-import com.mongodb.client.model.IndexOptions;
-import com.mongodb.client.model.UpdateOptions;
-import com.mongodb.client.result.DeleteResult;
-import com.mongodb.client.result.UpdateResult;
-import com.typesafe.config.Config;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
 
 /**
  * @since Apr 11, 2016
@@ -77,9 +76,18 @@ public class MongoMetadataDaoImpl implements IMetadataDao {
     private MongoCollection<Document> policy;
     private MongoCollection<Document> publishment;
     private MongoCollection<Document> publishmentType;
+    private MongoCollection<Document> topologies;
+
+    // scheduleStates splits to several collections
     private MongoCollection<Document> scheduleStates;
+    private MongoCollection<Document> spoutSpecs;
+    private MongoCollection<Document> alertSpecs;
+    private MongoCollection<Document> groupSpecs;
+    private MongoCollection<Document> publishSpecs;
+    private MongoCollection<Document> policySnapshots;
+    private MongoCollection<Document> streamSnapshots;
+    private MongoCollection<Document> monitoredStreams;
     private MongoCollection<Document> assignments;
-    private MongoCollection<Document> topologies;
 
     @Inject
     public MongoMetadataDaoImpl(Config config) {
@@ -118,21 +126,46 @@ public class MongoMetadataDaoImpl implements IMetadataDao {
             publishmentType.createIndex(doc1, io1);
         }
 
+        // below is for schedule_specs and its splitted collections
+        BsonDocument doc1 = new BsonDocument();
+        IndexOptions io1 = new IndexOptions().background(true).unique(true).name("versionIndex");
+        doc1.append("version", new BsonInt32(1));
         scheduleStates = db.getCollection("schedule_specs");
+        scheduleStates.createIndex(doc1, io1);
+
+        spoutSpecs = db.getCollection("spoutSpecs");
         {
-            IndexOptions io1 = new IndexOptions().background(true).unique(true).name("nameIndex");
-            BsonDocument doc1 = new BsonDocument();
-            doc1.append("version", new BsonInt32(1));
-            scheduleStates.createIndex(doc1, io1);
+            IndexOptions io_internal = new IndexOptions().background(true).unique(true).name("topologyIdIndex");
+            BsonDocument doc_internal = new BsonDocument();
+            doc_internal.append("topologyId", new BsonInt32(1));
+            spoutSpecs.createIndex(doc_internal, io_internal);
         }
 
-        assignments = db.getCollection("assignments");
+        alertSpecs = db.getCollection("alertSpecs");
         {
-            IndexOptions io1 = new IndexOptions().background(true).unique(true).name("policyNameIndex");
-            BsonDocument doc1 = new BsonDocument();
-            doc1.append("policyName", new BsonInt32(1));
-            assignments.createIndex(doc1, io1);
+            IndexOptions io_internal = new IndexOptions().background(true).unique(true).name("topologyNameIndex");
+            BsonDocument doc_internal = new BsonDocument();
+            doc_internal.append("topologyName", new BsonInt32(1));
+            alertSpecs.createIndex(doc_internal, io_internal);
         }
+
+        groupSpecs = db.getCollection("groupSpecs");
+        groupSpecs.createIndex(doc1, io1);
+
+        publishSpecs = db.getCollection("publishSpecs");
+        publishSpecs.createIndex(doc1, io1);
+
+        policySnapshots = db.getCollection("policySnapshots");
+        policySnapshots.createIndex(doc1, io);
+
+        streamSnapshots = db.getCollection("streamSnapshots");
+        streamSnapshots.createIndex(doc1, io);
+
+        monitoredStreams = db.getCollection("monitoredStreams");
+        monitoredStreams.createIndex(doc1, io);
+
+        assignments = db.getCollection("assignments");
+        assignments.createIndex(doc1, io1);
     }
 
     @Override
@@ -285,6 +318,21 @@ public class MongoMetadataDaoImpl implements IMetadataDao {
         return remove(publishmentType, pubType);
     }
 
+    private <T> OpResult addOne(MongoCollection<Document> collection, T t) {
+        OpResult result = new OpResult();
+        try {
+            String json = mapper.writeValueAsString(t);
+            collection.insertOne(Document.parse(json));
+            result.code = 200;
+            result.message = String.format("add one document to collection %s succeed!", collection.getNamespace());
+        } catch (Exception e) {
+            result.code = 400;
+            result.message = e.getMessage();
+            LOG.error("", e);
+        }
+        return result;
+    }
+
     @Override
     public ScheduleState getScheduleState(String versionId) {
         BsonDocument doc = new BsonDocument();
@@ -301,29 +349,21 @@ public class MongoMetadataDaoImpl implements IMetadataDao {
                 return null;
             }
         }).first();
-        return state;
-    }
-
-    @Override
-    public OpResult addScheduleState(ScheduleState state) {
-        return addOne(scheduleStates, state);
-    }
 
-    private <T> OpResult addOne(MongoCollection<Document> collection, T t) {
-        OpResult result = new OpResult();
-        try {
-            String json = mapper.writeValueAsString(t);
-            collection.insertOne(Document.parse(json));
-            result.code = 200;
-            result.message = "add state succeed!";
-        } catch (Exception e) {
-            result.code = 400;
-            result.message = e.getMessage();
-            LOG.error("", e);
+        if (state != null){
+            // based on version, to add content from collections of spoutSpecs/alertSpecs/etc..
+            state = addDetailForScheduleState(state, versionId);
         }
-        return result;
+
+        return state;
     }
 
+    /***
+     * get the basic ScheduleState, and then based on the version to get all sub-part(spoutSpecs/alertSpecs/etc)
+     * to form a completed ScheduleState.
+     *
+     * @return the latest ScheduleState
+     */
     @Override
     public ScheduleState getScheduleState() {
         BsonDocument sort = new BsonDocument();
@@ -340,9 +380,163 @@ public class MongoMetadataDaoImpl implements IMetadataDao {
                 return null;
             }
         }).first();
+
+        if (state != null){
+            String version = state.getVersion();
+            // based on version, to add content from collections of spoutSpecs/alertSpecs/etc..
+            state = addDetailForScheduleState(state, version);
+        }
+
+        return state;
+    }
+
+    private ScheduleState addDetailForScheduleState(ScheduleState state, String version){
+        Map<String, SpoutSpec> spoutMaps = maps(spoutSpecs, SpoutSpec.class, version);
+        if (spoutMaps.size() !=0){
+            state.setSpoutSpecs(spoutMaps);
+        }
+
+        Map<String, AlertBoltSpec> alertMaps = maps(alertSpecs, AlertBoltSpec.class, version);
+        if (alertMaps.size() !=0){
+            state.setAlertSpecs(alertMaps);
+        }
+
+        Map<String, RouterSpec> groupMaps = maps(groupSpecs, RouterSpec.class, version);
+        if (groupMaps.size() !=0){
+            state.setGroupSpecs(groupMaps);
+        }
+
+        Map<String, PublishSpec> publishMaps = maps(publishSpecs, PublishSpec.class, version);
+        if (publishMaps.size() !=0){
+            state.setPublishSpecs(publishMaps);
+        }
+
+        List<VersionedPolicyDefinition> policyLists = list(policySnapshots, VersionedPolicyDefinition.class, version);
+        if (policyLists.size() !=0){
+            state.setPolicySnapshots(policyLists);
+        }
+
+        List<VersionedStreamDefinition> streamLists = list(streamSnapshots, VersionedStreamDefinition.class, version);
+        if (streamLists.size() !=0){
+            state.setStreamSnapshots(streamLists);
+        }
+
+        List<MonitoredStream> monitorLists = list(monitoredStreams, MonitoredStream.class, version);
+        if (monitorLists.size() !=0){
+            state.setMonitoredStreams(monitorLists);
+        }
+
+        List<PolicyAssignment> assignmentLists = list(assignments, PolicyAssignment.class, version);
+        if (assignmentLists.size() !=0){
+            state.setAssignments(assignmentLists);
+        }
         return state;
     }
 
+    private <T> Map<String, T> maps(MongoCollection<Document> collection, Class<T> clz, String version){
+        BsonDocument doc = new BsonDocument();
+        doc.append("version", new BsonString(version));
+
+        Map<String, T> maps = new HashMap<String, T>();
+        String mapKey = (clz == SpoutSpec.class)? "topologyId" : "topologyName";
+        collection.find(doc).forEach(new Block<Document>() {
+            @Override
+            public void apply(Document document) {
+                String json = document.toJson();
+                try {
+                    maps.put(document.getString(mapKey), mapper.readValue(json, clz));
+                } catch (IOException e) {
+                    LOG.error("deserialize config item failed!", e);
+                }
+            }
+        });
+
+        return maps;
+    }
+
+    private <T> List<T> list(MongoCollection<Document> collection, Class<T> clz, String version){
+        BsonDocument doc = new BsonDocument();
+        doc.append("version", new BsonString(version));
+
+        List<T> result = new LinkedList<T>();
+        collection.find(doc).map(new Function<Document, T>() {
+            @Override
+            public T apply(Document t) {
+                String json = t.toJson();
+                try {
+                    return mapper.readValue(json, clz);
+                } catch (IOException e) {
+                    LOG.error("deserialize config item failed!", e);
+                }
+                return null;
+            }
+        }).into(result);
+        return result;
+    }
+
+    /***
+     * write ScheduleState to several collections. basic info writes to ScheduleState, other writes to collections
+     * named by spoutSpecs/alertSpecs/etc.
+     *
+     * @param state
+     * @return
+     */
+    @Override
+    public OpResult addScheduleState(ScheduleState state) {
+        OpResult result = new OpResult();
+        try {
+            for (String key: state.getSpoutSpecs().keySet()){
+                SpoutSpec spoutSpec = state.getSpoutSpecs().get(key);
+                addOne(spoutSpecs, spoutSpec);
+            }
+
+            for (String key: state.getAlertSpecs().keySet()){
+                AlertBoltSpec alertBoltSpec = state.getAlertSpecs().get(key);
+                addOne(alertSpecs, alertBoltSpec);
+            }
+
+            for (String key: state.getGroupSpecs().keySet()){
+                RouterSpec groupSpec = state.getGroupSpecs().get(key);
+                addOne(groupSpecs, groupSpec);
+            }
+
+            for (String key: state.getPublishSpecs().keySet()){
+                PublishSpec publishSpec = state.getPublishSpecs().get(key);
+                addOne(publishSpecs, publishSpec);
+            }
+
+            for (VersionedPolicyDefinition policySnapshot: state.getPolicySnapshots()){
+                addOne(policySnapshots, policySnapshot);
+            }
+
+            for (VersionedStreamDefinition streamSnapshot: state.getStreamSnapshots()){
+                addOne(streamSnapshots, streamSnapshot);
+            }
+
+            for (MonitoredStream monitoredStream: state.getMonitoredStreams()){
+                addOne(monitoredStreams, monitoredStream);
+            }
+
+            for (PolicyAssignment assignment: state.getAssignments()){
+                addOne(assignments, assignment);
+            }
+
+            ScheduleStateBase stateBase = new ScheduleStateBase(
+                    state.getVersion(), state.getGenerateTime(), state.getCode(),
+                    state.getMessage(), state.getScheduleTimeMillis());
+
+            addOne(scheduleStates, stateBase);
+
+            result.code = 200;
+            result.message = "add document to collection schedule_specs succeed";
+        } catch (Exception e) {
+            result.code = 400;
+            result.message = e.getMessage();
+            LOG.error("", e);
+        }
+        return result;
+    }
+
     @Override
     public List<PolicyAssignment> listAssignments() {
         return list(assignments, PolicyAssignment.class);

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/678437aa/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/test/java/org/apache/eagle/service/alert/resource/impl/MongoImplTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/test/java/org/apache/eagle/service/alert/resource/impl/MongoImplTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/test/java/org/apache/eagle/service/alert/resource/impl/MongoImplTest.java
index 5b38776..a48ee7d 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/test/java/org/apache/eagle/service/alert/resource/impl/MongoImplTest.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/test/java/org/apache/eagle/service/alert/resource/impl/MongoImplTest.java
@@ -16,17 +16,21 @@
  */
 package org.apache.eagle.service.alert.resource.impl;
 
-import java.util.Date;
-import java.util.List;
-
-import org.apache.eagle.alert.coordination.model.Kafka2TupleMetadata;
-import org.apache.eagle.alert.coordination.model.ScheduleState;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import de.flapdoodle.embed.mongo.MongodExecutable;
+import de.flapdoodle.embed.mongo.MongodProcess;
+import de.flapdoodle.embed.mongo.MongodStarter;
+import de.flapdoodle.embed.mongo.config.MongodConfigBuilder;
+import de.flapdoodle.embed.mongo.config.Net;
+import de.flapdoodle.embed.mongo.distribution.Version;
+import de.flapdoodle.embed.process.runtime.Network;
+import org.apache.eagle.alert.coordination.model.*;
+import org.apache.eagle.alert.coordination.model.internal.MonitoredStream;
 import org.apache.eagle.alert.coordination.model.internal.PolicyAssignment;
+import org.apache.eagle.alert.coordination.model.internal.StreamGroup;
 import org.apache.eagle.alert.coordination.model.internal.Topology;
-import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;
-import org.apache.eagle.alert.engine.coordinator.Publishment;
-import org.apache.eagle.alert.engine.coordinator.PublishmentType;
-import org.apache.eagle.alert.engine.coordinator.StreamingCluster;
+import org.apache.eagle.alert.engine.coordinator.*;
 import org.apache.eagle.alert.metadata.IMetadataDao;
 import org.apache.eagle.alert.metadata.impl.MongoMetadataDaoImpl;
 import org.apache.eagle.alert.metadata.resource.OpResult;
@@ -37,16 +41,7 @@ import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigFactory;
-
-import de.flapdoodle.embed.mongo.MongodExecutable;
-import de.flapdoodle.embed.mongo.MongodProcess;
-import de.flapdoodle.embed.mongo.MongodStarter;
-import de.flapdoodle.embed.mongo.config.MongodConfigBuilder;
-import de.flapdoodle.embed.mongo.config.Net;
-import de.flapdoodle.embed.mongo.distribution.Version;
-import de.flapdoodle.embed.process.runtime.Network;
+import java.util.*;
 
 /**
  * @since May 1, 2016
@@ -206,4 +201,92 @@ public class MongoImplTest {
         System.out.println(state.getVersion());
         System.out.println(state.getGenerateTime());
     }
+
+    private void test_addCompleteScheduleState() {
+        Long timestamp = System.currentTimeMillis();
+        String version = "state-" + timestamp;
+
+        // SpoutSpec
+        Map<String, SpoutSpec> spoutSpecsMap = new HashMap<>();
+        SpoutSpec spoutSpec1 = new SpoutSpec();
+        String topologyId1 = "testUnitTopology1_" + timestamp;
+        spoutSpec1.setTopologyId(topologyId1);
+        spoutSpecsMap.put(topologyId1, spoutSpec1);
+
+        SpoutSpec spoutSpec2 = new SpoutSpec();
+        String topologyId2 = "testUnitTopology2_" + timestamp;
+        spoutSpec2.setTopologyId(topologyId2);
+        spoutSpecsMap.put(topologyId2, spoutSpec2);
+
+        // Alert Spec
+        Map<String, AlertBoltSpec> alertSpecsMap = new HashMap<>();
+        alertSpecsMap.put(topologyId1, new AlertBoltSpec(topologyId1));
+
+        // GroupSpec
+        Map<String, RouterSpec> groupSpecsMap = new HashMap<>();
+        groupSpecsMap.put(topologyId1, new RouterSpec(topologyId1));
+
+        // PublishSpec
+        Map<String, PublishSpec> pubMap = new HashMap<>();
+        pubMap.put(topologyId1, new PublishSpec(topologyId1, "testPublishBolt"));
+
+        // Policy Snapshots
+        Collection<PolicyDefinition> policySnapshots = new ArrayList<>();
+        PolicyDefinition policy = new PolicyDefinition();
+        policy.setName("testPolicyDefinition");
+        PolicyDefinition.Definition def = new PolicyDefinition.Definition();
+        def.setValue("1,jobID,job1,daily_rule,14:00:00,15:00:00");
+        def.setType("absencealert");
+        policy.setDefinition(def);
+        policySnapshots.add(policy);
+
+        // Stream Snapshots
+        Collection<StreamDefinition> streams = new ArrayList<>();
+        StreamDefinition stream = new StreamDefinition();
+        stream.setStreamId("testStream");
+        streams.add(stream);
+
+        // Monitored Streams
+        Collection<MonitoredStream> monitoredStreams = new ArrayList<>();
+        StreamPartition partition = new StreamPartition();
+        partition.setType(StreamPartition.Type.GLOBAL);
+        partition.setStreamId("s1");
+        partition.setColumns(Arrays.asList("f1", "f2"));
+        StreamGroup sg = new StreamGroup();
+        sg.addStreamPartition(partition);
+        MonitoredStream monitoredStream = new MonitoredStream(sg);
+        monitoredStreams.add(monitoredStream);
+
+        // Assignments
+        Collection<PolicyAssignment> assignments = new ArrayList<>();
+        assignments.add(new PolicyAssignment("syslog_regex", "SG[syslog_stream-]"+timestamp));
+
+        ScheduleState state = new ScheduleState(version, spoutSpecsMap, groupSpecsMap, alertSpecsMap, pubMap,
+                assignments, monitoredStreams, policySnapshots, streams);
+
+        OpResult result = dao.addScheduleState(state);
+        Assert.assertEquals(200, result.code);
+    }
+
+    @Test
+    public void test_readCompleteScheduleState() {
+        test_addCompleteScheduleState();
+
+        ScheduleState state = dao.getScheduleState();
+        Assert.assertNotNull(state);
+        Assert.assertEquals(2, state.getSpoutSpecs().size());
+        Assert.assertEquals(1, state.getAlertSpecs().size());
+        Assert.assertEquals(1, state.getGroupSpecs().size());
+        Assert.assertEquals(1, state.getPublishSpecs().size());
+        Assert.assertEquals(1, state.getPolicySnapshots().size());
+        Assert.assertEquals(1, state.getStreamSnapshots().size());
+        Assert.assertEquals(1, state.getMonitoredStreams().size());
+        Assert.assertEquals(1, state.getAssignments().size());
+
+
+        System.out.println(state.getVersion());
+        System.out.println(state.getGenerateTime());
+
+
+    }
 }


Mime
View raw message