Return-Path: X-Original-To: apmail-cassandra-commits-archive@www.apache.org Delivered-To: apmail-cassandra-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id B7064101E9 for ; Thu, 26 Sep 2013 20:38:58 +0000 (UTC) Received: (qmail 84963 invoked by uid 500); 26 Sep 2013 20:38:57 -0000 Delivered-To: apmail-cassandra-commits-archive@cassandra.apache.org Received: (qmail 84926 invoked by uid 500); 26 Sep 2013 20:38:56 -0000 Mailing-List: contact commits-help@cassandra.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@cassandra.apache.org Delivered-To: mailing list commits@cassandra.apache.org Received: (qmail 84898 invoked by uid 99); 26 Sep 2013 20:38:56 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 26 Sep 2013 20:38:56 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id EAE6B90B24D; Thu, 26 Sep 2013 20:38:55 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: jbellis@apache.org To: commits@cassandra.apache.org Date: Thu, 26 Sep 2013 20:38:56 -0000 Message-Id: <69af48103edf4dde953fef7986e52afc@git.apache.org> In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [2/3] git commit: Fixes for speculative retry patch by ayeschenko and jbellis for CASSANDRA-5932 Fixes for speculative retry patch by ayeschenko and jbellis for CASSANDRA-5932 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/20c419b9 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/20c419b9 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/20c419b9 Branch: refs/heads/trunk Commit: 20c419b9480e0e5b3c1da53a106b2a6760be35b9 Parents: 006eec4 Author: Jonathan Ellis Authored: Thu Sep 26 15:38:13 2013 -0500 Committer: Jonathan Ellis Committed: Thu Sep 26 15:38:36 2013 -0500 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../cassandra/service/AbstractReadExecutor.java | 338 ++++++++++++------- .../apache/cassandra/service/ReadCallback.java | 2 +- .../apache/cassandra/service/StorageProxy.java | 30 +- 4 files changed, 226 insertions(+), 145 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/20c419b9/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index cc3daf6..3d4d19c 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 2.0.2 + * Fixes for speculative retry (CASSANDRA-5932) * Improve memory usage of metadata min/max column names (CASSANDRA-6077) * Fix thrift validation refusing row markers on CQL3 tables (CASSANDRA-6081) * Fix insertion of collections with CAS (CASSANDRA-6069) http://git-wip-us.apache.org/repos/asf/cassandra/blob/20c419b9/src/java/org/apache/cassandra/service/AbstractReadExecutor.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/AbstractReadExecutor.java b/src/java/org/apache/cassandra/service/AbstractReadExecutor.java index 2ebc0b3..83368c2 100644 --- a/src/java/org/apache/cassandra/service/AbstractReadExecutor.java +++ b/src/java/org/apache/cassandra/service/AbstractReadExecutor.java @@ -18,12 +18,17 @@ package org.apache.cassandra.service; import java.net.InetAddress; +import java.util.Collections; import java.util.List; import java.util.concurrent.TimeUnit; +import com.google.common.collect.Iterables; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import org.apache.cassandra.concurrent.Stage; import org.apache.cassandra.concurrent.StageManager; -import org.apache.cassandra.config.CFMetaData; +import org.apache.cassandra.config.CFMetaData.SpeculativeRetry.RetryType; import org.apache.cassandra.config.Schema; import org.apache.cassandra.config.ReadRepairDecision; import org.apache.cassandra.db.ColumnFamilyStore; @@ -39,142 +44,225 @@ import org.apache.cassandra.net.MessageOut; import org.apache.cassandra.net.MessagingService; import org.apache.cassandra.service.StorageProxy.LocalReadRunnable; import org.apache.cassandra.utils.FBUtilities; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +/** + * Sends a read request to the replicas needed to satisfy a given ConsistencyLevel. + * + * Optionally, may perform additional requests to provide redundancy against replica failure: + * AlwaysSpeculatingReadExecutor will always send a request to one extra replica, while + * SpeculatingReadExecutor will wait until it looks like the original request is in danger + * of timing out before performing extra reads. + */ public abstract class AbstractReadExecutor { private static final Logger logger = LoggerFactory.getLogger(AbstractReadExecutor.class); - protected final ReadCallback handler; + protected final ReadCommand command; + protected final List targetReplicas; protected final RowDigestResolver resolver; - protected final List unfiltered; - protected final List endpoints; - protected final ColumnFamilyStore cfs; - - AbstractReadExecutor(ColumnFamilyStore cfs, - ReadCommand command, - ConsistencyLevel consistency_level, - List allReplicas, - List queryTargets) - throws UnavailableException + protected final ReadCallback handler; + + AbstractReadExecutor(ReadCommand command, ConsistencyLevel consistencyLevel, List targetReplicas) { - unfiltered = allReplicas; - this.endpoints = queryTargets; - this.resolver = new RowDigestResolver(command.ksName, command.key); - this.handler = new ReadCallback(resolver, consistency_level, command, this.endpoints); this.command = command; - this.cfs = cfs; + this.targetReplicas = targetReplicas; + resolver = new RowDigestResolver(command.ksName, command.key); + handler = new ReadCallback<>(resolver, consistencyLevel, command, targetReplicas); + } - handler.assureSufficientLiveNodes(); - assert !handler.endpoints.isEmpty(); + private static boolean isLocalRequest(InetAddress replica) + { + return replica.equals(FBUtilities.getBroadcastAddress()) && StorageProxy.OPTIMIZE_LOCAL_REQUESTS; } - void executeAsync() + protected void makeDataRequests(Iterable endpoints) { - // The data-request message is sent to dataPoint, the node that will actually get the data for us - InetAddress dataPoint = handler.endpoints.get(0); - if (dataPoint.equals(FBUtilities.getBroadcastAddress()) && StorageProxy.OPTIMIZE_LOCAL_REQUESTS) - { - logger.trace("reading data locally"); - StageManager.getStage(Stage.READ).execute(new LocalReadRunnable(command, handler)); - } - else + for (InetAddress endpoint : endpoints) { - logger.trace("reading data from {}", dataPoint); - MessagingService.instance().sendRR(command.createMessage(), dataPoint, handler); + if (isLocalRequest(endpoint)) + { + logger.trace("reading data locally"); + StageManager.getStage(Stage.READ).execute(new LocalReadRunnable(command, handler)); + } + else + { + logger.trace("reading data from {}", endpoint); + MessagingService.instance().sendRR(command.createMessage(), endpoint, handler); + } } + } - if (handler.endpoints.size() == 1) - return; - - // send the other endpoints a digest request + protected void makeDigestRequests(Iterable endpoints) + { ReadCommand digestCommand = command.copy(); digestCommand.setDigestQuery(true); - MessageOut message = null; - for (int i = 1; i < handler.endpoints.size(); i++) + MessageOut message = digestCommand.createMessage(); + for (InetAddress endpoint : endpoints) { - InetAddress digestPoint = handler.endpoints.get(i); - if (digestPoint.equals(FBUtilities.getBroadcastAddress())) + if (isLocalRequest(endpoint)) { logger.trace("reading digest locally"); StageManager.getStage(Stage.READ).execute(new LocalReadRunnable(digestCommand, handler)); } else { - logger.trace("reading digest from {}", digestPoint); - // (We lazy-construct the digest Message object since it may not be necessary if we - // are doing a local digest read, or no digest reads at all.) - if (message == null) - message = digestCommand.createMessage(); - MessagingService.instance().sendRR(message, digestPoint, handler); + logger.trace("reading digest from {}", endpoint); + MessagingService.instance().sendRR(message, endpoint, handler); } } } - void speculate() - { - // noop by default. - } + /** + * Perform additional requests if it looks like the original will time out. May block while it waits + * to see if the original requests are answered first. + */ + public abstract void maybeTryAdditionalReplicas(); - Row get() throws ReadTimeoutException, DigestMismatchException + /** + * Get the replicas involved in the [finished] request. + * + * @return target replicas + the extra replica, *IF* we speculated. + */ + public abstract Iterable getContactedReplicas(); + + /** + * send the initial set of requests + */ + public abstract void executeAsync(); + + /** + * wait for an answer. Blocks until success or timeout, so it is caller's + * responsibility to call maybeTryAdditionalReplicas first. + */ + public Row get() throws ReadTimeoutException, DigestMismatchException { return handler.get(); } - public static AbstractReadExecutor getReadExecutor(ReadCommand command, ConsistencyLevel consistency_level) throws UnavailableException + /** + * @return an executor appropriate for the configured speculative read policy + */ + public static AbstractReadExecutor getReadExecutor(ReadCommand command, ConsistencyLevel consistencyLevel) throws UnavailableException { Keyspace keyspace = Keyspace.open(command.ksName); List allReplicas = StorageProxy.getLiveSortedEndpoints(keyspace, command.key); - CFMetaData metaData = Schema.instance.getCFMetaData(command.ksName, command.cfName); + ReadRepairDecision repairDecision = Schema.instance.getCFMetaData(command.ksName, command.cfName).newReadRepairDecision(); + List targetReplicas = consistencyLevel.filterForQuery(keyspace, allReplicas, repairDecision); + + // Throw UAE early if we don't have enough replicas. + consistencyLevel.assureSufficientLiveNodes(keyspace, targetReplicas); - ReadRepairDecision rrDecision = metaData.newReadRepairDecision(); - - if (rrDecision != ReadRepairDecision.NONE) { + // Fat client. Speculating read executors need access to cfs metrics and sampled latency, and fat clients + // can't provide that. So, for now, fat clients will always use NeverSpeculatingReadExecutor. + if (StorageService.instance.isClientMode()) + return new NeverSpeculatingReadExecutor(command, consistencyLevel, targetReplicas); + + if (repairDecision != ReadRepairDecision.NONE) ReadRepairMetrics.attempted.mark(); - } - List queryTargets = consistency_level.filterForQuery(keyspace, allReplicas, rrDecision); + ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(command.cfName); + RetryType retryType = cfs.metadata.getSpeculativeRetry().type; - if (StorageService.instance.isClientMode()) + // Speculative retry is disabled *OR* there are simply no extra replicas to speculate. + if (retryType == RetryType.NONE || consistencyLevel.blockFor(keyspace) == allReplicas.size()) + return new NeverSpeculatingReadExecutor(command, consistencyLevel, targetReplicas); + + if (targetReplicas.size() == allReplicas.size()) { - return new DefaultReadExecutor(null, command, consistency_level, allReplicas, queryTargets); + // CL.ALL, RRD.GLOBAL or RRD.DC_LOCAL and a single-DC. + // We are going to contact every node anyway, so ask for 2 full data requests instead of 1, for redundancy + // (same amount of requests in total, but we turn 1 digest request into a full blown data request). + return new AlwaysSpeculatingReadExecutor(cfs, command, consistencyLevel, targetReplicas); } - ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(command.cfName); - - switch (metaData.getSpeculativeRetry().type) + // RRD.NONE or RRD.DC_LOCAL w/ multiple DCs. + InetAddress extraReplica = allReplicas.get(targetReplicas.size()); + // With repair decision DC_LOCAL all replicas/target replicas may be in different order, so + // we might have to find a replacement that's not already in targetReplicas. + if (repairDecision == ReadRepairDecision.DC_LOCAL && targetReplicas.contains(extraReplica)) { - case ALWAYS: - return new SpeculateAlwaysExecutor(cfs, command, consistency_level, allReplicas, queryTargets); - case PERCENTILE: - case CUSTOM: - return queryTargets.size() < allReplicas.size() - ? new SpeculativeReadExecutor(cfs, command, consistency_level, allReplicas, queryTargets) - : new DefaultReadExecutor(cfs, command, consistency_level, allReplicas, queryTargets); - default: - return new DefaultReadExecutor(cfs, command, consistency_level, allReplicas, queryTargets); + for (InetAddress address : allReplicas) + { + if (!targetReplicas.contains(address)) + { + extraReplica = address; + break; + } + } } + targetReplicas.add(extraReplica); + + if (retryType == RetryType.ALWAYS) + return new AlwaysSpeculatingReadExecutor(cfs, command, consistencyLevel, targetReplicas); + else // PERCENTILE or CUSTOM. + return new SpeculatingReadExecutor(cfs, command, consistencyLevel, targetReplicas); } - private static class DefaultReadExecutor extends AbstractReadExecutor + private static class NeverSpeculatingReadExecutor extends AbstractReadExecutor { - public DefaultReadExecutor(ColumnFamilyStore cfs, ReadCommand command, ConsistencyLevel consistency_level, List allReplicas, List queryTargets) throws UnavailableException + public NeverSpeculatingReadExecutor(ReadCommand command, ConsistencyLevel consistencyLevel, List targetReplicas) + { + super(command, consistencyLevel, targetReplicas); + } + + public void executeAsync() { - super(cfs, command, consistency_level, allReplicas, queryTargets); + makeDataRequests(targetReplicas.subList(0, 1)); + if (targetReplicas.size() > 1) + makeDigestRequests(targetReplicas.subList(1, targetReplicas.size())); + } + + public void maybeTryAdditionalReplicas() + { + // no-op + } + + public Iterable getContactedReplicas() + { + return targetReplicas; } } - private static class SpeculativeReadExecutor extends AbstractReadExecutor + private static class SpeculatingReadExecutor extends AbstractReadExecutor { - public SpeculativeReadExecutor(ColumnFamilyStore cfs, ReadCommand command, ConsistencyLevel consistency_level, List allReplicas, List queryTargets) throws UnavailableException + private final ColumnFamilyStore cfs; + private volatile boolean speculated = false; + + public SpeculatingReadExecutor(ColumnFamilyStore cfs, + ReadCommand command, + ConsistencyLevel consistencyLevel, + List targetReplicas) { - super(cfs, command, consistency_level, allReplicas, queryTargets); - assert handler.endpoints.size() < unfiltered.size(); + super(command, consistencyLevel, targetReplicas); + this.cfs = cfs; } - @Override - void speculate() + public void executeAsync() + { + // if CL + RR result in covering all replicas, getReadExecutor forces AlwaysSpeculating. So we know + // that the last replica in our list is "extra." + List initialReplicas = targetReplicas.subList(0, targetReplicas.size() - 1); + + if (handler.blockfor < initialReplicas.size()) + { + // We're hitting additional targets for read repair. Since our "extra" replica is the least- + // preferred by the snitch, we do an extra data read to start with against a replica more + // likely to reply; better to let RR fail than the entire query. + makeDataRequests(initialReplicas.subList(0, 2)); + if (initialReplicas.size() > 2) + makeDigestRequests(initialReplicas.subList(2, initialReplicas.size())); + } + else + { + // not doing read repair; all replies are important, so it doesn't matter which nodes we + // perform data reads against vs digest. + makeDataRequests(initialReplicas.subList(0, 1)); + if (initialReplicas.size() > 1) + makeDigestRequests(initialReplicas.subList(1, initialReplicas.size())); + } + } + + public void maybeTryAdditionalReplicas() { // no latency information, or we're overloaded if (cfs.sampleLatency > TimeUnit.MILLISECONDS.toNanos(command.getTimeout())) @@ -182,69 +270,61 @@ public abstract class AbstractReadExecutor if (!handler.await(cfs.sampleLatency, TimeUnit.NANOSECONDS)) { - InetAddress endpoint = unfiltered.get(handler.endpoints.size()); - - // could be waiting on the data, or on enough digests - ReadCommand scommand = command; + // Could be waiting on the data, or on enough digests. + ReadCommand retryCommand = command; if (resolver.getData() != null) { - scommand = command.copy(); - scommand.setDigestQuery(true); + retryCommand = command.copy(); + retryCommand.setDigestQuery(true); } - logger.trace("Speculating read retry on {}", endpoint); - MessagingService.instance().sendRR(scommand.createMessage(), endpoint, handler); + InetAddress extraReplica = Iterables.getLast(targetReplicas); + logger.trace("speculating read retry on {}", extraReplica); + MessagingService.instance().sendRR(retryCommand.createMessage(), extraReplica, handler); + speculated = true; + cfs.metric.speculativeRetry.inc(); } } + + public Iterable getContactedReplicas() + { + return speculated + ? targetReplicas + : targetReplicas.subList(0, targetReplicas.size() - 1); + } } - private static class SpeculateAlwaysExecutor extends AbstractReadExecutor + private static class AlwaysSpeculatingReadExecutor extends AbstractReadExecutor { - public SpeculateAlwaysExecutor(ColumnFamilyStore cfs, ReadCommand command, ConsistencyLevel consistency_level, List allReplicas, List queryTargets) throws UnavailableException + private final ColumnFamilyStore cfs; + + public AlwaysSpeculatingReadExecutor(ColumnFamilyStore cfs, + ReadCommand command, + ConsistencyLevel consistencyLevel, + List targetReplicas) { - super(cfs, command, consistency_level, allReplicas, queryTargets); + super(command, consistencyLevel, targetReplicas); + this.cfs = cfs; } - @Override - void executeAsync() + public void maybeTryAdditionalReplicas() { - int limit = unfiltered.size() >= 2 ? 2 : 1; - for (int i = 0; i < limit; i++) - { - InetAddress endpoint = unfiltered.get(i); - if (endpoint.equals(FBUtilities.getBroadcastAddress())) - { - logger.trace("reading full data locally"); - StageManager.getStage(Stage.READ).execute(new LocalReadRunnable(command, handler)); - } - else - { - logger.trace("reading full data from {}", endpoint); - MessagingService.instance().sendRR(command.createMessage(), endpoint, handler); - } - } - if (handler.endpoints.size() <= limit) - return; + // no-op + } - ReadCommand digestCommand = command.copy(); - digestCommand.setDigestQuery(true); - MessageOut message = digestCommand.createMessage(); - for (int i = limit; i < handler.endpoints.size(); i++) - { - // Send the message - InetAddress endpoint = handler.endpoints.get(i); - if (endpoint.equals(FBUtilities.getBroadcastAddress())) - { - logger.trace("reading data locally, isDigest: {}", command.isDigestQuery()); - StageManager.getStage(Stage.READ).execute(new LocalReadRunnable(digestCommand, handler)); - } - else - { - logger.trace("reading full data from {}, isDigest: {}", endpoint, command.isDigestQuery()); - MessagingService.instance().sendRR(message, endpoint, handler); - } - } + public Iterable getContactedReplicas() + { + return targetReplicas; + } + + @Override + public void executeAsync() + { + makeDataRequests(targetReplicas.subList(0, 2)); + if (targetReplicas.size() > 2) + makeDigestRequests(targetReplicas.subList(2, targetReplicas.size())); + cfs.metric.speculativeRetry.inc(); } } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/20c419b9/src/java/org/apache/cassandra/service/ReadCallback.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/ReadCallback.java b/src/java/org/apache/cassandra/service/ReadCallback.java index 58847ba..7f9c192 100644 --- a/src/java/org/apache/cassandra/service/ReadCallback.java +++ b/src/java/org/apache/cassandra/service/ReadCallback.java @@ -50,7 +50,7 @@ public class ReadCallback implements IAsyncCallback resolver; private final SimpleCondition condition = new SimpleCondition(); final long start; - private final int blockfor; + final int blockfor; final List endpoints; private final IReadCommand command; private final ConsistencyLevel consistencyLevel; http://git-wip-us.apache.org/repos/asf/cassandra/blob/20c419b9/src/java/org/apache/cassandra/service/StorageProxy.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java index e0d5dff..ffc65b9 100644 --- a/src/java/org/apache/cassandra/service/StorageProxy.java +++ b/src/java/org/apache/cassandra/service/StorageProxy.java @@ -1197,10 +1197,11 @@ public class StorageProxy implements StorageProxyMBean * 4. If the digests (if any) match the data return the data * 5. else carry out read repair by getting data from all the nodes. */ - private static List fetchRows(List initialCommands, ConsistencyLevel consistency_level) + private static List fetchRows(List initialCommands, ConsistencyLevel consistencyLevel) throws UnavailableException, ReadTimeoutException { - List rows = new ArrayList(initialCommands.size()); + List rows = new ArrayList<>(initialCommands.size()); + // (avoid allocating a new list in the common case of nothing-to-retry) List commandsToRetry = Collections.emptyList(); do @@ -1217,13 +1218,13 @@ public class StorageProxy implements StorageProxyMBean ReadCommand command = commands.get(i); assert !command.isDigestQuery(); - AbstractReadExecutor exec = AbstractReadExecutor.getReadExecutor(command, consistency_level); + AbstractReadExecutor exec = AbstractReadExecutor.getReadExecutor(command, consistencyLevel); exec.executeAsync(); readExecutors[i] = exec; } - for (AbstractReadExecutor exec: readExecutors) - exec.speculate(); + for (AbstractReadExecutor exec : readExecutors) + exec.maybeTryAdditionalReplicas(); // read results and make a second pass for any digest mismatches List repairCommands = null; @@ -1238,13 +1239,13 @@ public class StorageProxy implements StorageProxyMBean exec.command.maybeTrim(row); rows.add(row); } + if (logger.isDebugEnabled()) logger.debug("Read: {} ms.", TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - exec.handler.start)); - } catch (ReadTimeoutException ex) { - int blockFor = consistency_level.blockFor(Keyspace.open(exec.command.getKeyspace())); + int blockFor = consistencyLevel.blockFor(Keyspace.open(exec.command.getKeyspace())); int responseCount = exec.handler.getReceivedCount(); String gotData = responseCount > 0 ? exec.resolver.isDataPresent() ? " (including data)" : " (only digests)" @@ -1273,14 +1274,14 @@ public class StorageProxy implements StorageProxyMBean if (repairCommands == null) { - repairCommands = new ArrayList(); - repairResponseHandlers = new ArrayList>(); + repairCommands = new ArrayList<>(); + repairResponseHandlers = new ArrayList<>(); } repairCommands.add(exec.command); repairResponseHandlers.add(repairHandler); MessageOut message = exec.command.createMessage(); - for (InetAddress endpoint : exec.handler.endpoints) + for (InetAddress endpoint : exec.getContactedReplicas()) { Tracing.trace("Enqueuing full data read to {}", endpoint); MessagingService.instance().sendRR(message, endpoint, repairHandler); @@ -1288,8 +1289,7 @@ public class StorageProxy implements StorageProxyMBean } } - if (commandsToRetry != Collections.EMPTY_LIST) - commandsToRetry.clear(); + commandsToRetry.clear(); // read the results for the digest mismatch retries if (repairResponseHandlers != null) @@ -1319,8 +1319,8 @@ public class StorageProxy implements StorageProxyMBean catch (TimeoutException e) { Tracing.trace("Timed out on digest mismatch retries"); - int blockFor = consistency_level.blockFor(Keyspace.open(command.getKeyspace())); - throw new ReadTimeoutException(consistency_level, blockFor, blockFor, true); + int blockFor = consistencyLevel.blockFor(Keyspace.open(command.getKeyspace())); + throw new ReadTimeoutException(consistencyLevel, blockFor, blockFor, true); } // retry any potential short reads @@ -1329,7 +1329,7 @@ public class StorageProxy implements StorageProxyMBean { Tracing.trace("Issuing retry for read command"); if (commandsToRetry == Collections.EMPTY_LIST) - commandsToRetry = new ArrayList(); + commandsToRetry = new ArrayList<>(); commandsToRetry.add(retryCommand); continue; }