eagle-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From h..@apache.org
Subject eagle git commit: [EAGLE-950] Add REST Stream Proxy for easy integration
Date Sun, 12 Mar 2017 12:01:27 GMT
Repository: eagle
Updated Branches:
  refs/heads/master dd16620e2 -> d5dce2b34


[EAGLE-950] Add REST Stream Proxy for easy integration

https://issues.apache.org/jira/browse/EAGLE-950

Add a `POST /streams/{STREAM_ID}` API for pushing real-time data into a stream over REST, so that
monitoring use cases can easily integrate with Eagle using native tools such as `curl` or simple scripts.

For example:
* Step 1: Let the service team collect metrics with a SINGLE COMMAND from ad-hoc monitoring scripts, e.g. using `curl`

        curl --basic -u $USERNAME:$PASSWORD \
            -H "Content-Type: application/json" -X POST \
            http://localhost:9090/rest/streams/HADOOP_JMX_METRIC_STREAM_SANDBOX \
            -d '[
                {
                    "timestamp": 1489318310626,
                    "metric": "hadoop.metric.sample",
                    "component": "namenode",
                    "site": "sandbox",
                    "value": 55044096.0,
                    "host": "sandbox"
                },{
                    "timestamp": 1489318320626,
                    "metric": "hadoop.metric.sample",
                    "component": "namenode",
                    "site": "sandbox",
                    "value": 55044096.0,
                    "host": "sandbox"
                },{
                    "timestamp": 1489318330626,
                    "metric": "hadoop.metric.sample",
                    "component": "namenode",
                    "site": "sandbox",
                    "value": 55044096.0,
                    "host": "sandbox"
                }
            ]'

* Step 2: Define Alerting Policies
* Step 3: Customized Metric Visualization

Author: Hao Chen <hao@apache.org>

Closes #865 from haoch/ImproveMetricMonitor.


Project: http://git-wip-us.apache.org/repos/asf/eagle/repo
Commit: http://git-wip-us.apache.org/repos/asf/eagle/commit/d5dce2b3
Tree: http://git-wip-us.apache.org/repos/asf/eagle/tree/d5dce2b3
Diff: http://git-wip-us.apache.org/repos/asf/eagle/diff/d5dce2b3

Branch: refs/heads/master
Commit: d5dce2b3498960417da498895e3f2d9c4241f66b
Parents: dd16620
Author: Hao Chen <hao@apache.org>
Authored: Sun Mar 12 20:01:15 2017 +0800
Committer: Hao Chen <hao@apache.org>
Committed: Sun Mar 12 20:01:15 2017 +0800

----------------------------------------------------------------------
 .../alert/engine/coordinator/StreamColumn.java  |   2 +-
 .../engine/coordinator/StreamDefinition.java    |   5 +-
 .../alert/utils/StreamValidationException.java  |  39 ++++++
 .../eagle/alert/utils/StreamValidator.java      |  57 ++++++++
 .../coordinator/StreamDefinitionTest.java       |   6 +-
 .../alert/engine/model/StreamEventTest.java     |  13 ++
 .../eagle/app/messaging/StreamRecord.java       |  30 +++++
 .../apache/eagle/app/test/KafkaTestServer.java  |  34 +++++
 .../eagle/app/test/KafkaTestServerImpl.java     |  79 ++++++++++++
 .../eagle-app/eagle-app-streamproxy/pom.xml     |  39 ++++++
 .../proxy/exception/StreamProxyException.java   |  39 ++++++
 .../stream/StreamConfigUpdateListener.java      |  36 ++++++
 .../stream/StreamMetadataUpdateService.java     |  27 ++++
 .../eagle/app/proxy/stream/StreamProxy.java     |  27 ++++
 .../app/proxy/stream/StreamProxyManager.java    |  31 +++++
 .../app/proxy/stream/StreamProxyProducer.java   |  28 ++++
 .../app/proxy/stream/StreamProxyResource.java   |  85 ++++++++++++
 .../impl/KafkaStreamProxyProducerImpl.java      |  88 +++++++++++++
 .../impl/StreamMetadataUpdateServiceImpl.java   | 115 +++++++++++++++++
 .../app/proxy/stream/impl/StreamProxyImpl.java  |  77 +++++++++++
 .../stream/impl/StreamProxyManagerImpl.java     | 129 +++++++++++++++++++
 .../eagle/app/proxy/stream/StreamProxyTest.java |  76 +++++++++++
 .../src/test/resources/application.conf         |  35 +++++
 eagle-core/eagle-app/pom.xml                    |   1 +
 .../apache/eagle/common/rest/RESTResponse.java  |   3 +-
 .../eagle/metric/HadoopMetricMonitorApp.java    |   4 +-
 ...le.metric.HadoopMetricMonitorAppProdiver.xml |   6 +-
 eagle-server/pom.xml                            |  11 ++
 .../server/security/BasicAuthRequestFilter.java |  14 +-
 eagle-server/src/main/webapp/app/dev/index.html |  40 +++---
 .../webapp/app/dev/partials/alert/list.html     |   2 +-
 .../webapp/app/dev/public/js/ctrls/alertCtrl.js |   6 +-
 .../webapp/app/dev/public/js/ctrls/mainCtrl.js  |   2 +-
 33 files changed, 1144 insertions(+), 42 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/eagle/blob/d5dce2b3/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamColumn.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamColumn.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamColumn.java
index ba736fe..abd9dc5 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamColumn.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamColumn.java
@@ -34,7 +34,7 @@ public class StreamColumn implements Serializable {
     private String name;
     private Type type;
     private Object defaultValue;
-    private boolean required;
+    private boolean required = true;
     private String description;
     private String nodataExpression;
 

http://git-wip-us.apache.org/repos/asf/eagle/blob/d5dce2b3/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamDefinition.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamDefinition.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamDefinition.java
index 73d3991..af9d137 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamDefinition.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamDefinition.java
@@ -42,7 +42,7 @@ public class StreamDefinition implements Serializable {
     private String description;
 
     // Is validateable or not
-    private boolean validate;
+    private boolean validate = true;
 
     // Is timeseries-based stream or not
     private boolean timeseries;
@@ -52,7 +52,7 @@ public class StreamDefinition implements Serializable {
     // Stream data source ID
     private String dataSource;
 
-    private String group = "Default";
+    private String group = "global";
 
     //
     private String streamSource;
@@ -125,6 +125,7 @@ public class StreamDefinition implements Serializable {
         this.description = description;
     }
 
+    @Deprecated
     public boolean isValidate() {
         return validate;
     }

http://git-wip-us.apache.org/repos/asf/eagle/blob/d5dce2b3/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/utils/StreamValidationException.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/utils/StreamValidationException.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/utils/StreamValidationException.java
new file mode 100644
index 0000000..2f08506
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/utils/StreamValidationException.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.utils;
+
+public class StreamValidationException extends Exception {
+    public StreamValidationException() {
+        super();
+    }
+
+    public StreamValidationException(String message) {
+        super(message);
+    }
+
+    public StreamValidationException(String message, Throwable cause) {
+        super(message, cause);
+    }
+
+    public StreamValidationException(Throwable cause) {
+        super(cause);
+    }
+
+    protected StreamValidationException(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) {
+        super(message, cause, enableSuppression, writableStackTrace);
+    }
+}

http://git-wip-us.apache.org/repos/asf/eagle/blob/d5dce2b3/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/utils/StreamValidator.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/utils/StreamValidator.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/utils/StreamValidator.java
new file mode 100644
index 0000000..a1f64d9
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/utils/StreamValidator.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.utils;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.eagle.alert.engine.coordinator.StreamColumn;
+import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
+
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+public class StreamValidator {
+    private final StreamDefinition streamDefinition;
+    private final Map<String, StreamColumn> streamColumnMap;
+
+    public StreamValidator(StreamDefinition streamDefinition) {
+        this.streamDefinition = streamDefinition;
+        this.streamColumnMap = new HashMap<>();
+        for (StreamColumn column : this.streamDefinition.getColumns()) {
+            streamColumnMap.put(column.getName(), column);
+        }
+    }
+
+    public void validateMap(Map<String, Object> event) throws StreamValidationException {
+        final List<String> errors = new LinkedList<>();
+        this.streamDefinition.getColumns().forEach((column -> {
+            if (column.isRequired() && !event.containsKey(column.getName())) {
+                errors.add("[" + column.getName() + "]: required but absent");
+            }
+        }));
+        for (Object eventKey : event.keySet()) {
+            if (!streamColumnMap.containsKey(eventKey)) {
+                errors.add("[" + eventKey + "]: invalid column");
+            }
+        }
+
+        if (errors.size() > 0) {
+            throw new StreamValidationException(errors.size() + " validation errors: " + StringUtils.join(errors.toArray(), "; "));
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/eagle/blob/d5dce2b3/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/engine/coordinator/StreamDefinitionTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/engine/coordinator/StreamDefinitionTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/engine/coordinator/StreamDefinitionTest.java
index c0d9213..85ef5dc 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/engine/coordinator/StreamDefinitionTest.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/engine/coordinator/StreamDefinitionTest.java
@@ -35,15 +35,15 @@ public class StreamDefinitionTest {
         streamColumns.add(new StreamColumn.Builder().name("value").type(StreamColumn.Type.DOUBLE).build());
 
         StreamDefinition streamDefinition = new StreamDefinition();
-        Assert.assertEquals("StreamDefinition[group=Default, streamId=null, dataSource=null, description=null, validate=false, timeseries=false, columns=[]", streamDefinition.toString());
+        Assert.assertEquals("StreamDefinition[group=global, streamId=null, dataSource=null, description=null, validate=true, timeseries=false, columns=[]", streamDefinition.toString());
         streamDefinition.setColumns(streamColumns);
 
         Assert.assertEquals(3, streamDefinition.getColumnIndex("data"));
         Assert.assertEquals(-1, streamDefinition.getColumnIndex("DATA"));
         Assert.assertEquals(-1, streamDefinition.getColumnIndex("isYhd"));
-        Assert.assertEquals("StreamDefinition[group=Default, streamId=null, dataSource=null, description=null, validate=false, timeseries=false, columns=[StreamColumn=name[name], type=[string], defaultValue=[null], required=[false], nodataExpression=[null], StreamColumn=name[host], type=[string], defaultValue=[null], required=[false], nodataExpression=[null], StreamColumn=name[flag], type=[bool], defaultValue=[null], required=[false], nodataExpression=[null], StreamColumn=name[data], type=[long], defaultValue=[null], required=[false], nodataExpression=[null], StreamColumn=name[value], type=[double], defaultValue=[null], required=[false], nodataExpression=[null]]", streamDefinition.toString());
+        Assert.assertEquals("StreamDefinition[group=global, streamId=null, dataSource=null, description=null, validate=true, timeseries=false, columns=[StreamColumn=name[name], type=[string], defaultValue=[null], required=[true], nodataExpression=[null], StreamColumn=name[host], type=[string], defaultValue=[null], required=[true], nodataExpression=[null], StreamColumn=name[flag], type=[bool], defaultValue=[null], required=[true], nodataExpression=[null], StreamColumn=name[data], type=[long], defaultValue=[null], required=[true], nodataExpression=[null], StreamColumn=name[value], type=[double], defaultValue=[null], required=[true], nodataExpression=[null]]", streamDefinition.toString());
         StreamDefinition streamDefinition1 = streamDefinition.copy();
-        Assert.assertEquals("StreamDefinition[group=Default, streamId=null, dataSource=null, description=null, validate=false, timeseries=false, columns=[StreamColumn=name[name], type=[string], defaultValue=[null], required=[false], nodataExpression=[null], StreamColumn=name[host], type=[string], defaultValue=[null], required=[false], nodataExpression=[null], StreamColumn=name[flag], type=[bool], defaultValue=[null], required=[false], nodataExpression=[null], StreamColumn=name[data], type=[long], defaultValue=[null], required=[false], nodataExpression=[null], StreamColumn=name[value], type=[double], defaultValue=[null], required=[false], nodataExpression=[null]]", streamDefinition1.toString());
+        Assert.assertEquals("StreamDefinition[group=global, streamId=null, dataSource=null, description=null, validate=true, timeseries=false, columns=[StreamColumn=name[name], type=[string], defaultValue=[null], required=[true], nodataExpression=[null], StreamColumn=name[host], type=[string], defaultValue=[null], required=[true], nodataExpression=[null], StreamColumn=name[flag], type=[bool], defaultValue=[null], required=[true], nodataExpression=[null], StreamColumn=name[data], type=[long], defaultValue=[null], required=[true], nodataExpression=[null], StreamColumn=name[value], type=[double], defaultValue=[null], required=[true], nodataExpression=[null]]", streamDefinition1.toString());
 
         Assert.assertTrue(streamDefinition1.equals(streamDefinition));
         Assert.assertFalse(streamDefinition1 == streamDefinition);

http://git-wip-us.apache.org/repos/asf/eagle/blob/d5dce2b3/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/engine/model/StreamEventTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/engine/model/StreamEventTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/engine/model/StreamEventTest.java
index 6543a8d..547ef75 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/engine/model/StreamEventTest.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/engine/model/StreamEventTest.java
@@ -18,6 +18,8 @@ package org.apache.eagle.alert.engine.model;
 
 import org.apache.eagle.alert.engine.coordinator.StreamColumn;
 import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
+import org.apache.eagle.alert.utils.StreamValidationException;
+import org.apache.eagle.alert.utils.StreamValidator;
 import org.junit.Assert;
 import org.junit.Rule;
 import org.junit.Test;
@@ -130,6 +132,17 @@ public class StreamEventTest {
     }
 
     @Test
+    public void testStreamValidator() throws StreamValidationException {
+        StreamDefinition streamDefinition = mockStreamDefinition("TEST_STREAM");
+        StreamValidator validator = new StreamValidator(streamDefinition);
+        thrown.expect(StreamValidationException.class);
+        validator.validateMap(new HashMap<String, Object>() {{
+            put("name", "cpu");
+            put("value", 60.0);
+        }});
+    }
+
+    @Test
     public void testStreamEvent3() {
         List<StreamColumn> streamColumns = new ArrayList<>();
         streamColumns.add(new StreamColumn.Builder().name("name").type(StreamColumn.Type.STRING).build());

http://git-wip-us.apache.org/repos/asf/eagle/blob/d5dce2b3/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/StreamRecord.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/StreamRecord.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/StreamRecord.java
new file mode 100644
index 0000000..f76cdbf
--- /dev/null
+++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/StreamRecord.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.app.messaging;
+
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.Map;
+
+public class StreamRecord extends HashMap<String,Object> implements Serializable {
+    public StreamRecord() {
+    }
+
+    public StreamRecord(Map<String,Object> event) {
+        this.putAll(event);
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/eagle/blob/d5dce2b3/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/test/KafkaTestServer.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/test/KafkaTestServer.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/test/KafkaTestServer.java
new file mode 100644
index 0000000..9744350
--- /dev/null
+++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/test/KafkaTestServer.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.app.test;
+
+import java.io.File;
+import java.io.IOException;
+
+public interface KafkaTestServer {
+    void start() throws Exception;
+
+    void stop() throws IOException;
+
+    int getZookeeperPort();
+
+    int getKafkaBrokerPort();
+
+    static KafkaTestServer create(File logDir) {
+        return new KafkaTestServerImpl(logDir);
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/eagle/blob/d5dce2b3/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/test/KafkaTestServerImpl.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/test/KafkaTestServerImpl.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/test/KafkaTestServerImpl.java
new file mode 100644
index 0000000..30ce67c
--- /dev/null
+++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/test/KafkaTestServerImpl.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.app.test;
+
+import kafka.server.KafkaConfig;
+import kafka.server.KafkaServerStartable;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.retry.ExponentialBackoffRetry;
+import org.apache.curator.test.InstanceSpec;
+import org.apache.curator.test.TestingServer;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Properties;
+
+class KafkaTestServerImpl implements KafkaTestServer {
+
+    private final File logDir;
+    private TestingServer zkServer;
+    private CuratorFramework curatorClient;
+    private KafkaServerStartable kafkaServer;
+    private int kafkaPort = InstanceSpec.getRandomPort();
+    private int zookeeperPort = InstanceSpec.getRandomPort();
+
+    public KafkaTestServerImpl(File logDir) {
+        this.logDir = logDir;
+    }
+
+    @Override
+    public void start() throws Exception {
+        zkServer = new TestingServer(zookeeperPort, logDir);
+        ExponentialBackoffRetry retryPolicy = new ExponentialBackoffRetry(1000, 3);
+        curatorClient = CuratorFrameworkFactory.newClient(zkServer.getConnectString(), retryPolicy);
+        curatorClient.start();
+
+        Properties props = new Properties();
+
+        props.setProperty("zookeeper.connect", zkServer.getConnectString());
+        props.setProperty("broker.id", "0");
+        props.setProperty("port", "" + kafkaPort);
+        props.setProperty("log.dirs", logDir.getAbsolutePath());
+        props.setProperty("auto.create.topics.enable", "true");
+
+        kafkaServer = new KafkaServerStartable(new KafkaConfig(props));
+        kafkaServer.startup();
+    }
+
+    @Override
+    public void stop() throws IOException {
+        kafkaServer.shutdown();
+        curatorClient.close();
+        zkServer.close();
+    }
+
+    @Override
+    public int getZookeeperPort() {
+        return this.zookeeperPort;
+    }
+
+    @Override
+    public int getKafkaBrokerPort() {
+        return this.kafkaPort;
+    }
+}

http://git-wip-us.apache.org/repos/asf/eagle/blob/d5dce2b3/eagle-core/eagle-app/eagle-app-streamproxy/pom.xml
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-streamproxy/pom.xml b/eagle-core/eagle-app/eagle-app-streamproxy/pom.xml
new file mode 100644
index 0000000..8672d7a
--- /dev/null
+++ b/eagle-core/eagle-app/eagle-app-streamproxy/pom.xml
@@ -0,0 +1,39 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~  Unless required by applicable law or agreed to in writing, software
+  ~  distributed under the License is distributed on an "AS IS" BASIS,
+  ~  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~  See the License for the specific language governing permissions and
+  ~  limitations under the License.
+  ~
+  -->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>eagle-app-parent</artifactId>
+        <groupId>org.apache.eagle</groupId>
+        <version>0.5.0-SNAPSHOT</version>
+        <relativePath>../pom.xml</relativePath>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>eagle-app-streamproxy</artifactId>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.eagle</groupId>
+            <artifactId>eagle-app-base</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+    </dependencies>
+</project>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/eagle/blob/d5dce2b3/eagle-core/eagle-app/eagle-app-streamproxy/src/main/java/org/apache/eagle/app/proxy/exception/StreamProxyException.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-streamproxy/src/main/java/org/apache/eagle/app/proxy/exception/StreamProxyException.java b/eagle-core/eagle-app/eagle-app-streamproxy/src/main/java/org/apache/eagle/app/proxy/exception/StreamProxyException.java
new file mode 100644
index 0000000..094547d
--- /dev/null
+++ b/eagle-core/eagle-app/eagle-app-streamproxy/src/main/java/org/apache/eagle/app/proxy/exception/StreamProxyException.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.app.proxy.exception;
+
+public class StreamProxyException  extends Exception {
+    public StreamProxyException() {
+        super();
+    }
+
+    public StreamProxyException(String message) {
+        super(message);
+    }
+
+    public StreamProxyException(String message, Throwable cause) {
+        super(message, cause);
+    }
+
+    public StreamProxyException(Throwable cause) {
+        super(cause);
+    }
+
+    protected StreamProxyException(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) {
+        super(message, cause, enableSuppression, writableStackTrace);
+    }
+}

http://git-wip-us.apache.org/repos/asf/eagle/blob/d5dce2b3/eagle-core/eagle-app/eagle-app-streamproxy/src/main/java/org/apache/eagle/app/proxy/stream/StreamConfigUpdateListener.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-streamproxy/src/main/java/org/apache/eagle/app/proxy/stream/StreamConfigUpdateListener.java b/eagle-core/eagle-app/eagle-app-streamproxy/src/main/java/org/apache/eagle/app/proxy/stream/StreamConfigUpdateListener.java
new file mode 100644
index 0000000..524569e
--- /dev/null
+++ b/eagle-core/eagle-app/eagle-app-streamproxy/src/main/java/org/apache/eagle/app/proxy/stream/StreamConfigUpdateListener.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.app.proxy.stream;
+
+import org.apache.eagle.metadata.model.StreamDesc;
+
+/**
+ * Callback contract for reacting to changes in stream definitions.
+ *
+ * <p>Within this module the callbacks are fired by {@code StreamMetadataUpdateServiceImpl}
+ * while it compares freshly loaded stream descriptors with its previous snapshot, and
+ * {@code StreamProxyManagerImpl} implements them to keep its stream proxies in sync.
+ */
+public interface StreamConfigUpdateListener {
+    /**
+     * Called for a stream whose id was absent from the previous metadata snapshot.
+     *
+     * @param streamDesc descriptor of the newly discovered stream
+     */
+    void onStreamAdded(StreamDesc streamDesc);
+
+    /**
+     * Called for a stream whose id was already known but whose descriptor no longer
+     * equals the previously seen one.
+     *
+     * @param streamDesc the latest descriptor of the stream
+     */
+    void onStreamChanged(StreamDesc streamDesc);
+
+    /**
+     * Called for a stream that was in the previous metadata snapshot but is missing
+     * from the latest one.
+     *
+     * @param streamDesc the last known descriptor of the removed stream
+     */
+    void onStreamRemoved(StreamDesc streamDesc);
+}

http://git-wip-us.apache.org/repos/asf/eagle/blob/d5dce2b3/eagle-core/eagle-app/eagle-app-streamproxy/src/main/java/org/apache/eagle/app/proxy/stream/StreamMetadataUpdateService.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-streamproxy/src/main/java/org/apache/eagle/app/proxy/stream/StreamMetadataUpdateService.java b/eagle-core/eagle-app/eagle-app-streamproxy/src/main/java/org/apache/eagle/app/proxy/stream/StreamMetadataUpdateService.java
new file mode 100644
index 0000000..8ba0978
--- /dev/null
+++ b/eagle-core/eagle-app/eagle-app-streamproxy/src/main/java/org/apache/eagle/app/proxy/stream/StreamMetadataUpdateService.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.app.proxy.stream;
+
+import org.apache.eagle.metadata.model.StreamDesc;
+
+import java.util.Map;
+
+/**
+ * Runnable service that tracks the stream descriptors currently known to the system.
+ *
+ * <p>{@code StreamProxyManagerImpl} runs the bundled implementation on a dedicated
+ * daemon thread.
+ */
+public interface StreamMetadataUpdateService extends Runnable {
+    /**
+     * Returns the most recently loaded stream descriptors.
+     *
+     * @return stream descriptors keyed by stream id (the bundled implementation keys
+     *         each entry by {@code StreamDesc#getStreamId()})
+     */
+    Map<String, StreamDesc> getStreamDescSnapshot();
+
+    /**
+     * Asks the service to stop. The bundled implementation clears its running flag so
+     * that its polling loop exits.
+     */
+    void shutdown();
+}

http://git-wip-us.apache.org/repos/asf/eagle/blob/d5dce2b3/eagle-core/eagle-app/eagle-app-streamproxy/src/main/java/org/apache/eagle/app/proxy/stream/StreamProxy.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-streamproxy/src/main/java/org/apache/eagle/app/proxy/stream/StreamProxy.java b/eagle-core/eagle-app/eagle-app-streamproxy/src/main/java/org/apache/eagle/app/proxy/stream/StreamProxy.java
new file mode 100644
index 0000000..275fd59
--- /dev/null
+++ b/eagle-core/eagle-app/eagle-app-streamproxy/src/main/java/org/apache/eagle/app/proxy/stream/StreamProxy.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.app.proxy.stream;
+
+import org.apache.eagle.metadata.model.StreamDesc;
+
+import java.io.IOException;
+
+/**
+ * A {@link StreamProxyProducer} with an explicit lifecycle: it is opened against a
+ * stream descriptor before records are sent and closed when no longer needed.
+ */
+public interface StreamProxy extends StreamProxyProducer {
+    /**
+     * Binds this proxy to the given stream. The bundled {@code StreamProxyImpl} builds a
+     * validator from the descriptor's schema and a Kafka producer from its sink config,
+     * each only when present.
+     *
+     * @param streamDesc descriptor of the stream this proxy will serve
+     */
+    void open(StreamDesc streamDesc);
+
+    /**
+     * Releases the resources held by this proxy.
+     *
+     * @throws IOException if the underlying resources cannot be closed
+     */
+    void close() throws IOException;
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/eagle/blob/d5dce2b3/eagle-core/eagle-app/eagle-app-streamproxy/src/main/java/org/apache/eagle/app/proxy/stream/StreamProxyManager.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-streamproxy/src/main/java/org/apache/eagle/app/proxy/stream/StreamProxyManager.java b/eagle-core/eagle-app/eagle-app-streamproxy/src/main/java/org/apache/eagle/app/proxy/stream/StreamProxyManager.java
new file mode 100644
index 0000000..5ee4765
--- /dev/null
+++ b/eagle-core/eagle-app/eagle-app-streamproxy/src/main/java/org/apache/eagle/app/proxy/stream/StreamProxyManager.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.app.proxy.stream;
+
+import com.google.inject.ImplementedBy;
+import org.apache.eagle.app.proxy.exception.StreamProxyException;
+import org.apache.eagle.app.proxy.stream.impl.StreamProxyManagerImpl;
+import org.apache.eagle.metadata.model.StreamDesc;
+
+import java.util.Collection;
+
+/**
+ * Entry point for looking up stream proxies and the stream descriptors behind them.
+ * Guice binds this interface to {@link StreamProxyManagerImpl} by default.
+ */
+@ImplementedBy(StreamProxyManagerImpl.class)
+public interface StreamProxyManager {
+    /**
+     * Looks up the proxy serving the given stream.
+     *
+     * @param streamId id of the target stream
+     * @return the proxy registered for {@code streamId}
+     * @throws StreamProxyException if the stream id is unknown or no proxy has been
+     *                              initialized for it
+     */
+    StreamProxy getStreamProxy(String streamId) throws StreamProxyException;
+
+    /**
+     * Returns the descriptors of all streams currently known to the manager.
+     *
+     * @return the known stream descriptors
+     */
+    Collection<StreamDesc> getAllStreamDesc();
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/eagle/blob/d5dce2b3/eagle-core/eagle-app/eagle-app-streamproxy/src/main/java/org/apache/eagle/app/proxy/stream/StreamProxyProducer.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-streamproxy/src/main/java/org/apache/eagle/app/proxy/stream/StreamProxyProducer.java b/eagle-core/eagle-app/eagle-app-streamproxy/src/main/java/org/apache/eagle/app/proxy/stream/StreamProxyProducer.java
new file mode 100644
index 0000000..479b8d4
--- /dev/null
+++ b/eagle-core/eagle-app/eagle-app-streamproxy/src/main/java/org/apache/eagle/app/proxy/stream/StreamProxyProducer.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.app.proxy.stream;
+
+import org.apache.eagle.app.messaging.StreamRecord;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.Future;
+
+/**
+ * Closeable sink that publishes batches of {@link StreamRecord}s to a stream.
+ */
+public interface StreamProxyProducer extends Closeable {
+    /**
+     * Publishes the given records.
+     *
+     * @param events records to publish
+     * @throws IOException if the records cannot be published
+     */
+    void send(List<StreamRecord> events) throws IOException;
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/eagle/blob/d5dce2b3/eagle-core/eagle-app/eagle-app-streamproxy/src/main/java/org/apache/eagle/app/proxy/stream/StreamProxyResource.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-streamproxy/src/main/java/org/apache/eagle/app/proxy/stream/StreamProxyResource.java b/eagle-core/eagle-app/eagle-app-streamproxy/src/main/java/org/apache/eagle/app/proxy/stream/StreamProxyResource.java
new file mode 100644
index 0000000..1d5e111
--- /dev/null
+++ b/eagle-core/eagle-app/eagle-app-streamproxy/src/main/java/org/apache/eagle/app/proxy/stream/StreamProxyResource.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.app.proxy.stream;
+
+import com.google.common.base.Preconditions;
+import com.google.inject.Inject;
+import org.apache.eagle.app.messaging.StreamRecord;
+import org.apache.eagle.common.rest.RESTResponse;
+import org.apache.eagle.common.security.RolesAllowed;
+import org.apache.eagle.common.security.User;
+import org.apache.eagle.metadata.model.StreamDesc;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.validation.constraints.NotNull;
+import javax.ws.rs.*;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+import java.util.Collection;
+import java.util.List;
+import java.util.Optional;
+
+/**
+ * REST resource rooted at {@code /streams}. It lists stream descriptors and writes posted
+ * records into a stream through the injected {@link StreamProxyManager}.
+ */
+@Path("/streams")
+public class StreamProxyResource {
+    private static final Logger LOGGER = LoggerFactory.getLogger(StreamProxyResource.class);
+
+    @Inject
+    private StreamProxyManager proxyManager;
+
+    /**
+     * {@code GET /streams}: returns every stream descriptor known to the proxy manager.
+     */
+    @GET
+    @Produces(MediaType.APPLICATION_JSON)
+    public RESTResponse<Collection<StreamDesc>> getAllStreamDesc() {
+        return RESTResponse.async(() -> proxyManager.getAllStreamDesc()).get();
+    }
+
+    /**
+     * {@code POST /streams/{streamId}}: sends the posted records to the stream's proxy.
+     * Any failure is logged and reported as {@code BAD_REQUEST} together with the error message.
+     *
+     * @param records  records to write
+     * @param streamId id of the target stream
+     */
+    @POST
+    @Path("/{streamId}")
+    @Consumes(MediaType.APPLICATION_JSON)
+    @Produces(MediaType.APPLICATION_JSON)
+    @RolesAllowed( {User.Role.ADMINISTRATOR, User.Role.APPLICATION})
+    public RESTResponse produceEvent(@NotNull List<StreamRecord> records, @PathParam("streamId") String streamId) {
+        return RESTResponse.async((builder) -> {
+            try {
+                Preconditions.checkNotNull(records, "Records is empty");
+                StreamProxy targetProxy = proxyManager.getStreamProxy(streamId);
+                targetProxy.send(records);
+                String successMessage =
+                    String.format("Successfully wrote %s records into stream %s", records.size(), streamId);
+                builder.status(true, Response.Status.OK).message(successMessage);
+            } catch (Exception e) {
+                LOGGER.error("Error to write records to stream {}: {}", streamId, e.getMessage(), e);
+                String failureMessage = "Failed to write messages to stream " + streamId + ": " + e.getMessage();
+                builder.exception(e).status(false, Response.Status.BAD_REQUEST).message(failureMessage);
+            }
+        }).get();
+    }
+
+    /**
+     * {@code GET /streams/{streamId}}: returns the descriptor whose id matches
+     * {@code streamId} ignoring case, or a {@code BAD_REQUEST} response when none matches.
+     *
+     * @param streamId id of the stream to look up
+     */
+    @GET
+    @Path("/{streamId}")
+    @Produces(MediaType.APPLICATION_JSON)
+    public RESTResponse getSingleStreamDesc(@PathParam("streamId") String streamId) {
+        return RESTResponse.async((builder) -> {
+            Optional<StreamDesc> match = proxyManager.getAllStreamDesc().stream()
+                .filter((candidate) -> candidate.getStreamId().equalsIgnoreCase(streamId))
+                .findAny();
+            if (!match.isPresent()) {
+                builder.message("Stream not found, reason: stream not exist or proxy not initialized")
+                    .status(false, Response.Status.BAD_REQUEST);
+                return;
+            }
+            builder.data(match.get()).status(true, Response.Status.OK);
+        }).get();
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/eagle/blob/d5dce2b3/eagle-core/eagle-app/eagle-app-streamproxy/src/main/java/org/apache/eagle/app/proxy/stream/impl/KafkaStreamProxyProducerImpl.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-streamproxy/src/main/java/org/apache/eagle/app/proxy/stream/impl/KafkaStreamProxyProducerImpl.java b/eagle-core/eagle-app/eagle-app-streamproxy/src/main/java/org/apache/eagle/app/proxy/stream/impl/KafkaStreamProxyProducerImpl.java
new file mode 100644
index 0000000..4b2b333
--- /dev/null
+++ b/eagle-core/eagle-app/eagle-app-streamproxy/src/main/java/org/apache/eagle/app/proxy/stream/impl/KafkaStreamProxyProducerImpl.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.app.proxy.stream.impl;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Preconditions;
+import kafka.javaapi.producer.Producer;
+import kafka.producer.KeyedMessage;
+import kafka.producer.ProducerConfig;
+import org.apache.eagle.alert.utils.StreamValidator;
+import org.apache.eagle.app.messaging.KafkaStreamSinkConfig;
+import org.apache.eagle.app.messaging.StreamRecord;
+import org.apache.eagle.app.proxy.stream.StreamProxyProducer;
+import org.apache.eagle.metadata.model.StreamSinkConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Properties;
+
+/**
+ * {@link StreamProxyProducer} that serializes each record to JSON and publishes it to the
+ * Kafka topic named by a {@link KafkaStreamSinkConfig}.
+ */
+public class KafkaStreamProxyProducerImpl implements StreamProxyProducer {
+    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaStreamProxyProducerImpl.class);
+    // One shared mapper instead of one per record: ObjectMapper is expensive to create and a
+    // configured instance can be reused safely across threads.
+    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
+    private final Producer<String, String> producer;
+    private final KafkaStreamSinkConfig config;
+    private final String streamId;
+
+    /**
+     * Creates the Kafka producer for a stream from its sink configuration.
+     *
+     * @param streamId     id of the stream, used in log and error messages
+     * @param streamConfig sink configuration; must be a non-null {@link KafkaStreamSinkConfig}
+     *                     with a non-null broker list
+     * @throws NullPointerException if {@code streamConfig} or its broker list is null
+     * @throws ClassCastException   if {@code streamConfig} is not a {@link KafkaStreamSinkConfig}
+     */
+    public KafkaStreamProxyProducerImpl(String streamId, StreamSinkConfig streamConfig) {
+        Preconditions.checkNotNull(streamConfig, "Stream sink config for " + streamId + " is null");
+        this.streamId = streamId;
+        this.config = (KafkaStreamSinkConfig) streamConfig;
+        Properties properties = new Properties();
+        Preconditions.checkNotNull(config.getBrokerList(), "brokerList is null");
+        properties.put("metadata.broker.list", config.getBrokerList());
+        properties.put("serializer.class", config.getSerializerClass());
+        properties.put("key.serializer.class", config.getKeySerializerClass());
+        // new added properties for async producer
+        properties.put("producer.type", config.getProducerType());
+        properties.put("batch.num.messages", config.getNumBatchMessages());
+        properties.put("request.required.acks", config.getRequestRequiredAcks());
+        properties.put("queue.buffering.max.ms", config.getMaxQueueBufferMs());
+        ProducerConfig producerConfig = new ProducerConfig(properties);
+        this.producer = new Producer<>(producerConfig);
+    }
+
+    /**
+     * Serializes every record to JSON and sends the whole batch to the configured topic.
+     * An empty batch is a no-op.
+     *
+     * @param events records to publish; must not be null
+     * @throws IOException          if a record cannot be serialized to JSON
+     * @throws NullPointerException if {@code events} is null
+     */
+    @Override
+    public void send(List<StreamRecord> events) throws IOException {
+        Preconditions.checkNotNull(events, "events is null");
+        if (events.isEmpty()) {
+            // Nothing to publish; skip the round trip to the producer.
+            return;
+        }
+
+        String topicId = this.config.getTopicId();
+        List<KeyedMessage<String, String>> messages = new ArrayList<>(events.size());
+        for (StreamRecord record : events) {
+            String output = OBJECT_MAPPER.writeValueAsString(record);
+            messages.add(new KeyedMessage<String, String>(topicId, output));
+        }
+
+        try {
+            // Messages are sent without a partition key: keying them may cause data skew.
+            producer.send(messages);
+        } catch (Exception ex) {
+            LOGGER.error(ex.getMessage(), ex);
+            throw ex;
+        }
+    }
+
+    /**
+     * Closes the underlying Kafka producer.
+     */
+    @Override
+    public void close() throws IOException {
+        if (this.producer != null) {
+            LOGGER.info("Closing kafka producer for stream {}", this.streamId);
+            this.producer.close();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/eagle/blob/d5dce2b3/eagle-core/eagle-app/eagle-app-streamproxy/src/main/java/org/apache/eagle/app/proxy/stream/impl/StreamMetadataUpdateServiceImpl.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-streamproxy/src/main/java/org/apache/eagle/app/proxy/stream/impl/StreamMetadataUpdateServiceImpl.java b/eagle-core/eagle-app/eagle-app-streamproxy/src/main/java/org/apache/eagle/app/proxy/stream/impl/StreamMetadataUpdateServiceImpl.java
new file mode 100644
index 0000000..10bad33
--- /dev/null
+++ b/eagle-core/eagle-app/eagle-app-streamproxy/src/main/java/org/apache/eagle/app/proxy/stream/impl/StreamMetadataUpdateServiceImpl.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.app.proxy.stream.impl;
+
+import org.apache.eagle.app.proxy.stream.StreamConfigUpdateListener;
+import org.apache.eagle.app.proxy.stream.StreamMetadataUpdateService;
+import org.apache.eagle.metadata.model.ApplicationEntity;
+import org.apache.eagle.metadata.model.StreamDesc;
+import org.apache.eagle.metadata.service.ApplicationEntityService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * Polls {@link ApplicationEntityService} every {@link #UPDATE_INTERVAL_MS} milliseconds,
+ * compares the loaded stream descriptors with the previous snapshot and notifies a
+ * {@link StreamConfigUpdateListener} about added, changed and removed streams.
+ */
+class StreamMetadataUpdateServiceImpl implements StreamMetadataUpdateService {
+    private static final Logger LOGGER = LoggerFactory.getLogger(StreamMetadataUpdateServiceImpl.class);
+    private static final long UPDATE_INTERVAL_MS = 10 * 1000;
+
+    private final ApplicationEntityService applicationEntityService;
+    private final StreamConfigUpdateListener listener;
+    private final Object lock;
+    // Replaced wholesale by the polling thread and handed out by getStreamDescSnapshot(), whose
+    // callers may run on other threads; volatile makes each new snapshot visible to them.
+    private volatile Map<String, StreamDesc> streamIdDescMap;
+
+    private long startedTime = -1;
+    private long lastUpdatedTime = -1;
+    private volatile boolean running;
+
+    /**
+     * @param listener                 receiver of added/changed/removed notifications
+     * @param applicationEntityService source of the application entities whose streams are tracked
+     */
+    StreamMetadataUpdateServiceImpl(StreamConfigUpdateListener listener, ApplicationEntityService applicationEntityService) {
+        this.applicationEntityService = applicationEntityService;
+        this.lock = new Object();
+        this.listener = listener;
+        this.streamIdDescMap = new HashMap<>();
+    }
+
+    /**
+     * Refreshes the stream metadata in a loop until {@link #shutdown()} is called or the
+     * thread is interrupted. A failed refresh is logged and retried on the next cycle
+     * rather than terminating the loop.
+     */
+    @Override
+    public void run() {
+        this.startedTime = System.currentTimeMillis();
+        this.running = true;
+        while (this.running) {
+            try {
+                updateStreamMetadata();
+            } catch (Exception e) {
+                // Keep polling: a transient failure in the metadata service or in a listener
+                // must not kill the update thread for good.
+                LOGGER.error("Failed to update stream metadata: {}", e.getMessage(), e);
+            }
+            try {
+                Thread.sleep(UPDATE_INTERVAL_MS);
+            } catch (InterruptedException e) {
+                LOGGER.warn("Stream metadata update thread interrupted, stopping", e);
+                // Restore the interrupt flag for whoever owns this thread and leave the loop.
+                Thread.currentThread().interrupt();
+                this.running = false;
+            }
+        }
+        LOGGER.info("Shutting down");
+    }
+
+    /**
+     * Loads the current stream descriptors, notifies the listener about every difference
+     * from the previous snapshot and then publishes the new snapshot.
+     */
+    private void updateStreamMetadata() {
+        synchronized (lock) {
+            LOGGER.debug("Loading stream metadata ...");
+            int added = 0;
+            int changed = 0;
+            int removed = 0;
+            Map<String, StreamDesc> previousStreamIdDescMap = this.streamIdDescMap;
+            Map<String, StreamDesc> latestStreamIdDescMap = new HashMap<>();
+            for (ApplicationEntity appEntity : applicationEntityService.findAll()) {
+                List<StreamDesc> streamDescList = appEntity.getStreams();
+                if (streamDescList == null) {
+                    continue;
+                }
+                for (StreamDesc streamDesc : streamDescList) {
+                    String streamId = streamDesc.getStreamId();
+                    latestStreamIdDescMap.put(streamId, streamDesc);
+                    if (!previousStreamIdDescMap.containsKey(streamId)) {
+                        added++;
+                        this.listener.onStreamAdded(streamDesc);
+                    } else if (!Objects.equals(streamDesc, previousStreamIdDescMap.get(streamId))) {
+                        changed++;
+                        this.listener.onStreamChanged(streamDesc);
+                    }
+                }
+            }
+
+            for (Map.Entry<String, StreamDesc> previousEntry : previousStreamIdDescMap.entrySet()) {
+                if (!latestStreamIdDescMap.containsKey(previousEntry.getKey())) {
+                    removed++;
+                    this.listener.onStreamRemoved(previousEntry.getValue());
+                }
+            }
+            this.streamIdDescMap = latestStreamIdDescMap;
+            // Report the number of streams loaded, matching what the log message describes.
+            int total = latestStreamIdDescMap.size();
+            if (added > 0 || changed > 0 || removed > 0) {
+                LOGGER.info("Loaded stream metadata: total = {}, added = {}, changed = {}, removed = {}", total, added, changed, removed);
+            } else {
+                LOGGER.debug("Loaded stream metadata: total = {}, added = {}, changed = {}, removed = {}", total, added, changed, removed);
+            }
+        }
+    }
+
+    /**
+     * Returns the snapshot published by the most recent successful refresh, keyed by stream id.
+     * The map is empty until the first refresh completes.
+     */
+    @Override
+    public Map<String, StreamDesc> getStreamDescSnapshot() {
+        return streamIdDescMap;
+    }
+
+    /**
+     * Clears the running flag so the polling loop exits after its current cycle.
+     */
+    @Override
+    public void shutdown() {
+        this.running = false;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/eagle/blob/d5dce2b3/eagle-core/eagle-app/eagle-app-streamproxy/src/main/java/org/apache/eagle/app/proxy/stream/impl/StreamProxyImpl.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-streamproxy/src/main/java/org/apache/eagle/app/proxy/stream/impl/StreamProxyImpl.java b/eagle-core/eagle-app/eagle-app-streamproxy/src/main/java/org/apache/eagle/app/proxy/stream/impl/StreamProxyImpl.java
new file mode 100644
index 0000000..a1d0b5e
--- /dev/null
+++ b/eagle-core/eagle-app/eagle-app-streamproxy/src/main/java/org/apache/eagle/app/proxy/stream/impl/StreamProxyImpl.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.app.proxy.stream.impl;
+
+import com.google.common.base.Preconditions;
+import org.apache.eagle.alert.utils.StreamValidationException;
+import org.apache.eagle.alert.utils.StreamValidator;
+import org.apache.eagle.app.messaging.StreamRecord;
+import org.apache.eagle.app.proxy.stream.StreamProxy;
+import org.apache.eagle.app.proxy.stream.StreamProxyProducer;
+import org.apache.eagle.metadata.model.StreamDesc;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * Default {@link StreamProxy}: optionally validates records against the stream schema and
+ * forwards them to a {@link KafkaStreamProxyProducerImpl}.
+ */
+public class StreamProxyImpl implements StreamProxy {
+    private static final Logger LOGGER = LoggerFactory.getLogger(StreamProxyImpl.class);
+    private StreamProxyProducer proxyProducer;
+    private volatile boolean opened;
+    private String streamId;
+    private StreamValidator validator;
+
+    /**
+     * Binds this proxy to a stream. A validator is created only when the descriptor has a
+     * schema, and a Kafka producer only when it has a sink config; without a sink config
+     * the proxy is still marked as opened but {@link #send(List)} will fail.
+     *
+     * @param streamDesc descriptor of the stream to serve
+     */
+    @Override
+    public void open(StreamDesc streamDesc) {
+        if (streamDesc.getSchema() != null) {
+            this.validator = new StreamValidator(streamDesc.getSchema());
+        }
+        this.streamId = streamDesc.getStreamId();
+        if (streamDesc.getSinkConfig() != null) {
+            this.proxyProducer = new KafkaStreamProxyProducerImpl(streamDesc.getStreamId(), streamDesc.getSinkConfig());
+        } else {
+            LOGGER.warn("Unable to initialize kafka producer because sink config is null for {}", streamId);
+            this.proxyProducer = null;
+        }
+        this.opened = true;
+    }
+
+    /**
+     * Marks the proxy as closed and closes the underlying producer, if any.
+     *
+     * @throws IOException if the producer cannot be closed; the proxy is still marked closed
+     */
+    @Override
+    public void close() throws IOException {
+        // Clear the flag first: send() then rejects new calls before the producer goes away,
+        // and the proxy is never left marked as opened when closing the producer fails.
+        this.opened = false;
+        if (this.proxyProducer != null) {
+            this.proxyProducer.close();
+        }
+    }
+
+    /**
+     * Validates the records (when a validator exists) and forwards them to the producer.
+     *
+     * @param events records to publish
+     * @throws IOException              if a record fails validation or the producer fails to send
+     * @throws IllegalArgumentException if the proxy has not been opened
+     * @throws NullPointerException     if no producer was initialized for the stream
+     */
+    @Override
+    public void send(List<StreamRecord> events) throws IOException {
+        Preconditions.checkArgument(this.opened, "Stream proxy not opened for " + streamId);
+        Preconditions.checkNotNull(this.proxyProducer, "Stream proxy producer not initialized for " + streamId);
+        if (this.validator != null) {
+            for (StreamRecord event : events) {
+                try {
+                    this.validator.validateMap(event);
+                } catch (StreamValidationException e) {
+                    // Surface validation failures through the interface's declared exception type.
+                    throw new IOException(e);
+                }
+            }
+        }
+        this.proxyProducer.send(events);
+    }
+}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/eagle/blob/d5dce2b3/eagle-core/eagle-app/eagle-app-streamproxy/src/main/java/org/apache/eagle/app/proxy/stream/impl/StreamProxyManagerImpl.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-streamproxy/src/main/java/org/apache/eagle/app/proxy/stream/impl/StreamProxyManagerImpl.java b/eagle-core/eagle-app/eagle-app-streamproxy/src/main/java/org/apache/eagle/app/proxy/stream/impl/StreamProxyManagerImpl.java
new file mode 100644
index 0000000..a79916c
--- /dev/null
+++ b/eagle-core/eagle-app/eagle-app-streamproxy/src/main/java/org/apache/eagle/app/proxy/stream/impl/StreamProxyManagerImpl.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.app.proxy.stream.impl;
+
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import org.apache.eagle.app.proxy.stream.StreamConfigUpdateListener;
+import org.apache.eagle.app.proxy.stream.StreamMetadataUpdateService;
+import org.apache.eagle.app.proxy.stream.StreamProxy;
+import org.apache.eagle.app.proxy.stream.StreamProxyManager;
+import org.apache.eagle.app.proxy.exception.StreamProxyException;
+import org.apache.eagle.metadata.model.StreamDesc;
+import org.apache.eagle.metadata.service.ApplicationEntityService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+/**
+ * Singleton {@link StreamProxyManager} that keeps one {@link StreamProxy} per stream id.
+ * It starts a daemon thread running a {@link StreamMetadataUpdateService} and, as that
+ * service's {@link StreamConfigUpdateListener}, creates, replaces and removes proxies
+ * whenever stream definitions change.
+ */
+@Singleton
+public class StreamProxyManagerImpl implements StreamProxyManager, StreamConfigUpdateListener {
+    private static final Logger LOGGER = LoggerFactory.getLogger(StreamProxyManagerImpl.class);
+    private final StreamMetadataUpdateService streamMetadataUpdateService;
+    private final ConcurrentMap<String, StreamProxy> streamProxyConcurrentMap;
+
+    /**
+     * Starts the metadata polling thread and registers a JVM shutdown hook that stops the
+     * polling and closes every registered proxy.
+     *
+     * @param applicationEntityService source of the application entities whose streams are proxied
+     */
+    @Inject
+    public StreamProxyManagerImpl(ApplicationEntityService applicationEntityService) {
+        LOGGER.info("Initializing StreamProxyManager {}", this);
+        this.streamMetadataUpdateService = new StreamMetadataUpdateServiceImpl(this, applicationEntityService);
+        this.streamProxyConcurrentMap = new ConcurrentHashMap<>();
+        Thread streamMetadataUpdateServiceThread = new Thread(this.streamMetadataUpdateService);
+        streamMetadataUpdateServiceThread.setDaemon(true);
+        streamMetadataUpdateServiceThread.setName(StreamMetadataUpdateService.class.getName());
+        Runtime.getRuntime().addShutdownHook(new Thread() {
+            @Override
+            public void run() {
+                streamMetadataUpdateService.shutdown();
+                for (StreamProxy proxy : streamProxyConcurrentMap.values()) {
+                    try {
+                        proxy.close();
+                    } catch (IOException e) {
+                        LOGGER.error("Error to close {}: {}", proxy, e.getMessage(), e);
+                    }
+                }
+            }
+        });
+        streamMetadataUpdateServiceThread.start();
+    }
+
+    /**
+     * Returns the proxy registered for a stream.
+     *
+     * @param streamId id of the target stream
+     * @return the proxy serving {@code streamId}
+     * @throws StreamProxyException if the id is missing from the latest metadata snapshot
+     *                              or no proxy is registered for it
+     */
+    @Override
+    public StreamProxy getStreamProxy(String streamId) throws StreamProxyException {
+        if (!streamMetadataUpdateService.getStreamDescSnapshot().containsKey(streamId)) {
+            throw new StreamProxyException("Stream ID: " + streamId + " not found");
+        }
+        // Single lookup: a separate containsKey + get could observe a concurrent removal in between.
+        StreamProxy proxy = streamProxyConcurrentMap.get(streamId);
+        if (proxy == null) {
+            throw new StreamProxyException("Not stream proxy instance initialized for " + streamId);
+        }
+        return proxy;
+    }
+
+    /**
+     * Returns the descriptors contained in the latest metadata snapshot.
+     */
+    @Override
+    public Collection<StreamDesc> getAllStreamDesc() {
+        return streamMetadataUpdateService.getStreamDescSnapshot().values();
+    }
+
+    /**
+     * Opens and registers a proxy for a new stream. If a proxy already exists for the
+     * stream id, the call is treated as a change.
+     */
+    @Override
+    public void onStreamAdded(StreamDesc streamDesc) {
+        String streamId = streamDesc.getStreamId();
+        if (streamProxyConcurrentMap.containsKey(streamId)) {
+            LOGGER.warn("Adding already existing stream proxy {}", streamId);
+            this.onStreamChanged(streamDesc);
+            return;
+        }
+        LOGGER.info("Adding stream proxy {}", streamId);
+        StreamProxy proxy = new StreamProxyImpl();
+        proxy.open(streamDesc);
+        streamProxyConcurrentMap.put(streamId, proxy);
+    }
+
+    /**
+     * Replaces the proxy of a changed stream. The replacement is opened and registered
+     * before the old proxy is closed, so the map never holds a closed proxy and a failed
+     * open leaves the previous proxy in place. If no proxy exists for the stream id, the
+     * call is treated as an addition.
+     */
+    @Override
+    public void onStreamChanged(StreamDesc streamDesc) {
+        String streamId = streamDesc.getStreamId();
+        if (!streamProxyConcurrentMap.containsKey(streamId)) {
+            LOGGER.warn("Updating non-existing stream proxy {}", streamId);
+            this.onStreamAdded(streamDesc);
+            return;
+        }
+        LOGGER.info("Updating stream proxy {}", streamId);
+        LOGGER.info("Adding stream proxy {}", streamId);
+        StreamProxy replacement = new StreamProxyImpl();
+        replacement.open(streamDesc);
+        StreamProxy previous = streamProxyConcurrentMap.put(streamId, replacement);
+        if (previous != null) {
+            LOGGER.info("Closing old stream proxy {}", streamId);
+            closeQuietly(previous);
+        }
+    }
+
+    /**
+     * Unregisters and closes the proxy of a removed stream.
+     */
+    @Override
+    public void onStreamRemoved(StreamDesc streamDesc) {
+        String streamId = streamDesc.getStreamId();
+        LOGGER.info("Removing stream proxy {}", streamId);
+        // Remove before closing so a closed proxy is never left registered in the map.
+        StreamProxy removedProxy = streamProxyConcurrentMap.remove(streamId);
+        if (removedProxy == null) {
+            LOGGER.warn("Unable to remove stream proxy {}, because not exist", streamId);
+            return;
+        }
+        closeQuietly(removedProxy);
+    }
+
+    /** Closes a proxy, logging (with stack trace) instead of propagating a failure. */
+    private void closeQuietly(StreamProxy proxy) {
+        try {
+            proxy.close();
+        } catch (IOException e) {
+            LOGGER.error("Unable to close {}", proxy, e);
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/eagle/blob/d5dce2b3/eagle-core/eagle-app/eagle-app-streamproxy/src/test/java/org/apache/eagle/app/proxy/stream/StreamProxyTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-streamproxy/src/test/java/org/apache/eagle/app/proxy/stream/StreamProxyTest.java b/eagle-core/eagle-app/eagle-app-streamproxy/src/test/java/org/apache/eagle/app/proxy/stream/StreamProxyTest.java
new file mode 100644
index 0000000..194ba39
--- /dev/null
+++ b/eagle-core/eagle-app/eagle-app-streamproxy/src/test/java/org/apache/eagle/app/proxy/stream/StreamProxyTest.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.app.proxy.stream;
+
+import com.google.inject.Inject;
+import com.typesafe.config.ConfigFactory;
+import org.apache.eagle.app.messaging.KafkaStreamProvider;
+import org.apache.eagle.app.messaging.KafkaStreamSinkConfig;
+import org.apache.eagle.app.messaging.StreamRecord;
+import org.apache.eagle.app.proxy.stream.impl.StreamProxyImpl;
+import org.apache.eagle.app.test.ApplicationTestBase;
+import org.apache.eagle.app.test.KafkaTestServer;
+import org.apache.eagle.metadata.model.StreamDesc;
+import org.junit.*;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.IOException;
+import java.util.Collections;
+
+/**
+ * Tests for the stream proxy module.
+ *
+ * <p>Each test runs against a {@link KafkaTestServer} started in a fresh temporary folder and a
+ * {@link StreamDesc} for {@code TEST_METRIC_STREAM} whose sink config comes from
+ * {@link KafkaStreamProvider}.
+ */
+public class StreamProxyTest extends ApplicationTestBase {
+    @Rule
+    public TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+    // NOTE(review): presumably injected by ApplicationTestBase - confirm.
+    @Inject
+    private StreamProxyManager proxyManager;
+    private KafkaTestServer kafkaTestServer;
+    private StreamDesc streamDesc;
+
+    /**
+     * Starts the embedded Kafka test server and builds the stream descriptor used by the tests.
+     */
+    @Before
+    public void before() throws Exception {
+        kafkaTestServer = KafkaTestServer.create(temporaryFolder.newFolder());
+        kafkaTestServer.start();
+        this.streamDesc = new StreamDesc();
+        this.streamDesc.setStreamId("TEST_METRIC_STREAM");
+        KafkaStreamSinkConfig sinkConfig = new KafkaStreamProvider().getSinkConfig("TEST_METRIC_STREAM", ConfigFactory.load());
+        this.streamDesc.setSinkConfig(sinkConfig);
+    }
+
+    /**
+     * Stops the Kafka test server if it was created.
+     */
+    @After
+    public void after() throws IOException {
+        // JUnit still runs @After when @Before fails; the guard keeps a NullPointerException
+        // from masking the original setup failure.
+        if (kafkaTestServer != null) {
+            kafkaTestServer.stop();
+        }
+    }
+
+    /**
+     * Verifies that the proxy manager field has been populated.
+     */
+    @Test
+    public void testProxyManagerInjection() {
+        Assert.assertNotNull(proxyManager);
+    }
+
+    /**
+     * Opens a proxy for the test stream, sends a single record and closes the proxy.
+     */
+    @Test
+    public void testStreamProxyProduce() throws IOException {
+        StreamProxy streamProxy = new StreamProxyImpl();
+        streamProxy.open(streamDesc);
+        try {
+            streamProxy.send(Collections.singletonList(new StreamRecord() {
+                {
+                    put("metric", "DiskUsage");
+                    put("host", "localhost");
+                    put("value", 98);
+                }
+            }));
+        } finally {
+            // Close even when send fails so the opened proxy is not leaked.
+            streamProxy.close();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/eagle/blob/d5dce2b3/eagle-core/eagle-app/eagle-app-streamproxy/src/test/resources/application.conf
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-streamproxy/src/test/resources/application.conf b/eagle-core/eagle-app/eagle-app-streamproxy/src/test/resources/application.conf
new file mode 100644
index 0000000..cf43880
--- /dev/null
+++ b/eagle-core/eagle-app/eagle-app-streamproxy/src/test/resources/application.conf
@@ -0,0 +1,35 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+{
+  "application":{
+    "stream": {
+      "provider": "org.apache.eagle.app.messaging.KafkaStreamProvider"
+    }
+    "provider":{
+      "loader": "org.apache.eagle.app.service.impl.ApplicationProviderSPILoader"
+    }
+  }
+  "appId":"test_topology_name"
+  "spoutNum": 3
+  "loaded": true
+  "mode":"LOCAL"
+  "dataSinkConfig": {
+    "topic" : "test_topic",
+    "brokerList" : "sandbox.hortonworks.com:6667",
+    "serializerClass" : "kafka.serializer.StringEncoder",
+    "keySerializerClass" : "kafka.serializer.StringEncoder"
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/eagle/blob/d5dce2b3/eagle-core/eagle-app/pom.xml
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/pom.xml b/eagle-core/eagle-app/pom.xml
index b37c31f..f0677fe 100644
--- a/eagle-core/eagle-app/pom.xml
+++ b/eagle-core/eagle-app/pom.xml
@@ -34,6 +34,7 @@
     <modules>
         <module>eagle-app-base</module>
         <module>eagle-app-utils</module>
+        <module>eagle-app-streamproxy</module>
     </modules>
 
     <build>

http://git-wip-us.apache.org/repos/asf/eagle/blob/d5dce2b3/eagle-core/eagle-common/src/main/java/org/apache/eagle/common/rest/RESTResponse.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-common/src/main/java/org/apache/eagle/common/rest/RESTResponse.java b/eagle-core/eagle-common/src/main/java/org/apache/eagle/common/rest/RESTResponse.java
index 7aaade3..1176bd9 100644
--- a/eagle-core/eagle-common/src/main/java/org/apache/eagle/common/rest/RESTResponse.java
+++ b/eagle-core/eagle-common/src/main/java/org/apache/eagle/common/rest/RESTResponse.java
@@ -29,6 +29,7 @@ import javax.ws.rs.core.MediaType;
 import javax.ws.rs.core.Response;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
 import java.util.function.Consumer;
 import java.util.function.Supplier;
 
@@ -212,7 +213,7 @@ public class RESTResponse<T> {
             return this;
         }
 
-        private void runAsync(CompletableFuture future) {
+        private void runAsync(Future future) {
             try {
                 future.get();
             } catch (InterruptedException ex) {

http://git-wip-us.apache.org/repos/asf/eagle/blob/d5dce2b3/eagle-metric/eagle-hadoop-metric/src/main/java/org/apache/eagle/metric/HadoopMetricMonitorApp.java
----------------------------------------------------------------------
diff --git a/eagle-metric/eagle-hadoop-metric/src/main/java/org/apache/eagle/metric/HadoopMetricMonitorApp.java b/eagle-metric/eagle-hadoop-metric/src/main/java/org/apache/eagle/metric/HadoopMetricMonitorApp.java
index 0c49d82..05c874d 100644
--- a/eagle-metric/eagle-hadoop-metric/src/main/java/org/apache/eagle/metric/HadoopMetricMonitorApp.java
+++ b/eagle-metric/eagle-hadoop-metric/src/main/java/org/apache/eagle/metric/HadoopMetricMonitorApp.java
@@ -43,7 +43,7 @@ public class HadoopMetricMonitorApp extends StormApplication {
                         .namedByField("metric")
                         .eventTimeByField("timestamp")
                         .dimensionFields("host", "component", "site")
-                        .granularity(Calendar.MINUTE)
+                        .granularity(Calendar.SECOND)
                         .valueField("value"))
                 .fromStream("SYSTEM_METRIC_STREAM")
                 .saveAsMetric(MetricDescriptor.metricGroupByField("group")
@@ -51,7 +51,7 @@ public class HadoopMetricMonitorApp extends StormApplication {
                         .namedByField("metric")
                         .eventTimeByField("timestamp")
                         .dimensionFields("host", "group", "site", "device")
-                        .granularity(Calendar.MINUTE)
+                        .granularity(Calendar.SECOND)
                         .valueField("value")
                 )
                 .toTopology();

http://git-wip-us.apache.org/repos/asf/eagle/blob/d5dce2b3/eagle-metric/eagle-hadoop-metric/src/main/resources/META-INF/providers/org.apache.eagle.metric.HadoopMetricMonitorAppProdiver.xml
----------------------------------------------------------------------
diff --git a/eagle-metric/eagle-hadoop-metric/src/main/resources/META-INF/providers/org.apache.eagle.metric.HadoopMetricMonitorAppProdiver.xml b/eagle-metric/eagle-hadoop-metric/src/main/resources/META-INF/providers/org.apache.eagle.metric.HadoopMetricMonitorAppProdiver.xml
index eff1a70..ef13f23 100644
--- a/eagle-metric/eagle-hadoop-metric/src/main/resources/META-INF/providers/org.apache.eagle.metric.HadoopMetricMonitorAppProdiver.xml
+++ b/eagle-metric/eagle-hadoop-metric/src/main/resources/META-INF/providers/org.apache.eagle.metric.HadoopMetricMonitorAppProdiver.xml
@@ -110,7 +110,7 @@
         <stream>
             <streamId>HADOOP_JMX_METRIC_STREAM</streamId>
             <description>Hadoop JMX Metric Stream including name node, resource manager, etc.</description>
-            <group>Hadoop Metric</group>
+            <group>hadoop metrics</group>
             <columns>
                 <column>
                     <name>host</name>
@@ -142,7 +142,7 @@
         <stream>
             <streamId>SYSTEM_METRIC_STREAM</streamId>
             <description>System Metrics Stream including CPU, Network, Disk, etc.</description>
-            <group>System Metric</group>
+            <group>system metrics</group>
             <columns>
                 <column>
                     <name>host</name>
@@ -178,7 +178,7 @@
         <stream>
             <streamId>HADOOP_JMX_RESOURCE_STREAM</streamId>
             <description>Hadoop JMX Resource Stream including name node, resource manager, etc.</description>
-            <group>Hadoop Metric</group>
+            <group>hadoop metrics</group>
             <columns>
                 <column>
                     <name>host</name>

http://git-wip-us.apache.org/repos/asf/eagle/blob/d5dce2b3/eagle-server/pom.xml
----------------------------------------------------------------------
diff --git a/eagle-server/pom.xml b/eagle-server/pom.xml
index 77e974a..8db44cd 100644
--- a/eagle-server/pom.xml
+++ b/eagle-server/pom.xml
@@ -80,6 +80,17 @@
         </dependency>
         <dependency>
             <groupId>org.apache.eagle</groupId>
+            <artifactId>eagle-app-streamproxy</artifactId>
+            <version>${project.version}</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.slf4j</groupId>
+                    <artifactId>slf4j-log4j12</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.eagle</groupId>
             <artifactId>eagle-common</artifactId>
             <version>${project.version}</version>
             <exclusions>

http://git-wip-us.apache.org/repos/asf/eagle/blob/d5dce2b3/eagle-server/src/main/java/org/apache/eagle/server/security/BasicAuthRequestFilter.java
----------------------------------------------------------------------
diff --git a/eagle-server/src/main/java/org/apache/eagle/server/security/BasicAuthRequestFilter.java b/eagle-server/src/main/java/org/apache/eagle/server/security/BasicAuthRequestFilter.java
index dffc197..81a10c8 100644
--- a/eagle-server/src/main/java/org/apache/eagle/server/security/BasicAuthRequestFilter.java
+++ b/eagle-server/src/main/java/org/apache/eagle/server/security/BasicAuthRequestFilter.java
@@ -24,7 +24,6 @@ import com.sun.jersey.core.util.Priority;
 import com.sun.jersey.spi.container.ContainerRequest;
 import com.sun.jersey.spi.container.ContainerRequestFilter;
 import io.dropwizard.auth.Auth;
-import io.dropwizard.auth.AuthenticationException;
 import io.dropwizard.auth.Authenticator;
 import io.dropwizard.auth.basic.BasicCredentials;
 import org.apache.eagle.common.rest.RESTResponse;
@@ -87,7 +86,12 @@ public class BasicAuthRequestFilter implements ContainerRequestFilter {
         .status(false, Response.Status.UNAUTHORIZED)
         .build();
 
-    private static final Response ALL_ACCESS_DENIED = RESTResponse.builder()
+    private static final Response INVALID_ACCESS_FORBIDDEN = RESTResponse.builder()
+        .message("Access denied, invalid username or password")
+        .status(false, Response.Status.FORBIDDEN)
+        .build();
+
+    private static final Response ALL_ACCESS_FORBIDDEN = RESTResponse.builder()
         .message("Access denied")
         .status(false, Response.Status.FORBIDDEN)
         .build();
@@ -101,7 +105,7 @@ public class BasicAuthRequestFilter implements ContainerRequestFilter {
             //Access denied for all
 
             if (hasDenyAllAnnotation) {
-                throw new WebApplicationException(ALL_ACCESS_DENIED);
+                throw new WebApplicationException(ALL_ACCESS_FORBIDDEN);
             }
 
             //Get request headers
@@ -175,8 +179,10 @@ public class BasicAuthRequestFilter implements ContainerRequestFilter {
                         }
                     }
                 } else {
-                    throw new WebApplicationException(UNAUTHORIZED_ACCESS_DENIED);
+                    throw new WebApplicationException(INVALID_ACCESS_FORBIDDEN);
                 }
+            } else {
+                throw new WebApplicationException(UNAUTHORIZED_ACCESS_DENIED);
             }
         } catch (WebApplicationException e) {
             throw e;

http://git-wip-us.apache.org/repos/asf/eagle/blob/d5dce2b3/eagle-server/src/main/webapp/app/dev/index.html
----------------------------------------------------------------------
diff --git a/eagle-server/src/main/webapp/app/dev/index.html b/eagle-server/src/main/webapp/app/dev/index.html
index e2547bf..e9c48a6 100644
--- a/eagle-server/src/main/webapp/app/dev/index.html
+++ b/eagle-server/src/main/webapp/app/dev/index.html
@@ -91,40 +91,23 @@
 								<a>
 									<span ng-if="!Site.current()">
 										<span class="fa fa-home"></span>
-										Overview
+										Sites Overview
 									</span>
 									<span ng-if="Site.current()">
 										<span class="fa fa-server"></span>
-										{{Site.current().siteName || Site.current().siteId}}
+										Site: {{Site.current().siteName || Site.current().siteId}}
 									</span>
 								</a>
 
 								<ul class="dropdown-menu">
-									<li><a ng-click="Site.switchSite()"><span class="fa fa-home"></span> Overview</a></li>
+									<li><a ng-click="Site.switchSite()"><span class="fa fa-home"></span> Sites Overview</a></li>
 									<li ng-repeat="site in Site.list track by $index">
 										<a ng-click="Site.switchSite(site)">
-											<span class="fa fa-server"></span> {{site.siteName || site.siteId}}
+											<span class="fa fa-server"></span> Site: {{site.siteName || site.siteId}}
 										</a>
 									</li>
 								</ul>
 							</li>
-
-							<!-- FAQ -->
-							<li>
-								<a data-toggle="dropdown" aria-expanded="false">
-									<i class="glyphicon glyphicon-question-sign"></i>
-								</a>
-
-								<ul class="dropdown-menu">
-									<li><a>How to start using eagle</a></li>
-									<li><a>How to register new site</a></li>
-									<li><a>How to install application</a></li>
-									<li><a>How to manage application</a></li>
-									<li><a>How to develop application</a></li>
-									<li><a ui-sref="metricPreview()">Preview eagle metric</a></li>
-								</ul>
-							</li>
-
 							<!-- Notification -->
 							<li class="hover-dropdown">
 								<a>
@@ -146,6 +129,21 @@
 								</ul>
 							</li>
 
+							<!-- FAQ -->
+							<li>
+								<a data-toggle="dropdown" aria-expanded="false">
+									<i class="glyphicon glyphicon-question-sign"></i>
+								</a>
+
+								<ul class="dropdown-menu">
+									<li><a>How to start using eagle</a></li>
+									<li><a>How to register new site</a></li>
+									<li><a>How to install application</a></li>
+									<li><a>How to manage application</a></li>
+									<li><a>How to develop application</a></li>
+									<li><a ui-sref="metricPreview()">Preview eagle metric</a></li>
+								</ul>
+							</li>
 							<!-- Auth -->
 							<li ng-if="!Auth.isLogin">
 								<a ui-sref="login">Login</a>

http://git-wip-us.apache.org/repos/asf/eagle/blob/d5dce2b3/eagle-server/src/main/webapp/app/dev/partials/alert/list.html
----------------------------------------------------------------------
diff --git a/eagle-server/src/main/webapp/app/dev/partials/alert/list.html b/eagle-server/src/main/webapp/app/dev/partials/alert/list.html
index 9be9a56..371248f 100644
--- a/eagle-server/src/main/webapp/app/dev/partials/alert/list.html
+++ b/eagle-server/src/main/webapp/app/dev/partials/alert/list.html
@@ -20,7 +20,7 @@
 	<div class="box-header with-border">
 		<span class="fa fa-bell"></span>
 		<h3 class="box-title">
-			Alert List
+			Alert Incidents
 		</h3>
 	</div>
 	<div class="box-body">

http://git-wip-us.apache.org/repos/asf/eagle/blob/d5dce2b3/eagle-server/src/main/webapp/app/dev/public/js/ctrls/alertCtrl.js
----------------------------------------------------------------------
diff --git a/eagle-server/src/main/webapp/app/dev/public/js/ctrls/alertCtrl.js b/eagle-server/src/main/webapp/app/dev/public/js/ctrls/alertCtrl.js
index 9871ab7..88b44cd 100644
--- a/eagle-server/src/main/webapp/app/dev/public/js/ctrls/alertCtrl.js
+++ b/eagle-server/src/main/webapp/app/dev/public/js/ctrls/alertCtrl.js
@@ -25,7 +25,7 @@
 	// =                                        Alert                                       =
 	// ======================================================================================
 	eagleControllers.controller('alertListCtrl', function ($scope, $wrapState, PageConfig, CompatibleEntity, Time) {
-		PageConfig.title = "Alerts";
+		PageConfig.title = "Alert Incidents";
 		$scope.site = $wrapState.param.siteId;
 
 		$scope.alertList = [];
@@ -86,7 +86,7 @@
 	// =                                       Stream                                       =
 	// ======================================================================================
 	eagleControllers.controller('alertStreamListCtrl', function ($scope, $wrapState, PageConfig, Application, Entity) {
-		PageConfig.title = "Streams";
+		PageConfig.title = "Alert Streams";
 
 		$scope.streamList = [];
 		$scope.site = $wrapState.param.siteId;
@@ -119,7 +119,7 @@
 	// =                                       Policy                                       =
 	// ======================================================================================
 	eagleControllers.controller('policyListCtrl', function ($scope, $wrapState, PageConfig, Entity, Policy) {
-		PageConfig.title = "Policies";
+		PageConfig.title = "Alert Policies";
 		$scope.loading = false;
 
 		$scope.policyList = [];

http://git-wip-us.apache.org/repos/asf/eagle/blob/d5dce2b3/eagle-server/src/main/webapp/app/dev/public/js/ctrls/mainCtrl.js
----------------------------------------------------------------------
diff --git a/eagle-server/src/main/webapp/app/dev/public/js/ctrls/mainCtrl.js b/eagle-server/src/main/webapp/app/dev/public/js/ctrls/mainCtrl.js
index 97b4704..03dfef9 100644
--- a/eagle-server/src/main/webapp/app/dev/public/js/ctrls/mainCtrl.js
+++ b/eagle-server/src/main/webapp/app/dev/public/js/ctrls/mainCtrl.js
@@ -25,7 +25,7 @@
 	// =                                        Home                                        =
 	// ======================================================================================
 	eagleControllers.controller('homeCtrl', function ($scope, $wrapState, PageConfig) {
-		PageConfig.title = "Overveiw";
+		PageConfig.title = "Overview";
 
 		$scope.colorList = [
 			"bg-aqua",


Mime
View raw message