eagle-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From h..@apache.org
Subject incubator-eagle git commit: [EAGLE-397] Implement KafkaStreamSink with type safe StreamSinkDesc
Date Wed, 27 Jul 2016 08:29:51 GMT
Repository: incubator-eagle
Updated Branches:
  refs/heads/develop f1a93b1bb -> 043a103fd


[EAGLE-397] Implement KafkaStreamSink with type safe StreamSinkDesc

https://issues.apache.org/jira/browse/EAGLE-397

Author: Hao Chen <hao@apache.org>

Closes #278 from haoch/EAGLE-397.


Project: http://git-wip-us.apache.org/repos/asf/incubator-eagle/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-eagle/commit/043a103f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-eagle/tree/043a103f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-eagle/diff/043a103f

Branch: refs/heads/develop
Commit: 043a103fd6137fd9367e5ad520fb19fffb954469
Parents: f1a93b1
Author: Hao Chen <hao@apache.org>
Authored: Wed Jul 27 16:29:32 2016 +0800
Committer: Hao Chen <hao@apache.org>
Committed: Wed Jul 27 16:29:32 2016 +0800

----------------------------------------------------------------------
 .../apache/eagle/app/ApplicationContext.java    | 11 ++---
 .../eagle/app/sink/AbstractStreamSink.java      |  5 ++-
 .../eagle/app/sink/DefaultStreamSinkDesc.java   | 42 ++++++++++++++++++
 .../apache/eagle/app/sink/KafkaStreamSink.java  | 29 ++++++------
 .../eagle/app/sink/KafkaStreamSinkDesc.java     | 46 ++++++++++++++++++++
 .../eagle/app/sink/LoggingStreamSink.java       | 20 +++------
 .../org/apache/eagle/app/sink/StreamSink.java   | 11 ++---
 .../app/spi/AbstractApplicationProvider.java    |  6 +--
 .../apache/eagle/metadata/model/StreamDesc.java | 31 +++++--------
 .../eagle/metadata/model/StreamSinkDesc.java    | 25 +++--------
 .../app/example/ExampleApplicationTest.java     |  1 -
 11 files changed, 140 insertions(+), 87 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/043a103f/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/ApplicationContext.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/ApplicationContext.java
b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/ApplicationContext.java
index dc2a456..cf8646c 100644
--- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/ApplicationContext.java
+++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/ApplicationContext.java
@@ -18,7 +18,6 @@ package org.apache.eagle.app;
 
 import com.typesafe.config.Config;
 import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
-import org.apache.eagle.app.sink.AbstractStreamSink;
 import org.apache.eagle.app.sink.StreamSink;
 import org.apache.eagle.app.sink.mapper.*;
 import org.apache.eagle.app.spi.ApplicationProvider;
@@ -77,12 +76,10 @@ public class ApplicationContext implements Serializable, ApplicationLifecycleLis
         if(null != outputStreams){
             outputStreams.forEach((stream) -> {
                 try {
-                    StreamSink streamSink = (StreamSink) sinkClass.newInstance();
-                    streamSink.init(stream,this);
                     StreamDesc streamDesc = new StreamDesc();
-                    streamDesc.setStreamSchema(stream);
-                    streamDesc.setSinkContext(streamSink.getSinkContext());
-                    streamDesc.setSinkType(sinkClass);
+                    StreamSink streamSink = (StreamSink) sinkClass.newInstance();
+                    streamDesc.setSink(streamSink.init(stream,this));
+                    streamDesc.setSchema(stream);
                     streamDesc.setStreamId(stream.getStreamId());
                     streamDescMap.put(streamDesc.getStreamId(),streamDesc);
                     streamDefinitionMap.put(streamDesc.getStreamId(),stream);
@@ -113,7 +110,7 @@ public class ApplicationContext implements Serializable, ApplicationLifecycleLis
         checkStreamExists(streamId);
         Class<?> sinkClass = appEntity.getDescriptor().getSinkClass();
         try {
-            AbstractStreamSink abstractStreamSink = (AbstractStreamSink) sinkClass.newInstance();
+            StreamSink abstractStreamSink = (StreamSink) sinkClass.newInstance();
             abstractStreamSink.setEventMapper(mapper);
             abstractStreamSink.init(streamDefinitionMap.get(streamId),this);
             return abstractStreamSink;

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/043a103f/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/AbstractStreamSink.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/AbstractStreamSink.java
b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/AbstractStreamSink.java
index 322ff8b..8280b5b 100644
--- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/AbstractStreamSink.java
+++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/AbstractStreamSink.java
@@ -23,19 +23,20 @@ import backtype.storm.tuple.Fields;
 import backtype.storm.tuple.Tuple;
 import org.apache.eagle.alert.engine.model.StreamEvent;
 import org.apache.eagle.app.sink.mapper.StreamEventMapper;
+import org.apache.eagle.metadata.model.StreamSinkDesc;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.List;
 import java.util.Map;
 
-public abstract class AbstractStreamSink extends StreamSink {
+public abstract class AbstractStreamSink<K extends StreamSinkDesc> extends StreamSink<K>
{
     private final static Logger LOG = LoggerFactory.getLogger(AbstractStreamSink.class);
     private final static String KEY_FIELD = "KEY";
     private final static String VALUE_FIELD = "VALUE";
     private StreamEventMapper streamEventMapper;
 
-    public AbstractStreamSink setEventMapper(StreamEventMapper streamEventMapper){
+    public AbstractStreamSink<K> setEventMapper(StreamEventMapper streamEventMapper){
         this.streamEventMapper = streamEventMapper;
         return this;
     }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/043a103f/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/DefaultStreamSinkDesc.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/DefaultStreamSinkDesc.java
b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/DefaultStreamSinkDesc.java
new file mode 100644
index 0000000..05aa8c0
--- /dev/null
+++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/DefaultStreamSinkDesc.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.app.sink;
+
+import org.apache.eagle.metadata.model.StreamSinkDesc;
+
+public class DefaultStreamSinkDesc implements StreamSinkDesc {
+    private final Class<?> streamPersistClass;
+    private final static String NONE_STORAGE_TYPE = "NONE";
+
+    public DefaultStreamSinkDesc(Class<?> streamPersistClass){
+        this.streamPersistClass = streamPersistClass;
+    }
+
+    @Override
+    public String getType() {
+        return NONE_STORAGE_TYPE;
+    }
+
+    public Class<?> getSinkClass() {
+        return streamPersistClass;
+    }
+
+    @Override
+    public Class<? extends StreamSinkDesc> getDescClass() {
+        return DefaultStreamSinkDesc.class;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/043a103f/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/KafkaStreamSink.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/KafkaStreamSink.java
b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/KafkaStreamSink.java
index 1490368..5f8c176 100644
--- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/KafkaStreamSink.java
+++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/KafkaStreamSink.java
@@ -23,19 +23,20 @@ import org.apache.eagle.app.ApplicationContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.HashMap;
 import java.util.Map;
 
-public class KafkaStreamSink extends AbstractStreamSink {
+public class KafkaStreamSink extends AbstractStreamSink<KafkaStreamSinkDesc> {
     private final static Logger LOGGER = LoggerFactory.getLogger(KafkaStreamSink.class);
     private String topicId;
 
     @Override
-    public void init(StreamDefinition streamDefinition, ApplicationContext context) {
-        this.topicId = String.format("EAGLE_TOPIC_%s_%s_%s",
+    public KafkaStreamSinkDesc init(StreamDefinition streamDefinition, ApplicationContext
context) {
+        this.topicId = String.format("EAGLE.%s.%s",
                 context.getAppEntity().getSite().getSiteId(),
-                context.getAppEntity().getDescriptor().getType(),
-                streamDefinition.getStreamId());
+                streamDefinition.getStreamId()).toLowerCase();
+        KafkaStreamSinkDesc desc = new KafkaStreamSinkDesc();
+        desc.setTopicId(topicId);
+        return desc;
     }
 
     @Override
@@ -49,14 +50,14 @@ public class KafkaStreamSink extends AbstractStreamSink {
         LOGGER.info("TODO: producing {} to '{}'",streamEvent,topicId);
     }
 
-    @Override
-    public Map<String, Object> getSinkContext() {
-        return new HashMap<String,Object>(){
-            {
-                put("kafka.topic",KafkaStreamSink.this.topicId);
-            }
-        };
-    }
+//    @Override
+//    public Map<String, Object> getSink() {
+//        return new HashMap<String,Object>(){
+//            {
+//                put("kafka.topic",KafkaStreamSink.this.topicId);
+//            }
+//        };
+//    }
 
     @Override
     public void onAppInstall() {

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/043a103f/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/KafkaStreamSinkDesc.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/KafkaStreamSinkDesc.java
b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/KafkaStreamSinkDesc.java
new file mode 100644
index 0000000..440f67c
--- /dev/null
+++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/KafkaStreamSinkDesc.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.app.sink;
+
+import org.apache.eagle.metadata.model.StreamSinkDesc;
+
+public class KafkaStreamSinkDesc implements StreamSinkDesc {
+    private String topicId;
+
+    public String getTopicId() {
+        return topicId;
+    }
+
+    public void setTopicId(String topicId) {
+        this.topicId = topicId;
+    }
+
+    @Override
+    public String getType() {
+        return "KAFKA";
+    }
+
+    @Override
+    public Class<?> getSinkClass() {
+        return KafkaStreamSink.class;
+    }
+
+    @Override
+    public Class<? extends StreamSinkDesc> getDescClass() {
+        return KafkaStreamSinkDesc.class;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/043a103f/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/LoggingStreamSink.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/LoggingStreamSink.java
b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/LoggingStreamSink.java
index 8acf325..64c019e 100644
--- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/LoggingStreamSink.java
+++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/LoggingStreamSink.java
@@ -22,28 +22,15 @@ import org.apache.eagle.app.ApplicationContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.HashMap;
-import java.util.Map;
-
-public class LoggingStreamSink extends AbstractStreamSink {
+public class LoggingStreamSink extends AbstractStreamSink<DefaultStreamSinkDesc> {
     private final static Logger LOGGER = LoggerFactory.getLogger(KafkaStreamSink.class);
 
     @Override
-    public void init(StreamDefinition streamDefinition, ApplicationContext context) {
-        // do nothing
-    }
-
-    @Override
     protected void onEvent(StreamEvent streamEvent) {
         LOGGER.info("Receiving {}",streamEvent);
     }
 
     @Override
-    public Map<String, Object> getSinkContext() {
-        return new HashMap<>();
-    }
-
-    @Override
     public void onAppInstall() {
         LOGGER.info("Executing onAppInstall callback, do nothing");
     }
@@ -52,4 +39,9 @@ public class LoggingStreamSink extends AbstractStreamSink {
     public void onAppUninstall() {
         LOGGER.info("Executing onAppUninstall callback, do nothing");
     }
+
+    @Override
+    public DefaultStreamSinkDesc init(StreamDefinition streamDefinition, ApplicationContext
context) {
+        return new DefaultStreamSinkDesc(LoggingStreamSink.class);
+    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/043a103f/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/StreamSink.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/StreamSink.java
b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/StreamSink.java
index 2052484..e98e8bd 100644
--- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/StreamSink.java
+++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/StreamSink.java
@@ -20,18 +20,19 @@ import backtype.storm.topology.base.BaseBasicBolt;
 import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
 import org.apache.eagle.app.ApplicationContext;
 import org.apache.eagle.app.ApplicationLifecycleListener;
+import org.apache.eagle.app.sink.mapper.StreamEventMapper;
+import org.apache.eagle.metadata.model.StreamSinkDesc;
 
-import java.util.Map;
-
-public abstract class StreamSink extends BaseBasicBolt implements ApplicationLifecycleListener
{
+public abstract class StreamSink<T extends StreamSinkDesc> extends BaseBasicBolt implements
ApplicationLifecycleListener {
     /**
      * Should only initialize metadata in this method but must not open any resource like
connection
      *
      * @param streamDefinition
      * @param context
      */
-    public abstract void init(StreamDefinition streamDefinition, ApplicationContext context);
-    public abstract Map<String,Object> getSinkContext();
+    public abstract T init(StreamDefinition streamDefinition, ApplicationContext context);
+
+    public abstract StreamSink<T> setEventMapper(StreamEventMapper streamEventMapper);
 
     @Override
     public void onAppStart() {

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/043a103f/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/spi/AbstractApplicationProvider.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/spi/AbstractApplicationProvider.java
b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/spi/AbstractApplicationProvider.java
index 8d61066..37311eb 100644
--- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/spi/AbstractApplicationProvider.java
+++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/spi/AbstractApplicationProvider.java
@@ -22,7 +22,7 @@ import org.apache.eagle.app.Application;
 import org.apache.eagle.app.config.ApplicationProviderConfig;
 import org.apache.eagle.app.config.ApplicationProviderDescConfig;
 import org.apache.eagle.app.sink.KafkaStreamSink;
-import org.apache.eagle.app.sink.AbstractStreamSink;
+import org.apache.eagle.app.sink.StreamSink;
 import org.apache.eagle.metadata.model.ApplicationDesc;
 import org.apache.eagle.metadata.model.ApplicationDocs;
 import org.apache.eagle.metadata.model.Configuration;
@@ -78,8 +78,8 @@ public abstract class AbstractApplicationProvider<T extends Application>
impleme
                 envConfig.getString(APPLICATIONS_SINK_TYPE_PROPS_KEY) : DEFAULT_APPLICATIONS_SINK_TYPE;
         try {
             Class<?> sinkClass = Class.forName(sinkClassName);
-            if(!AbstractStreamSink.class.isAssignableFrom(sinkClass)){
-                throw new IllegalStateException(sinkClassName+ "is not assignable from "+AbstractStreamSink.class.getCanonicalName());
+            if(!StreamSink.class.isAssignableFrom(sinkClass)){
+                throw new IllegalStateException(sinkClassName+ "is not assignable from "+StreamSink.class.getCanonicalName());
             }
             applicationDesc.setSinkClass(sinkClass);
         } catch (ClassNotFoundException e) {

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/043a103f/eagle-core/eagle-metadata/eagle-metadata-base/src/main/java/org/apache/eagle/metadata/model/StreamDesc.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-metadata/eagle-metadata-base/src/main/java/org/apache/eagle/metadata/model/StreamDesc.java
b/eagle-core/eagle-metadata/eagle-metadata-base/src/main/java/org/apache/eagle/metadata/model/StreamDesc.java
index c9f1e7f..65b0dbf 100644
--- a/eagle-core/eagle-metadata/eagle-metadata-base/src/main/java/org/apache/eagle/metadata/model/StreamDesc.java
+++ b/eagle-core/eagle-metadata/eagle-metadata-base/src/main/java/org/apache/eagle/metadata/model/StreamDesc.java
@@ -18,13 +18,10 @@ package org.apache.eagle.metadata.model;
 
 import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
 
-import java.util.Map;
-
 public class StreamDesc {
     private String streamId;
-    private StreamDefinition streamSchema;
-    private Class<?> sinkType;
-    private Map<String,Object> sinkContext;
+    private StreamDefinition schema;
+    private StreamSinkDesc sink;
 
     public String getStreamId() {
         return streamId;
@@ -34,27 +31,19 @@ public class StreamDesc {
         this.streamId = streamId;
     }
 
-    public StreamDefinition getStreamSchema() {
-        return streamSchema;
-    }
-
-    public void setStreamSchema(StreamDefinition streamSchema) {
-        this.streamSchema = streamSchema;
-    }
-
-    public Class<?> getSinkType() {
-        return sinkType;
+    public StreamDefinition getSchema() {
+        return schema;
     }
 
-    public void setSinkType(Class<?> sinkType) {
-        this.sinkType = sinkType;
+    public void setSchema(StreamDefinition streamSchema) {
+        this.schema = streamSchema;
     }
 
-    public Map<String, Object> getSinkContext() {
-        return sinkContext;
+    public StreamSinkDesc getSink() {
+        return sink;
     }
 
-    public void setSinkContext(Map<String, Object> sinkContext) {
-        this.sinkContext = sinkContext;
+    public void setSink(StreamSinkDesc sinkDesc) {
+        this.sink = sinkDesc;
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/043a103f/eagle-core/eagle-metadata/eagle-metadata-base/src/main/java/org/apache/eagle/metadata/model/StreamSinkDesc.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-metadata/eagle-metadata-base/src/main/java/org/apache/eagle/metadata/model/StreamSinkDesc.java
b/eagle-core/eagle-metadata/eagle-metadata-base/src/main/java/org/apache/eagle/metadata/model/StreamSinkDesc.java
index 42d7603..10b2b67 100644
--- a/eagle-core/eagle-metadata/eagle-metadata-base/src/main/java/org/apache/eagle/metadata/model/StreamSinkDesc.java
+++ b/eagle-core/eagle-metadata/eagle-metadata-base/src/main/java/org/apache/eagle/metadata/model/StreamSinkDesc.java
@@ -16,25 +16,10 @@
  */
 package org.apache.eagle.metadata.model;
 
-import java.util.Map;
+import java.io.Serializable;
 
-public class StreamSinkDesc {
-    private Class<?> sinkType;
-    private Map<String,Object> sinkContext;
-
-    public Class<?> getSinkType() {
-        return sinkType;
-    }
-
-    public void setSinkType(Class<?> sinkType) {
-        this.sinkType = sinkType;
-    }
-
-    public Map<String, Object> getSinkContext() {
-        return sinkContext;
-    }
-
-    public void setSinkContext(Map<String, Object> sinkContext) {
-        this.sinkContext = sinkContext;
-    }
+public interface StreamSinkDesc extends Serializable {
+    String getType();
+    Class<?> getSinkClass();
+    Class<? extends StreamSinkDesc> getDescClass();
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/043a103f/eagle-examples/eagle-app-example/src/test/java/org/apache/eagle/app/example/ExampleApplicationTest.java
----------------------------------------------------------------------
diff --git a/eagle-examples/eagle-app-example/src/test/java/org/apache/eagle/app/example/ExampleApplicationTest.java
b/eagle-examples/eagle-app-example/src/test/java/org/apache/eagle/app/example/ExampleApplicationTest.java
index f29c6ff..51b8c60 100644
--- a/eagle-examples/eagle-app-example/src/test/java/org/apache/eagle/app/example/ExampleApplicationTest.java
+++ b/eagle-examples/eagle-app-example/src/test/java/org/apache/eagle/app/example/ExampleApplicationTest.java
@@ -94,7 +94,6 @@ public class ExampleApplicationTest {
         simulator.submit(ExampleApplicationProvider2.class);
     }
 
-
     /**
      * For DEBUG
      * @param args


Mime
View raw message