apex-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From pra...@apache.org
Subject apex-core git commit: APEXCORE-649 Infrastructure for user define stram event listeners.
Date Fri, 24 Mar 2017 08:43:21 GMT
Repository: apex-core
Updated Branches:
  refs/heads/master 16d1bf62d -> 5f95ee0e9


APEXCORE-649 Infrastructure for user define stram event listeners.


Project: http://git-wip-us.apache.org/repos/asf/apex-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-core/commit/5f95ee0e
Tree: http://git-wip-us.apache.org/repos/asf/apex-core/tree/5f95ee0e
Diff: http://git-wip-us.apache.org/repos/asf/apex-core/diff/5f95ee0e

Branch: refs/heads/master
Commit: 5f95ee0e954fb505f4a0022fdf9ecdc0a80df6ca
Parents: 16d1bf6
Author: Tushar R. Gosavi <tushar@apache.org>
Authored: Tue Mar 21 12:39:43 2017 +0530
Committer: Tushar R. Gosavi <tushar@apache.org>
Committed: Fri Mar 24 13:57:02 2017 +0530

----------------------------------------------------------------------
 .../java/org/apache/apex/api/ApexPlugin.java    |  29 +++
 .../org/apache/apex/api/ApexPluginContext.java  |  29 +++
 .../org/apache/apex/api/DAGSetupPlugin.java     |   6 +-
 .../stram/StreamingAppMasterService.java        |  23 ++-
 .../stram/StreamingContainerManager.java        |  18 ++
 .../plan/logical/DAGSetupPluginManager.java     |  21 +-
 .../apex/engine/api/DAGExecutionPlugin.java     |  44 +++++
 .../engine/api/DAGExecutionPluginContext.java   |  89 +++++++++
 .../apache/apex/engine/api/PluginLocator.java   |  39 ++++
 .../AbstractDAGExecutionPluginContext.java      | 139 ++++++++++++++
 .../engine/plugin/ApexPluginDispatcher.java     |  27 +++
 .../apex/engine/plugin/ApexPluginManager.java   | 191 +++++++++++++++++++
 .../plugin/DefaultApexPluginDispatcher.java     | 123 ++++++++++++
 .../engine/plugin/NoOpApexPluginDispatcher.java |  36 ++++
 .../plugin/loaders/ChainedPluginLocator.java    |  58 ++++++
 .../loaders/PropertyBasedPluginLocator.java     |  65 +++++++
 .../ServiceLoaderBasedPluginLocator.java        |  48 +++++
 .../plugin/loaders/StaticPluginLocator.java     |  46 +++++
 .../apache/apex/engine/plugin/DebugPlugin.java  | 114 +++++++++++
 .../apache/apex/engine/plugin/NoOpPlugin.java   |  42 ++++
 .../apache/apex/engine/plugin/PluginTests.java  | 130 +++++++++++++
 ...rg.apache.apex.engine.api.DAGExecutionPlugin |  19 ++
 22 files changed, 1314 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/apex-core/blob/5f95ee0e/api/src/main/java/org/apache/apex/api/ApexPlugin.java
----------------------------------------------------------------------
diff --git a/api/src/main/java/org/apache/apex/api/ApexPlugin.java b/api/src/main/java/org/apache/apex/api/ApexPlugin.java
new file mode 100644
index 0000000..b9a8b78
--- /dev/null
+++ b/api/src/main/java/org/apache/apex/api/ApexPlugin.java
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.api;
+
+import com.datatorrent.api.Component;
+
+/**
+ * Marker interface for ApexPlugins.
+ * @param <T> the {@link ApexPluginContext} subtype this plugin works with
+ */
+public interface ApexPlugin<T extends ApexPluginContext> extends Component<T>
+{
+}

http://git-wip-us.apache.org/repos/asf/apex-core/blob/5f95ee0e/api/src/main/java/org/apache/apex/api/ApexPluginContext.java
----------------------------------------------------------------------
diff --git a/api/src/main/java/org/apache/apex/api/ApexPluginContext.java b/api/src/main/java/org/apache/apex/api/ApexPluginContext.java
new file mode 100644
index 0000000..1b72f63
--- /dev/null
+++ b/api/src/main/java/org/apache/apex/api/ApexPluginContext.java
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.api;
+
+import com.datatorrent.api.Context;
+
+/**
+ * Marker interface for the Context used by ApexPlugins. A plugin interacts with
+ * Apex through its context.
+ */
+public interface ApexPluginContext extends Context
+{
+}

http://git-wip-us.apache.org/repos/asf/apex-core/blob/5f95ee0e/api/src/main/java/org/apache/apex/api/DAGSetupPlugin.java
----------------------------------------------------------------------
diff --git a/api/src/main/java/org/apache/apex/api/DAGSetupPlugin.java b/api/src/main/java/org/apache/apex/api/DAGSetupPlugin.java
index d2e7199..6c54bed 100644
--- a/api/src/main/java/org/apache/apex/api/DAGSetupPlugin.java
+++ b/api/src/main/java/org/apache/apex/api/DAGSetupPlugin.java
@@ -25,8 +25,6 @@ import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
 
 import com.datatorrent.api.Attribute;
-import com.datatorrent.api.Component;
-import com.datatorrent.api.Context;
 import com.datatorrent.api.DAG;
 
 /**
@@ -43,7 +41,7 @@ import com.datatorrent.api.DAG;
  * </ul>
  */
 @InterfaceStability.Evolving
-public interface DAGSetupPlugin extends Component<DAGSetupPlugin.DAGSetupPluginContext>, Serializable
+public interface DAGSetupPlugin extends ApexPlugin<DAGSetupPlugin.DAGSetupPluginContext>, Serializable
 {
 
   /**
@@ -89,7 +87,7 @@ public interface DAGSetupPlugin extends Component<DAGSetupPlugin.DAGSetupPluginC
    */
   void postValidateDAG();
 
-  public static class DAGSetupPluginContext implements Context
+  public static class DAGSetupPluginContext implements ApexPluginContext
   {
     private final DAG dag;
     private final Configuration conf;

http://git-wip-us.apache.org/repos/asf/apex-core/blob/5f95ee0e/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java b/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java
index c0e09ab..a885a49 100644
--- a/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java
+++ b/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java
@@ -40,6 +40,14 @@ import javax.xml.bind.annotation.XmlElement;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+
+import org.apache.apex.engine.api.DAGExecutionPlugin;
+import org.apache.apex.engine.api.PluginLocator;
+import org.apache.apex.engine.plugin.ApexPluginDispatcher;
+import org.apache.apex.engine.plugin.DefaultApexPluginDispatcher;
+import org.apache.apex.engine.plugin.loaders.ChainedPluginLocator;
+import org.apache.apex.engine.plugin.loaders.PropertyBasedPluginLocator;
+import org.apache.apex.engine.plugin.loaders.ServiceLoaderBasedPluginLocator;
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.lang.StringUtils;
 import org.apache.commons.lang3.tuple.MutablePair;
@@ -151,6 +159,7 @@ public class StreamingAppMasterService extends CompositeService
   private final ClusterAppStats stats = new ClusterAppStats();
   private StramDelegationTokenManager delegationTokenManager = null;
   private AppDataPushAgent appDataPushAgent;
+  private ApexPluginDispatcher apexPluginDispatcher;
 
   public StreamingAppMasterService(ApplicationAttemptId appAttemptID)
   {
@@ -575,10 +584,22 @@ public class StreamingAppMasterService extends CompositeService
       this.appDataPushAgent = new AppDataPushAgent(dnmgr, appContext);
       addService(this.appDataPushAgent);
     }
-    // initialize all services added above
+    initApexPluginDispatcher();
+
+    // Initialize all services added above
     super.serviceInit(conf);
   }
 
+  public static final String PLUGINS_CONF_KEY = "apex.plugin.stram.plugins";
+  private void initApexPluginDispatcher()
+  {
+    PluginLocator<DAGExecutionPlugin> locator = new ChainedPluginLocator<>(new ServiceLoaderBasedPluginLocator<>(DAGExecutionPlugin.class),
+        new PropertyBasedPluginLocator<>(DAGExecutionPlugin.class, PLUGINS_CONF_KEY));
+    apexPluginDispatcher = new DefaultApexPluginDispatcher(locator, appContext, dnmgr, stats);
+    dnmgr.apexPluginDispatcher = apexPluginDispatcher;
+    addService(apexPluginDispatcher);
+  }
+
   @Override
   protected void serviceStart() throws Exception
   {

http://git-wip-us.apache.org/repos/asf/apex-core/blob/5f95ee0e/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java b/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
index ee07af1..f229e80 100644
--- a/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
+++ b/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
@@ -65,6 +65,8 @@ import org.codehaus.jettison.json.JSONObject;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.apex.engine.plugin.ApexPluginDispatcher;
+import org.apache.apex.engine.plugin.NoOpApexPluginDispatcher;
 import org.apache.commons.io.IOUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.lang3.builder.ToStringBuilder;
@@ -175,6 +177,10 @@ import com.datatorrent.stram.webapp.StreamInfo;
 import net.engio.mbassy.bus.MBassador;
 import net.engio.mbassy.bus.config.BusConfiguration;
 
+import static org.apache.apex.engine.api.DAGExecutionPluginContext.COMMIT_EVENT;
+import static org.apache.apex.engine.api.DAGExecutionPluginContext.HEARTBEAT;
+import static org.apache.apex.engine.api.DAGExecutionPluginContext.STRAM_EVENT;
+
 /**
  * Tracks topology provisioning/allocation to containers<p>
  * <br>
@@ -231,6 +237,7 @@ public class StreamingContainerManager implements PlanContext
   private final ConcurrentSkipListMap<Long, Map<Integer, EndWindowStats>> endWindowStatsOperatorMap = new ConcurrentSkipListMap<>();
   private final ConcurrentMap<PTOperator, PTOperator> slowestUpstreamOp = new ConcurrentHashMap<>();
   private long committedWindowId;
+  private long lastCommittedWindowId = Checkpoint.INITIAL_CHECKPOINT.getWindowId();
   // (operator id, port name) to timestamp
   private final Map<Pair<Integer, String>, Long> operatorPortLastEndWindowTimestamps = Maps.newConcurrentMap();
   private final Map<Integer, Long> operatorLastEndWindowTimestamps = Maps.newConcurrentMap();
@@ -252,6 +259,7 @@ public class StreamingContainerManager implements PlanContext
 
   //logical operator name to latest counters. exists for backward compatibility.
   private final Map<String, Object> latestLogicalCounters = Maps.newHashMap();
+  public transient ApexPluginDispatcher apexPluginDispatcher = new NoOpApexPluginDispatcher();
 
   private final LinkedHashMap<String, ContainerInfo> completedContainers = new LinkedHashMap<String, ContainerInfo>()
   {
@@ -807,6 +815,10 @@ public class StreamingContainerManager implements PlanContext
     processEvents();
 
     committedWindowId = updateCheckpoints(waitForRecovery);
+    if (lastCommittedWindowId != committedWindowId) {
+      apexPluginDispatcher.dispatch(COMMIT_EVENT, committedWindowId);
+      lastCommittedWindowId = committedWindowId;
+    }
     calculateEndWindowStats();
     if (this.vars.enableStatsRecording) {
       recordStats(currentTms);
@@ -1802,6 +1814,7 @@ public class StreamingContainerManager implements PlanContext
     rsp.stackTraceRequired = sca.stackTraceRequested;
     sca.stackTraceRequested = false;
 
+    apexPluginDispatcher.dispatch(HEARTBEAT, heartbeat);
     return rsp;
   }
 
@@ -2394,6 +2407,7 @@ public class StreamingContainerManager implements PlanContext
   @Override
   public void recordEventAsync(StramEvent ev)
   {
+    apexPluginDispatcher.dispatch(STRAM_EVENT, ev);
     if (eventBus != null) {
       eventBus.publishAsync(ev);
     }
@@ -3299,4 +3313,8 @@ public class StreamingContainerManager implements PlanContext
     return latestLogicalCounters.get(operatorName);
   }
 
+  public void setApexPluginDispatcher(ApexPluginDispatcher manager)
+  {
+    this.apexPluginDispatcher = manager;
+  }
 }

http://git-wip-us.apache.org/repos/asf/apex-core/blob/5f95ee0e/engine/src/main/java/com/datatorrent/stram/plan/logical/DAGSetupPluginManager.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/plan/logical/DAGSetupPluginManager.java b/engine/src/main/java/com/datatorrent/stram/plan/logical/DAGSetupPluginManager.java
index ad37071..2f1a904 100644
--- a/engine/src/main/java/com/datatorrent/stram/plan/logical/DAGSetupPluginManager.java
+++ b/engine/src/main/java/com/datatorrent/stram/plan/logical/DAGSetupPluginManager.java
@@ -24,10 +24,9 @@ import java.util.List;
 import org.slf4j.Logger;
 
 import org.apache.apex.api.DAGSetupPlugin;
+import org.apache.apex.engine.plugin.loaders.PropertyBasedPluginLocator;
 import org.apache.hadoop.conf.Configuration;
 
-import com.datatorrent.stram.StramUtils;
-
 import static org.slf4j.LoggerFactory.getLogger;
 
 public class DAGSetupPluginManager
@@ -37,7 +36,7 @@ public class DAGSetupPluginManager
   private final transient List<DAGSetupPlugin> plugins = new ArrayList<>();
   private Configuration conf;
 
-  public static final String DAGSETUP_PLUGINS_CONF_KEY = "org.apache.apex.api";
+  public static final String DAGSETUP_PLUGINS_CONF_KEY = "apex.plugin.dag.setup";
   private DAGSetupPlugin.DAGSetupPluginContext contex;
 
   private void loadVisitors(Configuration conf)
@@ -47,20 +46,8 @@ public class DAGSetupPluginManager
       return;
     }
 
-    String classNamesStr = conf.get(DAGSETUP_PLUGINS_CONF_KEY);
-    if (classNamesStr == null) {
-      return;
-    }
-    String[] classNames = classNamesStr.split(",");
-    for (String className : classNames) {
-      try {
-        Class<? extends DAGSetupPlugin> plugin = StramUtils.classForName(className, DAGSetupPlugin.class);
-        plugins.add(StramUtils.newInstance(plugin));
-        LOG.info("Found DAG setup plugin {}", plugin);
-      } catch (IllegalArgumentException e) {
-        LOG.warn("Could not load plugin {}", className);
-      }
-    }
+    PropertyBasedPluginLocator<DAGSetupPlugin> locator = new PropertyBasedPluginLocator<>(DAGSetupPlugin.class, DAGSETUP_PLUGINS_CONF_KEY);
+    this.plugins.addAll(locator.discoverPlugins(conf));
   }
 
   public void setup(DAGSetupPlugin.DAGSetupPluginContext context)

http://git-wip-us.apache.org/repos/asf/apex-core/blob/5f95ee0e/engine/src/main/java/org/apache/apex/engine/api/DAGExecutionPlugin.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/org/apache/apex/engine/api/DAGExecutionPlugin.java b/engine/src/main/java/org/apache/apex/engine/api/DAGExecutionPlugin.java
new file mode 100644
index 0000000..5a3b5b9
--- /dev/null
+++ b/engine/src/main/java/org/apache/apex/engine/api/DAGExecutionPlugin.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.engine.api;
+
+import org.apache.hadoop.classification.InterfaceStability;
+
+import com.datatorrent.api.Component;
+
+/**
+ * An Apex plugin is user code which runs inside Stram. The interaction
+ * between plugin and Stram is managed by DAGExecutionPluginContext. A plugin can register to handle events of interest
+ * with a callback handler using {@link DAGExecutionPluginContext#register(DAGExecutionPluginContext.RegistrationType, DAGExecutionPluginContext.Handler)}
+ *
+ * Following events are supported
+ * <ul>
+ *   <li>{@link DAGExecutionPluginContext#HEARTBEAT} The heartbeat from a container is delivered to the plugin after it has been handled by Stram</li>
+ *   <li>{@link DAGExecutionPluginContext#STRAM_EVENT} All Stram events generated in Stram will be delivered to the plugin</li>
+ *   <li>{@link DAGExecutionPluginContext#COMMIT_EVENT} When committedWindowId changes in the platform an event will be delivered to the plugin</li>
+ * </ul>
+ *
+ * A plugin should register a single handler for an event. In case multiple handlers are registered for an event,
+ * the last registered handler will be used. A plugin should clean up additional resources created by it during shutdown,
+ * such as helper threads and open files.
+ */
+@InterfaceStability.Evolving
+public interface DAGExecutionPlugin extends Component<DAGExecutionPluginContext>
+{
+}

http://git-wip-us.apache.org/repos/asf/apex-core/blob/5f95ee0e/engine/src/main/java/org/apache/apex/engine/api/DAGExecutionPluginContext.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/org/apache/apex/engine/api/DAGExecutionPluginContext.java b/engine/src/main/java/org/apache/apex/engine/api/DAGExecutionPluginContext.java
new file mode 100644
index 0000000..dc3153e
--- /dev/null
+++ b/engine/src/main/java/org/apache/apex/engine/api/DAGExecutionPluginContext.java
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.engine.api;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.StatsListener.BatchedOperatorStats;
+import com.datatorrent.common.util.Pair;
+import com.datatorrent.stram.StramAppContext;
+import com.datatorrent.stram.api.StramEvent;
+import com.datatorrent.stram.api.StreamingContainerUmbilicalProtocol;
+import com.datatorrent.stram.webapp.AppInfo;
+import com.datatorrent.stram.webapp.LogicalOperatorInfo;
+
+/**
+ * An Apex plugin is user code which runs inside Stram. The interaction
+ * between plugin and Stram is managed by DAGExecutionPluginContext. A plugin can register to handle events of interest
+ * with a callback handler using {@link DAGExecutionPluginContext#register(DAGExecutionPluginContext.RegistrationType, DAGExecutionPluginContext.Handler)}
+ *
+ * Following events are supported
+ * <ul>
+ *   <li>{@link DAGExecutionPluginContext#HEARTBEAT} The heartbeat from a container is delivered to the plugin after it has been handled by Stram</li>
+ *   <li>{@link DAGExecutionPluginContext#STRAM_EVENT} All Stram events generated in Stram will be delivered to the plugin</li>
+ *   <li>{@link DAGExecutionPluginContext#COMMIT_EVENT} When committedWindowId changes in the platform an event will be delivered to the plugin</li>
+ * </ul>
+ *
+ * A plugin should register a single handler for an event. In case multiple handlers are registered for an event,
+ * the last registered handler will be used. A plugin should clean up additional resources created by it during shutdown,
+ * such as helper threads and open files.
+ */
+@InterfaceStability.Evolving
+public interface DAGExecutionPluginContext extends Context
+{
+  class RegistrationType<T>
+  {
+  }
+
+  RegistrationType<StreamingContainerUmbilicalProtocol.ContainerHeartbeat> HEARTBEAT = new RegistrationType<>();
+  RegistrationType<StramEvent> STRAM_EVENT = new RegistrationType<>();
+  RegistrationType<Long> COMMIT_EVENT = new RegistrationType<>();
+
+  <T> void register(RegistrationType<T> type, Handler<T> handler);
+
+  interface Handler<T>
+  {
+    void handle(T data);
+  }
+
+  public StramAppContext getApplicationContext();
+
+  public AppInfo.AppStats getApplicationStats();
+
+  public Configuration getLaunchConfig();
+
+  public DAG getDAG();
+
+  public String getOperatorName(int id);
+
+  public BatchedOperatorStats getPhysicalOperatorStats(int id);
+
+  public List<LogicalOperatorInfo> getLogicalOperatorInfoList();
+
+  public Queue<Pair<Long, Map<String, Object>>> getWindowMetrics(String operatorName);
+
+  public long windowIdToMillis(long windowId);
+}

http://git-wip-us.apache.org/repos/asf/apex-core/blob/5f95ee0e/engine/src/main/java/org/apache/apex/engine/api/PluginLocator.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/org/apache/apex/engine/api/PluginLocator.java b/engine/src/main/java/org/apache/apex/engine/api/PluginLocator.java
new file mode 100644
index 0000000..e0f70be
--- /dev/null
+++ b/engine/src/main/java/org/apache/apex/engine/api/PluginLocator.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.engine.api;
+
+import java.util.Collection;
+
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * Interface to discover plugins during Apex Master initialization. Implementations should return a collection of
+ * objects implementing an interface of type T.
+ */
+@InterfaceStability.Evolving
+public interface PluginLocator<T>
+{
+  /**
+   * Discover list of apex plugins.
+   *
+   * @return list of apex plugins.
+   */
+  Collection<T> discoverPlugins(Configuration conf);
+}

http://git-wip-us.apache.org/repos/asf/apex-core/blob/5f95ee0e/engine/src/main/java/org/apache/apex/engine/plugin/AbstractDAGExecutionPluginContext.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/org/apache/apex/engine/plugin/AbstractDAGExecutionPluginContext.java b/engine/src/main/java/org/apache/apex/engine/plugin/AbstractDAGExecutionPluginContext.java
new file mode 100644
index 0000000..a92b57b
--- /dev/null
+++ b/engine/src/main/java/org/apache/apex/engine/plugin/AbstractDAGExecutionPluginContext.java
@@ -0,0 +1,139 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.engine.plugin;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+
+import org.apache.apex.engine.api.DAGExecutionPluginContext;
+import org.apache.hadoop.conf.Configuration;
+
+import com.datatorrent.api.Attribute;
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.StatsListener;
+import com.datatorrent.common.util.Pair;
+import com.datatorrent.stram.StramAppContext;
+import com.datatorrent.stram.StreamingContainerManager;
+import com.datatorrent.stram.plan.physical.PTOperator;
+import com.datatorrent.stram.webapp.AppInfo;
+import com.datatorrent.stram.webapp.LogicalOperatorInfo;
+
+public abstract class AbstractDAGExecutionPluginContext implements DAGExecutionPluginContext
+{
+  private final StreamingContainerManager dnmgr;
+  private final Configuration launchConf;
+  private final StramAppContext appContext;
+  private final AppInfo.AppStats stats;
+
+  public AbstractDAGExecutionPluginContext(StramAppContext appContext, StreamingContainerManager dnmgr, AppInfo.AppStats stats, Configuration launcConf)
+  {
+    this.appContext = appContext;
+    this.dnmgr = dnmgr;
+    this.launchConf = launcConf;
+    this.stats = stats;
+  }
+
+  @Override
+  public StramAppContext getApplicationContext()
+  {
+    return appContext;
+  }
+
+  @Override
+  public AppInfo.AppStats getApplicationStats()
+  {
+    return stats;
+  }
+
+  @Override
+  public DAG getDAG()
+  {
+    return dnmgr.getLogicalPlan();
+  }
+
+  @Override
+  public String getOperatorName(int id)
+  {
+    PTOperator ptOperator = dnmgr.getPhysicalPlan().getAllOperators().get(id);
+    if (ptOperator != null) {
+      return ptOperator.getName();
+    }
+    return null;
+  }
+
+  @Override
+  public Configuration getLaunchConfig()
+  {
+    return launchConf;
+  }
+
+  @Override
+  public StatsListener.BatchedOperatorStats getPhysicalOperatorStats(int id)
+  {
+    PTOperator ptOperator = dnmgr.getPhysicalPlan().getAllOperators().get(id);
+    if (ptOperator != null) {
+      return ptOperator.stats;
+    }
+    return null;
+  }
+
+  @Override
+  public List<LogicalOperatorInfo> getLogicalOperatorInfoList()
+  {
+    return dnmgr.getLogicalOperatorInfoList();
+  }
+
+  @Override
+  public Queue<Pair<Long, Map<String, Object>>> getWindowMetrics(String operatorName)
+  {
+    return dnmgr.getWindowMetrics(operatorName);
+  }
+
+  @Override
+  public long windowIdToMillis(long windowId)
+  {
+    return dnmgr.windowIdToMillis(windowId);
+  }
+
+  @Override
+  public Attribute.AttributeMap getAttributes()
+  {
+    return appContext.getAttributes();
+  }
+
+  @Override
+  public <T> T getValue(Attribute<T> key)
+  {
+    return appContext.getValue(key);
+  }
+
+  @Override
+  public void setCounters(Object counters)
+  {
+    appContext.setCounters(counters);
+  }
+
+  @Override
+  public void sendMetrics(Collection<String> metricNames)
+  {
+    appContext.sendMetrics(metricNames);
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-core/blob/5f95ee0e/engine/src/main/java/org/apache/apex/engine/plugin/ApexPluginDispatcher.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/org/apache/apex/engine/plugin/ApexPluginDispatcher.java b/engine/src/main/java/org/apache/apex/engine/plugin/ApexPluginDispatcher.java
new file mode 100644
index 0000000..62dd255
--- /dev/null
+++ b/engine/src/main/java/org/apache/apex/engine/plugin/ApexPluginDispatcher.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.engine.plugin;
+
+import org.apache.apex.engine.api.DAGExecutionPluginContext.RegistrationType;
+import org.apache.hadoop.service.Service;
+
+public interface ApexPluginDispatcher extends Service
+{
+  <T> void dispatch(RegistrationType<T> registrationType, T data);
+}

http://git-wip-us.apache.org/repos/asf/apex-core/blob/5f95ee0e/engine/src/main/java/org/apache/apex/engine/plugin/ApexPluginManager.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/org/apache/apex/engine/plugin/ApexPluginManager.java b/engine/src/main/java/org/apache/apex/engine/plugin/ApexPluginManager.java
new file mode 100644
index 0000000..190cb6b
--- /dev/null
+++ b/engine/src/main/java/org/apache/apex/engine/plugin/ApexPluginManager.java
@@ -0,0 +1,191 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.engine.plugin;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.net.URI;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.apex.engine.api.DAGExecutionPlugin;
+import org.apache.apex.engine.api.DAGExecutionPluginContext.Handler;
+import org.apache.apex.engine.api.DAGExecutionPluginContext.RegistrationType;
+import org.apache.apex.engine.api.PluginLocator;
+import org.apache.commons.digester.plugins.PluginContext;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+
+import com.google.common.collect.Lists;
+
+import com.datatorrent.stram.StramAppContext;
+import com.datatorrent.stram.StreamingContainerManager;
+import com.datatorrent.stram.plan.logical.LogicalPlan;
+import com.datatorrent.stram.webapp.AppInfo;
+
+/**
+ * Base service shared by plugin dispatchers. It handles the common tasks: reading the
+ * launch configuration, discovering {@link DAGExecutionPlugin}s through the configured
+ * {@link PluginLocator}, setting them up on service init, tearing them down on service
+ * stop, and recording the handlers each plugin registers. Actual dispatching of events
+ * is left to the classes extending it.
+ */
+public abstract class ApexPluginManager extends AbstractService
+{
+  private static final Logger LOG = LoggerFactory.getLogger(ApexPluginManager.class);
+  protected final Collection<DAGExecutionPlugin> plugins = Lists.newArrayList();
+  protected final StramAppContext appContext;
+  protected final StreamingContainerManager dmgr;
+  private final PluginLocator locator;
+  private final AppInfo.AppStats stats;
+  protected Configuration launchConfig;
+  protected FileContext fileContext;
+  protected final Map<DAGExecutionPlugin, PluginInfo> pluginInfoMap = new HashMap<>();
+  // NOTE(review): never assigned or read in this class; kept only because it is protected.
+  protected PluginContext pluginContext;
+
+  /**
+   * @param locator used during service init to discover plugins; may be null, in which
+   *                case no plugins are discovered
+   * @param context application context handed to every plugin context
+   * @param dmgr container manager handed to every plugin context
+   * @param stats application statistics handed to every plugin context
+   */
+  public ApexPluginManager(PluginLocator locator, StramAppContext context, StreamingContainerManager dmgr, AppInfo.AppStats stats)
+  {
+    super(ApexPluginManager.class.getName());
+    this.locator = locator;
+    this.appContext = context;
+    this.dmgr = dmgr;
+    this.stats = stats;
+    LOG.debug("Creating apex service ");
+  }
+
+  /**
+   * Reads the launch configuration file located under the application path. As a side
+   * effect {@link #fileContext} is initialized for the application path's file system.
+   *
+   * @return a configuration with the launch configuration file added as a resource, or
+   *         a fresh {@link Configuration} when the file does not exist
+   * @throws IOException if the file system cannot be accessed
+   */
+  private Configuration readLaunchConfiguration() throws IOException
+  {
+    Path appPath = new Path(appContext.getApplicationPath());
+    Path configFilePath = new Path(appPath, LogicalPlan.LAUNCH_CONFIG_FILE_NAME);
+    try {
+      LOG.debug("Reading launch configuration file ");
+      URI uri = appPath.toUri();
+      Configuration config = new YarnConfiguration();
+      fileContext = uri.getScheme() == null ? FileContext.getFileContext(config) : FileContext.getFileContext(uri, config);
+      // NOTE(review): the stream is handed to the Configuration and not closed here;
+      // presumably the resource is parsed lazily - confirm before adding a close().
+      FSDataInputStream is = fileContext.open(configFilePath);
+      config.addResource(is);
+      LOG.debug("Read launch configuration");
+      return config;
+    } catch (FileNotFoundException ex) {
+      LOG.warn("Configuration file not found {}", configFilePath);
+      return new Configuration();
+    }
+  }
+
+  /**
+   * Loads the launch configuration, discovers the plugins and calls
+   * {@link DAGExecutionPlugin#setup} on each of them with a context that records
+   * the plugin as the owner of its registrations.
+   */
+  @Override
+  protected void serviceInit(Configuration conf) throws Exception
+  {
+    super.serviceInit(conf);
+    this.launchConfig = readLaunchConfiguration();
+    if (locator != null) {
+      Collection<DAGExecutionPlugin> discovered = locator.discoverPlugins(this.launchConfig);
+      if (discovered != null) {
+        this.plugins.addAll(discovered);
+        for (DAGExecutionPlugin plugin : discovered) {
+          LOG.info("Detected plugin {}", plugin);
+        }
+      }
+    }
+
+    for (DAGExecutionPlugin plugin : this.plugins) {
+      plugin.setup(new PluginManagerImpl(plugin));
+    }
+  }
+
+  /**
+   * Tears down every plugin. A plugin failing in its teardown is logged and does not
+   * prevent the remaining plugins, or the parent service, from being stopped.
+   */
+  @Override
+  protected void serviceStop() throws Exception
+  {
+    for (DAGExecutionPlugin plugin : plugins) {
+      try {
+        plugin.teardown();
+      } catch (RuntimeException ex) {
+        LOG.warn("Plugin {} failed during teardown", plugin, ex);
+      }
+    }
+    super.serviceStop();
+  }
+
+  /**
+   * Keeps information about a plugin and its registrations. Dispatchers use this
+   * information while delivering events to the plugin.
+   */
+  class PluginInfo
+  {
+    private final DAGExecutionPlugin plugin;
+    private final Map<RegistrationType<?>, Handler<?>> registrationMap = new HashMap<>();
+
+    /**
+     * Records the handler for the given registration type, replacing any handler
+     * previously recorded for it.
+     */
+    <T> void put(RegistrationType<T> registrationType, Handler<T> handler)
+    {
+      registrationMap.put(registrationType, handler);
+    }
+
+    /**
+     * @return the handler recorded for the registration type, or null if there is none
+     */
+    @SuppressWarnings("unchecked") // put() is the only writer and ties the handler type to the key type
+    <T> Handler<T> get(RegistrationType<T> registrationType)
+    {
+      return (Handler<T>)registrationMap.get(registrationType);
+    }
+
+    public PluginInfo(DAGExecutionPlugin plugin)
+    {
+      this.plugin = plugin;
+    }
+
+    public DAGExecutionPlugin getPlugin()
+    {
+      return plugin;
+    }
+  }
+
+  /**
+   * Returns the bookkeeping entry for the plugin, creating it on first use.
+   */
+  PluginInfo getPluginInfo(DAGExecutionPlugin plugin)
+  {
+    PluginInfo pInfo = pluginInfoMap.get(plugin);
+    if (pInfo == null) {
+      pInfo = new PluginInfo(plugin);
+      pluginInfoMap.put(plugin, pInfo);
+    }
+    return pInfo;
+  }
+
+  /**
+   * Records {@code handler} as the handler of {@code owner} for events of the given type.
+   *
+   * @param type the kind of event the handler is interested in
+   * @param handler the handler to record
+   * @param owner the plugin the handler belongs to
+   */
+  public <T> void register(RegistrationType<T> type, Handler<T> handler, DAGExecutionPlugin owner)
+  {
+    PluginInfo pInfo = getPluginInfo(owner);
+    pInfo.put(type, handler);
+  }
+
+  /**
+   * A per-plugin context that tracks which plugin a registration comes from. With this,
+   * a plugin does not need to pass an explicit owner argument during registration.
+   */
+  class PluginManagerImpl extends AbstractDAGExecutionPluginContext
+  {
+    private final DAGExecutionPlugin owner;
+
+    PluginManagerImpl(DAGExecutionPlugin plugin)
+    {
+      super(appContext, dmgr, stats, launchConfig);
+      this.owner = plugin;
+    }
+
+    @Override
+    public <T> void register(RegistrationType<T> type, Handler<T> handler)
+    {
+      ApexPluginManager.this.register(type, handler, owner);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-core/blob/5f95ee0e/engine/src/main/java/org/apache/apex/engine/plugin/DefaultApexPluginDispatcher.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/org/apache/apex/engine/plugin/DefaultApexPluginDispatcher.java b/engine/src/main/java/org/apache/apex/engine/plugin/DefaultApexPluginDispatcher.java
new file mode 100644
index 0000000..0c30943
--- /dev/null
+++ b/engine/src/main/java/org/apache/apex/engine/plugin/DefaultApexPluginDispatcher.java
@@ -0,0 +1,123 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.engine.plugin;
+
+import java.util.NoSuchElementException;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.RejectedExecutionHandler;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.apex.engine.api.DAGExecutionPluginContext.Handler;
+import org.apache.apex.engine.api.DAGExecutionPluginContext.RegistrationType;
+import org.apache.apex.engine.api.PluginLocator;
+import org.apache.hadoop.conf.Configuration;
+
+import com.datatorrent.common.util.NameableThreadFactory;
+import com.datatorrent.stram.StramAppContext;
+import com.datatorrent.stram.StreamingContainerManager;
+import com.datatorrent.stram.webapp.AppInfo;
+
+/**
+ * Handles dispatching of events from Stram to plugins. This implementation creates a
+ * single-threaded executor service to process the events asynchronously. A separate task
+ * {@link DefaultApexPluginDispatcher.ProcessEventTask} is created for each event and
+ * submitted to the executor. The pending-event queue is bounded; when it is full the
+ * oldest pending event is dropped to make room for the new one.
+ */
+public class DefaultApexPluginDispatcher extends ApexPluginManager implements ApexPluginDispatcher
+{
+  private static final Logger LOG = LoggerFactory.getLogger(DefaultApexPluginDispatcher.class);
+  // NOTE(review): presumably 4096 was intended; left unchanged.
+  private int qsize = 4098;
+  private ArrayBlockingQueue<Runnable> blockingQueue;
+  // volatile: written by serviceInit()/serviceStop(), read by dispatch() on other threads
+  private volatile ExecutorService executorService;
+
+  public DefaultApexPluginDispatcher(PluginLocator locator, StramAppContext context, StreamingContainerManager dmgr, AppInfo.AppStats stats)
+  {
+    super(locator, context, dmgr, stats);
+  }
+
+  /**
+   * Queues the event for asynchronous delivery to the registered handlers. Events
+   * dispatched before the service is initialized or after it is stopped are dropped.
+   */
+  @Override
+  public <T> void dispatch(RegistrationType<T> registrationType, T data)
+  {
+    // Read the field once: serviceStop() may clear it concurrently.
+    final ExecutorService executor = executorService;
+    if (executor != null) {
+      executor.submit(new ProcessEventTask<>(registrationType, data));
+    }
+  }
+
+  @Override
+  protected void serviceInit(Configuration conf) throws Exception
+  {
+    super.serviceInit(conf);
+    LOG.debug("Creating plugin dispatch queue with size {}", qsize);
+    blockingQueue = new ArrayBlockingQueue<>(qsize);
+    RejectedExecutionHandler rejectionHandler = new RejectedExecutionHandler()
+    {
+      @Override
+      public void rejectedExecution(Runnable r, ThreadPoolExecutor executor)
+      {
+        if (executor.isShutdown()) {
+          // Rejected because the dispatcher is stopping; the event can no longer be delivered.
+          return;
+        }
+        try {
+          // Queue is full: drop the oldest pending event and retry with the new one.
+          blockingQueue.remove();
+          // r is already the task wrapper created by submit(), so execute() avoids wrapping it twice.
+          executor.execute(r);
+        } catch (NoSuchElementException ex) {
+          // Ignore no-such element as queue may finish, while this handler is called.
+        }
+      }
+    };
+
+    executorService = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS,
+        blockingQueue, new NameableThreadFactory("PluginExecutorThread"), rejectionHandler);
+  }
+
+  /**
+   * Stops the executor first, so that no further events reach the plugins, and then
+   * lets the parent class tear the plugins down. Safe to call when the executor was
+   * never created, e.g. when the service is stopped after a failed init.
+   */
+  @Override
+  protected void serviceStop() throws Exception
+  {
+    final ExecutorService executor = executorService;
+    executorService = null;
+    try {
+      if (executor != null) {
+        executor.shutdownNow();
+        executor.awaitTermination(10, TimeUnit.SECONDS);
+        if (!executor.isTerminated()) {
+          LOG.warn("Executor service still active for plugins");
+        }
+      }
+    } finally {
+      super.serviceStop();
+    }
+  }
+
+  /**
+   * Delivers a single event to the handler that each plugin registered for the
+   * event's registration type, if any.
+   */
+  private class ProcessEventTask<T> implements Runnable
+  {
+    private final RegistrationType<T> registrationType;
+    private final T data;
+
+    public ProcessEventTask(RegistrationType<T> type, T data)
+    {
+      this.registrationType = type;
+      this.data = data;
+    }
+
+    @Override
+    public void run()
+    {
+      for (final PluginInfo pInfo : pluginInfoMap.values()) {
+        final Handler<T> handler = pInfo.get(registrationType);
+        if (handler != null) {
+          try {
+            handler.handle(data);
+          } catch (RuntimeException ex) {
+            // Keep one misbehaving plugin from hiding the failure or starving the others.
+            LOG.warn("Plugin {} failed to handle event of type {}", pInfo.getPlugin(), registrationType, ex);
+          }
+        }
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-core/blob/5f95ee0e/engine/src/main/java/org/apache/apex/engine/plugin/NoOpApexPluginDispatcher.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/org/apache/apex/engine/plugin/NoOpApexPluginDispatcher.java b/engine/src/main/java/org/apache/apex/engine/plugin/NoOpApexPluginDispatcher.java
new file mode 100644
index 0000000..11eb5d1
--- /dev/null
+++ b/engine/src/main/java/org/apache/apex/engine/plugin/NoOpApexPluginDispatcher.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.engine.plugin;
+
+import org.apache.apex.engine.api.DAGExecutionPluginContext.RegistrationType;
+import org.apache.hadoop.service.AbstractService;
+
+/**
+ * An {@link ApexPluginDispatcher} that discards every event handed to it.
+ */
+public class NoOpApexPluginDispatcher extends AbstractService implements ApexPluginDispatcher
+{
+  public NoOpApexPluginDispatcher()
+  {
+    super(NoOpApexPluginDispatcher.class.getName());
+  }
+
+  /**
+   * Does nothing; the event is dropped.
+   */
+  @Override
+  public <T> void dispatch(RegistrationType<T> registrationType, T data)
+  {
+    // intentionally empty
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-core/blob/5f95ee0e/engine/src/main/java/org/apache/apex/engine/plugin/loaders/ChainedPluginLocator.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/org/apache/apex/engine/plugin/loaders/ChainedPluginLocator.java b/engine/src/main/java/org/apache/apex/engine/plugin/loaders/ChainedPluginLocator.java
new file mode 100644
index 0000000..42d4dc4
--- /dev/null
+++ b/engine/src/main/java/org/apache/apex/engine/plugin/loaders/ChainedPluginLocator.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.engine.plugin.loaders;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.apex.engine.api.PluginLocator;
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * A {@link PluginLocator} that combines the results of several other locators.
+ *
+ * @param <T> type of the plugins being located
+ */
+public class ChainedPluginLocator<T> implements PluginLocator<T>
+{
+  private static final Logger LOG = LoggerFactory.getLogger(ChainedPluginLocator.class);
+  final List<PluginLocator<T>> locators = new ArrayList<>();
+
+  /**
+   * @param locators the locators to consult, in the order they should be queried
+   */
+  @SafeVarargs // the varargs array is only read, never stored or exposed
+  public ChainedPluginLocator(PluginLocator<T>... locators)
+  {
+    for (PluginLocator<T> locator : locators) {
+      this.locators.add(locator);
+    }
+  }
+
+  /**
+   * Queries every chained locator in order and concatenates the plugins found.
+   * A locator returning null is treated as having found nothing.
+   *
+   * @param conf configuration passed on to each locator
+   * @return all discovered plugins, never null
+   */
+  @Override
+  public Collection<T> discoverPlugins(Configuration conf)
+  {
+    List<T> plugins = new ArrayList<>();
+
+    for (PluginLocator<T> locator : locators) {
+      Collection<T> currentPlugins = locator.discoverPlugins(conf);
+      if (currentPlugins != null) {
+        LOG.info("Loader {} detected {} plugins ", locator.getClass().getName(), currentPlugins.size());
+        plugins.addAll(currentPlugins);
+      }
+    }
+
+    return plugins;
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-core/blob/5f95ee0e/engine/src/main/java/org/apache/apex/engine/plugin/loaders/PropertyBasedPluginLocator.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/org/apache/apex/engine/plugin/loaders/PropertyBasedPluginLocator.java b/engine/src/main/java/org/apache/apex/engine/plugin/loaders/PropertyBasedPluginLocator.java
new file mode 100644
index 0000000..b9fc2a5
--- /dev/null
+++ b/engine/src/main/java/org/apache/apex/engine/plugin/loaders/PropertyBasedPluginLocator.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.engine.plugin.loaders;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.apex.engine.api.PluginLocator;
+import org.apache.hadoop.conf.Configuration;
+
+import com.datatorrent.stram.StramUtils;
+
+/**
+ * A {@link PluginLocator} that instantiates the plugin classes named in a
+ * comma-separated configuration property.
+ *
+ * @param <T> type of the plugins being located
+ */
+public class PropertyBasedPluginLocator<T> implements PluginLocator<T>
+{
+  private static final Logger LOG = LoggerFactory.getLogger(PropertyBasedPluginLocator.class);
+  private final Class<T> klass;
+  private final String propertyName;
+
+  /**
+   * @param klass the type every configured class is loaded as
+   * @param propertyName name of the property holding the comma-separated class names
+   */
+  public PropertyBasedPluginLocator(Class<T> klass, String propertyName)
+  {
+    this.klass = klass;
+    this.propertyName = propertyName;
+  }
+
+  /**
+   * Instantiates one plugin per class name listed in the property. Whitespace around
+   * the names is ignored and empty entries are skipped. A class that fails with an
+   * {@link IllegalArgumentException} while being loaded or instantiated is logged
+   * and skipped.
+   *
+   * @param conf configuration to read the property from
+   * @return the instantiated plugins; empty if the property is not set
+   */
+  @Override
+  public Collection<T> discoverPlugins(Configuration conf)
+  {
+    List<T> detectedPlugins = new ArrayList<>();
+    String classNamesStr = conf.get(this.propertyName);
+    if (classNamesStr == null) {
+      return detectedPlugins;
+    }
+
+    for (String rawName : classNamesStr.split(",")) {
+      // Values such as "a.B, c.D" or multi-line XML values carry surrounding whitespace.
+      String className = rawName.trim();
+      if (className.isEmpty()) {
+        continue;
+      }
+      try {
+        Class<? extends T> plugin = StramUtils.classForName(className, this.klass);
+        detectedPlugins.add(StramUtils.newInstance(plugin));
+      } catch (IllegalArgumentException e) {
+        LOG.warn("Could not load plugin {}", className, e);
+      }
+    }
+    return detectedPlugins;
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-core/blob/5f95ee0e/engine/src/main/java/org/apache/apex/engine/plugin/loaders/ServiceLoaderBasedPluginLocator.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/org/apache/apex/engine/plugin/loaders/ServiceLoaderBasedPluginLocator.java b/engine/src/main/java/org/apache/apex/engine/plugin/loaders/ServiceLoaderBasedPluginLocator.java
new file mode 100644
index 0000000..3295329
--- /dev/null
+++ b/engine/src/main/java/org/apache/apex/engine/plugin/loaders/ServiceLoaderBasedPluginLocator.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.engine.plugin.loaders;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.ServiceLoader;
+
+import org.apache.apex.engine.api.PluginLocator;
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * A {@link PluginLocator} that finds plugins through the JDK {@link ServiceLoader}
+ * mechanism.
+ *
+ * @param <T> service type of the plugins being located
+ */
+public class ServiceLoaderBasedPluginLocator<T> implements PluginLocator<T>
+{
+  private final Class<T> klass;
+
+  /**
+   * @param klass the service type to look providers up for
+   */
+  public ServiceLoaderBasedPluginLocator(Class<T> klass)
+  {
+    this.klass = klass;
+  }
+
+  /**
+   * Collects every provider the service loader yields for the service type.
+   * The configuration is not consulted.
+   *
+   * @return the providers found, in the order the service loader returned them
+   */
+  @Override
+  public Collection<T> discoverPlugins(Configuration conf)
+  {
+    final List<T> providers = new ArrayList<>();
+    for (T provider : ServiceLoader.load(klass)) {
+      providers.add(provider);
+    }
+    return providers;
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-core/blob/5f95ee0e/engine/src/main/java/org/apache/apex/engine/plugin/loaders/StaticPluginLocator.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/org/apache/apex/engine/plugin/loaders/StaticPluginLocator.java b/engine/src/main/java/org/apache/apex/engine/plugin/loaders/StaticPluginLocator.java
new file mode 100644
index 0000000..f6b0dfc
--- /dev/null
+++ b/engine/src/main/java/org/apache/apex/engine/plugin/loaders/StaticPluginLocator.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.engine.plugin.loaders;
+
+import java.util.Arrays;
+import java.util.Collection;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.apex.engine.api.PluginLocator;
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * A {@link PluginLocator} that always reports the fixed set of plugins it was
+ * constructed with.
+ *
+ * @param <T> type of the plugins being located
+ */
+public class StaticPluginLocator<T> implements PluginLocator<T>
+{
+  private static final Logger LOG = LoggerFactory.getLogger(StaticPluginLocator.class);
+
+  private final T[] fixedPlugins;
+
+  /**
+   * @param plugins the plugins to report; the array is kept as-is, not copied
+   */
+  public StaticPluginLocator(T... plugins)
+  {
+    fixedPlugins = plugins;
+  }
+
+  /**
+   * Returns a list view over the plugins given at construction time.
+   * The configuration is not consulted.
+   */
+  @Override
+  public Collection<T> discoverPlugins(Configuration conf)
+  {
+    final Collection<T> view = Arrays.asList(fixedPlugins);
+    return view;
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-core/blob/5f95ee0e/engine/src/test/java/org/apache/apex/engine/plugin/DebugPlugin.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/org/apache/apex/engine/plugin/DebugPlugin.java b/engine/src/test/java/org/apache/apex/engine/plugin/DebugPlugin.java
new file mode 100644
index 0000000..6ad8073
--- /dev/null
+++ b/engine/src/test/java/org/apache/apex/engine/plugin/DebugPlugin.java
@@ -0,0 +1,114 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.engine.plugin;
+
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.apex.engine.api.DAGExecutionPlugin;
+import org.apache.apex.engine.api.DAGExecutionPluginContext;
+import org.apache.apex.engine.api.DAGExecutionPluginContext.Handler;
+
+import com.datatorrent.stram.api.StramEvent;
+import com.datatorrent.stram.api.StreamingContainerUmbilicalProtocol;
+
+import static org.apache.apex.engine.api.DAGExecutionPluginContext.COMMIT_EVENT;
+import static org.apache.apex.engine.api.DAGExecutionPluginContext.HEARTBEAT;
+import static org.apache.apex.engine.api.DAGExecutionPluginContext.STRAM_EVENT;
+
+/**
+ * Test plugin that counts the STRAM_EVENT, HEARTBEAT and COMMIT_EVENT notifications
+ * it receives. Each counter is updated under {@link #lock()} and {@link #events} is
+ * signalled after every update.
+ */
+public class DebugPlugin implements DAGExecutionPlugin
+{
+  private int eventCount = 0;
+  private int heartbeatCount = 0;
+  private int commitCount = 0;
+  private final Lock lock = new ReentrantLock();
+  final Condition events  = lock.newCondition();
+
+  /**
+   * Registers one counting handler per event type.
+   */
+  @Override
+  public void setup(DAGExecutionPluginContext context)
+  {
+    context.register(STRAM_EVENT, new Handler<StramEvent>()
+    {
+      @Override
+      public void handle(StramEvent stramEvent)
+      {
+        lock();
+        try {
+          eventCount++;
+          events.signal();
+        } finally {
+          unlock();
+        }
+      }
+    });
+
+    context.register(HEARTBEAT, new Handler<StreamingContainerUmbilicalProtocol.ContainerHeartbeat>()
+    {
+      @Override
+      public void handle(StreamingContainerUmbilicalProtocol.ContainerHeartbeat heartbeat)
+      {
+        lock();
+        try {
+          heartbeatCount++;
+          events.signal();
+        } finally {
+          unlock();
+        }
+      }
+    });
+
+    context.register(COMMIT_EVENT, new Handler<Long>()
+    {
+      @Override
+      public void handle(Long aLong)
+      {
+        lock();
+        try {
+          commitCount++;
+          events.signal();
+        } finally {
+          unlock();
+        }
+      }
+    });
+  }
+
+  @Override
+  public void teardown()
+  {
+    // nothing to release
+  }
+
+  // NOTE(review): the getters read the counters without taking the lock; presumably
+  // callers hold it via lock()/unlock() while reading - confirm against the tests.
+
+  /**
+   * @return number of STRAM_EVENT notifications handled so far
+   */
+  public int getEventCount()
+  {
+    return eventCount;
+  }
+
+  /**
+   * @return number of HEARTBEAT notifications handled so far
+   */
+  public int getHeartbeatCount()
+  {
+    return heartbeatCount;
+  }
+
+  /**
+   * @return number of COMMIT_EVENT notifications handled so far
+   */
+  public int getCommitCount()
+  {
+    return commitCount;
+  }
+
+  /**
+   * Acquires the lock guarding the counters and the {@link #events} condition.
+   */
+  void lock()
+  {
+    this.lock.lock();
+  }
+
+  /**
+   * Releases the lock acquired with {@link #lock()}.
+   */
+  void unlock()
+  {
+    this.lock.unlock();
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-core/blob/5f95ee0e/engine/src/test/java/org/apache/apex/engine/plugin/NoOpPlugin.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/org/apache/apex/engine/plugin/NoOpPlugin.java b/engine/src/test/java/org/apache/apex/engine/plugin/NoOpPlugin.java
new file mode 100644
index 0000000..4aad641
--- /dev/null
+++ b/engine/src/test/java/org/apache/apex/engine/plugin/NoOpPlugin.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.engine.plugin;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.apex.engine.api.DAGExecutionPlugin;
+import org.apache.apex.engine.api.DAGExecutionPluginContext;
+
+/**
+ * Test plugin that registers no handlers; it only logs when it is set up and torn down.
+ */
+public class NoOpPlugin implements DAGExecutionPlugin
+{
+  private static final Logger LOG = LoggerFactory.getLogger(NoOpPlugin.class);
+
+  /**
+   * Logs the call; the context is not used.
+   */
+  @Override
+  public void setup(DAGExecutionPluginContext context)
+  {
+    LOG.info("NoOpPlugin plugin called init ");
+  }
+
+  /**
+   * Logs the call; there is nothing to release.
+   */
+  @Override
+  public void teardown()
+  {
+    LOG.info("NoOpPlugin plugin teardown called ");
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-core/blob/5f95ee0e/engine/src/test/java/org/apache/apex/engine/plugin/PluginTests.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/org/apache/apex/engine/plugin/PluginTests.java b/engine/src/test/java/org/apache/apex/engine/plugin/PluginTests.java
new file mode 100644
index 0000000..4848983
--- /dev/null
+++ b/engine/src/test/java/org/apache/apex/engine/plugin/PluginTests.java
@@ -0,0 +1,130 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.engine.plugin;
+
+import java.util.Collection;
+import java.util.concurrent.TimeUnit;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.apex.engine.api.DAGExecutionPlugin;
+import org.apache.apex.engine.plugin.loaders.ChainedPluginLocator;
+import org.apache.apex.engine.plugin.loaders.ServiceLoaderBasedPluginLocator;
+import org.apache.apex.engine.plugin.loaders.StaticPluginLocator;
+import org.apache.hadoop.conf.Configuration;
+
+import com.datatorrent.api.Attribute;
+import com.datatorrent.stram.api.StramEvent;
+import com.datatorrent.stram.api.StreamingContainerUmbilicalProtocol;
+import com.datatorrent.stram.support.StramTestSupport;
+
+import static org.apache.apex.engine.api.DAGExecutionPluginContext.COMMIT_EVENT;
+import static org.apache.apex.engine.api.DAGExecutionPluginContext.HEARTBEAT;
+import static org.apache.apex.engine.api.DAGExecutionPluginContext.STRAM_EVENT;
+
+/**
+ * Tests for plugin discovery through the locator implementations and for event
+ * delivery to a plugin through {@link DefaultApexPluginDispatcher}.
+ */
+public class PluginTests
+{
+
+  private static final Configuration conf = new Configuration();
+
+  /**
+   * A {@link StaticPluginLocator} must return exactly the plugin it was created
+   * with, and a {@link ChainedPluginLocator} must return the plugins of every
+   * locator it wraps.
+   */
+  @Test
+  public void testStaticPluginLoader()
+  {
+    DAGExecutionPlugin plugin1 = new NoOpPlugin();
+    DAGExecutionPlugin plugin2 = new DebugPlugin();
+
+    StaticPluginLocator<DAGExecutionPlugin> locator1 = new StaticPluginLocator<>(plugin1);
+    StaticPluginLocator<DAGExecutionPlugin> locator2 = new StaticPluginLocator<>(plugin2);
+
+    // JUnit's contract is assertEquals(message, expected, actual); keeping that order
+    // makes the failure output read correctly.
+    Collection<DAGExecutionPlugin> discovered1 = locator1.discoverPlugins(conf);
+    Assert.assertEquals("Number of plugins discovered ", 1, discovered1.size());
+    Assert.assertEquals("Type is NoOpPlugin", NoOpPlugin.class, discovered1.iterator().next().getClass());
+    Assert.assertEquals("Discovered plugin is plugin1", plugin1, discovered1.iterator().next());
+
+    Collection<DAGExecutionPlugin> discovered2 = locator2.discoverPlugins(conf);
+    Assert.assertEquals("Number of plugins discovered ", 1, discovered2.size());
+    Assert.assertEquals("Type is DebugPlugin", DebugPlugin.class, discovered2.iterator().next().getClass());
+    Assert.assertEquals("Discovered plugin is plugin2", plugin2, discovered2.iterator().next());
+
+    ChainedPluginLocator<DAGExecutionPlugin> chained = new ChainedPluginLocator<>(locator1, locator2);
+    Collection<DAGExecutionPlugin> chainedDiscovered = chained.discoverPlugins(conf);
+    Assert.assertEquals("Number of plugins discovered ", 2, chainedDiscovered.size());
+    Assert.assertTrue(chainedDiscovered.contains(plugin1));
+    Assert.assertTrue(chainedDiscovered.contains(plugin2));
+  }
+
+  /**
+   * The service-loader based locator is expected to find exactly one plugin, of
+   * type {@link DebugPlugin}.
+   */
+  @Test
+  public void testServicePluginLoader()
+  {
+    ServiceLoaderBasedPluginLocator<DAGExecutionPlugin> locator = new ServiceLoaderBasedPluginLocator<>(DAGExecutionPlugin.class);
+    Collection<DAGExecutionPlugin> discovered = locator.discoverPlugins(conf);
+    Assert.assertEquals("Total number of plugins detected ", 1, discovered.size());
+    Assert.assertEquals("Type is DebugPlugin", DebugPlugin.class, discovered.iterator().next().getClass());
+  }
+
+  /**
+   * Dispatches one stram event, one commit event and one heartbeat and checks
+   * that the plugin's counter for each was incremented exactly once.
+   *
+   * Each wait gives up after a single 5 second timeout instead of looping
+   * forever, so a lost event fails the count assertions rather than hanging the
+   * build. The dispatcher is always stopped, even when an assertion fails.
+   *
+   * @throws InterruptedException if the test thread is interrupted while waiting
+   */
+  @Test
+  public void testDispatch() throws InterruptedException
+  {
+    DebugPlugin debugPlugin = new DebugPlugin();
+    StaticPluginLocator<? extends DAGExecutionPlugin> locator = new StaticPluginLocator<>(debugPlugin);
+    ApexPluginDispatcher pluginManager = new DefaultApexPluginDispatcher(locator,
+        new StramTestSupport.TestAppContext(new Attribute.AttributeMap.DefaultAttributeMap()), null, null);
+    pluginManager.init(new Configuration());
+    try {
+      int count = debugPlugin.getEventCount();
+      pluginManager.dispatch(STRAM_EVENT, new StramEvent(StramEvent.LogLevel.DEBUG)
+      {
+        @Override
+        public String getType()
+        {
+          return "TestEvent";
+        }
+      });
+
+      debugPlugin.lock();
+      try {
+        while (debugPlugin.getEventCount() == count) {
+          // await returns false once the timeout elapsed; stop waiting and let the
+          // assertion below report the missing event.
+          if (!debugPlugin.events.await(5, TimeUnit.SECONDS)) {
+            break;
+          }
+        }
+      } finally {
+        debugPlugin.unlock();
+      }
+
+      Assert.assertEquals("Total stram event received ", 1, debugPlugin.getEventCount());
+
+      count = debugPlugin.getCommitCount();
+      pluginManager.dispatch(COMMIT_EVENT, Long.valueOf(1234L));
+      debugPlugin.lock();
+      try {
+        while (debugPlugin.getCommitCount() == count) {
+          if (!debugPlugin.events.await(5, TimeUnit.SECONDS)) {
+            break;
+          }
+        }
+      } finally {
+        debugPlugin.unlock();
+      }
+
+      count = debugPlugin.getHeartbeatCount();
+      pluginManager.dispatch(HEARTBEAT, new StreamingContainerUmbilicalProtocol.ContainerHeartbeat());
+      debugPlugin.lock();
+      try {
+        while (debugPlugin.getHeartbeatCount() == count) {
+          if (!debugPlugin.events.await(5, TimeUnit.SECONDS)) {
+            break;
+          }
+        }
+      } finally {
+        debugPlugin.unlock();
+      }
+    } finally {
+      pluginManager.stop();
+    }
+
+    Assert.assertEquals(1, debugPlugin.getEventCount());
+    Assert.assertEquals(1, debugPlugin.getHeartbeatCount());
+    Assert.assertEquals(1, debugPlugin.getCommitCount());
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/apex-core/blob/5f95ee0e/engine/src/test/resources/META-INF/services/org.apache.apex.engine.api.DAGExecutionPlugin
----------------------------------------------------------------------
diff --git a/engine/src/test/resources/META-INF/services/org.apache.apex.engine.api.DAGExecutionPlugin b/engine/src/test/resources/META-INF/services/org.apache.apex.engine.api.DAGExecutionPlugin
new file mode 100644
index 0000000..cd70a45
--- /dev/null
+++ b/engine/src/test/resources/META-INF/services/org.apache.apex.engine.api.DAGExecutionPlugin
@@ -0,0 +1,19 @@
+##
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+org.apache.apex.engine.plugin.DebugPlugin


Mime
View raw message