incubator-blur-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From amccu...@apache.org
Subject [02/27] git commit: Fixed BLUR-171.
Date Mon, 29 Jul 2013 12:57:33 GMT
Fixed BLUR-171.


Project: http://git-wip-us.apache.org/repos/asf/incubator-blur/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-blur/commit/a455ee70
Tree: http://git-wip-us.apache.org/repos/asf/incubator-blur/tree/a455ee70
Diff: http://git-wip-us.apache.org/repos/asf/incubator-blur/diff/a455ee70

Branch: refs/heads/0.2.0-newtypesystem
Commit: a455ee7067eb75d5bd7390229992878d16f5a14a
Parents: a726ca5
Author: Aaron McCurry <amccurry@gmail.com>
Authored: Sun Jul 21 16:10:45 2013 -0400
Committer: Aaron McCurry <amccurry@gmail.com>
Committed: Sun Jul 21 16:10:45 2013 -0400

----------------------------------------------------------------------
 .../blur/server/ControllerServerContext.java    |  71 ++++++++++
 .../server/ControllerServerEventHandler.java    |  92 ++++++++++++
 .../apache/blur/server/ShardServerContext.java  |  21 +++
 .../blur/server/ShardServerEventHandler.java    |  18 ++-
 .../blur/thrift/ThriftBlurControllerServer.java |   6 +-
 .../blur/thrift/ThriftBlurShardServer.java      |   2 +-
 .../java/org/apache/blur/utils/BlurUtil.java    | 141 ++++++++++++++++++-
 .../server/TServerEventHandler.java             |   3 +-
 .../thrift_0_9_0/server/TSimpleServer.java      |   2 +-
 .../thrift_0_9_0/server/TThreadPoolServer.java  |   2 +-
 .../server/AbstractNonblockingServer.java       |   2 +-
 .../util/ResetableTByteArrayOutputStream.java   |  50 +++++++
 .../thrift/util/ResetableTMemoryBuffer.java     | 109 ++++++++++++++
 distribution/src/main/scripts/conf/log4j.xml    |  43 +++++-
 14 files changed, 543 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/a455ee70/blur-core/src/main/java/org/apache/blur/server/ControllerServerContext.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/server/ControllerServerContext.java b/blur-core/src/main/java/org/apache/blur/server/ControllerServerContext.java
new file mode 100644
index 0000000..9fc3bcd
--- /dev/null
+++ b/blur-core/src/main/java/org/apache/blur/server/ControllerServerContext.java
@@ -0,0 +1,71 @@
+package org.apache.blur.server;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import java.net.SocketAddress;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.blur.thirdparty.thrift_0_9_0.server.ServerContext;
+
+/**
+ * The thrift session that holds the connection string of the client.
+ *
+ * One instance is created per client connection. The instance that belongs to
+ * the call currently being processed is tracked per {@link Thread} in a static
+ * map, so code running inside a call can look it up without it being passed
+ * along explicitly.
+ */
+public class ControllerServerContext implements ServerContext {
+
+  private final static Map<Thread, ControllerServerContext> _threadsToContext = new ConcurrentHashMap<Thread, ControllerServerContext>();
+  private final SocketAddress _localSocketAddress;
+  private final SocketAddress _remoteSocketAddress;
+  private final String _connectionString;
+
+  /**
+   * Creates the context for one connection.
+   * 
+   * @param localSocketAddress
+   *          the address of the server side of the connection.
+   * @param remoteSocketAddress
+   *          the address of the client side of the connection.
+   */
+  public ControllerServerContext(SocketAddress localSocketAddress, SocketAddress remoteSocketAddress) {
+    _localSocketAddress = localSocketAddress;
+    _remoteSocketAddress = remoteSocketAddress;
+    // Plain concatenation renders a null address as "null" instead of throwing
+    // a NullPointerException the way an explicit toString() call would.
+    _connectionString = _localSocketAddress + "\t" + _remoteSocketAddress;
+  }
+
+  /**
+   * Registers the {@link ControllerServerContext} for this thread.
+   * 
+   * @param context
+   *          the {@link ControllerServerContext}.
+   */
+  public static void registerContextForCall(ControllerServerContext context) {
+    _threadsToContext.put(Thread.currentThread(), context);
+  }
+
+  /**
+   * Gets the {@link ControllerServerContext} for this {@link Thread}.
+   * 
+   * @return the {@link ControllerServerContext}, or null if none has been
+   *         registered for the current thread.
+   */
+  public static ControllerServerContext getControllerServerContext() {
+    return _threadsToContext.get(Thread.currentThread());
+  }
+
+  /**
+   * Gets the {@link ControllerServerContext} for this {@link Thread}.
+   * 
+   * @return the {@link ControllerServerContext}, or null if none has been
+   *         registered for the current thread.
+   * @deprecated the name refers to the shard server; use
+   *             {@link #getControllerServerContext()}.
+   */
+  @Deprecated
+  public static ControllerServerContext getShardServerContext() {
+    return getControllerServerContext();
+  }
+
+  /**
+   * @return the local address passed to the constructor.
+   */
+  public SocketAddress getLocalSocketAddress() {
+    return _localSocketAddress;
+  }
+
+  /**
+   * @return the local address passed to the constructor.
+   * @deprecated misspelled; use {@link #getLocalSocketAddress()}.
+   */
+  @Deprecated
+  public SocketAddress getRocalSocketAddress() {
+    return getLocalSocketAddress();
+  }
+
+  /**
+   * @return the remote address passed to the constructor.
+   */
+  public SocketAddress getRemoteSocketAddress() {
+    return _remoteSocketAddress;
+  }
+
+  /**
+   * @return the local address, a tab character, then the remote address.
+   */
+  public String getConnectionString() {
+    return _connectionString;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/a455ee70/blur-core/src/main/java/org/apache/blur/server/ControllerServerEventHandler.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/server/ControllerServerEventHandler.java
b/blur-core/src/main/java/org/apache/blur/server/ControllerServerEventHandler.java
new file mode 100644
index 0000000..7ca983d
--- /dev/null
+++ b/blur-core/src/main/java/org/apache/blur/server/ControllerServerEventHandler.java
@@ -0,0 +1,92 @@
+package org.apache.blur.server;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import static org.apache.blur.metrics.MetricsConstants.BLUR;
+import static org.apache.blur.metrics.MetricsConstants.ORG_APACHE_BLUR;
+
+import java.net.Socket;
+import java.net.SocketAddress;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.SocketChannel;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.blur.log.Log;
+import org.apache.blur.log.LogFactory;
+import org.apache.blur.thirdparty.thrift_0_9_0.protocol.TProtocol;
+import org.apache.blur.thirdparty.thrift_0_9_0.server.ServerContext;
+import org.apache.blur.thirdparty.thrift_0_9_0.server.TServerEventHandler;
+import org.apache.blur.thirdparty.thrift_0_9_0.transport.TTransport;
+
+import com.yammer.metrics.Metrics;
+import com.yammer.metrics.core.Gauge;
+import com.yammer.metrics.core.Meter;
+import com.yammer.metrics.core.MetricName;
+
+/**
+ * {@link ControllerServerEventHandler} is the session manager for the
+ * controller servers. It creates one {@link ControllerServerContext} per client
+ * connection, registers that context for the calling thread before each method
+ * call, and keeps a count and a rate of connections as metrics.
+ */
+public class ControllerServerEventHandler implements TServerEventHandler {
+
+  private static final Log LOG = LogFactory.getLog(ControllerServerEventHandler.class);
+  private final Meter _connectionMeter;
+  private final AtomicLong _connections = new AtomicLong();
+
+  public ControllerServerEventHandler() {
+    Metrics.newGauge(new MetricName(ORG_APACHE_BLUR, BLUR, "Connections"), new Gauge<Long>() {
+      @Override
+      public Long value() {
+        // Report the live connection count maintained by createContext and
+        // deleteContext rather than a null value.
+        return _connections.get();
+      }
+    });
+    _connectionMeter = Metrics.newMeter(new MetricName(ORG_APACHE_BLUR, BLUR, "Connections/s"), "Connections/s",
+        TimeUnit.SECONDS);
+  }
+
+  @Override
+  public void preServe() {
+    LOG.debug("preServe");
+  }
+
+  /**
+   * Builds the context for a newly connected client from the addresses of its
+   * socket and counts the connection.
+   * 
+   * @param selectionKeyObject
+   *          must be the {@link SelectionKey} of the connection. In this code
+   *          base only the non-blocking server passes one; TSimpleServer and
+   *          TThreadPoolServer pass null, which fails here with a
+   *          NullPointerException.
+   */
+  @Override
+  public ServerContext createContext(TProtocol input, TProtocol output, Object selectionKeyObject) {
+    LOG.debug("Client connected");
+    SelectionKey selectionKey = (SelectionKey) selectionKeyObject;
+    SocketChannel channel = (SocketChannel) selectionKey.channel();
+    Socket socket = channel.socket();
+    SocketAddress remoteSocketAddress = socket.getRemoteSocketAddress();
+    SocketAddress localSocketAddress = socket.getLocalSocketAddress();
+    _connectionMeter.mark();
+    _connections.incrementAndGet();
+    return new ControllerServerContext(localSocketAddress, remoteSocketAddress);
+  }
+
+  @Override
+  public void deleteContext(ServerContext serverContext, TProtocol input, TProtocol output) {
+    LOG.debug("Client disconnected");
+    _connections.decrementAndGet();
+  }
+
+  /**
+   * Makes the connection's context the current one for the calling thread so
+   * that the code handling the call can look it up.
+   */
+  @Override
+  public void processContext(ServerContext serverContext, TTransport inputTransport, TTransport outputTransport) {
+    LOG.debug("Method called");
+    ControllerServerContext context = (ControllerServerContext) serverContext;
+    ControllerServerContext.registerContextForCall(context);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/a455ee70/blur-core/src/main/java/org/apache/blur/server/ShardServerContext.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/server/ShardServerContext.java b/blur-core/src/main/java/org/apache/blur/server/ShardServerContext.java
index 431ec96..2b80339 100644
--- a/blur-core/src/main/java/org/apache/blur/server/ShardServerContext.java
+++ b/blur-core/src/main/java/org/apache/blur/server/ShardServerContext.java
@@ -17,6 +17,7 @@ package org.apache.blur.server;
  * limitations under the License.
  */
 import java.io.IOException;
+import java.net.SocketAddress;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.Map;
@@ -39,6 +40,15 @@ public class ShardServerContext implements ServerContext {
 
   private final static Map<Thread, ShardServerContext> _threadsToContext = new ConcurrentHashMap<Thread,
ShardServerContext>();
   private final Map<String, IndexSearcherClosable> _indexSearcherMap = new HashMap<String,
IndexSearcherClosable>();
+  private final SocketAddress _localSocketAddress;
+  private final SocketAddress _remoteSocketAddress;
+  private final String _connectionString;
+
+  /**
+   * Creates the context for one connection.
+   * 
+   * @param localSocketAddress
+   *          the address of the server side of the connection.
+   * @param remoteSocketAddress
+   *          the address of the client side of the connection.
+   */
+  public ShardServerContext(SocketAddress localSocketAddress, SocketAddress remoteSocketAddress) {
+    _localSocketAddress = localSocketAddress;
+    _remoteSocketAddress = remoteSocketAddress;
+    // Plain concatenation renders a null address as "null" instead of throwing
+    // a NullPointerException the way an explicit toString() call would.
+    _connectionString = _localSocketAddress + "\t" + _remoteSocketAddress;
+  }
 
   /**
    * Registers the {@link ShardServerContext} for this thread.
@@ -130,4 +140,15 @@ public class ShardServerContext implements ServerContext {
     return table + "/" + shard;
   }
 
+  /**
+   * @return the local address passed to the constructor.
+   * @deprecated misspelled; use {@link #getLocalSocketAddress()}.
+   */
+  @Deprecated
+  public SocketAddress getRocalSocketAddress() {
+    return getLocalSocketAddress();
+  }
+
+  /**
+   * @return the local address passed to the constructor.
+   */
+  public SocketAddress getLocalSocketAddress() {
+    return _localSocketAddress;
+  }
+
+  public SocketAddress getRemoteSocketAddress() { // Accessor for the remote address given to the constructor.
+    return _remoteSocketAddress;
+  }
+
+  public String getConnectionString() { // The string built in the constructor: local address, a tab, then remote address.
+    return _connectionString;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/a455ee70/blur-core/src/main/java/org/apache/blur/server/ShardServerEventHandler.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/server/ShardServerEventHandler.java b/blur-core/src/main/java/org/apache/blur/server/ShardServerEventHandler.java
index 0f0742a..55fb8d4 100644
--- a/blur-core/src/main/java/org/apache/blur/server/ShardServerEventHandler.java
+++ b/blur-core/src/main/java/org/apache/blur/server/ShardServerEventHandler.java
@@ -19,6 +19,10 @@ package org.apache.blur.server;
 import static org.apache.blur.metrics.MetricsConstants.BLUR;
 import static org.apache.blur.metrics.MetricsConstants.ORG_APACHE_BLUR;
 
+import java.net.Socket;
+import java.net.SocketAddress;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.SocketChannel;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 
@@ -43,7 +47,7 @@ public class ShardServerEventHandler implements TServerEventHandler {
   private static final Log LOG = LogFactory.getLog(ShardServerEventHandler.class);
   private final Meter _connectionMeter;
   private final AtomicLong _connections = new AtomicLong();
-  
+
   public ShardServerEventHandler() {
     Metrics.newGauge(new MetricName(ORG_APACHE_BLUR, BLUR, "Connections"), new Gauge<Long>()
{
       @Override
@@ -51,7 +55,8 @@ public class ShardServerEventHandler implements TServerEventHandler {
         return null;
       }
     });
-    _connectionMeter = Metrics.newMeter(new MetricName(ORG_APACHE_BLUR, BLUR, "Connections/s"),
"Connections/s", TimeUnit.SECONDS);
+    _connectionMeter = Metrics.newMeter(new MetricName(ORG_APACHE_BLUR, BLUR, "Connections/s"),
"Connections/s",
+        TimeUnit.SECONDS);
   }
 
   @Override
@@ -60,11 +65,16 @@ public class ShardServerEventHandler implements TServerEventHandler {
   }
 
   @Override
-  public ServerContext createContext(TProtocol input, TProtocol output) {
+  public ServerContext createContext(TProtocol input, TProtocol output, Object selectionKeyObject)
{ // Builds the per-connection context from the socket behind selectionKeyObject and counts the connection.
     LOG.debug("Client connected");
+    SelectionKey selectionKey = (SelectionKey) selectionKeyObject; // NOTE(review): TSimpleServer and TThreadPoolServer pass null in this commit, which fails on the next line - confirm only the non-blocking server is used.
+    SocketChannel channel = (SocketChannel) selectionKey.channel();
+    Socket socket = channel.socket();
+    SocketAddress remoteSocketAddress = socket.getRemoteSocketAddress();
+    SocketAddress localSocketAddress = socket.getLocalSocketAddress();
     _connectionMeter.mark();
     _connections.incrementAndGet();
-    return new ShardServerContext();
+    return new ShardServerContext(localSocketAddress, remoteSocketAddress); // The context keeps both endpoint addresses of this connection.
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/a455ee70/blur-core/src/main/java/org/apache/blur/thrift/ThriftBlurControllerServer.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/thrift/ThriftBlurControllerServer.java
b/blur-core/src/main/java/org/apache/blur/thrift/ThriftBlurControllerServer.java
index 4fea26e..a67c6bf 100644
--- a/blur-core/src/main/java/org/apache/blur/thrift/ThriftBlurControllerServer.java
+++ b/blur-core/src/main/java/org/apache/blur/thrift/ThriftBlurControllerServer.java
@@ -50,6 +50,7 @@ import org.apache.blur.manager.clusterstatus.ZookeeperClusterStatus;
 import org.apache.blur.manager.indexserver.BlurServerShutDown;
 import org.apache.blur.manager.indexserver.BlurServerShutDown.BlurShutdown;
 import org.apache.blur.metrics.ReporterSetup;
+import org.apache.blur.server.ControllerServerEventHandler;
 import org.apache.blur.thrift.generated.Blur.Iface;
 import org.apache.blur.utils.BlurConstants;
 import org.apache.blur.utils.BlurUtil;
@@ -120,9 +121,11 @@ public class ThriftBlurControllerServer extends ThriftServer {
 
     controllerServer.init();
 
-    Iface iface = BlurUtil.recordMethodCallsAndAverageTimes(controllerServer, Iface.class);
+    Iface iface = BlurUtil.recordMethodCallsAndAverageTimes(controllerServer, Iface.class,
true);
 
     int threadCount = configuration.getInt(BLUR_CONTROLLER_SERVER_THRIFT_THREAD_COUNT, 32);
+    
+    ControllerServerEventHandler eventHandler = new ControllerServerEventHandler();
 
     final ThriftBlurControllerServer server = new ThriftBlurControllerServer();
     server.setNodeName(nodeName);
@@ -130,6 +133,7 @@ public class ThriftBlurControllerServer extends ThriftServer {
     server.setBindAddress(bindAddress);
     server.setBindPort(bindPort);
     server.setThreadCount(threadCount);
+    server.setEventHandler(eventHandler);
     server.setIface(iface);
 
     int baseGuiPort = Integer.parseInt(configuration.get(BLUR_GUI_CONTROLLER_PORT));

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/a455ee70/blur-core/src/main/java/org/apache/blur/thrift/ThriftBlurShardServer.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/thrift/ThriftBlurShardServer.java b/blur-core/src/main/java/org/apache/blur/thrift/ThriftBlurShardServer.java
index 25f2f0c..d26bdee 100644
--- a/blur-core/src/main/java/org/apache/blur/thrift/ThriftBlurShardServer.java
+++ b/blur-core/src/main/java/org/apache/blur/thrift/ThriftBlurShardServer.java
@@ -218,7 +218,7 @@ public class ThriftBlurShardServer extends ThriftServer {
     shardServer.setConfiguration(configuration);
     shardServer.init();
 
-    Iface iface = BlurUtil.recordMethodCallsAndAverageTimes(shardServer, Iface.class);
+    Iface iface = BlurUtil.recordMethodCallsAndAverageTimes(shardServer, Iface.class, false);
     if (httpServer != null) {
       WebAppContext context = httpServer.getContext();
       context.addServlet(new ServletHolder(new TServlet(new Blur.Processor<Blur.Iface>(iface),

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/a455ee70/blur-core/src/main/java/org/apache/blur/utils/BlurUtil.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/utils/BlurUtil.java b/blur-core/src/main/java/org/apache/blur/utils/BlurUtil.java
index a26d3db..ee0ac9b 100644
--- a/blur-core/src/main/java/org/apache/blur/utils/BlurUtil.java
+++ b/blur-core/src/main/java/org/apache/blur/utils/BlurUtil.java
@@ -33,9 +33,11 @@ import java.lang.reflect.InvocationHandler;
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
 import java.lang.reflect.Proxy;
+import java.text.SimpleDateFormat;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Comparator;
+import java.util.Date;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -46,6 +48,7 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicLongArray;
 import java.util.regex.Pattern;
 
@@ -56,6 +59,8 @@ import org.apache.blur.manager.results.BlurResultComparator;
 import org.apache.blur.manager.results.BlurResultIterable;
 import org.apache.blur.manager.results.BlurResultPeekableIteratorComparator;
 import org.apache.blur.manager.results.PeekableIterator;
+import org.apache.blur.server.ControllerServerContext;
+import org.apache.blur.server.ShardServerContext;
 import org.apache.blur.thirdparty.thrift_0_9_0.TBase;
 import org.apache.blur.thirdparty.thrift_0_9_0.TException;
 import org.apache.blur.thirdparty.thrift_0_9_0.protocol.TJSONProtocol;
@@ -74,6 +79,7 @@ import org.apache.blur.thrift.generated.Row;
 import org.apache.blur.thrift.generated.RowMutation;
 import org.apache.blur.thrift.generated.RowMutationType;
 import org.apache.blur.thrift.generated.Selector;
+import org.apache.blur.thrift.util.ResetableTMemoryBuffer;
 import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
@@ -104,6 +110,9 @@ import com.yammer.metrics.core.MetricName;
 
 public class BlurUtil {
 
+  private static final Log REQUEST_LOG = LogFactory.getLog("REQUEST_LOG");
+  private static final Log RESPONSE_LOG = LogFactory.getLog("RESPONSE_LOG");
+
   private static final Object[] EMPTY_OBJECT_ARRAY = new Object[] {};
   private static final Class<?>[] EMPTY_PARAMETER_TYPES = new Class[] {};
   private static final Log LOG = LogFactory.getLog(BlurUtil.class);
@@ -113,30 +122,149 @@ public class BlurUtil {
   public static final Comparator<? super PeekableIterator<BlurResult, BlurException>>
HITS_PEEKABLE_ITERATOR_COMPARATOR = new BlurResultPeekableIteratorComparator();
   public static final Comparator<? super BlurResult> HITS_COMPARATOR = new BlurResultComparator();
   public static final Term PRIME_DOC_TERM = new Term(BlurConstants.PRIME_DOC, BlurConstants.PRIME_DOC_VALUE);
+  
+  static class LoggerArgsState {
+    public LoggerArgsState(int size) {
+      _buffer = new ResetableTMemoryBuffer(size);
+      _tjsonProtocol = new TJSONProtocol(_buffer);
+    }
+    TJSONProtocol _tjsonProtocol;
+    ResetableTMemoryBuffer _buffer;
+    StringBuilder _builder = new StringBuilder();
+  }
 
   @SuppressWarnings("unchecked")
-  public static <T extends Iface> T recordMethodCallsAndAverageTimes(final T t, Class<T>
clazz) {
+  public static <T extends Iface> T recordMethodCallsAndAverageTimes(final T t, Class<T>
clazz, final boolean controller) {
     final Map<String, Histogram> histogramMap = new ConcurrentHashMap<String, Histogram>();
     Method[] declaredMethods = Iface.class.getDeclaredMethods();
     for (Method m : declaredMethods) {
       String name = m.getName();
       histogramMap.put(name, Metrics.newHistogram(new MetricName(ORG_APACHE_BLUR, BLUR, name,
THRIFT_CALLS)));
     }
+    final String prefix = new SimpleDateFormat("yyyyMMddHHmmss").format(new Date());
     InvocationHandler handler = new InvocationHandler() {
+      private final AtomicLong _requestCounter = new AtomicLong();
+      private ThreadLocal<LoggerArgsState> _loggerArgsState = new ThreadLocal<LoggerArgsState>()
{
+        @Override
+        protected LoggerArgsState initialValue() {
+          return new LoggerArgsState(1024);
+        }
+      };
+
       @Override
       public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
+        long requestNumber = _requestCounter.incrementAndGet();
+        String requestId = prefix + "-" + requestNumber;
+        String connectionString;
+        if (controller) {
+          ControllerServerContext controllerServerContext = ControllerServerContext.getShardServerContext();
+          connectionString = controllerServerContext.getConnectionString();
+        } else {
+          ShardServerContext shardServerContext = ShardServerContext.getShardServerContext();
+          connectionString = shardServerContext.getConnectionString();
+        }
+        String argsStr = null;
         long start = System.nanoTime();
+        String name = method.getName();
+        boolean error = false;
+        LoggerArgsState loggerArgsState = null;
         try {
+          if (REQUEST_LOG.isInfoEnabled()) {
+            if (argsStr == null) {
+              loggerArgsState = _loggerArgsState.get();
+              argsStr = getArgsStr(args, name, loggerArgsState);
+            }
+            REQUEST_LOG.info(requestId + "\t" + connectionString + "\t" + name + "\t" + argsStr);
+          }
           return method.invoke(t, args);
         } catch (InvocationTargetException e) {
+          error = true;
           throw e.getTargetException();
         } finally {
           long end = System.nanoTime();
-          String name = method.getName();
+          double ms = (end - start) / 1000000.0;
+          if (RESPONSE_LOG.isInfoEnabled()) {
+            if (argsStr == null) {
+              if (loggerArgsState == null) {
+                loggerArgsState = _loggerArgsState.get();
+              }
+              argsStr = getArgsStr(args, name, loggerArgsState);
+            }
+            if (error) {
+              RESPONSE_LOG.info(requestId + "\t" + connectionString + "\tERROR\t" + name
+ "\t" + ms + "\t" + argsStr);
+            } else {
+              RESPONSE_LOG
+                  .info(requestId + "\t" + connectionString + "\tSUCCESS\t" + name + "\t"
+ ms + "\t" + argsStr);
+            }
+          }
           Histogram histogram = histogramMap.get(name);
           histogram.update((end - start) / 1000);
         }
       }
+
+      private String getArgsStr(Object[] args, String name, LoggerArgsState loggerArgsState)
{
+        String argsStr;
+        if (name.equals("mutate")) {
+          RowMutation rowMutation = (RowMutation) args[0];
+          if (rowMutation == null) {
+            argsStr = "[null]";
+          } else {
+            argsStr = "[" + rowMutation.getTable() + "," + rowMutation.getRowId() + "]";
+          }
+        } else if (name.equals("mutateBatch")) {
+          argsStr = "[Batch Update]";
+        } else {
+          argsStr = getArgsStr(args, loggerArgsState);
+        }
+        return argsStr;
+      }
+
+      private String getArgsStr(Object[] args, LoggerArgsState loggerArgsState) {
+        if (args == null) {
+          return null;
+        }
+        StringBuilder builder = loggerArgsState._builder;
+        builder.setLength(0);
+        for (Object o : args) {
+          if (builder.length() == 0) {
+            builder.append('[');
+          } else {
+            builder.append(',');
+          }
+          builder.append(getArgsStr(o, loggerArgsState));
+        }
+        if (builder.length() != 0) {
+          builder.append(']');
+        }
+        return builder.toString();
+      }
+
+      @SuppressWarnings("rawtypes")
+      private String getArgsStr(Object o, LoggerArgsState loggerArgsState) {
+        if (o == null) {
+          return null;
+        }
+        if (o instanceof TBase) {
+          return getArgsStr((TBase) o, loggerArgsState);
+        }
+        return o.toString();
+      }
+
+      @SuppressWarnings("rawtypes")
+      private String getArgsStr(TBase o, LoggerArgsState loggerArgsState) {
+        ResetableTMemoryBuffer buffer = loggerArgsState._buffer;
+        TJSONProtocol tjsonProtocol = loggerArgsState._tjsonProtocol;
+        buffer.resetBuffer();
+        tjsonProtocol.reset();
+        try {
+          o.write(tjsonProtocol);
+        } catch (TException e) {
+          LOG.error("Unknown error tyring to write object [{0}] to json.", e, o);
+        }
+        byte[] array = buffer.getArray();
+        int length = buffer.length();
+        return new String(array, 0, length);
+      }
     };
     return (T) Proxy.newProxyInstance(clazz.getClassLoader(), new Class[] { clazz }, handler);
   }
@@ -673,10 +801,11 @@ public class BlurUtil {
   public static String getPid() { // Returns the name the runtime MXBean reports for this JVM.
     return ManagementFactory.getRuntimeMXBean().getName(); // NOTE(review): on common JVMs this looks like "pid@hostname" rather than a bare pid - confirm callers expect that.
   }
-  
-//  public static <T> BlurIterator<T, BlurException> convert(final Iterator<T>
iterator) {
-//    return convert(iterator, BlurException.class);
-//  }
+
+  // public static <T> BlurIterator<T, BlurException> convert(final Iterator<T>
+  // iterator) {
+  // return convert(iterator, BlurException.class);
+  // }
 
   public static <T, E extends Exception> BlurIterator<T, E> convert(final Iterator<T>
iterator) {
     return new BlurIterator<T, E>() {

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/a455ee70/blur-thrift/src/main/java/org/apache/blur/thirdparty/thrift_0_9_0/server/TServerEventHandler.java
----------------------------------------------------------------------
diff --git a/blur-thrift/src/main/java/org/apache/blur/thirdparty/thrift_0_9_0/server/TServerEventHandler.java
b/blur-thrift/src/main/java/org/apache/blur/thirdparty/thrift_0_9_0/server/TServerEventHandler.java
index d0ebfe7..d617e64 100644
--- a/blur-thrift/src/main/java/org/apache/blur/thirdparty/thrift_0_9_0/server/TServerEventHandler.java
+++ b/blur-thrift/src/main/java/org/apache/blur/thirdparty/thrift_0_9_0/server/TServerEventHandler.java
@@ -38,9 +38,10 @@ public interface TServerEventHandler {
 
   /**
    * Called when a new client has connected and is about to being processing.
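+   * @param frameBuffer the server-specific connection object handed to the
+   *          handler: the selection key of the connection for the non-blocking
+   *          server, null for TSimpleServer and TThreadPoolServer.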
    */
   ServerContext createContext(TProtocol input,
-                              TProtocol output);
+                              TProtocol output, Object frameBuffer);
 
   /**
    * Called when a client has finished request-handling to delete server

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/a455ee70/blur-thrift/src/main/java/org/apache/blur/thirdparty/thrift_0_9_0/server/TSimpleServer.java
----------------------------------------------------------------------
diff --git a/blur-thrift/src/main/java/org/apache/blur/thirdparty/thrift_0_9_0/server/TSimpleServer.java
b/blur-thrift/src/main/java/org/apache/blur/thirdparty/thrift_0_9_0/server/TSimpleServer.java
index bad4a60..e75518b 100644
--- a/blur-thrift/src/main/java/org/apache/blur/thirdparty/thrift_0_9_0/server/TSimpleServer.java
+++ b/blur-thrift/src/main/java/org/apache/blur/thirdparty/thrift_0_9_0/server/TSimpleServer.java
@@ -74,7 +74,7 @@ public class TSimpleServer extends TServer {
           inputProtocol = inputProtocolFactory_.getProtocol(inputTransport);
           outputProtocol = outputProtocolFactory_.getProtocol(outputTransport);
           if (eventHandler_ != null) {
-            connectionContext = eventHandler_.createContext(inputProtocol, outputProtocol);
+            connectionContext = eventHandler_.createContext(inputProtocol, outputProtocol,
null);
           }
           while (true) {
             if (eventHandler_ != null) {

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/a455ee70/blur-thrift/src/main/java/org/apache/blur/thirdparty/thrift_0_9_0/server/TThreadPoolServer.java
----------------------------------------------------------------------
diff --git a/blur-thrift/src/main/java/org/apache/blur/thirdparty/thrift_0_9_0/server/TThreadPoolServer.java
b/blur-thrift/src/main/java/org/apache/blur/thirdparty/thrift_0_9_0/server/TThreadPoolServer.java
index fc744f1..07cd950 100644
--- a/blur-thrift/src/main/java/org/apache/blur/thirdparty/thrift_0_9_0/server/TThreadPoolServer.java
+++ b/blur-thrift/src/main/java/org/apache/blur/thirdparty/thrift_0_9_0/server/TThreadPoolServer.java
@@ -193,7 +193,7 @@ public class TThreadPoolServer extends TServer {
 
         eventHandler = getEventHandler();
         if (eventHandler != null) {
-          connectionContext = eventHandler.createContext(inputProtocol, outputProtocol);
+          connectionContext = eventHandler.createContext(inputProtocol, outputProtocol, null);
         }
         // we check stopped_ first to make sure we're not supposed to be shutting
         // down. this is necessary for graceful shutdown.

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/a455ee70/blur-thrift/src/main/java/org/apache/blur/thrift/server/AbstractNonblockingServer.java
----------------------------------------------------------------------
diff --git a/blur-thrift/src/main/java/org/apache/blur/thrift/server/AbstractNonblockingServer.java
b/blur-thrift/src/main/java/org/apache/blur/thrift/server/AbstractNonblockingServer.java
index 5f8708f..5fee759 100644
--- a/blur-thrift/src/main/java/org/apache/blur/thrift/server/AbstractNonblockingServer.java
+++ b/blur-thrift/src/main/java/org/apache/blur/thrift/server/AbstractNonblockingServer.java
@@ -318,7 +318,7 @@ public abstract class AbstractNonblockingServer extends TServer {
       outProt_ = outputProtocolFactory_.getProtocol(outTrans_);
 
       if (eventHandler_ != null) {
-        context_ = eventHandler_.createContext(inProt_, outProt_);
+        context_ = eventHandler_.createContext(inProt_, outProt_, selectionKey);
       } else {
         context_  = null;
       }

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/a455ee70/blur-thrift/src/main/java/org/apache/blur/thrift/util/ResetableTByteArrayOutputStream.java
----------------------------------------------------------------------
diff --git a/blur-thrift/src/main/java/org/apache/blur/thrift/util/ResetableTByteArrayOutputStream.java
b/blur-thrift/src/main/java/org/apache/blur/thrift/util/ResetableTByteArrayOutputStream.java
new file mode 100644
index 0000000..990d8f0
--- /dev/null
+++ b/blur-thrift/src/main/java/org/apache/blur/thrift/util/ResetableTByteArrayOutputStream.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.blur.thrift.util;
+
+import org.apache.blur.thirdparty.thrift_0_9_0.TByteArrayOutputStream;
+
+/**
+ * Output stream whose backing array can be read without a copy and rewound.
+ */
+public class ResetableTByteArrayOutputStream extends TByteArrayOutputStream {
+  public ResetableTByteArrayOutputStream() {
+    super();
+  }
+
+  public ResetableTByteArrayOutputStream(int size) {
+    super(size);
+  }
+
+  /** Marks the stream as empty; the backing array is kept, not cleared. */
+  public void resetBuffer() {
+    this.count = 0;
+  }
+
+  /** Returns the backing array itself (no copy); see len() for the used size. */
+  public byte[] get() {
+    return this.buf;
+  }
+
+  /** Returns the current value of the stream's byte count. */
+  public int len() {
+    return this.count;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/a455ee70/blur-thrift/src/main/java/org/apache/blur/thrift/util/ResetableTMemoryBuffer.java
----------------------------------------------------------------------
diff --git a/blur-thrift/src/main/java/org/apache/blur/thrift/util/ResetableTMemoryBuffer.java
b/blur-thrift/src/main/java/org/apache/blur/thrift/util/ResetableTMemoryBuffer.java
new file mode 100644
index 0000000..b28409e
--- /dev/null
+++ b/blur-thrift/src/main/java/org/apache/blur/thrift/util/ResetableTMemoryBuffer.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.blur.thrift.util;
+
+import java.io.UnsupportedEncodingException;
+
+import org.apache.blur.thirdparty.thrift_0_9_0.TByteArrayOutputStream;
+import org.apache.blur.thirdparty.thrift_0_9_0.transport.TTransport;
+
+/**
+ * Reusable, memory buffer-based implementation of the TTransport interface.
+ */
+public class ResetableTMemoryBuffer extends TTransport {
+  // The contents of the buffer; typed concretely so no cast is needed to reset
+  private final ResetableTByteArrayOutputStream arr_;
+
+  // Position to read next byte from
+  private int pos_;
+
+  /**
+   * Create a buffer with an initial size of <i>size</i>. The internal buffer
+   * will grow as necessary to accommodate the data being written to it.
+   */
+  public ResetableTMemoryBuffer(int size) {
+    arr_ = new ResetableTByteArrayOutputStream(size);
+  }
+
+  @Override
+  public boolean isOpen() {
+    return true;
+  }
+
+  @Override
+  public void open() {
+    /* Do nothing */
+  }
+
+  @Override
+  public void close() {
+    /* Do nothing */
+  }
+
+  /** Copies up to len unread bytes into buf and returns how many were copied. */
+  @Override
+  public int read(byte[] buf, int off, int len) {
+    int amtToRead = Math.min(len, arr_.len() - pos_);
+    if (amtToRead > 0) {
+      System.arraycopy(arr_.get(), pos_, buf, off, amtToRead);
+      pos_ += amtToRead;
+    }
+    return amtToRead;
+  }
+
+  @Override
+  public void write(byte[] buf, int off, int len) {
+    arr_.write(buf, off, len);
+  }
+
+  /**
+   * Output the contents of the memory buffer as a String, using the supplied
+   * encoding.
+   * @param enc the encoding to use
+   * @return the contents of the memory buffer as a String
+   */
+  public String toString(String enc) throws UnsupportedEncodingException {
+    return arr_.toString(enc);
+  }
+
+  /** Hex dump of the buffer contents with "==>" in front of the read position. */
+  public String inspect() {
+    StringBuilder buf = new StringBuilder();
+    byte[] bytes = arr_.toByteArray();
+    for (int i = 0; i < bytes.length; i++) {
+      buf.append(pos_ == i ? "==>" : "").append(Integer.toHexString(bytes[i] & 0xff)).append(" ");
+    }
+    return buf.toString();
+  }
+
+  public int length() {
+    return arr_.size();
+  }
+
+  public byte[] getArray() {
+    return arr_.get();
+  }
+
+  /** Empties the buffer so it can be reused. */
+  public void resetBuffer() {
+    arr_.resetBuffer();
+    pos_ = 0; // rewind too, or read() would start past the newly written data
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/a455ee70/distribution/src/main/scripts/conf/log4j.xml
----------------------------------------------------------------------
diff --git a/distribution/src/main/scripts/conf/log4j.xml b/distribution/src/main/scripts/conf/log4j.xml
index 518413a..787fc13 100644
--- a/distribution/src/main/scripts/conf/log4j.xml
+++ b/distribution/src/main/scripts/conf/log4j.xml
@@ -21,10 +21,9 @@ under the License.
 
 <log4j:configuration xmlns:log4j="http://jakarta.apache.org/log4j/">
 
-
 	<appender name="CONSOLE" class="org.apache.log4j.ConsoleAppender">
 		<layout class="org.apache.log4j.PatternLayout">
-			<param name="ConversionPattern" value="%-5p %d{yyyyMMdd_HH:mm:ss:sss_z} [%t] %c{2}:
%m%n" />
+			<param name="ConversionPattern" value="%-5p %d{yyyyMMdd_HH:mm:ss:SSS_z} [%t] %c{2}:
%m%n" />
 		</layout>
 	</appender>
 
@@ -37,7 +36,33 @@ under the License.
 		<param name="DatePattern" value="'.'yyyyMMdd" />
 		<param name="Append" value="true" />
 		<layout class="org.apache.log4j.PatternLayout">
-			<param name="ConversionPattern" value="%-5p %d{yyyyMMdd_HH:mm:ss:sss_z} [%t] %c{2}:
%m%n" />
+			<param name="ConversionPattern" value="%-5p %d{yyyyMMdd_HH:mm:ss:SSS_z} [%t] %c{2}:
%m%n" />
+		</layout>
+	</appender>
+	
+	<appender name="ASYNC_REQUEST_LOG_FILE" class="org.apache.log4j.AsyncAppender">
+		<appender-ref ref="REQUEST_LOG_FILE" />
+	</appender>
+	
+	<appender name="REQUEST_LOG_FILE" class="org.apache.log4j.DailyRollingFileAppender">
+		<param name="File" value="${blur.logs.dir}/request_${blur.name}.log" />
+		<param name="DatePattern" value="'.'yyyyMMdd" />
+		<param name="Append" value="true" />
+		<layout class="org.apache.log4j.PatternLayout">
+			<param name="ConversionPattern" value="%d{yyyyMMdd_HH:mm:ss:SSS_z}\t%m\t%t%n" />
+		</layout>
+	</appender>
+	
+	<appender name="ASYNC_RESPONSE_LOG_FILE" class="org.apache.log4j.AsyncAppender">
+		<appender-ref ref="RESPONSE_LOG_FILE" />
+	</appender>
+	
+	<appender name="RESPONSE_LOG_FILE" class="org.apache.log4j.DailyRollingFileAppender">
+		<param name="File" value="${blur.logs.dir}/response_${blur.name}.log" />
+		<param name="DatePattern" value="'.'yyyyMMdd" />
+		<param name="Append" value="true" />
+		<layout class="org.apache.log4j.PatternLayout">
+			<param name="ConversionPattern" value="%d{yyyyMMdd_HH:mm:ss:SSS_z}\t%m\t%t%n" />
 		</layout>
 	</appender>
 	
@@ -45,6 +70,18 @@ under the License.
     	<level value="ERROR" />
 	    <appender-ref ref="ASYNC"/>
 	</logger>
+	
+	<logger name="REQUEST_LOG" additivity="false">
+		<!-- Set value to "INFO" to enable -->
+    	<level value="ERROR" />
+	    <appender-ref ref="ASYNC_REQUEST_LOG_FILE"/>
+	</logger>
+	
+	<logger name="RESPONSE_LOG" additivity="false">
+		<!-- Set value to "INFO" to enable -->
+    	<level value="ERROR" />
+	    <appender-ref ref="ASYNC_RESPONSE_LOG_FILE"/>
+	</logger>
 
 	<!--root>
 	For production use, the async configuration will be better for performance at the cost of
not seeing the final flushed log events.


Mime
View raw message