apex-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "Tushar Gosavi (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (APEXCORE-649) Infrastructure for user define stram event listeners.
Date Thu, 23 Mar 2017 11:40:41 GMT

    [ https://issues.apache.org/jira/browse/APEXCORE-649?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15938145#comment-15938145
] 

Tushar Gosavi commented on APEXCORE-649:
----------------------------------------

# Example use case
- One common use case is pushing application metrics to third party monitoring systems such
as Graphite, OpenTSDB, etc ..., see the `Example Plugin` below how this can be achieved.
- Taking decisions based on stats and events, such as kill the application in case of container
failure, which can be done by monitoring an container killed event and then killing the application.

# Writing an Plugin
User can define a Apex Plugin by extending from DAGExecutionPlugin. The important methods
in the class as as below

- **setup(DAGExecutionPluginContext)**
  In the setup user can register to the interested events the following events are supported.
    - *ContainerHeartbeat* - The heartbeat from StreamingContainer as passed to the plugin
for examination after it has been handled by the application master.
       {code:language=java}
       context.register(HEARTBEAT, new Handler<>(ContainerHeartbeat chb) {
         ...
       });
       {code}

    - *StramEvent* - All Stram events generated by platform can be monitored the platform.
       {code:language=java}
       context.register(STRAM_EVENT, new Handler<>(StreamEvent event) {
         ...
       });
      {code}

    - *Committed* - When committed windowId is changed the plugin is notified so that    plugin
can cleanup cached data if required.
     {code:language=java}
       context.register(COMMIT_EVENT, new Handler<>(Long wid) {
         ...
       });
      {code}

- **teardown()**
  clean additional resources created by the plugin.

The all the handlers should be thread-safe as there is no guarantee that plugin execution
environment provides for thread safety. Also the plugins should not block for long time as
it could prevent other plugins from
executing and may result in dropped events in case of full queue.

# Loading of plugin
The plugins are loaded by platform in application master. currently the plugins are searched
by two methods.

1. Through JavaServiceLoader functionality.
  In this case user can create a jar of his plugin with META-INF/services/org.apache.apex.engine.api.DAGExecutionPlugin
file in resource directory. This file should contain the fully classified name
  of the class of the plugin. (See https://docs.oracle.com/javase/tutorial/ext/basics/spi.html)

2. Providing class names of Plugins through application configuration file.
   Alternatively user can provide fully qualified name of the class implementing plugin in
application configuration file as given below.

    {code:language=java}
    <property>
      <name>apex.plugin.stram.plugins</name>
      <value>{fcn of plugin}</value>
    </property>
   {code}

## Example Plugin.
A sample plugin to push the container free memory metric to Grapite monitoring system is given
below.

 {code:language=java}
package com.tugo.apex.plugins;

import java.io.IOException;
import java.io.OutputStream;
import java.net.Socket;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;

import org.slf4j.Logger;

import org.apache.apex.engine.api.DAGExecutionPlugin;
import org.apache.apex.engine.api.DAGExecutionPluginContext;

import com.datatorrent.stram.api.StreamingContainerUmbilicalProtocol;

import static org.slf4j.LoggerFactory.getLogger;

public class GraphitePushPlugin implements DAGExecutionPlugin
{
  private static final Logger LOG = getLogger(GraphitePushPlugin.class);

  private DAGExecutionPluginContext context;
  ScheduledExecutorService executorService;
  private String appName;
  private String host;
  private int port;
  private Socket socket;
  private OutputStream output;
  private boolean connected = false;
  @Override
  public void setup(DAGExecutionPluginContext context)
  {
    executorService = Executors.newSingleThreadScheduledExecutor();

    context.register(DAGExecutionPluginContext.HEARTBEAT, new DAGExecutionPluginContext.Handler<StreamingContainerUmbilicalProtocol.ContainerHeartbeat>()
    {
      @Override
      public void handle(StreamingContainerUmbilicalProtocol.ContainerHeartbeat heartbeat)
      {
        handleHeartbeat(heartbeat);
      }
    });

    appName = context.getApplicationContext().getApplicationName();
    host = context.getLaunchConfig().get("graphite-host");
    port = Integer.parseInt(context.getLaunchConfig().get("graphite-port"));
  }

  synchronized void connect() throws IOException
  {
    if (!connected) {
      socket = new Socket(host, port);
      output = socket.getOutputStream();
      connected = true;
    }
  }

  private void handleHeartbeat(StreamingContainerUmbilicalProtocol.ContainerHeartbeat heartbeat)
  {
    StringBuilder builder = new StringBuilder(1024);
    builder.append(appName).append(".").append(heartbeat.getContainerId()).append(".").append("freeMemory").append("
")
      .append(heartbeat.memoryMBFree).append(" ").append(heartbeat.sentTms / 1000).append("\n");
    try {
      connect();
      if (output != null) {
        output.write(builder.toString().getBytes());
      }
    } catch (IOException e) {
      connected = false;
      output = null;
      socket = null;
    }
  }

  @Override
  public void teardown()
  {
    if (socket != null) {
      try {
        if (output != null) {
          output.flush();
        }
        socket.close();
      } catch (IOException e) {
        LOG.warn("error while closing the socket");
      }
    }
  }
}
{code}


> Infrastructure for user define stram event listeners.
> -----------------------------------------------------
>
>                 Key: APEXCORE-649
>                 URL: https://issues.apache.org/jira/browse/APEXCORE-649
>             Project: Apache Apex Core
>          Issue Type: Sub-task
>            Reporter: Tushar Gosavi
>            Assignee: Tushar Gosavi
>
> As  suggested while working on Visitor API, I have came up with following proposal. The
idea is to support user defined DAG listeners.  The plan is to
> support limitated set of events for now and we could add more events
> in future.
> For the details functionality propvided check attached document.
> Please provide feedback on provided proposal.
> https://docs.google.com/document/d/1SAIE0EjnCumrB1jKJSnbGvcml47Po8ZHFthfcbNJQgU/edit?usp=sharing



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

Mime
View raw message