From: ralphsu@apache.org
To: commits@eagle.incubator.apache.org
Reply-To: dev@eagle.incubator.apache.org
Message-Id: <0ad5c0ca294947158c93e56e671a11de@git.apache.org>
Subject: incubator-eagle git commit: [EAGLE-580] : fix spoutSpecs load met invalid FieldName(Dot) exception
Date: Thu, 29 Sep 2016 17:29:25 +0000 (UTC)

Repository: incubator-eagle
Updated Branches:
  refs/heads/master f1cd71e90 -> f6fad2ebe


[EAGLE-580] : fix spoutSpecs load met invalid FieldName(Dot) exception

Author: Zeng, Bryant
Reviewer: ralphsu

This closes #467


Project: http://git-wip-us.apache.org/repos/asf/incubator-eagle/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-eagle/commit/f6fad2eb
Tree: http://git-wip-us.apache.org/repos/asf/incubator-eagle/tree/f6fad2eb
Diff: http://git-wip-us.apache.org/repos/asf/incubator-eagle/diff/f6fad2eb

Branch: refs/heads/master
Commit: f6fad2ebe8a384a6d17e40c609041ff77f6a692c
Parents: f1cd71e
Author: mizeng
Authored: Thu Sep 29 20:38:22 2016 +0800
Committer: Ralph, Su
Committed: Thu Sep 29 10:29:45 2016 -0700
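For context on the failure this commit fixes: MongoDB rejects document field names that contain a dot, and SpoutSpec's metadata maps are keyed by Kafka topic names such as preprocess.network-sherlock.events, so serializing those maps straight into a document fails at insert time. A minimal sketch of the failure mode, assuming a 3.x mongo-java-driver; the database, collection, and topic names here are illustrative, not taken from this patch:

    import com.mongodb.MongoClient;
    import com.mongodb.client.MongoCollection;
    import org.bson.Document;

    public class DottedFieldNameDemo {
        public static void main(String[] args) {
            MongoCollection<Document> specs = new MongoClient("localhost", 27017)
                    .getDatabase("demo").getCollection("spoutSpecs");

            // Serializing a SpoutSpec naively turns each Kafka topic name into a
            // field name; field names containing '.' are invalid in Mongo documents.
            Document naive = new Document("kafka2TupleMetadataMap",
                    new Document("preprocess.network-sherlock.events",
                            new Document("type", "KAFKA")));
            try {
                specs.insertOne(naive);
            } catch (IllegalArgumentException e) {
                // e.g. "Invalid BSON field name preprocess.network-sherlock.events"
                System.out.println("rejected: " + e.getMessage());
            }
        }
    }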
----------------------------------------------------------------------
 .../metadata/impl/MongoMetadataDaoImpl.java     | 89 ++++++++++++++++----
 .../alert/resource/impl/MongoImplTest.java      | 20 +++++
 2 files changed, 94 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/f6fad2eb/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/MongoMetadataDaoImpl.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/MongoMetadataDaoImpl.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/MongoMetadataDaoImpl.java
index 2f03bf9..aaf059b 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/MongoMetadataDaoImpl.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/MongoMetadataDaoImpl.java
@@ -48,10 +48,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
-import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
 
 /**
  * @since Apr 11, 2016.
@@ -98,7 +95,7 @@ public class MongoMetadataDaoImpl implements IMetadataDao {
 
     private void init() {
         db = client.getDatabase(DB_NAME);
-        IndexOptions io = new IndexOptions().background(true).unique(true).name("nameIndex");
+        IndexOptions io = new IndexOptions().background(true).name("nameIndex");
         BsonDocument doc = new BsonDocument();
         doc.append("name", new BsonInt32(1));
         cluster = db.getCollection("clusters");
@@ -128,14 +125,14 @@ public class MongoMetadataDaoImpl implements IMetadataDao {
 
         // below is for schedule_specs and its splitted collections
         BsonDocument doc1 = new BsonDocument();
-        IndexOptions io1 = new IndexOptions().background(true).unique(true).name("versionIndex");
+        IndexOptions io1 = new IndexOptions().background(true).name("versionIndex");
         doc1.append("version", new BsonInt32(1));
         scheduleStates = db.getCollection("schedule_specs");
         scheduleStates.createIndex(doc1, io1);
 
         spoutSpecs = db.getCollection("spoutSpecs");
         {
-            IndexOptions ioInternal = new IndexOptions().background(true).unique(true).name("topologyIdIndex");
+            IndexOptions ioInternal = new IndexOptions().background(true).name("topologyIdIndex");
             BsonDocument docInternal = new BsonDocument();
             docInternal.append("topologyId", new BsonInt32(1));
             spoutSpecs.createIndex(docInternal, ioInternal);
@@ -143,7 +140,7 @@ public class MongoMetadataDaoImpl implements IMetadataDao {
 
         alertSpecs = db.getCollection("alertSpecs");
         {
-            IndexOptions ioInternal = new IndexOptions().background(true).unique(true).name("topologyNameIndex");
+            IndexOptions ioInternal = new IndexOptions().background(true).name("topologyNameIndex");
             BsonDocument docInternal = new BsonDocument();
             docInternal.append("topologyName", new BsonInt32(1));
             alertSpecs.createIndex(docInternal, ioInternal);
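Aside from the SpoutSpec rework, the hunks above drop .unique(true) from the nameIndex, versionIndex, topologyIdIndex, and topologyNameIndex definitions, leaving non-unique background indexes. A plausible reading, not stated in the patch itself, is that re-publishing specs under an unchanged key should no longer fail with a duplicate-key error; sketched below against a 3.x driver (database name illustrative):

    import com.mongodb.MongoClient;
    import com.mongodb.client.MongoCollection;
    import org.bson.Document;

    public class NonUniqueIndexDemo {
        public static void main(String[] args) {
            MongoCollection<Document> spoutSpecs = new MongoClient("localhost", 27017)
                    .getDatabase("demo").getCollection("spoutSpecs");

            // Under the old unique "topologyIdIndex" the second insert would throw
            // com.mongodb.MongoWriteException (E11000 duplicate key error). With the
            // non-unique index created by the patched init(), both inserts succeed.
            spoutSpecs.insertOne(new Document("topologyId", "testUnitTopology1"));
            spoutSpecs.insertOne(new Document("topologyId", "testUnitTopology1"));
        }
    }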
@@ -304,15 +301,57 @@ public class MongoMetadataDaoImpl implements IMetadataDao {
 
     private <T> OpResult addOne(MongoCollection<Document> collection, T t) {
         OpResult result = new OpResult();
+        String json = "";
         try {
-            String json = mapper.writeValueAsString(t);
+            json = mapper.writeValueAsString(t);
             collection.insertOne(Document.parse(json));
             result.code = 200;
-            result.message = String.format("add one document to collection %s succeed!", collection.getNamespace());
+            result.message = String.format("add one document [%s] to collection [%s] succeed!", json, collection.getNamespace());
+            LOG.info(result.message);
         } catch (Exception e) {
             result.code = 400;
             result.message = e.getMessage();
-            LOG.error("", e);
+            LOG.error(String.format("Add one document [%s] to collection [%s] failed!", json, collection.getNamespace()), e);
+        }
+        return result;
+    }
+
+    /**
+     * Because some field names in SpoutSpec contain a dot (.), which is invalid in a Mongo field name, we need to
+     * transform the format before storing in Mongo.
+     * @return OpResult
+     */
+    private <T> OpResult addOneSpoutSpec(T t) {
+        OpResult result = new OpResult();
+        String json = "";
+        try {
+            json = mapper.writeValueAsString(t);
+            Document doc = Document.parse(json);
+
+            String[] metadataMapArrays = {"kafka2TupleMetadataMap", "tuple2StreamMetadataMap", "streamRepartitionMetadataMap"};
+            for (String metadataMapName : metadataMapArrays) {
+                Document _metadataMapDoc = (Document) doc.get(metadataMapName);
+                doc.remove(metadataMapName);
+
+                ArrayList<Document> _metadataMapArray = new ArrayList<>();
+
+                for (String key : _metadataMapDoc.keySet()) {
+                    Document _subDoc = new Document();
+                    _subDoc.put("topicName", key);
+                    _subDoc.put(metadataMapName, _metadataMapDoc.get(key));
+                    _metadataMapArray.add(_subDoc);
+                }
+                doc.append(metadataMapName, _metadataMapArray);
+            }
+
+            spoutSpecs.insertOne(doc);
+            result.code = 200;
+            result.message = String.format("add one document [%s] to collection [%s] succeed!", doc.toJson(), spoutSpecs.getNamespace());
+            LOG.info(result.message);
+        } catch (Exception e) {
+            result.code = 400;
+            result.message = e.getMessage();
+            LOG.error(String.format("Add one document [%s] to collection [%s] failed!", json, spoutSpecs.getNamespace()), e);
+        }
         return result;
     }
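The re-keying done by addOneSpoutSpec above is easiest to see on a concrete document. The standalone sketch below applies the same transform to one metadata map; it needs only the bson Document class, and the topology/topic values are illustrative:

    import java.util.ArrayList;
    import org.bson.Document;

    public class SpoutSpecEncodeDemo {
        public static void main(String[] args) {
            // JSON as Jackson would serialize a SpoutSpec: the metadata map is
            // keyed by Kafka topic names, which contain dots.
            Document doc = Document.parse(
                    "{\"topologyId\": \"t1\", \"kafka2TupleMetadataMap\": "
                    + "{\"preprocess.network-sherlock.events\": {\"type\": \"KAFKA\"}}}");

            // Same re-keying as the patch: hoist each dotted map key into a
            // {topicName, <mapName>} sub-document and store the map as an array.
            String metadataMapName = "kafka2TupleMetadataMap";
            Document metadataMapDoc = (Document) doc.get(metadataMapName);
            doc.remove(metadataMapName);

            ArrayList<Document> metadataMapArray = new ArrayList<>();
            for (String key : metadataMapDoc.keySet()) {
                metadataMapArray.add(new Document("topicName", key)
                        .append(metadataMapName, metadataMapDoc.get(key)));
            }
            doc.append(metadataMapName, metadataMapArray);

            // Prints: {"topologyId": "t1", "kafka2TupleMetadataMap":
            //   [{"topicName": "preprocess.network-sherlock.events",
            //     "kafka2TupleMetadataMap": {"type": "KAFKA"}}]}
            System.out.println(doc.toJson());
        }
    }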
@@ -427,7 +466,26 @@ public class MongoMetadataDaoImpl implements IMetadataDao {
             public void apply(Document document) {
                 String json = document.toJson();
                 try {
-                    maps.put(document.getString(mapKey), mapper.readValue(json, clz));
+                    // Because some field names in SpoutSpec contain a dot (.), which is invalid in a Mongo field
+                    // name, we need to transform the format back while reading from Mongo.
+                    if (clz == SpoutSpec.class) {
+                        Document doc = Document.parse(json);
+                        String[] metadataMapArrays = {"kafka2TupleMetadataMap", "tuple2StreamMetadataMap", "streamRepartitionMetadataMap"};
+                        for (String metadataMapName : metadataMapArrays) {
+                            ArrayList<Document> subDocs = (ArrayList<Document>) doc.get(metadataMapName);
+                            doc.remove(metadataMapName);
+
+                            Document replaceDoc = new Document();
+                            for (Document subDoc : subDocs) {
+                                replaceDoc.put((String) subDoc.get("topicName"), subDoc.get(metadataMapName));
+                            }
+                            doc.put(metadataMapName, replaceDoc);
+                        }
+
+                        json = doc.toJson();
+                    }
+                    T t = mapper.readValue(json, clz);
+                    maps.put(document.getString(mapKey), t);
                 } catch (IOException e) {
                     LOG.error("deserialize config item failed!", e);
                 }
@@ -487,7 +545,7 @@ public class MongoMetadataDaoImpl implements IMetadataDao {
         try {
             for (String key : state.getSpoutSpecs().keySet()) {
                 SpoutSpec spoutSpec = state.getSpoutSpecs().get(key);
-                addOne(spoutSpecs, spoutSpec);
+                addOneSpoutSpec(spoutSpec);
             }
 
             for (String key : state.getAlertSpecs().keySet()) {
@@ -522,8 +580,8 @@ public class MongoMetadataDaoImpl implements IMetadataDao {
         }
 
         ScheduleStateBase stateBase = new ScheduleStateBase(
-            state.getVersion(), state.getGenerateTime(), state.getCode(),
-            state.getMessage(), state.getScheduleTimeMillis());
+                state.getVersion(), state.getGenerateTime(), state.getCode(),
+                state.getMessage(), state.getScheduleTimeMillis());
 
         addOne(scheduleStates, stateBase);
 
@@ -581,4 +639,5 @@ public class MongoMetadataDaoImpl implements IMetadataDao {
     public void close() throws IOException {
         client.close();
     }
+
 }
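The read path (the apply() hunk above) applies the inverse transform before handing the JSON to Jackson. Sketched standalone below on the array-of-subdocuments shape produced by addOneSpoutSpec; dotted keys are legal again once they live only inside an in-memory Document rather than a stored field name:

    import java.util.List;
    import org.bson.Document;

    public class SpoutSpecDecodeDemo {
        @SuppressWarnings("unchecked")
        public static void main(String[] args) {
            // The encoded shape as stored by addOneSpoutSpec.
            Document doc = Document.parse(
                    "{\"topologyId\": \"t1\", \"kafka2TupleMetadataMap\": "
                    + "[{\"topicName\": \"preprocess.network-sherlock.events\", "
                    + "\"kafka2TupleMetadataMap\": {\"type\": \"KAFKA\"}}]}");

            // Inverse of the write-side transform: rebuild the map keyed by topic
            // name so the JSON again matches SpoutSpec's field layout for Jackson.
            String metadataMapName = "kafka2TupleMetadataMap";
            List<Document> subDocs = (List<Document>) doc.get(metadataMapName);
            doc.remove(metadataMapName);

            Document replaceDoc = new Document();
            for (Document subDoc : subDocs) {
                replaceDoc.put((String) subDoc.get("topicName"), subDoc.get(metadataMapName));
            }
            doc.put(metadataMapName, replaceDoc);

            System.out.println(doc.toJson()); // dotted keys restored inside the map
        }
    }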
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/f6fad2eb/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/test/java/org/apache/eagle/service/alert/resource/impl/MongoImplTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/test/java/org/apache/eagle/service/alert/resource/impl/MongoImplTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/test/java/org/apache/eagle/service/alert/resource/impl/MongoImplTest.java
index 63871c3..06f5034 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/test/java/org/apache/eagle/service/alert/resource/impl/MongoImplTest.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/test/java/org/apache/eagle/service/alert/resource/impl/MongoImplTest.java
@@ -223,11 +223,31 @@ public class MongoImplTest {
         SpoutSpec spoutSpec1 = new SpoutSpec();
         String topologyId1 = "testUnitTopology1_" + timestamp;
         spoutSpec1.setTopologyId(topologyId1);
+
+        Map<String, Kafka2TupleMetadata> kafka2TupleMetadataMap = new HashMap<>();
+        Kafka2TupleMetadata kafka2TupleMetadata = new Kafka2TupleMetadata();
+        kafka2TupleMetadata.setType("KAFKA");
+        kafka2TupleMetadataMap.put("preprocess.network-sherlock.events", kafka2TupleMetadata);
+        spoutSpec1.setKafka2TupleMetadataMap(kafka2TupleMetadataMap);
+
+        Map<String, List<StreamRepartitionMetadata>> streamRepartitionMetadataMap = new HashMap<>();
+        List<StreamRepartitionMetadata> StreamRepartitionMetadataList = new ArrayList<>();
+        StreamRepartitionMetadata streamRepartitionMetadata = new StreamRepartitionMetadata();
+        List<StreamRepartitionStrategy> groupingStrategies = new ArrayList<>();
+        StreamRepartitionStrategy streamRepartitionStrategy = new StreamRepartitionStrategy();
+        streamRepartitionStrategy.setStartSequence(4);
+        groupingStrategies.add(streamRepartitionStrategy);
+        streamRepartitionMetadata.setGroupingStrategies(groupingStrategies);
+        StreamRepartitionMetadataList.add(streamRepartitionMetadata);
+        streamRepartitionMetadataMap.put("preprocess.network-nervecenter.events", StreamRepartitionMetadataList);
+        spoutSpec1.setStreamRepartitionMetadataMap(streamRepartitionMetadataMap);
         spoutSpecsMap.put(topologyId1, spoutSpec1);
 
         SpoutSpec spoutSpec2 = new SpoutSpec();
         String topologyId2 = "testUnitTopology2_" + timestamp;
         spoutSpec2.setTopologyId(topologyId2);
+        spoutSpec2.setKafka2TupleMetadataMap(kafka2TupleMetadataMap);
+        spoutSpec2.setStreamRepartitionMetadataMap(streamRepartitionMetadataMap);
         spoutSpecsMap.put(topologyId2, spoutSpec2);
 
         // Alert Spec