Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id D5E1E200C80 for ; Thu, 25 May 2017 16:33:54 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id D47EB160BCA; Thu, 25 May 2017 14:33:54 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id F3E97160BC7 for ; Thu, 25 May 2017 16:33:53 +0200 (CEST) Received: (qmail 52456 invoked by uid 500); 25 May 2017 14:33:53 -0000 Mailing-List: contact commits-help@ignite.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@ignite.apache.org Delivered-To: mailing list commits@ignite.apache.org Received: (qmail 52447 invoked by uid 99); 25 May 2017 14:33:53 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 25 May 2017 14:33:53 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 19E3CE0016; Thu, 25 May 2017 14:33:53 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: sboikov@apache.org To: commits@ignite.apache.org Message-Id: <9bf4325ded5042b0a2f702a52ce3c977@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: ignite git commit: cc Date: Thu, 25 May 2017 14:33:53 +0000 (UTC) archived-at: Thu, 25 May 2017 14:33:55 -0000 Repository: ignite Updated Branches: refs/heads/ignite-5075-cc ff0a2dd8a -> ab5aead4d cc Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/ab5aead4 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/ab5aead4 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/ab5aead4 Branch: refs/heads/ignite-5075-cc Commit: ab5aead4dcb651001c362326e6a0b50350b31c2e Parents: ff0a2dd Author: sboikov Authored: Thu May 25 17:33:46 2017 +0300 Committer: sboikov Committed: Thu May 25 17:33:46 2017 +0300 ---------------------------------------------------------------------- .../continuous/CacheContinuousQueryEntry.java | 59 ++++++++++---------- .../CacheContinuousQueryEventBuffer.java | 7 +++ 2 files changed, 35 insertions(+), 31 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/ab5aead4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEntry.java index 9db92b2..28fdee3 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEntry.java @@ -51,6 +51,9 @@ public class CacheContinuousQueryEntry implements GridCacheDeployable, Message { private static final byte FILTERED_ENTRY = 0b0010; /** */ + private static final byte KEEP_BINARY = 0b0100; + + /** */ private static final EventType[] EVT_TYPE_VALS = EventType.values(); /** @@ -105,9 +108,6 @@ public class CacheContinuousQueryEntry implements GridCacheDeployable, Message { @GridToStringInclude private AffinityTopologyVersion topVer; - /** Keep binary. */ - private boolean keepBinary; - /** */ private long filteredCnt; @@ -124,6 +124,7 @@ public class CacheContinuousQueryEntry implements GridCacheDeployable, Message { * @param key Key. * @param newVal New value. * @param oldVal Old value. + * @param keepBinary Keep binary flag. * @param part Partition. * @param updateCntr Update partition counter. * @param topVer Topology version if applicable. @@ -146,7 +147,9 @@ public class CacheContinuousQueryEntry implements GridCacheDeployable, Message { this.part = part; this.updateCntr = updateCntr; this.topVer = topVer; - this.keepBinary = keepBinary; + + if (keepBinary) + flags |= KEEP_BINARY; } /** @@ -231,7 +234,15 @@ public class CacheContinuousQueryEntry implements GridCacheDeployable, Message { return this; CacheContinuousQueryEntry e = new CacheContinuousQueryEntry( - cacheId, null, null, null, null, keepBinary, part, updateCntr, topVer); + cacheId, + null, + null, + null, + null, + false, + part, + updateCntr, + topVer); e.flags = flags; @@ -256,7 +267,7 @@ public class CacheContinuousQueryEntry implements GridCacheDeployable, Message { * @return Keep binary flag. */ boolean isKeepBinary() { - return keepBinary; + return (flags & KEEP_BINARY) != 0; } /** @@ -370,42 +381,36 @@ public class CacheContinuousQueryEntry implements GridCacheDeployable, Message { writer.incrementState(); case 4: - if (!writer.writeBoolean("keepBinary", keepBinary)) - return false; - - writer.incrementState(); - - case 5: if (!writer.writeMessage("key", isFiltered() ? null : key)) return false; writer.incrementState(); - case 6: + case 5: if (!writer.writeMessage("newVal", isFiltered() ? null : newVal)) return false; writer.incrementState(); - case 7: + case 6: if (!writer.writeMessage("oldVal", isFiltered() ? null : oldVal)) return false; writer.incrementState(); - case 8: + case 7: if (!writer.writeInt("part", part)) return false; writer.incrementState(); - case 9: + case 8: if (!writer.writeMessage("topVer", topVer)) return false; writer.incrementState(); - case 10: + case 9: if (!writer.writeLong("updateCntr", updateCntr)) return false; @@ -457,14 +462,6 @@ public class CacheContinuousQueryEntry implements GridCacheDeployable, Message { reader.incrementState(); case 4: - keepBinary = reader.readBoolean("keepBinary"); - - if (!reader.isLastRead()) - return false; - - reader.incrementState(); - - case 5: key = reader.readMessage("key"); if (!reader.isLastRead()) @@ -472,7 +469,7 @@ public class CacheContinuousQueryEntry implements GridCacheDeployable, Message { reader.incrementState(); - case 6: + case 5: newVal = reader.readMessage("newVal"); if (!reader.isLastRead()) @@ -480,7 +477,7 @@ public class CacheContinuousQueryEntry implements GridCacheDeployable, Message { reader.incrementState(); - case 7: + case 6: oldVal = reader.readMessage("oldVal"); if (!reader.isLastRead()) @@ -488,7 +485,7 @@ public class CacheContinuousQueryEntry implements GridCacheDeployable, Message { reader.incrementState(); - case 8: + case 7: part = reader.readInt("part"); if (!reader.isLastRead()) @@ -496,7 +493,7 @@ public class CacheContinuousQueryEntry implements GridCacheDeployable, Message { reader.incrementState(); - case 9: + case 8: topVer = reader.readMessage("topVer"); if (!reader.isLastRead()) @@ -504,7 +501,7 @@ public class CacheContinuousQueryEntry implements GridCacheDeployable, Message { reader.incrementState(); - case 10: + case 9: updateCntr = reader.readLong("updateCntr"); if (!reader.isLastRead()) @@ -519,7 +516,7 @@ public class CacheContinuousQueryEntry implements GridCacheDeployable, Message { /** {@inheritDoc} */ @Override public byte fieldsCount() { - return 11; + return 10; } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/ab5aead4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEventBuffer.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEventBuffer.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEventBuffer.java index f496c8c..fd4029c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEventBuffer.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEventBuffer.java @@ -96,6 +96,13 @@ public class CacheContinuousQueryEventBuffer { else ret = entries; + if (!pending.isEmpty()) { + if (ret == null) + ret = new ArrayList<>(); + + ret.addAll(pending.values()); + } + return ret; }