chukwa-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From billgra...@apache.org
Subject svn commit: r999123 - in /incubator/chukwa/trunk: ./ ivy/ src/java/org/apache/hadoop/chukwa/datacollection/ src/java/org/apache/hadoop/chukwa/datacollection/agent/ src/java/org/apache/hadoop/chukwa/datacollection/agent/rest/ src/test/org/apache/hadoop/...
Date Mon, 20 Sep 2010 21:17:09 GMT
Author: billgraham
Date: Mon Sep 20 21:17:06 2010
New Revision: 999123

URL: http://svn.apache.org/viewvc?rev=999123&view=rev
Log:
CHUKWA-515. REST API for Agent (Bill Graham)

Added:
    incubator/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/OffsetStatsManager.java   (with props)
    incubator/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/agent/rest/
    incubator/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/agent/rest/AdaptorController.java   (with props)
    incubator/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/TestOffsetStatsManager.java   (with props)
    incubator/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/agent/rest/
    incubator/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/agent/rest/TestAdaptorController.java   (with props)
Modified:
    incubator/chukwa/trunk/ivy.xml
    incubator/chukwa/trunk/ivy/libraries.properties
    incubator/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/agent/ChukwaAgent.java

Modified: incubator/chukwa/trunk/ivy.xml
URL: http://svn.apache.org/viewvc/incubator/chukwa/trunk/ivy.xml?rev=999123&r1=999122&r2=999123&view=diff
==============================================================================
--- incubator/chukwa/trunk/ivy.xml (original)
+++ incubator/chukwa/trunk/ivy.xml Mon Sep 20 21:17:06 2010
@@ -188,6 +188,10 @@
       name="core"
       rev="${jetty.eclipse.jdt.core.version}"
       conf="jetty->master"/>
+    <dependency org="springframework"
+      name="spring-mock"
+      rev="${spring-mock.version}"
+      conf="common->default"/>
 <!--    <dependency org="jaxb"
       name="jaxb-api"
       rev="${jaxb.version}"

Modified: incubator/chukwa/trunk/ivy/libraries.properties
URL: http://svn.apache.org/viewvc/incubator/chukwa/trunk/ivy/libraries.properties?rev=999123&r1=999122&r2=999123&view=diff
==============================================================================
--- incubator/chukwa/trunk/ivy/libraries.properties (original)
+++ incubator/chukwa/trunk/ivy/libraries.properties Mon Sep 20 21:17:06 2010
@@ -45,6 +45,7 @@ ezmorph.version=1.0.6
 mysql-connector.version=5.1.6
 
 rats-lib.version=0.5.1
+spring-mock.version=1.2.6
 jdiff.version=1.0.9
 xmlenc.version=0.52
 xerces.version=1.4.4

Added: incubator/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/OffsetStatsManager.java
URL: http://svn.apache.org/viewvc/incubator/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/OffsetStatsManager.java?rev=999123&view=auto
==============================================================================
--- incubator/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/OffsetStatsManager.java (added)
+++ incubator/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/OffsetStatsManager.java Mon Sep 20 21:17:06 2010
@@ -0,0 +1,273 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.chukwa.datacollection;
+
+import org.apache.log4j.Logger;
+
+import java.util.Map;
+import java.util.LinkedList;
+import java.util.Date;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Manages stats for multiple objects of type T. T can be any class that is used
+ * as a key for offset statistics (i.e. Agent, Collector, etc.). A client would
+ * create an instance of this class and call <code>addOffsetDataPoint</code>
+ * repeatedly over time. Then <code>calcAverageRate</code> can be called to
+ * retrieve the average offset-unit per second over a given time interval.
+ * <P>
+ * For a given object T that is actively adding data points, stats are kept for
+ * up to 20 minutes.
+ * <P>
+ * Care should be taken to always call <code>remove()</code> when old T objects
+ * should no longer be tracked.
+ */
+public class OffsetStatsManager<T> {
+  protected Logger log = Logger.getLogger(getClass());
+
+  /*
+   * This value is how far back we keep data for. Old data is purged when new
+   * data is added.
+   */
+  private static long DEFAULT_STATS_DATA_TTL = 20L * 60L * 1000L; // 20 minutes
+
+  /**
+   * How far back can our data be to be considered fresh enough, relative to the
+   * interval requests. For example if this value is 0.25 and interval requested
+   * is 60 seconds, our most recent data point must be no more than 15 seconds old.
+   */
+  private static double DEFAULT_STALE_THRESHOLD = 0.25;
+
+
+  /**
+   * How far back do we need to have historical data for, relative to the
+   * interval requested. For example if this value is 0.25 and the interval
+   * requested is 60 seconds, our oldest data point must be within 15
+   * seconds of the most recent data point - 60.
+   */
+  private static double DEFAULT_AGE_THRESHOLD = 0.25;
+
+  // These can be made configurable if someone needs to do so
+  private long statsDataTTL = DEFAULT_STATS_DATA_TTL;
+  private double staleThresholdPercent = DEFAULT_STALE_THRESHOLD;
+  private double ageThresholdPercent = DEFAULT_AGE_THRESHOLD;
+
+  private Map<T, OffsetDataStats> offsetStatsMap =
+          new ConcurrentHashMap<T, OffsetDataStats>();
+
+  public OffsetStatsManager() {
+    this(DEFAULT_STATS_DATA_TTL);
+  }
+
+  public OffsetStatsManager(long statsDataTTL) {
+    this.statsDataTTL = statsDataTTL;
+  }
+
+  /**
+   * Record that at a given point in time an object key had a given offset.
+   * @param key Object to key this data point to
+   * @param offset How much of an offset to record
+   * @param timestamp The time the offset occurred
+   */
+  public void addOffsetDataPoint(T key, long offset, long timestamp) {
+    OffsetDataStats stats = null;
+
+    synchronized (offsetStatsMap) {
+      if (offsetStatsMap.get(key) == null)
+        offsetStatsMap.put(key, new OffsetDataStats());
+
+      stats = offsetStatsMap.get(key);
+    }
+
+    stats.add(new OffsetData(offset, timestamp));
+    stats.prune(statsDataTTL);
+
+    if (log.isDebugEnabled())
+      log.debug("Added offset - key=" + key + ", offset=" + offset +
+                ", time=" + new Date(timestamp) + ", dataCount=" +
+                stats.getOffsetDataList().size());
+  }
+
+  /**
+   * Calculate the average offset change per second for a key over
+   * approximately the last <code>timeIntervalSecs</code> seconds.
+   * <P>
+   * A rate is only reported when the most recent data point is fresh enough
+   * (see <code>staleThresholdPercent</code>) and a data point exists close
+   * enough to the start of the interval (see <code>ageThresholdPercent</code>).
+   *
+   * @param key Object to calculate the rate for
+   * @param timeIntervalSecs Length of the interval to average over, in seconds
+   * @return the average offset-units per second, or -1 if no stats exist for
+   *         key, the newest data is too stale, or the history is too short
+   */
+  public double calcAverageRate(T key, long timeIntervalSecs) {
+    OffsetDataStats stats = get(key);
+    if (stats == null) {
+      if (log.isDebugEnabled())
+        log.debug("No stats data found key=" + key);
+      return -1;
+    }
+
+    // first get the most recent data point to see if we're stale
+    long now = System.currentTimeMillis();
+    long mostRecentThreashold = now -
+            timeIntervalSecs * (long)(staleThresholdPercent * 1000);
+    OffsetData newestOffsetData = stats.mostRecentDataPoint();
+
+    if (newestOffsetData == null || newestOffsetData.olderThan(mostRecentThreashold)) {
+      if (log.isDebugEnabled())
+        log.debug("Stats data too stale for key=" + key);
+
+      return -1; // data is too stale
+    }
+
+    // then get the oldest data point to see if we have enough coverage
+    long then = newestOffsetData.getTimestamp() - timeIntervalSecs * 1000L;
+    long thenDelta = timeIntervalSecs * (long)(ageThresholdPercent * 1000);
+
+    OffsetData oldestOffsetData = null;
+    long minDiff = -1;
+    long lastDiff = -1;
+
+    // add() and prune() mutate the list while holding its monitor, so hold the
+    // same monitor while iterating to avoid a ConcurrentModificationException
+    LinkedList<OffsetData> offsetDataList = stats.getOffsetDataList();
+    synchronized (offsetDataList) {
+      for (OffsetData offsetData : offsetDataList) {
+        long diff = offsetData.within(then, thenDelta);
+
+        // -1 means this point is outside the allowed window around then
+        if (diff < 0) continue;
+
+        if (minDiff == -1 || diff < minDiff) {
+          // this is the data point closest to our target then time so far
+          minDiff = diff;
+          oldestOffsetData = offsetData;
+        }
+
+        // optimize so if the diffs are getting worse, then we've already
+        // found the closest point and we can move on
+        if (lastDiff != -1 && diff > lastDiff) {
+          break;
+        }
+
+        lastDiff = diff;
+      }
+    }
+
+    if (oldestOffsetData == null) {
+      if (log.isDebugEnabled())
+        log.debug("Stats data history too short for key=" + key);
+
+      return -1;
+    }
+
+    return newestOffsetData.averageRate(oldestOffsetData);
+  }
+
+  /**
+   * Fetch the oldest data point currently held for key.
+   * @param key key that the data point is to be returned for
+   * @return the oldest data point, or null if no stats are tracked for key or
+   *         no data points are held
+   */
+  public OffsetData oldestDataPoint(T key) {
+    OffsetDataStats stats = get(key);
+    // an untracked key has no stats entry; report "no data" rather than NPE
+    return stats == null ? null : stats.oldestDataPoint();
+  }
+
+  /**
+   * Fetch the most recent data point currently held for key.
+   * @param key key that the data point is to be returned for
+   * @return the newest data point, or null if no stats are tracked for key or
+   *         no data points are held
+   */
+  public OffsetData mostRecentDataPoint(T key) {
+    OffsetDataStats stats = get(key);
+    // an untracked key has no stats entry; report "no data" rather than NPE
+    return stats == null ? null : stats.mostRecentDataPoint();
+  }
+
+  /**
+   * Remove key from the set of objects that we're tracking stats for.
+   * @param key key of stats to be removed
+   */
+  public void remove(T key) {
+    synchronized (offsetStatsMap) {
+      offsetStatsMap.remove(key);
+    }
+  }
+
+  /**
+   * Remove all objects that we're tracking stats for.
+   */
+  public void clear() {
+    synchronized (offsetStatsMap) {
+      offsetStatsMap.clear();
+    }
+  }
+
+  /**
+   * Fetch OffsetDataStats for key.
+   * @param key key that stats are to be returned for
+   */
+  private OffsetDataStats get(T key) {
+    synchronized (offsetStatsMap) {
+      return offsetStatsMap.get(key);
+    }
+  }
+
+  public class OffsetData {
+    private long offset;
+    private long timestamp;
+
+    private OffsetData(long offset, long timestamp) {
+      this.offset = offset;
+      this.timestamp = timestamp;
+    }
+
+    public long getOffset() { return offset; }
+    public long getTimestamp() { return timestamp; }
+
+    public double averageRate(OffsetData previous) {
+      if (previous == null) return -1;
+
+      return new Double((offset - previous.getOffset())) /
+             new Double((timestamp - previous.getTimestamp())) * 1000L;
+    }
+
+    public boolean olderThan(long timestamp) {
+      return this.timestamp < timestamp;
+    }
+
+    public long within(long timestamp, long delta) {
+
+      long diff = Math.abs(this.timestamp - timestamp);
+
+      if (diff < delta) return diff;
+      return -1;
+    }
+  }
+
+  private class OffsetDataStats {
+    private volatile LinkedList<OffsetData> offsetDataList = new LinkedList<OffsetData>();
+
+    public LinkedList<OffsetData> getOffsetDataList() {
+      return offsetDataList;
+    }
+
+    public void add(OffsetData offsetData) {
+      synchronized(offsetDataList) {
+        offsetDataList.add(offsetData);
+      }
+    }
+
+    public OffsetData oldestDataPoint() {
+      synchronized(offsetDataList) {
+        return offsetDataList.peekFirst();
+      }
+    }
+
+    public OffsetData mostRecentDataPoint() {
+      synchronized(offsetDataList) {
+        return offsetDataList.peekLast();
+      }
+    }
+
+    public void prune(long ttl) {
+      long cutoff = System.currentTimeMillis() - ttl;
+
+      OffsetData data;
+      synchronized(offsetDataList) {
+        while ((data = offsetDataList.peekFirst()) != null) {
+          if (data.getTimestamp() > cutoff) break;
+
+          offsetDataList.removeFirst();
+        }
+      }
+    }
+  }
+}

Propchange: incubator/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/OffsetStatsManager.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/OffsetStatsManager.java
------------------------------------------------------------------------------
    svn:keywords = Date Author Id Revision HeadURL

Modified: incubator/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/agent/ChukwaAgent.java
URL: http://svn.apache.org/viewvc/incubator/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/agent/ChukwaAgent.java?rev=999123&r1=999122&r2=999123&view=diff
==============================================================================
--- incubator/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/agent/ChukwaAgent.java (original)
+++ incubator/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/agent/ChukwaAgent.java Mon Sep 20 21:17:06 2010
@@ -30,7 +30,6 @@ import java.io.IOException;
 import java.io.InputStreamReader;
 import java.io.OutputStreamWriter;
 import java.io.PrintWriter;
-import java.security.MessageDigest;
 import java.security.NoSuchAlgorithmException;
 import java.util.*;
 import java.util.concurrent.ConcurrentHashMap;
@@ -42,26 +41,46 @@ import org.apache.hadoop.chukwa.datacoll
 import org.apache.hadoop.chukwa.datacollection.adaptor.AdaptorException;
 import org.apache.hadoop.chukwa.datacollection.adaptor.AdaptorShutdownPolicy;
 import org.apache.hadoop.chukwa.datacollection.adaptor.NotifyOnCommitAdaptor;
+import org.apache.hadoop.chukwa.datacollection.OffsetStatsManager;
 import org.apache.hadoop.chukwa.datacollection.agent.metrics.AgentMetrics;
 import org.apache.hadoop.chukwa.datacollection.connector.Connector;
 import org.apache.hadoop.chukwa.datacollection.connector.http.HttpConnector;
-import org.apache.hadoop.chukwa.datacollection.sender.ChukwaHttpSender.CommitListEntry;
 import org.apache.hadoop.chukwa.datacollection.test.ConsoleOutConnector;
 import org.apache.hadoop.chukwa.util.AdaptorNamingUtils;
 import org.apache.hadoop.chukwa.util.DaemonWatcher;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.log4j.Logger;
+import org.mortbay.jetty.Server;
+import org.mortbay.jetty.servlet.Context;
+import org.mortbay.jetty.servlet.ServletHolder;
+import org.mortbay.jetty.nio.SelectChannelConnector;
+import org.mortbay.thread.BoundedThreadPool;
+import com.sun.jersey.spi.container.servlet.ServletContainer;
 
 /**
  * The local agent daemon that runs on each machine. This class is designed to
  * be embeddable, for use in testing.
+ * <P>
+ * The agent will start an HTTP REST interface on a configurable port. Configs for
+ * the agent are:
+ * <ul>
+ * <li><code>chukwaAgent.http.port</code> Port to listen on (default=9090).</li>
+ * <li><code>chukwaAgent.http.rest.controller.packages</code> Java packages to
+ * inspect for JAX-RS annotated classes to be added as servlets to the REST
+ * server.</li>
+ * </ul>
  * 
  */
 public class ChukwaAgent implements AdaptorManager {
   // boolean WRITE_CHECKPOINTS = true;
   static AgentMetrics agentMetrics = new AgentMetrics("ChukwaAgent", "chukwaAgent");
 
+  private static final int HTTP_SERVER_THREADS = 120;
+  private static Server jettyServer = null;
+  private OffsetStatsManager adaptorStatsManager = null;
+  private Timer statsCollector = null;
+
   static Logger log = Logger.getLogger(ChukwaAgent.class);
   static ChukwaAgent agent = null;
 
@@ -124,6 +143,10 @@ public class ChukwaAgent implements Adap
     return controlSock.getPort();
   }
 
+  public OffsetStatsManager getAdaptorStatsManager() {
+    return adaptorStatsManager;
+  }
+
   /**
    * @param args
    * @throws AdaptorException
@@ -210,6 +233,10 @@ public class ChukwaAgent implements Adap
         "chukwa_checkpoint_");
     final int CHECKPOINT_INTERVAL_MS = conf.getInt(
         "chukwaAgent.checkpoint.interval", 5000);
+    final int STATS_INTERVAL_MS = conf.getInt(
+        "chukwaAgent.stats.collection.interval", 10000);
+    final int STATS_DATA_TTL_MS = conf.getInt(
+        "chukwaAgent.stats.data.ttl", 1200000);
 
     if (conf.get("chukwaAgent.checkpoint.dir") != null)
       checkpointDir = new File(conf.get("chukwaAgent.checkpoint.dir", null));
@@ -227,6 +254,7 @@ public class ChukwaAgent implements Adap
     log.info("Config - CHECKPOINT_INTERVAL_MS: [" + CHECKPOINT_INTERVAL_MS
         + "]");
     log.info("Config - DO_CHECKPOINT_RESTORE: [" + DO_CHECKPOINT_RESTORE + "]");
+    log.info("Config - STATS_INTERVAL_MS: [" + STATS_INTERVAL_MS + "]");
     log.info("Config - tags: [" + tags + "]");
 
     if (DO_CHECKPOINT_RESTORE) {
@@ -260,6 +288,20 @@ public class ChukwaAgent implements Adap
       controlSock.start(); // this sets us up as a daemon
       log.info("control socket started on port " + controlSock.portno);
 
+      // start the HTTP server with stats collection
+      try {
+        this.adaptorStatsManager = new OffsetStatsManager(STATS_DATA_TTL_MS);
+        this.statsCollector = new Timer("ChukwaAgent Stats Collector");
+
+        startHttpServer(conf);
+
+        statsCollector.scheduleAtFixedRate(new StatsCollectorTask(),
+                STATS_INTERVAL_MS, STATS_INTERVAL_MS);
+      } catch (Exception e) {
+        log.error("Couldn't start HTTP server", e);
+        throw new RuntimeException(e);
+      }
+
       // shouldn't start checkpointing until we're finishing launching
       // adaptors on boot
       if (CHECKPOINT_INTERVAL_MS > 0 && checkpointDir != null) {
@@ -273,6 +315,69 @@ public class ChukwaAgent implements Adap
 
   }
 
+  private void startHttpServer(Configuration conf) throws Exception {
+    int portNum = conf.getInt("chukwaAgent.http.port", 9090);
+    String jaxRsAddlPackages = conf.get("chukwaAgent.http.rest.controller.packages");
+    StringBuilder jaxRsPackages = new StringBuilder(
+            "org.apache.hadoop.chukwa.datacollection.agent.rest");
+
+    // Allow the ability to add additional servlets to the server
+    if (jaxRsAddlPackages != null)
+      jaxRsPackages.append(';').append(jaxRsAddlPackages);
+
+    // Set up jetty connector
+    SelectChannelConnector jettyConnector = new SelectChannelConnector();
+    jettyConnector.setLowResourcesConnections(HTTP_SERVER_THREADS - 10);
+    jettyConnector.setLowResourceMaxIdleTime(1500);
+    jettyConnector.setPort(portNum);
+
+    // Set up jetty server, using connector
+    jettyServer = new Server(portNum);
+    jettyServer.setConnectors(new org.mortbay.jetty.Connector[] { jettyConnector });
+    BoundedThreadPool pool = new BoundedThreadPool();
+    pool.setMaxThreads(HTTP_SERVER_THREADS);
+    jettyServer.setThreadPool(pool);
+
+    // Create the controller servlets
+    ServletHolder servletHolder = new ServletHolder(ServletContainer.class);
+    servletHolder.setInitParameter("com.sun.jersey.config.property.resourceConfigClass",
+            "com.sun.jersey.api.core.PackagesResourceConfig");
+    servletHolder.setInitParameter("com.sun.jersey.config.property.packages",
+            jaxRsPackages.toString());
+
+    // Create the server context and add the servlet
+    Context root = new Context(jettyServer, "/rest/v1", Context.SESSIONS);
+    root.setAttribute("ChukwaAgent", this);
+    root.addServlet(servletHolder, "/*");
+    root.setAllowNullPathInfo(false);
+
+    // And finally, fire up the server
+    jettyServer.start();
+    jettyServer.setStopAtShutdown(true);
+
+    log.info("started Chukwa http agent interface on port " + portNum);
+  }
+
+  /**
+   * Take snapshots of offset data so we can report flow rate stats.
+   */
+  private class StatsCollectorTask extends TimerTask {
+
+    public void run() {
+      long now = System.currentTimeMillis();
+
+      for(String adaptorId : getAdaptorList().keySet()) {
+        Adaptor adaptor = getAdaptor(adaptorId);
+        if(adaptor == null) continue;
+
+        Offset offset = adaptorPositions.get(adaptor);
+        if(offset == null) continue;
+
+        adaptorStatsManager.addOffsetDataPoint(adaptor, offset.offset, now);
+      }
+    }
+  }
+
   // words should contain (space delimited):
   // 0) command ("add")
   // 1) Optional adaptor name, followed by =
@@ -372,6 +477,7 @@ public class ChukwaAgent implements Adap
         } catch (Exception e) {
           Adaptor failed = adaptorsByName.remove(adaptorID);
           adaptorPositions.remove(failed);
+          adaptorStatsManager.remove(failed);
           log.warn("failed to start adaptor", e);
           if(e instanceof AdaptorException)
             throw (AdaptorException)e;
@@ -550,8 +656,8 @@ public class ChukwaAgent implements Adap
    * If the adaptor is written correctly, its offset won't change after
    * returning from shutdown.
    * 
-   * @param number the adaptor to stop
-   * @param gracefully if true, shutdown, if false, hardStop
+   * @param name the adaptor to stop
+   * @param shutdownMode policy that controls how the adaptor is shut down
    * @return the number of bytes synched at stop. -1 on error
    */
   public long stopAdaptor(String name, AdaptorShutdownPolicy shutdownMode) {
@@ -569,6 +675,7 @@ public class ChukwaAgent implements Adap
       return offset;
     } else {
       adaptorPositions.remove(toStop);
+      adaptorStatsManager.remove(toStop);
     }
     ChukwaAgent.agentMetrics.adaptorCount.set(adaptorsByName.size());
     ChukwaAgent.agentMetrics.removedAdaptor.inc();
@@ -645,6 +752,17 @@ public class ChukwaAgent implements Adap
    */
   public void shutdown(boolean exit) {
     controlSock.shutdown(); // make sure we don't get new requests
+
+    if (statsCollector != null) {
+      statsCollector.cancel();
+    }
+
+    try {
+      jettyServer.stop();
+    } catch (Exception e) {
+      log.error("Couldn't stop jetty server.", e);
+    }
+
     if (checkpointer != null) {
       checkpointer.cancel();
       try {
@@ -667,6 +785,7 @@ public class ChukwaAgent implements Adap
     }
     adaptorsByName.clear();
     adaptorPositions.clear();
+    adaptorStatsManager.clear();
     if (exit)
       System.exit(0);
   }

Added: incubator/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/agent/rest/AdaptorController.java
URL: http://svn.apache.org/viewvc/incubator/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/agent/rest/AdaptorController.java?rev=999123&view=auto
==============================================================================
--- incubator/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/agent/rest/AdaptorController.java (added)
+++ incubator/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/agent/rest/AdaptorController.java Mon Sep 20 21:17:06 2010
@@ -0,0 +1,460 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.chukwa.datacollection.agent.rest;
+
+import org.apache.hadoop.chukwa.datacollection.agent.ChukwaAgent;
+import org.apache.hadoop.chukwa.datacollection.adaptor.AdaptorException;
+import org.apache.hadoop.chukwa.datacollection.adaptor.Adaptor;
+import org.apache.hadoop.chukwa.datacollection.OffsetStatsManager;
+import org.apache.commons.lang.StringEscapeUtils;
+import org.json.JSONObject;
+import org.json.JSONException;
+
+import javax.ws.rs.Path;
+import javax.ws.rs.GET;
+import javax.ws.rs.Produces;
+import javax.ws.rs.PathParam;
+import javax.ws.rs.QueryParam;
+import javax.ws.rs.DELETE;
+import javax.ws.rs.POST;
+import javax.ws.rs.Consumes;
+import javax.ws.rs.core.Context;
+import javax.ws.rs.core.Response;
+import javax.ws.rs.core.MediaType;
+import javax.servlet.ServletContext;
+import javax.servlet.http.HttpServletResponse;
+import java.text.DecimalFormat;
+import java.util.Map;
+
+/**
+ * JAX-RS controller to handle all HTTP requests to the Agent that deal with adaptors.
+ *
+ * To return all adaptors:
+ *   GET /rest/v1/adaptor
+ *   Optional QS params: viewType=text|xml (default=xml)
+ *
+ * To return a single adaptor:
+ *   GET /rest/v1/adaptor/[adaptorId]
+ *   Optional QS params: viewType=text|xml (default=xml)
+ *
+ * To remove an adaptor:
+ *   DELETE /rest/v1/adaptor/[adaptorId]
+ *
+ * To add an adaptor:
+ *   POST /rest/v1/adaptor
+ *   Content-Type: application/json
+ *   Optional QS params: viewType=text|xml (default=xml)
+ *
+ *   { "DataType" : "foo",
+ *     "AdaptorClass" : "FooAdaptor",
+ *     "AdaptorParams" : "params",
+ *     "Offset"     : "0" }
+ *   The first 3 params above are the only required ones.
+ */
+@Path("/adaptor")
+public class AdaptorController {
+
+  private static final DecimalFormat DECIMAL_FORMAT = new DecimalFormat();
+
+  static {
+    DECIMAL_FORMAT.setMinimumFractionDigits(2);
+    DECIMAL_FORMAT.setMaximumFractionDigits(2);
+    DECIMAL_FORMAT.setGroupingUsed(false);
+  }
+
+  /**
+   * Adds an adaptor to the agent and returns the adaptor info.
+   * @param context servletContext
+   * @param viewType type of view to return (text|xml)
+   * @param postBody JSON post body
+   * @return Response object
+   */
+  @POST
+  @Consumes("application/json")
+  @Produces({"text/xml","text/plain"})
+  public Response addAdaptor(@Context ServletContext context,
+                             @QueryParam("viewType") String viewType,
+                             String postBody) {
+    ChukwaAgent agent = (ChukwaAgent)context.getAttribute("ChukwaAgent");
+
+    if (postBody == null) return badRequestResponse("Empty POST body.");
+
+    // parse the json.
+    StringBuilder addCommand = new StringBuilder("add ");
+    try {
+      JSONObject reqJson = new JSONObject(postBody);
+
+      String dataType = reqJson.getString("DataType");
+      //TODO: figure out how to set this per-adaptor
+      //String cluster = reqJson.getString("Cluster");
+      String adaptorClass = reqJson.getString("AdaptorClass");
+
+      String adaptorParams = fetchOptionalString(reqJson, "AdaptorParams");
+      long offset = fetchOptionalLong(reqJson, "Offset", 0);
+
+      addCommand.append(adaptorClass).append(' ');
+      addCommand.append(dataType);
+      if (adaptorParams != null)
+        addCommand.append(' ').append(adaptorParams);
+      addCommand.append(' ').append(offset);
+
+    } catch (JSONException e) {
+      return badRequestResponse("Invalid JSON passed: '" + postBody + "', error: " + e.getMessage());
+    }
+
+    // add the adaptor
+    try {
+      String adaptorId = agent.processAddCommandE(addCommand.toString());
+
+      return doGetAdaptor(agent, adaptorId, viewType);
+    } catch (AdaptorException e) {
+      return badRequestResponse("Could not add adaptor for postBody: '" + postBody +
+              "', error: " + e.getMessage());
+    }
+  }
+
+  /**
+   * Remove an adaptor from the agent
+   * @param context ServletContext
+   * @param adaptorId id of adaptor to remove.
+   * @return an OK response once the adaptor is stopped, else a bad request response
+   */
+  @DELETE
+  @Path("/{adaptorId}")
+  @Produces({"text/plain"})
+  public Response removeAdaptor(@Context ServletContext context,
+                                @PathParam("adaptorId") String adaptorId) {
+    ChukwaAgent agent = (ChukwaAgent)context.getAttribute("ChukwaAgent");
+
+    // validate that we have an adaptorId
+    if (adaptorId == null) {
+      return badRequestResponse("Missing adaptorId.");
+    }
+
+    // validate that we have a valid adaptorId
+    if (agent.getAdaptor(adaptorId) == null) {
+      return badRequestResponse("Invalid adaptorId: " + adaptorId);
+    }
+
+    // stop the adaptor
+    agent.stopAdaptor(adaptorId, true);
+    return Response.ok().build();
+  }
+
+  /**
+   * Get all adaptors
+   * @param context ServletContext
+   * @param viewType type of view to return (text|xml)
+   * @return Response object
+   */
+  @GET
+  @Produces({"text/xml", "text/plain"})
+  public Response getAdaptors(@Context ServletContext context,
+                              @QueryParam("viewType") String viewType) {
+    ChukwaAgent agent = (ChukwaAgent)context.getAttribute("ChukwaAgent");
+    return doGetAdaptor(agent, null, viewType);
+  }
+
+  /**
+   * Get a single adaptor
+   * @param context ServletContext
+   * @param viewType type of view to return (text|xml)
+   * @param adaptorId id of the adaptor to return
+   * @return Response object
+   */
+  @GET
+  @Path("/{adaptorId}")
+  @Produces({"text/xml","text/plain"})
+  public Response getAdaptor(@Context ServletContext context,
+                             @QueryParam("viewType") String viewType,
+                             @PathParam("adaptorId") String adaptorId) {
+    ChukwaAgent agent = (ChukwaAgent)context.getAttribute("ChukwaAgent");
+    return doGetAdaptor(agent, adaptorId, viewType);
+  }
+
+  /**
+   * Handles the common rendering logic of checking for view type and returning
+   * a Response with one or all adaptors.
+   * @return Response object
+   */
+  private Response doGetAdaptor(ChukwaAgent agent, String adaptorId, String viewType) {
+    if ("text".equals(viewType)) {
+      return textResponse(buildAdaptorText(agent, adaptorId));
+    }
+    else if ("xml".equals(viewType) || viewType == null) {
+      return xmlResponse(buildAdaptorXML(agent, adaptorId));
+    }
+    else {
+      return badRequestResponse("Invalid viewType: " + viewType);
+    }
+  }
+
+  /**
+   * Renders info for one or all adaptors in XML.
+   */
+  protected String buildAdaptorXML(ChukwaAgent agent, String adaptorId) {
+
+    StringBuilder out = new StringBuilder(
+            "<?xml version=\"1.0\" encoding=\"UTF-8\"?>");
+
+    appendStartTag(out, "Response");
+
+    if (adaptorId == null) {
+      Map<String, String> adaptorMap = agent.getAdaptorList();
+      appendStartTag(out, "Adaptors", "total", adaptorMap.size());
+
+      for (String name : adaptorMap.keySet()) {
+        Adaptor adaptor = agent.getAdaptor(name);
+        appendAdaptorXML(out, agent, adaptor);
+      }
+
+      appendEndTag(out, "Adaptors");
+    }
+    else {
+      Adaptor adaptor = agent.getAdaptor(adaptorId);
+      if (adaptor != null) {
+        appendAdaptorXML(out, agent, adaptor);
+      }
+      else {
+        appendElement(out, "Error", "Invalid adaptor id: " + adaptorId);
+      }
+    }
+
+    appendEndTag(out, "Response");
+
+    return out.toString();
+  }
+
+  /**
+   * Renders info for one or all adaptors in plain text (YAML).
+   */
+  protected String buildAdaptorText(ChukwaAgent agent, String adaptorId) {
+    StringBuilder out = new StringBuilder();
+
+    Map<String, String> adaptorMap = agent.getAdaptorList();
+    int indent = 0;
+
+    if (adaptorId == null) {
+      appendNvp(out, indent, "adaptor_count", adaptorMap.size());
+      appendNvp(out, indent, "adaptors", "");
+
+      indent += 4;
+      for(String name : adaptorMap.keySet()) {
+        Adaptor adaptor = agent.getAdaptor(name);
+        appendAdaptorText(out, indent, agent, adaptor);
+      }
+    }
+    else {
+      Adaptor adaptor = agent.getAdaptor(adaptorId);
+      if (adaptor != null) {
+        appendNvp(out, indent, "adaptor", "");
+        indent += 4;
+        appendAdaptorText(out, indent, agent, adaptor);
+      }
+      else {
+        appendNvp(out, indent, "error_message", "Invalid adaptor id: " + adaptorId, true);
+      }
+    }
+
+    return out.toString();
+  }
+
+  private void appendAdaptorText(StringBuilder out, int indent,
+                                 ChukwaAgent agent, Adaptor adaptor) {
+    appendNvp(out, indent, "- adaptor_id", agent.offset(adaptor).adaptorID());
+    appendNvp(out, indent, "data_type", adaptor.getType());
+    appendNvp(out, indent, "offset", agent.offset(adaptor).offset());
+    appendNvp(out, indent, "adaptor_class", adaptor.getClass().getName());
+    appendNvp(out, indent, "adaptor_params", adaptor.getCurrentStatus(), true);
+
+    OffsetStatsManager adaptorStats = agent.getAdaptorStatsManager();
+
+    appendNvp(out, indent, "average_rates", "");
+    indent += 4;
+    appendNvp(out, indent, "- rate",
+            DECIMAL_FORMAT.format(adaptorStats.calcAverageRate(adaptor,  60)));
+    appendNvp(out, indent, "interval", "60");
+    appendNvp(out, indent, "- rate",
+            DECIMAL_FORMAT.format(adaptorStats.calcAverageRate(adaptor,  300)));
+    appendNvp(out, indent, "interval", "300");
+    appendNvp(out, indent, "- rate",
+            DECIMAL_FORMAT.format(adaptorStats.calcAverageRate(adaptor,  600)));
+    appendNvp(out, indent, "interval", "600");
+    indent -= 4;
+  }
+
+  private void appendAdaptorXML(StringBuilder out,
+                               ChukwaAgent agent, Adaptor adaptor) {
+    appendStartTag(out, "Adaptor",
+            "id", agent.offset(adaptor).adaptorID(),
+            "dataType", adaptor.getType(),
+            "offset", agent.offset(adaptor).offset());
+
+    appendElement(out, "AdaptorClass", adaptor.getClass().getName());
+    appendElement(out, "AdaptorParams", adaptor.getCurrentStatus());
+
+    OffsetStatsManager adaptorStats = agent.getAdaptorStatsManager();
+
+    appendElement(out, "AverageRate",
+            DECIMAL_FORMAT.format(adaptorStats.calcAverageRate(adaptor,  60)),
+            "intervalSeconds", "60");
+    appendElement(out, "AverageRate",
+            DECIMAL_FORMAT.format(adaptorStats.calcAverageRate(adaptor,  300)),
+            "intervalSeconds", "300");
+    appendElement(out, "AverageRate",
+            DECIMAL_FORMAT.format(adaptorStats.calcAverageRate(adaptor,  600)),
+            "intervalSeconds", "600");
+
+    appendEndTag(out, "Adaptor");
+  }
+
+  // *** static helper methods below. could be moved into a util class ***
+
+  //   * response handling *
+
+  private static Response textResponse(Object content) {
+    return Response.ok(content, MediaType.TEXT_PLAIN).build();
+  }
+
+  private static Response xmlResponse(String content) {
+    return Response.ok(content, MediaType.TEXT_XML).build();
+  }
+
+  private static Response badRequestResponse(String content) {
+    return Response.status(HttpServletResponse.SC_BAD_REQUEST)
+                     .entity(content).build();
+  }
+
+  //   * json handling *
+
+  private static String fetchOptionalString(JSONObject json, String name) {
+    try {
+      return json.getString(name);
+    } catch (JSONException e) {}
+    return null;
+  }
+
+  private static long fetchOptionalLong(JSONObject json, String name, long defaultLong) {
+    try {
+      return json.getLong(name);
+    } catch (JSONException e) {
+      return defaultLong;
+    }
+  }
+
+  //   * plain text response handling *
+
+  /**
+   * Helper for appending name/value pairs to the StringBuilder in the
+   * format [name]: [value]
+   */
+  protected static void appendNvp(StringBuilder out,
+                                  String name, Object value) {
+    appendNvp(out, 0, name, value, false);
+  }
+
+  /**
+   * Helper for appending name/value pairs to the StringBuilder in the
+   * format [name]: [value] with indent number of spaces prepended.
+   */
+  protected static void appendNvp(StringBuilder out, int indent,
+                                  String name, Object value) {
+    appendNvp(out, indent, name, value, false);
+  }
+
+  /**
+   * Helper for appending name/value pairs to the StringBuilder in the
+   * format [name]: [value] with indent number of spaces prepended. Set
+   * stringLiteral=true if the value might contain special YAML characters.
+   */
+  protected static void appendNvp(StringBuilder out, int indent,
+                                  String name, Object value,
+                                  boolean stringLiteral) {
+
+    if (name.startsWith("- ") && indent > 1) indent -= 2;
+
+    indent(out, indent);
+    out.append(name);
+    out.append(": ");
+
+    if (stringLiteral) {
+      out.append('|').append('\n');
+      indent(out, indent + 2);
+    }
+    // always end the line, even for a null value, so the next pair starts on its own line
+    if (value != null)
+      out.append(value.toString());
+    out.append('\n');
+
+  /**
+   * Helper to insert a number of spaces into the output stream.
+   */
+  protected static void indent(StringBuilder out, int indent) {
+    for (int i = 0; i < indent; i++) {
+      out.append(' ');
+    }
+  }
+
+  //   * XML text response handling *
+
+  /**
+   * XML helper to append an Element start tag. Optionally a number of attributeNvps
+   * can be passed, which will be inserted as XML attribute names and values,
+   * alternating between names and values.
+   */
+  protected static void appendStartTag(StringBuilder out,
+                                       String name,
+                                       Object... attributeNvps) {
+    out.append("<");
+    out.append(name);
+    for(int i = 0; i < attributeNvps.length - 1; i = i + 2) {
+      out.append(" ");
+      out.append(attributeNvps[i].toString());
+      out.append("=\"");
+      if (attributeNvps[i + 1] != null)
+        out.append(StringEscapeUtils.escapeXml(attributeNvps[i + 1].toString()));
+      out.append("\"");
+    }
+    out.append(">");
+  }
+
+  /**
+   * XML helper to append a Element end tag.
+   */
+  protected static void appendEndTag(StringBuilder out,
+                                     String name) {
+    out.append("</");
+    out.append(name);
+    out.append(">");
+  }
+
+  /**
+   * XML helper to append an Element and its child text value. Optionally a
+   * number of attributeNvps can be passed, which will be inserted as XML
+   * attribute names and values, alternating between names and values.
+   */
+  protected static void appendElement(StringBuilder out,
+                                      String name, Object value,
+                                      Object... attributeNvps) {
+    appendStartTag(out, name, attributeNvps);
+    if (value != null)
+      out.append(StringEscapeUtils.escapeXml(value.toString()));
+    appendEndTag(out, name);
+  }
+
+}

Propchange: incubator/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/agent/rest/AdaptorController.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/agent/rest/AdaptorController.java
------------------------------------------------------------------------------
    svn:keywords = Date Author Id Revision HeadURL

Added: incubator/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/TestOffsetStatsManager.java
URL: http://svn.apache.org/viewvc/incubator/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/TestOffsetStatsManager.java?rev=999123&view=auto
==============================================================================
--- incubator/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/TestOffsetStatsManager.java (added)
+++ incubator/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/TestOffsetStatsManager.java Mon Sep 20 21:17:06 2010
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.chukwa.datacollection;
+
+import junit.framework.TestCase;
+
+/**
+ * Verifies that the stats manager calculates stats properly. This test will take
+ * at least 13 seconds to run.
+ */
+public class TestOffsetStatsManager extends TestCase {
+
+  org.apache.hadoop.chukwa.datacollection.OffsetStatsManager<DummyKey> statsManager = null;
+  DummyKey dummyKey = new DummyKey();
+
+  protected void setUp() throws Exception {
+    statsManager = new OffsetStatsManager<DummyKey>();
+    dummyKey = new DummyKey();
+  }
+
+  public void testCalcAverageRate() throws InterruptedException {
+
+    // add roughly 1000 bytes per second for about 5 seconds
+    for (int i = 0; i < 5; i++) {
+      statsManager.addOffsetDataPoint(dummyKey, 1000 * i, System.currentTimeMillis());
+      Thread.sleep(1000);
+    }
+
+    // calculate 5 second average; offsets grew by ~1000 per second, so expect ~1000
+    double rate = statsManager.calcAverageRate(dummyKey, 5);
+    assertTrue("Invalid average, expected about 1000 bytes/sec, found " + rate,
+                 Math.abs(1000 - rate) < 1.5);
+  }
+
+  public void testCalcAverageRateStaleData() throws InterruptedException {
+
+    // add offsets for about 5 seconds, but timestamp them 3 seconds ago to make
+    // them stale
+    for (int i = 0; i < 5; i++) {
+      statsManager.addOffsetDataPoint(dummyKey, 1000 * i, System.currentTimeMillis() - 3000L);
+      Thread.sleep(1000);
+    }
+
+    // calculate 5 second average
+    double rate = statsManager.calcAverageRate(dummyKey, 5);
+    assertEquals("Should have gotten a stale data response", -1.0, rate);
+  }
+
+  public void testCalcAverageRateNotEnoughData() throws InterruptedException {
+
+    // add offsets for about 3 seconds
+    for (int i = 0; i < 3; i++) {
+      statsManager.addOffsetDataPoint(dummyKey, 1000 * i, System.currentTimeMillis());
+      Thread.sleep(1000);
+    }
+
+    // calculate 5 second average; only ~3 seconds of data exist, so expect -1.0
+    double rate = statsManager.calcAverageRate(dummyKey, 5);
+    assertEquals("Should have gotten a not enough data response", -1.0, rate);
+  }
+
+  private class DummyKey {}
+}

Propchange: incubator/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/TestOffsetStatsManager.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/TestOffsetStatsManager.java
------------------------------------------------------------------------------
    svn:keywords = Date Author Id Revision HeadURL

Added: incubator/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/agent/rest/TestAdaptorController.java
URL: http://svn.apache.org/viewvc/incubator/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/agent/rest/TestAdaptorController.java?rev=999123&view=auto
==============================================================================
--- incubator/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/agent/rest/TestAdaptorController.java (added)
+++ incubator/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/agent/rest/TestAdaptorController.java Mon Sep 20 21:17:06 2010
@@ -0,0 +1,293 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.chukwa.datacollection.agent.rest;
+
+import junit.framework.TestCase;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.chukwa.datacollection.agent.ChukwaAgent;
+import org.springframework.mock.web.MockHttpServletRequest;
+import org.springframework.mock.web.MockHttpServletResponse;
+import org.mortbay.jetty.servlet.ServletHolder;
+import org.mortbay.jetty.servlet.Context;
+import org.mortbay.jetty.servlet.ServletHandler;
+import org.mortbay.jetty.Server;
+
+import javax.servlet.ServletException;
+import javax.servlet.Servlet;
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+
+import com.sun.jersey.spi.container.servlet.ServletContainer;
+
+/**
+ * Tests the basic functionality of the AdaptorController.
+ */
+public class TestAdaptorController extends TestCase {
+  protected Log log = LogFactory.getLog(getClass());
+
+  Server jettyServer;
+  ChukwaAgent agent;
+  Servlet servlet;
+  MockHttpServletRequest request;
+  MockHttpServletResponse response;
+  StringBuilder sb;
+
+  protected void setUp() throws Exception {
+    agent = new ChukwaAgent();
+
+    ServletHolder servletHolder = new ServletHolder(ServletContainer.class);
+    servletHolder.setInitParameter("com.sun.jersey.config.property.resourceConfigClass",
+            "com.sun.jersey.api.core.PackagesResourceConfig");
+    servletHolder.setInitParameter("com.sun.jersey.config.property.packages",
+            "org.apache.hadoop.chukwa.datacollection.agent.rest");
+    servletHolder.setServletHandler(new ServletHandler());
+
+    jettyServer = new Server();
+
+    Context root = new Context(jettyServer, "/foo/bar", Context.SESSIONS);
+    root.setAttribute("ChukwaAgent", agent);
+    root.addServlet(servletHolder, "/*");
+
+    jettyServer.start();
+    jettyServer.setStopAtShutdown(true);
+
+    servlet = servletHolder.getServlet();
+    request = new MockHttpServletRequest();
+    request.setContextPath("/foo/bar");
+
+    response = new MockHttpServletResponse();
+    agent.processAddCommandE("add org.apache.hadoop.chukwa.datacollection.adaptor.ChukwaTestAdaptor SomeDataType 0");
+    sb = new StringBuilder();
+  }
+
+  protected void tearDown() throws Exception {
+    agent.shutdown();
+    jettyServer.stop();
+  }
+
+  public void testGetPlainText() throws IOException, ServletException {
+    request.setServletPath("/adaptor");
+    request.setRequestURI(request.getContextPath() + request.getServletPath());
+    request.setQueryString("viewType=text");
+    request.setMethod("GET");
+
+    servlet.service(request, response);
+
+    // assert response
+    assertTextResponse(response, 1);
+
+    //assert agent
+    assertEquals("Incorrect total number of adaptors", 1, agent.adaptorCount());
+  }
+
+  private void assertOccurs(String message, int occurances, String text, String match) {
+
+    int index = -1;
+    for(int i = 0; i < occurances; i++) {
+      index = text.indexOf(match, index + 1);
+      assertTrue(message + ": " + text, index != -1);
+    }
+  }
+
+  public void testGetXml() throws IOException, ServletException {
+    request.setServletPath("/adaptor");
+    request.setRequestURI(request.getContextPath() + request.getServletPath());
+    request.setQueryString("viewType=xml");
+    request.setMethod("GET");
+
+    servlet.service(request, response);
+
+    String responseContent = response.getContentAsString();
+
+    // assert response
+    assertXmlResponse(response, 1);
+
+    //assert agent
+    assertEquals("Incorrect total number of adaptors", 1, agent.adaptorCount());
+  }
+
+  public void testGetInvalidViewType() throws IOException, ServletException {
+    request.setServletPath("/adaptor");
+    request.setRequestURI(request.getContextPath() + request.getServletPath());
+    request.setQueryString("viewType=unsupportedViewType");
+    request.setMethod("GET");
+
+    servlet.service(request, response);
+
+    // assert response
+    assertEquals("Unexpected response status", 400, response.getStatus());
+    assertEquals("Unexpected response content",
+            "Invalid viewType: unsupportedViewType", response.getContentAsString());
+  }
+
+  public void testDeleteAdaptor() throws IOException, ServletException {
+    String adaptorId = agent.getAdaptorList().keySet().iterator().next();
+
+    request.setServletPath("/adaptor/" + adaptorId);
+    request.setRequestURI(request.getContextPath() + request.getServletPath());
+
+    //assert agent
+    assertEquals("Incorrect total number of adaptors", 1, agent.adaptorCount());
+    request.setMethod("DELETE");
+
+    servlet.service(request, response);
+
+    // assert response
+    assertEquals("Unexpected response status", 200, response.getStatus());
+
+    //assert agent
+    assertEquals("Incorrect total number of adaptors", 0, agent.adaptorCount());
+  }
+
+  public void testAddAdaptor() throws IOException, ServletException {
+    request.setServletPath("/adaptor");
+    request.setRequestURI(request.getContextPath() + request.getServletPath());
+    request.setContentType("application/json;charset=UTF-8");
+    request.setContent("{ \"DataType\" : \"SomeDataType\", \"AdaptorClass\" : \"org.apache.hadoop.chukwa.datacollection.adaptor.ChukwaTestAdaptor\", \"AdaptorParams\" : \"1000\", \"Offset\" : \"5555\" }".getBytes("UTF-8"));
+
+    //assert agent
+    assertEquals("Incorrect total number of adaptors", 1, agent.adaptorCount());
+    String initialAdaptorId = agent.getAdaptorList().keySet().iterator().next();
+
+    request.setMethod("POST");
+
+    servlet.service(request, response);
+
+    // assert response
+    String responseContent = assertXmlResponse(response, 1);
+    String newAdaptorId = null;
+    for (String id : agent.getAdaptorList().keySet()) {
+      // compare ids by value; a reference comparison only matches by accident
+      if (!id.equals(initialAdaptorId)) {
+        newAdaptorId = id;
+        break;
+      }
+    }
+
+    //assert agent
+    assertEquals("Incorrect total number of adaptors", 2, agent.adaptorCount());
+
+    // fail with a clear message instead of an NPE inside assertOccurs
+    assertNotNull("New adaptor id not found in agent", newAdaptorId);
+    assertOccurs("Response did not contain adaptorId", 1, responseContent, newAdaptorId);
+
+
+    // fire a doGet to assert that the servlet shows 2 adaptors
+    request.setQueryString("viewType=text");
+    request.setMethod("GET");
+    request.setContentType(null);
+    request.setContent(null);
+
+    servlet.service(request, response);
+
+    // assert response
+    assertTextResponse(response, 2);
+  }
+
+  private String assertTextResponse(MockHttpServletResponse response,
+                                  int adaptorCount)
+                                  throws UnsupportedEncodingException {
+    String responseContent = response.getContentAsString();
+
+    assertEquals("Unexpected response status", 200, response.getStatus());
+    //Content type is correct when executed via an HTTP client, but it doesn't seem
+    //to get set by the servlet
+    //assertEquals("Unexpected content type", "text/plain;charset=UTF-8",
+    //            response.getContentType());
+    assertOccurs("Response text doesn't include correct adaptor_count", 1,
+            responseContent, "adaptor_count: " + adaptorCount);
+    assertOccurs("Response text doesn't include adaptor class",
+            adaptorCount, responseContent,
+            "adaptor_class: org.apache.hadoop.chukwa.datacollection.adaptor.ChukwaTestAdaptor");
+    assertOccurs("Response text doesn't include data type",
+            adaptorCount, responseContent, "data_type: SomeDataType");
+
+    return responseContent;
+  }
+
+  private String assertXmlResponse(MockHttpServletResponse response,
+                                   int adaptorCount)
+                                   throws UnsupportedEncodingException {
+    String responseContent = response.getContentAsString();
+
+    // assert response
+    assertEquals("Unexpected response status", 200, response.getStatus());
+    //Content type is correct when executed via an HTTP client, but it doesn't seem
+    //to get set by the servlet
+    //assertEquals("Unexpected content type", "text/xml;charset=UTF-8",
+    //            response.getContentType());
+    assertOccurs("Response XML doesn't include correct adaptor_count", adaptorCount,
+            responseContent, "Adaptor>");
+    assertOccurs("Response XML doesn't include AdaptorClass", adaptorCount, responseContent,
+            "<AdaptorClass>org.apache.hadoop.chukwa.datacollection.adaptor.ChukwaTestAdaptor</AdaptorClass>");
+    assertOccurs("Response XML doesn't include dataType", adaptorCount, responseContent,
+            "dataType=\"SomeDataType\"");
+
+    return responseContent;
+  }
+
+  //  ** test utility methods **
+
+  public void testPrintNvp() throws IOException {
+    AdaptorController.appendNvp(sb, "foo", "bar");
+    assertEquals("Unexpected NVP", "foo: bar\n", sb.toString());
+  }
+
+  public void testPrintNvpIndented() throws IOException {
+    AdaptorController.appendNvp(sb, 4, "foo", "bar");
+    assertEquals("Unexpected NVP", "    foo: bar\n", sb.toString());
+  }
+
+  public void testPrintNvpIndentedWithDash() throws IOException {
+    AdaptorController.appendNvp(sb, 4, "- foo", "bar");
+    assertEquals("Unexpected NVP", "  - foo: bar\n", sb.toString());
+  }
+
+  public void testPrintNvpIndentedWithStringLiteral() throws IOException {
+    AdaptorController.appendNvp(sb, 4, "foo", "bar", true);
+    assertEquals("Unexpected NVP",
+            "    foo: |\n      bar\n", sb.toString());
+  }
+
+  public void testPrintStartTag() throws IOException {
+    AdaptorController.appendStartTag(sb, "Foo");
+    assertEquals("Unexpected XML", "<Foo>", sb.toString());
+  }
+
+  public void testPrintStartTagWithAttributes() throws IOException {
+    AdaptorController.appendStartTag(sb, "Foo", "a", "A", "b", "B <");
+    assertEquals("Unexpected XML",
+            "<Foo a=\"A\" b=\"B &lt;\">", sb.toString());
+  }
+
+  public void testPrintElement() throws IOException {
+    AdaptorController.appendElement(sb, "Foo", "Bar");
+    assertEquals("Unexpected XML", "<Foo>Bar</Foo>", sb.toString());
+  }
+
+  public void testPrintElementWithAttributes() throws IOException {
+    AdaptorController.appendElement(sb, "Foo", "Bar < -- />", "a", "A", "b", "B");
+    assertEquals("Unexpected XML",
+            "<Foo a=\"A\" b=\"B\">Bar &lt; -- /&gt;</Foo>", sb.toString());
+  }
+
+  public void testPrintEndTag() throws IOException {
+    AdaptorController.appendEndTag(sb, "Foo");
+    assertEquals("Unexpected XML", "</Foo>", sb.toString());
+  }
+}

Propchange: incubator/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/agent/rest/TestAdaptorController.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/agent/rest/TestAdaptorController.java
------------------------------------------------------------------------------
    svn:keywords = Date Author Id Revision HeadURL



Mime
View raw message