incubator-accumulo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From e..@apache.org
Subject svn commit: r1227231 - in /incubator/accumulo/branches/1.3/src: server/src/main/java/org/apache/accumulo/server/trace/ trace/src/main/java/cloudtrace/instrument/receivers/
Date Wed, 04 Jan 2012 17:28:41 GMT
Author: ecn
Date: Wed Jan  4 17:28:41 2012
New Revision: 1227231

URL: http://svn.apache.org/viewvc?rev=1227231&view=rev
Log:
ACCUMULO-243: close the connection on write failures; drop spans if there are no tracers

Modified:
    incubator/accumulo/branches/1.3/src/server/src/main/java/org/apache/accumulo/server/trace/TraceServer.java
    incubator/accumulo/branches/1.3/src/trace/src/main/java/cloudtrace/instrument/receivers/AsyncSpanReceiver.java
    incubator/accumulo/branches/1.3/src/trace/src/main/java/cloudtrace/instrument/receivers/SendSpansViaThrift.java
    incubator/accumulo/branches/1.3/src/trace/src/main/java/cloudtrace/instrument/receivers/ZooSpanClient.java

Modified: incubator/accumulo/branches/1.3/src/server/src/main/java/org/apache/accumulo/server/trace/TraceServer.java
URL: http://svn.apache.org/viewvc/incubator/accumulo/branches/1.3/src/server/src/main/java/org/apache/accumulo/server/trace/TraceServer.java?rev=1227231&r1=1227230&r2=1227231&view=diff
==============================================================================
--- incubator/accumulo/branches/1.3/src/server/src/main/java/org/apache/accumulo/server/trace/TraceServer.java
(original)
+++ incubator/accumulo/branches/1.3/src/server/src/main/java/org/apache/accumulo/server/trace/TraceServer.java
Wed Jan  4 17:28:41 2012
@@ -16,9 +16,7 @@
  */
 package org.apache.accumulo.server.trace;
 
-import java.net.InetSocketAddress;
-import java.net.ServerSocket;
-import java.nio.channels.ServerSocketChannel;
+import java.net.InetAddress;
 import java.util.TimerTask;
 
 import org.apache.accumulo.core.Constants;
@@ -30,10 +28,11 @@ import org.apache.accumulo.core.conf.Acc
 import org.apache.accumulo.core.conf.Property;
 import org.apache.accumulo.core.data.Mutation;
 import org.apache.accumulo.core.data.Value;
-import org.apache.accumulo.core.util.AddressUtil;
 import org.apache.accumulo.core.util.UtilWaitThread;
 import org.apache.accumulo.core.zookeeper.ZooUtil;
 import org.apache.accumulo.server.Accumulo;
+import org.apache.accumulo.server.util.TServerUtils;
+import org.apache.accumulo.server.util.TServerUtils.ServerPort;
 import org.apache.accumulo.server.util.time.SimpleTimer;
 import org.apache.hadoop.io.Text;
 import org.apache.log4j.Logger;
@@ -41,9 +40,6 @@ import org.apache.thrift.TByteArrayOutpu
 import org.apache.thrift.TException;
 import org.apache.thrift.protocol.TCompactProtocol;
 import org.apache.thrift.server.TServer;
-import org.apache.thrift.server.TThreadPoolServer;
-import org.apache.thrift.transport.TServerSocket;
-import org.apache.thrift.transport.TServerTransport;
 import org.apache.thrift.transport.TTransport;
 import org.apache.thrift.transport.TTransportException;
 import org.apache.zookeeper.CreateMode;
@@ -154,15 +150,10 @@ public class TraceServer implements Watc
       }
     }
     
-    int port = conf.getPort(Property.TRACE_PORT);
-    final ServerSocket sock = ServerSocketChannel.open().socket();
-    sock.setReuseAddress(true);
-    sock.bind(new InetSocketAddress(port));
-    final TServerTransport transport = new TServerSocket(sock);
-    server = new TThreadPoolServer(new SpanReceiver.Processor(new Receiver()), transport);
-    final InetSocketAddress address = new InetSocketAddress(Accumulo.getLocalAddress(args),
sock.getLocalPort());
-    registerInZooKeeper(AddressUtil.toString(address));
-    
+    ServerPort serverPort = TServerUtils.startServer(Property.TRACE_PORT, new SpanReceiver.Processor(new
Receiver()), "tracer", "tracer", false);
+    server = serverPort.server;
+    InetAddress address = Accumulo.getLocalAddress(args);
+    registerInZooKeeper(address.getHostAddress() + ":" + serverPort.port);
     writer = connector.createBatchWriter(table, 100l * 1024 * 1024, 5 * 1000l, 10);
   }
   

Modified: incubator/accumulo/branches/1.3/src/trace/src/main/java/cloudtrace/instrument/receivers/AsyncSpanReceiver.java
URL: http://svn.apache.org/viewvc/incubator/accumulo/branches/1.3/src/trace/src/main/java/cloudtrace/instrument/receivers/AsyncSpanReceiver.java?rev=1227231&r1=1227230&r2=1227231&view=diff
==============================================================================
--- incubator/accumulo/branches/1.3/src/trace/src/main/java/cloudtrace/instrument/receivers/AsyncSpanReceiver.java
(original)
+++ incubator/accumulo/branches/1.3/src/trace/src/main/java/cloudtrace/instrument/receivers/AsyncSpanReceiver.java
Wed Jan  4 17:28:41 2012
@@ -59,7 +59,11 @@ public abstract class AsyncSpanReceiver<
     timer.schedule(new TimerTask() {
       @Override
       public void run() {
-        sendSpans();
+        try {
+          sendSpans();
+        } catch (Exception ex) {
+          log.warn("Exception sending spans to destination", ex);
+        }
       }
       
     }, 0, millis);

Modified: incubator/accumulo/branches/1.3/src/trace/src/main/java/cloudtrace/instrument/receivers/SendSpansViaThrift.java
URL: http://svn.apache.org/viewvc/incubator/accumulo/branches/1.3/src/trace/src/main/java/cloudtrace/instrument/receivers/SendSpansViaThrift.java?rev=1227231&r1=1227230&r2=1227231&view=diff
==============================================================================
--- incubator/accumulo/branches/1.3/src/trace/src/main/java/cloudtrace/instrument/receivers/SendSpansViaThrift.java
(original)
+++ incubator/accumulo/branches/1.3/src/trace/src/main/java/cloudtrace/instrument/receivers/SendSpansViaThrift.java
Wed Jan  4 17:28:41 2012
@@ -43,6 +43,8 @@ public class SendSpansViaThrift extends 
   
   @Override
   protected Client createDestination(String destination) throws Exception {
+    if (destination == null)
+      return null;
     try {
       String[] hostAddr = destination.split(":", 2);
       log.debug("Connecting to " + hostAddr[0] + ":" + hostAddr[1]);
@@ -60,7 +62,14 @@ public class SendSpansViaThrift extends 
   
   @Override
   protected void send(Client client, RemoteSpan s) throws Exception {
-    client.span(s);
+    if (client != null) {
+      try {
+        client.span(s);
+      } catch (Exception ex) {
+        client.getInputProtocol().getTransport().close();
+        client = null;
+      }
+    }
   }
   
   protected String getSpanKey(Map<String,String> data) {

Modified: incubator/accumulo/branches/1.3/src/trace/src/main/java/cloudtrace/instrument/receivers/ZooSpanClient.java
URL: http://svn.apache.org/viewvc/incubator/accumulo/branches/1.3/src/trace/src/main/java/cloudtrace/instrument/receivers/ZooSpanClient.java?rev=1227231&r1=1227230&r2=1227231&view=diff
==============================================================================
--- incubator/accumulo/branches/1.3/src/trace/src/main/java/cloudtrace/instrument/receivers/ZooSpanClient.java
(original)
+++ incubator/accumulo/branches/1.3/src/trace/src/main/java/cloudtrace/instrument/receivers/ZooSpanClient.java
Wed Jan  4 17:28:41 2012
@@ -60,6 +60,37 @@ public class ZooSpanClient extends SendS
     zoo.getChildren(path, true);
   }
   
+  /*
+   * (non-Javadoc)
+   * 
+   * @see cloudtrace.instrument.receivers.AsyncSpanReceiver#flush()
+   */
+  @Override
+  public void flush() {
+    if (!hosts.isEmpty())
+      super.flush();
+  }
+  
+  /*
+   * (non-Javadoc)
+   * 
+   * @see cloudtrace.instrument.receivers.AsyncSpanReceiver#sendSpans()
+   */
+  @Override
+  void sendSpans() {
+    if (hosts.isEmpty()) {
+      if (!sendQueue.isEmpty()) {
+        log.error("No hosts to send data to, dropping queued spans");
+        synchronized (sendQueue) {
+          sendQueue.clear();
+          sendQueue.notifyAll();
+        }
+      }
+    } else {
+      super.sendSpans();
+    }
+  }
+
   synchronized private void updateHosts(String path, List<String> children) {
     log.debug("Scanning trace hosts in zookeeper: " + path);
     try {



Mime
View raw message