chukwa-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From shre...@apache.org
Subject svn commit: r1662365 - in /chukwa/trunk: ./ src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/ src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/heartbeat/ src/main/java/org/apache/hadoop/chukwa/datacollection/writer/ src/test/j...
Date Thu, 26 Feb 2015 06:44:22 GMT
Author: shreyas
Date: Thu Feb 26 06:44:21 2015
New Revision: 1662365

URL: http://svn.apache.org/r1662365
Log:
CHUKWA-737. Heartbeat adaptor to push status to remote http server (Shreyas Subramanya)

Added:
    chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/HeartbeatAdaptor.java
    chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/heartbeat/
    chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/heartbeat/ChukwaStatusChecker.java
    chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/heartbeat/HttpStatusChecker.java
    chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/heartbeat/StatusChecker.java
    chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/heartbeat/StatusCheckerException.java
    chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/datacollection/writer/HttpWriter.java
    chukwa/trunk/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/TestHeartbeatAdaptor.java
Modified:
    chukwa/trunk/CHANGES.txt
    chukwa/trunk/src/test/java/org/apache/hadoop/chukwa/datacollection/agent/rest/TestAdaptorController.java

Modified: chukwa/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/chukwa/trunk/CHANGES.txt?rev=1662365&r1=1662364&r2=1662365&view=diff
==============================================================================
--- chukwa/trunk/CHANGES.txt (original)
+++ chukwa/trunk/CHANGES.txt Thu Feb 26 06:44:21 2015
@@ -5,6 +5,8 @@ Trunk (unreleased changes)
   NEW FEATURES
 
     CHUKWA-736. SSL support for chukwa. (Shreyas Subramanya)
+    
+    CHUKWA-737. Heartbeat adaptor to push status to remote http server (Shreyas Subramanya)
 
   IMPROVEMENTS
 

Added: chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/HeartbeatAdaptor.java
URL: http://svn.apache.org/viewvc/chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/HeartbeatAdaptor.java?rev=1662365&view=auto
==============================================================================
--- chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/HeartbeatAdaptor.java
(added)
+++ chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/HeartbeatAdaptor.java
Thu Feb 26 06:44:21 2015
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.chukwa.datacollection.adaptor;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.net.Socket;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.apache.hadoop.chukwa.ChunkImpl;
+import org.apache.hadoop.chukwa.datacollection.adaptor.heartbeat.StatusChecker;
+import org.apache.hadoop.chukwa.datacollection.agent.ChukwaAgent;
+import org.apache.hadoop.chukwa.util.ExceptionUtil;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.log4j.Logger;
+import org.json.simple.JSONArray;
+import org.json.simple.JSONObject;
+
+public class HeartbeatAdaptor extends AbstractAdaptor {
+
+  private final Logger log = Logger.getLogger(HeartbeatAdaptor.class);  
+  private Timer timer = new Timer();
+  JSONObject status = new JSONObject();
+  private int period = 3;
+  private List<StatusChecker> allCheckers = new ArrayList<StatusChecker>();
+  private final String DEFAULT_PACKAGE = "org.apache.hadoop.chukwa.datacollection.adaptor.heartbeat";
+  private String arguments;
+  long seqId = 0;
+  private final String STREAM_NAME = "STATUS";
+  private boolean _shouldUseConnector = false;
+  private String _host;
+  private int _port;
+
+  
+  class Task extends TimerTask{
+    @Override
+    public void run() {
+      try {
+        heartbeat();
+      } catch (InterruptedException e) {
+        log.error(ExceptionUtil.getStackTrace(e));
+      }      
+    }
+    @SuppressWarnings("unchecked")
+    private void heartbeat() throws InterruptedException {
+      status.put("time", System.currentTimeMillis());
+      JSONArray array = new JSONArray();
+      for (StatusChecker checker : allCheckers) {
+        array.add(checker.getStatus());
+      }
+      status.put("components", array);
+      if(_shouldUseConnector){
+        ChunkImpl chunk = new ChunkImpl(type, STREAM_NAME, seqId, status.toString()
+            .getBytes(), HeartbeatAdaptor.this);
+        dest.add(chunk);
+      } else {
+        sendDirectly(status.toString());
+      }
+      seqId++;
+    }
+    
+    private void sendDirectly(String data) {
+      DataOutputStream dos = null;
+      Socket sock = null;
+      byte[] bdata = data.getBytes();
+      try {
+        sock = new Socket(_host, _port);
+        dos = new DataOutputStream(sock.getOutputStream());
+        dos.writeInt(bdata.length);
+        dos.write(bdata);
+        dos.flush();
+      } catch (Exception e) {
+        log.debug(ExceptionUtil.getStackTrace(e));
+      } finally {
+        if (dos != null) {
+          try {
+            dos.close();
+          } catch (IOException e) {
+            log.debug("Error closing dataoutput stream:" + e);
+          }
+        }
+        if (sock != null) {
+          try {
+            sock.close();
+          } catch (IOException e) {
+            log.debug("Error closing socket: " + e);
+          }
+        }
+      }
+    }
+
+  }
+  
+  @Override
+  public String getCurrentStatus() {
+    return type + " " + arguments;
+  }
+
+  @Override
+  public long shutdown(AdaptorShutdownPolicy shutdownPolicy)
+      throws AdaptorException {
+    timer.cancel();
+    return seqId;
+  }
+
+  @Override
+  public void start(long offset) throws AdaptorException {
+    seqId = offset;
+    timer.scheduleAtFixedRate(new Task(), 0, period * 1000);    
+  }
+
+  @Override
+  public String parseArgs(String s) {
+    // match patterns like localhost 1234 (aa host1 port1, bb, cc host2 port2) 60
+    Pattern p1 = Pattern.compile("(\\(.*\\),?)+\\s(\\d+)");
+    Matcher m1 = p1.matcher(s);
+    if(!m1.matches()){
+      log.error("Invalid adaptor parameters. Usage: HeartbeatAdaptor DefaultProcessor <host> <port> <list-of-status-checkers> <period> <offset>");
+      return null;
+    }
+    try{
+      String providers = m1.group(1);
+      period = Integer.parseInt(m1.group(2));
+      // match pattern like (aa host1 port1, bb, cc host2 port2) and capture the string without the surrounding parentheses
+      Pattern p2 = Pattern.compile("\\(((?:(?:[\\w/:\\.]+\\s*)+,?\\s*)+)\\)");
+      Matcher m2 = p2.matcher(providers);
+      if(!m2.matches()){
+        log.error("Invalid adaptor parameters. Usage: PingAdaptor DefaultProcessor <host> <port> <list-of-status-providers> <period> <offset>");
+        log.error("Specify list of status-providers as (provider1 args1, provider2 args2...). Pattern used for matching:"+p2.pattern());
+        return null;
+      }
+      String[] checkerList = m2.group(1).split(",");
+      for(String checker: checkerList){
+        String args[] = checker.trim().split(" ");
+        String checkerName = args[0];
+        try {          
+          Object c = Class.forName(checkerName).newInstance();
+          if(StatusChecker.class.isInstance(c)){
+            StatusChecker sp = (StatusChecker)c;
+            sp.init(Arrays.copyOfRange(args, 1, args.length));
+            allCheckers.add(sp);
+          } else {
+            throw new Exception("Unsupported checker:"+checkerName);
+          }
+        } catch (Exception e) {
+          log.debug("Error instantiating StatusChecker:" + checkerName + " due to " + e);
+          
+          String newProvider = DEFAULT_PACKAGE + "." + checkerName;
+          log.debug("Trying with default package name " + DEFAULT_PACKAGE);
+          try {
+            Object c = Class.forName(newProvider).newInstance();
+            if(StatusChecker.class.isInstance(c)){
+              StatusChecker sp = (StatusChecker)c;
+              sp.init(Arrays.copyOfRange(args, 1, args.length));
+              allCheckers.add(sp);
+            } else {
+              log.error("Unsupported StatusChecker:"+newProvider);
+              return null;
+            }
+          } catch (Exception e1) {
+            log.error("Error instantiating StatusChecker:" + checker + " due to " + e1);
+            log.error(ExceptionUtil.getStackTrace(e1));
+            return null;
+          }
+        }
+      }
+    } catch(NumberFormatException nfe){
+      log.error(ExceptionUtil.getStackTrace(nfe));
+      return null;
+    }
+    arguments = s;
+    Configuration chukwaConf = ChukwaAgent.getStaticConfiguration();
+    _host = chukwaConf.get("chukwa.http.writer.host", "localhost");
+    _port = Integer.parseInt(chukwaConf.get("chukwa.http.writer.port", "8802"));
+    String connector = chukwaConf.get("chukwa.agent.connector");
+    if(connector != null && connector.contains("PipelineConnector")){
+      _shouldUseConnector = true;
+    }
+    return s;
+  }
+}

Added: chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/heartbeat/ChukwaStatusChecker.java
URL: http://svn.apache.org/viewvc/chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/heartbeat/ChukwaStatusChecker.java?rev=1662365&view=auto
==============================================================================
--- chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/heartbeat/ChukwaStatusChecker.java
(added)
+++ chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/heartbeat/ChukwaStatusChecker.java
Thu Feb 26 06:44:21 2015
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.chukwa.datacollection.adaptor.heartbeat;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+
+import org.apache.hadoop.chukwa.datacollection.adaptor.AdaptorException;
+import org.apache.hadoop.chukwa.datacollection.agent.ChukwaAgent;
+import org.json.simple.JSONObject;
+
+public class ChukwaStatusChecker implements StatusChecker {
+  JSONObject status = new JSONObject();
+  ChukwaAgent agent;
+  
+  @SuppressWarnings("unchecked")
+  public ChukwaStatusChecker() throws AdaptorException{
+    agent = ChukwaAgent.getAgent();
+    status.put("component", "Chukwa.Agent");
+    try {
+      status.put("host", InetAddress.getLocalHost().getHostName());
+    } catch (UnknownHostException e) {
+      throw new AdaptorException("Could not get localhost name", e.getCause());
+    }
+  }
+  
+  @SuppressWarnings("unchecked")
+  @Override
+  public JSONObject getStatus() {
+    status.put("adaptor.count", agent.getAdaptorList().size());
+    return status;
+  }
+
+  @Override
+  public void init(String... args) {
+    //not used    
+  }
+
+}

Added: chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/heartbeat/HttpStatusChecker.java
URL: http://svn.apache.org/viewvc/chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/heartbeat/HttpStatusChecker.java?rev=1662365&view=auto
==============================================================================
--- chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/heartbeat/HttpStatusChecker.java
(added)
+++ chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/heartbeat/HttpStatusChecker.java
Thu Feb 26 06:44:21 2015
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.chukwa.datacollection.adaptor.heartbeat;
+
+import java.net.HttpURLConnection;
+import java.net.URL;
+
+import org.apache.log4j.Logger;
+import org.json.simple.JSONObject;
+
+/**
+ * Check the status through http interface. Takes the component name to be included in
+ * the status and the uri as the arguments.
+ *
+ */
+public class HttpStatusChecker implements StatusChecker {
+  private String componentName, uri;
+  private JSONObject status = new JSONObject();
+  Logger log = Logger.getLogger(HttpStatusChecker.class);
+
+  @SuppressWarnings("unchecked")
+  @Override
+  public void init(String... args) throws StatusCheckerException {
+    if(args.length != 2){
+      throw new StatusCheckerException("Insufficient number of arguments for HttpStatusChecker");
+    }
+    componentName = args[0];
+    uri = args[1];
+    status.put("component", componentName);
+    status.put("uri", uri);
+  }
+
+  @SuppressWarnings("unchecked")
+  @Override
+  public JSONObject getStatus() {
+    HttpURLConnection connection = null;
+    try{
+      URL url = new URL(uri);
+      connection = (HttpURLConnection)url.openConnection();
+      connection.connect();
+      status.put("status", "running");
+    } catch (Exception e) {
+      status.put("status", "stopped");    
+    } finally {
+      if(connection != null){
+        connection.disconnect();
+      }
+    }
+    return status;    
+  }
+}

Added: chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/heartbeat/StatusChecker.java
URL: http://svn.apache.org/viewvc/chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/heartbeat/StatusChecker.java?rev=1662365&view=auto
==============================================================================
--- chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/heartbeat/StatusChecker.java
(added)
+++ chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/heartbeat/StatusChecker.java
Thu Feb 26 06:44:21 2015
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.chukwa.datacollection.adaptor.heartbeat;
+
+import org.json.simple.JSONObject;
+
+/**
+ * Any service status being sent through HeartbeatAdaptor should
+ * implement this interface
+ */
+public interface StatusChecker {
+  public void init(String... args) throws StatusCheckerException;
+  public JSONObject getStatus();
+}

Added: chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/heartbeat/StatusCheckerException.java
URL: http://svn.apache.org/viewvc/chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/heartbeat/StatusCheckerException.java?rev=1662365&view=auto
==============================================================================
--- chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/heartbeat/StatusCheckerException.java
(added)
+++ chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/heartbeat/StatusCheckerException.java
Thu Feb 26 06:44:21 2015
@@ -0,0 +1,22 @@
+package org.apache.hadoop.chukwa.datacollection.adaptor.heartbeat;
+
+public class StatusCheckerException extends Exception {
+  
+  private static final long serialVersionUID = -1039172824878846049L;
+
+  public StatusCheckerException() {
+    super();
+  }
+
+  public StatusCheckerException(String arg0, Throwable arg1) {
+    super(arg0, arg1);
+  }
+
+  public StatusCheckerException(String arg0) {
+    super(arg0);
+  }
+
+  public StatusCheckerException(Throwable arg0) {
+    super(arg0);
+  }
+}

Added: chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/datacollection/writer/HttpWriter.java
URL: http://svn.apache.org/viewvc/chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/datacollection/writer/HttpWriter.java?rev=1662365&view=auto
==============================================================================
--- chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/datacollection/writer/HttpWriter.java
(added)
+++ chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/datacollection/writer/HttpWriter.java
Thu Feb 26 06:44:21 2015
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.chukwa.datacollection.writer;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.net.Socket;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.hadoop.chukwa.Chunk;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.log4j.Logger;
+
+public class HttpWriter extends PipelineableWriter{
+  String _host;
+  int _port;
+  private Set<String> _whiteListSet = new HashSet<String>();
+  private final Logger log = Logger.getLogger(HttpWriter.class);
+
+  @Override
+  public void init(Configuration c) throws WriterException {
+    _host = c.get("chukwa.http.writer.host", "localhost");
+    String port = c.get("chukwa.http.writer.port", "8802");
+    String whiteListProp = c.get("chukwa.http.writer.whitelist", "STATUS");
+    String[] whiteList = whiteListProp.split(",");
+    for(String adaptor:whiteList){
+      _whiteListSet.add(adaptor.trim());
+    }
+    try{
+      _port = Integer.parseInt(port);
+    } catch(NumberFormatException e){
+      throw new WriterException(e);
+    }
+  }
+
+  @Override
+  public void close() throws WriterException {
+  }
+  
+  @Override
+  public CommitStatus add(List<Chunk> chunks) throws WriterException {
+    CommitStatus rv = ChukwaWriter.COMMIT_OK;
+    DataOutputStream dos = null;
+    Socket sock = null;
+    try{
+      sock = new Socket(_host, _port);
+      dos = new DataOutputStream(sock.getOutputStream());
+      for(Chunk chunk:chunks){
+        if(!_whiteListSet.contains(chunk.getStreamName())){
+          continue;
+        }
+        dos.writeInt(chunk.getData().length);
+        dos.write(chunk.getData());
+        dos.flush();
+      }
+      log.info("Written chunks");
+    } catch(Exception e){
+      throw new WriterException(e);
+    } finally {
+      if(dos != null){
+        try {
+          dos.close();
+        } catch(IOException e) {
+          log.error("Error closing dataoutput stream:" + e);
+        }
+      }
+      if (sock != null) {
+        try {
+          sock.close();
+        } catch (IOException e) {
+          log.error("Error closing socket: " + e);
+        }
+      }
+      if (next != null) {
+        rv = next.add(chunks); //pass data through
+      }
+    }
+    return rv;
+  }
+
+}

Added: chukwa/trunk/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/TestHeartbeatAdaptor.java
URL: http://svn.apache.org/viewvc/chukwa/trunk/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/TestHeartbeatAdaptor.java?rev=1662365&view=auto
==============================================================================
--- chukwa/trunk/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/TestHeartbeatAdaptor.java
(added)
+++ chukwa/trunk/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/TestHeartbeatAdaptor.java
Thu Feb 26 06:44:21 2015
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.chukwa.datacollection.adaptor;
+
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.ServerSocket;
+import java.net.Socket;
+
+import org.apache.hadoop.chukwa.datacollection.agent.ChukwaAgent;
+import org.apache.hadoop.chukwa.datacollection.connector.PipelineConnector;
+import org.apache.hadoop.chukwa.util.ExceptionUtil;
+import org.apache.hadoop.conf.Configuration;
+import org.json.simple.parser.JSONParser;
+
+import junit.framework.TestCase;
+
+public class TestHeartbeatAdaptor extends TestCase {
+  private volatile boolean shutdown = false;
+  private final int port = 4321;
+  public void testPingAdaptor() throws IOException, InterruptedException{
+    ChukwaAgent agent = ChukwaAgent.getAgent();
+    Configuration conf = agent.getConfiguration();
+    conf.set("chukwa.http.writer.host", "localhost");
+    conf.set("chukwa.http.writer.port", String.valueOf(port));
+    conf.set("chukwa.pipeline", "org.apache.hadoop.chukwa.datacollection.writer.HttpWriter");
+    agent.connector = new PipelineConnector();
+    agent.connector.start();
+    System.out.println("Started connector");
+    
+    String adaptor = agent.processAddCommand("add HeartbeatAdaptor DefaultProcessor (ChukwaStatusChecker, HttpStatusChecker Invalid.component http://localhost:4322, HttpStatusChecker Chukwa.rest.server http://localhost:9090/rest/v2) 3 0");
+    //assertTrue(agent.adaptorCount() == 1);
+    if(agent.connector != null){
+      agent.connector.shutdown();
+    }
+    
+    LocalServer server = new LocalServer();
+    server.start();
+    
+    try {
+      server.join(10000);
+    } catch (InterruptedException e) {
+      e.printStackTrace();
+    }
+    if(server.getFailMessage() != null){
+      fail(server.getFailMessage());
+    }
+    assertTrue(server.messageCount > 0);
+    server.interrupt();
+    agent.stopAdaptor(adaptor, false);
+    agent.shutdown();    
+  }
+  
+  class LocalServer extends Thread {
+    ServerSocket sock;
+    String failMessage = null;
+    int messageCount = 0;
+    
+    LocalServer() throws IOException{
+      sock = new ServerSocket();
+      sock.setReuseAddress(true);
+      sock.bind(new InetSocketAddress(port));
+      System.out.println("Started local server");      
+    }
+    
+    //calling fail() from this thread will not cause testcase to fail. So propagate error to main thread.
+    String getFailMessage(){
+      return failMessage;
+    }
+    
+    int getMessageCount(){
+      return messageCount;
+    }
+    
+    @Override
+    public void run(){
+      while(!shutdown){
+        try {
+          Socket socket = sock.accept();
+          DataInputStream dis = new DataInputStream(socket.getInputStream());
+          int size;
+          try{
+            while((size = dis.readInt()) > 0){
+              if(size > 1024){
+                fail();
+              }
+              messageCount++;
+              byte[] buffer = new byte[size];
+              dis.read(buffer);
+              String data = new String(buffer);
+              System.out.println("Received:"+data);
+              JSONParser json  = new JSONParser();
+              //make sure we have a parseable json
+              json.parse(data);
+            }
+          } catch(java.io.EOFException e){
+            System.out.println("reached end of stream, so closing this socket");
+          } finally {
+            socket.close();            
+          }
+        } catch (Exception e) {
+          failMessage = ExceptionUtil.getStackTrace(e);
+        }
+      }
+    }
+  }
+}

Modified: chukwa/trunk/src/test/java/org/apache/hadoop/chukwa/datacollection/agent/rest/TestAdaptorController.java
URL: http://svn.apache.org/viewvc/chukwa/trunk/src/test/java/org/apache/hadoop/chukwa/datacollection/agent/rest/TestAdaptorController.java?rev=1662365&r1=1662364&r2=1662365&view=diff
==============================================================================
--- chukwa/trunk/src/test/java/org/apache/hadoop/chukwa/datacollection/agent/rest/TestAdaptorController.java
(original)
+++ chukwa/trunk/src/test/java/org/apache/hadoop/chukwa/datacollection/agent/rest/TestAdaptorController.java
Thu Feb 26 06:44:21 2015
@@ -52,6 +52,7 @@ public class TestAdaptorController exten
   MockHttpServletRequest request;
   MockHttpServletResponse response;
   StringBuilder sb;
+  String adaptor;
 
   protected void setUp() throws Exception {
     agent = ChukwaAgent.getAgent();
@@ -77,11 +78,12 @@ public class TestAdaptorController exten
     request.setContextPath("/foo/bar");
 
     response = new MockHttpServletResponse();
-    agent.processAddCommandE("add org.apache.hadoop.chukwa.datacollection.adaptor.ChukwaTestAdaptor SomeDataType 0");
+    adaptor = agent.processAddCommandE("add org.apache.hadoop.chukwa.datacollection.adaptor.ChukwaTestAdaptor SomeDataType 0");
     sb = new StringBuilder();
   }
 
   protected void tearDown() throws Exception {
+    agent.getAdaptor(adaptor);
     agent.shutdown();
     jettyServer.stop();
   }



Mime
View raw message