eagle-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From h..@apache.org
Subject [4/5] incubator-eagle git commit: modify some interfaces and constructors
Date Thu, 03 Dec 2015 03:29:02 GMT
modify some interfaces and constructors


Project: http://git-wip-us.apache.org/repos/asf/incubator-eagle/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-eagle/commit/b11a223a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-eagle/tree/b11a223a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-eagle/diff/b11a223a

Branch: refs/heads/master
Commit: b11a223a452b6a11d4a111175ca26948663d17ef
Parents: 58a9555
Author: sunlibin <abnersunlibin@gmail.com>
Authored: Mon Nov 30 18:09:06 2015 +0800
Committer: sunlibin <abnersunlibin@gmail.com>
Committed: Wed Dec 2 14:54:29 2015 +0800

----------------------------------------------------------------------
 .../apache/eagle/partition/DataDistributionDao.java   |  2 +-
 .../apache/eagle/partition/PartitionStrategyImpl.java | 14 ++++++++++----
 .../eagle/metric/kafka/EagleMetricCollectorMain.java  |  2 +-
 .../src/test/java/TestDataDistributionDaoImpl.java    |  3 ++-
 .../src/test/java/TestGreedyPartition.java            |  3 ++-
 .../security/partition/DataDistributionDaoImpl.java   |  4 +---
 .../security/auditlog/HdfsAuditLogProcessorMain.java  |  7 ++++++-
 .../src/main/resources/application.conf               |  4 +++-
 8 files changed, 26 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/b11a223a/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/java/org/apache/eagle/partition/DataDistributionDao.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/java/org/apache/eagle/partition/DataDistributionDao.java b/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/java/org/apache/eagle/partition/DataDistributionDao.java
index 5c78f96..0b17775 100644
--- a/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/java/org/apache/eagle/partition/DataDistributionDao.java
+++ b/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/java/org/apache/eagle/partition/DataDistributionDao.java
@@ -24,5 +24,5 @@ import java.util.List;
 
 public interface DataDistributionDao extends Serializable {
 
-    List<Weight> fetchDataDistribution() throws Exception;
+    List<Weight> fetchDataDistribution(long startTime, long endTime) throws Exception;
 }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/b11a223a/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/java/org/apache/eagle/partition/PartitionStrategyImpl.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/java/org/apache/eagle/partition/PartitionStrategyImpl.java b/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/java/org/apache/eagle/partition/PartitionStrategyImpl.java
index 46696a6..eacefd5 100644
--- a/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/java/org/apache/eagle/partition/PartitionStrategyImpl.java
+++ b/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/java/org/apache/eagle/partition/PartitionStrategyImpl.java
@@ -19,8 +19,10 @@
 
 package org.apache.eagle.partition;
 
+import org.apache.commons.lang3.time.DateUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+
 import java.util.List;
 import java.util.Map;
 
@@ -31,17 +33,20 @@ public class PartitionStrategyImpl implements PartitionStrategy {
     public Map<String, Integer> routingTable;
     public long lastRefreshTime;
     public long refreshInterval;
-    public static long DEFAULT_REFRESH_INTERVAL = 60 * 60 * 1000;
+    public long timeRange;
+    public static long DEFAULT_TIME_RANGE = 2 * DateUtils.MILLIS_PER_DAY;
+    public static long DEFAULT_REFRESH_INTERVAL = 2 * DateUtils.MILLIS_PER_HOUR;
     private final Logger LOG = LoggerFactory.getLogger(PartitionStrategyImpl.class);
 
-    public PartitionStrategyImpl(DataDistributionDao dao, PartitionAlgorithm algorithm, long refreshInterval) {
+    public PartitionStrategyImpl(DataDistributionDao dao, PartitionAlgorithm algorithm, long refreshInterval, long timeRange) {
         this.dao = dao;
         this.algorithm = algorithm;
         this.refreshInterval = refreshInterval;
+        this.timeRange = timeRange;
     }
 
     public PartitionStrategyImpl(DataDistributionDao dao, PartitionAlgorithm algorithm) {
-        this(dao, algorithm, DEFAULT_REFRESH_INTERVAL);
+        this(dao, algorithm, DEFAULT_REFRESH_INTERVAL, DEFAULT_TIME_RANGE);
     }
 
     public boolean needRefresh() {
@@ -54,7 +59,8 @@ public class PartitionStrategyImpl implements PartitionStrategy {
 
     public Map<String, Integer> generateRoutingTable(int buckNum) {
         try {
-            List<Weight> weights = dao.fetchDataDistribution();
+            long currentTime = System.currentTimeMillis();
+            List<Weight> weights = dao.fetchDataDistribution(currentTime - timeRange, currentTime);
             routingTable = algorithm.partition(weights, buckNum);
             return routingTable;
         }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/b11a223a/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/EagleMetricCollectorMain.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/EagleMetricCollectorMain.java b/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/EagleMetricCollectorMain.java
index 65fe68a..218b812 100644
--- a/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/EagleMetricCollectorMain.java
+++ b/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/EagleMetricCollectorMain.java
@@ -120,7 +120,7 @@ public class EagleMetricCollectorMain {
         };
 
         env.newSource(new KafkaOffsetSourceSpoutProvider().getSpout(config)).renameOutputFields(0).withName("kafkaLogLagChecker");
-        env.newSource(kafkaMessageSpoutProvider.getSpout(config)).renameOutputFields(2).withName("kafkaMessageDistributionCheck").groupBy(Arrays.asList(0))
+        env.newSource(kafkaMessageSpoutProvider.getSpout(config)).renameOutputFields(2).withName("kafkaMessageFetcher").groupBy(Arrays.asList(0))
                 .flatMap(new KafkaMessageDistributionExecutor());
         env.execute();
     }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/b11a223a/eagle-security/eagle-metric-collection/src/test/java/TestDataDistributionDaoImpl.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-metric-collection/src/test/java/TestDataDistributionDaoImpl.java b/eagle-security/eagle-metric-collection/src/test/java/TestDataDistributionDaoImpl.java
index 1c54a90..4d82085 100644
--- a/eagle-security/eagle-metric-collection/src/test/java/TestDataDistributionDaoImpl.java
+++ b/eagle-security/eagle-metric-collection/src/test/java/TestDataDistributionDaoImpl.java
@@ -19,6 +19,7 @@
 
 import com.typesafe.config.Config;
 import com.typesafe.config.ConfigFactory;
+import org.apache.commons.lang3.time.DateUtils;
 import org.apache.eagle.common.config.EagleConfigConstants;
 import org.apache.eagle.partition.DataDistributionDao;
 import org.apache.eagle.security.partition.DataDistributionDaoImpl;
@@ -35,6 +36,6 @@ public class TestDataDistributionDaoImpl {
         String password = config.getString(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.EAGLE_SERVICE + "." + EagleConfigConstants.PASSWORD);
         String topic = config.getString("dataSourceConfig.topic");
         DataDistributionDao dao = new DataDistributionDaoImpl(eagleServiceHost, eagleServicePort, username, password, topic);
-        dao.fetchDataDistribution();
+        dao.fetchDataDistribution(System.currentTimeMillis() - 2 * DateUtils.MILLIS_PER_DAY, System.currentTimeMillis());
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/b11a223a/eagle-security/eagle-metric-collection/src/test/java/TestGreedyPartition.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-metric-collection/src/test/java/TestGreedyPartition.java b/eagle-security/eagle-metric-collection/src/test/java/TestGreedyPartition.java
index cfd873a..f3e1cf8 100644
--- a/eagle-security/eagle-metric-collection/src/test/java/TestGreedyPartition.java
+++ b/eagle-security/eagle-metric-collection/src/test/java/TestGreedyPartition.java
@@ -19,6 +19,7 @@
 
 import com.typesafe.config.Config;
 import com.typesafe.config.ConfigFactory;
+import org.apache.commons.lang3.time.DateUtils;
 import org.apache.eagle.common.config.EagleConfigConstants;
 import org.apache.eagle.partition.DataDistributionDao;
 import org.apache.eagle.partition.PartitionAlgorithm;
@@ -39,6 +40,6 @@ public class TestGreedyPartition {
         String topic = config.getString("dataSourceConfig.topic");
         DataDistributionDao dao = new DataDistributionDaoImpl(host, port, username, password, topic);
         PartitionAlgorithm algorithm = new GreedyPartitionAlgorithm();
-        algorithm.partition(dao.fetchDataDistribution(), 4);
+        algorithm.partition(dao.fetchDataDistribution(System.currentTimeMillis() - 2 * DateUtils.MILLIS_PER_DAY, System.currentTimeMillis()), 4);
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/b11a223a/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/partition/DataDistributionDaoImpl.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/partition/DataDistributionDaoImpl.java b/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/partition/DataDistributionDaoImpl.java
index 631947c..e808502 100644
--- a/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/partition/DataDistributionDaoImpl.java
+++ b/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/partition/DataDistributionDaoImpl.java
@@ -55,7 +55,7 @@ public class DataDistributionDaoImpl implements DataDistributionDao {
     }
 
     @Override
-    public List<Weight> fetchDataDistribution() throws Exception {
+    public List<Weight> fetchDataDistribution(long startTime, long endTime) throws Exception {
         IEagleServiceClient client = new EagleServiceClientImpl(eagleServiceHost, eagleServicePort, username, password) {
             @Override
             public <T extends Object> GenericServiceAPIResponseEntity<T> search(EagleServiceSingleEntityQueryRequest request) throws EagleServiceClientException {
@@ -75,8 +75,6 @@ public class DataDistributionDaoImpl implements DataDistributionDao {
         };
         try {
             String query = MetricConstants.GENERIC_METRIC_ENTITY_ENDPOINT + "[@topic=\"" + topic + "\"]<@user>{sum(value)}.{sum(value) desc}";
-            long endTime = System.currentTimeMillis();
-            long startTime = endTime - 2 * DateUtils.MILLIS_PER_DAY;
             GenericServiceAPIResponseEntity<Map> response = client.search()
                     .startTime(startTime)
                     .endTime(endTime)

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/b11a223a/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/HdfsAuditLogProcessorMain.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/HdfsAuditLogProcessorMain.java b/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/HdfsAuditLogProcessorMain.java
index 8e6d5d7..fda41d3 100644
--- a/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/HdfsAuditLogProcessorMain.java
+++ b/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/HdfsAuditLogProcessorMain.java
@@ -21,6 +21,7 @@ package org.apache.eagle.security.auditlog;
 import backtype.storm.spout.SchemeAsMultiScheme;
 import com.typesafe.config.Config;
 import com.typesafe.config.ConfigRenderOptions;
+import org.apache.commons.lang3.time.DateUtils;
 import org.apache.eagle.common.config.EagleConfigConstants;
 import org.apache.eagle.dataproc.impl.storm.kafka.KafkaSourcedSpoutProvider;
 import org.apache.eagle.dataproc.impl.storm.kafka.KafkaSourcedSpoutScheme;
@@ -51,7 +52,11 @@ public class HdfsAuditLogProcessorMain {
         String topic = config.getString("dataSourceConfig.topic");
         DataDistributionDao dao = new DataDistributionDaoImpl(host, port, username, password, topic);
         PartitionAlgorithm algorithm = new GreedyPartitionAlgorithm();
-        PartitionStrategy strategy = new PartitionStrategyImpl(dao, algorithm);
+        String key1 = EagleConfigConstants.EAGLE_PROPS + ".partitionRefreshIntervalInMin";
+        Integer partitionRefreshIntervalInMin = config.hasPath(key1) ? config.getInt(key1) : 60;
+        String key2 = EagleConfigConstants.EAGLE_PROPS + ".kafkaStatisticRangeInMin";
+        Integer kafkaStatisticRangeInMin =  config.hasPath(key2) ? config.getInt(key2) : 60;
+        PartitionStrategy strategy = new PartitionStrategyImpl(dao, algorithm, partitionRefreshIntervalInMin * DateUtils.MILLIS_PER_MINUTE, kafkaStatisticRangeInMin * DateUtils.MILLIS_PER_MINUTE);
         return strategy;
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/b11a223a/eagle-security/eagle-security-hdfs-auditlog/src/main/resources/application.conf
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-hdfs-auditlog/src/main/resources/application.conf b/eagle-security/eagle-security-hdfs-auditlog/src/main/resources/application.conf
index 3b678a3..43e5b58 100644
--- a/eagle-security/eagle-security-hdfs-auditlog/src/main/resources/application.conf
+++ b/eagle-security/eagle-security-hdfs-auditlog/src/main/resources/application.conf
@@ -50,7 +50,9 @@
     "mailHost" : "mailHost.com",
     "mailSmtpPort":"25",
     "mailDebug" : "true",
-    "balancePartitionEnabled" : "true",
+    "balancePartitionEnabled" : true,
+    #"partitionRefreshIntervalInMin" : 60,
+    #"kafkaStatisticRangeInMin" : 60,
     "eagleService": {
       "host": "localhost",
       "port": 38080,


Mime
View raw message