eagle-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From h..@apache.org
Subject eagle git commit: [EAGLE-869] Fix MetricStreamPersist bug: no tuple pass on when batchSize > 1
Date Mon, 16 Jan 2017 09:07:09 GMT
Repository: eagle
Updated Branches:
  refs/heads/master 2160673c8 -> e38c19e13


[EAGLE-869] Fix MetricStreamPersist bug: no tuple pass on when batchSize > 1

https://issues.apache.org/jira/browse/EAGLE-869

Author: Hao Chen <hao@apache.org>

Closes #779 from haoch/FixMetricStreamPersist.


Project: http://git-wip-us.apache.org/repos/asf/eagle/repo
Commit: http://git-wip-us.apache.org/repos/asf/eagle/commit/e38c19e1
Tree: http://git-wip-us.apache.org/repos/asf/eagle/tree/e38c19e1
Diff: http://git-wip-us.apache.org/repos/asf/eagle/diff/e38c19e1

Branch: refs/heads/master
Commit: e38c19e135ec649b0816fa016e24800ba0b4a7d6
Parents: 2160673
Author: Hao Chen <hao@apache.org>
Authored: Mon Jan 16 17:07:03 2017 +0800
Committer: Hao Chen <hao@apache.org>
Committed: Mon Jan 16 17:07:03 2017 +0800

----------------------------------------------------------------------
 .../app/messaging/MetricStreamPersist.java      | 24 +++++++++++---------
 1 file changed, 13 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/eagle/blob/e38c19e1/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/MetricStreamPersist.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/MetricStreamPersist.java
b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/MetricStreamPersist.java
index d656827..ba99911 100644
--- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/MetricStreamPersist.java
+++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/MetricStreamPersist.java
@@ -41,7 +41,7 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 
-public class MetricStreamPersist extends BaseRichBolt  {
+public class MetricStreamPersist extends BaseRichBolt {
     private static final Logger LOG = LoggerFactory.getLogger(MetricStreamPersist.class);
     public static final String METRIC_NAME_FIELD = "metricName";
 
@@ -75,24 +75,26 @@ public class MetricStreamPersist extends BaseRichBolt  {
 
     @Override
     public void execute(Tuple input) {
+        GenericMetricEntity metricEntity = null;
         try {
-            GenericMetricEntity metricEntity = this.mapper.map(StreamConvertHelper.tupleToEvent(input).f1());
+            metricEntity = this.mapper.map(StreamConvertHelper.tupleToEvent(input).f1());
             if (batchSize <= 1) {
                 GenericServiceAPIResponseEntity<String> response = this.client.create(Collections.singletonList(metricEntity));
                 if (!response.isSuccess()) {
                     LOG.error("Service side error: {}", response.getException());
                     collector.reportError(new IllegalStateException(response.getException()));
-                } else {
-                    collector.emit(Collections.singletonList(metricEntity.getPrefix()));
-                    collector.ack(input);
                 }
             } else {
                 this.batchSender.send(metricEntity);
-                collector.ack(input);
             }
         } catch (Exception ex) {
             LOG.error(ex.getMessage(), ex);
             collector.reportError(ex);
+        } finally {
+            if (metricEntity != null) {
+                collector.emit(Collections.singletonList(metricEntity.getPrefix()));
+            }
+            collector.ack(input);
         }
     }
 
@@ -127,12 +129,12 @@ public class MetricStreamPersist extends BaseRichBolt  {
         @Override
         public GenericMetricEntity map(Map event) {
             String metricName = metricDefinition.getNameSelector().getMetricName(event);
-            Preconditions.checkNotNull(metricName,"Metric name is null");
+            Preconditions.checkNotNull(metricName, "Metric name is null");
             Long timestamp = metricDefinition.getTimestampSelector().getTimestamp(event);
             Preconditions.checkNotNull(timestamp, "Timestamp is null");
-            Map<String,String> tags = new HashMap<>();
-            for (String dimensionField: metricDefinition.getDimensionFields()) {
-                Preconditions.checkNotNull(dimensionField,"Dimension field name is null");
+            Map<String, String> tags = new HashMap<>();
+            for (String dimensionField : metricDefinition.getDimensionFields()) {
+                Preconditions.checkNotNull(dimensionField, "Dimension field name is null");
                 tags.put(dimensionField, (String) event.get(dimensionField));
             }
 
@@ -141,7 +143,7 @@ public class MetricStreamPersist extends BaseRichBolt  {
                 values = new double[] {(double) event.get(metricDefinition.getValueField())};
             } else {
                 LOG.warn("Event has no value field '{}': {}, use 0 by default", metricDefinition.getValueField(),
event);
-                values = new double[]{0};
+                values = new double[] {0};
             }
 
             GenericMetricEntity entity = new GenericMetricEntity();


Mime
View raw message