cassandra-commits mailing list archives

From aweisb...@apache.org
Subject [2/2] cassandra git commit: Support a means of logging all queries as they were invoked.
Date Mon, 04 Dec 2017 23:09:47 GMT
Support a means of logging all queries as they were invoked.

Patch by Ariel Weisberg; Reviewed by Blake Eggleston


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/ae837806
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/ae837806
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/ae837806

Branch: refs/heads/cassandra-13983-rebased2
Commit: ae837806bd07dbb8b881960feeeeb90c1a665d93
Parents: db81f6b
Author: Ariel Weisberg <aweisberg@apple.com>
Authored: Fri Oct 27 17:16:45 2017 -0400
Committer: Ariel Weisberg <aweisberg@apple.com>
Committed: Mon Dec 4 18:06:30 2017 -0500

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 NEWS.txt                                        |   8 +
 bin/fqltool                                     |  88 +++
 bin/fqltool.bat                                 |  36 +
 build.xml                                       |  17 +
 conf/cassandra.yaml                             |   9 +-
 conf/jvm.options                                |   4 +
 lib/chronicle-bytes-1.10.1.jar                  | Bin 0 -> 273664 bytes
 lib/chronicle-core-1.9.21.jar                   | Bin 0 -> 199833 bytes
 lib/chronicle-queue-4.6.55.jar                  | Bin 0 -> 215247 bytes
 lib/chronicle-threads-1.9.1.jar                 | Bin 0 -> 40530 bytes
 lib/chronicle-wire-1.10.1.jar                   | Bin 0 -> 419054 bytes
 lib/licenses/chronicle-bytes-1.10.1.txt         |  14 +
 lib/licenses/chronicle-core-1.9.21.txt          |  14 +
 lib/licenses/chronicle-queue-4.6.55.txt         |  14 +
 lib/licenses/chronicle-threads-1.9.1.txt        |  14 +
 lib/licenses/chronicle-wire-1.10.1.txt          |  14 +
 .../org/apache/cassandra/config/Config.java     |   2 +
 .../cassandra/config/DatabaseDescriptor.java    |   5 +
 .../db/fullquerylog/FullQueryLogger.java        | 530 +++++++++++++++
 .../apache/cassandra/service/StorageProxy.java  |  23 +
 .../cassandra/service/StorageProxyMBean.java    |  21 +
 .../cassandra/tools/FullQueryLogTool.java       |  93 +++
 .../org/apache/cassandra/tools/NodeTool.java    |   3 +
 .../apache/cassandra/tools/fqltool/Dump.java    | 265 ++++++++
 .../tools/nodetool/DisableFullQueryLog.java     |  33 +
 .../tools/nodetool/EnableFullQueryLog.java      |  49 ++
 .../tools/nodetool/ResetFullQueryLog.java       |  33 +
 .../transport/messages/BatchMessage.java        |  21 +
 .../transport/messages/ExecuteMessage.java      |  11 +
 .../transport/messages/QueryMessage.java        |  11 +
 .../org/apache/cassandra/utils/ObjectSizes.java |   1 +
 .../apache/cassandra/utils/binlog/BinLog.java   | 277 ++++++++
 .../utils/concurrent/WeightedQueue.java         | 333 ++++++++++
 test/unit/org/apache/cassandra/Util.java        |   6 +-
 .../db/fullquerylog/FullQueryLoggerTest.java    | 601 +++++++++++++++++
 .../cassandra/utils/binlog/BinLogTest.java      | 449 +++++++++++++
 .../utils/concurrent/WeightedQueueTest.java     | 656 +++++++++++++++++++
 38 files changed, 3652 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/ae837806/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 8350848..81da11d 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 4.0
+ * Support a means of logging all queries as they were invoked (CASSANDRA-13983)
  * Presize collections (CASSANDRA-13760)
  * Add GroupCommitLogService (CASSANDRA-13530)
  * Parallelize initial materialized view build (CASSANDRA-12245)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ae837806/NEWS.txt
----------------------------------------------------------------------
diff --git a/NEWS.txt b/NEWS.txt
index 622cc54..a14f7ba 100644
--- a/NEWS.txt
+++ b/NEWS.txt
@@ -34,6 +34,14 @@ New features
      threads is specified by the property `cassandra.yaml:concurrent_materialized_view_builders`.
      This property can be modified at runtime through both JMX and the new `setconcurrentviewbuilders`
      and `getconcurrentviewbuilders` nodetool commands. See CASSANDRA-12245 for more details.
+   - There is now a binary full query log based on Chronicle Queue that can be controlled using
+     nodetool enablefullquerylog, disablefullquerylog, and resetfullquerylog. The log
+     contains all queries invoked, the approximate time they were invoked, any parameters necessary
+     to bind wildcard values, and all query options. A human-readable version of the log can be
+     dumped or tailed using the new bin/fqltool utility. The full query log is designed to be safe
+     to use in production and bounds its heap memory and disk space usage with limits
+     you can specify when enabling the log.
+     See the nodetool and fqltool help text for more information.
 
 Upgrading
 ---------

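As a rough sketch of the workflow described above (assuming full_query_log_dir is set in cassandra.yaml or the equivalent options are passed to nodetool; see the nodetool and fqltool help text for the exact flags, and note the log directory shown is just the commented default from cassandra.yaml):

    # start recording queries on the node
    nodetool enablefullquerylog

    # print a human-readable dump of the recorded queries, or tail it with --follow
    bin/fqltool dump --roll-cycle HOURLY /tmp/cassandrafullquerylog

    # stop recording and keep the files, or stop recording and delete them
    nodetool disablefullquerylog
    nodetool resetfullquerylog
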
http://git-wip-us.apache.org/repos/asf/cassandra/blob/ae837806/bin/fqltool
----------------------------------------------------------------------
diff --git a/bin/fqltool b/bin/fqltool
new file mode 100755
index 0000000..8a05af1
--- /dev/null
+++ b/bin/fqltool
@@ -0,0 +1,88 @@
+#!/bin/sh
+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+if [ "x$CASSANDRA_INCLUDE" = "x" ]; then
+    # Locations (in order) to use when searching for an include file.
+    for include in "`dirname "$0"`/cassandra.in.sh" \
+                   "$HOME/.cassandra.in.sh" \
+                   /usr/share/cassandra/cassandra.in.sh \
+                   /usr/local/share/cassandra/cassandra.in.sh \
+                   /opt/cassandra/cassandra.in.sh; do
+        if [ -r "$include" ]; then
+            . "$include"
+            break
+        fi
+    done
+elif [ -r "$CASSANDRA_INCLUDE" ]; then
+    . "$CASSANDRA_INCLUDE"
+fi
+
+# Use JAVA_HOME if set, otherwise look for java in PATH
+if [ -x "$JAVA_HOME/bin/java" ]; then
+    JAVA="$JAVA_HOME/bin/java"
+else
+    JAVA="`which java`"
+fi
+
+if [ "x$JAVA" = "x" ]; then
+    echo "Java executable not found (hint: set JAVA_HOME)" >&2
+    exit 1
+fi
+
+if [ -z "$CASSANDRA_CONF" -o -z "$CLASSPATH" ]; then
+    echo "You must set the CASSANDRA_CONF and CLASSPATH vars" >&2
+    exit 1
+fi
+
+# Run cassandra-env.sh to pick up JMX_PORT
+if [ -f "$CASSANDRA_CONF/cassandra-env.sh" ]; then
+    JVM_OPTS_SAVE=$JVM_OPTS
+    MAX_HEAP_SIZE_SAVE=$MAX_HEAP_SIZE
+    . "$CASSANDRA_CONF/cassandra-env.sh"
+    MAX_HEAP_SIZE=$MAX_HEAP_SIZE_SAVE
+    JVM_OPTS=$JVM_OPTS_SAVE
+fi
+
+# JMX Port passed via cmd line args (-p 9999 / --port 9999 / --port=9999)
+# should override the value from cassandra-env.sh
+ARGS=""
+JVM_ARGS=""
+while true
+do
+  if [ ! $1 ]; then break; fi
+  case $1 in
+    -D*)
+      JVM_ARGS="$JVM_ARGS $1"
+      ;;
+    *)
+      ARGS="$ARGS $1"
+      ;;
+  esac
+  shift
+done
+
+if [ "x$MAX_HEAP_SIZE" = "x" ]; then
+    MAX_HEAP_SIZE="512m"
+fi
+
+"$JAVA" $JAVA_AGENT -ea -da:net.openhft... -cp "$CLASSPATH" $JVM_OPTS -Xmx$MAX_HEAP_SIZE \
+        -Dlog4j.configurationFile=log4j2-tools.xml \
+        $JVM_ARGS \
+        org.apache.cassandra.tools.FullQueryLogTool $ARGS
+
+# vi:ai sw=4 ts=4 tw=0 et

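A quick sketch of how the wrapper above is meant to be driven: -D arguments are forwarded to the JVM, every other argument goes to FullQueryLogTool, and MAX_HEAP_SIZE falls back to 512m unless the environment or the include file provides one (the property name and path below are purely illustrative):

    # give the tool a larger heap and pass an arbitrary system property through to the JVM
    MAX_HEAP_SIZE=1G bin/fqltool -Dexample.property=value dump /var/lib/cassandra/fullquerylog
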
http://git-wip-us.apache.org/repos/asf/cassandra/blob/ae837806/bin/fqltool.bat
----------------------------------------------------------------------
diff --git a/bin/fqltool.bat b/bin/fqltool.bat
new file mode 100644
index 0000000..f3103d8
--- /dev/null
+++ b/bin/fqltool.bat
@@ -0,0 +1,36 @@
+@REM
+@REM Licensed to the Apache Software Foundation (ASF) under one or more
+@REM contributor license agreements. See the NOTICE file distributed with
+@REM this work for additional information regarding copyright ownership.
+@REM The ASF licenses this file to You under the Apache License, Version 2.0
+@REM (the "License"); you may not use this file except in compliance with
+@REM the License. You may obtain a copy of the License at
+@REM
+@REM http://www.apache.org/licenses/LICENSE-2.0
+@REM
+@REM Unless required by applicable law or agreed to in writing, software
+@REM distributed under the License is distributed on an "AS IS" BASIS,
+@REM WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+@REM See the License for the specific language governing permissions and
+@REM limitations under the License.
+
+@echo off
+if "%OS%" == "Windows_NT" setlocal
+
+pushd "%~dp0"
+call cassandra.in.bat
+
+if NOT DEFINED JAVA_HOME goto :err
+
+set CASSANDRA_PARAMS=%CASSANDRA_PARAMS% -Dcassandra.logdir="%CASSANDRA_HOME%\logs"
+
+"%JAVA_HOME%\bin\java" -cp %CASSANDRA_CLASSPATH% %CASSANDRA_PARAMS% -Dlog4j.configurationFile=log4j2-tools.xml org.apache.cassandra.tools.FullQueryLogTool %*
+goto finally
+
+:err
+echo The JAVA_HOME environment variable must be set to run this program!
+pause
+
+:finally
+ENDLOCAL & set RC=%ERRORLEVEL%
+exit /B %RC%

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ae837806/build.xml
----------------------------------------------------------------------
diff --git a/build.xml b/build.xml
index 7c63c82..5dee75d 100644
--- a/build.xml
+++ b/build.xml
@@ -114,6 +114,13 @@
 
     <property name="ecj.version" value="4.4.2"/>
 
+    <!-- https://mvnrepository.com/artifact/net.openhft/chronicle-bom/1.15.6 -->
+    <property name="chronicle-queue.version" value="4.6.55" />
+    <property name="chronicle-core.version" value="1.9.21" />
+    <property name="chronicle-bytes.version" value="1.10.1" />
+    <property name="chronicle-wire.version" value="1.10.1" />
+    <property name="chronicle-threads.version" value="1.9.1" />
+
     <condition property="maven-ant-tasks.jar.exists">
       <available file="${build.dir}/maven-ant-tasks-${maven-ant-tasks.version}.jar" />
     </condition>
@@ -422,6 +429,11 @@
           <dependency groupId="org.mindrot" artifactId="jbcrypt" version="0.3m" />
           <dependency groupId="io.airlift" artifactId="airline" version="0.8" />
           <dependency groupId="io.netty" artifactId="netty-all" version="4.1.14.Final" />
+          <dependency groupId="net.openhft" artifactId="chronicle-queue" version="${chronicle-queue.version}"/>
+          <dependency groupId="net.openhft" artifactId="chronicle-core" version="${chronicle-core.version}"/>
+          <dependency groupId="net.openhft" artifactId="chronicle-bytes" version="${chronicle-bytes.version}"/>
+          <dependency groupId="net.openhft" artifactId="chronicle-wire" version="${chronicle-wire.version}"/>
+          <dependency groupId="net.openhft" artifactId="chronicle-threads" version="${chronicle-threads.version}"/>
           <dependency groupId="com.google.code.findbugs" artifactId="jsr305" version="2.0.2" />
           <dependency groupId="com.clearspring.analytics" artifactId="stream" version="2.5.2" />
 	  <!-- UPDATE AND UNCOMMENT ON THE DRIVER RELEASE, BEFORE 4.0 RELEASE
@@ -620,6 +632,11 @@
         <dependency groupId="com.github.jbellis" artifactId="jamm"/>
 
         <dependency groupId="io.netty" artifactId="netty-all"/>
+        <dependency groupId="net.openhft" artifactId="chronicle-queue" version="${chronicle-queue.version}"/>
+        <dependency groupId="net.openhft" artifactId="chronicle-core" version="${chronicle-core.version}"/>
+        <dependency groupId="net.openhft" artifactId="chronicle-bytes" version="${chronicle-bytes.version}"/>
+        <dependency groupId="net.openhft" artifactId="chronicle-wire" version="${chronicle-wire.version}"/>
+        <dependency groupId="net.openhft" artifactId="chronicle-threads" version="${chronicle-threads.version}"/>
         <dependency groupId="joda-time" artifactId="joda-time"/>
         <dependency groupId="org.fusesource" artifactId="sigar"/>
         <dependency groupId="org.eclipse.jdt.core.compiler" artifactId="ecj"/>

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ae837806/conf/cassandra.yaml
----------------------------------------------------------------------
diff --git a/conf/cassandra.yaml b/conf/cassandra.yaml
index 9cfc21c..ba478e7 100644
--- a/conf/cassandra.yaml
+++ b/conf/cassandra.yaml
@@ -1051,9 +1051,9 @@ transparent_data_encryption_options:
     key_alias: testing:1
     # CBC IV length for AES needs to be 16 bytes (which is also the default size)
     # iv_length: 16
-    key_provider: 
+    key_provider:
       - class_name: org.apache.cassandra.security.JKSKeyProvider
-        parameters: 
+        parameters:
           - keystore: conf/.keystore
             keystore_password: cassandra
             store_type: JCEKS
@@ -1165,3 +1165,8 @@ back_pressure_strategy:
 # level for writes without timing out. This is different from the consistency level requested by
 # each write which may be lower in order to facilitate availability.
 # ideal_consistency_level: EACH_QUORUM
+
+# Path to write full query log data to when the full query log is enabled
+# The full query log will recursively delete the contents of this path at
+# times. Don't place links in this directory to other parts of the filesystem.
+#full_query_log_dir: /tmp/cassandrafullquerylog

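Because the logger recursively deletes the contents of this path, a reasonable setup (the path and ownership below are illustrative, not prescriptive) is to give it a dedicated, non-symlinked directory:

    # create a dedicated directory for the full query log
    mkdir -p /var/lib/cassandra/fullquerylog
    chown cassandra:cassandra /var/lib/cassandra/fullquerylog
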
http://git-wip-us.apache.org/repos/asf/cassandra/blob/ae837806/conf/jvm.options
----------------------------------------------------------------------
diff --git a/conf/jvm.options b/conf/jvm.options
index d937b08..e875a15 100644
--- a/conf/jvm.options
+++ b/conf/jvm.options
@@ -86,6 +86,10 @@
 # enable assertions. highly suggested for correct application functionality.
 -ea
 
+# disable assertions for net.openhft.** because it runs out of memory by design
+# if enabled and run for more than just brief testing
+-da:net.openhft...
+
 # enable thread priorities, primarily so we can give periodic tasks
 # a lower priority to avoid interfering with client workload
 -XX:+UseThreadPriorities

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ae837806/lib/chronicle-bytes-1.10.1.jar
----------------------------------------------------------------------
diff --git a/lib/chronicle-bytes-1.10.1.jar b/lib/chronicle-bytes-1.10.1.jar
new file mode 100644
index 0000000..94645b7
Binary files /dev/null and b/lib/chronicle-bytes-1.10.1.jar differ

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ae837806/lib/chronicle-core-1.9.21.jar
----------------------------------------------------------------------
diff --git a/lib/chronicle-core-1.9.21.jar b/lib/chronicle-core-1.9.21.jar
new file mode 100644
index 0000000..03436d5
Binary files /dev/null and b/lib/chronicle-core-1.9.21.jar differ

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ae837806/lib/chronicle-queue-4.6.55.jar
----------------------------------------------------------------------
diff --git a/lib/chronicle-queue-4.6.55.jar b/lib/chronicle-queue-4.6.55.jar
new file mode 100644
index 0000000..09e2b33
Binary files /dev/null and b/lib/chronicle-queue-4.6.55.jar differ

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ae837806/lib/chronicle-threads-1.9.1.jar
----------------------------------------------------------------------
diff --git a/lib/chronicle-threads-1.9.1.jar b/lib/chronicle-threads-1.9.1.jar
new file mode 100644
index 0000000..5e69515
Binary files /dev/null and b/lib/chronicle-threads-1.9.1.jar differ

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ae837806/lib/chronicle-wire-1.10.1.jar
----------------------------------------------------------------------
diff --git a/lib/chronicle-wire-1.10.1.jar b/lib/chronicle-wire-1.10.1.jar
new file mode 100644
index 0000000..e6d8b33
Binary files /dev/null and b/lib/chronicle-wire-1.10.1.jar differ

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ae837806/lib/licenses/chronicle-bytes-1.10.1.txt
----------------------------------------------------------------------
diff --git a/lib/licenses/chronicle-bytes-1.10.1.txt b/lib/licenses/chronicle-bytes-1.10.1.txt
new file mode 100644
index 0000000..d8a262e
--- /dev/null
+++ b/lib/licenses/chronicle-bytes-1.10.1.txt
@@ -0,0 +1,14 @@
+
+== Copyright 2016 higherfrequencytrading.com
+
+Licensed under the *Apache License, Version 2.0* (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ae837806/lib/licenses/chronicle-core-1.9.21.txt
----------------------------------------------------------------------
diff --git a/lib/licenses/chronicle-core-1.9.21.txt b/lib/licenses/chronicle-core-1.9.21.txt
new file mode 100644
index 0000000..d8a262e
--- /dev/null
+++ b/lib/licenses/chronicle-core-1.9.21.txt
@@ -0,0 +1,14 @@
+
+== Copyright 2016 higherfrequencytrading.com
+
+Licensed under the *Apache License, Version 2.0* (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ae837806/lib/licenses/chronicle-queue-4.6.55.txt
----------------------------------------------------------------------
diff --git a/lib/licenses/chronicle-queue-4.6.55.txt b/lib/licenses/chronicle-queue-4.6.55.txt
new file mode 100644
index 0000000..d8a262e
--- /dev/null
+++ b/lib/licenses/chronicle-queue-4.6.55.txt
@@ -0,0 +1,14 @@
+
+== Copyright 2016 higherfrequencytrading.com
+
+Licensed under the *Apache License, Version 2.0* (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ae837806/lib/licenses/chronicle-threads-1.9.1.txt
----------------------------------------------------------------------
diff --git a/lib/licenses/chronicle-threads-1.9.1.txt b/lib/licenses/chronicle-threads-1.9.1.txt
new file mode 100644
index 0000000..d8a262e
--- /dev/null
+++ b/lib/licenses/chronicle-threads-1.9.1.txt
@@ -0,0 +1,14 @@
+
+== Copyright 2016 higherfrequencytrading.com
+
+Licensed under the *Apache License, Version 2.0* (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ae837806/lib/licenses/chronicle-wire-1.10.1.txt
----------------------------------------------------------------------
diff --git a/lib/licenses/chronicle-wire-1.10.1.txt b/lib/licenses/chronicle-wire-1.10.1.txt
new file mode 100644
index 0000000..d8a262e
--- /dev/null
+++ b/lib/licenses/chronicle-wire-1.10.1.txt
@@ -0,0 +1,14 @@
+
+== Copyright 2016 higherfrequencytrading.com
+
+Licensed under the *Apache License, Version 2.0* (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ae837806/src/java/org/apache/cassandra/config/Config.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/Config.java b/src/java/org/apache/cassandra/config/Config.java
index bc91df9..1db8217 100644
--- a/src/java/org/apache/cassandra/config/Config.java
+++ b/src/java/org/apache/cassandra/config/Config.java
@@ -364,6 +364,8 @@ public class Config
     public RepairCommandPoolFullStrategy repair_command_pool_full_strategy = RepairCommandPoolFullStrategy.queue;
     public int repair_command_pool_size = concurrent_validations;
 
+    public String full_query_log_dir = null;
+
     /**
      * @deprecated migrate to {@link DatabaseDescriptor#isClientInitialized()}
      */

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ae837806/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index 9083550..843bca2 100644
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@ -2474,4 +2474,9 @@ public class DatabaseDescriptor
     {
         return conf.repair_command_pool_full_strategy;
     }
+
+    public static String getFullQueryLogPath()
+    {
+        return conf.full_query_log_dir;
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ae837806/src/java/org/apache/cassandra/db/fullquerylog/FullQueryLogger.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/fullquerylog/FullQueryLogger.java b/src/java/org/apache/cassandra/db/fullquerylog/FullQueryLogger.java
new file mode 100644
index 0000000..d7cf0dc
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/fullquerylog/FullQueryLogger.java
@@ -0,0 +1,530 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db.fullquerylog;
+
+import java.io.File;
+import java.nio.ByteBuffer;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Sets;
+import com.google.common.primitives.Ints;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import io.netty.buffer.ByteBuf;
+import net.openhft.chronicle.bytes.BytesStore;
+import net.openhft.chronicle.queue.RollCycles;
+import net.openhft.chronicle.wire.ValueOut;
+import net.openhft.chronicle.wire.WireOut;
+import org.apache.cassandra.cql3.QueryOptions;
+import org.apache.cassandra.io.FSError;
+import org.apache.cassandra.io.util.FileUtils;
+import org.apache.cassandra.transport.CBUtil;
+import org.apache.cassandra.transport.ProtocolVersion;
+import org.apache.cassandra.utils.NoSpamLogger;
+import org.apache.cassandra.utils.ObjectSizes;
+import org.apache.cassandra.utils.Throwables;
+import org.apache.cassandra.utils.binlog.BinLog;
+import org.apache.cassandra.utils.concurrent.WeightedQueue;
+import org.github.jamm.MemoryLayoutSpecification;
+
+/**
+ * A logger that logs entire query contents after the query finishes (or times out).
+ */
+public class FullQueryLogger
+{
+    private static final int EMPTY_BYTEBUFFER_SIZE = Ints.checkedCast(ObjectSizes.sizeOnHeapExcludingData(ByteBuffer.allocate(0)));
+    private static final int EMPTY_LIST_SIZE = Ints.checkedCast(ObjectSizes.measureDeep(new ArrayList(0)));
+    private static final int EMPTY_BYTEBUF_SIZE;
+    private static final int OBJECT_HEADER_SIZE = MemoryLayoutSpecification.SPEC.getObjectHeaderSize();
+    static
+    {
+        int tempSize = 0;
+        ByteBuf buf = CBUtil.allocator.buffer(0, 0);
+        try
+        {
+            tempSize = Ints.checkedCast(ObjectSizes.measure(buf));
+        }
+        finally
+        {
+            buf.release();
+        }
+        EMPTY_BYTEBUF_SIZE = tempSize;
+    }
+
+    private static final Logger logger = LoggerFactory.getLogger(FullQueryLogger.class);
+    private static final NoSpamLogger noSpamLogger = NoSpamLogger.getLogger(logger, 1, TimeUnit.MINUTES);
+    private static final NoSpamLogger.NoSpamLogStatement droppedSamplesStatement = noSpamLogger.getStatement("Dropped {} binary log samples", 1, TimeUnit.MINUTES);
+
+    public static final FullQueryLogger instance = new FullQueryLogger();
+
+    volatile BinLog binLog;
+    private volatile boolean blocking;
+    private Path path;
+
+    private final AtomicLong droppedSamplesSinceLastLog = new AtomicLong();
+
+    private FullQueryLogger()
+    {
+    }
+
+    /**
+     * Configure the global instance of the FullQueryLogger
+     * @param path Dedicated path where the FQL can store its files.
+     * @param rollCycle How often to roll FQL log segments so they can potentially be reclaimed
+     * @param blocking Whether the FQL should block if it falls behind, or drop log records instead
+     * @param maxQueueWeight Maximum weight of the in-memory queue for records waiting to be written to the file before blocking or dropping
+     * @param maxLogSize Maximum size of the rolled files to retain on disk before deleting the oldest file
+     */
+    public synchronized void configure(Path path, String rollCycle, boolean blocking, int maxQueueWeight, long maxLogSize)
+    {
+        Preconditions.checkNotNull(path, "path was null");
+        File pathAsFile = path.toFile();
+        Preconditions.checkNotNull(rollCycle, "rollCycle was null");
+        rollCycle = rollCycle.toUpperCase();
+
+        //Exists and is a directory or can be created
+        Preconditions.checkArgument((pathAsFile.exists() && pathAsFile.isDirectory()) || (!pathAsFile.exists() && pathAsFile.mkdirs()), "path exists and is not a directory or couldn't be created");
+        Preconditions.checkArgument(pathAsFile.canRead() && pathAsFile.canWrite() && pathAsFile.canExecute(), "path is not readable, writable, and executable");
+        Preconditions.checkNotNull(RollCycles.valueOf(rollCycle), "unrecognized roll cycle");
+        Preconditions.checkArgument(maxQueueWeight > 0, "maxQueueWeight must be > 0");
+        Preconditions.checkArgument(maxLogSize > 0, "maxLogSize must be > 0");
+        logger.info("Attempting to configure full query logger path: {} Roll cycle: {} Blocking: {} Max queue weight: {} Max log size:{}", path, rollCycle, blocking, maxQueueWeight, maxLogSize);
+        if (binLog != null)
+        {
+            logger.warn("Full query logger already configured. Ignoring requested configuration.");
+            throw new IllegalStateException("Already configured");
+        }
+
+        if (path.toFile().exists())
+        {
+            Throwable error = cleanDirectory(path.toFile(), null);
+            if (error != null)
+            {
+                throw new RuntimeException(error);
+            }
+        }
+
+        this.path = path;
+        this.blocking = blocking;
+        binLog = new BinLog(path, RollCycles.valueOf(rollCycle), maxQueueWeight, maxLogSize);
+        binLog.start();
+    }
+
+    /**
+     * Need the path as a parameter as well because if the process is restarted the config file might be the only
+     * location for retrieving the path to the full query log files, but JMX also allows you to specify a path
+     * that isn't persisted anywhere so we have to clean that one as well.
+     */
+    public synchronized void reset(String fullQueryLogPath)
+    {
+        try
+        {
+            Set<File> pathsToClean = Sets.newHashSet();
+
+            //First decide whether to clean the path configured in the YAML
+            if (fullQueryLogPath != null)
+            {
+                File fullQueryLogPathFile = new File(fullQueryLogPath);
+                if (fullQueryLogPathFile.exists())
+                {
+                    pathsToClean.add(fullQueryLogPathFile);
+                }
+            }
+
+            //Then decide whether to clean the last used path, possibly configured by JMX
+            if (path != null)
+            {
+                File pathFile = path.toFile();
+                if (pathFile.exists())
+                {
+                    pathsToClean.add(pathFile);
+                }
+            }
+
+            logger.info("Reset (and deactivation) of full query log requested.");
+            if (binLog != null)
+            {
+                logger.info("Stopping full query log. Cleaning {}.", pathsToClean);
+                binLog.stop();
+                binLog = null;
+            }
+            else
+            {
+                logger.info("Full query log already deactivated. Cleaning {}.", pathsToClean);
+            }
+
+            Throwable accumulate = null;
+            for (File f : pathsToClean)
+            {
+                accumulate = cleanDirectory(f, accumulate);
+            }
+            if (accumulate != null)
+            {
+                throw new RuntimeException(accumulate);
+            }
+        }
+        catch (Exception e)
+        {
+            if (e instanceof RuntimeException)
+            {
+                throw (RuntimeException)e;
+            }
+            throw new RuntimeException(e);
+        }
+    }
+
+    /**
+     * Stop the full query log leaving behind any generated files.
+     */
+    public synchronized void stop()
+    {
+        try
+        {
+            logger.info("Deactivation of full query log requested.");
+            if (binLog != null)
+            {
+                logger.info("Stopping full query log");
+                binLog.stop();
+                binLog = null;
+            }
+        }
+        catch (Exception e)
+        {
+            throw new RuntimeException(e);
+        }
+    }
+
+    /**
+     * Check whether the full query log is enabled.
+     * @return true if records are recorded and false otherwise.
+     */
+    public boolean enabled()
+    {
+        return binLog != null;
+    }
+
+    /**
+     * This is potentially lossy, but it's not critical since we will generally know
+     * when this is happening and roughly how bad it is.
+     */
+    private void logDroppedSample()
+    {
+        droppedSamplesSinceLastLog.incrementAndGet();
+        if (droppedSamplesStatement.warn(new Object[] {droppedSamplesSinceLastLog.get()}))
+        {
+            droppedSamplesSinceLastLog.set(0);
+        }
+    }
+
+    /**
+     * Log an invocation of a batch of queries
+     * @param type The type of the batch
+     * @param queries CQL text of the queries
+     * @param values Values to bind to as parameters for the queries
+     * @param queryOptions Options associated with the query invocation
+     * @param batchTimeMillis Approximate time, in milliseconds since the epoch, at which the batch was invoked
+     */
+    public void logBatch(String type, List<String> queries,  List<List<ByteBuffer>> values, QueryOptions queryOptions, long batchTimeMillis)
+    {
+        Preconditions.checkNotNull(type, "type was null");
+        Preconditions.checkNotNull(queries, "queries was null");
+        Preconditions.checkNotNull(values, "values was null");
+        Preconditions.checkNotNull(queryOptions, "queryOptions was null");
+        Preconditions.checkArgument(batchTimeMillis > 0, "batchTimeMillis must be > 0");
+
+        //Don't construct the wrapper if the log is disabled
+        BinLog binLog = this.binLog;
+        if (binLog == null)
+        {
+            return;
+        }
+
+        WeighableMarshallableBatch wrappedBatch = new WeighableMarshallableBatch(type, queries, values, queryOptions, batchTimeMillis);
+        logRecord(wrappedBatch, binLog);
+    }
+
+    void logRecord(AbstractWeighableMarshallable record, BinLog binLog)
+    {
+
+        boolean putInQueue = false;
+        try
+        {
+            if (blocking)
+            {
+                try
+                {
+                    binLog.put(record);
+                    putInQueue = true;
+                }
+                catch (InterruptedException e)
+                {
+                    throw new RuntimeException(e);
+                }
+            }
+            else
+            {
+                if (!binLog.offer(record))
+                {
+                    logDroppedSample();
+                }
+                else
+                {
+                    putInQueue = true;
+                }
+            }
+        }
+        finally
+        {
+            if (!putInQueue)
+            {
+                record.release();
+            }
+        }
+    }
+
+    /**
+     * Log a single CQL query
+     * @param query CQL query text
+     * @param queryOptions Options associated with the query invocation
+     * @param queryTimeMillis Approximate time, in milliseconds since the epoch, at which the query was invoked
+     */
+    public void logQuery(String query, QueryOptions queryOptions, long queryTimeMillis)
+    {
+        Preconditions.checkNotNull(query, "query was null");
+        Preconditions.checkNotNull(queryOptions, "queryOptions was null");
+        Preconditions.checkArgument(queryTimeMillis > 0, "queryTimeMillis must be > 0");
+
+        //Don't construct the wrapper if the log is disabled
+        BinLog binLog = this.binLog;
+        if (binLog == null)
+        {
+            return;
+        }
+
+        WeighableMarshallableQuery wrappedQuery = new WeighableMarshallableQuery(query, queryOptions, queryTimeMillis);
+        logRecord(wrappedQuery, binLog);
+    }
+
+    private static abstract class AbstractWeighableMarshallable extends BinLog.ReleaseableWriteMarshallable implements WeightedQueue.Weighable
+    {
+        private final ByteBuf queryOptionsBuffer;
+        private final long timeMillis;
+        private final int protocolVersion;
+
+        private AbstractWeighableMarshallable(QueryOptions queryOptions, long timeMillis)
+        {
+            this.timeMillis = timeMillis;
+            ProtocolVersion version = queryOptions.getProtocolVersion();
+            this.protocolVersion = version.asInt();
+            int optionsSize = QueryOptions.codec.encodedSize(queryOptions, version);
+            queryOptionsBuffer = CBUtil.allocator.buffer(optionsSize, optionsSize);
+            /*
+             * Struggled with what tradeoff to make in terms of query options, which are potentially large and complicated.
+             * There is tension between low garbage production (or allocator overhead), small working set size, and CPU overhead reserializing the
+             * query options into binary format.
+             *
+             * I went with the lowest risk most predictable option which is allocator overhead and CPU overhead
+             * rather than keeping the original query message around so I could just serialize that as a memcpy. It's more
+             * instructions when turned on, but it doesn't change memory footprint quite as much and it's more pay for what you use
+             * in terms of query volume. The CPU overhead is spread out across producers so we should at least get
+             * some scaling.
+             *
+             */
+            boolean success = false;
+            try
+            {
+                QueryOptions.codec.encode(queryOptions, queryOptionsBuffer, version);
+                success = true;
+            }
+            finally
+            {
+                if (!success)
+                {
+                    queryOptionsBuffer.release();
+                }
+            }
+        }
+
+        @Override
+        public void writeMarshallable(WireOut wire)
+        {
+            wire.write("protocol-version").int32(protocolVersion);
+            wire.write("query-options").bytes(BytesStore.wrap(queryOptionsBuffer.nioBuffer()));
+            wire.write("query-time").int64(timeMillis);
+        }
+
+        @Override
+        public void release()
+        {
+            queryOptionsBuffer.release();
+        }
+
+        //8-bytes for protocol version (assume alignment cost), 8-byte timestamp, 8-byte object header + other contents
+        @Override
+        public int weight()
+        {
+            return 8 + 8 + OBJECT_HEADER_SIZE + EMPTY_BYTEBUF_SIZE + queryOptionsBuffer.capacity();
+        }
+    }
+
+    static class WeighableMarshallableBatch extends AbstractWeighableMarshallable
+    {
+        private final int weight;
+        private final String batchType;
+        private final List<String> queries;
+        private final List<List<ByteBuffer>> values;
+
+        public WeighableMarshallableBatch(String batchType, List<String> queries, List<List<ByteBuffer>> values, QueryOptions queryOptions, long batchTimeMillis)
+        {
+           super(queryOptions, batchTimeMillis);
+           this.queries = queries;
+           this.values = values;
+           this.batchType = batchType;
+           boolean success = false;
+           try
+           {
+
+               //weight, batch type, queries, values
+               int weightTemp = 8 + EMPTY_LIST_SIZE + EMPTY_LIST_SIZE;
+               for (int ii = 0; ii < queries.size(); ii++)
+               {
+                   weightTemp += ObjectSizes.sizeOf(queries.get(ii));
+               }
+
+               weightTemp += EMPTY_LIST_SIZE * values.size();
+               for (int ii = 0; ii < values.size(); ii++)
+               {
+                   List<ByteBuffer> sublist = values.get(ii);
+                   weightTemp += EMPTY_BYTEBUFFER_SIZE * sublist.size();
+                   for (int zz = 0; zz < sublist.size(); zz++)
+                   {
+                       weightTemp += sublist.get(zz).capacity();
+                   }
+               }
+               weightTemp += super.weight();
+               weightTemp += ObjectSizes.sizeOf(batchType);
+               weight = weightTemp;
+               success = true;
+           }
+           finally
+           {
+               if (!success)
+               {
+                   release();
+               }
+           }
+        }
+
+        @Override
+        public void writeMarshallable(WireOut wire)
+        {
+            wire.write("type").text("batch");
+            super.writeMarshallable(wire);
+            wire.write("batch-type").text(batchType);
+            ValueOut valueOut = wire.write("queries");
+            valueOut.int32(queries.size());
+            for (String query : queries)
+            {
+                valueOut.text(query);
+            }
+            valueOut = wire.write("values");
+            valueOut.int32(values.size());
+            for (List<ByteBuffer> subValues : values)
+            {
+                valueOut.int32(subValues.size());
+                for (ByteBuffer value : subValues)
+                {
+                    valueOut.bytes(BytesStore.wrap(value));
+                }
+            }
+        }
+
+        @Override
+        public int weight()
+        {
+            return weight;
+        }
+
+    }
+
+    static class WeighableMarshallableQuery extends AbstractWeighableMarshallable
+    {
+        private final String query;
+
+        public WeighableMarshallableQuery(String query, QueryOptions queryOptions, long queryTimeMillis)
+        {
+            super(queryOptions, queryTimeMillis);
+            this.query = query;
+        }
+
+        @Override
+        public void writeMarshallable(WireOut wire)
+        {
+            wire.write("type").text("single");
+            super.writeMarshallable(wire);
+            wire.write("query").text(query);
+        }
+
+        @Override
+        public int weight()
+        {
+            return Ints.checkedCast(ObjectSizes.sizeOf(query)) + super.weight();
+        }
+    }
+
+    static Throwable cleanDirectory(File directory, Throwable accumulate)
+    {
+        if (!directory.exists())
+        {
+            return Throwables.merge(accumulate, new RuntimeException(String.format("%s does not exist", directory)));
+        }
+        if (!directory.isDirectory())
+        {
+            return Throwables.merge(accumulate, new RuntimeException(String.format("%s is not a directory", directory)));
+        }
+        for (File f : directory.listFiles())
+        {
+            accumulate = deleteRecursively(f, accumulate);
+        }
+        if (accumulate instanceof FSError)
+        {
+            FileUtils.handleFSError((FSError)accumulate);
+        }
+        return accumulate;
+    }
+
+    private static Throwable deleteRecursively(File fileOrDirectory, Throwable accumulate)
+    {
+        if (fileOrDirectory.isDirectory())
+        {
+            for (File f : fileOrDirectory.listFiles())
+            {
+                accumulate = FileUtils.deleteWithConfirm(f, true, accumulate);
+            }
+        }
+        return FileUtils.deleteWithConfirm(fileOrDirectory, true, accumulate);
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ae837806/src/java/org/apache/cassandra/service/StorageProxy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java
index 48d1f3f..aa5d0cc 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -21,6 +21,7 @@ import java.io.IOException;
 import java.lang.management.ManagementFactory;
 import java.net.InetAddress;
 import java.nio.ByteBuffer;
+import java.nio.file.Paths;
 import java.util.*;
 import java.util.concurrent.*;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -29,6 +30,7 @@ import java.util.concurrent.atomic.AtomicLong;
 import javax.management.MBeanServer;
 import javax.management.ObjectName;
 
+import com.google.common.base.Preconditions;
 import com.google.common.base.Predicate;
 import com.google.common.cache.CacheLoader;
 import com.google.common.collect.*;
@@ -51,6 +53,7 @@ import org.apache.cassandra.schema.SchemaConstants;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.filter.DataLimits;
 import org.apache.cassandra.db.filter.TombstoneOverwhelmingException;
+import org.apache.cassandra.db.fullquerylog.FullQueryLogger;
 import org.apache.cassandra.db.partitions.*;
 import org.apache.cassandra.db.rows.RowIterator;
 import org.apache.cassandra.db.view.ViewUtils;
@@ -2860,6 +2863,26 @@ public class StorageProxy implements StorageProxyMBean
         return String.format("Updating ideal consistency level new value: %s old value %s", newCL, original.toString());
     }
 
+    @Override
+    public void configureFullQueryLogger(String path, String rollCycle, boolean blocking, int maxQueueWeight, long maxLogSize)
+    {
+        path = path != null ? path : DatabaseDescriptor.getFullQueryLogPath();
+        Preconditions.checkNotNull(path, "full_query_log_dir is not set in cassandra.yaml and no path was provided as a parameter");
+        FullQueryLogger.instance.configure(Paths.get(path), rollCycle, blocking, maxQueueWeight, maxLogSize);
+    }
+
+    @Override
+    public void resetFullQueryLogger()
+    {
+        FullQueryLogger.instance.reset(DatabaseDescriptor.getFullQueryLogPath());
+    }
+
+    @Override
+    public void stopFullQueryLogger()
+    {
+        FullQueryLogger.instance.stop();
+    }
+
     public int getOtcBacklogExpirationInterval() {
         return DatabaseDescriptor.getOtcBacklogExpirationInterval();
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ae837806/src/java/org/apache/cassandra/service/StorageProxyMBean.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageProxyMBean.java b/src/java/org/apache/cassandra/service/StorageProxyMBean.java
index d271769..173d43f 100644
--- a/src/java/org/apache/cassandra/service/StorageProxyMBean.java
+++ b/src/java/org/apache/cassandra/service/StorageProxyMBean.java
@@ -71,4 +71,25 @@ public interface StorageProxyMBean
 
     public String getIdealConsistencyLevel();
     public String setIdealConsistencyLevel(String cl);
+
+    /**
+     * Start the full query logger.
+     * @param path Path where the full query log will be stored. If null, the cassandra.yaml value is used.
+     * @param rollCycle How often to create a new file for query data (MINUTELY, HOURLY, DAILY)
+     * @param blocking Whether threads submitting queries to the query log should block if the log can't be drained to the filesystem fast enough, or instead drop samples and log a warning
+     * @param maxQueueWeight How many bytes of query data to queue before blocking or dropping samples
+     * @param maxLogSize How many bytes of log data to store before dropping segments. Might not be respected if a log file hasn't rolled yet and so can't be deleted.
+     */
+    public void configureFullQueryLogger(String path, String rollCycle, boolean blocking, int maxQueueWeight, long maxLogSize);
+
+    /**
+     * Disable the full query logger if it is enabled.
+     * Also delete any generated files in the last used full query log path, as well as the path configured in cassandra.yaml.
+     */
+    public void resetFullQueryLogger();
+
+    /**
+     * Stop logging queries but leave any generated files on disk.
+     */
+    public void stopFullQueryLogger();
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ae837806/src/java/org/apache/cassandra/tools/FullQueryLogTool.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/FullQueryLogTool.java b/src/java/org/apache/cassandra/tools/FullQueryLogTool.java
new file mode 100644
index 0000000..0d170d9
--- /dev/null
+++ b/src/java/org/apache/cassandra/tools/FullQueryLogTool.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.tools;
+
+import java.util.List;
+
+import com.google.common.base.Throwables;
+
+import io.airlift.airline.Cli;
+import io.airlift.airline.Help;
+import io.airlift.airline.ParseArgumentsMissingException;
+import io.airlift.airline.ParseArgumentsUnexpectedException;
+import io.airlift.airline.ParseCommandMissingException;
+import io.airlift.airline.ParseCommandUnrecognizedException;
+import io.airlift.airline.ParseOptionConversionException;
+import io.airlift.airline.ParseOptionMissingException;
+import io.airlift.airline.ParseOptionMissingValueException;
+import org.apache.cassandra.tools.fqltool.Dump;
+
+import static com.google.common.base.Throwables.getStackTraceAsString;
+import static com.google.common.collect.Lists.newArrayList;
+
+public class FullQueryLogTool
+{
+    public static void main(String... args)
+    {
+        List<Class<? extends Runnable>> commands = newArrayList(
+                Help.class,
+                Dump.class
+        );
+
+        Cli.CliBuilder<Runnable> builder = Cli.builder("fqltool");
+
+        builder.withDescription("Manipulate the contents of full query log files")
+                 .withDefaultCommand(Help.class)
+                 .withCommands(commands);
+
+        Cli<Runnable> parser = builder.build();
+
+        int status = 0;
+        try
+        {
+            parser.parse(args).run();
+        } catch (IllegalArgumentException |
+                IllegalStateException |
+                ParseArgumentsMissingException |
+                ParseArgumentsUnexpectedException |
+                ParseOptionConversionException |
+                ParseOptionMissingException |
+                ParseOptionMissingValueException |
+                ParseCommandMissingException |
+                ParseCommandUnrecognizedException e)
+        {
+            badUse(e);
+            status = 1;
+        } catch (Throwable throwable)
+        {
+            err(Throwables.getRootCause(throwable));
+            status = 2;
+        }
+
+        System.exit(status);
+    }
+
+    private static void badUse(Exception e)
+    {
+        System.out.println("fqltool: " + e.getMessage());
+        System.out.println("See 'fqltool help' or 'fqltool help <command>'.");
+    }
+
+    private static void err(Throwable e)
+    {
+        System.err.println("error: " + e.getMessage());
+        System.err.println("-- StackTrace --");
+        System.err.println(getStackTraceAsString(e));
+    }
+}

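The tool exits with status 0 on success, 1 for usage errors, and 2 for unexpected failures, which makes it easy to script; a small sketch (the dump path is illustrative):

    # list the available commands and the options accepted by dump
    bin/fqltool help
    bin/fqltool help dump

    # capture a dump and check the exit status
    bin/fqltool dump /var/lib/cassandra/fullquerylog > queries.txt
    status=$?
    [ "$status" -eq 0 ] || echo "fqltool exited with status $status" >&2
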
http://git-wip-us.apache.org/repos/asf/cassandra/blob/ae837806/src/java/org/apache/cassandra/tools/NodeTool.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/NodeTool.java b/src/java/org/apache/cassandra/tools/NodeTool.java
index 0db422e..59d4ead 100644
--- a/src/java/org/apache/cassandra/tools/NodeTool.java
+++ b/src/java/org/apache/cassandra/tools/NodeTool.java
@@ -76,6 +76,8 @@ public class NodeTool
                 EnableGossip.class,
                 DisableGossip.class,
                 EnableHandoff.class,
+                EnableFullQueryLog.class,
+                DisableFullQueryLog.class,
                 GcStats.class,
                 GetBatchlogReplayTrottle.class,
                 GetCompactionThreshold.class,
@@ -100,6 +102,7 @@ public class NodeTool
                 Refresh.class,
                 RemoveNode.class,
                 Assassinate.class,
+                ResetFullQueryLog.class,
                 Repair.class,
                 RepairAdmin.class,
                 ReplayBatchlog.class,

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ae837806/src/java/org/apache/cassandra/tools/fqltool/Dump.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/fqltool/Dump.java b/src/java/org/apache/cassandra/tools/fqltool/Dump.java
new file mode 100644
index 0000000..9d70b8e
--- /dev/null
+++ b/src/java/org/apache/cassandra/tools/fqltool/Dump.java
@@ -0,0 +1,265 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.tools.fqltool;
+
+import java.io.File;
+import java.nio.BufferUnderflowException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import io.airlift.airline.Arguments;
+import io.airlift.airline.Command;
+import io.airlift.airline.Option;
+import io.netty.buffer.Unpooled;
+import net.openhft.chronicle.bytes.Bytes;
+import net.openhft.chronicle.queue.ChronicleQueue;
+import net.openhft.chronicle.queue.ChronicleQueueBuilder;
+import net.openhft.chronicle.queue.ExcerptTailer;
+import net.openhft.chronicle.queue.RollCycles;
+import net.openhft.chronicle.threads.Pauser;
+import net.openhft.chronicle.wire.ReadMarshallable;
+import net.openhft.chronicle.wire.ValueIn;
+import org.apache.cassandra.cql3.QueryOptions;
+import org.apache.cassandra.transport.ProtocolVersion;
+
+/**
+ * Dump the contents of a list of paths containing full query logs
+ */
+@Command(name = "dump", description = "Dump the contents of a full query log")
+public class Dump implements Runnable
+{
+    static final char[] HEXI_DECIMAL = "0123456789ABCDEF".toCharArray();
+
+    @Arguments(usage = "<path1> [<path2>...<pathN>]", description = "Path containing the full query logs to dump.", required = true)
+    private List<String> arguments = new ArrayList<>();
+
+    @Option(title = "roll_cycle", name = {"--roll-cycle"}, description = "How often to roll the log file was rolled. May be necessary for Chronicle to correctly parse file names. (MINUTELY, HOURLY, DAILY). Default HOURLY.")
+    private String rollCycle = "HOURLY";
+
+    @Option(title = "follow", name = {"--follow"}, description = "Upon reacahing the end of the log continue indefinitely waiting for more records")
+    private boolean follow = false;
+
+    @Override
+    public void run()
+    {
+        dump(arguments, rollCycle, follow);
+    }
+
+    public static void dump(List<String> arguments, String rollCycle, boolean follow)
+    {
+        StringBuilder sb = new StringBuilder();
+        ReadMarshallable reader = wireIn -> {
+            sb.setLength(0);
+            String type = wireIn.read("type").text();
+            sb.append("Type: ").append(type).append(System.lineSeparator());
+            int protocolVersion = wireIn.read("protocol-version").int32();
+            sb.append("Protocol version: ").append(protocolVersion).append(System.lineSeparator());
+            QueryOptions options = QueryOptions.codec.decode(Unpooled.wrappedBuffer(wireIn.read("query-options").bytesStore().toTemporaryDirectByteBuffer()), ProtocolVersion.decode(protocolVersion));
+            sb.append("Query time: ").append(wireIn.read("query-time").int64()).append(System.lineSeparator());
+            if (type.equals("single"))
+            {
+                sb.append("Query: ").append(wireIn.read("query").text()).append(System.lineSeparator());
+                List<ByteBuffer> values = options.getValues() != null ? options.getValues() : Collections.EMPTY_LIST;
+                sb.append("Values: ").append(System.lineSeparator());
+                valuesToStringBuilder(values, sb);
+            }
+            else
+            {
+                sb.append("Batch type: ").append(wireIn.read("batch-type").text()).append(System.lineSeparator());
+                ValueIn in = wireIn.read("queries");
+                int numQueries = in.int32();
+                List<String> queries = new ArrayList<>();
+                for (int ii = 0; ii < numQueries; ii++)
+                {
+                    queries.add(in.text());
+                }
+                in = wireIn.read("values");
+                int numValues = in.int32();
+                List<List<ByteBuffer>> values = new ArrayList<>();
+                for (int ii = 0; ii < numValues; ii++)
+                {
+                    List<ByteBuffer> subValues = new ArrayList<>();
+                    values.add(subValues);
+                    int numSubValues = in.int32();
+                    for (int zz = 0; zz < numSubValues; zz++)
+                    {
+                        subValues.add(ByteBuffer.wrap(in.bytes()));
+                    }
+                    sb.append("Query: ").append(queries.get(ii)).append(System.lineSeparator());
+                    sb.append("Values: ").append(System.lineSeparator());
+                    valuesToStringBuilder(subValues, sb);
+                }
+            }
+            sb.append(System.lineSeparator());
+            System.out.print(sb.toString());
+            System.out.flush();
+        };
+
+        //Backoff strategy for spinning on the queue, not aggressive at all as this doesn't need to be low latency
+        Pauser pauser = Pauser.millis(100);
+        List<ChronicleQueue> queues = arguments.stream().distinct().map(path -> ChronicleQueueBuilder.single(new File(path)).rollCycle(RollCycles.valueOf(rollCycle)).build()).collect(Collectors.toList());
+        List<ExcerptTailer> tailers = queues.stream().map(ChronicleQueue::createTailer).collect(Collectors.toList());
+        boolean hadWork = true;
+        while (hadWork)
+        {
+            hadWork = false;
+            for (ExcerptTailer tailer : tailers)
+            {
+                while (tailer.readDocument(reader))
+                {
+                    hadWork = true;
+                }
+            }
+
+            if (follow)
+            {
+                if (!hadWork)
+                {
+                    //Chronicle queue doesn't support blocking so use this backoff strategy
+                    pauser.pause();
+                }
+                //Don't terminate the loop even if there wasn't work
+                hadWork = true;
+            }
+        }
+    }
+
+    private static void valuesToStringBuilder(List<ByteBuffer> values, StringBuilder sb)
+    {
+        boolean first = true;
+        for (ByteBuffer value : values)
+        {
+            if (first)
+            {
+                first = false;
+            }
+            else
+            {
+                sb.append("-----").append(System.lineSeparator());
+            }
+
+            Bytes bytes = Bytes.wrapForRead(value);
+            long maxLength = Math.min(1024, bytes.readLimit() - bytes.readPosition());
+            toHexString(bytes, bytes.readPosition(), maxLength, sb);
+            if (maxLength < bytes.readLimit() - bytes.readPosition())
+            {
+                sb.append("... truncated").append(System.lineSeparator());
+            }
+        }
+    }
+
+    //Copied from net.openhft.chronicle.bytes because we need to pass in the StringBuilder
+    /*
+     * Copyright 2016 higherfrequencytrading.com
+     *
+     * Licensed under the Apache License, Version 2.0 (the "License");
+     * you may not use this file except in compliance with the License.
+     * You may obtain a copy of the License at
+     *
+     *     http://www.apache.org/licenses/LICENSE-2.0
+     *
+     * Unless required by applicable law or agreed to in writing, software
+     * distributed under the License is distributed on an "AS IS" BASIS,
+     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+     * See the License for the specific language governing permissions and
+     * limitations under the License.
+     */
+    /**
+     * Display the hex data of {@link Bytes} starting at the given offset for the given length, appending to the supplied builder
+     *
+     * @param bytes the buffer you wish to toString()
+     * @param offset the read position to start from
+     * @param len the maximum number of bytes to render
+     * @param builder the StringBuilder the output is appended to
+     * @return hex representation of the buffer, for example [0D, 0A, FF]
+     */
+    public static String toHexString(final Bytes bytes, long offset, long len, StringBuilder builder)
+    throws BufferUnderflowException
+    {
+        if (len == 0)
+            return "";
+
+        int width = 16;
+        int[] lastLine = new int[width];
+        String sep = "";
+        long position = bytes.readPosition();
+        long limit = bytes.readLimit();
+
+        try {
+            bytes.readPositionRemaining(offset, len);
+
+            long start = offset / width * width;
+            long end = (offset + len + width - 1) / width * width;
+            for (long i = start; i < end; i += width) {
+                // check for duplicate rows
+                if (i + width < end) {
+                    boolean same = true;
+
+                    for (int j = 0; j < width && i + j < offset + len; j++) {
+                        int ch = bytes.readUnsignedByte(i + j);
+                        same &= (ch == lastLine[j]);
+                        lastLine[j] = ch;
+                    }
+                    if (i > start && same) {
+                        sep = "........\n";
+                        continue;
+                    }
+                }
+                builder.append(sep);
+                sep = "";
+                String str = Long.toHexString(i);
+                for (int j = str.length(); j < 8; j++)
+                    builder.append('0');
+                builder.append(str);
+                for (int j = 0; j < width; j++) {
+                    if (j == width / 2)
+                        builder.append(' ');
+                    if (i + j < offset || i + j >= offset + len) {
+                        builder.append("   ");
+
+                    } else {
+                        builder.append(' ');
+                        int ch = bytes.readUnsignedByte(i + j);
+                        builder.append(HEXI_DECIMAL[ch >> 4]);
+                        builder.append(HEXI_DECIMAL[ch & 15]);
+                    }
+                }
+                builder.append(' ');
+                for (int j = 0; j < width; j++) {
+                    if (j == width / 2)
+                        builder.append(' ');
+                    if (i + j < offset || i + j >= offset + len) {
+                        builder.append(' ');
+
+                    } else {
+                        int ch = bytes.readUnsignedByte(i + j);
+                        if (ch < ' ' || ch > 126)
+                            ch = '\u00B7';
+                        builder.append((char) ch);
+                    }
+                }
+                builder.append("\n");
+            }
+            return builder.toString();
+        } finally {
+            bytes.readLimit(limit);
+            bytes.readPosition(position);
+        }
+    }
+}
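
For anyone who wants to exercise the dump path without going through the airline command plumbing, a minimal sketch of driving the static entry point directly follows; the log directory is a placeholder, and the roll cycle has to match whatever the node was configured with when the log was written.

import java.util.Arrays;

import org.apache.cassandra.tools.fqltool.Dump;

public class DumpSketch
{
    public static void main(String[] args)
    {
        // Placeholder directory; point this at the directory the full query log was written to.
        // follow = false reads every record currently in the log and exits;
        // follow = true keeps tailing the log, waiting for new records.
        Dump.dump(Arrays.asList("/var/lib/cassandra/fql"), "HOURLY", false);
    }
}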

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ae837806/src/java/org/apache/cassandra/tools/nodetool/DisableFullQueryLog.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/nodetool/DisableFullQueryLog.java b/src/java/org/apache/cassandra/tools/nodetool/DisableFullQueryLog.java
new file mode 100644
index 0000000..802e854
--- /dev/null
+++ b/src/java/org/apache/cassandra/tools/nodetool/DisableFullQueryLog.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.tools.nodetool;
+
+import io.airlift.airline.Command;
+import org.apache.cassandra.tools.NodeProbe;
+import org.apache.cassandra.tools.NodeTool.NodeToolCmd;
+
+@Command(name = "disablefullquerylog", description = "Disable the full query log")
+public class DisableFullQueryLog extends NodeToolCmd
+{
+    @Override
+    public void execute(NodeProbe probe)
+    {
+        probe.getSpProxy().stopFullQueryLogger();
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ae837806/src/java/org/apache/cassandra/tools/nodetool/EnableFullQueryLog.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/nodetool/EnableFullQueryLog.java b/src/java/org/apache/cassandra/tools/nodetool/EnableFullQueryLog.java
new file mode 100644
index 0000000..624a301
--- /dev/null
+++ b/src/java/org/apache/cassandra/tools/nodetool/EnableFullQueryLog.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.tools.nodetool;
+
+import io.airlift.airline.Command;
+import io.airlift.airline.Option;
+import org.apache.cassandra.tools.NodeProbe;
+import org.apache.cassandra.tools.NodeTool.NodeToolCmd;
+
+@Command(name = "enablefullquerylog", description = "Enable full query logging")
+public class EnableFullQueryLog extends NodeToolCmd
+{
+    @Option(title = "roll_cycle", name = {"--roll-cycle"}, description = "How often to roll the log file (MINUTELY, HOURLY, DAILY). Default HOURLY.")
+    private String rollCycle = "HOURLY";
+
+    @Option(title = "blocking", name = {"--blocking"}, description = "If the queue is full whether to block producers or drop samples. Default true.")
+    private boolean blocking = true;
+
+    @Option(title = "max_queue_weight", name = {"--max-queue-weight"}, description = "Maximum number of bytes of query data to queue to disk before blocking or dropping samples. Default 256 megabytes.")
+    private int maxQueueWeight = 256 * 1024 * 1024;
+
+    @Option(title = "max_log_size", name = {"--max-log-size"}, description = "How many bytes of log data to store before dropping segments. Might not be respected if a log file hasn't rolled so it can be deleted. Default 16 gigabytes.")
+    private long maxLogSize = 16L * 1024L * 1024L * 1024L;
+
+    @Option(title = "path", name = {"--path"}, description = "Path to store the full query log at. Will have it's contents recursively deleted. If not set the value from cassandra.yaml will be used.")
+    private String path = null;
+
+    @Override
+    public void execute(NodeProbe probe)
+    {
+        probe.getSpProxy().configureFullQueryLogger(path, rollCycle, blocking, maxQueueWeight, maxLogSize);
+    }
+}
\ No newline at end of file
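
The same call the command makes through NodeProbe can be issued from any JMX-capable client; a rough sketch using the option defaults declared above is shown below. The host and port are placeholders, and the null path is the documented fallback to the directory configured in cassandra.yaml.

import org.apache.cassandra.tools.NodeProbe;

public class EnableFqlSketch
{
    public static void main(String[] args) throws Exception
    {
        // Placeholder JMX host/port; adjust for the node being configured.
        NodeProbe probe = new NodeProbe("127.0.0.1", 7199);
        try
        {
            // Mirrors the defaults above: HOURLY roll cycle, blocking producers,
            // 256MB queue weight, 16GB of retained segments, path taken from cassandra.yaml.
            probe.getSpProxy().configureFullQueryLogger(null, "HOURLY", true, 256 * 1024 * 1024, 16L * 1024L * 1024L * 1024L);
        }
        finally
        {
            probe.close();
        }
    }
}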

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ae837806/src/java/org/apache/cassandra/tools/nodetool/ResetFullQueryLog.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/nodetool/ResetFullQueryLog.java b/src/java/org/apache/cassandra/tools/nodetool/ResetFullQueryLog.java
new file mode 100644
index 0000000..cfc20ab
--- /dev/null
+++ b/src/java/org/apache/cassandra/tools/nodetool/ResetFullQueryLog.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.tools.nodetool;
+
+import io.airlift.airline.Command;
+import org.apache.cassandra.tools.NodeProbe;
+import org.apache.cassandra.tools.NodeTool.NodeToolCmd;
+
+@Command(name = "resetfullquerylog", description = "Stop the full query log and clean files in the configured full query log directory from cassandra.yaml as well as JMX")
+public class ResetFullQueryLog extends NodeToolCmd
+{
+    @Override
+    public void execute(NodeProbe probe)
+    {
+        probe.getSpProxy().resetFullQueryLogger();
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ae837806/src/java/org/apache/cassandra/transport/messages/BatchMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/messages/BatchMessage.java b/src/java/org/apache/cassandra/transport/messages/BatchMessage.java
index afd7659..dcaa8da 100644
--- a/src/java/org/apache/cassandra/transport/messages/BatchMessage.java
+++ b/src/java/org/apache/cassandra/transport/messages/BatchMessage.java
@@ -19,6 +19,7 @@ package org.apache.cassandra.transport.messages;
 
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 import java.util.UUID;
 
@@ -29,6 +30,7 @@ import org.apache.cassandra.cql3.*;
 import org.apache.cassandra.cql3.statements.BatchStatement;
 import org.apache.cassandra.cql3.statements.ModificationStatement;
 import org.apache.cassandra.cql3.statements.ParsedStatement;
+import org.apache.cassandra.db.fullquerylog.FullQueryLogger;
 import org.apache.cassandra.exceptions.InvalidRequestException;
 import org.apache.cassandra.exceptions.PreparedQueryNotFoundException;
 import org.apache.cassandra.service.ClientState;
@@ -174,20 +176,30 @@ public class BatchMessage extends Message.Request
 
             QueryHandler handler = ClientState.getCQLQueryHandler();
             List<ParsedStatement.Prepared> prepared = new ArrayList<>(queryOrIdList.size());
+            boolean fullQueryLogEnabled = FullQueryLogger.instance.enabled();
+            List<String> queryStrings = fullQueryLogEnabled ? new ArrayList<>(queryOrIdList.size()) : Collections.EMPTY_LIST;
             for (int i = 0; i < queryOrIdList.size(); i++)
             {
                 Object query = queryOrIdList.get(i);
+                String queryString;
                 ParsedStatement.Prepared p;
                 if (query instanceof String)
                 {
                     p = QueryProcessor.parseStatement((String)query,
                                                       state.getClientState().cloneWithKeyspaceIfSet(options.getKeyspace()));
+                    queryString = (String)query;
                 }
                 else
                 {
                     p = handler.getPrepared((MD5Digest)query);
                     if (p == null)
                         throw new PreparedQueryNotFoundException((MD5Digest)query);
+                    queryString = p.rawCQLStatement;
+                }
+
+                if (fullQueryLogEnabled)
+                {
+                    queryStrings.add(queryString);
                 }
 
                 List<ByteBuffer> queryValues = values.get(i);
@@ -215,7 +227,16 @@ public class BatchMessage extends Message.Request
             // Note: It's ok at this point to pass a bogus value for the number of bound terms in the BatchState ctor
             // (and no value would be really correct, so we prefer passing a clearly wrong one).
             BatchStatement batch = new BatchStatement(-1, batchType, statements, Attributes.none());
+            long fqlTime = 0;
+            if (fullQueryLogEnabled)
+            {
+                fqlTime = System.currentTimeMillis();
+            }
             Message.Response response = handler.processBatch(batch, state, batchOptions, getCustomPayload(), queryStartNanoTime);
+            if (fullQueryLogEnabled)
+            {
+                FullQueryLogger.instance.logBatch(batchType.name(), queryStrings, values, options, fqlTime);
+            }
 
             if (tracingId != null)
                 response.setTracingId(tracingId);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ae837806/src/java/org/apache/cassandra/transport/messages/ExecuteMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/messages/ExecuteMessage.java b/src/java/org/apache/cassandra/transport/messages/ExecuteMessage.java
index 0b93d16..e969134 100644
--- a/src/java/org/apache/cassandra/transport/messages/ExecuteMessage.java
+++ b/src/java/org/apache/cassandra/transport/messages/ExecuteMessage.java
@@ -31,6 +31,7 @@ import org.apache.cassandra.cql3.statements.BatchStatement;
 import org.apache.cassandra.cql3.statements.ModificationStatement;
 import org.apache.cassandra.cql3.statements.ParsedStatement;
 import org.apache.cassandra.cql3.statements.UpdateStatement;
+import org.apache.cassandra.db.fullquerylog.FullQueryLogger;
 import org.apache.cassandra.exceptions.PreparedQueryNotFoundException;
 import org.apache.cassandra.service.ClientState;
 import org.apache.cassandra.service.QueryState;
@@ -162,7 +163,17 @@ public class ExecuteMessage extends Message.Request
             // Some custom QueryHandlers are interested by the bound names. We provide them this information
             // by wrapping the QueryOptions.
             QueryOptions queryOptions = QueryOptions.addColumnSpecifications(options, prepared.boundNames);
+            boolean fqlEnabled = FullQueryLogger.instance.enabled();
+            long fqlTime = 0;
+            if (fqlEnabled)
+            {
+                fqlTime = System.currentTimeMillis();
+            }
             Message.Response response = handler.processPrepared(statement, state, queryOptions, getCustomPayload(), queryStartNanoTime);
+            if (fqlEnabled)
+            {
+                FullQueryLogger.instance.logQuery(prepared.rawCQLStatement, options, fqlTime);
+            }
 
             if (response instanceof ResultMessage.Rows)
             {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ae837806/src/java/org/apache/cassandra/transport/messages/QueryMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/messages/QueryMessage.java b/src/java/org/apache/cassandra/transport/messages/QueryMessage.java
index 4c761dd..8f64033 100644
--- a/src/java/org/apache/cassandra/transport/messages/QueryMessage.java
+++ b/src/java/org/apache/cassandra/transport/messages/QueryMessage.java
@@ -23,6 +23,7 @@ import com.google.common.collect.ImmutableMap;
 
 import io.netty.buffer.ByteBuf;
 import org.apache.cassandra.cql3.QueryOptions;
+import org.apache.cassandra.db.fullquerylog.FullQueryLogger;
 import org.apache.cassandra.exceptions.RequestExecutionException;
 import org.apache.cassandra.exceptions.RequestValidationException;
 import org.apache.cassandra.service.ClientState;
@@ -113,7 +114,17 @@ public class QueryMessage extends Message.Request
                 Tracing.instance.begin("Execute CQL3 query", state.getClientAddress(), builder.build());
             }
 
+            boolean fqlEnabled = FullQueryLogger.instance.enabled();
+            long fqlTime = 0;
+            if (fqlEnabled)
+            {
+                fqlTime = System.currentTimeMillis();
+            }
             Message.Response response = ClientState.getCQLQueryHandler().process(query, state, options, getCustomPayload(), queryStartNanoTime);
+            if (fqlEnabled)
+            {
+                FullQueryLogger.instance.logQuery(query, options, fqlTime);
+            }
             if (options.skipMetadata() && response instanceof ResultMessage.Rows)
                 ((ResultMessage.Rows)response).result.metadata.setSkipMetadata();
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ae837806/src/java/org/apache/cassandra/utils/ObjectSizes.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/ObjectSizes.java b/src/java/org/apache/cassandra/utils/ObjectSizes.java
index dea2bac..10dad05 100644
--- a/src/java/org/apache/cassandra/utils/ObjectSizes.java
+++ b/src/java/org/apache/cassandra/utils/ObjectSizes.java
@@ -138,6 +138,7 @@ public class ObjectSizes
      * @param str String to calculate memory size of
      * @return Total in-memory size of the String
      */
+    //@TODO hard coding this to 2 isn't necessarily correct in Java 9
     public static long sizeOf(String str)
     {
         return STRING_EMPTY_SIZE + sizeOfArray(str.length(), 2);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ae837806/src/java/org/apache/cassandra/utils/binlog/BinLog.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/binlog/BinLog.java b/src/java/org/apache/cassandra/utils/binlog/BinLog.java
new file mode 100644
index 0000000..070a151
--- /dev/null
+++ b/src/java/org/apache/cassandra/utils/binlog/BinLog.java
@@ -0,0 +1,277 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.utils.binlog;
+
+import java.io.File;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.TimeUnit;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import net.openhft.chronicle.queue.ChronicleQueue;
+import net.openhft.chronicle.queue.ChronicleQueueBuilder;
+import net.openhft.chronicle.queue.ExcerptAppender;
+import net.openhft.chronicle.queue.RollCycle;
+import net.openhft.chronicle.queue.impl.StoreFileListener;
+import net.openhft.chronicle.wire.WireOut;
+import net.openhft.chronicle.wire.WriteMarshallable;
+import org.apache.cassandra.concurrent.NamedThreadFactory;
+import org.apache.cassandra.utils.concurrent.WeightedQueue;
+
+/**
+ * BinLog is a quick and dirty binary log; essentially a NIH version of binary logging built without a traditional logging
+ * framework. Its goal is good enough performance, a predictable footprint, simplicity in terms of implementation and configuration,
+ * and most importantly minimal impact on producers of log records.
+ *
+ * Performance safety is accomplished by feeding items to the binary log using a weighted queue and dropping records if the binary log falls
+ * sufficiently far behind.
+ *
+ * Simplicity and good enough performance are achieved by using a single log writing thread as well as Chronicle Queue
+ * to handle writing the log, making it available for readers, and rolling the log files.
+ *
+ */
+public class BinLog implements Runnable, StoreFileListener
+{
+    private static final Logger logger = LoggerFactory.getLogger(BinLog.class);
+
+    private ChronicleQueue queue;
+    private ExcerptAppender appender;
+    @VisibleForTesting
+    Thread binLogThread = new NamedThreadFactory("Binary Log thread").newThread(this);
+    final WeightedQueue<ReleaseableWriteMarshallable> sampleQueue;
+    private final long maxLogSize;
+
+    /**
+     * The files in the chronicle queue that have already rolled
+     */
+    private Queue<File> chronicleStoreFiles = new ConcurrentLinkedQueue<>();
+
+    /**
+     * The number of bytes in store files that have already rolled
+     */
+    private long bytesInStoreFiles;
+
+    private static final ReleaseableWriteMarshallable NO_OP = new ReleaseableWriteMarshallable()
+    {
+        @Override
+        public void writeMarshallable(WireOut wire)
+        {
+        }
+
+        @Override
+        public void release()
+        {
+        }
+    };
+
+    private volatile boolean shouldContinue = true;
+
+    /**
+     *
+     * @param path Path to store the BinLog. Can't be shared with anything else.
+     * @param rollCycle How often to roll the log file so it can potentially be deleted
+     * @param maxQueueWeight Maximum weight of in memory queue for records waiting to be written to the file before blocking or dropping
+     * @param maxLogSize Maximum size of the rolled files to retain on disk before deleting the oldest file
+     */
+    public BinLog(Path path, RollCycle rollCycle, int maxQueueWeight, long maxLogSize)
+    {
+        Preconditions.checkNotNull(path, "path was null");
+        Preconditions.checkNotNull(rollCycle, "rollCycle was null");
+        Preconditions.checkArgument(maxQueueWeight > 0, "maxQueueWeight must be > 0");
+        Preconditions.checkArgument(maxLogSize > 0, "maxLogSize must be > 0");
+        ChronicleQueueBuilder builder = ChronicleQueueBuilder.single(path.toFile());
+        builder.rollCycle(rollCycle);
+        builder.storeFileListener(this);
+        queue = builder.build();
+        appender = queue.acquireAppender();
+        sampleQueue = new WeightedQueue<>(maxQueueWeight);
+        this.maxLogSize = maxLogSize;
+    }
+
+    /**
+     * Start the consumer thread that writes log records. Can only be done once.
+     */
+    public void start()
+    {
+        if (!shouldContinue)
+        {
+            throw new IllegalStateException("Can't reuse stopped BinLog");
+        }
+        binLogThread.start();
+    }
+
+    /**
+     * Stop the consumer thread that writes log records. Can be called multiple times.
+     * @throws InterruptedException if interrupted while waiting for the log writing thread to exit
+     */
+    public synchronized void stop() throws InterruptedException
+    {
+        if (!shouldContinue)
+        {
+            return;
+        }
+
+        shouldContinue = false;
+        sampleQueue.put(NO_OP);
+        binLogThread.join();
+        appender = null;
+        queue = null;
+    }
+
+    /**
+     * Offer a record to the log. If the in-memory queue is full the record will be dropped and offer will return false.
+     * @param record The record to write to the log
+     * @return true if the record was queued and false otherwise
+     */
+    public boolean offer(ReleaseableWriteMarshallable record)
+    {
+        if (!shouldContinue)
+        {
+            return false;
+        }
+
+        return sampleQueue.offer(record);
+    }
+
+    /**
+     * Put a record into the log. If the in-memory queue is full the putting thread will be blocked until there is space or it is interrupted.
+     * @param record The record to write to the log
+     * @throws InterruptedException if interrupted while waiting for space in the queue
+     */
+    public void put(ReleaseableWriteMarshallable record) throws InterruptedException
+    {
+        if (!shouldContinue)
+        {
+            return;
+        }
+
+        //Resolve potential deadlock at shutdown when queue is full
+        while (shouldContinue)
+        {
+            if (sampleQueue.offer(record, 1, TimeUnit.SECONDS))
+            {
+                return;
+            }
+        }
+    }
+
+    private void processTasks(List<ReleaseableWriteMarshallable> tasks)
+    {
+        for (int ii = 0; ii < tasks.size(); ii++)
+        {
+            WriteMarshallable t = tasks.get(ii);
+            //Don't write an empty document
+            if (t == NO_OP)
+            {
+                continue;
+            }
+
+            appender.writeDocument(t);
+        }
+    }
+
+    @Override
+    public void run()
+    {
+        List<ReleaseableWriteMarshallable> tasks = new ArrayList<>(16);
+        while (shouldContinue)
+        {
+            try
+            {
+                tasks.clear();
+                ReleaseableWriteMarshallable task = sampleQueue.take();
+                tasks.add(task);
+                sampleQueue.drainTo(tasks, 15);
+
+                processTasks(tasks);
+            }
+            catch (Throwable t)
+            {
+                logger.error("Unexpected exception in binary log thread", t);
+            }
+            finally
+            {
+                for (int ii = 0; ii < tasks.size(); ii++)
+                {
+                    tasks.get(ii).release();
+                }
+            }
+        }
+
+        //Clean up the buffers on thread exit; finalization will check again once this
+        //is no longer reachable, ensuring there are no stragglers in the queue.
+        finalize();
+    }
+
+    /**
+     * Track store files and their storage impact as they are rolled. Delete the oldest files once the storage limit is exceeded.
+     * @param cycle chronicle roll cycle index of the released store file
+     * @param file the store file that was released
+     */
+    public synchronized void onReleased(int cycle, File file)
+    {
+        chronicleStoreFiles.offer(file);
+        //This isn't accurate because the files are sparse, but it's at least pessimistic
+        bytesInStoreFiles += file.length();
+        logger.debug("Chronicle store file {} rolled file size {}", file.getPath(), file.length());
+        while (bytesInStoreFiles > maxLogSize && !chronicleStoreFiles.isEmpty())
+        {
+            File toDelete = chronicleStoreFiles.poll();
+            long toDeleteLength = toDelete.length();
+            if (!toDelete.delete())
+            {
+                logger.error("Failed to delete chronicle store file: {} store file size: {} bytes in store files: {}. " +
+                             "You will need to clean this up manually or reset full query logging.",
+                             toDelete.getPath(), toDeleteLength, bytesInStoreFiles);
+            }
+            else
+            {
+                bytesInStoreFiles -= toDeleteLength;
+                logger.info("Deleted chronicle store file: {} store file size: {} bytes in store files: {} max log size: {}.", file.getPath(), toDeleteLength, bytesInStoreFiles, maxLogSize);
+            }
+        }
+    }
+
+    /**
+     * There is a race where we might not release a buffer; we let finalization
+     * catch it since it shouldn't happen to many buffers and only test code
+     * would run into it anyway.
+     */
+    @Override
+    public void finalize()
+    {
+        ReleaseableWriteMarshallable toRelease;
+        while (((toRelease = sampleQueue.poll()) != null))
+        {
+            toRelease.release();
+        }
+    }
+
+    public abstract static class ReleaseableWriteMarshallable implements WriteMarshallable
+    {
+        protected abstract void release();
+    }
+}
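
A rough usage sketch of the new BinLog API follows, for context on how a producer is expected to drive it. The directory, weights, and the toy record are placeholders; offer() drops the record when the weighted queue is full, while put() would block instead.

import java.nio.file.Paths;

import net.openhft.chronicle.queue.RollCycles;
import net.openhft.chronicle.wire.WireOut;
import org.apache.cassandra.utils.binlog.BinLog;

public class BinLogSketch
{
    public static void main(String[] args) throws Exception
    {
        // Placeholder directory; BinLog assumes it has exclusive ownership of the path.
        BinLog log = new BinLog(Paths.get("/tmp/binlog-sketch"), RollCycles.HOURLY, 1024 * 1024, 128L * 1024 * 1024);
        log.start();

        // A toy record; the real log records carry the fields the Dump tool above reads
        // (type, protocol-version, query-options, query-time, ...).
        log.offer(new BinLog.ReleaseableWriteMarshallable()
        {
            public void writeMarshallable(WireOut wire)
            {
                wire.write("type").text("example");
            }

            protected void release()
            {
                // Nothing pooled to return in this sketch
            }
        });

        log.stop();
    }
}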


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org

