Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 96A29200C40 for ; Thu, 23 Mar 2017 12:40:48 +0100 (CET) Received: by cust-asf.ponee.io (Postfix) id 953BF160B75; Thu, 23 Mar 2017 11:40:48 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id B1E27160B84 for ; Thu, 23 Mar 2017 12:40:47 +0100 (CET) Received: (qmail 11309 invoked by uid 500); 23 Mar 2017 11:40:46 -0000 Mailing-List: contact dev-help@apex.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@apex.apache.org Delivered-To: mailing list dev@apex.apache.org Received: (qmail 11298 invoked by uid 99); 23 Mar 2017 11:40:45 -0000 Received: from pnap-us-west-generic-nat.apache.org (HELO spamd3-us-west.apache.org) (209.188.14.142) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 23 Mar 2017 11:40:45 +0000 Received: from localhost (localhost [127.0.0.1]) by spamd3-us-west.apache.org (ASF Mail Server at spamd3-us-west.apache.org) with ESMTP id 9EAB418028D for ; Thu, 23 Mar 2017 11:40:45 +0000 (UTC) X-Virus-Scanned: Debian amavisd-new at spamd3-us-west.apache.org X-Spam-Flag: NO X-Spam-Score: -98.549 X-Spam-Level: X-Spam-Status: No, score=-98.549 tagged_above=-999 required=6.31 tests=[KAM_ASCII_DIVIDERS=0.8, RP_MATCHES_RCVD=-0.001, SPF_NEUTRAL=0.652, USER_IN_WHITELIST=-100] autolearn=disabled Received: from mx1-lw-us.apache.org ([10.40.0.8]) by localhost (spamd3-us-west.apache.org [10.40.0.10]) (amavisd-new, port 10024) with ESMTP id QDZMVgeaaG8r for ; Thu, 23 Mar 2017 11:40:43 +0000 (UTC) Received: from mailrelay1-us-west.apache.org (mailrelay1-us-west.apache.org [209.188.14.139]) by mx1-lw-us.apache.org (ASF Mail Server at mx1-lw-us.apache.org) with ESMTP id DDB185F58E for ; Thu, 23 Mar 2017 11:40:42 +0000 (UTC) Received: from jira-lw-us.apache.org (unknown [207.244.88.139]) by mailrelay1-us-west.apache.org (ASF Mail Server at mailrelay1-us-west.apache.org) with ESMTP id 26340E04A8 for ; Thu, 23 Mar 2017 11:40:42 +0000 (UTC) Received: from jira-lw-us.apache.org (localhost [127.0.0.1]) by jira-lw-us.apache.org (ASF Mail Server at jira-lw-us.apache.org) with ESMTP id 9A48221D9D for ; Thu, 23 Mar 2017 11:40:41 +0000 (UTC) Date: Thu, 23 Mar 2017 11:40:41 +0000 (UTC) From: "Tushar Gosavi (JIRA)" To: dev@apex.incubator.apache.org Message-ID: In-Reply-To: References: Subject: [jira] [Commented] (APEXCORE-649) Infrastructure for user define stram event listeners. MIME-Version: 1.0 Content-Type: text/plain; charset=utf-8 Content-Transfer-Encoding: 7bit X-JIRA-FingerPrint: 30527f35849b9dde25b450d4833f0394 archived-at: Thu, 23 Mar 2017 11:40:48 -0000 [ https://issues.apache.org/jira/browse/APEXCORE-649?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15938145#comment-15938145 ] Tushar Gosavi commented on APEXCORE-649: ---------------------------------------- # Example use case - One common use case is pushing application metrics to third party monitoring systems such as Graphite, OpenTSDB, etc ..., see the `Example Plugin` below how this can be achieved. - Taking decisions based on stats and events, such as kill the application in case of container failure, which can be done by monitoring an container killed event and then killing the application. # Writing an Plugin User can define a Apex Plugin by extending from DAGExecutionPlugin. The important methods in the class as as below - **setup(DAGExecutionPluginContext)** In the setup user can register to the interested events the following events are supported. - *ContainerHeartbeat* - The heartbeat from StreamingContainer as passed to the plugin for examination after it has been handled by the application master. {code:language=java} context.register(HEARTBEAT, new Handler<>(ContainerHeartbeat chb) { ... }); {code} - *StramEvent* - All Stram events generated by platform can be monitored the platform. {code:language=java} context.register(STRAM_EVENT, new Handler<>(StreamEvent event) { ... }); {code} - *Committed* - When committed windowId is changed the plugin is notified so that plugin can cleanup cached data if required. {code:language=java} context.register(COMMIT_EVENT, new Handler<>(Long wid) { ... }); {code} - **teardown()** clean additional resources created by the plugin. The all the handlers should be thread-safe as there is no guarantee that plugin execution environment provides for thread safety. Also the plugins should not block for long time as it could prevent other plugins from executing and may result in dropped events in case of full queue. # Loading of plugin The plugins are loaded by platform in application master. currently the plugins are searched by two methods. 1. Through JavaServiceLoader functionality. In this case user can create a jar of his plugin with META-INF/services/org.apache.apex.engine.api.DAGExecutionPlugin file in resource directory. This file should contain the fully classified name of the class of the plugin. (See https://docs.oracle.com/javase/tutorial/ext/basics/spi.html) 2. Providing class names of Plugins through application configuration file. Alternatively user can provide fully qualified name of the class implementing plugin in application configuration file as given below. {code:language=java} apex.plugin.stram.plugins {fcn of plugin} {code} ## Example Plugin. A sample plugin to push the container free memory metric to Grapite monitoring system is given below. {code:language=java} package com.tugo.apex.plugins; import java.io.IOException; import java.io.OutputStream; import java.net.Socket; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import org.slf4j.Logger; import org.apache.apex.engine.api.DAGExecutionPlugin; import org.apache.apex.engine.api.DAGExecutionPluginContext; import com.datatorrent.stram.api.StreamingContainerUmbilicalProtocol; import static org.slf4j.LoggerFactory.getLogger; public class GraphitePushPlugin implements DAGExecutionPlugin { private static final Logger LOG = getLogger(GraphitePushPlugin.class); private DAGExecutionPluginContext context; ScheduledExecutorService executorService; private String appName; private String host; private int port; private Socket socket; private OutputStream output; private boolean connected = false; @Override public void setup(DAGExecutionPluginContext context) { executorService = Executors.newSingleThreadScheduledExecutor(); context.register(DAGExecutionPluginContext.HEARTBEAT, new DAGExecutionPluginContext.Handler() { @Override public void handle(StreamingContainerUmbilicalProtocol.ContainerHeartbeat heartbeat) { handleHeartbeat(heartbeat); } }); appName = context.getApplicationContext().getApplicationName(); host = context.getLaunchConfig().get("graphite-host"); port = Integer.parseInt(context.getLaunchConfig().get("graphite-port")); } synchronized void connect() throws IOException { if (!connected) { socket = new Socket(host, port); output = socket.getOutputStream(); connected = true; } } private void handleHeartbeat(StreamingContainerUmbilicalProtocol.ContainerHeartbeat heartbeat) { StringBuilder builder = new StringBuilder(1024); builder.append(appName).append(".").append(heartbeat.getContainerId()).append(".").append("freeMemory").append(" ") .append(heartbeat.memoryMBFree).append(" ").append(heartbeat.sentTms / 1000).append("\n"); try { connect(); if (output != null) { output.write(builder.toString().getBytes()); } } catch (IOException e) { connected = false; output = null; socket = null; } } @Override public void teardown() { if (socket != null) { try { if (output != null) { output.flush(); } socket.close(); } catch (IOException e) { LOG.warn("error while closing the socket"); } } } } {code} > Infrastructure for user define stram event listeners. > ----------------------------------------------------- > > Key: APEXCORE-649 > URL: https://issues.apache.org/jira/browse/APEXCORE-649 > Project: Apache Apex Core > Issue Type: Sub-task > Reporter: Tushar Gosavi > Assignee: Tushar Gosavi > > As suggested while working on Visitor API, I have came up with following proposal. The idea is to support user defined DAG listeners. The plan is to > support limitated set of events for now and we could add more events > in future. > For the details functionality propvided check attached document. > Please provide feedback on provided proposal. > https://docs.google.com/document/d/1SAIE0EjnCumrB1jKJSnbGvcml47Po8ZHFthfcbNJQgU/edit?usp=sharing -- This message was sent by Atlassian JIRA (v6.3.15#6346)