Delivered-To: apmail-hadoop-chukwa-commits-archive@minotaur.apache.org
Received: (qmail 7333 invoked from network); 29 Apr 2010 17:27:18 -0000
Received: from unknown (HELO mail.apache.org) (140.211.11.3)
  by 140.211.11.9 with SMTP; 29 Apr 2010 17:27:18 -0000
Received: (qmail 54356 invoked by uid 500); 29 Apr 2010 17:27:18 -0000
Delivered-To: apmail-hadoop-chukwa-commits-archive@hadoop.apache.org
Received: (qmail 54331 invoked by uid 500); 29 Apr 2010 17:27:17 -0000
Mailing-List: contact chukwa-commits-help@hadoop.apache.org; run by ezmlm
Precedence: bulk
Reply-To: chukwa-dev@hadoop.apache.org
Delivered-To: mailing list chukwa-commits@hadoop.apache.org
Received: (qmail 54323 invoked by uid 99); 29 Apr 2010 17:27:17 -0000
Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230)
  by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 29 Apr 2010 17:27:17 +0000
X-ASF-Spam-Status: No, hits=-2000.0 required=10.0 tests=ALL_TRUSTED
X-Spam-Check-By: apache.org
Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4)
  by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 29 Apr 2010 17:27:14 +0000
Received: by eris.apache.org (Postfix, from userid 65534)
  id 7454523888AD; Thu, 29 Apr 2010 17:26:23 +0000 (UTC)
Content-Type: text/plain; charset="utf-8"
MIME-Version: 1.0
Content-Transfer-Encoding: 7bit
Subject: svn commit: r939394 - in /hadoop/chukwa/trunk: CHANGES.txt src/java/org/apache/hadoop/chukwa/datatrigger/HttpTriggerAction.java
Date: Thu, 29 Apr 2010 17:26:23 -0000
To: chukwa-commits@hadoop.apache.org
From: asrabkin@apache.org
X-Mailer: svnmailer-1.0.8
Message-Id: <20100429172623.7454523888AD@eris.apache.org>
X-Virus-Checked: Checked by ClamAV on apache.org

Author: asrabkin
Date: Thu Apr 29 17:26:23 2010
New Revision: 939394

URL: http://svn.apache.org/viewvc?rev=939394&view=rev
Log:
CHUKWA-479. Support HTTP trigger actions. Contributed by Bill Graham.

Added:
    hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datatrigger/HttpTriggerAction.java
Modified:
    hadoop/chukwa/trunk/CHANGES.txt

Modified: hadoop/chukwa/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/CHANGES.txt?rev=939394&r1=939393&r2=939394&view=diff
==============================================================================
--- hadoop/chukwa/trunk/CHANGES.txt (original)
+++ hadoop/chukwa/trunk/CHANGES.txt Thu Apr 29 17:26:23 2010
@@ -4,6 +4,8 @@ Trunk (unreleased changes)
 
   NEW FEATURES
 
+    CHUKWA-479. Support HTTP trigger actions (Bill Graham via asrabkin)
+
     CHUKWA-469. Add JMSAdaptor. (Bill Graham via asrabkin)
 
     CHUKWA-477. Support post-demux trigger. (Bill Graham via Eric Yang)

Added: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datatrigger/HttpTriggerAction.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datatrigger/HttpTriggerAction.java?rev=939394&view=auto
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datatrigger/HttpTriggerAction.java (added)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datatrigger/HttpTriggerAction.java Thu Apr 29 17:26:23 2010
@@ -0,0 +1,252 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.chukwa.datatrigger;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.BufferedReader;
+import java.io.OutputStreamWriter;
+import java.net.URL;
+import java.net.HttpURLConnection;
+import java.net.MalformedURLException;
+import java.util.Map;
+import java.util.HashMap;
+
+/**
+ * Trigger action that makes an HTTP request when executed.
+ * <p>
+ * To use this trigger, two types of configuration must be set. First, this class
+ * must be configured to be invoked for a given trigger event. Second, the
+ * relevant settings for the HTTP request(s) to be made must be set as
+ * described below.
+ * <p>
+ * The general format of this class's configs is
+ * chukwa.trigger.[eventName].http.[N].[paramName] where
+ * eventName is the name of the event the request values are bound
+ * to (see TriggerEvent), N is a counter for each request configured (starting at 1)
+ * and paramName is the request parameter being set.
+ * <p>
+ * Using the post demux success trigger event as an example, the first request
+ * to be fired would use the following configurations:
+ * <ul>
+ *   <li>chukwa.trigger.post.demux.success.http.1.url - The HTTP URL to
+ * invoke.</li>
+ *   <li>chukwa.trigger.post.demux.success.http.1.method - The HTTP method
+ * (optional, default=GET).</li>
+ *   <li>chukwa.trigger.post.demux.success.http.1.headers - A comma-delimited
+ * set of HTTP headers (in [headerName]:[headerValue] form) to
+ * include (optional).</li>
+ *   <li>chukwa.trigger.post.demux.success.http.1.body - The text HTTP body
+ * to include (optional).</li>
+ *   <li>chukwa.trigger.post.demux.success.http.1.connect.timeout - The
+ * HTTP connection timeout setting in milliseconds (optional, default=5000ms).</li>
+ *   <li>chukwa.trigger.post.demux.success.http.1.read.timeout - The
+ * HTTP read timeout setting in milliseconds (optional, default=5000ms).</li>
+ * </ul>
+ *
+ * @see TriggerAction
+ * @see TriggerEvent
+ */
+public class HttpTriggerAction implements TriggerAction {
+  protected Log log = LogFactory.getLog(getClass());
+
+
+  /**
+   * Iterates over each URL found, fetches other settings and fires an HTTP
+   * request.
+   *
+   * @param conf the configuration holding the request settings
+   * @param fs the file system the triggering files belong to
+   * @param src the files associated with the trigger event
+   * @param triggerEvent the event this action is being invoked for
+   * @throws IOException
+   */
+  public void execute(Configuration conf, FileSystem fs,
+                      FileStatus[] src, TriggerEvent triggerEvent) throws IOException {
+
+    if (log.isDebugEnabled()) {
+      for (FileStatus file : src) {
+        log.debug("Execute file: " + file.getPath());
+      }
+    }
+
+    int reqNumber = 1;
+    URL url = null;
+    while ((url = getUrl(conf, triggerEvent, reqNumber)) != null) {
+
+      // get settings for this request
+      String method = getMethod(conf, triggerEvent, reqNumber);
+      Map<String, String> headers = getHeaders(conf, triggerEvent, reqNumber);
+      String body = getBody(conf, triggerEvent, reqNumber);
+      int connectTimeout = getConnectTimeout(conf, triggerEvent, reqNumber);
+      int readTimeout = getReadTimeout(conf, triggerEvent, reqNumber);
+
+      try {
+        // make the request
+        makeHttpRequest(url, method, headers, body, connectTimeout, readTimeout);
+      }
+      catch(Exception e) {
+        log.error("Error making request to " + url, e);
+      }
+      reqNumber++;
+    }
+  }
+
+  private void makeHttpRequest(URL url, String method,
+                               Map<String, String> headers, String body,
+                               int connectTimeout, int readTimeout) throws IOException {
+    if (url == null) {
+      return;
+    }
+
+    // initialize the connection
+    HttpURLConnection conn = (HttpURLConnection)url.openConnection();
+    conn.setRequestMethod(method);
+    conn.setDoInput(true);
+    conn.setConnectTimeout(connectTimeout);
+    conn.setReadTimeout(readTimeout);
+
+    // set headers
+    boolean contentLengthExists = false;
+    if (headers != null) {
+      for (String name: headers.keySet()) {
+        if (log.isDebugEnabled()) {
+          log.debug("Setting header " + name + ": " + headers.get(name));
+        }
+        if (name.equalsIgnoreCase("content-length")) {
+          contentLengthExists = true;
+        }
+        conn.setRequestProperty(name, headers.get(name));
+      }
+    }
+
+    // set content-length if not already set
+    if (!"GET".equals(method) && !contentLengthExists) {
+      String contentLength = body != null ?
+          String.valueOf(body.length()) : "0";
+      conn.setRequestProperty("Content-Length", contentLength);
+    }
+
+    // send body if it exists
+    if (body != null) {
+      conn.setDoOutput(true);
+      OutputStreamWriter writer = new OutputStreamWriter(conn.getOutputStream());
+      writer.write(body);
+      writer.flush();
+      writer.close();
+    }
+    else {
+      conn.setDoOutput(false);
+    }
+
+    // read response code/message and dump response
+    log.info("Making HTTP " + method + " to: " + url);
+    int responseCode = conn.getResponseCode();
+    log.info("HTTP Response code: " + responseCode);
+
+
+    if (responseCode != 200) {
+      log.info("HTTP Response message: " + conn.getResponseMessage());
+    }
+    else {
+      BufferedReader reader = new BufferedReader(
+          new InputStreamReader(conn.getInputStream()));
+      String line;
+      StringBuilder sb = new StringBuilder();
+      while ((line = reader.readLine()) != null) {
+        if(sb.length() > 0) {
+          sb.append("\n");
+        }
+        sb.append(line);
+      }
+      log.info("HTTP Response:\n" + sb);
+
+      reader.close();
+    }
+
+    conn.disconnect();
+  }
+
+  protected URL getUrl(Configuration conf,
+                       TriggerEvent triggerEvent,
+                       int reqNumber) throws MalformedURLException {
+    String urlString = conf.get(getConfigKey(triggerEvent, reqNumber, "url"), null);
+    if (urlString == null) {
+      return null;
+    }
+
+    return new URL(urlString);
+  }
+
+  protected String getMethod(Configuration conf,
+                             TriggerEvent triggerEvent,
+                             int reqNumber) {
+    return conf.get(getConfigKey(triggerEvent, reqNumber, "method"), "GET");
+  }
+
+  protected Map<String, String> getHeaders(Configuration conf,
+                                           TriggerEvent triggerEvent,
+                                           int reqNumber) {
+    Map<String, String> headerMap = new HashMap<String, String>();
+
+    String headers = conf.get(getConfigKey(triggerEvent, reqNumber, "headers"), null);
+
+    if (headers != null) {
+      String[] headersSplit = headers.split(",");
+      for (String header : headersSplit) {
+        String[] nvp = header.split(":", 2);
+        if (nvp.length < 2) {
+          log.error("Invalid HTTP header found: " + header);
+          continue;
+        }
+        headerMap.put(nvp[0].trim(), nvp[1].trim());
+      }
+    }
+
+    return headerMap;
+  }
+
+  protected String getBody(Configuration conf,
+                           TriggerEvent triggerEvent,
+                           int reqNumber) {
+    return conf.get(getConfigKey(triggerEvent, reqNumber, "body"), null);
+  }
+
+  protected int getConnectTimeout(Configuration conf,
+                                  TriggerEvent triggerEvent,
+                                  int reqNumber) {
+    String timeout = conf.get(getConfigKey(triggerEvent, reqNumber, "connect.timeout"), null);
+    return timeout != null ? Integer.parseInt(timeout) : 5000;
+  }
+
+
+  protected int getReadTimeout(Configuration conf,
+                               TriggerEvent triggerEvent,
+                               int reqNumber) {
+    String timeout = conf.get(getConfigKey(triggerEvent, reqNumber, "read.timeout"), null);
+    return timeout != null ? Integer.parseInt(timeout) : 5000;
+  }
+
+  private String getConfigKey(TriggerEvent triggerEvent, int reqNumber, String name) {
+    return triggerEvent.getConfigKeyBase() + ".http." + reqNumber + "." + name;
+  }
+}
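
For reference, a minimal sketch (not part of this commit) of how the settings documented in the class Javadoc might be wired up and the action exercised directly. The endpoint URL, headers, and body below are hypothetical, and TriggerEvent.POST_DEMUX_SUCCESS is assumed to be the enum constant whose getConfigKeyBase() returns "chukwa.trigger.post.demux.success".

import org.apache.hadoop.chukwa.datatrigger.HttpTriggerAction;
import org.apache.hadoop.chukwa.datatrigger.TriggerEvent;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;

public class HttpTriggerExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();

    // First request bound to the post-demux success event.
    // Keys follow the documented chukwa.trigger.[eventName].http.[N].[paramName] pattern.
    conf.set("chukwa.trigger.post.demux.success.http.1.url",
        "http://localhost:8080/notify");                  // hypothetical endpoint
    conf.set("chukwa.trigger.post.demux.success.http.1.method", "POST");
    conf.set("chukwa.trigger.post.demux.success.http.1.headers",
        "Content-Type:application/json,X-Cluster:demo");  // comma-delimited name:value pairs
    conf.set("chukwa.trigger.post.demux.success.http.1.body",
        "{\"status\":\"demux-complete\"}");
    conf.set("chukwa.trigger.post.demux.success.http.1.connect.timeout", "3000");

    // Fire the action directly; in normal operation the post-demux trigger
    // framework would call execute() with the demux output files.
    // POST_DEMUX_SUCCESS is an assumed constant name.
    HttpTriggerAction action = new HttpTriggerAction();
    action.execute(conf, FileSystem.get(conf), new FileStatus[0],
        TriggerEvent.POST_DEMUX_SUCCESS);
  }
}

Additional requests for the same event would be configured by incrementing the counter (http.2.url, http.3.url, and so on); the loop in execute() stops at the first request number with no url key set.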