cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From slebre...@apache.org
Subject [2/6] Custom CQL protocol
Date Thu, 05 Jul 2012 16:22:39 GMT
http://git-wip-us.apache.org/repos/asf/cassandra/blob/6d3a3eed/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java b/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
index 6cf24f7..181920b 100644
--- a/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
@@ -28,6 +28,7 @@ import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.auth.Permission;
 import org.apache.cassandra.cql3.*;
+import org.apache.cassandra.transport.messages.ResultMessage;
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.db.IColumn;
 import org.apache.cassandra.db.CounterColumn;
@@ -72,7 +73,6 @@ import org.apache.cassandra.utils.Pair;
 public class SelectStatement implements CQLStatement
 {
     private static final Logger logger = LoggerFactory.getLogger(SelectStatement.class);
-    private final static ByteBuffer countColumn = ByteBufferUtil.bytes("count");
 
     private final int boundTerms;
     public final CFDefinition cfDef;
@@ -120,7 +120,12 @@ public class SelectStatement implements CQLStatement
         // Nothing to do, all validation has been done by RawStatement.prepare()
     }
 
-    public CqlResult execute(ClientState state, List<ByteBuffer> variables) throws InvalidRequestException, UnavailableException, TimedOutException
+    public ResultMessage.Rows execute(ClientState state, List<ByteBuffer> variables) throws InvalidRequestException, UnavailableException, TimedOutException
+    {
+        return new ResultMessage.Rows(executeInternal(state, variables));
+    }
+
+    public ResultSet executeInternal(ClientState state, List<ByteBuffer> variables) throws InvalidRequestException, UnavailableException, TimedOutException
     {
         List<Row> rows;
         if (isKeyRange())
@@ -132,45 +137,16 @@ public class SelectStatement implements CQLStatement
             rows = getSlice(variables);
         }
 
-        CqlResult result = new CqlResult();
-        result.type = CqlResultType.ROWS;
-
         // Even for count, we need to process the result as it'll group some column together in sparse column families
-        CqlMetadata schema = createSchema();
-        List<CqlRow> cqlRows = process(rows, schema, variables);
-
-        // count resultset is a single column named "count"
-        if (parameters.isCount)
-        {
-            result.schema = new CqlMetadata(Collections.<ByteBuffer, String>emptyMap(),
-                                            Collections.<ByteBuffer, String>emptyMap(),
-                                            "AsciiType",
-                                            "LongType");
-            List<Column> columns = Collections.singletonList(new Column(countColumn).setValue(ByteBufferUtil.bytes((long) cqlRows.size())));
-            result.rows = Collections.singletonList(new CqlRow(countColumn, columns));
-            return result;
-        }
-        else
-        {
-            // otherwise create resultset from query results
-            result.schema = schema;
-            result.rows = cqlRows;
-            return result;
-        }
+        ResultSet rset = process(rows, variables);
+        rset = parameters.isCount ? rset.makeCountResult() : rset;
+        return rset;
     }
 
-    public List<CqlRow> process(List<Row> rows) throws InvalidRequestException
+    public ResultSet process(List<Row> rows) throws InvalidRequestException
     {
         assert !parameters.isCount; // not yet needed
-        return process(rows, createSchema(), Collections.<ByteBuffer>emptyList());
-    }
-
-    private CqlMetadata createSchema()
-    {
-        return new CqlMetadata(new HashMap<ByteBuffer, String>(),
-                               new HashMap<ByteBuffer, String>(),
-                               TypeParser.getShortName(cfDef.cfm.comparator),
-                               TypeParser.getShortName(cfDef.cfm.getDefaultValidator()));
+        return process(rows, Collections.<ByteBuffer>emptyList());
     }
 
     public String keyspace()
@@ -192,8 +168,8 @@ public class SelectStatement implements CQLStatement
         // ...a range (slice) of column names
         if (isColumnRange())
         {
-            ByteBuffer start = getRequestedBound(isReversed ? Bound.END : Bound.START, variables);
-            ByteBuffer finish = getRequestedBound(isReversed ? Bound.START : Bound.END, variables);
+            ByteBuffer start = getColumnStart(variables);
+            ByteBuffer finish = getColumnEnd(variables);
 
             SliceQueryFilter filter = new SliceQueryFilter(start, finish, isReversed, getLimit());
             QueryProcessor.validateSliceFilter(cfDef.cfm, filter);
@@ -505,6 +481,16 @@ public class SelectStatement implements CQLStatement
         return builder.build();
     }
 
+    public ByteBuffer getColumnStart(List<ByteBuffer> variables) throws InvalidRequestException
+    {
+        return getRequestedBound(isReversed ? Bound.END : Bound.START, variables);
+    }
+
+    public ByteBuffer getColumnEnd(List<ByteBuffer> variables) throws InvalidRequestException
+    {
+        return getRequestedBound(isReversed ? Bound.START : Bound.END, variables);
+    }
+
     private List<IndexExpression> getIndexExpressions(List<ByteBuffer> variables) throws InvalidRequestException
     {
         if (metadataRestrictions.isEmpty())
@@ -560,61 +546,63 @@ public class SelectStatement implements CQLStatement
              : c.value();
     }
 
-    private Column makeReturnColumn(Selector s, IColumn c)
+    private void addReturnValue(ResultSet cqlRows, Selector s, IColumn c)
     {
-        Column cqlCol;
-        if (s.hasFunction())
+        if (c == null || c.isMarkedForDelete())
         {
-            cqlCol = new Column(ByteBufferUtil.bytes(s.toString()));
-            if (c == null || c.isMarkedForDelete())
-                return cqlCol;
+            cqlRows.addColumnValue(null);
+            return;
+        }
 
+        if (s.hasFunction())
+        {
             switch (s.function())
             {
                 case WRITE_TIME:
-                    cqlCol.setValue(ByteBufferUtil.bytes(c.timestamp()));
+                    cqlRows.addColumnValue(ByteBufferUtil.bytes(c.timestamp()));
                     break;
                 case TTL:
                     if (c instanceof ExpiringColumn)
                     {
                         int ttl = ((ExpiringColumn)c).getLocalDeletionTime() - (int) (System.currentTimeMillis() / 1000);
-                        cqlCol.setValue(ByteBufferUtil.bytes(ttl));
+                        cqlRows.addColumnValue(ByteBufferUtil.bytes(ttl));
+                    }
+                    else
+                    {
+                        cqlRows.addColumnValue(null);
                     }
                     break;
             }
         }
         else
         {
-            cqlCol = new Column(s.id().key);
-            if (c == null || c.isMarkedForDelete())
-                return cqlCol;
-            cqlCol.setValue(value(c)).setTimestamp(c.timestamp());
+            cqlRows.addColumnValue(value(c));
         }
-        return cqlCol;
     }
 
-    private void addToSchema(CqlMetadata schema, Pair<CFDefinition.Name, Selector> p)
+    private ResultSet createResult(List<Pair<CFDefinition.Name, Selector>> selection)
     {
-        if (p.right.hasFunction())
+        List<ColumnSpecification> names = new ArrayList<ColumnSpecification>(selection.size());
+        for (Pair<CFDefinition.Name, Selector> p : selection)
         {
-            ByteBuffer nameAsRequested = ByteBufferUtil.bytes(p.right.toString());
-            schema.name_types.put(nameAsRequested, TypeParser.getShortName(cfDef.definitionType));
-            switch (p.right.function())
+            if (p.right.hasFunction())
             {
-                case WRITE_TIME:
-                    schema.value_types.put(nameAsRequested, TypeParser.getShortName(LongType.instance));
-                    break;
-                case TTL:
-                    schema.value_types.put(nameAsRequested, TypeParser.getShortName(Int32Type.instance));
-                    break;
+                switch (p.right.function())
+                {
+                    case WRITE_TIME:
+                        names.add(new ColumnSpecification(p.left.ksName, p.left.cfName, new ColumnIdentifier(p.right.toString(), true), LongType.instance));
+                        break;
+                    case TTL:
+                        names.add(new ColumnSpecification(p.left.ksName, p.left.cfName, new ColumnIdentifier(p.right.toString(), true), Int32Type.instance));
+                        break;
+                }
+            }
+            else
+            {
+                names.add(p.left);
             }
         }
-        else
-        {
-            ByteBuffer nameAsRequested = p.right.id().key;
-            schema.name_types.put(nameAsRequested, TypeParser.getShortName(cfDef.getNameComparatorForResultSet(p.left)));
-            schema.value_types.put(nameAsRequested, TypeParser.getShortName(p.left.type));
-        }
+        return new ResultSet(names);
     }
 
     private Iterable<IColumn> columnsInOrder(final ColumnFamily cf, final List<ByteBuffer> variables) throws InvalidRequestException
@@ -657,15 +645,10 @@ public class SelectStatement implements CQLStatement
         };
     }
 
-    private List<CqlRow> process(List<Row> rows, CqlMetadata schema, List<ByteBuffer> variables) throws InvalidRequestException
+    private ResultSet process(List<Row> rows, List<ByteBuffer> variables) throws InvalidRequestException
     {
-        List<CqlRow> cqlRows = new ArrayList<CqlRow>();
         List<Pair<CFDefinition.Name, Selector>> selection = getExpandedSelection();
-        List<Column> thriftColumns = null;
-
-        // Add schema only once
-        for (Pair<CFDefinition.Name, Selector> p : selection)
-            addToSchema(schema, p);
+        ResultSet cqlRows = createResult(selection);
 
         for (org.apache.cassandra.db.Row row : rows)
         {
@@ -681,8 +664,6 @@ public class SelectStatement implements CQLStatement
                     if (c.isMarkedForDelete())
                         continue;
 
-                    thriftColumns = new ArrayList<Column>(selection.size());
-
                     ByteBuffer[] components = null;
 
                     if (cfDef.isComposite)
@@ -703,32 +684,26 @@ public class SelectStatement implements CQLStatement
                     {
                         CFDefinition.Name name = p.left;
                         Selector selector = p.right;
-
-                        addToSchema(schema, p);
-                        Column col;
                         switch (name.kind)
                         {
                             case KEY_ALIAS:
-                                col = new Column(selector.id().key);
-                                col.setValue(row.key.key).setTimestamp(-1L);
+                                cqlRows.addColumnValue(row.key.key);
                                 break;
                             case COLUMN_ALIAS:
-                                col = new Column(selector.id().key);
-                                col.setTimestamp(c.timestamp());
                                 if (cfDef.isComposite)
                                 {
                                     if (name.position < components.length)
-                                        col.setValue(components[name.position]);
+                                        cqlRows.addColumnValue(components[name.position]);
                                     else
-                                        col.setValue(ByteBufferUtil.EMPTY_BYTE_BUFFER);
+                                        cqlRows.addColumnValue(null);
                                 }
                                 else
                                 {
-                                    col.setValue(c.name());
+                                    cqlRows.addColumnValue(c.name());
                                 }
                                 break;
                             case VALUE_ALIAS:
-                                col = makeReturnColumn(selector, c);
+                                addReturnValue(cqlRows, selector, c);
                                 break;
                             case COLUMN_METADATA:
                                 // This should not happen for compact CF
@@ -736,9 +711,7 @@ public class SelectStatement implements CQLStatement
                             default:
                                 throw new AssertionError();
                         }
-                        thriftColumns.add(col);
                     }
-                    cqlRows.add(new CqlRow(row.key.key, thriftColumns));
                 }
             }
             else if (cfDef.isComposite)
@@ -758,7 +731,7 @@ public class SelectStatement implements CQLStatement
                     // If current differs from previous, we've just finished a group
                     if (previous != null && !isSameRow(previous, current))
                     {
-                        cqlRows.add(handleGroup(selection, row.key.key, previous, group, schema));
+                        handleGroup(selection, row.key.key, previous, group, cqlRows);
                         group = new HashMap<ByteBuffer, IColumn>();
                     }
 
@@ -768,7 +741,7 @@ public class SelectStatement implements CQLStatement
                 }
                 // Handle the last group
                 if (previous != null)
-                    cqlRows.add(handleGroup(selection, row.key.key, previous, group, schema));
+                    handleGroup(selection, row.key.key, previous, group, cqlRows);
             }
             else
             {
@@ -776,34 +749,29 @@ public class SelectStatement implements CQLStatement
                     continue;
 
                 // Static case: One cqlRow for all columns
-                thriftColumns = new ArrayList<Column>(selection.size());
-
                 // Respect selection order
                 for (Pair<CFDefinition.Name, Selector> p : selection)
                 {
                     CFDefinition.Name name = p.left;
                     Selector selector = p.right;
-
                     if (name.kind == CFDefinition.Name.Kind.KEY_ALIAS)
                     {
-                        thriftColumns.add(new Column(selector.id().key).setValue(row.key.key).setTimestamp(-1L));
+                        cqlRows.addColumnValue(row.key.key);
                         continue;
                     }
 
                     IColumn c = row.cf.getColumn(name.name.key);
-                    thriftColumns.add(makeReturnColumn(selector, c));
+                    addReturnValue(cqlRows, selector, c);
                 }
-                cqlRows.add(new CqlRow(row.key.key, thriftColumns));
             }
         }
 
         // Internal calls always return columns in the comparator order, even when reverse was set
         if (isReversed)
-            Collections.reverse(cqlRows);
+            cqlRows.reverse();
 
         // Trim result if needed to respect the limit
-        cqlRows = cqlRows.size() > parameters.limit ? cqlRows.subList(0, parameters.limit) : cqlRows;
-
+        cqlRows.trim(parameters.limit);
         return cqlRows;
     }
 
@@ -829,41 +797,32 @@ public class SelectStatement implements CQLStatement
         return true;
     }
 
-    private CqlRow handleGroup(List<Pair<CFDefinition.Name, Selector>> selection, ByteBuffer key, ByteBuffer[] components, Map<ByteBuffer, IColumn> columns, CqlMetadata schema)
+    private void handleGroup(List<Pair<CFDefinition.Name, Selector>> selection, ByteBuffer key, ByteBuffer[] components, Map<ByteBuffer, IColumn> columns, ResultSet cqlRows)
     {
-        List<Column> thriftColumns = new ArrayList<Column>(selection.size());
-
         // Respect requested order
         for (Pair<CFDefinition.Name, Selector> p : selection)
         {
             CFDefinition.Name name = p.left;
             Selector selector = p.right;
-
-            Column col;
             switch (name.kind)
             {
                 case KEY_ALIAS:
-                    col = new Column(selector.id().key);
-                    col.setValue(key).setTimestamp(-1L);
+                    cqlRows.addColumnValue(key);
                     break;
                 case COLUMN_ALIAS:
-                    col = new Column(selector.id().key);
-                    col.setValue(components[name.position]);
-                    col.setTimestamp(-1L);
+                    cqlRows.addColumnValue(components[name.position]);
                     break;
                 case VALUE_ALIAS:
                     // This should not happen for SPARSE
                     throw new AssertionError();
                 case COLUMN_METADATA:
                     IColumn c = columns.get(name.name.key);
-                    col = makeReturnColumn(selector, c);
+                    addReturnValue(cqlRows, selector, c);
                     break;
                 default:
                     throw new AssertionError();
             }
-            thriftColumns.add(col);
         }
-        return new CqlRow(key, thriftColumns);
     }
 
     public static class RawStatement extends CFStatement
@@ -1070,7 +1029,7 @@ public class SelectStatement implements CQLStatement
             if (stmt.keyRestriction != null && stmt.keyRestriction.onToken && stmt.keyRestriction.isEquality() && stmt.keyRestriction.eqValues.size() > 1)
                 throw new InvalidRequestException("Select using the token() function don't support IN clause");
 
-            return new ParsedStatement.Prepared(stmt, Arrays.<CFDefinition.Name>asList(names));
+            return new ParsedStatement.Prepared(stmt, Arrays.<ColumnSpecification>asList(names));
         }
 
         private static boolean isReversedType(CFDefinition.Name name)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6d3a3eed/src/java/org/apache/cassandra/cql3/statements/TruncateStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/TruncateStatement.java b/src/java/org/apache/cassandra/cql3/statements/TruncateStatement.java
index bc12a33..8322af1 100644
--- a/src/java/org/apache/cassandra/cql3/statements/TruncateStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/TruncateStatement.java
@@ -24,6 +24,7 @@ import java.util.concurrent.TimeoutException;
 
 import org.apache.cassandra.auth.Permission;
 import org.apache.cassandra.cql3.*;
+import org.apache.cassandra.transport.messages.ResultMessage;
 import org.apache.cassandra.service.ClientState;
 import org.apache.cassandra.service.StorageProxy;
 import org.apache.cassandra.thrift.CqlResult;
@@ -53,7 +54,7 @@ public class TruncateStatement extends CFStatement implements CQLStatement
         ThriftValidation.validateColumnFamily(keyspace(), columnFamily());
     }
 
-    public CqlResult execute(ClientState state, List<ByteBuffer> variables) throws InvalidRequestException, UnavailableException
+    public ResultMessage execute(ClientState state, List<ByteBuffer> variables) throws InvalidRequestException, UnavailableException
     {
         try
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6d3a3eed/src/java/org/apache/cassandra/cql3/statements/UpdateStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/UpdateStatement.java b/src/java/org/apache/cassandra/cql3/statements/UpdateStatement.java
index b30ffed..3167b40 100644
--- a/src/java/org/apache/cassandra/cql3/statements/UpdateStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/UpdateStatement.java
@@ -320,7 +320,7 @@ public class UpdateStatement extends ModificationStatement
             processKeys(cfDef, whereClause, processedKeys, boundNames);
         }
 
-        return new ParsedStatement.Prepared(this, Arrays.<CFDefinition.Name>asList(boundNames));
+        return new ParsedStatement.Prepared(this, Arrays.<ColumnSpecification>asList(boundNames));
     }
 
     public ParsedStatement.Prepared prepare() throws InvalidRequestException

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6d3a3eed/src/java/org/apache/cassandra/cql3/statements/UseStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/UseStatement.java b/src/java/org/apache/cassandra/cql3/statements/UseStatement.java
index d63a72a..05b87cd 100644
--- a/src/java/org/apache/cassandra/cql3/statements/UseStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/UseStatement.java
@@ -21,6 +21,7 @@ import java.nio.ByteBuffer;
 import java.util.List;
 
 import org.apache.cassandra.cql3.CQLStatement;
+import org.apache.cassandra.transport.messages.ResultMessage;
 import org.apache.cassandra.service.ClientState;
 import org.apache.cassandra.thrift.CqlResult;
 import org.apache.cassandra.thrift.InvalidRequestException;
@@ -48,9 +49,9 @@ public class UseStatement extends ParsedStatement implements CQLStatement
     {
     }
 
-    public CqlResult execute(ClientState state, List<ByteBuffer> variables) throws InvalidRequestException
+    public ResultMessage execute(ClientState state, List<ByteBuffer> variables) throws InvalidRequestException
     {
         state.setKeyspace(keyspace);
-        return null;
+        return new ResultMessage.SetKeyspace(keyspace);
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6d3a3eed/src/java/org/apache/cassandra/service/AbstractCassandraDaemon.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/AbstractCassandraDaemon.java b/src/java/org/apache/cassandra/service/AbstractCassandraDaemon.java
deleted file mode 100644
index 8c3cc51..0000000
--- a/src/java/org/apache/cassandra/service/AbstractCassandraDaemon.java
+++ /dev/null
@@ -1,412 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.service;
-
-import java.io.File;
-import java.io.IOException;
-import java.net.InetAddress;
-import java.net.MalformedURLException;
-import java.net.URL;
-import java.util.Arrays;
-import java.util.concurrent.SynchronousQueue;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import com.google.common.collect.Iterables;
-import org.apache.log4j.PropertyConfigurator;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.cassandra.concurrent.DebuggableThreadPoolExecutor;
-import org.apache.cassandra.concurrent.NamedThreadFactory;
-import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.config.ConfigurationException;
-import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.config.Schema;
-import org.apache.cassandra.db.ColumnFamilyStore;
-import org.apache.cassandra.db.Directories;
-import org.apache.cassandra.db.SystemTable;
-import org.apache.cassandra.db.Table;
-import org.apache.cassandra.db.commitlog.CommitLog;
-import org.apache.cassandra.utils.CLibrary;
-import org.apache.cassandra.utils.Mx4jTool;
-
-/**
- * The <code>CassandraDaemon</code> is an abstraction for a Cassandra daemon
- * service, which defines not only a way to activate and deactivate it, but also
- * hooks into its lifecycle methods (see {@link #setup()}, {@link #start()},
- * {@link #stop()} and {@link #setup()}).
- *
- */
-public abstract class AbstractCassandraDaemon implements CassandraDaemon
-{
-    /**
-     * Initialize logging in such a way that it checks for config changes every 10 seconds.
-     */
-    public static void initLog4j()
-    {
-        if (System.getProperty("log4j.defaultInitOverride","false").equalsIgnoreCase("true"))
-        {
-            String config = System.getProperty("log4j.configuration", "log4j-server.properties");
-            URL configLocation = null;
-            try
-            {
-                // try loading from a physical location first.
-                configLocation = new URL(config);
-            }
-            catch (MalformedURLException ex)
-            {
-                // then try loading from the classpath.
-                configLocation = AbstractCassandraDaemon.class.getClassLoader().getResource(config);
-            }
-
-            if (configLocation == null)
-                throw new RuntimeException("Couldn't figure out log4j configuration: "+config);
-
-            // Now convert URL to a filename
-            String configFileName = null;
-            try
-            {
-                // first try URL.getFile() which works for opaque URLs (file:foo) and paths without spaces
-                configFileName = configLocation.getFile();
-                File configFile = new File(configFileName);
-                // then try alternative approach which works for all hierarchical URLs with or without spaces
-                if (!configFile.exists())
-                    configFileName = new File(configLocation.toURI()).getCanonicalPath();
-            }
-            catch (Exception e)
-            {
-                throw new RuntimeException("Couldn't convert log4j configuration location to a valid file", e);
-            }
-
-            PropertyConfigurator.configureAndWatch(configFileName, 10000);
-            org.apache.log4j.Logger.getLogger(AbstractCassandraDaemon.class).info("Logging initialized");
-        }
-    }
-
-    private static final Logger logger = LoggerFactory.getLogger(AbstractCassandraDaemon.class);
-
-    static final AtomicInteger exceptions = new AtomicInteger();
-
-    protected InetAddress listenAddr;
-    protected int listenPort;
-    protected volatile boolean isRunning = false;
-
-    /**
-     * This is a hook for concrete daemons to initialize themselves suitably.
-     *
-     * Subclasses should override this to finish the job (listening on ports, etc.)
-     *
-     * @throws IOException
-     */
-    protected void setup() throws IOException
-    {
-        logger.info("JVM vendor/version: {}/{}", System.getProperty("java.vm.name"), System.getProperty("java.version") );
-        logger.info("Heap size: {}/{}", Runtime.getRuntime().totalMemory(), Runtime.getRuntime().maxMemory());
-        logger.info("Classpath: {}", System.getProperty("java.class.path"));
-        CLibrary.tryMlockall();
-
-        listenPort = DatabaseDescriptor.getRpcPort();
-        listenAddr = DatabaseDescriptor.getRpcAddress();
-
-        Thread.setDefaultUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler()
-        {
-            public void uncaughtException(Thread t, Throwable e)
-            {
-                exceptions.incrementAndGet();
-                logger.error("Exception in thread " + t, e);
-                for (Throwable e2 = e; e2 != null; e2 = e2.getCause())
-                {
-                    // some code, like FileChannel.map, will wrap an OutOfMemoryError in another exception
-                    if (e2 instanceof OutOfMemoryError)
-                        System.exit(100);
-                }
-            }
-        });
-
-        // check all directories(data, commitlog, saved cache) for existence and permission
-        Iterable<String> dirs = Iterables.concat(Arrays.asList(DatabaseDescriptor.getAllDataFileLocations()),
-                                                 Arrays.asList(new String[] {DatabaseDescriptor.getCommitLogLocation(),
-                                                                             DatabaseDescriptor.getSavedCachesLocation()}));
-        for (String dataDir : dirs)
-        {
-            logger.debug("Checking directory {}", dataDir);
-            File dir = new File(dataDir);
-            if (dir.exists())
-                assert dir.isDirectory() && dir.canRead() && dir.canWrite() && dir.canExecute()
-                    : String.format("Directory %s is not accessible.", dataDir);
-        }
-
-        // Migrate sstables from pre-#2749 to the correct location
-        if (Directories.sstablesNeedsMigration())
-            Directories.migrateSSTables();
-
-        if (CacheService.instance == null) // should never happen
-            throw new RuntimeException("Failed to initialize Cache Service.");
-
-        // check the system table to keep user from shooting self in foot by changing partitioner, cluster name, etc.
-        // we do a one-off scrub of the system table first; we can't load the list of the rest of the tables,
-        // until system table is opened.
-        for (CFMetaData cfm : Schema.instance.getTableMetaData(Table.SYSTEM_TABLE).values())
-            ColumnFamilyStore.scrubDataDirectories(Table.SYSTEM_TABLE, cfm.cfName);
-        try
-        {
-            SystemTable.checkHealth();
-        }
-        catch (ConfigurationException e)
-        {
-            logger.error("Fatal exception during initialization", e);
-            System.exit(100);
-        }
-
-        // load keyspace descriptions.
-        try
-        {
-            DatabaseDescriptor.loadSchemas();
-        }
-        catch (IOException e)
-        {
-            logger.error("Fatal exception during initialization", e);
-            System.exit(100);
-        }
-
-        // clean up debris in the rest of the tables
-        for (String table : Schema.instance.getTables())
-        {
-            for (CFMetaData cfm : Schema.instance.getTableMetaData(table).values())
-            {
-                ColumnFamilyStore.scrubDataDirectories(table, cfm.cfName);
-            }
-        }
-
-        // initialize keyspaces
-        for (String table : Schema.instance.getTables())
-        {
-            if (logger.isDebugEnabled())
-                logger.debug("opening keyspace " + table);
-            Table.open(table);
-        }
-
-        if (CacheService.instance.keyCache.size() > 0)
-            logger.info("completed pre-loading ({} keys) key cache.", CacheService.instance.keyCache.size());
-
-        if (CacheService.instance.rowCache.size() > 0)
-            logger.info("completed pre-loading ({} keys) row cache.", CacheService.instance.rowCache.size());
-
-        try
-        {
-            GCInspector.instance.start();
-        }
-        catch (Throwable t)
-        {
-            logger.warn("Unable to start GCInspector (currently only supported on the Sun JVM)");
-        }
-
-        // replay the log if necessary
-        CommitLog.instance.recover();
-
-        SystemTable.finishStartup();
-
-        // start server internals
-        StorageService.instance.registerDaemon(this);
-        try
-        {
-            StorageService.instance.initServer();
-        }
-        catch (ConfigurationException e)
-        {
-            logger.error("Fatal configuration error", e);
-            System.err.println(e.getMessage() + "\nFatal configuration error; unable to start server.  See log for stacktrace.");
-            System.exit(1);
-        }
-
-        Mx4jTool.maybeLoad();
-    }
-
-    /**
-     * Initialize the Cassandra Daemon based on the given <a
-     * href="http://commons.apache.org/daemon/jsvc.html">Commons
-     * Daemon</a>-specific arguments. To clarify, this is a hook for JSVC.
-     *
-     * @param arguments
-     *            the arguments passed in from JSVC
-     * @throws IOException
-     */
-    public void init(String[] arguments) throws IOException
-    {
-        setup();
-    }
-
-    /**
-     * Start the Cassandra Daemon, assuming that it has already been
-     * initialized via {@link #init(String[])}
-     *
-     * Hook for JSVC
-     *
-     * @throws IOException
-     */
-    public void start()
-    {
-        if (Boolean.parseBoolean(System.getProperty("cassandra.start_rpc", "true")))
-        {
-            startRPCServer();
-        }
-        else
-        {
-            logger.info("Not starting RPC server as requested. Use JMX (StorageService->startRPCServer()) to start it");
-        }
-    }
-
-    /**
-     * Stop the daemon, ideally in an idempotent manner.
-     *
-     * Hook for JSVC
-     */
-    public void stop()
-    {
-        // this doesn't entirely shut down Cassandra, just the RPC server.
-        // jsvc takes care of taking the rest down
-        logger.info("Cassandra shutting down...");
-        stopRPCServer();
-    }
-
-    /**
-     * Start the underlying RPC server in idempotent manner.
-     */
-    public void startRPCServer()
-    {
-        if (!isRunning)
-        {
-            startServer();
-            isRunning = true;
-        }
-    }
-
-    /**
-     * Stop the underlying RPC server in idempotent manner.
-     */
-    public void stopRPCServer()
-    {
-        if (isRunning)
-        {
-            stopServer();
-            isRunning = false;
-        }
-    }
-
-    /**
-     * Returns whether the underlying RPC server is running or not.
-     */
-    public boolean isRPCServerRunning()
-    {
-        return isRunning;
-    }
-
-    /**
-     * Start the underlying RPC server.
-     * This method shoud be able to restart a server stopped through stopServer().
-     * Should throw a RuntimeException if the server cannot be started
-     */
-    protected abstract void startServer();
-
-    /**
-     * Stop the underlying RPC server.
-     * This method should be able to stop server started through startServer().
-     * Should throw a RuntimeException if the server cannot be stopped
-     */
-    protected abstract void stopServer();
-
-
-    /**
-     * Clean up all resources obtained during the lifetime of the daemon. This
-     * is a hook for JSVC.
-     */
-    public void destroy()
-    {}
-
-    /**
-     * A convenience method to initialize and start the daemon in one shot.
-     */
-    public void activate()
-    {
-        String pidFile = System.getProperty("cassandra-pidfile");
-
-        try
-        {
-            setup();
-
-            if (pidFile != null)
-            {
-                new File(pidFile).deleteOnExit();
-            }
-
-            if (System.getProperty("cassandra-foreground") == null)
-            {
-                System.out.close();
-                System.err.close();
-            }
-
-            start();
-        }
-        catch (Throwable e)
-        {
-            logger.error("Exception encountered during startup", e);
-
-            // try to warn user on stdout too, if we haven't already detached
-            e.printStackTrace();
-            System.out.println("Exception encountered during startup: " + e.getMessage());
-
-            System.exit(3);
-        }
-    }
-
-    /**
-     * A convenience method to stop and destroy the daemon in one shot.
-     */
-    public void deactivate()
-    {
-        stop();
-        destroy();
-    }
-
-    /**
-     * A subclass of Java's ThreadPoolExecutor which implements Jetty's ThreadPool
-     * interface (for integration with Avro), and performs ClientState cleanup.
-     *
-     * (Note that the tasks being executed perform their own while-command-process
-     * loop until the client disconnects.)
-     */
-    public static class CleaningThreadPool extends ThreadPoolExecutor
-    {
-        private final ThreadLocal<ClientState> state;
-        public CleaningThreadPool(ThreadLocal<ClientState> state, int minWorkerThread, int maxWorkerThreads)
-        {
-            super(minWorkerThread, maxWorkerThreads, 60, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), new NamedThreadFactory("Thrift"));
-            this.state = state;
-        }
-
-        @Override
-        protected void afterExecute(Runnable r, Throwable t)
-        {
-            super.afterExecute(r, t);
-            DebuggableThreadPoolExecutor.logExceptionsAfterExecute(r, t);
-            state.get().logout();
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6d3a3eed/src/java/org/apache/cassandra/service/CassandraDaemon.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/CassandraDaemon.java b/src/java/org/apache/cassandra/service/CassandraDaemon.java
index e4e23bf..af0cd4f 100644
--- a/src/java/org/apache/cassandra/service/CassandraDaemon.java
+++ b/src/java/org/apache/cassandra/service/CassandraDaemon.java
@@ -17,15 +17,236 @@
  */
 package org.apache.cassandra.service;
 
+import java.io.File;
 import java.io.IOException;
+import java.net.InetAddress;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.util.Arrays;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import com.google.common.collect.Iterables;
+import org.apache.log4j.PropertyConfigurator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.ConfigurationException;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.config.Schema;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.Directories;
+import org.apache.cassandra.db.SystemTable;
+import org.apache.cassandra.db.Table;
+import org.apache.cassandra.db.commitlog.CommitLog;
+import org.apache.cassandra.thrift.ThriftServer;
+import org.apache.cassandra.utils.CLibrary;
+import org.apache.cassandra.utils.Mx4jTool;
 
 /**
- * The <code>CassandraDaemon</code> interface captures the lifecycle of a
- * Cassandra daemon that runs on a single node.
- *
+ * The <code>CassandraDaemon</code> is an abstraction for a Cassandra daemon
+ * service, which defines not only a way to activate and deactivate it, but also
+ * hooks into its lifecycle methods (see {@link #setup()}, {@link #start()},
+ * {@link #stop()} and {@link #destroy()}).
  */
-public interface CassandraDaemon
+public class CassandraDaemon
 {
+    static
+    {
+        initLog4j();
+    }
+
+    /**
+     * Initialize logging in such a way that it checks for config changes every 10 seconds.
+     */
+    public static void initLog4j()
+    {
+        if (System.getProperty("log4j.defaultInitOverride","false").equalsIgnoreCase("true"))
+        {
+            String config = System.getProperty("log4j.configuration", "log4j-server.properties");
+            URL configLocation = null;
+            try
+            {
+                // try loading from a physical location first.
+                configLocation = new URL(config);
+            }
+            catch (MalformedURLException ex)
+            {
+                // then try loading from the classpath.
+                configLocation = CassandraDaemon.class.getClassLoader().getResource(config);
+            }
+
+            if (configLocation == null)
+                throw new RuntimeException("Couldn't figure out log4j configuration: "+config);
+
+            // Now convert URL to a filename
+            String configFileName = null;
+            try
+            {
+                // first try URL.getFile() which works for opaque URLs (file:foo) and paths without spaces
+                configFileName = configLocation.getFile();
+                File configFile = new File(configFileName);
+                // then try alternative approach which works for all hierarchical URLs with or without spaces
+                if (!configFile.exists())
+                    configFileName = new File(configLocation.toURI()).getCanonicalPath();
+            }
+            catch (Exception e)
+            {
+                throw new RuntimeException("Couldn't convert log4j configuration location to a valid file", e);
+            }
+
+            PropertyConfigurator.configureAndWatch(configFileName, 10000);
+            org.apache.log4j.Logger.getLogger(CassandraDaemon.class).info("Logging initialized");
+        }
+    }
+
+    private static final Logger logger = LoggerFactory.getLogger(CassandraDaemon.class);
+
+    private static final CassandraDaemon instance = new CassandraDaemon();
+
+    static final AtomicInteger exceptions = new AtomicInteger();
+
+    public Server thriftServer;
+    public Server nativeServer;
+
+    /**
+     * This is a hook for concrete daemons to initialize themselves suitably.
+     *
+     * Subclasses should override this to finish the job (listening on ports, etc.)
+     *
+     * @throws IOException
+     */
+    protected void setup() throws IOException
+    {
+        logger.info("JVM vendor/version: {}/{}", System.getProperty("java.vm.name"), System.getProperty("java.version") );
+        logger.info("Heap size: {}/{}", Runtime.getRuntime().totalMemory(), Runtime.getRuntime().maxMemory());
+        logger.info("Classpath: {}", System.getProperty("java.class.path"));
+        CLibrary.tryMlockall();
+
+        Thread.setDefaultUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler()
+        {
+            public void uncaughtException(Thread t, Throwable e)
+            {
+                exceptions.incrementAndGet();
+                logger.error("Exception in thread " + t, e);
+                for (Throwable e2 = e; e2 != null; e2 = e2.getCause())
+                {
+                    // some code, like FileChannel.map, will wrap an OutOfMemoryError in another exception
+                    if (e2 instanceof OutOfMemoryError)
+                        System.exit(100);
+                }
+            }
+        });
+
+        // check all directories(data, commitlog, saved cache) for existence and permission
+        Iterable<String> dirs = Iterables.concat(Arrays.asList(DatabaseDescriptor.getAllDataFileLocations()),
+                                                 Arrays.asList(new String[] {DatabaseDescriptor.getCommitLogLocation(),
+                                                                             DatabaseDescriptor.getSavedCachesLocation()}));
+        for (String dataDir : dirs)
+        {
+            logger.debug("Checking directory {}", dataDir);
+            File dir = new File(dataDir);
+            if (dir.exists())
+                assert dir.isDirectory() && dir.canRead() && dir.canWrite() && dir.canExecute()
+                    : String.format("Directory %s is not accessible.", dataDir);
+        }
+
+        // Migrate sstables from pre-#2749 to the correct location
+        if (Directories.sstablesNeedsMigration())
+            Directories.migrateSSTables();
+
+        if (CacheService.instance == null) // should never happen
+            throw new RuntimeException("Failed to initialize Cache Service.");
+
+        // check the system table to keep user from shooting self in foot by changing partitioner, cluster name, etc.
+        // we do a one-off scrub of the system table first; we can't load the list of the rest of the tables,
+        // until system table is opened.
+        for (CFMetaData cfm : Schema.instance.getTableMetaData(Table.SYSTEM_TABLE).values())
+            ColumnFamilyStore.scrubDataDirectories(Table.SYSTEM_TABLE, cfm.cfName);
+        try
+        {
+            SystemTable.checkHealth();
+        }
+        catch (ConfigurationException e)
+        {
+            logger.error("Fatal exception during initialization", e);
+            System.exit(100);
+        }
+
+        // load keyspace descriptions.
+        try
+        {
+            DatabaseDescriptor.loadSchemas();
+        }
+        catch (IOException e)
+        {
+            logger.error("Fatal exception during initialization", e);
+            System.exit(100);
+        }
+
+        // clean up debris in the rest of the tables
+        for (String table : Schema.instance.getTables())
+        {
+            for (CFMetaData cfm : Schema.instance.getTableMetaData(table).values())
+            {
+                ColumnFamilyStore.scrubDataDirectories(table, cfm.cfName);
+            }
+        }
+
+        // initialize keyspaces
+        for (String table : Schema.instance.getTables())
+        {
+            if (logger.isDebugEnabled())
+                logger.debug("opening keyspace " + table);
+            Table.open(table);
+        }
+
+        if (CacheService.instance.keyCache.size() > 0)
+            logger.info("completed pre-loading ({} keys) key cache.", CacheService.instance.keyCache.size());
+
+        if (CacheService.instance.rowCache.size() > 0)
+            logger.info("completed pre-loading ({} keys) row cache.", CacheService.instance.rowCache.size());
+
+        try
+        {
+            GCInspector.instance.start();
+        }
+        catch (Throwable t)
+        {
+            logger.warn("Unable to start GCInspector (currently only supported on the Sun JVM)");
+        }
+
+        // replay the log if necessary
+        CommitLog.instance.recover();
+
+        SystemTable.finishStartup();
+
+        // start server internals
+        StorageService.instance.registerDaemon(this);
+        try
+        {
+            StorageService.instance.initServer();
+        }
+        catch (ConfigurationException e)
+        {
+            logger.error("Fatal configuration error", e);
+            System.err.println(e.getMessage() + "\nFatal configuration error; unable to start server.  See log for stacktrace.");
+            System.exit(1);
+        }
+
+        Mx4jTool.maybeLoad();
+
+        // Thrift
+        InetAddress rpcAddr = DatabaseDescriptor.getRpcAddress();
+        int rpcPort = DatabaseDescriptor.getRpcPort();
+        thriftServer = new ThriftServer(rpcAddr, rpcPort);
+
+        // Native transport
+        InetAddress nativeAddr = DatabaseDescriptor.getNativeTransportAddress();
+        int nativePort = DatabaseDescriptor.getNativeTransportPort();
+        nativeServer = new org.apache.cassandra.transport.Server(nativeAddr, nativePort);
+    }
+
     /**
      * Initialize the Cassandra Daemon based on the given <a
      * href="http://commons.apache.org/daemon/jsvc.html">Commons
@@ -35,38 +256,130 @@ public interface CassandraDaemon
      *            the arguments passed in from JSVC
      * @throws IOException
      */
-    public void init(String[] arguments) throws IOException;
+    public void init(String[] arguments) throws IOException
+    {
+        setup();
+    }
 
     /**
      * Start the Cassandra Daemon, assuming that it has already been
-     * initialized (via {@link CassandraDaemon#init(String[])})
+     * initialized via {@link #init(String[])}
+     *
+     * Hook for JSVC
+     *
      * @throws IOException
      */
-    public void start() throws IOException;
+    public void start()
+    {
+        String rpcFlag = System.getProperty("cassandra.start_rpc");
+        if ((rpcFlag != null && Boolean.parseBoolean(rpcFlag)) || (rpcFlag == null && DatabaseDescriptor.startRpc()))
+            thriftServer.start();
+        else
+            logger.info("Not starting RPC server as requested. Use JMX (StorageService->startRPCServer()) to start it");
+
+        String nativeFlag = System.getProperty("cassandra.start_native_transport");
+        if ((nativeFlag != null && Boolean.parseBoolean(nativeFlag)) || (nativeFlag == null && DatabaseDescriptor.startNativeTransport()))
+            nativeServer.start();
+        else
+            logger.info("Not starting native transport as requested. Use JMX (StorageService->startNativeTransport()) to start it");
+    }
 
     /**
      * Stop the daemon, ideally in an idempotent manner.
+     *
+     * Hook for JSVC
      */
-    public void stop();
+    public void stop()
+    {
+        // this doesn't entirely shut down Cassandra, just the RPC server.
+        // jsvc takes care of taking the rest down
+        logger.info("Cassandra shutting down...");
+        thriftServer.stop();
+        nativeServer.stop();
+    }
+
 
     /**
-     * Clean up all resources obtained during the lifetime of the daemon. Just
-     * to clarify, this is a hook for JSVC.
+     * Clean up all resources obtained during the lifetime of the daemon. This
+     * is a hook for JSVC.
      */
-    public void destroy();
-
-    public void startRPCServer();
-    public void stopRPCServer();
-    public boolean isRPCServerRunning();
+    public void destroy()
+    {}
 
     /**
      * A convenience method to initialize and start the daemon in one shot.
      */
-    public void activate();
+    public void activate()
+    {
+        String pidFile = System.getProperty("cassandra-pidfile");
+
+        try
+        {
+            setup();
+
+            if (pidFile != null)
+            {
+                new File(pidFile).deleteOnExit();
+            }
+
+            if (System.getProperty("cassandra-foreground") == null)
+            {
+                System.out.close();
+                System.err.close();
+            }
+
+            start();
+        }
+        catch (Throwable e)
+        {
+            logger.error("Exception encountered during startup", e);
+
+            // try to warn user on stdout too, if we haven't already detached
+            e.printStackTrace();
+            System.out.println("Exception encountered during startup: " + e.getMessage());
+
+            System.exit(3);
+        }
+    }
 
     /**
      * A convenience method to stop and destroy the daemon in one shot.
      */
-    public void deactivate();
+    public void deactivate()
+    {
+        stop();
+        destroy();
+    }
+
+    public static void stop(String[] args)
+    {
+        instance.deactivate();
+    }
+
+    public static void main(String[] args)
+    {
+        instance.activate();
+    }
+
+    public interface Server
+    {
+        /**
+         * Start the server.
+         * This method should be able to restart a server stopped through stop().
+         * Should throw a RuntimeException if the server cannot be started
+         */
+        public void start();
+
+        /**
+         * Stop the server.
+         * This method should be able to stop server started through start().
+         * Should throw a RuntimeException if the server cannot be stopped
+         */
+        public void stop();
 
+        /**
+         * Returns whether the server is currently running.
+         */
+        public boolean isRunning();
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6d3a3eed/src/java/org/apache/cassandra/service/ClientState.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/ClientState.java b/src/java/org/apache/cassandra/service/ClientState.java
index 551c60e..89b3715 100644
--- a/src/java/org/apache/cassandra/service/ClientState.java
+++ b/src/java/org/apache/cassandra/service/ClientState.java
@@ -206,6 +206,11 @@ public class ClientState
         hasAccess(user, perms, perm, resource);
     }
 
+    public boolean isLogged()
+    {
+        return user != null;
+    }
+
     private void validateLogin() throws InvalidRequestException
     {
         if (user == null)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6d3a3eed/src/java/org/apache/cassandra/service/EmbeddedCassandraService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/EmbeddedCassandraService.java b/src/java/org/apache/cassandra/service/EmbeddedCassandraService.java
index 2447e03..6e4dffa 100644
--- a/src/java/org/apache/cassandra/service/EmbeddedCassandraService.java
+++ b/src/java/org/apache/cassandra/service/EmbeddedCassandraService.java
@@ -19,7 +19,7 @@ package org.apache.cassandra.service;
 
 import java.io.IOException;
 
-import org.apache.cassandra.thrift.CassandraDaemon;
+import org.apache.cassandra.service.CassandraDaemon;
 
 /**
  * An embedded, in-memory cassandra storage service that listens

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6d3a3eed/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index f72ea4b..74da07e 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -100,8 +100,7 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe
      * This pool is used by tasks that can have longer execution times, and usually are non periodic.
      */
     public static final DebuggableScheduledThreadPoolExecutor tasks = new DebuggableScheduledThreadPoolExecutor("NonPeriodicTasks");
-
-    /**
+/**
      * tasks that do not need to be waited for on shutdown/drain
      */
     public static final DebuggableScheduledThreadPoolExecutor optionalTasks = new DebuggableScheduledThreadPoolExecutor("OptionalTasks");
@@ -262,18 +261,18 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe
     {
         if (daemon == null)
         {
-            throw new IllegalStateException("No configured RPC daemon");
+            throw new IllegalStateException("No configured daemon");
         }
-        daemon.startRPCServer();
+        daemon.thriftServer.start();
     }
 
     public void stopRPCServer()
     {
         if (daemon == null)
         {
-            throw new IllegalStateException("No configured RPC daemon");
+            throw new IllegalStateException("No configured daemon");
         }
-        daemon.stopRPCServer();
+        daemon.thriftServer.stop();
     }
 
     public boolean isRPCServerRunning()
@@ -282,7 +281,34 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe
         {
             return false;
         }
-        return daemon.isRPCServerRunning();
+        return daemon.thriftServer.isRunning();
+    }
+
+    public void startNativeTransport()
+    {
+        if (daemon == null)
+        {
+            throw new IllegalStateException("No configured daemon");
+        }
+        daemon.nativeServer.start();
+    }
+
+    public void stopNativeTransport()
+    {
+        if (daemon == null)
+        {
+            throw new IllegalStateException("No configured  daemon");
+        }
+        daemon.nativeServer.stop();
+    }
+
+    public boolean isNativeTransportRunning()
+    {
+        if (daemon == null)
+        {
+            return false;
+        }
+        return daemon.nativeServer.isRunning();
     }
 
     public void stopClient()
@@ -3064,7 +3090,7 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe
 
     public int getExceptionCount()
     {
-        return AbstractCassandraDaemon.exceptions.get();
+        return CassandraDaemon.exceptions.get();
     }
 
     public void rescheduleFailedDeletions()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6d3a3eed/src/java/org/apache/cassandra/service/StorageServiceMBean.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageServiceMBean.java b/src/java/org/apache/cassandra/service/StorageServiceMBean.java
index e9a84a7..ff1ec03 100644
--- a/src/java/org/apache/cassandra/service/StorageServiceMBean.java
+++ b/src/java/org/apache/cassandra/service/StorageServiceMBean.java
@@ -370,6 +370,10 @@ public interface StorageServiceMBean
     // to determine if thrift is running
     public boolean isRPCServerRunning();
 
+    public void stopNativeTransport();
+    public void startNativeTransport();
+    public boolean isNativeTransportRunning();
+
     // allows a node that have been started without joining the ring to join it
     public void joinRing() throws IOException, org.apache.cassandra.config.ConfigurationException;
     public boolean isJoined();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6d3a3eed/src/java/org/apache/cassandra/thrift/CassandraDaemon.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/thrift/CassandraDaemon.java b/src/java/org/apache/cassandra/thrift/CassandraDaemon.java
deleted file mode 100644
index 669c981..0000000
--- a/src/java/org/apache/cassandra/thrift/CassandraDaemon.java
+++ /dev/null
@@ -1,221 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.thrift;
-
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
-import java.util.Arrays;
-import java.util.List;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.SynchronousQueue;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.cassandra.service.AbstractCassandraDaemon;
-import org.apache.thrift.server.TNonblockingServer;
-import org.apache.thrift.server.TThreadPoolServer;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutor;
-import org.apache.cassandra.concurrent.NamedThreadFactory;
-import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.thrift.protocol.TProtocolFactory;
-import org.apache.thrift.server.TServer;
-import org.apache.thrift.transport.TFramedTransport;
-import org.apache.thrift.transport.TNonblockingServerTransport;
-import org.apache.thrift.transport.TServerTransport;
-import org.apache.thrift.transport.TTransportException;
-import org.apache.thrift.transport.TTransportFactory;
-
-/**
- * This class supports two methods for creating a Cassandra node daemon,
- * invoking the class's main method, and using the jsvc wrapper from
- * commons-daemon, (for more information on using this class with the
- * jsvc wrapper, see the
- * <a href="http://commons.apache.org/daemon/jsvc.html">Commons Daemon</a>
- * documentation).
- */
-
-public class CassandraDaemon extends org.apache.cassandra.service.AbstractCassandraDaemon
-{
-    protected static CassandraDaemon instance;
-
-    static
-    {
-        AbstractCassandraDaemon.initLog4j();
-    }
-
-    private static final Logger logger = LoggerFactory.getLogger(CassandraDaemon.class);
-    private final static String SYNC = "sync";
-    private final static String ASYNC = "async";
-    private final static String HSHA = "hsha";
-    public final static List<String> rpc_server_types = Arrays.asList(SYNC, ASYNC, HSHA);
-    private ThriftServer server;
-
-    protected void startServer()
-    {
-        if (server == null)
-        {
-            server = new ThriftServer(listenAddr, listenPort);
-            server.start();
-        }
-    }
-
-    protected void stopServer()
-    {
-        if (server != null)
-        {
-            server.stopServer();
-            try
-            {
-                server.join();
-            }
-            catch (InterruptedException e)
-            {
-                logger.error("Interrupted while waiting thrift server to stop", e);
-            }
-            server = null;
-        }
-    }
-
-    public static void stop(String[] args)
-    {
-        instance.stopServer();
-        instance.deactivate();
-    }
-
-    public static void main(String[] args)
-    {
-        instance = new CassandraDaemon();
-        instance.activate();
-    }
-
-    /**
-     * Simple class to run the thrift connection accepting code in separate
-     * thread of control.
-     */
-    private static class ThriftServer extends Thread
-    {
-        private TServer serverEngine;
-
-        public ThriftServer(InetAddress listenAddr, int listenPort)
-        {
-            // now we start listening for clients
-            final CassandraServer cassandraServer = new CassandraServer();
-            Cassandra.Processor processor = new Cassandra.Processor(cassandraServer);
-
-            // Transport
-            logger.info(String.format("Binding thrift service to %s:%s", listenAddr, listenPort));
-
-            // Protocol factory
-            TProtocolFactory tProtocolFactory = new TBinaryProtocol.Factory(true, true, DatabaseDescriptor.getThriftMaxMessageLength());
-
-            // Transport factory
-            int tFramedTransportSize = DatabaseDescriptor.getThriftFramedTransportSize();
-            TTransportFactory inTransportFactory = new TFramedTransport.Factory(tFramedTransportSize);
-            TTransportFactory outTransportFactory = new TFramedTransport.Factory(tFramedTransportSize);
-            logger.info("Using TFastFramedTransport with a max frame size of {} bytes.", tFramedTransportSize);
-
-            if (DatabaseDescriptor.getRpcServerType().equalsIgnoreCase(SYNC))
-            {
-                TServerTransport serverTransport;
-                try
-                {
-                    serverTransport = new TCustomServerSocket(new InetSocketAddress(listenAddr, listenPort),
-                                                              DatabaseDescriptor.getRpcKeepAlive(),
-                                                              DatabaseDescriptor.getRpcSendBufferSize(),
-                                                              DatabaseDescriptor.getRpcRecvBufferSize());
-                }
-                catch (TTransportException e)
-                {
-                    throw new RuntimeException(String.format("Unable to create thrift socket to %s:%s", listenAddr, listenPort), e);
-                }
-                // ThreadPool Server and will be invocation per connection basis...
-                TThreadPoolServer.Args serverArgs = new TThreadPoolServer.Args(serverTransport)
-                                                                         .minWorkerThreads(DatabaseDescriptor.getRpcMinThreads())
-                                                                         .maxWorkerThreads(DatabaseDescriptor.getRpcMaxThreads())
-                                                                         .inputTransportFactory(inTransportFactory)
-                                                                         .outputTransportFactory(outTransportFactory)
-                                                                         .inputProtocolFactory(tProtocolFactory)
-                                                                         .outputProtocolFactory(tProtocolFactory)
-                                                                         .processor(processor);
-                ExecutorService executorService = new CleaningThreadPool(cassandraServer.clientState, serverArgs.minWorkerThreads, serverArgs.maxWorkerThreads);
-                serverEngine = new CustomTThreadPoolServer(serverArgs, executorService);
-                logger.info(String.format("Using synchronous/threadpool thrift server on %s : %s", listenAddr, listenPort));
-            }
-            else
-            {
-                TNonblockingServerTransport serverTransport;
-                try
-                {
-                    serverTransport = new TCustomNonblockingServerSocket(new InetSocketAddress(listenAddr, listenPort),
-                                                                             DatabaseDescriptor.getRpcKeepAlive(),
-                                                                             DatabaseDescriptor.getRpcSendBufferSize(),
-                                                                             DatabaseDescriptor.getRpcRecvBufferSize());
-                }
-                catch (TTransportException e)
-                {
-                    throw new RuntimeException(String.format("Unable to create thrift socket to %s:%s", listenAddr, listenPort), e);
-                }
-
-                if (DatabaseDescriptor.getRpcServerType().equalsIgnoreCase(ASYNC))
-                {
-                    // This is single threaded hence the invocation will be all
-                    // in one thread.
-                    TNonblockingServer.Args serverArgs = new TNonblockingServer.Args(serverTransport).inputTransportFactory(inTransportFactory)
-                                                                                                     .outputTransportFactory(outTransportFactory)
-                                                                                                     .inputProtocolFactory(tProtocolFactory)
-                                                                                                     .outputProtocolFactory(tProtocolFactory)
-                                                                                                     .processor(processor);
-                    logger.info(String.format("Using non-blocking/asynchronous thrift server on %s : %s", listenAddr, listenPort));
-                    serverEngine = new CustomTNonBlockingServer(serverArgs);
-                }
-                else if (DatabaseDescriptor.getRpcServerType().equalsIgnoreCase(HSHA))
-                {
-                    // This is NIO selector service but the invocation will be Multi-Threaded with the Executor service.
-                    ExecutorService executorService = new JMXEnabledThreadPoolExecutor(DatabaseDescriptor.getRpcMinThreads(),
-                                                                                       DatabaseDescriptor.getRpcMaxThreads(),
-                                                                                       60L,
-                                                                                       TimeUnit.SECONDS,
-                                                                                       new SynchronousQueue<Runnable>(),
-                                                                                       new NamedThreadFactory("RPC-Thread"), "RPC-THREAD-POOL");
-                    TNonblockingServer.Args serverArgs = new TNonblockingServer.Args(serverTransport).inputTransportFactory(inTransportFactory)
-                                                                                       .outputTransportFactory(outTransportFactory)
-                                                                                       .inputProtocolFactory(tProtocolFactory)
-                                                                                       .outputProtocolFactory(tProtocolFactory)
-                                                                                       .processor(processor);
-                    logger.info(String.format("Using custom half-sync/half-async thrift server on %s : %s", listenAddr, listenPort));
-                    // Check for available processors in the system which will be equal to the IO Threads.
-                    serverEngine = new CustomTHsHaServer(serverArgs, executorService, Runtime.getRuntime().availableProcessors());
-                }
-            }
-        }
-
-        public void run()
-        {
-            logger.info("Listening for thrift clients...");
-            serverEngine.serve();
-        }
-
-        public void stopServer()
-        {
-            logger.info("Stop listening to thrift clients");
-            serverEngine.stop();
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6d3a3eed/src/java/org/apache/cassandra/thrift/CassandraServer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/thrift/CassandraServer.java b/src/java/org/apache/cassandra/thrift/CassandraServer.java
index f661e05..efe0372 100644
--- a/src/java/org/apache/cassandra/thrift/CassandraServer.java
+++ b/src/java/org/apache/cassandra/thrift/CassandraServer.java
@@ -32,7 +32,6 @@ import com.google.common.collect.Maps;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.antlr.runtime.RecognitionException;
 import org.apache.cassandra.auth.Permission;
 import org.apache.cassandra.config.*;
 import org.apache.cassandra.cql.CQLStatement;
@@ -1234,20 +1233,11 @@ public class CassandraServer implements Cassandra.Iface
 
         String queryString = uncompress(query,compression);
 
-        try
-        {
-            ClientState cState = state();
-            if (cState.getCQLVersion().major == 2)
-                return QueryProcessor.process(queryString, state());
-            else
-                return org.apache.cassandra.cql3.QueryProcessor.process(queryString, cState);
-        }
-        catch (RecognitionException e)
-        {
-            InvalidRequestException ire = new InvalidRequestException("Invalid or malformed CQL query string");
-            ire.initCause(e);
-            throw ire;
-        }
+        ClientState cState = state();
+        if (cState.getCQLVersion().major == 2)
+            return QueryProcessor.process(queryString, state());
+        else
+            return org.apache.cassandra.cql3.QueryProcessor.process(queryString, cState).toThriftResult();
     }
 
     public CqlPreparedResult prepare_cql_query(ByteBuffer query, Compression compression)
@@ -1257,20 +1247,11 @@ public class CassandraServer implements Cassandra.Iface
 
         String queryString = uncompress(query,compression);
 
-        try
-        {
-            ClientState cState = state();
-            if (cState.getCQLVersion().major == 2)
-                return QueryProcessor.prepare(queryString, cState);
-            else
-                return org.apache.cassandra.cql3.QueryProcessor.prepare(queryString, cState);
-        }
-        catch (RecognitionException e)
-        {
-            InvalidRequestException ire = new InvalidRequestException("Invalid or malformed CQL query string");
-            ire.initCause(e);
-            throw ire;
-        }
+        ClientState cState = state();
+        if (cState.getCQLVersion().major == 2)
+            return QueryProcessor.prepare(queryString, cState);
+        else
+            return org.apache.cassandra.cql3.QueryProcessor.prepare(queryString, cState).toThriftPreparedResult();
     }
 
     public CqlResult execute_prepared_cql_query(int itemId, List<ByteBuffer> bindVariables)
@@ -1297,7 +1278,7 @@ public class CassandraServer implements Cassandra.Iface
                 throw new InvalidRequestException(String.format("Prepared query with ID %d not found", itemId));
             logger.trace("Retrieved prepared statement #{} with {} bind markers", itemId, statement.getBoundsTerms());
 
-            return org.apache.cassandra.cql3.QueryProcessor.processPrepared(statement, cState, bindVariables);
+            return org.apache.cassandra.cql3.QueryProcessor.processPrepared(statement, cState, bindVariables).toThriftResult();
         }
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6d3a3eed/src/java/org/apache/cassandra/thrift/ThriftServer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/thrift/ThriftServer.java b/src/java/org/apache/cassandra/thrift/ThriftServer.java
new file mode 100644
index 0000000..5c96e47
--- /dev/null
+++ b/src/java/org/apache/cassandra/thrift/ThriftServer.java
@@ -0,0 +1,236 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.thrift;
+
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.cassandra.concurrent.DebuggableThreadPoolExecutor;
+import org.apache.cassandra.concurrent.NamedThreadFactory;
+import org.apache.cassandra.service.CassandraDaemon;
+import org.apache.thrift.server.TNonblockingServer;
+import org.apache.thrift.server.TThreadPoolServer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutor;
+import org.apache.cassandra.concurrent.NamedThreadFactory;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.service.ClientState;
+import org.apache.thrift.protocol.TProtocolFactory;
+import org.apache.thrift.server.TServer;
+import org.apache.thrift.transport.TFramedTransport;
+import org.apache.thrift.transport.TNonblockingServerTransport;
+import org.apache.thrift.transport.TServerTransport;
+import org.apache.thrift.transport.TTransportException;
+import org.apache.thrift.transport.TTransportFactory;
+
+public class ThriftServer implements CassandraDaemon.Server
+{
+    private static final Logger logger = LoggerFactory.getLogger(ThriftServer.class);
+    private final static String SYNC = "sync";
+    private final static String ASYNC = "async";
+    private final static String HSHA = "hsha";
+    public final static List<String> rpc_server_types = Arrays.asList(SYNC, ASYNC, HSHA);
+
+    private final InetAddress address;
+    private final int port;
+    private volatile ThriftServerThread server;
+
+    public ThriftServer(InetAddress address, int port)
+    {
+        this.address = address;
+        this.port = port;
+    }
+
+    public void start()
+    {
+        if (server == null)
+        {
+            server = new ThriftServerThread(address, port);
+            server.start();
+        }
+    }
+
+    public void stop()
+    {
+        if (server != null)
+        {
+            server.stopServer();
+            try
+            {
+                server.join();
+            }
+            catch (InterruptedException e)
+            {
+                logger.error("Interrupted while waiting thrift server to stop", e);
+            }
+            server = null;
+        }
+    }
+
+    public boolean isRunning()
+    {
+        return server != null;
+    }
+
+    /**
+     * Simple class to run the thrift connection accepting code in separate
+     * thread of control.
+     */
+    private static class ThriftServerThread extends Thread
+    {
+        private TServer serverEngine;
+
+        public ThriftServerThread(InetAddress listenAddr, int listenPort)
+        {
+            // now we start listening for clients
+            final CassandraServer cassandraServer = new CassandraServer();
+            Cassandra.Processor processor = new Cassandra.Processor(cassandraServer);
+
+            // Transport
+            logger.info(String.format("Binding thrift service to %s:%s", listenAddr, listenPort));
+
+            // Protocol factory
+            TProtocolFactory tProtocolFactory = new TBinaryProtocol.Factory(true, true, DatabaseDescriptor.getThriftMaxMessageLength());
+
+            // Transport factory
+            int tFramedTransportSize = DatabaseDescriptor.getThriftFramedTransportSize();
+            TTransportFactory inTransportFactory = new TFramedTransport.Factory(tFramedTransportSize);
+            TTransportFactory outTransportFactory = new TFramedTransport.Factory(tFramedTransportSize);
+            logger.info("Using TFastFramedTransport with a max frame size of {} bytes.", tFramedTransportSize);
+
+            if (DatabaseDescriptor.getRpcServerType().equalsIgnoreCase(SYNC))
+            {
+                TServerTransport serverTransport;
+                try
+                {
+                    serverTransport = new TCustomServerSocket(new InetSocketAddress(listenAddr, listenPort),
+                                                              DatabaseDescriptor.getRpcKeepAlive(),
+                                                              DatabaseDescriptor.getRpcSendBufferSize(),
+                                                              DatabaseDescriptor.getRpcRecvBufferSize());
+                }
+                catch (TTransportException e)
+                {
+                    throw new RuntimeException(String.format("Unable to create thrift socket to %s:%s", listenAddr, listenPort), e);
+                }
+                // Thread-pool server: one worker thread handles each connection for its lifetime.
+                TThreadPoolServer.Args serverArgs = new TThreadPoolServer.Args(serverTransport)
+                                                                         .minWorkerThreads(DatabaseDescriptor.getRpcMinThreads())
+                                                                         .maxWorkerThreads(DatabaseDescriptor.getRpcMaxThreads())
+                                                                         .inputTransportFactory(inTransportFactory)
+                                                                         .outputTransportFactory(outTransportFactory)
+                                                                         .inputProtocolFactory(tProtocolFactory)
+                                                                         .outputProtocolFactory(tProtocolFactory)
+                                                                         .processor(processor);
+                ExecutorService executorService = new CleaningThreadPool(cassandraServer.clientState, serverArgs.minWorkerThreads, serverArgs.maxWorkerThreads);
+                serverEngine = new CustomTThreadPoolServer(serverArgs, executorService);
+                logger.info(String.format("Using synchronous/threadpool thrift server on %s : %s", listenAddr, listenPort));
+            }
+            else
+            {
+                TNonblockingServerTransport serverTransport;
+                try
+                {
+                    serverTransport = new TCustomNonblockingServerSocket(new InetSocketAddress(listenAddr, listenPort),
+                                                                             DatabaseDescriptor.getRpcKeepAlive(),
+                                                                             DatabaseDescriptor.getRpcSendBufferSize(),
+                                                                             DatabaseDescriptor.getRpcRecvBufferSize());
+                }
+                catch (TTransportException e)
+                {
+                    throw new RuntimeException(String.format("Unable to create thrift socket to %s:%s", listenAddr, listenPort), e);
+                }
+
+                if (DatabaseDescriptor.getRpcServerType().equalsIgnoreCase(ASYNC))
+                {
+                    // Single-threaded server: all invocations are processed
+                    // on one thread.
+                    TNonblockingServer.Args serverArgs = new TNonblockingServer.Args(serverTransport).inputTransportFactory(inTransportFactory)
+                                                                                                     .outputTransportFactory(outTransportFactory)
+                                                                                                     .inputProtocolFactory(tProtocolFactory)
+                                                                                                     .outputProtocolFactory(tProtocolFactory)
+                                                                                                     .processor(processor);
+                    logger.info(String.format("Using non-blocking/asynchronous thrift server on %s : %s", listenAddr, listenPort));
+                    serverEngine = new CustomTNonBlockingServer(serverArgs);
+                }
+                else if (DatabaseDescriptor.getRpcServerType().equalsIgnoreCase(HSHA))
+                {
+                    // NIO selector service; invocations are dispatched to multiple threads via the executor service.
+                    ExecutorService executorService = new JMXEnabledThreadPoolExecutor(DatabaseDescriptor.getRpcMinThreads(),
+                                                                                       DatabaseDescriptor.getRpcMaxThreads(),
+                                                                                       60L,
+                                                                                       TimeUnit.SECONDS,
+                                                                                       new SynchronousQueue<Runnable>(),
+                                                                                       new NamedThreadFactory("RPC-Thread"), "RPC-THREAD-POOL");
+                    TNonblockingServer.Args serverArgs = new TNonblockingServer.Args(serverTransport).inputTransportFactory(inTransportFactory)
+                                                                                       .outputTransportFactory(outTransportFactory)
+                                                                                       .inputProtocolFactory(tProtocolFactory)
+                                                                                       .outputProtocolFactory(tProtocolFactory)
+                                                                                       .processor(processor);
+                    logger.info(String.format("Using custom half-sync/half-async thrift server on %s : %s", listenAddr, listenPort));
+                    // Use the number of available processors as the number of IO threads.
+                    serverEngine = new CustomTHsHaServer(serverArgs, executorService, Runtime.getRuntime().availableProcessors());
+                }
+            }
+        }
+
+        public void run()
+        {
+            logger.info("Listening for thrift clients...");
+            serverEngine.serve();
+        }
+
+        public void stopServer()
+        {
+            logger.info("Stop listening to thrift clients");
+            serverEngine.stop();
+        }
+    }
+
+    /**
+     * A subclass of Java's ThreadPoolExecutor which performs ClientState
+     * cleanup after each task completes.
+     *
+     * (Note that the tasks being executed perform their own while-command-process
+     * loop until the client disconnects.)
+     */
+    private static class CleaningThreadPool extends ThreadPoolExecutor
+    {
+        private final ThreadLocal<ClientState> state;
+        public CleaningThreadPool(ThreadLocal<ClientState> state, int minWorkerThread, int maxWorkerThreads)
+        {
+            super(minWorkerThread, maxWorkerThreads, 60, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), new NamedThreadFactory("Thrift"));
+            this.state = state;
+        }
+
+        @Override
+        protected void afterExecute(Runnable r, Throwable t)
+        {
+            super.afterExecute(r, t);
+            DebuggableThreadPoolExecutor.logExceptionsAfterExecute(r, t);
+            state.get().logout();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6d3a3eed/src/java/org/apache/cassandra/tools/StandaloneScrubber.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/StandaloneScrubber.java b/src/java/org/apache/cassandra/tools/StandaloneScrubber.java
index 0ac6a80..b10643a 100644
--- a/src/java/org/apache/cassandra/tools/StandaloneScrubber.java
+++ b/src/java/org/apache/cassandra/tools/StandaloneScrubber.java
@@ -31,7 +31,7 @@ import org.apache.cassandra.db.compaction.LeveledManifest;
 import org.apache.cassandra.db.compaction.Scrubber;
 import org.apache.cassandra.io.sstable.*;
 import org.apache.cassandra.io.util.FileUtils;
-import org.apache.cassandra.service.AbstractCassandraDaemon;
+import org.apache.cassandra.service.CassandraDaemon;
 import org.apache.cassandra.utils.OutputHandler;
 import static org.apache.cassandra.tools.BulkLoader.CmdLineOptions;
 
@@ -39,7 +39,7 @@ public class StandaloneScrubber
 {
     static
     {
-        AbstractCassandraDaemon.initLog4j();
+        CassandraDaemon.initLog4j();
     }
 
     private static final String TOOL_NAME = "sstablescrub";

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6d3a3eed/src/java/org/apache/cassandra/transport/CBCodec.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/CBCodec.java b/src/java/org/apache/cassandra/transport/CBCodec.java
new file mode 100644
index 0000000..1a6719b
--- /dev/null
+++ b/src/java/org/apache/cassandra/transport/CBCodec.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.transport;
+
+import org.jboss.netty.buffer.ChannelBuffer;
+
+public interface CBCodec<T>
+{
+    public T decode(ChannelBuffer body);
+    public ChannelBuffer encode(T t);
+}


Mime
View raw message