eagle-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jjt...@apache.org
Subject eagle git commit: [EAGLE-872] Transform counter metric to rate metric
Date Tue, 21 Mar 2017 02:52:37 GMT
Repository: eagle
Updated Branches:
  refs/heads/master 0cda01b58 -> 673a81e44


[EAGLE-872] Transform counter metric to rate metric

- Add transform bolt using counterToRateFunction in HadoopMetricMonitorApp's storm topology.

https://issues.apache.org/jira/browse/EAGLE-872

Author: r7raul1984 <tangjijun@yhd.com>

Closes #783 from r7raul1984/EAGLE-872.


Project: http://git-wip-us.apache.org/repos/asf/eagle/repo
Commit: http://git-wip-us.apache.org/repos/asf/eagle/commit/673a81e4
Tree: http://git-wip-us.apache.org/repos/asf/eagle/tree/673a81e4
Diff: http://git-wip-us.apache.org/repos/asf/eagle/diff/673a81e4

Branch: refs/heads/master
Commit: 673a81e440914ca292b14b26ec2eb8b757a01e8f
Parents: 0cda01b
Author: r7raul1984 <tangjijun@yhd.com>
Authored: Tue Mar 21 02:52:14 2017 +0000
Committer: r7raul1984 <tangjijun@yhd.com>
Committed: Tue Mar 21 02:52:14 2017 +0000

----------------------------------------------------------------------
 .../environment/builder/CountMetricFilter.java  |  26 ++
 .../builder/CounterToRateFunction.java          | 220 +++++++++++++
 .../java/org/apache/eagle/app/utils/Clock.java  |  24 ++
 .../apache/eagle/app/utils/ClockWithOffset.java |  35 +++
 .../org/apache/eagle/app/utils/ManualClock.java |  54 ++++
 .../builder/CounterToRateFunctionTest.java      | 306 +++++++++++++++++++
 .../app/messaging/MetricStreamPersistTest.java  | 144 +++++++++
 .../mr/running/MRRunningJobApplicationTest.java |   2 -
 .../jpm/mr/running/MRRunningJobManagerTest.java |   3 -
 .../parser/MRJobEntityCreationHandlerTest.java  |   2 -
 .../eagle/metric/HadoopMetricMonitorApp.java    |  56 ++--
 11 files changed, 840 insertions(+), 32 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/eagle/blob/673a81e4/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/builder/CountMetricFilter.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/builder/CountMetricFilter.java
b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/builder/CountMetricFilter.java
new file mode 100644
index 0000000..bed047b
--- /dev/null
+++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/builder/CountMetricFilter.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.app.environment.builder;
+
+import java.io.Serializable;
+import java.util.function.Function;
+/**
+ * Serializable {@code Function<String, Boolean>} with no members of its own.
+ *
+ * <p>In this commit its only implementation is the private filter inside
+ * {@code CounterToRateFunction}, which receives a composed metric name and returns
+ * {@code true} when that name ends with {@code "count"}.
+ */
+@FunctionalInterface
+interface CountMetricFilter extends Function<String, Boolean>, Serializable {
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/eagle/blob/673a81e4/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/builder/CounterToRateFunction.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/builder/CounterToRateFunction.java
b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/builder/CounterToRateFunction.java
new file mode 100644
index 0000000..51dad41
--- /dev/null
+++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/builder/CounterToRateFunction.java
@@ -0,0 +1,220 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.app.environment.builder;
+
+import com.google.common.base.Preconditions;
+import org.apache.eagle.app.utils.Clock;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Transform function that turns counter-style metric events into per-second rates.
+ *
+ * <p>Each event is given a composed name: the values of the descriptor's dimension fields, each
+ * followed by {@code '-'}, and then the metric name chosen by the descriptor's name selector.
+ * If that name ends with {@code "count"} the event is treated as a counter:
+ * <ul>
+ *   <li>the first sample for a name is only cached and nothing is emitted for it;</li>
+ *   <li>each later sample has its value field overwritten with the rate (delta per second)
+ *       against the cached sample, the cached sample is advanced, and the event is emitted.</li>
+ * </ul>
+ * Events whose name does not end with {@code "count"} are emitted unchanged.
+ *
+ * <p>Cached samples are held in an access-ordered {@link LinkedHashMap}. Each time a new name is
+ * inserted, the least recently accessed entry is evicted if its sample timestamp lags
+ * {@code clock.now()} by more than the heartbeat.
+ */
+public class CounterToRateFunction implements TransformFunction {
+    private static final Logger LOG = LoggerFactory.getLogger(CounterToRateFunction.class);
+
+    /** Composed metric names ending with this suffix are converted to rates. */
+    private static final String COUNTER_SUFFIX = "count";
+
+    /** Stateless filter, created once instead of once per transformed event. */
+    private static final CountMetricFilter COUNT_METRIC_FILTER = new DefaultCountMetricFilter();
+
+    /** Last sample per composed metric name; access-ordered, so lookups refresh an entry. */
+    private final Map<String, CounterValue> cache;
+    private final MetricDescriptor metricDescriptor;
+    private Collector collector;
+
+    /**
+     * Creates the function.
+     *
+     * @param metricDescriptor names the event fields holding dimensions, metric name, timestamp
+     *                         and value
+     * @param heartbeat        maximum age of a cached sample before it may be evicted
+     * @param unit             unit of {@code heartbeat}
+     * @param clock            supplies the current time, in milliseconds, for the eviction check
+     */
+    public CounterToRateFunction(MetricDescriptor metricDescriptor, long heartbeat, TimeUnit unit, final Clock clock) {
+        final long heartbeatMillis = TimeUnit.MILLISECONDS.convert(heartbeat, unit);
+        this.cache = new LinkedHashMap<String, CounterValue>(16, 0.75f, true) {
+            // Called by LinkedHashMap after each insertion of a new key; only the eldest
+            // (least recently accessed) entry is ever considered for removal.
+            @Override
+            protected boolean removeEldestEntry(Map.Entry<String, CounterValue> eldest) {
+                final long now = clock.now();
+                final long lastMod = eldest.getValue().getTimestamp();
+                final boolean expired = (now - lastMod) > heartbeatMillis;
+                if (expired) {
+                    LOG.debug("heartbeat interval exceeded, expiring {}", eldest.getKey());
+                }
+                return expired;
+            }
+        };
+        this.metricDescriptor = metricDescriptor;
+    }
+
+    @Override
+    public String getName() {
+        return "CounterToRate";
+    }
+
+    @Override
+    public void open(Collector collector) {
+        this.collector = collector;
+    }
+
+    /**
+     * Emits {@code event} unchanged when it is not a counter; otherwise caches the first sample
+     * of its name, or replaces the value field with the computed rate and emits the event.
+     *
+     * @param event mutable event map; its value field is overwritten for counter samples
+     */
+    @Override
+    public void transform(Map event) {
+        final Metric metric = toMetric(event);
+        LOG.debug("received {} metrics", metric);
+        final String metricName = metric.getMetricName();
+
+        if (!COUNT_METRIC_FILTER.apply(metricName)) {
+            collector.collect(event.toString(), event);
+            return;
+        }
+
+        final CounterValue prev = cache.get(metricName);
+        if (prev == null) {
+            // First sample of this counter: there is no baseline yet, so nothing is emitted.
+            cache.put(metricName, new CounterValue(metric));
+            return;
+        }
+
+        final double rate = prev.computeRate(metric);
+        event.put(metricDescriptor.getValueField(), rate);
+        collector.collect(event.toString(), event);
+    }
+
+    @Override
+    public void close() {
+        cache.clear();
+    }
+
+    /**
+     * Builds the {@link Metric} for an event: composed name, selected timestamp, numeric value.
+     * A dimension missing from the event contributes the text {@code "null"} to the name.
+     */
+    private Metric toMetric(Map event) {
+        final StringBuilder metricName = new StringBuilder();
+        for (String dimensionField : metricDescriptor.getDimensionFields()) {
+            metricName.append(event.get(dimensionField)).append('-');
+        }
+        metricName.append(metricDescriptor.getMetricNameSelector().getMetricName(event));
+
+        final long timestamp = metricDescriptor.getTimestampSelector().getTimestamp(event);
+
+        return new Metric(metricName.toString(), timestamp, getCurrentValue(event));
+    }
+
+    /**
+     * Reads the event's value field as a double.
+     *
+     * <p>Any {@link Number} is accepted (not only {@code Double}). A missing or null value is
+     * logged and treated as 0.
+     *
+     * @throws ClassCastException if the value is present but is not a {@link Number}
+     */
+    private double getCurrentValue(Map event) {
+        final String valueField = metricDescriptor.getValueField();
+        final Object raw = event.get(valueField);
+        if (raw == null) {
+            LOG.warn("Event has no value field '{}': {}, use 0 by default", valueField, event);
+            return 0;
+        }
+        return ((Number) raw).doubleValue();
+    }
+
+    /** Mutable "last seen" sample of one counter: its timestamp (milliseconds) and value. */
+    protected static class CounterValue {
+        private long timestamp;
+        private double value;
+
+        public CounterValue(long timestamp, double value) {
+            this.timestamp = timestamp;
+            this.value = value;
+        }
+
+        public CounterValue(Metric m) {
+            this(m.getTimestamp(), m.getNumberValue().doubleValue());
+        }
+
+        public long getTimestamp() {
+            return timestamp;
+        }
+
+        /**
+         * Returns the per-second rate between the stored sample and {@code m}, then stores
+         * {@code m} as the new baseline (also when the returned rate is 0).
+         */
+        public double computeRate(Metric m) {
+            final long currentTimestamp = m.getTimestamp();
+            final double currentValue = m.getNumberValue().doubleValue();
+
+            final long durationMillis = currentTimestamp - timestamp;
+            final double delta = currentValue - value;
+
+            timestamp = currentTimestamp;
+            value = currentValue;
+
+            return computeRate(durationMillis, delta);
+        }
+
+        /** Returns delta per second, or 0 when the duration or the delta is not positive. */
+        private double computeRate(long durationMillis, double delta) {
+            final double millisPerSecond = 1000.0;
+            final double duration = durationMillis / millisPerSecond;
+            return (duration <= 0.0 || delta <= 0.0) ? 0.0 : delta / duration;
+        }
+
+        @Override
+        public String toString() {
+            return "CounterValue{" + "timestamp=" + timestamp + ", value=" + value + '}';
+        }
+    }
+
+
+    /** Immutable view of one event: composed name, timestamp and (non-null) value. */
+    protected final class Metric {
+        private final String metricName;
+        private final long timestamp;
+        private final Object value;
+
+        public Metric(String metricName, long timestamp, Object value) {
+            this.metricName = Preconditions.checkNotNull(metricName, "metricName");
+            this.timestamp = timestamp;
+            this.value = Preconditions.checkNotNull(value, "value");
+        }
+
+        public String getMetricName() {
+            return metricName;
+        }
+
+        public long getTimestamp() {
+            return timestamp;
+        }
+
+        public Object getValue() {
+            return value;
+        }
+
+        /** Returns the value cast to {@link Number}; check {@link #hasNumberValue()} first. */
+        public Number getNumberValue() {
+            return (Number) value;
+        }
+
+        public boolean hasNumberValue() {
+            return (value instanceof Number);
+        }
+
+        public boolean isCounter() {
+            return metricName.endsWith(COUNTER_SUFFIX);
+        }
+
+        @Override
+        public boolean equals(Object obj) {
+            // instanceof is false for null, so no separate null check is needed.
+            if (!(obj instanceof Metric)) {
+                return false;
+            }
+            Metric m = (Metric) obj;
+            return metricName.equals(m.getMetricName())
+                && timestamp == m.getTimestamp()
+                && value.equals(m.getValue());
+        }
+
+        @Override
+        public int hashCode() {
+            int result = metricName.hashCode();
+            result = 31 * result + (int) (timestamp ^ (timestamp >>> 32));
+            result = 31 * result + value.hashCode();
+            return result;
+        }
+
+        @Override
+        public String toString() {
+            return "Metric{metricName=" + metricName + ", timestamp=" + timestamp + ", value=" + value + '}';
+        }
+    }
+
+    /** Accepts composed metric names that end with {@code "count"}. */
+    private static final class DefaultCountMetricFilter implements CountMetricFilter {
+        @Override
+        public Boolean apply(String metricName) {
+            return metricName.endsWith(COUNTER_SUFFIX);
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/eagle/blob/673a81e4/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/utils/Clock.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/utils/Clock.java
b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/utils/Clock.java
new file mode 100644
index 0000000..f3deff9
--- /dev/null
+++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/utils/Clock.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.app.utils;
+
+/**
+ * Minimal time source, so that code needing "now" can be driven by a substitute clock.
+ */
+public interface Clock {
+    /** Clock that simply reports {@link System#currentTimeMillis()}. */
+    Clock WALL = () -> System.currentTimeMillis();
+
+    /**
+     * Returns this clock's current time. For {@link #WALL} this is the value of
+     * {@link System#currentTimeMillis()}.
+     */
+    long now();
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/eagle/blob/673a81e4/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/utils/ClockWithOffset.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/utils/ClockWithOffset.java
b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/utils/ClockWithOffset.java
new file mode 100644
index 0000000..62b060f
--- /dev/null
+++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/utils/ClockWithOffset.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.app.utils;
+
+/**
+ * Singleton {@link Clock} reporting {@link System#currentTimeMillis()} plus a configurable,
+ * non-negative offset (zero until {@link #setOffset(long)} is called).
+ */
+public enum ClockWithOffset implements Clock {
+    INSTANCE;
+
+    /** Amount added to the system time on every {@link #now()} call. */
+    private volatile long offset = 0L;
+
+    /**
+     * Replaces the offset. A negative argument is ignored and the current offset is kept.
+     *
+     * @param offset amount to add to {@link System#currentTimeMillis()}
+     */
+    public void setOffset(long offset) {
+        if (offset < 0) {
+            return;
+        }
+        this.offset = offset;
+    }
+
+    @Override
+    public long now() {
+        final long shift = offset;
+        return System.currentTimeMillis() + shift;
+    }
+}

http://git-wip-us.apache.org/repos/asf/eagle/blob/673a81e4/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/utils/ManualClock.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/utils/ManualClock.java
b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/utils/ManualClock.java
new file mode 100644
index 0000000..cd8fc80
--- /dev/null
+++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/utils/ManualClock.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.app.utils;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * {@link Clock} whose reading is fixed until {@link #set(long)} changes it.
+ *
+ * <p>{@link #equals(Object)} and {@link #hashCode()} are based on the current reading, so they
+ * change whenever the clock is set. Avoid using an instance as a hash key while it is still
+ * being adjusted.
+ */
+public class ManualClock implements Clock {
+
+    private final AtomicLong time;
+
+    /**
+     * @param init initial reading returned by {@link #now()}
+     */
+    public ManualClock(long init) {
+        time = new AtomicLong(init);
+    }
+
+    /**
+     * Sets the reading returned by subsequent {@link #now()} calls.
+     *
+     * @param t new reading
+     */
+    public void set(long t) {
+        time.set(t);
+    }
+
+    @Override
+    public long now() {
+        return time.get();
+    }
+
+    /** Two manual clocks are equal when they are of the same class and show the same reading. */
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+
+        ManualClock clock = (ManualClock) o;
+        return now() == clock.now();
+    }
+
+    @Override
+    public int hashCode() {
+        // Same value as Long.valueOf(now()).hashCode(), without allocating a Long.
+        return Long.hashCode(now());
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/eagle/blob/673a81e4/eagle-core/eagle-app/eagle-app-base/src/test/java/org/apache/eagle/app/environment/builder/CounterToRateFunctionTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/test/java/org/apache/eagle/app/environment/builder/CounterToRateFunctionTest.java
b/eagle-core/eagle-app/eagle-app-base/src/test/java/org/apache/eagle/app/environment/builder/CounterToRateFunctionTest.java
new file mode 100644
index 0000000..6c00880
--- /dev/null
+++ b/eagle-core/eagle-app/eagle-app-base/src/test/java/org/apache/eagle/app/environment/builder/CounterToRateFunctionTest.java
@@ -0,0 +1,306 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.app.environment.builder;
+
+import backtype.storm.task.IOutputCollector;
+import backtype.storm.task.OutputCollector;
+import backtype.storm.tuple.Tuple;
+import org.apache.eagle.app.utils.ClockWithOffset;
+import org.apache.eagle.app.utils.ManualClock;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.lang.reflect.Field;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.util.*;
+import java.util.concurrent.TimeUnit;
+
+public class CounterToRateFunctionTest {
+    /*
+     * Tests for CounterToRateFunction: composition of the metric name in toMetric, conversion
+     * of counter samples to rates in transform, and eviction of stale cache entries.
+     */
+
+    private Map mkCountTypeEvent(long ts, double value) {
+        Map event = new HashMap();
+        event.put("timestamp", ts);
+        event.put("metric", "hadoop.hbase.regionserver.server.totalrequestcount");
+        event.put("component", "hbasemaster");
+        event.put("site", "sandbox");
+        event.put("value", value);
+        event.put("host", "xxx-xxx.int.xxx.com");
+        return event;
+    }
+
+    private Map mkCountTypeEventWithMetricName(long ts, double value, String metric) {
+        Map event = new HashMap();
+        event.put("timestamp", ts);
+        event.put("metric", metric);
+        event.put("component", "hbasemaster");
+        event.put("site", "sandbox");
+        event.put("value", value);
+        event.put("host", "xxx-xxx.int.xxx.com");
+        return event;
+    }
+
+    private Map mkOtherTypeEvent(long ts, double value) {
+        Map event = new HashMap();
+        event.put("timestamp", ts);
+        event.put("metric", "hadoop.memory.heapmemoryusage.used");
+        event.put("component", "hbasemaster");
+        event.put("site", "sandbox");
+        event.put("value", value);
+        event.put("host", "xxx-xxx.int.xxx.com");
+        return event;
+    }
+
+    // toMetric is private in CounterToRateFunction, so this test invokes it through reflection.
+    @Test
+    public void testToMetricAndCounterValue() throws NoSuchMethodException, InvocationTargetException,
IllegalAccessException {
+        long baseTime = System.currentTimeMillis() + 100000L;
+
+        MetricDescriptor metricDefinition = MetricDescriptor
+            .metricGroupByField("group")
+            .siteAs("siteId")
+            .namedByField("metric")
+            .eventTimeByField("timestamp")
+            .dimensionFields("host", "component", "site")
+            .granularity(Calendar.MINUTE)
+            .valueField("value");
+        CounterToRateFunction counterToRateFunction = new CounterToRateFunction(metricDefinition,
3, TimeUnit.MINUTES, ClockWithOffset.INSTANCE);
+
+        Map event = mkCountTypeEvent((baseTime + 0), 374042741.0);
+        Method toMetricMethod = counterToRateFunction.getClass().getDeclaredMethod("toMetric",
Map.class);
+        toMetricMethod.setAccessible(true);
+        CounterToRateFunction.Metric metric = (CounterToRateFunction.Metric) toMetricMethod.invoke(counterToRateFunction,
event);
+        Assert.assertEquals("xxx-xxx.int.xxx.com-hbasemaster-sandbox-hadoop.hbase.regionserver.server.totalrequestcount",
metric.getMetricName());
+        Assert.assertEquals(374042741.0, Double.valueOf(metric.getValue().toString()), 0.00001);
+        Assert.assertEquals(374042741.0, metric.getNumberValue().doubleValue(), 0.00001);
+        Assert.assertTrue(metric.isCounter());
+
+        // Same dimensions, but a metric name that does not end with "count".
+        event = mkOtherTypeEvent((baseTime + 0), 100);
+        metric = (CounterToRateFunction.Metric) toMetricMethod.invoke(counterToRateFunction,
event);
+        Assert.assertEquals("xxx-xxx.int.xxx.com-hbasemaster-sandbox-hadoop.memory.heapmemoryusage.used",
metric.getMetricName());
+        Assert.assertEquals(100, Double.valueOf(metric.getValue().toString()), 0.00001);
+        Assert.assertEquals(100, metric.getNumberValue().doubleValue(), 0.00001);
+        Assert.assertTrue(!metric.isCounter());
+
+
+    }
+    // Feeds successive samples through transform() and checks the emitted events and the cached CounterValue.
+    @Test
+    public void testTransformToRate() throws NoSuchFieldException, IllegalAccessException
{
+        List<Map> result = new ArrayList<>();
+        OutputCollector collector = new OutputCollector(new IOutputCollector() {
+            @Override
+            public List<Integer> emit(String streamId, Collection<Tuple> anchors,
List<Object> tuple) {
+                result.add((Map) tuple.get(1));
+                return null;
+            }
+
+            @Override
+            public void emitDirect(int taskId, String streamId, Collection<Tuple> anchors,
List<Object> tuple) {
+
+            }
+
+            @Override
+            public void ack(Tuple input) {
+
+            }
+
+            @Override
+            public void fail(Tuple input) {
+
+            }
+
+            @Override
+            public void reportError(Throwable error) {
+
+            }
+        });
+        MetricDescriptor metricDefinition = MetricDescriptor
+            .metricGroupByField("group")
+            .siteAs("siteId")
+            .namedByField("metric")
+            .eventTimeByField("timestamp")
+            .dimensionFields("host", "component", "site")
+            .granularity(Calendar.MINUTE)
+            .valueField("value");
+        CounterToRateFunction counterToRateFunction = new CounterToRateFunction(metricDefinition,
3, TimeUnit.MINUTES, ClockWithOffset.INSTANCE);
+        counterToRateFunction.open(new StormOutputCollector(collector));
+        long baseTime = System.currentTimeMillis() + 100000L;
+        // First counter sample: it is only cached, nothing is emitted.
+        Map event = mkCountTypeEvent((baseTime + 0), 374042741.0);
+        counterToRateFunction.transform(event);
+        Assert.assertTrue(result.isEmpty());
+
+        Field cacheField = counterToRateFunction.getClass().getDeclaredField("cache");
+        cacheField.setAccessible(true);
+        Map<String, CounterToRateFunction.CounterValue> cache = (Map<String, CounterToRateFunction.CounterValue>)
cacheField.get(counterToRateFunction);
+        Assert.assertTrue(cache.size() == 1);
+
+        CounterToRateFunction.CounterValue counterValue = cache.get("xxx-xxx.int.xxx.com-hbasemaster-sandbox-hadoop.hbase.regionserver.server.totalrequestcount");
+        Assert.assertEquals((long) event.get("timestamp"), counterValue.getTimestamp());
+        Field valueField = counterValue.getClass().getDeclaredField("value");
+        valueField.setAccessible(true);
+        double value = (double) valueField.get(counterValue);
+        Assert.assertEquals(374042741.0, value, 0.00001);
+        result.clear();
+        // Non-counter sample: emitted unchanged, and the cached counter sample is not touched.
+        event = mkOtherTypeEvent((baseTime + 0), 100);
+        counterToRateFunction.transform(event);
+        Assert.assertTrue(result.size() == 1);
+        Assert.assertTrue(cache.size() == 1);
+        Assert.assertEquals(baseTime + 0, counterValue.getTimestamp());
+        Assert.assertEquals(374042741.0, value, 0.00001);
+
+        Assert.assertEquals("hadoop.memory.heapmemoryusage.used", event.get("metric"));
+        Assert.assertEquals(100, (Double) event.get("value"), 0.00001);
+        result.clear();
+
+        // Delta of 10 in 5 seconds: expected rate is 2.0 per second.
+        event = mkCountTypeEvent((baseTime + 5000), 374042751.0);
+        counterToRateFunction.transform(event);
+
+        Assert.assertTrue(result.size() == 1);
+        Map transedEvent = result.get(0);
+        Assert.assertEquals(baseTime + 5000, transedEvent.get("timestamp"));
+        Assert.assertEquals(2.0, (double) transedEvent.get("value"), 0.00001);
+        Assert.assertEquals(baseTime + 5000, counterValue.getTimestamp());
+        value = (double) valueField.get(counterValue);
+        Assert.assertEquals(374042751.0, value, 0.00001);
+        result.clear();
+
+        // Delta of 15 in 5 seconds: expected rate is 3.0 per second.
+        event = mkCountTypeEvent((baseTime + 10000), 374042766.0);
+        counterToRateFunction.transform(event);
+
+        Assert.assertTrue(result.size() == 1);
+        transedEvent = result.get(0);
+        Assert.assertEquals(baseTime + 10000, transedEvent.get("timestamp"));
+        Assert.assertEquals(3.0, (double) transedEvent.get("value"), 0.00001);
+        Assert.assertEquals(baseTime + 10000, counterValue.getTimestamp());
+        value = (double) valueField.get(counterValue);
+        Assert.assertEquals(374042766.0, value, 0.00001);
+        result.clear();
+
+
+        // No change from the previous sample: expected rate is 0.0.
+        event = mkCountTypeEvent((baseTime + 15000), 374042766.0);
+        counterToRateFunction.transform(event);
+
+        Assert.assertTrue(result.size() == 1);
+        transedEvent = result.get(0);
+        Assert.assertEquals(baseTime + 15000, transedEvent.get("timestamp"));
+        Assert.assertEquals(0.0, (double) transedEvent.get("value"), 0.00001);
+        Assert.assertEquals(baseTime + 15000, counterValue.getTimestamp());
+        value = (double) valueField.get(counterValue);
+        Assert.assertEquals(374042766.0, value, 0.00001);
+        result.clear();
+
+        // Decrease from the previous sample: the rate is reported as 0.0 and the lower value becomes the new baseline.
+        event = mkCountTypeEvent((baseTime + 20000), 1.0);
+        counterToRateFunction.transform(event);
+
+        Assert.assertTrue(result.size() == 1);
+        transedEvent = result.get(0);
+        Assert.assertEquals(baseTime + 20000, transedEvent.get("timestamp"));
+        Assert.assertEquals(0.0, (double) transedEvent.get("value"), 0.00001);
+        Assert.assertEquals(baseTime + 20000, counterValue.getTimestamp());
+        value = (double) valueField.get(counterValue);
+        Assert.assertEquals(1.0, value, 0.00001);
+        result.clear();
+    }
+    // Uses a ManualClock and a 60 second heartbeat to check when a stale cache entry is evicted.
+    @Test
+    public void testTransformToRateWithExpiration() throws NoSuchFieldException, IllegalAccessException
{
+
+        MetricDescriptor metricDefinition = MetricDescriptor
+            .metricGroupByField("group")
+            .siteAs("siteId")
+            .namedByField("metric")
+            .eventTimeByField("timestamp")
+            .dimensionFields("host", "component", "site")
+            .granularity(Calendar.MINUTE)
+            .valueField("value");
+        List<Map> result = new ArrayList<>();
+        OutputCollector collector = new OutputCollector(new IOutputCollector() {
+            @Override
+            public List<Integer> emit(String streamId, Collection<Tuple> anchors,
List<Object> tuple) {
+                result.add((Map) tuple.get(1));
+                return null;
+            }
+
+            @Override
+            public void emitDirect(int taskId, String streamId, Collection<Tuple> anchors,
List<Object> tuple) {
+
+            }
+
+            @Override
+            public void ack(Tuple input) {
+
+            }
+
+            @Override
+            public void fail(Tuple input) {
+
+            }
+
+            @Override
+            public void reportError(Throwable error) {
+
+            }
+        });
+        ManualClock manualClock = new ManualClock(0);
+        manualClock.set(30000L);
+        CounterToRateFunction counterToRateFunction = new CounterToRateFunction(metricDefinition,
60, TimeUnit.SECONDS, manualClock);
+        counterToRateFunction.open(new StormOutputCollector(collector));
+        Map event = mkCountTypeEventWithMetricName(manualClock.now(), 110, "hadoop.hbase.regionserver.server.totalrequestcount");
+        counterToRateFunction.transform(event);
+        Field cacheField = counterToRateFunction.getClass().getDeclaredField("cache");
+        cacheField.setAccessible(true);
+        Map<String, CounterToRateFunction.CounterValue> cache = (Map<String, CounterToRateFunction.CounterValue>)
cacheField.get(counterToRateFunction);
+        Assert.assertTrue(cache.size() == 1);
+
+        manualClock.set(50000L);
+        event = mkCountTypeEventWithMetricName(manualClock.now(), 130, "hadoop.hbase.regionserver.server.readerrequestcount");
+        counterToRateFunction.transform(event);
+
+        cache = (Map<String, CounterToRateFunction.CounterValue>) cacheField.get(counterToRateFunction);
+        Assert.assertEquals(2, cache.size());
+        Assert.assertEquals("CounterValue{timestamp=30000, value=110.0}", cache.get("xxx-xxx.int.xxx.com-hbasemaster-sandbox-hadoop.hbase.regionserver.server.totalrequestcount").toString());
+        Assert.assertEquals("CounterValue{timestamp=50000, value=130.0}", cache.get("xxx-xxx.int.xxx.com-hbasemaster-sandbox-hadoop.hbase.regionserver.server.readerrequestcount").toString());
+        /*
+         * The map under test is access-ordered, so the cache.get(...) calls inside these
+         * assertions also change which entry counts as the eldest. Do not reorder them.
+         */
+        manualClock.set(100000L);
+        event = mkCountTypeEventWithMetricName(manualClock.now(), 120, "hadoop.hbase.regionserver.server.totalrequestcount");
+        counterToRateFunction.transform(event);
+
+        cache = (Map<String, CounterToRateFunction.CounterValue>) cacheField.get(counterToRateFunction);
+        Assert.assertEquals(2, cache.size());
+        Assert.assertEquals("CounterValue{timestamp=100000, value=120.0}", cache.get("xxx-xxx.int.xxx.com-hbasemaster-sandbox-hadoop.hbase.regionserver.server.totalrequestcount").toString());
+        Assert.assertEquals("CounterValue{timestamp=50000, value=130.0}", cache.get("xxx-xxx.int.xxx.com-hbasemaster-sandbox-hadoop.hbase.regionserver.server.readerrequestcount").toString());
+        /*
+         * After the two lookups above, the totalrequestcount entry (timestamp 100000) is the
+         * least recently accessed one. At 160001 it is 60001 ms old, which exceeds the 60 s
+         * heartbeat, so inserting the new writerrequestcount name evicts it. The
+         * readerrequestcount entry is older by timestamp but was accessed last, so it stays.
+         */
+        manualClock.set(160001L);
+        event = mkCountTypeEventWithMetricName(manualClock.now(), 10, "hadoop.hbase.regionserver.server.writerrequestcount");
+        counterToRateFunction.transform(event);
+        Assert.assertEquals(2, cache.size());
+        Assert.assertEquals("CounterValue{timestamp=160001, value=10.0}", cache.get("xxx-xxx.int.xxx.com-hbasemaster-sandbox-hadoop.hbase.regionserver.server.writerrequestcount").toString());
+        Assert.assertEquals("CounterValue{timestamp=50000, value=130.0}", cache.get("xxx-xxx.int.xxx.com-hbasemaster-sandbox-hadoop.hbase.regionserver.server.readerrequestcount").toString());
+
+
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/eagle/blob/673a81e4/eagle-core/eagle-app/eagle-app-base/src/test/java/org/apache/eagle/app/messaging/MetricStreamPersistTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/test/java/org/apache/eagle/app/messaging/MetricStreamPersistTest.java
b/eagle-core/eagle-app/eagle-app-base/src/test/java/org/apache/eagle/app/messaging/MetricStreamPersistTest.java
new file mode 100644
index 0000000..1561a41
--- /dev/null
+++ b/eagle-core/eagle-app/eagle-app-base/src/test/java/org/apache/eagle/app/messaging/MetricStreamPersistTest.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.app.messaging;
+
+import backtype.storm.Testing;
+import backtype.storm.task.IOutputCollector;
+import backtype.storm.task.OutputCollector;
+import backtype.storm.tuple.Tuple;
+import backtype.storm.tuple.Values;
+import com.typesafe.config.Config;
+import org.apache.eagle.app.environment.builder.MetricDescriptor;
+import org.apache.eagle.app.utils.StreamConvertHelper;
+import org.apache.eagle.log.entity.GenericMetricEntity;
+import org.apache.eagle.log.entity.GenericServiceAPIResponseEntity;
+import org.apache.eagle.service.client.impl.EagleServiceClientImpl;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import java.lang.reflect.Field;
+import java.util.*;
+
+import static org.mockito.Matchers.anyObject;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests for {@link MetricStreamPersist}: the event-to-entity mapping held in its
+ * {@code mapper} field, and {@code execute} run against a mocked {@link EagleServiceClientImpl}.
+ */
+@RunWith(PowerMockRunner.class)
+@PrepareForTest( {MetricStreamPersist.class})
+public class MetricStreamPersistTest {
+
+    /**
+     * Builds the descriptor used by both tests: minute granularity with host, component and
+     * site as dimension fields.
+     */
+    private static MetricDescriptor newMetricDescriptor() {
+        return MetricDescriptor
+            .metricGroupByField("group")
+            .siteAs("siteId")
+            .namedByField("metric")
+            .eventTimeByField("timestamp")
+            .dimensionFields("host", "component", "site")
+            .granularity(Calendar.MINUTE)
+            .valueField("value");
+    }
+
+    /** Builds the sample heap-memory usage event fed to both tests. */
+    private static Map<String, Object> newHeapMemoryEvent() {
+        Map<String, Object> event = new HashMap<>();
+        event.put("timestamp", 1482106479564L);
+        event.put("metric", "hadoop.memory.heapmemoryusage.used");
+        event.put("component", "hbasemaster");
+        event.put("site", "sandbox");
+        event.put("value", 14460904.0);
+        event.put("host", "xxx-xxx.int.xxx.com");
+        return event;
+    }
+
+    /**
+     * Maps the sample event through the {@code mapper} field (read reflectively) and checks the
+     * resulting entity; the expected timestamp 1482106440000 is the event time 1482106479564
+     * truncated to the minute.
+     */
+    @Test
+    public void testStructuredMetricMapper() throws Exception {
+        Config config = mock(Config.class);
+        MetricStreamPersist metricStreamPersist = new MetricStreamPersist(newMetricDescriptor(), config);
+        Field mapperField = metricStreamPersist.getClass().getDeclaredField("mapper");
+        mapperField.setAccessible(true);
+        MetricStreamPersist.MetricMapper mapper = (MetricStreamPersist.MetricMapper) mapperField.get(metricStreamPersist);
+
+        Tuple tuple = Testing.testTuple(new Values("metric", newHeapMemoryEvent()));
+        GenericMetricEntity metricEntity = mapper.map(StreamConvertHelper.tupleToEvent(tuple).f1());
+
+        Assert.assertEquals("prefix:hadoop.memory.heapmemoryusage.used, timestamp:1482106440000, humanReadableDate:2016-12-19 00:14:00,000, tags: component=hbasemaster,site=sandbox,host=xxx-xxx.int.xxx.com,, encodedRowkey:null", metricEntity.toString());
+    }
+
+    /**
+     * Runs {@code prepare} and {@code execute} with construction of {@link EagleServiceClientImpl}
+     * intercepted by PowerMock and a successful mocked response, and expects exactly one emitted
+     * tuple whose first value is the metric name.
+     */
+    @Test
+    public void testMetricStreamPersist() throws Exception {
+        List<String> result = new ArrayList<>();
+        OutputCollector collector = new OutputCollector(new IOutputCollector() {
+            @Override
+            public List<Integer> emit(String streamId, Collection<Tuple> anchors, List<Object> tuple) {
+                // Record the first value of every emitted tuple for the assertions below.
+                result.add(String.valueOf(tuple.get(0)));
+                return null;
+            }
+
+            @Override
+            public void emitDirect(int taskId, String streamId, Collection<Tuple> anchors, List<Object> tuple) {
+
+            }
+
+            @Override
+            public void ack(Tuple input) {
+
+            }
+
+            @Override
+            public void fail(Tuple input) {
+
+            }
+
+            @Override
+            public void reportError(Throwable error) {
+
+            }
+        });
+
+        Config config = mock(Config.class);
+        when(config.hasPath("service.batchSize")).thenReturn(false);
+
+        GenericServiceAPIResponseEntity<String> response = mock(GenericServiceAPIResponseEntity.class);
+        when(response.isSuccess()).thenReturn(true);
+
+        EagleServiceClientImpl client = mock(EagleServiceClientImpl.class);
+        PowerMockito.whenNew(EagleServiceClientImpl.class).withArguments(config).thenReturn(client);
+        when(client.create(anyObject())).thenReturn(response);
+
+        MetricStreamPersist metricStreamPersist = new MetricStreamPersist(newMetricDescriptor(), config);
+        metricStreamPersist.prepare(null, null, collector);
+
+        Tuple tuple = Testing.testTuple(new Values("metric", newHeapMemoryEvent()));
+        metricStreamPersist.execute(tuple);
+        Assert.assertEquals(1, result.size());
+        Assert.assertEquals("hadoop.memory.heapmemoryusage.used", result.get(0));
+    }
+}
+

http://git-wip-us.apache.org/repos/asf/eagle/blob/673a81e4/eagle-jpm/eagle-jpm-mr-running/src/test/java/org/apache/eagle/jpm/mr/running/MRRunningJobApplicationTest.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-running/src/test/java/org/apache/eagle/jpm/mr/running/MRRunningJobApplicationTest.java b/eagle-jpm/eagle-jpm-mr-running/src/test/java/org/apache/eagle/jpm/mr/running/MRRunningJobApplicationTest.java
index a43c956..faf8c8e 100644
--- a/eagle-jpm/eagle-jpm-mr-running/src/test/java/org/apache/eagle/jpm/mr/running/MRRunningJobApplicationTest.java
+++ b/eagle-jpm/eagle-jpm-mr-running/src/test/java/org/apache/eagle/jpm/mr/running/MRRunningJobApplicationTest.java
@@ -65,12 +65,10 @@ public class MRRunningJobApplicationTest {
     private static final String TUPLE_2 = "[application_1479206441898_35341, AppInfo{id='application_1479206441898_35341',
user='yyy', name='insert overwrite table inter...a.xxx(Stage-3)', queue='yyy', state='RUNNING',
finalStatus='UNDEFINED', progress=59.545456, trackingUI='ApplicationMaster', trackingUrl='http://host.domain.com:8088/proxy/application_1479206441898_35341/',
diagnostics='', clusterId='1479206441898', applicationType='MAPREDUCE', startedTime=1479341511477,
finishedTime=0, elapsedTime=77619, amContainerLogs='http://host.domain.com:8042/node/containerlogs/container_e11_1479206441898_35341_01_000005/yyy',
amHostHttpAddress='host.domain.com:8042', allocatedMB=27648, allocatedVCores=6, runningContainers=6},
null]";
     private static final ObjectMapper OBJ_MAPPER = new ObjectMapper();
     private static Config config = ConfigFactory.load();
-    private static String siteId;
 
     @BeforeClass
     public static void setupMapper() throws Exception {
         OBJ_MAPPER.configure(JsonParser.Feature.ALLOW_NON_NUMERIC_NUMBERS, true);
-        siteId = config.getString("siteId");
     }
 
 

http://git-wip-us.apache.org/repos/asf/eagle/blob/673a81e4/eagle-jpm/eagle-jpm-mr-running/src/test/java/org/apache/eagle/jpm/mr/running/MRRunningJobManagerTest.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-running/src/test/java/org/apache/eagle/jpm/mr/running/MRRunningJobManagerTest.java b/eagle-jpm/eagle-jpm-mr-running/src/test/java/org/apache/eagle/jpm/mr/running/MRRunningJobManagerTest.java
index 4c52e10..eb48ff6 100644
--- a/eagle-jpm/eagle-jpm-mr-running/src/test/java/org/apache/eagle/jpm/mr/running/MRRunningJobManagerTest.java
+++ b/eagle-jpm/eagle-jpm-mr-running/src/test/java/org/apache/eagle/jpm/mr/running/MRRunningJobManagerTest.java
@@ -24,7 +24,6 @@ import org.apache.curator.retry.ExponentialBackoffRetry;
 import org.apache.curator.test.TestingServer;
 import org.apache.curator.utils.CloseableUtils;
 import org.apache.eagle.jpm.mr.running.recover.MRRunningJobManager;
-import org.apache.eagle.jpm.mr.runningentity.JobExecutionAPIEntity;
 import org.apache.eagle.jpm.util.jobrecover.RunningJobManager;
 import org.apache.zookeeper.CreateMode;
 import org.junit.*;
@@ -37,8 +36,6 @@ import org.slf4j.LoggerFactory;
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
-import java.util.HashMap;
-import java.util.Map;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;

http://git-wip-us.apache.org/repos/asf/eagle/blob/673a81e4/eagle-jpm/eagle-jpm-mr-running/src/test/java/org/apache/eagle/jpm/mr/running/parser/MRJobEntityCreationHandlerTest.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-running/src/test/java/org/apache/eagle/jpm/mr/running/parser/MRJobEntityCreationHandlerTest.java b/eagle-jpm/eagle-jpm-mr-running/src/test/java/org/apache/eagle/jpm/mr/running/parser/MRJobEntityCreationHandlerTest.java
index 0ec1b8e..7ffd30f 100644
--- a/eagle-jpm/eagle-jpm-mr-running/src/test/java/org/apache/eagle/jpm/mr/running/parser/MRJobEntityCreationHandlerTest.java
+++ b/eagle-jpm/eagle-jpm-mr-running/src/test/java/org/apache/eagle/jpm/mr/running/parser/MRJobEntityCreationHandlerTest.java
@@ -21,8 +21,6 @@ import org.apache.eagle.jpm.mr.running.parser.metrics.JobExecutionMetricsCreatio
 import org.apache.eagle.jpm.mr.runningentity.JobExecutionAPIEntity;
 import org.apache.eagle.jpm.util.Constants;
 import org.apache.eagle.jpm.util.Utils;
-import org.apache.eagle.jpm.util.resourcefetch.connection.InputStreamUtils;
-import org.apache.eagle.jpm.util.resourcefetch.connection.URLConnectionUtils;
 import org.apache.eagle.jpm.util.resourcefetch.model.*;
 import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity;
 import com.fasterxml.jackson.core.JsonParser;

http://git-wip-us.apache.org/repos/asf/eagle/blob/673a81e4/eagle-metric/eagle-hadoop-metric/src/main/java/org/apache/eagle/metric/HadoopMetricMonitorApp.java
----------------------------------------------------------------------
diff --git a/eagle-metric/eagle-hadoop-metric/src/main/java/org/apache/eagle/metric/HadoopMetricMonitorApp.java b/eagle-metric/eagle-hadoop-metric/src/main/java/org/apache/eagle/metric/HadoopMetricMonitorApp.java
index 05c874d..9c0ac10 100644
--- a/eagle-metric/eagle-hadoop-metric/src/main/java/org/apache/eagle/metric/HadoopMetricMonitorApp.java
+++ b/eagle-metric/eagle-hadoop-metric/src/main/java/org/apache/eagle/metric/HadoopMetricMonitorApp.java
@@ -19,41 +19,54 @@ package org.apache.eagle.metric;
 import backtype.storm.generated.StormTopology;
 import com.typesafe.config.Config;
 import org.apache.eagle.app.StormApplication;
+import org.apache.eagle.app.environment.builder.CounterToRateFunction;
 import org.apache.eagle.app.environment.builder.MetricDescriptor;
 import org.apache.eagle.app.environment.builder.MetricDescriptor.MetricGroupSelector;
 import org.apache.eagle.app.environment.impl.StormEnvironment;
 import org.apache.eagle.app.utils.AppConfigUtils;
+import org.apache.eagle.app.utils.ClockWithOffset;
 
 import java.util.Calendar;
+import java.util.concurrent.TimeUnit;
 
+/**
+ * Builds the topology that reads HADOOP_JMX_METRIC_STREAM and SYSTEM_METRIC_STREAM, passes each
+ * through a {@link CounterToRateFunction} and saves the result as metrics.
+ */
 public class HadoopMetricMonitorApp extends StormApplication {
     @Override
     public StormTopology execute(Config config, StormEnvironment environment) {
+
+        // JMX metrics are grouped as "hadoop.<component>" when the event carries a component.
+        MetricDescriptor hadoopMetricDescriptor = MetricDescriptor.metricGroupAs((MetricGroupSelector) event -> {
+            if (event.containsKey("component")) {
+                return String.format("hadoop.%s", ((String) event.get("component")).toLowerCase());
+            } else {
+                return "hadoop.metrics";
+            }
+        })
+            .siteAs(AppConfigUtils.getSiteId(config))
+            .namedByField("metric")
+            .eventTimeByField("timestamp")
+            .dimensionFields("host", "component", "site")
+            .granularity(Calendar.SECOND)
+            .valueField("value");
+
+        MetricDescriptor systemMetricDescriptor = MetricDescriptor.metricGroupByField("group")
+            .siteAs(AppConfigUtils.getSiteId(config))
+            .namedByField("metric")
+            .eventTimeByField("timestamp")
+            .dimensionFields("host", "group", "site", "device")
+            .granularity(Calendar.SECOND)
+            .valueField("value");
         return environment.newApp(config)
-                .fromStream("HADOOP_JMX_METRIC_STREAM")
-                .saveAsMetric(
-                        MetricDescriptor.metricGroupAs((MetricGroupSelector) event -> {
-                            if (event.containsKey("component")) {
-                                return String.format("hadoop.%s", ((String) event.get("component")).toLowerCase());
-                            } else {
-                                return "hadoop.metrics";
-                            }
-                        })
-                        .siteAs(AppConfigUtils.getSiteId(config))
-                        .namedByField("metric")
-                        .eventTimeByField("timestamp")
-                        .dimensionFields("host", "component", "site")
-                        .granularity(Calendar.SECOND)
-                        .valueField("value"))
-                .fromStream("SYSTEM_METRIC_STREAM")
-                .saveAsMetric(MetricDescriptor.metricGroupByField("group")
-                        .siteAs(AppConfigUtils.getSiteId(config))
-                        .namedByField("metric")
-                        .eventTimeByField("timestamp")
-                        .dimensionFields("host", "group", "site", "device")
-                        .granularity(Calendar.SECOND)
-                        .valueField("value")
-                )
-                .toTopology();
+            .fromStream("HADOOP_JMX_METRIC_STREAM")
+            .transformBy(new CounterToRateFunction(hadoopMetricDescriptor, 3, TimeUnit.SECONDS, ClockWithOffset.INSTANCE))
+            .saveAsMetric(hadoopMetricDescriptor)
+            // Each stream's transform is given the same descriptor that the stream is saved with.
+            .fromStream("SYSTEM_METRIC_STREAM")
+            .transformBy(new CounterToRateFunction(systemMetricDescriptor, 3, TimeUnit.SECONDS, ClockWithOffset.INSTANCE))
+            .saveAsMetric(systemMetricDescriptor)
+            .toTopology();
     }
 }
\ No newline at end of file


Mime
View raw message