cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From slebre...@apache.org
Subject [07/26] cassandra git commit: Thrift removal
Date Tue, 13 Dec 2016 09:27:31 GMT
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4881d9c3/src/java/org/apache/cassandra/service/CassandraDaemon.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/CassandraDaemon.java b/src/java/org/apache/cassandra/service/CassandraDaemon.java
index b41cc00..0e88929 100644
--- a/src/java/org/apache/cassandra/service/CassandraDaemon.java
+++ b/src/java/org/apache/cassandra/service/CassandraDaemon.java
@@ -64,7 +64,6 @@ import org.apache.cassandra.io.util.FileUtils;
 import org.apache.cassandra.metrics.CassandraMetricsRegistry;
 import org.apache.cassandra.metrics.DefaultNameFactory;
 import org.apache.cassandra.metrics.StorageMetrics;
-import org.apache.cassandra.thrift.ThriftServer;
 import org.apache.cassandra.tracing.Tracing;
 import org.apache.cassandra.utils.*;
 
@@ -150,7 +149,6 @@ public class CassandraDaemon
 
     static final CassandraDaemon instance = new CassandraDaemon();
 
-    public Server thriftServer;
     private NativeTransportService nativeTransportService;
     private JMXConnectorServer jmxServer;
 
@@ -396,12 +394,6 @@ public class CassandraDaemon
         if (sizeRecorderInterval > 0)
             ScheduledExecutors.optionalTasks.scheduleWithFixedDelay(SizeEstimatesRecorder.instance,
30, sizeRecorderInterval, TimeUnit.SECONDS);
 
-        // Thrift
-        InetAddress rpcAddr = DatabaseDescriptor.getRpcAddress();
-        int rpcPort = DatabaseDescriptor.getRpcPort();
-        int listenBacklog = DatabaseDescriptor.getRpcListenBacklog();
-        thriftServer = new ThriftServer(rpcAddr, rpcPort, listenBacklog);
-
         // Native transport
         nativeTransportService = new NativeTransportService();
 
@@ -492,12 +484,6 @@ public class CassandraDaemon
         }
         else
             logger.info("Not starting native transport as requested. Use JMX (StorageService->startNativeTransport())
or nodetool (enablebinary) to start it");
-
-        String rpcFlag = System.getProperty("cassandra.start_rpc");
-        if ((rpcFlag != null && Boolean.parseBoolean(rpcFlag)) || (rpcFlag == null
&& DatabaseDescriptor.startRpc()))
-            thriftServer.start();
-        else
-            logger.info("Not starting RPC server as requested. Use JMX (StorageService->startRPCServer())
or nodetool (enablethrift) to start it");
     }
 
     /**
@@ -510,8 +496,6 @@ public class CassandraDaemon
         // On linux, this doesn't entirely shut down Cassandra, just the RPC server.
         // jsvc takes care of taking the rest down
         logger.info("Cassandra shutting down...");
-        if (thriftServer != null)
-            thriftServer.stop();
         if (nativeTransportService != null)
             nativeTransportService.destroy();
         StorageService.instance.setRpcReady(false);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4881d9c3/src/java/org/apache/cassandra/service/ClientState.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/ClientState.java b/src/java/org/apache/cassandra/service/ClientState.java
index 5f01702..52e71ae 100644
--- a/src/java/org/apache/cassandra/service/ClientState.java
+++ b/src/java/org/apache/cassandra/service/ClientState.java
@@ -34,13 +34,13 @@ import org.apache.cassandra.config.Schema;
 import org.apache.cassandra.config.SchemaConstants;
 import org.apache.cassandra.cql3.QueryHandler;
 import org.apache.cassandra.cql3.QueryProcessor;
+import org.apache.cassandra.cql3.Validation;
 import org.apache.cassandra.cql3.functions.Function;
 import org.apache.cassandra.db.SystemKeyspace;
 import org.apache.cassandra.exceptions.AuthenticationException;
 import org.apache.cassandra.exceptions.InvalidRequestException;
 import org.apache.cassandra.exceptions.UnauthorizedException;
 import org.apache.cassandra.schema.SchemaKeyspace;
-import org.apache.cassandra.thrift.ThriftValidation;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.JVMStabilityInspector;
 import org.apache.cassandra.utils.CassandraVersion;
@@ -148,7 +148,7 @@ public class ClientState
     }
 
     /**
-     * @return a ClientState object for external clients (thrift/native protocol users).
+     * @return a ClientState object for external clients (native protocol users).
      */
     public static ClientState forExternalCalls(SocketAddress remoteAddress)
     {
@@ -290,7 +290,7 @@ public class ClientState
     public void hasColumnFamilyAccess(String keyspace, String columnFamily, Permission perm)
     throws UnauthorizedException, InvalidRequestException
     {
-        ThriftValidation.validateColumnFamily(keyspace, columnFamily);
+        Validation.validateColumnFamily(keyspace, columnFamily);
         hasAccess(keyspace, perm, DataResource.table(keyspace, columnFamily));
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4881d9c3/src/java/org/apache/cassandra/service/EmbeddedCassandraService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/EmbeddedCassandraService.java b/src/java/org/apache/cassandra/service/EmbeddedCassandraService.java
index 2515259..9dc92d7 100644
--- a/src/java/org/apache/cassandra/service/EmbeddedCassandraService.java
+++ b/src/java/org/apache/cassandra/service/EmbeddedCassandraService.java
@@ -20,8 +20,7 @@ package org.apache.cassandra.service;
 import java.io.IOException;
 
 /**
- * An embedded, in-memory cassandra storage service that listens
- * on the thrift interface as configured in cassandra.yaml
+ * An embedded, in-memory cassandra storage service.
  * This kind of service is useful when running unit tests of
  * services using cassandra for example.
  *

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4881d9c3/src/java/org/apache/cassandra/service/StorageProxy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java
index 1d83b70..d5a9f47 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -1896,9 +1896,10 @@ public class StorageProxy implements StorageProxyMBean
     }
 
     /**
-     * Estimate the number of result rows (either cql3 rows or "thrift" rows, as called for
by the command) per
-     * range in the ring based on our local data.  This assumes that ranges are uniformly
distributed across the cluster
-     * and that the queried data is also uniformly distributed.
+     * Estimate the number of result rows per range in the ring based on our local data.
+     * <p>
+     * This assumes that ranges are uniformly distributed across the cluster and
+     * that the queried data is also uniformly distributed.
      */
     private static float estimateResultsPerRange(PartitionRangeReadCommand command, Keyspace
keyspace)
     {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4881d9c3/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index 8719c6c..15fb984 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -88,9 +88,6 @@ import org.apache.cassandra.service.paxos.CommitVerbHandler;
 import org.apache.cassandra.service.paxos.PrepareVerbHandler;
 import org.apache.cassandra.service.paxos.ProposeVerbHandler;
 import org.apache.cassandra.streaming.*;
-import org.apache.cassandra.thrift.EndpointDetails;
-import org.apache.cassandra.thrift.TokenRange;
-import org.apache.cassandra.thrift.cassandraConstants;
 import org.apache.cassandra.tracing.TraceKeyspace;
 import org.apache.cassandra.transport.ProtocolVersion;
 import org.apache.cassandra.utils.*;
@@ -353,37 +350,6 @@ public class StorageService extends NotificationBroadcasterSupport implements
IE
         return Gossiper.instance.isEnabled();
     }
 
-    // should only be called via JMX
-    public synchronized void startRPCServer()
-    {
-        checkServiceAllowedToStart("thrift");
-
-        if (daemon == null)
-        {
-            throw new IllegalStateException("No configured daemon");
-        }
-        daemon.thriftServer.start();
-    }
-
-    public void stopRPCServer()
-    {
-        if (daemon == null)
-        {
-            throw new IllegalStateException("No configured daemon");
-        }
-        if (daemon.thriftServer != null)
-            daemon.thriftServer.stop();
-    }
-
-    public boolean isRPCServerRunning()
-    {
-        if ((daemon == null) || (daemon.thriftServer == null))
-        {
-            return false;
-        }
-        return daemon.thriftServer.isRunning();
-    }
-
     public synchronized void startNativeTransport()
     {
         checkServiceAllowedToStart("native transport");
@@ -428,11 +394,6 @@ public class StorageService extends NotificationBroadcasterSupport implements
IE
             logger.error("Stopping gossiper");
             stopGossiping();
         }
-        if (isRPCServerRunning())
-        {
-            logger.error("Stopping RPC server");
-            stopRPCServer();
-        }
         if (isNativeTransportRunning())
         {
             logger.error("Stopping native transport");
@@ -453,7 +414,6 @@ public class StorageService extends NotificationBroadcasterSupport implements
IE
     private void shutdownClientServers()
     {
         setRpcReady(false);
-        stopRPCServer();
         stopNativeTransport();
     }
 
@@ -616,7 +576,6 @@ public class StorageService extends NotificationBroadcasterSupport implements
IE
     public synchronized void initServer(int delay) throws ConfigurationException
     {
         logger.info("Cassandra version: {}", FBUtilities.getReleaseVersionString());
-        logger.info("Thrift API version: {}", cassandraConstants.VERSION);
         logger.info("CQL supported versions: {} (default: {})",
                 StringUtils.join(ClientState.getCQLSupportedVersion(), ", "), ClientState.DEFAULT_CQL_VERSION);
         logger.info("Native protocol supported versions: {} (default: {})",
@@ -1762,32 +1721,7 @@ public class StorageService extends NotificationBroadcasterSupport
implements IE
                         : getRangeToAddressMap(keyspace);
 
         for (Map.Entry<Range<Token>, List<InetAddress>> entry : rangeToAddressMap.entrySet())
-        {
-            Range<Token> range = entry.getKey();
-            List<InetAddress> addresses = entry.getValue();
-            List<String> endpoints = new ArrayList<>(addresses.size());
-            List<String> rpc_endpoints = new ArrayList<>(addresses.size());
-            List<EndpointDetails> epDetails = new ArrayList<>(addresses.size());
-
-            for (InetAddress endpoint : addresses)
-            {
-                EndpointDetails details = new EndpointDetails();
-                details.host = endpoint.getHostAddress();
-                details.datacenter = DatabaseDescriptor.getEndpointSnitch().getDatacenter(endpoint);
-                details.rack = DatabaseDescriptor.getEndpointSnitch().getRack(endpoint);
-
-                endpoints.add(details.host);
-                rpc_endpoints.add(getRpcaddress(endpoint));
-
-                epDetails.add(details);
-            }
-
-            TokenRange tr = new TokenRange(tf.toString(range.left.getToken()), tf.toString(range.right.getToken()),
endpoints)
-                                    .setEndpoint_details(epDetails)
-                                    .setRpc_endpoints(rpc_endpoints);
-
-            ranges.add(tr);
-        }
+            ranges.add(TokenRange.create(tf, entry.getKey(), entry.getValue()));
 
         return ranges;
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4881d9c3/src/java/org/apache/cassandra/service/StorageServiceMBean.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageServiceMBean.java b/src/java/org/apache/cassandra/service/StorageServiceMBean.java
index 339b991..b569200 100644
--- a/src/java/org/apache/cassandra/service/StorageServiceMBean.java
+++ b/src/java/org/apache/cassandra/service/StorageServiceMBean.java
@@ -510,15 +510,6 @@ public interface StorageServiceMBean extends NotificationEmitter
     // to determine if initialization has completed
     public boolean isInitialized();
 
-    // allows a user to disable thrift
-    public void stopRPCServer();
-
-    // allows a user to reenable thrift
-    public void startRPCServer();
-
-    // to determine if thrift is running
-    public boolean isRPCServerRunning();
-
     public void stopNativeTransport();
     public void startNativeTransport();
     public boolean isNativeTransportRunning();
@@ -624,7 +615,7 @@ public interface StorageServiceMBean extends NotificationEmitter
     public void resetLocalSchema() throws IOException;
 
     /**
-     * Enables/Disables tracing for the whole system. Only thrift requests can start tracing
currently.
+     * Enables/Disables tracing for the whole system.
      *
      * @param probability
      *            ]0,1[ will enable tracing on a partial number of requests with the provided
probability. 0 will

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4881d9c3/src/java/org/apache/cassandra/service/TokenRange.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/TokenRange.java b/src/java/org/apache/cassandra/service/TokenRange.java
new file mode 100644
index 0000000..0e46910
--- /dev/null
+++ b/src/java/org/apache/cassandra/service/TokenRange.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.service;
+
+import java.net.InetAddress;
+import java.util.*;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.locator.IEndpointSnitch;
+
+/**
+ * Holds token range information for the sake of {@link StorageService#describeRing}.
+ *
+ * This class mostly exists for the sake of {@link StorageService#describeRing},
+ * which used to rely on a thrift class that this one is the equivalent of. This is
+ * the reason this class behaves the way it does and the reason for the format
+ * of {@code toString()} in particular (used by
+ * {@link StorageService#describeRingJMX}). This class probably has no other
+ * good use than providing backward compatibility.
+ */
+public class TokenRange
+{
+    private final Token.TokenFactory tokenFactory;
+
+    public final Range<Token> range;
+    public final List<EndpointDetails> endpoints;
+
+    private TokenRange(Token.TokenFactory tokenFactory, Range<Token> range, List<EndpointDetails>
endpoints)
+    {
+        this.tokenFactory = tokenFactory;
+        this.range = range;
+        this.endpoints = endpoints;
+    }
+
+    private String toStr(Token tk)
+    {
+        return tokenFactory.toString(tk);
+    }
+
+    public static TokenRange create(Token.TokenFactory tokenFactory, Range<Token> range,
List<InetAddress> endpoints)
+    {
+        List<EndpointDetails> details = new ArrayList<>(endpoints.size());
+        IEndpointSnitch snitch = DatabaseDescriptor.getEndpointSnitch();
+        for (InetAddress ep : endpoints)
+            details.add(new EndpointDetails(ep,
+                                            StorageService.instance.getRpcaddress(ep),
+                                            snitch.getDatacenter(ep),
+                                            snitch.getRack(ep)));
+        return new TokenRange(tokenFactory, range, details);
+    }
+
+    @Override
+    public String toString()
+    {
+        StringBuilder sb = new StringBuilder("TokenRange(");
+        // Output is a list of "name:value" fields; this format is consumed via describeRingJMX
+        sb.append("start_token:").append(toStr(range.left));
+        sb.append(", end_token:").append(toStr(range.right));
+
+        List<String> hosts = new ArrayList<>(endpoints.size());
+        List<String> rpcs = new ArrayList<>(endpoints.size());
+        for (EndpointDetails ep : endpoints)
+        {
+            hosts.add(ep.host.getHostAddress());
+            rpcs.add(ep.rpcAddress);
+        }
+        // Every field after the first needs the ", " separator (as end_token has), or fields run together
+        sb.append(", endpoints:").append(hosts);
+        sb.append(", rpc_endpoints:").append(rpcs);
+        sb.append(", endpoint_details:").append(endpoints);
+
+        sb.append(")");
+        return sb.toString();
+    }
+
+    public static class EndpointDetails
+    {
+        public final InetAddress host;
+        public final String rpcAddress;
+        public final String datacenter;
+        public final String rack;
+
+        private EndpointDetails(InetAddress host, String rpcAddress, String datacenter, String
rack)
+        {
+            // dc and rack can be null, but host shouldn't
+            assert host != null;
+            this.host = host;
+            this.rpcAddress = rpcAddress;
+            this.datacenter = datacenter;
+            this.rack = rack;
+        }
+
+        @Override
+        public String toString()
+        {
+            // Format matters for backward compatibility with describeRing()
+            String dcStr = datacenter == null ? "" : String.format(", datacenter:%s", datacenter);
+            String rackStr = rack == null ? "" : String.format(", rack:%s", rack);
+            return String.format("EndpointDetails(host:%s%s%s)", host.getHostAddress(), dcStr,
rackStr);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4881d9c3/src/java/org/apache/cassandra/service/pager/PagingState.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/pager/PagingState.java b/src/java/org/apache/cassandra/service/pager/PagingState.java
index 4a9ac39..bcf3979 100644
--- a/src/java/org/apache/cassandra/service/pager/PagingState.java
+++ b/src/java/org/apache/cassandra/service/pager/PagingState.java
@@ -23,10 +23,11 @@ import java.util.*;
 
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.db.Clustering;
-import org.apache.cassandra.db.LegacyLayout;
+import org.apache.cassandra.db.CompactTables;
 import org.apache.cassandra.db.TypeSizes;
 import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.db.marshal.BytesType;
+import org.apache.cassandra.db.marshal.CompositeType;
 import org.apache.cassandra.db.rows.Cell;
 import org.apache.cassandra.db.rows.Row;
 import org.apache.cassandra.io.util.DataInputBuffer;
@@ -216,12 +217,12 @@ public class PagingState
                     // If the last returned row has no cell, this means in 2.1/2.2 terms that we stopped on the row
                     // marker. Note that this shouldn't happen if the table is COMPACT.
                     assert !metadata.isCompactTable();
-                    mark = LegacyLayout.encodeCellName(metadata, row.clustering(), ByteBufferUtil.EMPTY_BYTE_BUFFER,
null);
+                    mark = encodeCellName(metadata, row.clustering(), ByteBufferUtil.EMPTY_BYTE_BUFFER,
null);
                 }
                 else
                 {
                     Cell cell = cells.next();
-                    mark = LegacyLayout.encodeCellName(metadata, row.clustering(), cell.column().name.bytes,
cell.column().isComplex() ? cell.path().get(0) : null);
+                    mark = encodeCellName(metadata, row.clustering(), cell.column().name.bytes,
cell.column().isComplex() ? cell.path().get(0) : null);
                 }
             }
             else
@@ -239,10 +240,84 @@ public class PagingState
                 return null;
 
             return protocolVersion.isSmallerOrEqualTo(ProtocolVersion.V3)
-                 ? LegacyLayout.decodeClustering(metadata, mark)
+                 ? decodeClustering(metadata, mark)
                  : Clustering.serializer.deserialize(mark, MessagingService.VERSION_30, makeClusteringTypes(metadata));
         }
 
+        // Old (pre-3.0) encoding of cells. We need that for the protocol v3 as that is how things were encoded
+        private static ByteBuffer encodeCellName(CFMetaData metadata, Clustering clustering,
ByteBuffer columnName, ByteBuffer collectionElement)
+        {
+            boolean isStatic = clustering == Clustering.STATIC_CLUSTERING;
+
+            if (!metadata.isCompound())
+            {
+                if (isStatic)
+                    return columnName;
+
+                assert clustering.size() == 1 : "Expected clustering size to be 1, but was
" + clustering.size();
+                return clustering.get(0);
+            }
+
+            // We use comparator.size() rather than clustering.size() because of static clusterings
+            int clusteringSize = metadata.comparator.size();
+            int size = clusteringSize + (metadata.isDense() ? 0 : 1) + (collectionElement
== null ? 0 : 1);
+            if (metadata.isSuper())
+                size = clusteringSize + 1;
+            ByteBuffer[] values = new ByteBuffer[size];
+            for (int i = 0; i < clusteringSize; i++)
+            {
+                if (isStatic)
+                {
+                    values[i] = ByteBufferUtil.EMPTY_BYTE_BUFFER;
+                    continue;
+                }
+
+                ByteBuffer v = clustering.get(i);
+                // we can have null (only for dense compound tables for backward compatibility reasons) but that
+                // means we're done and should stop there as far as building the composite is concerned.
+                if (v == null)
+                    return CompositeType.build(Arrays.copyOfRange(values, 0, i));
+
+                values[i] = v;
+            }
+
+            if (metadata.isSuper())
+            {
+                // We need to set the "column" (in thrift terms) name, i.e. the value corresponding to the subcomparator.
+                // What it is depends on whether this is a cell for a declared "static" column or a "dynamic" column part of the
+                // super-column internal map.
+                assert columnName != null; // This should never be null for supercolumns,
see decodeForSuperColumn() above
+                values[clusteringSize] = columnName.equals(CompactTables.SUPER_COLUMN_MAP_COLUMN)
+                                         ? collectionElement
+                                         : columnName;
+            }
+            else
+            {
+                if (!metadata.isDense())
+                    values[clusteringSize] = columnName;
+                if (collectionElement != null)
+                    values[clusteringSize + 1] = collectionElement;
+            }
+
+            return CompositeType.build(isStatic, values);
+        }
+
+        private static Clustering decodeClustering(CFMetaData metadata, ByteBuffer value)
+        {
+            int csize = metadata.comparator.size();
+            if (csize == 0)
+                return Clustering.EMPTY;
+
+            if (metadata.isCompound() && CompositeType.isStaticName(value))
+                return Clustering.STATIC_CLUSTERING;
+
+            List<ByteBuffer> components = metadata.isCompound()
+                                          ? CompositeType.splitName(value)
+                                          : Collections.singletonList(value);
+
+            return Clustering.make(components.subList(0, Math.min(csize, components.size())).toArray(new
ByteBuffer[csize]));
+        }
+
         @Override
         public final int hashCode()
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4881d9c3/src/java/org/apache/cassandra/service/pager/PartitionRangeQueryPager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/pager/PartitionRangeQueryPager.java b/src/java/org/apache/cassandra/service/pager/PartitionRangeQueryPager.java
index 5ba13a4..68547be 100644
--- a/src/java/org/apache/cassandra/service/pager/PartitionRangeQueryPager.java
+++ b/src/java/org/apache/cassandra/service/pager/PartitionRangeQueryPager.java
@@ -30,9 +30,6 @@ import org.apache.cassandra.transport.ProtocolVersion;
 
 /**
  * Pages a PartitionRangeReadCommand.
- *
- * Note: this only work for CQL3 queries for now (because thrift queries expect
- * a different limit on the rows than on the columns, which complicates it).
  */
 public class PartitionRangeQueryPager extends AbstractQueryPager
 {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4881d9c3/src/java/org/apache/cassandra/service/pager/QueryPagers.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/pager/QueryPagers.java b/src/java/org/apache/cassandra/service/pager/QueryPagers.java
deleted file mode 100644
index 15311ab..0000000
--- a/src/java/org/apache/cassandra/service/pager/QueryPagers.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.service.pager;
-
-import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.db.*;
-import org.apache.cassandra.db.filter.*;
-import org.apache.cassandra.db.partitions.*;
-import org.apache.cassandra.exceptions.RequestExecutionException;
-import org.apache.cassandra.exceptions.RequestValidationException;
-import org.apache.cassandra.service.ClientState;
-import org.apache.cassandra.transport.ProtocolVersion;
-
-/**
- * Static utility methods for paging.
- */
-public class QueryPagers
-{
-    private QueryPagers() {};
-
-    /**
-     * Convenience method that count (live) cells/rows for a given slice of a row, but page
underneath.
-     */
-    public static int countPaged(CFMetaData metadata,
-                                 DecoratedKey key,
-                                 ColumnFilter columnFilter,
-                                 ClusteringIndexFilter filter,
-                                 DataLimits limits,
-                                 ConsistencyLevel consistencyLevel,
-                                 ClientState state,
-                                 final int pageSize,
-                                 int nowInSec,
-                                 boolean isForThrift,
-                                 long queryStartNanoTime) throws RequestValidationException,
RequestExecutionException
-    {
-        SinglePartitionReadCommand command = SinglePartitionReadCommand.create(isForThrift,
metadata, nowInSec, columnFilter, RowFilter.NONE, limits, key, filter);
-        final SinglePartitionPager pager = new SinglePartitionPager(command, null, ProtocolVersion.CURRENT);
-
-        int count = 0;
-        while (!pager.isExhausted())
-        {
-            try (PartitionIterator iter = pager.fetchPage(pageSize, consistencyLevel, state,
queryStartNanoTime))
-            {
-                DataLimits.Counter counter = limits.newCounter(nowInSec, true);
-                PartitionIterators.consume(counter.applyTo(iter));
-                count += counter.counted();
-            }
-        }
-        return count;
-    }
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4881d9c3/src/java/org/apache/cassandra/service/pager/SinglePartitionPager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/pager/SinglePartitionPager.java b/src/java/org/apache/cassandra/service/pager/SinglePartitionPager.java
index e400fb6..e95c358 100644
--- a/src/java/org/apache/cassandra/service/pager/SinglePartitionPager.java
+++ b/src/java/org/apache/cassandra/service/pager/SinglePartitionPager.java
@@ -89,8 +89,9 @@ public class SinglePartitionPager extends AbstractQueryPager
     protected ReadCommand nextPageReadCommand(int pageSize)
     {
         Clustering clustering = lastReturned == null ? null : lastReturned.clustering(command.metadata());
-        DataLimits limits = (lastReturned == null || command.isForThrift()) ? limits().forPaging(pageSize)
-                                                                            : limits().forPaging(pageSize,
key(), remainingInPartition());
+        DataLimits limits = lastReturned == null
+                          ? limits().forPaging(pageSize)
+                          : limits().forPaging(pageSize, key(), remainingInPartition());
 
         return command.forPaging(clustering, limits);
     }


Mime
View raw message