Return-Path: X-Original-To: apmail-cassandra-commits-archive@www.apache.org Delivered-To: apmail-cassandra-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 78E8410134 for ; Tue, 8 Apr 2014 23:29:43 +0000 (UTC) Received: (qmail 95059 invoked by uid 500); 8 Apr 2014 23:29:41 -0000 Delivered-To: apmail-cassandra-commits-archive@cassandra.apache.org Received: (qmail 94974 invoked by uid 500); 8 Apr 2014 23:29:40 -0000 Mailing-List: contact commits-help@cassandra.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@cassandra.apache.org Delivered-To: mailing list commits@cassandra.apache.org Received: (qmail 94927 invoked by uid 99); 8 Apr 2014 23:29:39 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 08 Apr 2014 23:29:39 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id 9737294EF04; Tue, 8 Apr 2014 23:29:39 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: jasobrown@apache.org To: commits@cassandra.apache.org Date: Tue, 08 Apr 2014 23:29:41 -0000 Message-Id: <36f3d69f590e49a3a49b684b8390d107@git.apache.org> In-Reply-To: <53d575dd3735419bbaee8f5678983ab8@git.apache.org> References: <53d575dd3735419bbaee8f5678983ab8@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [3/4] git commit: Merge branch 'cassandra-2.0' into cassandra-2.1 Merge branch 'cassandra-2.0' into cassandra-2.1 Conflicts: src/java/org/apache/cassandra/net/OutboundTcpConnection.java Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/66b3b2bf Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/66b3b2bf Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/66b3b2bf Branch: refs/heads/trunk Commit: 66b3b2bf08577e7a6256318f1fc42e91b0c5d232 Parents: 76db70e 47ace44 Author: Jason Brown Authored: Tue Apr 8 16:28:33 2014 -0700 Committer: Jason Brown Committed: Tue Apr 8 16:28:33 2014 -0700 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../org/apache/cassandra/net/OutboundTcpConnection.java | 12 ++++++++++-- 2 files changed, 11 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/66b3b2bf/CHANGES.txt ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/66b3b2bf/src/java/org/apache/cassandra/net/OutboundTcpConnection.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/net/OutboundTcpConnection.java index 2bfa491,8b8872b..1781a5d --- a/src/java/org/apache/cassandra/net/OutboundTcpConnection.java +++ b/src/java/org/apache/cassandra/net/OutboundTcpConnection.java @@@ -142,39 -132,26 +142,39 @@@ public class OutboundTcpConnection exte throw new AssertionError(e); } - BlockingQueue tmp = backlog; - backlog = active; - active = tmp; } + currentMsgBufferCount = drainedMessages.size(); - MessageOut m = qm.message; - if (m == CLOSE_SENTINEL) + int count = drainedMessages.size(); + for (QueuedMessage qm : drainedMessages) { - disconnect(); - if (isStopped) - break; - continue; + try + { + MessageOut m = qm.message; + if (m == CLOSE_SENTINEL) + { + disconnect(); + if (isStopped) + break outer; + continue; + } - if (qm.timestamp < System.currentTimeMillis() - m.getTimeout()) ++ if (qm.isTimedOut(m.getTimeout())) + dropped.incrementAndGet(); + else if (socket != null || connect()) + writeConnected(qm, count == 1 && backlog.size() == 0); + else + // clear out the queue, else gossip messages back up. + backlog.clear(); + } + catch (Exception e) + { + // really shouldn't get here, as exception handling in writeConnected() is reasonably robust + // but we want to catch anything bad we don't drop the messages in the current batch + logger.error("error processing a message intended for {}", poolReference.endPoint(), e); + } + currentMsgBufferCount = --count; } - if (qm.isTimedOut(m.getTimeout())) - dropped.incrementAndGet(); - else if (socket != null || connect()) - writeConnected(qm); - else - // clear out the queue, else gossip messages back up. - active.clear(); + drainedMessages.clear(); } }