cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From j...@apache.org
Subject [1/9] Upgrade thrift version to 0.9.0
Date Mon, 14 Jan 2013 20:30:11 GMT
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b153d456/src/java/org/apache/cassandra/thrift/CustomTHsHaServer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/thrift/CustomTHsHaServer.java b/src/java/org/apache/cassandra/thrift/CustomTHsHaServer.java
index a199a27..4606fbc 100644
--- a/src/java/org/apache/cassandra/thrift/CustomTHsHaServer.java
+++ b/src/java/org/apache/cassandra/thrift/CustomTHsHaServer.java
@@ -17,16 +17,8 @@
  */
 package org.apache.cassandra.thrift;
 
-import java.io.IOException;
 import java.net.InetSocketAddress;
-import java.nio.channels.SelectionKey;
-import java.nio.channels.Selector;
-import java.nio.channels.spi.SelectorProvider;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.Set;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.SynchronousQueue;
 import java.util.concurrent.TimeUnit;
 
@@ -37,11 +29,10 @@ import org.slf4j.LoggerFactory;
 import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutor;
 import org.apache.cassandra.concurrent.NamedThreadFactory;
 import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.thrift.server.TNonblockingServer;
+import org.apache.thrift.server.THsHaServer;
 import org.apache.thrift.server.TServer;
 import org.apache.thrift.transport.TNonblockingServerTransport;
 import org.apache.thrift.transport.TNonblockingSocket;
-import org.apache.thrift.transport.TNonblockingTransport;
 import org.apache.thrift.transport.TTransportException;
 
 /**
@@ -51,12 +42,9 @@ import org.apache.thrift.transport.TTransportException;
  * it is spread across multiple threads. Number of selector thread can be the
  * number of CPU available.
  */
-public class CustomTHsHaServer extends TNonblockingServer
+public class CustomTHsHaServer extends THsHaServer
 {
     private static final Logger LOGGER = LoggerFactory.getLogger(CustomTHsHaServer.class.getName());
-    private final Set<SelectorThread> ioThreads = new HashSet<SelectorThread>();
-    private volatile boolean stopped = true;
-    private final ExecutorService invoker;
 
     /**
      * All the arguments to Non Blocking Server will apply here. In addition,
@@ -64,290 +52,19 @@ public class CustomTHsHaServer extends TNonblockingServer
      * will process the data. threads for selection usually are equal to the
      * number of cpu's
      */
-    public CustomTHsHaServer(Args args, ExecutorService invoker, int threadCount)
+    public CustomTHsHaServer(Args args)
     {
         super(args);
-        this.invoker = invoker;
-        // Create all the Network IO Threads.
-        for (int i = 0; i < threadCount; ++i)
-            ioThreads.add(new SelectorThread("Selector-Thread-" + i));
     }
 
-    /** @inheritDoc */
-    @Override
-    public void serve()
+    protected boolean requestInvoke(FrameBuffer frameBuffer)
     {
-        if (!startListening())
-            return;
-        if (!startThreads())
-            return;
-        setServing(true);
-        joinSelector();
-        invoker.shutdown();
-        setServing(false);
-        stopListening();
-    }
-
-    /**
-     * Save the remote socket as a thead local for future use of client state.
-     */
-    protected class Invocation implements Runnable
-    {
-        private final FrameBuffer frameBuffer;
-        private final SelectorThread thread;
-
-        public Invocation(final FrameBuffer frameBuffer, SelectorThread thread)
-        {
-            this.frameBuffer = frameBuffer;
-            this.thread = thread;
-        }
-
-        public void run()
-        {
-            TNonblockingSocket socket = (TNonblockingSocket) frameBuffer.trans_;
-            ThriftSessionManager.instance.setCurrentSocket(socket.getSocketChannel().socket().getRemoteSocketAddress());
-            frameBuffer.invoke();
-            // this is how we let the same selector thread change the selection type.
-            thread.requestSelectInterestChange(frameBuffer);
-        }
-    }
-
-    protected boolean startThreads()
-    {
-        stopped = false;
-        // start all the threads.
-        for (SelectorThread thread : ioThreads)
-            thread.start();
+        TNonblockingSocket socket = (TNonblockingSocket) frameBuffer.trans_;
+        ThriftSessionManager.instance.setCurrentSocket(socket.getSocketChannel().socket().getRemoteSocketAddress());
+        frameBuffer.invoke();
         return true;
     }
 
-    @Override
-    protected void joinSelector()
-    {
-        try
-        {
-            // wait till all done with stuff's
-            for (SelectorThread thread : ioThreads)
-                thread.join();
-        }
-        catch (InterruptedException e)
-        {
-            LOGGER.error("Interrupted while joining threads!", e);
-        }
-    }
-
-    /**
-     * Stop serving and shut everything down.
-     */
-    @Override
-    public void stop()
-    {
-        stopListening();
-        stopped = true;
-        for (SelectorThread thread : ioThreads)
-            thread.wakeupSelector();
-        joinSelector();
-    }
-
-    /**
-     * IO Threads will perform expensive IO operations...
-     */
-    protected class SelectorThread extends Thread
-    {
-        private final Selector selector;
-        private final TNonblockingServerTransport serverTransport;
-        private final Set<FrameBuffer> selectInterestChanges = new HashSet<FrameBuffer>();
-
-        public SelectorThread(String name)
-        {
-            super(name);
-            try
-            {
-                this.selector = SelectorProvider.provider().openSelector();
-                this.serverTransport = (TNonblockingServerTransport) serverTransport_;
-                this.serverTransport.registerSelector(selector);
-            }
-            catch (IOException ex)
-            {
-                throw new RuntimeException("Couldnt open the NIO selector", ex);
-            }
-        }
-
-        public void run()
-        {
-            try
-            {
-                while (!stopped)
-                {
-                    select();
-                }
-            }
-            catch (Throwable t)
-            {
-                LOGGER.error("Uncaught Exception: ", t);
-            }
-            finally
-            {
-                try
-                {
-                    selector.close(); // CASSANDRA-3867
-                }
-                catch (IOException e)
-                {
-                    // ignore this exception.
-                }
-            }
-        }
-
-        private void select() throws InterruptedException, IOException
-        {
-            // wait for new keys
-            selector.select();
-            Iterator<SelectionKey> keyIterator = selector.selectedKeys().iterator();
-            while (keyIterator.hasNext())
-            {
-                SelectionKey key = keyIterator.next();
-                keyIterator.remove();
-
-                try
-                {
-                    if (!key.isValid())
-                    {
-                        // if invalid cleanup.
-                        cleanupSelectionkey(key);
-                        continue;
-                    }
-
-                    if (key.isAcceptable())
-                        handleAccept();
-                    if (key.isReadable())
-                        handleRead(key);
-                    else if (key.isWritable())
-                        handleWrite(key);
-                    else
-                        LOGGER.debug("Unexpected state " + key.interestOps());
-                }
-                catch (Exception io)
-                {
-                    // just ignore (?)
-                    cleanupSelectionkey(key);
-                }
-            }
-            // process the changes which are inserted after completion.
-            processInterestChanges();
-        }
-
-        private void handleAccept()
-        {
-            SelectionKey clientKey = null;
-            TNonblockingTransport client = null;
-            try
-            {
-                // accept the connection
-                client = (TNonblockingTransport) serverTransport.accept();
-                clientKey = client.registerSelector(selector, SelectionKey.OP_READ);
-                // add this key to the map
-                FrameBuffer frameBuffer = new FrameBuffer(client, clientKey);
-                clientKey.attach(frameBuffer);
-            } catch (TTransportException ex)
-            {
-                // ignore this might have been handled by the other threads.
-                // serverTransport.accept() as it returns null as nothing to accept.
-                return;
-            }
-            catch (IOException tte)
-            {
-                // something went wrong accepting.
-                LOGGER.warn("Exception trying to accept!", tte);
-                tte.printStackTrace();
-                if (clientKey != null)
-                    cleanupSelectionkey(clientKey);
-                if (client != null)
-                    client.close();
-            }
-        }
-
-        private void handleRead(SelectionKey key)
-        {
-            FrameBuffer buffer = (FrameBuffer) key.attachment();
-            if (!buffer.read())
-            {
-                cleanupSelectionkey(key);
-                return;
-            }
-
-            if (buffer.isFrameFullyRead())
-            {
-                if (!requestInvoke(buffer, this))
-                    cleanupSelectionkey(key);
-            }
-        }
-
-        private void handleWrite(SelectionKey key)
-        {
-            FrameBuffer buffer = (FrameBuffer) key.attachment();
-            if (!buffer.write())
-                cleanupSelectionkey(key);
-        }
-
-        public void requestSelectInterestChange(FrameBuffer frameBuffer)
-        {
-            synchronized (selectInterestChanges)
-            {
-                selectInterestChanges.add(frameBuffer);
-            }
-            // Wake-up the selector, if it's currently blocked.
-            selector.wakeup();
-        }
-
-        private void processInterestChanges()
-        {
-            synchronized (selectInterestChanges)
-            {
-                for (FrameBuffer fb : selectInterestChanges)
-                    fb.changeSelectInterests();
-                selectInterestChanges.clear();
-            }
-        }
-
-        private void cleanupSelectionkey(SelectionKey key)
-        {
-            FrameBuffer buffer = (FrameBuffer) key.attachment();
-            if (buffer != null)
-                buffer.close();
-            // cancel the selection key
-            key.cancel();
-        }
-
-        public void wakeupSelector()
-        {
-            selector.wakeup();
-        }
-    }
-
-    protected boolean requestInvoke(FrameBuffer frameBuffer, SelectorThread thread)
-    {
-        try
-        {
-            Runnable invocation = new Invocation(frameBuffer, thread);
-            invoker.execute(invocation);
-            return true;
-        }
-        catch (RejectedExecutionException rx)
-        {
-            LOGGER.warn("ExecutorService rejected execution!", rx);
-            return false;
-        }
-    }
-
-    @Override
-    protected void requestSelectInterestChange(FrameBuffer fb)
-    {
-        // Dont change the interest here, this has to be done by the selector
-        // thread because the method is not synchronized with the rest of the
-        // selectors threads.
-    }
-
     public static class Factory implements TServerFactory
     {
         public TServer buildTServer(Args args)
@@ -373,13 +90,14 @@ public class CustomTHsHaServer extends TNonblockingServer
                                                                                TimeUnit.SECONDS,
                                                                                new SynchronousQueue<Runnable>(),
                                                                                new NamedThreadFactory("RPC-Thread"),
"RPC-THREAD-POOL");
-            TNonblockingServer.Args serverArgs = new TNonblockingServer.Args(serverTransport).inputTransportFactory(args.inTransportFactory)
+           THsHaServer.Args serverArgs = new THsHaServer.Args(serverTransport).inputTransportFactory(args.inTransportFactory)

                                                                                .outputTransportFactory(args.outTransportFactory)
                                                                                .inputProtocolFactory(args.tProtocolFactory)
                                                                                .outputProtocolFactory(args.tProtocolFactory)
-                                                                               .processor(args.processor);
+                                                                               .processor(args.processor)
+                                                                               .executorService(executorService);
             // Check for available processors in the system which will be equal to the IO
Threads.
-            return new CustomTHsHaServer(serverArgs, executorService, FBUtilities.getAvailableProcessors());
+            return new CustomTHsHaServer(serverArgs);
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b153d456/src/java/org/apache/cassandra/tools/Shuffle.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/Shuffle.java b/src/java/org/apache/cassandra/tools/Shuffle.java
index 8cf38b9..8395776 100644
--- a/src/java/org/apache/cassandra/tools/Shuffle.java
+++ b/src/java/org/apache/cassandra/tools/Shuffle.java
@@ -538,15 +538,15 @@ public class Shuffle extends AbstractJmxClient
         {
             partitionerName = getThriftClient(hostName, port, framed).describe_partitioner();
         }
+        catch (InvalidRequestException e)
+        {
+            throw new RuntimeException("Error calling describe_partitioner() defies explanation",
e);
+        }
         catch (TException e)
         {
             throw new ShuffleError(
                     String.format("Thrift request to %s:%d failed: %s", hostName, port, e.getMessage()));
         }
-        catch (InvalidRequestException e)
-        {
-            throw new RuntimeException("Error calling describe_partitioner() defies explanation",
e);
-        }
 
         try
         {


Mime
View raw message