eagle-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ralp...@apache.org
Subject [16/18] incubator-eagle git commit: EAGLE-324 Init branch-v0.5
Date Wed, 01 Jun 2016 06:00:16 GMT
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/614c118e/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/engine/model/StreamEvent.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/engine/model/StreamEvent.java b/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/engine/model/StreamEvent.java
new file mode 100644
index 0000000..3e4e1df
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/engine/model/StreamEvent.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.engine.model;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.builder.HashCodeBuilder;
+import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
+import org.apache.eagle.alert.utils.DateTimeUtil;
+
+/**
+ * A single event on an alert stream: the id of the stream it belongs to, a
+ * timestamp, and the column values held as a plain object array.
+ *
+ * <p>Instances are mutable; the data array is stored and handed out by reference
+ * (no defensive copies are made by the setters, getters, {@link #copy()} or
+ * {@link #copyFrom(StreamEvent)}).
+ *
+ * @since Apr 5, 2016
+ */
+public class StreamEvent implements Serializable {
+    private static final long serialVersionUID = 2765116509856609763L;
+
+    private String streamId;
+    private Object[] data;
+    private long timestamp;
+
+    /** Creates an empty event; all fields keep their default values. */
+    public StreamEvent(){}
+
+    /**
+     * @param streamId  id of the stream this event belongs to
+     * @param timestamp event timestamp
+     * @param data      column values; stored by reference
+     */
+    public StreamEvent(String streamId,long timestamp,Object[] data){
+        this.setStreamId(streamId);
+        this.setTimestamp(timestamp);
+        this.setData(data);
+    }
+
+    public String getStreamId() {
+        return streamId;
+    }
+
+    public void setStreamId(String streamId) {
+        this.streamId = streamId;
+    }
+
+    /** @return the backing data array itself (not a copy), may be {@code null} */
+    public Object[] getData() {
+        return data;
+    }
+
+    public void setData(Object[] data) {
+        this.data = data;
+    }
+
+    public long getTimestamp() {
+        return timestamp;
+    }
+
+    public void setTimestamp(long timestamp) {
+        this.timestamp = timestamp;
+    }
+
+    @Override
+    public int hashCode() {
+        return new HashCodeBuilder().append(streamId).append(timestamp).append(data).build();
+    }
+
+    /**
+     * Two events are equal when stream id, timestamp and data (compared with
+     * {@link Arrays#deepEquals(Object[], Object[])}) all match.
+     */
+    @Override
+    public boolean equals(Object obj) {
+        if(obj == this) return true;
+        if(obj instanceof StreamEvent){
+            StreamEvent another = (StreamEvent) obj;
+            return Objects.equals(this.streamId,another.streamId) && this.timestamp == another.timestamp && Arrays.deepEquals(this.data,another.data);
+        }
+        return false;
+    }
+
+    /**
+     * Renders the event as {@code StreamEvent[stream=...,timestamp=...,data=[...]]}.
+     * A {@code null} data array is rendered as an empty list.
+     */
+    @Override
+    public String toString() {
+        List<String> dataStrings = new ArrayList<>();
+        if(this.getData() != null) {
+            for (Object obj : this.getData()) {
+                // null elements are kept as null entries and left to StringUtils.join to render
+                dataStrings.add(obj != null ? obj.toString() : null);
+            }
+        }
+        // "%s" rather than "%S": the upper-case conversion would print the stream id
+        // in capitals, which no longer matches the real (case-sensitive) id.
+        return String.format("StreamEvent[stream=%s,timestamp=%s,data=[%s]]",this.getStreamId(), DateTimeUtil.millisecondsToHumanDateWithMilliseconds(this.getTimestamp()), StringUtils.join(dataStrings,","));
+    }
+
+    /** @return a new, empty {@link StreamEventBuilder} */
+    public static StreamEventBuilder Builder(){
+        return new StreamEventBuilder();
+    }
+
+    /**
+     * @return a new event with the same stream id and timestamp; the data array is
+     *         shared with this event, not duplicated
+     */
+    public StreamEvent copy(){
+        StreamEvent newEvent = new StreamEvent();
+        newEvent.setTimestamp(this.getTimestamp());
+        newEvent.setData(this.getData());
+        newEvent.setStreamId(this.getStreamId());
+        return newEvent;
+    }
+
+    /**
+     * Overwrites this event's timestamp, data reference and stream id with those of
+     * the given event.
+     *
+     * @param event event to copy from, must not be {@code null}
+     */
+    public void copyFrom(StreamEvent event){
+        this.setTimestamp(event.getTimestamp());
+        this.setData(event.getData());
+        this.setStreamId(event.getStreamId());
+    }
+
+    /**
+     * Projects this event's data onto the given columns.
+     *
+     * @param streamDefinition schema used to resolve each column name to an index
+     *                         via {@code getColumnIndex}
+     * @param column           column names to select, in result order
+     * @return the selected values, one per requested column
+     */
+    public Object[] getData(StreamDefinition streamDefinition,List<String> column) {
+        List<Object> result = new ArrayList<>(column.size());
+        for (String colName : column) {
+            result.add(this.getData()[streamDefinition.getColumnIndex(colName)]);
+        }
+        return result.toArray();
+    }
+
+    /**
+     * Varargs variant of {@link #getData(StreamDefinition, List)}.
+     *
+     * @param streamDefinition schema used to resolve each column name to an index
+     * @param column           column names to select, in result order
+     * @return the selected values, one per requested column
+     */
+    public Object[] getData(StreamDefinition streamDefinition,String ... column) {
+        return getData(streamDefinition, Arrays.asList(column));
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/614c118e/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/engine/model/StreamEventBuilder.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/engine/model/StreamEventBuilder.java b/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/engine/model/StreamEventBuilder.java
new file mode 100644
index 0000000..7e351bb
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/engine/model/StreamEventBuilder.java
@@ -0,0 +1,72 @@
+package org.apache.eagle.alert.engine.model;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.eagle.alert.engine.coordinator.StreamColumn;
+import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Fluent builder for {@link StreamEvent}.
+ *
+ * <p>The builder mutates and finally returns one internal event instance, so a
+ * builder should not be reused after {@link #build()}.
+ */
+public class StreamEventBuilder{
+    private final static Logger LOG = LoggerFactory.getLogger(StreamEventBuilder.class);
+
+    private StreamEvent instance;
+    private StreamDefinition streamDefinition;
+    public StreamEventBuilder(){
+        instance = new StreamEvent();
+    }
+
+    /**
+     * Sets the schema used by {@link #attributes(Map)}; also adopts the schema's
+     * stream id when no stream id has been set yet.
+     *
+     * @param streamDefinition schema of the stream the event belongs to
+     * @return {@code this}
+     */
+    public StreamEventBuilder schema(StreamDefinition streamDefinition){
+        this.streamDefinition = streamDefinition;
+        if(instance.getStreamId() == null) instance.setStreamId(streamDefinition.getStreamId());
+        return this;
+    }
+
+    /**
+     * @param streamId stream id to set on the event
+     * @return {@code this}
+     */
+    public StreamEventBuilder streamId(String streamId){
+        instance.setStreamId(streamId);
+        return this;
+    }
+
+    /**
+     * Fills the event data from a name-to-value map, ordered by the schema's
+     * columns; a column missing from the map gets the column's default value.
+     *
+     * @param data             values keyed by column name
+     * @param streamDefinition schema providing the column order and defaults
+     * @return {@code this}
+     */
+    public StreamEventBuilder attributes(Map<String,Object> data, StreamDefinition streamDefinition){
+        this.schema(streamDefinition);
+        List<StreamColumn> columnList = streamDefinition.getColumns();
+        if(columnList!=null && columnList.size() > 0){
+            List<Object> values = new ArrayList<>(columnList.size());
+            for (StreamColumn column : columnList) {
+                values.add(data.getOrDefault(column.getName(),column.getDefaultValue()));
+            }
+            instance.setData(values.toArray());
+        } else {
+            // Emitted unconditionally: the previous isDebugEnabled() guard hid this
+            // warning whenever DEBUG was off, which is exactly when it matters.
+            LOG.warn("All data [{}] are ignored as no columns defined in schema {}",data,streamDefinition);
+        }
+        return this;
+    }
+
+    /**
+     * Same as {@link #attributes(Map, StreamDefinition)} using the schema given to
+     * {@link #schema(StreamDefinition)} earlier.
+     *
+     * @param data values keyed by column name
+     * @return {@code this}
+     */
+    public StreamEventBuilder attributes(Map<String,Object> data){
+        return attributes(data,this.streamDefinition);
+    }
+
+    /**
+     * Uses the given values directly as the event data, without consulting a schema.
+     *
+     * @param data column values in their final order
+     * @return {@code this}
+     */
+    public StreamEventBuilder attributes(Object ... data){
+        instance.setData(data);
+        return this;
+    }
+
+    /**
+     * @param timestamp event timestamp
+     * @return {@code this}
+     */
+    public StreamEventBuilder timestamp(long timestamp){
+        instance.setTimestamp(timestamp);
+        return this;
+    }
+
+    /**
+     * Misspelled alias of {@link #timestamp(long)}, kept so existing callers keep
+     * compiling; prefer {@link #timestamp(long)} in new code.
+     *
+     * @param timestamp event timestamp
+     * @return {@code this}
+     */
+    public StreamEventBuilder timestamep(long timestamp){
+        return timestamp(timestamp);
+    }
+
+    /**
+     * @return the built event (the builder's internal instance)
+     * @throws IllegalArgumentException if no stream id was set
+     */
+    public StreamEvent build(){
+        if(instance.getStreamId() == null){
+            throw new IllegalArgumentException("streamId is null of event: " + instance);
+        }
+        return instance;
+    }
+
+    /**
+     * Copies timestamp, data reference and stream id from an existing event.
+     *
+     * @param event event to copy from
+     * @return {@code this}
+     */
+    public StreamEventBuilder copyFrom(StreamEvent event) {
+        this.instance.copyFrom(event);
+        return this;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/614c118e/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/metric/IMetricSystem.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/metric/IMetricSystem.java b/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/metric/IMetricSystem.java
new file mode 100644
index 0000000..06a99f4
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/metric/IMetricSystem.java
@@ -0,0 +1,64 @@
+package org.apache.eagle.alert.metric;
+
+import java.util.Map;
+
+import org.apache.eagle.alert.metric.sink.MetricSink;
+import org.apache.eagle.alert.metric.source.MetricSource;
+
+import com.codahale.metrics.MetricRegistry;
+import com.typesafe.config.Config;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Contract of a metric system: sinks and sources are registered with it, and it
+ * exposes lifecycle operations plus the shared {@link MetricRegistry}.
+ */
+public interface IMetricSystem {
+
+    /**
+     * Initializes the metric system.
+     */
+    void start();
+
+    /**
+     * Schedules the reporters.
+     */
+    void schedule();
+
+    /**
+     * Closes and stops all resources and services.
+     */
+    void stop();
+
+    /**
+     * Triggers a metric report manually.
+     */
+    void report();
+
+    /**
+     * Registers a metric sink together with its configuration.
+     *
+     * @param sink   metric sink
+     * @param config configuration associated with the sink
+     */
+    void register(MetricSink sink, Config config);
+
+    /**
+     * Registers a metric source.
+     *
+     * @param source metric source
+     */
+    void register(MetricSource source);
+
+    /**
+     * Supplies tags for the metric system.
+     *
+     * @param metricTags tag values keyed by tag name
+     */
+    void tags(Map<String, Object> metricTags);
+
+    /**
+     * @return the metric registry of this system
+     */
+    MetricRegistry registry();
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/614c118e/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/metric/MetricSystem.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/metric/MetricSystem.java b/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/metric/MetricSystem.java
new file mode 100644
index 0000000..b91c606
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/metric/MetricSystem.java
@@ -0,0 +1,116 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.metric;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.eagle.alert.metric.sink.MetricSink;
+import org.apache.eagle.alert.metric.sink.MetricSinkRepository;
+import org.apache.eagle.alert.metric.source.MetricSource;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.codahale.metrics.MetricRegistry;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+
+/**
+ * {@link IMetricSystem} implementation that keeps the registered sinks with their
+ * configs, one {@link MetricRegistry} and a map of metric tags.
+ */
+public class MetricSystem implements IMetricSystem {
+    private final static Logger LOG = LoggerFactory.getLogger(MetricSystem.class);
+
+    private final Config config;
+    private final Map<String, Object> metricTags = new HashMap<>();
+    private Map<MetricSink,Config> sinks = new HashMap<>();
+    private MetricRegistry registry = new MetricRegistry();
+    private boolean initialized;
+    private boolean running;
+
+    /**
+     * @param config configuration later read by {@link #loadFromConfig()}
+     */
+    public MetricSystem(Config config){
+        this.config = config;
+    }
+
+    /**
+     * Creates a metric system and registers the sinks declared in the config.
+     *
+     * @param config configuration, sinks are read from {@code metric.sink}
+     * @return the new metric system
+     */
+    public static MetricSystem load(Config config){
+        MetricSystem system = new MetricSystem(config);
+        system.loadFromConfig();
+        return system;
+    }
+
+    /** Adds the given tags to the tags already known. */
+    @Override
+    public void tags(Map<String,Object> metricTags){
+        this.metricTags.putAll(metricTags);
+    }
+
+    /**
+     * Prepares every registered sink with its own config extended by a
+     * {@code tags} entry, and the shared registry.
+     *
+     * @throws IllegalStateException when called a second time
+     */
+    @Override
+    public void start() {
+        if(initialized) {
+            throw new IllegalStateException("Attempting to initialize a MetricsSystem that is already intialized");
+        }
+        for (Map.Entry<MetricSink,Config> registered : sinks.entrySet()) {
+            Config sinkConfig = registered.getValue().withValue("tags",ConfigFactory.parseMap(metricTags).root());
+            registered.getKey().prepare(sinkConfig,registry);
+        }
+        initialized = true;
+    }
+
+    /**
+     * Starts every registered sink with a period of 5 seconds.
+     *
+     * @throws IllegalStateException when called a second time
+     */
+    @Override
+    public void schedule() {
+        if(running){
+            throw new IllegalStateException("Attempting to start a MetricsSystem that is already running");
+        }
+        for (MetricSink sink : sinks.keySet()) {
+            sink.start(5, TimeUnit.SECONDS);
+        }
+        running = true;
+    }
+
+    /** Registers everything that is declared in the config (currently sinks only). */
+    public void loadFromConfig(){
+        loadSinksFromConfig();
+    }
+
+    /** Registers one sink per key found under {@code metric.sink}, if that path exists. */
+    private void loadSinksFromConfig(){
+        if(!config.hasPath("metric.sink")){
+            return;
+        }
+        Config sinkSection = config.getConfig("metric.sink");
+        for(String sinkType : sinkSection.root().unwrapped().keySet()){
+            MetricSink sink = MetricSinkRepository.createSink(sinkType);
+            register(sink,config.getConfig("metric.sink."+sinkType));
+        }
+    }
+
+    /** Stops every registered sink. */
+    @Override
+    public void stop() {
+        for (MetricSink sink : sinks.keySet()) {
+            sink.stop();
+        }
+    }
+
+    /** Asks every registered sink to report. */
+    @Override
+    public void report() {
+        for (MetricSink sink : sinks.keySet()) {
+            sink.report();
+        }
+    }
+
+    @Override
+    public void register(MetricSink sink,Config config) {
+        LOG.debug("Register {}",sink);
+        sinks.put(sink,config);
+    }
+
+    /** Adds all metrics of the source's registry to this system's registry. */
+    @Override
+    public void register(MetricSource source) {
+        MetricRegistry sourceRegistry = source.registry();
+        registry().registerAll(sourceRegistry);
+    }
+
+    @Override
+    public MetricRegistry registry() {
+        return registry;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/614c118e/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/metric/entity/MetricEvent.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/metric/entity/MetricEvent.java b/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/metric/entity/MetricEvent.java
new file mode 100644
index 0000000..b5e6c63
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/metric/entity/MetricEvent.java
@@ -0,0 +1,111 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.metric.entity;
+
+import java.util.Map;
+import java.util.TreeMap;
+
+import org.apache.eagle.alert.utils.DateTimeUtil;
+
+import com.codahale.metrics.Counter;
+import com.codahale.metrics.Gauge;
+import com.codahale.metrics.Histogram;
+import com.codahale.metrics.Meter;
+import com.codahale.metrics.Snapshot;
+import com.codahale.metrics.Timer;
+
+public class MetricEvent extends TreeMap<String,Object>{
+
+    private static final long serialVersionUID = 6862373651636342744L;
+
+    public static Builder of(String name){
+        return new Builder(name);
+    }
+
+    /**
+     * TODO: Refactor according to ConsoleReporter
+     */
+    public static class Builder{
+        private final String name;
+        private MetricEvent instance;
+        public Builder(String name){
+            this.instance = new MetricEvent();
+            this.name = name;
+        }
+
+        public Builder from(Counter value) {
+//            this.instance.put("type","counter");
+            this.instance.put("count",value.getCount());
+            return this;
+        }
+
+        public MetricEvent build(){
+            this.instance.put("name",name);
+            if(!this.instance.containsKey("timestamp")){
+                this.instance.put("timestamp", DateTimeUtil.getCurrentTimestamp());
+            }
+            return this.instance;
+        }
+
+        @SuppressWarnings({ "rawtypes", "unchecked" })
+        public Builder from(Gauge gauge) {
+            Object value = gauge.getValue();
+            if( value instanceof Map){
+                Map<? extends String, ?> map = (Map<? extends String, ?>) value;
+                this.instance.putAll(map);
+            } else {
+                this.instance.put("value", value);
+            }
+            return this;
+        }
+
+        public Builder from(Histogram value) {
+            this.instance.put("count",value.getCount());
+            Snapshot snapshot = value.getSnapshot();
+            this.instance.put("min", snapshot.getMin());
+            this.instance.put("max", snapshot.getMax());
+            this.instance.put("mean", snapshot.getMean());
+            this.instance.put("stddev", snapshot.getStdDev());
+            this.instance.put("median", snapshot.getMedian());
+            this.instance.put("75thPercentile", snapshot.get75thPercentile());
+            this.instance.put("95thPercentile", snapshot.get95thPercentile());
+            this.instance.put("98thPercentile", snapshot.get98thPercentile());
+            this.instance.put("99thPercentile", snapshot.get99thPercentile());
+            this.instance.put("999thPercentile", snapshot.get999thPercentile());
+            return this;
+        }
+
+        public Builder from(Meter value) {
+            this.instance.put("value",value.getCount());
+            this.instance.put("15MinRate",value.getFifteenMinuteRate());
+            this.instance.put("5MinRate",value.getFiveMinuteRate());
+            this.instance.put("mean",value.getMeanRate());
+            this.instance.put("1MinRate",value.getOneMinuteRate());
+            return this;
+        }
+
+        public Builder from(Timer value) {
+//            this.instance.put("type","timer");
+            this.instance.put("value",value.getCount());
+            this.instance.put("15MinRate",value.getFifteenMinuteRate());
+            this.instance.put("5MinRate",value.getFiveMinuteRate());
+            this.instance.put("mean",value.getMeanRate());
+            this.instance.put("1MinRate",value.getOneMinuteRate());
+            return this;
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/614c118e/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/metric/reporter/KafkaReporter.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/metric/reporter/KafkaReporter.java b/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/metric/reporter/KafkaReporter.java
new file mode 100644
index 0000000..0d0f8b3
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/metric/reporter/KafkaReporter.java
@@ -0,0 +1,197 @@
+package org.apache.eagle.alert.metric.reporter;
+
+import java.util.Map;
+import java.util.Properties;
+import java.util.SortedMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import org.apache.eagle.alert.metric.entity.MetricEvent;
+import org.apache.eagle.alert.utils.ByteUtils;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.codahale.metrics.ConsoleReporter;
+import com.codahale.metrics.Counter;
+import com.codahale.metrics.Gauge;
+import com.codahale.metrics.Histogram;
+import com.codahale.metrics.Meter;
+import com.codahale.metrics.MetricFilter;
+import com.codahale.metrics.MetricRegistry;
+import com.codahale.metrics.ScheduledReporter;
+import com.codahale.metrics.Timer;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Preconditions;
+import com.typesafe.config.Config;
+
+/**
+ * {@link ScheduledReporter} that serializes every reported metric to JSON and
+ * sends it to a Kafka topic. Instances are created through
+ * {@link #forRegistry(MetricRegistry)}.
+ */
+public class KafkaReporter extends ScheduledReporter {
+	private final static Logger LOG = LoggerFactory.getLogger(KafkaReporter.class);
+	private final static ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
+	private final String topic;
+	private final Properties properties;
+	private final Producer<byte[], String> producer;
+	private final Map<String, Object> additionalFields;
+
+	/**
+	 * @param registry         registry whose metrics are reported
+	 * @param filter           selects the metrics to report
+	 * @param rateUnit         unit rates are converted to
+	 * @param durationUnit     unit durations are converted to
+	 * @param topic            target Kafka topic, must not be {@code null}
+	 * @param config           extra producer properties, overriding the defaults set here; may be {@code null}
+	 * @param additionalFields fields added to every event before sending; may be {@code null}
+	 */
+	protected KafkaReporter(MetricRegistry registry, MetricFilter filter, TimeUnit rateUnit, TimeUnit durationUnit, String topic, Properties config, Map<String, Object> additionalFields) {
+		super(registry, "kafka-reporter", filter, rateUnit, durationUnit);
+		this.topic = Preconditions.checkNotNull(topic,"topic should not be null");
+		this.properties = new Properties();
+		properties.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
+		properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
+		properties.put("request.required.acks", "1");
+		// Was ByteArraySerializer: a serializer class under a deserializer key.
+		properties.put("key.deserializer","org.apache.kafka.common.serialization.ByteArrayDeserializer");
+		properties.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
+		if(config != null) {
+			// NOTE(review): this logs every supplied property verbatim; confirm no
+			// credentials are passed through this config.
+			LOG.info(config.toString());
+			properties.putAll(config);
+		}
+		this.additionalFields = additionalFields;
+		this.producer = new KafkaProducer<>(properties);
+		LOG.info("Initialized kafka-reporter");
+	}
+
+	/** Converts every metric to a {@link MetricEvent} and sends it to Kafka. */
+	@SuppressWarnings("rawtypes")
+	@Override
+	public void report(SortedMap<String, Gauge> gauges, SortedMap<String, Counter> counters, SortedMap<String, Histogram> histograms, SortedMap<String, Meter> meters, SortedMap<String, Timer> timers) {
+		for(SortedMap.Entry<String, Gauge> entry:gauges.entrySet()){
+			onMetricEvent(MetricEvent.of(entry.getKey()).from(entry.getValue()).build());
+		}
+		for(SortedMap.Entry<String, Counter> entry:counters.entrySet()){
+			onMetricEvent(MetricEvent.of(entry.getKey()).from(entry.getValue()).build());
+		}
+		for(SortedMap.Entry<String, Histogram> entry:histograms.entrySet()){
+			onMetricEvent(MetricEvent.of(entry.getKey()).from(entry.getValue()).build());
+		}
+		for(SortedMap.Entry<String, Meter> entry:meters.entrySet()){
+			onMetricEvent(MetricEvent.of(entry.getKey()).from(entry.getValue()).build());
+		}
+		for(SortedMap.Entry<String, Timer> entry:timers.entrySet()){
+			onMetricEvent(MetricEvent.of(entry.getKey()).from(entry.getValue()).build());
+		}
+	}
+
+	/**
+	 * Adds the additional fields, serializes the event as JSON and sends it,
+	 * waiting up to 5 seconds for the send to complete. Failures are logged and
+	 * not propagated.
+	 */
+	private void onMetricEvent(MetricEvent event){
+		try {
+			if(additionalFields!=null){
+				event.putAll(additionalFields);
+			}
+			// TODO: Support configurable partition key
+			byte[] key = ByteUtils.intToBytes(event.hashCode());
+			ProducerRecord<byte[],String> record  = new ProducerRecord<>(topic, key, OBJECT_MAPPER.writeValueAsString(event));
+			// TODO: Support configuration timeout
+			this.producer.send(record).get(5,TimeUnit.SECONDS);
+		} catch (JsonProcessingException e) {
+			LOG.error("Failed to serialize {} as json",event,e);
+		} catch (InterruptedException e) {
+			// Restore the interrupt flag so the reporting thread can still observe
+			// the shutdown request after we log the failure.
+			Thread.currentThread().interrupt();
+			LOG.error("Failed to produce message to topic {}",topic,e);
+		} catch (ExecutionException | TimeoutException e) {
+			LOG.error("Failed to produce message to topic {}",topic,e);
+		}
+	}
+
+	/**
+	 * Stops the scheduled reporting first and closes the producer afterwards, so
+	 * a report still in flight does not hit an already closed producer.
+	 */
+	@Override
+	public void stop() {
+		try {
+			super.stop();
+		} finally {
+			this.producer.close();
+		}
+	}
+
+	/** Closes the reporter and the Kafka producer. */
+	@Override
+	public void close() {
+		try {
+			super.close();
+		} finally {
+			this.producer.close();
+		}
+	}
+
+	/**
+	 * @param registry registry whose metrics are to be reported
+	 * @return a builder for a {@link KafkaReporter}
+	 */
+	public static Builder forRegistry(MetricRegistry registry){
+		return new Builder(registry);
+	}
+
+	/**
+	 * Builder for {@link KafkaReporter}; defaults are rates in seconds, durations
+	 * in milliseconds and {@link MetricFilter#ALL}.
+	 */
+	public static class Builder{
+		private final MetricRegistry registry;
+		private TimeUnit rateUnit;
+		private TimeUnit durationUnit;
+		private MetricFilter filter;
+		private String topic;
+		private Properties properties;
+		private Map<String, Object> additionalFields;
+
+		private Builder(MetricRegistry registry) {
+			this.registry = registry;
+			this.rateUnit = TimeUnit.SECONDS;
+			this.durationUnit = TimeUnit.MILLISECONDS;
+			this.filter = MetricFilter.ALL;
+		}
+
+		/**
+		 * Convert rates to the given time unit.
+		 *
+		 * @param rateUnit a unit of time
+		 * @return {@code this}
+		 */
+		public Builder convertRatesTo(TimeUnit rateUnit) {
+			this.rateUnit = rateUnit;
+			return this;
+		}
+
+		/**
+		 * Convert durations to the given time unit.
+		 *
+		 * @param durationUnit a unit of time
+		 * @return {@code this}
+		 */
+		public Builder convertDurationsTo(TimeUnit durationUnit) {
+			this.durationUnit = durationUnit;
+			return this;
+		}
+
+		/**
+		 * Only report metrics which match the given filter.
+		 *
+		 * @param filter a {@link MetricFilter}
+		 * @return {@code this}
+		 */
+		public Builder filter(MetricFilter filter) {
+			this.filter = filter;
+			return this;
+		}
+
+		/**
+		 * @param topic Kafka topic the metrics are sent to
+		 * @return {@code this}
+		 */
+		public Builder topic(String topic){
+			this.topic = topic;
+			return this;
+		}
+
+		/**
+		 * @param properties producer properties; a {@code topic} property is used
+		 *                   as the topic when none is set explicitly
+		 * @return {@code this}
+		 */
+		public Builder config(Properties properties){
+			this.properties = properties;
+			return this;
+		}
+
+		/**
+		 * Builds a {@link KafkaReporter} with the given properties.
+		 *
+		 * @return a {@link KafkaReporter}
+		 */
+		public KafkaReporter build() {
+			if(topic == null && properties!=null) topic = properties.getProperty("topic");
+			return new KafkaReporter(registry,filter,rateUnit,durationUnit,topic,properties,additionalFields);
+		}
+
+		/**
+		 * Same as {@link #config(Properties)}, taking the unwrapped root of a
+		 * typesafe {@link Config} as the properties.
+		 *
+		 * @param config configuration to copy the properties from
+		 * @return {@code this}
+		 */
+		public Builder config(Config config) {
+			// Plain Properties instead of double-brace initialization, which
+			// created an anonymous subclass just to call putAll.
+			Properties unwrapped = new Properties();
+			unwrapped.putAll(config.root().unwrapped());
+			return this.config(unwrapped);
+		}
+
+		/**
+		 * @param tags fields added to every reported event
+		 * @return {@code this}
+		 */
+		public Builder addFields(Map<String, Object> tags) {
+			this.additionalFields = tags;
+			return this;
+		}
+	}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/614c118e/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/metric/sink/ConsoleSink.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/metric/sink/ConsoleSink.java b/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/metric/sink/ConsoleSink.java
new file mode 100644
index 0000000..fd6cc41
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/metric/sink/ConsoleSink.java
@@ -0,0 +1,47 @@
+package org.apache.eagle.alert.metric.sink;
+
+import java.util.concurrent.TimeUnit;
+
+import com.codahale.metrics.ConsoleReporter;
+import com.codahale.metrics.MetricRegistry;
+import com.typesafe.config.Config;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+public class ConsoleSink implements MetricSink {
+    private ConsoleReporter reporter;
+    @Override
+    public void prepare(Config config, MetricRegistry registry) {
+        reporter = ConsoleReporter.forRegistry(registry).build();
+    }
+
+    @Override
+    public void start(long period,TimeUnit unit) {
+        reporter.start(period, unit);
+    }
+
+    @Override
+    public void stop() {
+        reporter.stop();
+        reporter.close();
+    }
+
+    @Override
+    public void report() {
+        reporter.report();
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/614c118e/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/metric/sink/ElasticSearchSink.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/metric/sink/ElasticSearchSink.java b/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/metric/sink/ElasticSearchSink.java
new file mode 100644
index 0000000..7e30b82
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/metric/sink/ElasticSearchSink.java
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.metric.sink;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import org.elasticsearch.metrics.ElasticsearchReporter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.codahale.metrics.MetricRegistry;
+import com.typesafe.config.Config;
+
+/**
+ * {@link MetricSink} that forwards a registry's metrics to Elasticsearch through an
+ * {@link ElasticsearchReporter}.
+ * <p>
+ * Configuration keys read in {@link #prepare(Config, MetricRegistry)} (all optional):
+ * {@code hosts} (string list), {@code index}, {@code timestampField} (defaults to
+ * {@code @timestamp}) and {@code tags} (added to every report as additional fields).
+ */
+public class ElasticSearchSink implements MetricSink {
+    private final static Logger LOG = LoggerFactory.getLogger(ElasticSearchSink.class);
+
+    /** Stays {@code null} if {@link #prepare(Config, MetricRegistry)} was not called or failed. */
+    private ElasticsearchReporter reporter = null;
+
+    /**
+     * Builds the reporter from {@code config}. An {@link IOException} raised while building
+     * is logged rather than rethrown; the sink is then left unprepared and the other
+     * methods become logged no-ops.
+     */
+    @Override
+    public void prepare(Config config, MetricRegistry registry) {
+        LOG.debug("Preparing elasticsearch-sink");
+        try {
+            ElasticsearchReporter.Builder builder = ElasticsearchReporter.forRegistry(registry);
+            if(config.hasPath("hosts")){
+                List<String> hosts = config.getStringList("hosts");
+                builder.hosts(hosts.toArray(new String[hosts.size()]));
+            }
+            if(config.hasPath("index")){
+                builder.index(config.getString("index"));
+            }
+            builder.indexDateFormat("yyyy-MM-dd");
+            builder.timestampFieldname(config.hasPath("timestampField")?config.getString("timestampField"):"@timestamp");
+
+            if(config.hasPath("tags")) {
+                builder.additionalFields(config.getConfig("tags").root().unwrapped());
+            }
+
+            reporter = builder.build();
+        } catch (IOException e) {
+            LOG.error(e.getMessage(),e);
+        }
+    }
+
+    /**
+     * Starts periodic reporting.
+     *
+     * @param period interval between reports
+     * @param unit   unit of {@code period}; it is now honoured instead of being replaced
+     *               by a hard-coded {@code TimeUnit.SECONDS}
+     */
+    @Override
+    public void start(long period, TimeUnit unit) {
+        if (reporter == null) {
+            LOG.warn("elasticsearch-sink is not prepared, ignoring start");
+            return;
+        }
+        reporter.start(period, unit);
+    }
+
+    /** Stops and closes the reporter; does nothing if the sink was never prepared. */
+    @Override
+    public void stop() {
+        if (reporter == null) {
+            LOG.warn("elasticsearch-sink is not prepared, ignoring stop");
+            return;
+        }
+        reporter.stop();
+        reporter.close();
+    }
+
+    /** Triggers an immediate report; does nothing if the sink was never prepared. */
+    @Override
+    public void report() {
+        if (reporter == null) {
+            LOG.warn("elasticsearch-sink is not prepared, ignoring report");
+            return;
+        }
+        reporter.report();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/614c118e/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/metric/sink/JmxSink.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/metric/sink/JmxSink.java b/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/metric/sink/JmxSink.java
new file mode 100644
index 0000000..fddaf19
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/metric/sink/JmxSink.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.metric.sink;
+
+import java.util.concurrent.TimeUnit;
+
+import com.codahale.metrics.JmxReporter;
+import com.codahale.metrics.MetricRegistry;
+import com.typesafe.config.Config;
+
+public class JmxSink implements MetricSink {
+    /** Reporter created by {@link #prepare(Config, MetricRegistry)}; unset before that call. */
+    private JmxReporter reporter;
+
+    /**
+     * Builds a JMX reporter over {@code registry} with the builder's defaults.
+     * The {@code config} argument is not read by this sink.
+     */
+    @Override
+    public void prepare(Config config, MetricRegistry registry) {
+        this.reporter = JmxReporter.forRegistry(registry).build();
+    }
+
+    /**
+     * Starts the JMX reporter. Neither {@code period} nor {@code unit} is used:
+     * the reporter is started without a schedule.
+     */
+    @Override
+    public void start(long period, TimeUnit unit) {
+        this.reporter.start();
+    }
+
+    /** Stops the reporter and then closes it. */
+    @Override
+    public void stop() {
+        this.reporter.stop();
+        this.reporter.close();
+    }
+
+    /** Intentionally empty: this sink performs no on-demand reporting. */
+    @Override
+    public void report() {
+        // do nothing
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/614c118e/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/metric/sink/KafkaSink.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/metric/sink/KafkaSink.java b/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/metric/sink/KafkaSink.java
new file mode 100644
index 0000000..6ff000c
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/metric/sink/KafkaSink.java
@@ -0,0 +1,65 @@
+package org.apache.eagle.alert.metric.sink;
+
+import java.util.concurrent.TimeUnit;
+
+import org.apache.eagle.alert.metric.reporter.KafkaReporter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.codahale.metrics.MetricRegistry;
+import com.typesafe.config.Config;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+public class KafkaSink implements MetricSink{
+    private KafkaReporter reporter;
+    private final static Logger LOG = LoggerFactory.getLogger(KafkaSink.class);
+    @Override
+    public void prepare(Config config, MetricRegistry registry) {
+        LOG.debug("Preparing kafka-sink");
+        KafkaReporter.Builder builder = KafkaReporter.forRegistry(registry)
+                .topic(config.getString("topic"))
+                .config(config);
+
+        if(config.hasPath("tags")){
+            builder.addFields(config.getConfig("tags").root().unwrapped());
+        }
+
+        reporter = builder.build();
+        LOG.info("Prepared kafka-sink");
+    }
+
+    @Override
+    public void start(long period, TimeUnit unit) {
+        LOG.info("Starting");
+        reporter.start(period,unit);
+    }
+
+    @Override
+    public void stop() {
+        LOG.info("Stopping");
+        reporter.stop();
+
+        LOG.info("Closing");
+        reporter.close();
+    }
+
+    @Override
+    public void report() {
+        reporter.report();
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/614c118e/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/metric/sink/MetricSink.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/metric/sink/MetricSink.java b/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/metric/sink/MetricSink.java
new file mode 100644
index 0000000..b09eda3
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/metric/sink/MetricSink.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.metric.sink;
+
+
+import java.util.concurrent.TimeUnit;
+
+import com.codahale.metrics.MetricRegistry;
+import com.typesafe.config.Config;
+
+/**
+ * A destination that the metrics held in a {@link MetricRegistry} are reported to.
+ */
+public interface MetricSink {
+    /**
+     * Hands the sink its configuration and the registry it should report from.
+     * NOTE(review): presumably this must be called before the other methods - confirm with callers.
+     */
+    void prepare(Config config, MetricRegistry registry);
+
+    /**
+     * Starts the sink.
+     *
+     * @param period reporting interval
+     * @param unit   time unit of {@code period}
+     */
+    void start(long period,TimeUnit unit);
+
+    /** Stops the sink. */
+    void stop();
+
+    /** Requests a report from the sink. */
+    void report();
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/614c118e/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/metric/sink/MetricSinkRepository.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/metric/sink/MetricSinkRepository.java b/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/metric/sink/MetricSinkRepository.java
new file mode 100644
index 0000000..b4126f2
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/metric/sink/MetricSinkRepository.java
@@ -0,0 +1,47 @@
+package org.apache.eagle.alert.metric.sink;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+public class MetricSinkRepository {
+    /** Maps a sink type name to the sink class instantiated for it. */
+    private final static Map<String,Class<? extends MetricSink>> sinkTypeClassMapping = new HashMap<>();
+
+    // Sink types that are available out of the box.
+    static {
+        register("kafka", KafkaSink.class);
+        register("jmx", JmxSink.class);
+        register("elasticsearch", ElasticSearchSink.class);
+        register("stdout", ConsoleSink.class);
+        register("logger", Slf4jSink.class);
+    }
+
+    /**
+     * Associates {@code sinkType} with {@code sinkClass}, replacing any earlier mapping
+     * for the same type name.
+     */
+    public static void register(String sinkType, Class<? extends MetricSink> sinkClass) {
+        sinkTypeClassMapping.put(sinkType, sinkClass);
+    }
+
+    /**
+     * Creates a new sink of the given type through the class's no-argument constructor.
+     *
+     * @throws IllegalArgumentException if no class is registered for {@code sinkType}
+     * @throws IllegalStateException    if the registered class cannot be instantiated
+     */
+    public static MetricSink createSink(String sinkType) {
+        if (sinkTypeClassMapping.containsKey(sinkType)) {
+            Class<? extends MetricSink> sinkClass = sinkTypeClassMapping.get(sinkType);
+            try {
+                return sinkClass.newInstance();
+            } catch (InstantiationException | IllegalAccessException e) {
+                throw new IllegalStateException(e);
+            }
+        }
+        throw new IllegalArgumentException("Unknown sink type: "+sinkType);
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/614c118e/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/metric/sink/Slf4jSink.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/metric/sink/Slf4jSink.java b/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/metric/sink/Slf4jSink.java
new file mode 100644
index 0000000..ce465fa
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/metric/sink/Slf4jSink.java
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.metric.sink;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import org.slf4j.LoggerFactory;
+
+import com.codahale.metrics.MetricRegistry;
+import com.codahale.metrics.Slf4jReporter;
+import com.typesafe.config.Config;
+
+public class Slf4jSink implements MetricSink {
+    private Slf4jReporter reporter;
+
+    @SuppressWarnings("serial")
+    private final static Map<String,Slf4jReporter.LoggingLevel> LEVEL_MAPPING = new HashMap<String,Slf4jReporter.LoggingLevel>(){{
+        put("INFO",Slf4jReporter.LoggingLevel.INFO);
+        put("DEBUG",Slf4jReporter.LoggingLevel.DEBUG);
+        put("ERROR",Slf4jReporter.LoggingLevel.ERROR);
+        put("TRACE",Slf4jReporter.LoggingLevel.TRACE);
+        put("WARN",Slf4jReporter.LoggingLevel.WARN);
+    }};
+
+    private static Slf4jReporter.LoggingLevel getLoggingLevel(String level){
+        if(LEVEL_MAPPING.containsKey(level.toUpperCase())){
+            return LEVEL_MAPPING.get(level.toUpperCase());
+        } else{
+            throw new IllegalArgumentException("Illegal logging level: "+level);
+        }
+    }
+
+    @Override
+    public void prepare(Config config, MetricRegistry registry) {
+        reporter = Slf4jReporter.forRegistry(registry)
+                .outputTo(LoggerFactory.getLogger("org.apache.eagle.alert.metric"))
+                .withLoggingLevel(config.hasPath("level")? getLoggingLevel(config.getString("level")): Slf4jReporter.LoggingLevel.INFO)
+                .convertRatesTo(TimeUnit.SECONDS)
+                .convertDurationsTo(TimeUnit.MILLISECONDS)
+                .build();
+    }
+
+    @Override
+    public void start(long period,TimeUnit unit) {
+        reporter.start(period,unit);
+    }
+
+    @Override
+    public void stop() {
+        reporter.stop();
+        reporter.close();
+    }
+
+    @Override
+    public void report() {
+        reporter.report();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/614c118e/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/metric/source/JVMMetricSource.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/metric/source/JVMMetricSource.java b/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/metric/source/JVMMetricSource.java
new file mode 100644
index 0000000..a5bb5f4
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/metric/source/JVMMetricSource.java
@@ -0,0 +1,41 @@
+package org.apache.eagle.alert.metric.source;
+
+import com.codahale.metrics.JvmAttributeGaugeSet;
+import com.codahale.metrics.MetricRegistry;
+import com.codahale.metrics.jvm.MemoryUsageGaugeSet;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+public class JVMMetricSource implements MetricSource {
+
+    /** Registry owned by this source; populated once in the constructor. */
+    private final MetricRegistry registry;
+
+    /**
+     * Creates a fresh registry and registers the JVM attribute gauges followed by the
+     * memory usage gauges.
+     */
+    public JVMMetricSource() {
+        this.registry = new MetricRegistry();
+        this.registry.registerAll(new JvmAttributeGaugeSet());
+        this.registry.registerAll(new MemoryUsageGaugeSet());
+    }
+
+    /** Returns the fixed source name {@code "jvm"}. */
+    @Override
+    public String name() {
+        return "jvm";
+    }
+
+    /** Returns the registry populated by the constructor. */
+    @Override
+    public MetricRegistry registry() {
+        return this.registry;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/614c118e/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/metric/source/MetricSource.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/metric/source/MetricSource.java b/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/metric/source/MetricSource.java
new file mode 100644
index 0000000..59b7a02
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/metric/source/MetricSource.java
@@ -0,0 +1,24 @@
+package org.apache.eagle.alert.metric.source;
+
+import com.codahale.metrics.MetricRegistry;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * A named provider of a {@link MetricRegistry}.
+ */
+public interface MetricSource {
+    /** Returns the name of this source. */
+    String name();
+
+    /** Returns the registry associated with this source. */
+    MetricRegistry registry();
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/614c118e/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/metric/source/MetricSourceWrapper.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/metric/source/MetricSourceWrapper.java b/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/metric/source/MetricSourceWrapper.java
new file mode 100644
index 0000000..137fc17
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/metric/source/MetricSourceWrapper.java
@@ -0,0 +1,39 @@
+package org.apache.eagle.alert.metric.source;
+
+import com.codahale.metrics.MetricRegistry;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+public class MetricSourceWrapper implements MetricSource {
+    /** Name reported by {@link #name()}. */
+    private final String name;
+    /** Registry reported by {@link #registry()}. */
+    private final MetricRegistry registry;
+
+    /**
+     * Pairs an existing registry with a source name; both values are stored as given.
+     */
+    public MetricSourceWrapper(String name, MetricRegistry registry) {
+        this.registry = registry;
+        this.name = name;
+    }
+
+    /** Returns the name supplied at construction time. */
+    @Override
+    public String name() {
+        return this.name;
+    }
+
+    /** Returns the registry supplied at construction time. */
+    @Override
+    public MetricRegistry registry() {
+        return this.registry;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/614c118e/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/resource/SimpleCORSFiler.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/resource/SimpleCORSFiler.java b/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/resource/SimpleCORSFiler.java
new file mode 100644
index 0000000..addab44
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/resource/SimpleCORSFiler.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.resource;
+
+import java.io.IOException;
+
+import javax.servlet.Filter;
+import javax.servlet.FilterChain;
+import javax.servlet.FilterConfig;
+import javax.servlet.ServletException;
+import javax.servlet.ServletRequest;
+import javax.servlet.ServletResponse;
+import javax.servlet.http.HttpServletResponse;
+
+/**
+ * A simple allow all CORS filter that works with swagger UI. Tomcat CORS filter
+ * doesn't support Origin: null case which is the swagger UI request.
+ * 
+ * @since Apr 15, 2016
+ *
+ */
+public class SimpleCORSFiler implements Filter {
+
+    /** No initialization is required. */
+    @Override
+    public void init(FilterConfig filterConfig) throws ServletException {
+        // nothing to set up
+    }
+
+    /**
+     * Adds allow-all CORS headers to the response and then continues the filter chain.
+     * The response is cast to {@link HttpServletResponse} without a type check.
+     */
+    @Override
+    public void doFilter(ServletRequest request, ServletResponse res, FilterChain chain)
+            throws IOException, ServletException {
+        final HttpServletResponse httpResponse = (HttpServletResponse) res;
+
+        httpResponse.setHeader("Access-Control-Allow-Origin", "*");
+        httpResponse.setHeader("Access-Control-Allow-Methods", "HEAD, POST, GET, OPTIONS, DELETE");
+        httpResponse.setHeader("Access-Control-Max-Age", "3600");
+        httpResponse.setHeader(
+                "Access-Control-Allow-Headers",
+                "Origin, Accept, X-Requested-With, Content-Type, Access-Control-Request-Method, Access-Control-Request-Headers");
+
+        chain.doFilter(request, httpResponse);
+    }
+
+    /** No resources to release. */
+    @Override
+    public void destroy() {
+        // nothing to clean up
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/614c118e/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/service/IMetadataServiceClient.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/service/IMetadataServiceClient.java b/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/service/IMetadataServiceClient.java
new file mode 100644
index 0000000..3609c0a
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/service/IMetadataServiceClient.java
@@ -0,0 +1,69 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one or more
+ *  * contributor license agreements.  See the NOTICE file distributed with
+ *  * this work for additional information regarding copyright ownership.
+ *  * The ASF licenses this file to You under the Apache License, Version 2.0
+ *  * (the "License"); you may not use this file except in compliance with
+ *  * the License.  You may obtain a copy of the License at
+ *  *
+ *  *    http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.
+ *
+ */
+
+package org.apache.eagle.alert.service;
+
+import java.io.Closeable;
+import java.io.Serializable;
+import java.util.List;
+
+import org.apache.eagle.alert.coordination.model.Kafka2TupleMetadata;
+import org.apache.eagle.alert.coordination.model.ScheduleState;
+import org.apache.eagle.alert.coordination.model.SpoutSpec;
+import org.apache.eagle.alert.coordination.model.internal.Topology;
+import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;
+import org.apache.eagle.alert.engine.coordinator.Publishment;
+import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
+import org.apache.eagle.alert.engine.coordinator.StreamingCluster;
+
+/**
+ * Service stub for reading and writing metadata held by a remote metadata service.
+ * <p>
+ * The interface is both {@link Closeable} and {@link Serializable}. Each {@code add*}
+ * method takes a single metadata object and each {@code list*} method returns a list
+ * of them.
+ * NOTE(review): whether {@code add*} creates or overwrites an existing entry is not
+ * visible from this interface - check the implementation.
+ */
+public interface IMetadataServiceClient extends Closeable, Serializable {
+
+    // user metadata
+    void addStreamingCluster(StreamingCluster cluster);
+    List<StreamingCluster> listClusters();
+    
+    List<Topology> listTopologies();
+    void addTopology(Topology t);
+
+    void addPolicy(PolicyDefinition policy);
+    List<PolicyDefinition> listPolicies();
+
+    void addStreamDefinition(StreamDefinition streamDef);
+    List<StreamDefinition> listStreams();
+
+    void addDataSource(Kafka2TupleMetadata k2t);
+    List<Kafka2TupleMetadata> listDataSources();
+
+    void addPublishment(Publishment pub);
+    List<Publishment> listPublishment();
+
+    // monitor metadata
+    List<SpoutSpec> listSpoutMetadata();
+
+    // NOTE(review): the no-argument overload presumably returns the latest schedule state - confirm.
+    ScheduleState getVersionedSpec();
+    ScheduleState getVersionedSpec(String version);
+    void addScheduleState(ScheduleState state);
+    
+    // NOTE(review): the scope of clear() (all metadata or only part of it) is not visible here - confirm.
+    void clear();
+    
+    // for topology mgmt
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/614c118e/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/service/MetadataServiceClientImpl.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/service/MetadataServiceClientImpl.java b/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/service/MetadataServiceClientImpl.java
new file mode 100644
index 0000000..3460b77
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/service/MetadataServiceClientImpl.java
@@ -0,0 +1,235 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one or more
+ *  * contributor license agreements.  See the NOTICE file distributed with
+ *  * this work for additional information regarding copyright ownership.
+ *  * The ASF licenses this file to You under the Apache License, Version 2.0
+ *  * (the "License"); you may not use this file except in compliance with
+ *  * the License.  You may obtain a copy of the License at
+ *  *
+ *  *    http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.
+ *
+ */
+
+package org.apache.eagle.alert.service;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import javax.ws.rs.core.MediaType;
+
+import org.apache.eagle.alert.coordination.model.Kafka2TupleMetadata;
+import org.apache.eagle.alert.coordination.model.ScheduleState;
+import org.apache.eagle.alert.coordination.model.SpoutSpec;
+import org.apache.eagle.alert.coordination.model.internal.Topology;
+import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;
+import org.apache.eagle.alert.engine.coordinator.Publishment;
+import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
+import org.apache.eagle.alert.engine.coordinator.StreamingCluster;
+import org.codehaus.jackson.jaxrs.JacksonJsonProvider;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.sun.jersey.api.client.Client;
+import com.sun.jersey.api.client.ClientResponse;
+import com.sun.jersey.api.client.GenericType;
+import com.sun.jersey.api.client.WebResource;
+import com.sun.jersey.api.client.config.ClientConfig;
+import com.sun.jersey.api.client.config.DefaultClientConfig;
+import com.sun.jersey.client.urlconnection.URLConnectionClientHandler;
+import com.typesafe.config.Config;
+
+/**
+ * {@link IMetadataServiceClient} that talks JSON over HTTP to the metadata service through the
+ * Jersey client.
+ * <p>
+ * The target is {@code http://<host>:<port><context>}; the three parts come either from the
+ * constructor arguments or from the {@code metadataService.host}, {@code metadataService.port}
+ * and {@code metadataService.context} configuration keys.
+ * <p>
+ * The Jersey {@link Client} is a transient field, so it is not part of the serialized form. It
+ * is rebuilt in {@link #readObject(java.io.ObjectInputStream)} so that a deserialized instance
+ * is usable right away.
+ */
+public class MetadataServiceClientImpl implements IMetadataServiceClient {
+    private static final long serialVersionUID = 3003976065082684128L;
+
+    private static final Logger LOG = LoggerFactory.getLogger(MetadataServiceClientImpl.class);
+
+    private static final String METADATA_SCHEDULESTATES_PATH = "/metadata/schedulestates";
+    private static final String METADATA_PUBLISHMENTS_PATH = "/metadata/publishments";
+    private static final String METADATA_DATASOURCES_PATH = "/metadata/datasources";
+    private static final String METADATA_STREAMS_PATH = "/metadata/streams";
+    private static final String METADATA_POLICIES_PATH = "/metadata/policies";
+    private static final String METADATA_CLUSTERS_PATH = "/metadata/clusters";
+    private static final String METADATA_TOPOLOGY_PATH = "/metadata/topologies";
+    private static final String METADATA_CLEAR_PATH = "/metadata/clear";
+
+    private static final String EAGLE_CORRELATION_CONTEXT = "metadataService.context";
+    private static final String EAGLE_CORRELATION_SERVICE_PORT = "metadataService.port";
+    private static final String EAGLE_CORRELATION_SERVICE_HOST = "metadataService.host";
+
+    protected static final String CONTENT_TYPE = "Content-Type";
+
+    // connect and read timeout handed to the Jersey client, in milliseconds
+    private static final int TIMEOUT_MILLIS = 60 * 1000;
+
+    private String host;
+    private int port;
+    private String context;
+    private transient Client client;
+    private String basePath;
+
+    /**
+     * Builds a client from the {@code metadataService.host}, {@code metadataService.port} and
+     * {@code metadataService.context} entries of {@code config}.
+     *
+     * @param config configuration holding the three entries above
+     */
+    public MetadataServiceClientImpl(Config config) {
+        // the delegate constructor already derives basePath, so nothing else is needed here
+        this(config.getString(EAGLE_CORRELATION_SERVICE_HOST), config.getInt(EAGLE_CORRELATION_SERVICE_PORT), config
+                .getString(EAGLE_CORRELATION_CONTEXT));
+    }
+
+    /**
+     * Builds a client for {@code http://host:port + context}.
+     *
+     * @param host    metadata service host name
+     * @param port    metadata service port
+     * @param context path prefix appended verbatim after the port
+     */
+    public MetadataServiceClientImpl(String host, int port, String context) {
+        this.host = host;
+        this.port = port;
+        this.context = context;
+        this.basePath = buildBasePath();
+        this.client = createClient();
+    }
+
+    /** Creates the Jersey client: JSON provider, timeouts, method workaround and GZIP filter. */
+    private static Client createClient() {
+        ClientConfig cc = new DefaultClientConfig();
+        cc.getProperties().put(DefaultClientConfig.PROPERTY_CONNECT_TIMEOUT, TIMEOUT_MILLIS);
+        cc.getProperties().put(DefaultClientConfig.PROPERTY_READ_TIMEOUT, TIMEOUT_MILLIS);
+        cc.getClasses().add(JacksonJsonProvider.class);
+        cc.getProperties().put(URLConnectionClientHandler.PROPERTY_HTTP_URL_CONNECTION_SET_METHOD_WORKAROUND, true);
+        Client created = Client.create(cc);
+        created.addFilter(new com.sun.jersey.api.client.filter.GZIPContentEncodingFilter());
+        return created;
+    }
+
+    /**
+     * Serialization hook: restores the non-transient fields, then rebuilds the transient
+     * {@link Client}, which would otherwise be {@code null} after deserialization.
+     */
+    private void readObject(java.io.ObjectInputStream in) throws IOException, ClassNotFoundException {
+        in.defaultReadObject();
+        this.client = createClient();
+    }
+
+    private String buildBasePath() {
+        StringBuilder sb = new StringBuilder();
+        sb.append("http://");
+        sb.append(host);
+        sb.append(":");
+        sb.append(port);
+        sb.append(context);
+        return sb.toString();
+    }
+
+    /** Releases the underlying Jersey client, if there is one. */
+    @Override
+    public void close() throws IOException {
+        if (client != null) {
+            client.destroy();
+        }
+    }
+
+    /**
+     * Returns the spout specs of the schedule state fetched by {@link #getVersionedSpec()}.
+     *
+     * @return a new list of the specs; empty when no schedule state could be fetched or the
+     *         state carries no spout specs
+     */
+    @Override
+    public List<SpoutSpec> listSpoutMetadata() {
+        ScheduleState state = getVersionedSpec();
+        // getVersionedSpec() yields null whenever the query fails, so guard before dereferencing
+        if (state == null || state.getSpoutSpecs() == null) {
+            return new ArrayList<SpoutSpec>();
+        }
+        return new ArrayList<>(state.getSpoutSpecs().values());
+    }
+
+    @Override
+    public List<StreamingCluster> listClusters() {
+        return list(METADATA_CLUSTERS_PATH, new GenericType<List<StreamingCluster>>() {
+        });
+    }
+
+    @Override
+    public List<PolicyDefinition> listPolicies() {
+        return list(METADATA_POLICIES_PATH, new GenericType<List<PolicyDefinition>>() {
+        });
+    }
+
+    @Override
+    public List<StreamDefinition> listStreams() {
+        return list(METADATA_STREAMS_PATH, new GenericType<List<StreamDefinition>>() {
+        });
+    }
+
+    @Override
+    public List<Kafka2TupleMetadata> listDataSources() {
+        return list(METADATA_DATASOURCES_PATH, new GenericType<List<Kafka2TupleMetadata>>() {
+        });
+    }
+
+    @Override
+    public List<Publishment> listPublishment() {
+        return list(METADATA_PUBLISHMENTS_PATH, new GenericType<List<Publishment>>() {
+        });
+    }
+
+    @Override
+    public List<Topology> listTopologies() {
+        return list(METADATA_TOPOLOGY_PATH, new GenericType<List<Topology>>() {
+        });
+    }
+
+    /**
+     * GETs {@code basePath + path} and reads the JSON body as a list. Nothing is caught here,
+     * so failures raised by the Jersey client reach the caller.
+     */
+    private <T> List<T> list(String path, GenericType<List<T>> type) {
+        String url = basePath + path;
+        LOG.info("query URL {}", url);
+        return client.resource(url).accept(MediaType.APPLICATION_JSON_TYPE).type(MediaType.APPLICATION_JSON)
+                .get(type);
+    }
+
+    @Override
+    public ScheduleState getVersionedSpec(String version) {
+        return listOne(METADATA_SCHEDULESTATES_PATH + "/" + version, ScheduleState.class);
+    }
+
+    @Override
+    public ScheduleState getVersionedSpec() {
+        return listOne(METADATA_SCHEDULESTATES_PATH, ScheduleState.class);
+    }
+
+    /**
+     * GETs {@code basePath + path} and reads the body as a single entity.
+     *
+     * @return the entity, or {@code null} when the status is 300 or above or the body cannot
+     *         be read; both cases are logged as warnings
+     */
+    private <T> T listOne(String path, Class<T> tClz) {
+        String url = basePath + path;
+        LOG.info("query URL {}", url);
+
+        ClientResponse resp = client.resource(url).accept(MediaType.APPLICATION_JSON_TYPE)
+                .type(MediaType.APPLICATION_JSON).get(ClientResponse.class);
+        try {
+            if (resp.getStatus() >= 300) {
+                LOG.warn("fail querying metadata service {} with http status {}", url, resp.getStatus());
+                return null;
+            }
+            return resp.getEntity(tClz);
+        } catch (Exception e) {
+            // trailing throwable argument makes SLF4J log the stack trace as well
+            LOG.warn(" list one entity failed, ignored and continute, path {}, message {}!", path, e.getMessage(), e);
+            return null;
+        } finally {
+            // always release the response, including on the error paths above
+            closeQuietly(resp);
+        }
+    }
+
+    /** Closes {@code resp}; a failure to close is logged and otherwise ignored. */
+    private static void closeQuietly(ClientResponse resp) {
+        try {
+            resp.close();
+        } catch (RuntimeException e) {
+            LOG.debug("closing metadata service response failed", e);
+        }
+    }
+
+    /** POSTs {@code entity} as JSON to {@code basePath + path}. */
+    private void post(String path, Object entity) {
+        client.resource(basePath + path).accept(MediaType.APPLICATION_JSON_TYPE).type(MediaType.APPLICATION_JSON)
+                .post(entity);
+    }
+
+    @Override
+    public void addScheduleState(ScheduleState state) {
+        post(METADATA_SCHEDULESTATES_PATH, state);
+    }
+
+    @Override
+    public void addStreamingCluster(StreamingCluster cluster) {
+        post(METADATA_CLUSTERS_PATH, cluster);
+    }
+
+    @Override
+    public void addTopology(Topology t) {
+        post(METADATA_TOPOLOGY_PATH, t);
+    }
+
+    @Override
+    public void addPolicy(PolicyDefinition policy) {
+        post(METADATA_POLICIES_PATH, policy);
+    }
+
+    @Override
+    public void addStreamDefinition(StreamDefinition streamDef) {
+        post(METADATA_STREAMS_PATH, streamDef);
+    }
+
+    @Override
+    public void addDataSource(Kafka2TupleMetadata k2t) {
+        post(METADATA_DATASOURCES_PATH, k2t);
+    }
+
+    @Override
+    public void addPublishment(Publishment pub) {
+        post(METADATA_PUBLISHMENTS_PATH, pub);
+    }
+
+    /** POSTs an empty request to the clear endpoint. */
+    @Override
+    public void clear() {
+        WebResource r = client.resource(basePath + METADATA_CLEAR_PATH);
+        r.accept(MediaType.APPLICATION_JSON_TYPE).type(MediaType.APPLICATION_JSON).post();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/614c118e/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/utils/AlertConstants.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/utils/AlertConstants.java b/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/utils/AlertConstants.java
new file mode 100644
index 0000000..87c45d8
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/utils/AlertConstants.java
@@ -0,0 +1,27 @@
+package org.apache.eagle.alert.utils;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * String constants shared by the alert engine: four generic field names and the default
+ * component names for the spout and the router bolt.
+ */
+public class AlertConstants {
+    // generic field names; NOTE(review): presumably positional tuple fields, confirm at call sites
+    public static final String FIELD_0 = "f0";
+    public static final String FIELD_1 = "f1";
+    public static final String FIELD_2 = "f2";
+    public static final String FIELD_3 = "f3";
+
+    /** Default name of the alert engine spout. */
+    public static final String DEFAULT_SPOUT_NAME = "alertEngineSpout";
+    /** Default name of the stream router bolt. */
+    public static final String DEFAULT_ROUTERBOLT_NAME = "streamRouterBolt";
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/614c118e/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/utils/ByteUtils.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/utils/ByteUtils.java b/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/utils/ByteUtils.java
new file mode 100644
index 0000000..636716d
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/utils/ByteUtils.java
@@ -0,0 +1,172 @@
+package org.apache.eagle.alert.utils;
+
+import java.io.UnsupportedEncodingException;
+
+/**
+ * Static helpers that convert {@code double}, {@code long}, {@code int} and {@code short}
+ * values to and from byte arrays in big-endian order (most significant byte first), plus
+ * array concatenation and UTF-8 string encoding.
+ * <p>
+ * None of the methods validates its arguments: a {@code null} array or an array that is too
+ * short for the requested offset fails with the usual runtime exception.
+ */
+public class ByteUtils {
+
+    /** Reads eight bytes starting at {@code offset} and interprets them as a double. */
+    public static double bytesToDouble(byte[] bytes, int offset) {
+        long bits = bytesToLong(bytes, offset);
+        return Double.longBitsToDouble(bits);
+    }
+
+    /** Reads the first eight bytes and interprets them as a double. */
+    public static double bytesToDouble(byte[] bytes) {
+        return bytesToDouble(bytes, 0);
+    }
+
+    /** Writes the eight bytes of {@code v} to the start of {@code bytes}. */
+    public static void doubleToBytes(double v, byte[] bytes) {
+        longToBytes(Double.doubleToLongBits(v), bytes, 0);
+    }
+
+    /** Writes the eight bytes of {@code v} into {@code bytes} starting at {@code offset}. */
+    public static void doubleToBytes(double v, byte[] bytes, int offset) {
+        long bits = Double.doubleToLongBits(v);
+        longToBytes(bits, bytes, offset);
+    }
+
+    /** Returns a new eight-byte array holding {@code v}. */
+    public static byte[] doubleToBytes(double v) {
+        long bits = Double.doubleToLongBits(v);
+        return longToBytes(bits);
+    }
+
+    /** Reads the first eight bytes as a long. */
+    public static long bytesToLong(byte[] bytes) {
+        return bytesToLong(bytes, 0);
+    }
+
+    /** Reads eight bytes starting at {@code offset} as a long. */
+    public static long bytesToLong(byte[] bytes, int offset) {
+        long result = 0;
+        for (int k = 0; k < 8; k++) {
+            result = (result << 8) | (bytes[offset + k] & 0xFF);
+        }
+        return result;
+    }
+
+    /** Writes the eight bytes of {@code v} to the start of {@code bytes}. */
+    public static void longToBytes(long v, byte[] bytes) {
+        longToBytes(v, bytes, 0);
+    }
+
+    /** Writes the eight bytes of {@code v} into {@code bytes} starting at {@code offset}. */
+    public static void longToBytes(long v, byte[] bytes, int offset) {
+        // filled from the last position backwards: the lowest byte goes to offset + 7
+        for (int k = 0; k < 8; k++) {
+            bytes[offset + 7 - k] = (byte) (v >>> (8 * k));
+        }
+    }
+
+    /** Returns a new eight-byte array holding {@code v}. */
+    public static byte[] longToBytes(long v) {
+        byte[] out = new byte[8];
+        longToBytes(v, out, 0);
+        return out;
+    }
+
+    /** Reads the first four bytes as an int. */
+    public static int bytesToInt(byte[] bytes) {
+        return bytesToInt(bytes, 0);
+    }
+
+    /** Reads four bytes starting at {@code offset} as an int. */
+    public static int bytesToInt(byte[] bytes, int offset) {
+        int result = 0;
+        for (int k = 0; k < 4; k++) {
+            result = (result << 8) | (bytes[offset + k] & 0xFF);
+        }
+        return result;
+    }
+
+    /** Writes the four bytes of {@code v} to the start of {@code bytes}. */
+    public static void intToBytes(int v, byte[] bytes) {
+        intToBytes(v, bytes, 0);
+    }
+
+    /** Writes the four bytes of {@code v} into {@code bytes} starting at {@code offset}. */
+    public static void intToBytes(int v, byte[] bytes, int offset) {
+        for (int k = 0; k < 4; k++) {
+            bytes[offset + 3 - k] = (byte) (v >>> (8 * k));
+        }
+    }
+
+    /** Returns a new four-byte array holding {@code v}. */
+    public static byte[] intToBytes(int v) {
+        byte[] out = new byte[4];
+        intToBytes(v, out, 0);
+        return out;
+    }
+
+    /** Reads the first two bytes as a short. */
+    public static short bytesToShort(byte[] bytes) {
+        return bytesToShort(bytes, 0);
+    }
+
+    /** Reads two bytes starting at {@code offset} as a short. */
+    public static short bytesToShort(byte[] bytes, int offset) {
+        short result = 0;
+        for (int k = 0; k < 2; k++) {
+            result = (short) ((result << 8) | (bytes[offset + k] & 0xFF));
+        }
+        return result;
+    }
+
+    /** Writes the two bytes of {@code v} to the start of {@code bytes}. */
+    public static void shortToBytes(short v, byte[] bytes) {
+        shortToBytes(v, bytes, 0);
+    }
+
+    /** Writes the two bytes of {@code v} into {@code bytes} starting at {@code offset}. */
+    public static void shortToBytes(short v, byte[] bytes, int offset) {
+        for (int k = 0; k < 2; k++) {
+            bytes[offset + 1 - k] = (byte) (v >>> (8 * k));
+        }
+    }
+
+    /** Returns a new two-byte array holding {@code v}. */
+    public static byte[] shortToBytes(short v) {
+        byte[] out = new byte[2];
+        shortToBytes(v, out, 0);
+        return out;
+    }
+
+    /**
+     * Joins the given arrays, in argument order, into one newly allocated array.
+     *
+     * @param arrays the arrays to join; calling with no arguments yields an empty array
+     * @return a new array whose length is the sum of the input lengths
+     */
+    public static byte[] concat(byte[]... arrays) {
+        int total = 0;
+        for (byte[] part : arrays) {
+            total += part.length;
+        }
+        byte[] joined = new byte[total];
+        int cursor = 0;
+        for (byte[] part : arrays) {
+            System.arraycopy(part, 0, joined, cursor, part.length);
+            cursor += part.length;
+        }
+        return joined;
+    }
+
+    /**
+     * Encodes {@code str} as UTF-8.
+     *
+     * @throws IllegalStateException if the runtime reports UTF-8 as unsupported
+     */
+    public static byte[] stringToBytes(String str) {
+        try {
+            return str.getBytes("UTF-8");
+        } catch (UnsupportedEncodingException e) {
+            throw new IllegalStateException(e);
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/614c118e/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/utils/ConfigUtils.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/utils/ConfigUtils.java b/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/utils/ConfigUtils.java
new file mode 100644
index 0000000..685265f
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/utils/ConfigUtils.java
@@ -0,0 +1,31 @@
+package org.apache.eagle.alert.utils;
+
+import java.util.Properties;
+
+import com.typesafe.config.Config;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+public class ConfigUtils {
+
+    @SuppressWarnings("serial")
+    public static Properties toProperties(Config config){
+        return new Properties(){{
+            putAll(config.root().unwrapped());
+        }};
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/614c118e/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/utils/DateTimeUtil.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/utils/DateTimeUtil.java b/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/utils/DateTimeUtil.java
new file mode 100644
index 0000000..d611b95
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/utils/DateTimeUtil.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.utils;
+
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.Calendar;
+import java.util.Date;
+import java.util.TimeZone;
+
+/**
+ * be aware that SimpleDateFormat instantiation is expensive, so if that's under a tight loop, probably we need
+ * a thread local SimpleDateFormat object
+ */
+public class DateTimeUtil {
+	public static final long ONESECOND = 1L * 1000L;
+	public static final long ONEMINUTE = 1L * 60L * 1000L;
+	public static final long ONEHOUR = 1L * 60L * 60L * 1000L;
+	public static final long ONEDAY = 24L * 60L * 60L * 1000L;
+    private static TimeZone CURRENT_TIME_ZONE = TimeZone.getDefault();
+
+	public static Date humanDateToDate(String date) throws ParseException {
+		SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+        sdf.setTimeZone(CURRENT_TIME_ZONE);
+		return sdf.parse(date);
+	}
+
+	public static long getCurrentTimestamp(){
+		return System.currentTimeMillis();
+	}
+	
+	public static String secondsToHumanDate(long seconds){
+		SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+        sdf.setTimeZone(CURRENT_TIME_ZONE);
+		Date t = new Date();
+		t.setTime(seconds*1000);
+		return sdf.format(t);
+	}
+	
+	public static String millisecondsToHumanDateWithMilliseconds(long milliseconds){
+		SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss,SSS");
+        sdf.setTimeZone(CURRENT_TIME_ZONE);
+		Date t = new Date();
+		t.setTime(milliseconds);
+		return sdf.format(t);
+	}
+	
+	public static String millisecondsToHumanDateWithSeconds(long milliseconds){
+		SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+        sdf.setTimeZone(CURRENT_TIME_ZONE);
+		Date t = new Date();
+		t.setTime(milliseconds);
+		return sdf.format(t);
+	}
+	
+	public static long humanDateToSeconds(String date) throws ParseException {
+		SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+        sdf.setTimeZone(CURRENT_TIME_ZONE);
+		Date d = sdf.parse(date);
+		return d.getTime()/1000;
+	}
+	
+	public static long humanDateToMilliseconds(String date) throws ParseException {
+		SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss,SSS");
+        sdf.setTimeZone(CURRENT_TIME_ZONE);
+		Date d = sdf.parse(date);
+		return d.getTime();
+	}
+	
+	
+	public static long humanDateToMillisecondsWithoutException(String date){
+		try{
+			SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss,SSS");
+            sdf.setTimeZone(CURRENT_TIME_ZONE);
+			Date d = sdf.parse(date);
+			return d.getTime();
+		}catch(ParseException ex){
+			return 0L;
+		}
+	}
+	
+	public static long humanDateToSecondsWithoutException(String date){
+		try{
+			SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+            sdf.setTimeZone(CURRENT_TIME_ZONE);
+			Date d = sdf.parse(date);
+			return (d.getTime() / 1000);
+		}catch(ParseException ex){
+			return 0L;
+		}
+	}
+	/**
+	 * this could be accurate only when timezone is UTC
+	 * for the timezones other than UTC, there is possibly issue, for example
+	 * assume timezone is GMT+8 in China
+	 * When user time is "2014-07-15 05:00:00", it will be converted to timestamp first, internally it would be  "2014-07-14 21:00:00" in UTC timezone. When rounded down to day, the internal time would 
+	 * be changed to "2014-07-14 00:00:00", and that means the user time is "2014-07-14 08:00:00". But originally user wants to round it to "2014-07-15 00:00:00"
+	 * 
+	 * @param field
+	 * @param timeInMillis the seconds elapsed since 1970-01-01 00:00:00
+	 * @return
+	 */
+	public static long roundDown(int field, long timeInMillis){
+		switch(field){
+			case Calendar.DAY_OF_MONTH:
+			case Calendar.DAY_OF_WEEK:
+			case Calendar.DAY_OF_YEAR:
+				return (timeInMillis - timeInMillis % (24*60*60*1000));
+			case Calendar.HOUR:
+				return (timeInMillis - timeInMillis % (60*60*1000));
+			case Calendar.MINUTE:
+				return (timeInMillis - timeInMillis % (60*1000));
+			case Calendar.SECOND:
+				return (timeInMillis - timeInMillis % (1000));
+			default:
+				return 0L;
+		}
+	}
+
+	public static String format(long milliseconds, String format) {
+		SimpleDateFormat sdf = new SimpleDateFormat(format);
+        sdf.setTimeZone(CURRENT_TIME_ZONE);
+		Date t = new Date();
+		t.setTime(milliseconds);
+		return sdf.format(t);
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/614c118e/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/utils/HostUtils.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/utils/HostUtils.java b/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/utils/HostUtils.java
new file mode 100644
index 0000000..3e3b6d5
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/utils/HostUtils.java
@@ -0,0 +1,70 @@
+
+package org.apache.eagle.alert.utils;
+
+import java.net.InetAddress;
+import java.net.NetworkInterface;
+import java.net.SocketException;
+import java.net.UnknownHostException;
+import java.util.Enumeration;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Helpers for finding the local host name and address.
+ * <p>
+ * See http://stackoverflow.com/questions/7348711/recommended-way-to-get-hostname-in-java
+ */
+public class HostUtils {
+    private static final Logger logger = LoggerFactory
+            .getLogger(HostUtils.class);
+
+    /**
+     * Resolves the local host name. The name reported by {@link InetAddress#getLocalHost()}
+     * wins when it is non-empty; otherwise the {@code COMPUTERNAME} and then the
+     * {@code HOSTNAME} environment variables are consulted.
+     *
+     * @return the host name, or {@code null} when none of the sources yields one
+     */
+    public static String getHostName() {
+        try {
+            String resolved = InetAddress.getLocalHost().getHostName();
+            if (!(resolved == null || resolved.isEmpty())) {
+                return resolved;
+            }
+        } catch (UnknownHostException e) {
+            logger.error("get hostName error!", e);
+        }
+
+        String computerName = System.getenv("COMPUTERNAME");
+        if (computerName != null) {
+            return computerName;
+        }
+        // last resort; getenv itself yields null when the variable is not set
+        return System.getenv("HOSTNAME");
+    }
+
+    /**
+     * Walks the network interfaces and returns the host name of the first address that is
+     * not a loopback address.
+     *
+     * @return that host name, or {@code null} when there is no such address or the
+     *         interfaces cannot be enumerated (the failure is logged)
+     */
+    public static String getNotLoopbackAddress() {
+        try {
+            Enumeration<NetworkInterface> nics = NetworkInterface.getNetworkInterfaces();
+            while (nics.hasMoreElements()) {
+                Enumeration<InetAddress> candidates = nics.nextElement().getInetAddresses();
+                while (candidates.hasMoreElements()) {
+                    InetAddress candidate = candidates.nextElement();
+                    if (candidate.isLoopbackAddress()) {
+                        continue;
+                    }
+                    String name = candidate.getHostName();
+                    if (name != null) {
+                        return name;
+                    }
+                }
+            }
+        } catch (SocketException e) {
+            logger.error("getNotLoopbackAddress error!", e);
+        }
+        return null;
+    }
+
+    /**
+     * Returns the textual IP address of {@link InetAddress#getLocalHost()}.
+     *
+     * @return the address, or {@code null} when the local host cannot be resolved (the
+     *         failure is logged)
+     */
+    public static String getHostAddress() {
+        String address = null;
+        try {
+            address = InetAddress.getLocalHost().getHostAddress();
+        } catch (UnknownHostException e) {
+            logger.error("get hostAddress error!", e);
+        }
+        return address;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/614c118e/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/utils/JsonUtils.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/utils/JsonUtils.java b/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/utils/JsonUtils.java
new file mode 100644
index 0000000..de0c48b
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/utils/JsonUtils.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.utils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+/**
+ * JSON serialization helper built around one shared {@link ObjectMapper}.
+ *
+ * @since May 1, 2016
+ */
+public class JsonUtils {
+
+    public static final ObjectMapper mapper = new ObjectMapper();
+    private static final Logger LOG = LoggerFactory.getLogger(JsonUtils.class);
+
+    /**
+     * Serializes {@code o} to a JSON string with the shared mapper.
+     *
+     * @param o the object to serialize
+     * @return the JSON text, or an empty string when serialization fails (the failure is
+     *         logged together with its cause)
+     */
+    public static String writeValueAsString(Object o) {
+        try {
+            return mapper.writeValueAsString(o);
+        } catch (Exception e) {
+            // trailing throwable argument makes SLF4J record the cause and stack trace
+            LOG.error("write object as string failed {} !", o, e);
+            return "";
+        }
+    }
+}



Mime
View raw message