chukwa-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From asrab...@apache.org
Subject svn commit: r892367 - in /hadoop/chukwa/trunk/src: java/org/apache/hadoop/chukwa/datacollection/adaptor/UDPAdaptor.java test/org/apache/hadoop/chukwa/datacollection/adaptor/TestUDPAdaptor.java
Date Fri, 18 Dec 2009 20:19:30 GMT
Author: asrabkin
Date: Fri Dec 18 20:19:29 2009
New Revision: 892367

URL: http://svn.apache.org/viewvc?rev=892367&view=rev
Log:
CHUKWA-431. UDP Adaptor.

Added:
    hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/UDPAdaptor.java
    hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/adaptor/TestUDPAdaptor.java

Added: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/UDPAdaptor.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/UDPAdaptor.java?rev=892367&view=auto
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/UDPAdaptor.java
(added)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/UDPAdaptor.java
Fri Dec 18 20:19:29 2009
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.chukwa.datacollection.adaptor;
+
+import java.net.*;
+import java.util.Arrays;
+import org.apache.hadoop.chukwa.*;
+import org.apache.hadoop.chukwa.datacollection.adaptor.filetailer.FileTailingAdaptor;
+import org.apache.log4j.Logger;
+
+public class UDPAdaptor extends AbstractAdaptor {
+
+  static Logger log = Logger.getLogger(UDPAdaptor.class);
+  
+  int portno;
+  DatagramSocket ds;
+  volatile boolean running = true;
+  volatile long bytesReceived = 0;
+  String source;
+  
+  class ListenThread extends Thread {
+    public void run() {
+      byte[] buf = new byte[1024];
+      DatagramPacket dp = new DatagramPacket(buf, buf.length);
+      try {
+        while(running) {
+          ds.receive(dp);
+          byte[] trimmedBuf =  Arrays.copyOf(buf, dp.getLength());
+          bytesReceived += trimmedBuf.length;
+          Chunk c = new ChunkImpl(type, source, bytesReceived, trimmedBuf, UDPAdaptor.this);
+          dest.add(c);
+        }
+      } catch(Exception e) {
+        log.error("can't read UDP messages in " + adaptorID, e);
+      }
+    }
+  }
+  ListenThread lt;
+  
+  @Override
+  public String parseArgs(String s) {
+    portno = Integer.parseInt(s);
+    source = "udp:"+portno;
+    return s;
+  }
+
+  @Override
+  public void start(long offset) throws AdaptorException {
+    try {
+      bytesReceived = offset;
+      ds = new DatagramSocket(portno);
+      portno = ds.getLocalPort();
+      lt = new ListenThread();
+      lt.start();
+    } catch(Exception e) {
+      throw new AdaptorException(e);
+    }
+  }
+
+  @Override
+  public String getCurrentStatus() {
+    return type + " " + portno;
+  }
+
+  @Override
+  public void hardStop() throws AdaptorException {
+    shutdown(AdaptorShutdownPolicy.HARD_STOP);
+  }
+
+  @Override
+  public long shutdown() throws AdaptorException {
+    return shutdown(AdaptorShutdownPolicy.GRACEFULLY); 
+  }
+
+  @Override
+  public long shutdown(AdaptorShutdownPolicy shutdownPolicy)
+      throws AdaptorException {
+    try {
+      running = false;
+      if(shutdownPolicy == AdaptorShutdownPolicy.GRACEFULLY)
+        lt.join();
+    } catch(InterruptedException e) {}
+    return bytesReceived;
+  }
+
+}

Added: hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/adaptor/TestUDPAdaptor.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/adaptor/TestUDPAdaptor.java?rev=892367&view=auto
==============================================================================
--- hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/adaptor/TestUDPAdaptor.java
(added)
+++ hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/adaptor/TestUDPAdaptor.java
Fri Dec 18 20:19:29 2009
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.chukwa.datacollection.adaptor;
+
+import junit.framework.TestCase;
+import org.apache.hadoop.chukwa.datacollection.ChunkReceiver;
+import org.apache.hadoop.chukwa.datacollection.adaptor.filetailer.RCheckFTAdaptor;
+import org.apache.hadoop.chukwa.datacollection.agent.AdaptorManager;
+import org.apache.hadoop.chukwa.*;
+import java.net.*;
+
+
+public class TestUDPAdaptor extends TestCase implements ChunkReceiver {
+  volatile boolean receivedOK = false;
+  String STR = "a short string";
+  
+  public void testUDP() throws Exception {
+    UDPAdaptor u = new UDPAdaptor();
+    u.parseArgs("Test", "0", AdaptorManager.NULL);
+    u.start("id", "Test", 0, this);
+    
+    DatagramSocket send = new DatagramSocket();
+    byte[] buf = STR.getBytes();
+    DatagramPacket p = new DatagramPacket(buf, buf.length);
+    p.setSocketAddress(new InetSocketAddress("127.0.0.1",u.portno));
+    send.send(p);
+    
+    synchronized(this) {
+      wait(1000);
+    }
+    assertTrue(receivedOK);
+  }
+  
+  public void add(Chunk c) {
+    assertTrue(c.getDataType().equals("Test"));
+    assertEquals(c.getSeqID(), c.getData().length);
+    assertTrue(STR.equals(new String(c.getData())));
+    receivedOK= true;
+    synchronized(this) {
+      notify();
+    }
+   }
+
+}



Mime
View raw message