Return-Path: X-Original-To: apmail-ignite-commits-archive@minotaur.apache.org Delivered-To: apmail-ignite-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 412FC10CF6 for ; Fri, 5 Dec 2014 10:04:11 +0000 (UTC) Received: (qmail 47527 invoked by uid 500); 5 Dec 2014 10:04:11 -0000 Delivered-To: apmail-ignite-commits-archive@ignite.apache.org Received: (qmail 47499 invoked by uid 500); 5 Dec 2014 10:04:11 -0000 Mailing-List: contact commits-help@ignite.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@ignite.incubator.apache.org Delivered-To: mailing list commits@ignite.incubator.apache.org Received: (qmail 47486 invoked by uid 99); 5 Dec 2014 10:04:11 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 05 Dec 2014 10:04:11 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED,T_RP_MATCHES_RCVD X-Spam-Check-By: apache.org Received: from [140.211.11.3] (HELO mail.apache.org) (140.211.11.3) by apache.org (qpsmtpd/0.29) with SMTP; Fri, 05 Dec 2014 10:03:07 +0000 Received: (qmail 45612 invoked by uid 99); 5 Dec 2014 10:03:03 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 05 Dec 2014 10:03:03 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id 2184E8BD4CF; Fri, 5 Dec 2014 10:03:03 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: sboikov@apache.org To: commits@ignite.incubator.apache.org Date: Fri, 05 Dec 2014 10:03:19 -0000 Message-Id: <07e4958126ea4a12966ccc6a7e8dbc93@git.apache.org> In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [18/32] incubator-ignite git commit: # Renaming X-Virus-Checked: Checked by ClamAV on apache.org http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f54e7bac/modules/core/src/main/java/org/gridgain/grid/streamer/index/StreamerIndexProvider.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/streamer/index/StreamerIndexProvider.java b/modules/core/src/main/java/org/gridgain/grid/streamer/index/StreamerIndexProvider.java new file mode 100644 index 0000000..696f33f --- /dev/null +++ b/modules/core/src/main/java/org/gridgain/grid/streamer/index/StreamerIndexProvider.java @@ -0,0 +1,99 @@ +/* @java.file.header */ + +/* _________ _____ __________________ _____ + * __ ____/___________(_)______ /__ ____/______ ____(_)_______ + * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \ + * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / / + * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/ + */ + +package org.gridgain.grid.streamer.index; + +import org.gridgain.grid.*; + +/** + * Represents an actual instance of an index. Used by a {@link org.gridgain.grid.streamer.StreamerWindow} + * to perform event indexing. + *

+ * To configure index for a streamer window, use + * {@link org.gridgain.grid.streamer.window.StreamerWindowAdapter#setIndexes(StreamerIndexProvider[])}. + */ +public interface StreamerIndexProvider extends StreamerIndexProviderMBean { + /** + * Gets index name. + * + * @return Name of the index. + */ + public String getName(); + + /** + * Gets user view for this index. This view is a snapshot + * of a current index state. Once returned, it does not + * change over time. + * + * @return User view for this index. + */ + public StreamerIndex index(); + + /** + * Initializes the index. + */ + public void initialize(); + + /** + * Resets the index to an initial empty state. + */ + public void reset(); + + /** + * Disposes the index. + */ + public void dispose(); + + /** + * Adds an event to index. + * + * @param sync Index update synchronizer. + * @param evt Event to add to an index. + * @throws GridException If failed to add event to an index. + */ + public void add(StreamerIndexUpdateSync sync, E evt) throws GridException; + + /** + * Removes an event from index. + * + * @param sync Index update synchronizer. + * @param evt Event to remove from index. + * @throws GridException If failed to add event to an index. + */ + public void remove(StreamerIndexUpdateSync sync, E evt) throws GridException; + + /** + * Gets event indexing policy, which defines how events + * are tracked within an index. + * + * @return index policy. + */ + public StreamerIndexPolicy getPolicy(); + + /** + * Checks whether this index is unique or not. If it is, equal events + * are not allowed, which means that if a newly-added event is found + * to be equal to one of the already present events + * ({@link Object#equals(Object)} returns {@code true}), an exception + * is thrown. + * + * @return {@code True} for unique index. + */ + public boolean isUnique(); + + /** + * Finalizes an update operation. + * + * @param sync Index update synchronizer. + * @param evt Updated event. + * @param rollback Rollback flag. If {@code true}, a rollback was made. + * @param rmv Remove flag. If {@code true}, the event was removed from index. + */ + public void endUpdate(StreamerIndexUpdateSync sync, E evt, boolean rollback, boolean rmv); +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f54e7bac/modules/core/src/main/java/org/gridgain/grid/streamer/index/StreamerIndexProviderAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/streamer/index/StreamerIndexProviderAdapter.java b/modules/core/src/main/java/org/gridgain/grid/streamer/index/StreamerIndexProviderAdapter.java new file mode 100644 index 0000000..9f3448c --- /dev/null +++ b/modules/core/src/main/java/org/gridgain/grid/streamer/index/StreamerIndexProviderAdapter.java @@ -0,0 +1,788 @@ +/* @java.file.header */ + +/* _________ _____ __________________ _____ + * __ ____/___________(_)______ /__ ____/______ ____(_)_______ + * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \ + * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / / + * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/ + */ + +package org.gridgain.grid.streamer.index; + +import com.romix.scala.*; +import com.romix.scala.collection.concurrent.*; +import org.apache.ignite.lang.*; +import org.gridgain.grid.*; +import org.gridgain.grid.util.*; +import org.gridgain.grid.util.typedef.*; +import org.gridgain.grid.util.typedef.internal.*; +import org.jdk8.backport.*; +import org.jetbrains.annotations.*; +import org.pcollections.*; + +import java.util.*; +import java.util.concurrent.*; + +import static org.gridgain.grid.streamer.index.StreamerIndexPolicy.*; + +/** + * Convenient {@link StreamerIndexProvider} adapter implementing base configuration methods. + */ +public abstract class StreamerIndexProviderAdapter implements StreamerIndexProvider { + /** */ + protected final IgniteClosure, V> entryToVal = + new C1, V>() { + @Override public V apply(StreamerIndexEntry e) { + return e.value(); + } + }; + + /** */ + protected final IgniteClosure, K> entryToKey = + new C1, K>() { + @Override public K apply(StreamerIndexEntry e) { + return e.key(); + } + }; + + /** Keys currently being updated. */ + private final ConcurrentMap locks = new ConcurrentHashMap8<>(); + + /** Index name. */ + private String name; + + /** Index policy. */ + private StreamerIndexPolicy plc = EVENT_TRACKING_OFF; + + /** Index updater. */ + private StreamerIndexUpdater updater; + + /** */ + private final LongAdder evtsCnt = new LongAdder(); + + /** Read write lock. */ + private final GridSpinReadWriteLock rwLock = new GridSpinReadWriteLock(); + + /** */ + private boolean unique; + + /** */ + private final ThreadLocal threadLocKey = new ThreadLocal<>(); + + /** */ + private final ConcurrentMap, StreamerIndexUpdateSync> idxLocks = new ConcurrentHashMap8<>(); + + /** */ + private boolean keyCheck = true; + + /** + * Sets index name. + * + * @param name Index name. + */ + public void setName(String name) { + this.name = name; + } + + /** {@inheritDoc} */ + @Override public String getName() { + return name; + } + + /** + * Sets index policy. + * + * @param plc Policy. + */ + public void setPolicy(StreamerIndexPolicy plc) { + this.plc = plc; + } + + /** {@inheritDoc} */ + @Override public StreamerIndexPolicy getPolicy() { + return plc; + } + + /** + * Sets unique flag. + * + * @param unique {@code True} for unique index. + */ + public void setUnique(boolean unique) { + this.unique = unique; + } + + /** {@inheritDoc} */ + @Override public boolean isUnique() { + return unique; + } + + /** + * Sets index updater. + * + * @param updater Updater. + */ + public void setUpdater(StreamerIndexUpdater updater) { + this.updater = updater; + } + + /** + * Gets index updater. + * + * @return Updater. + */ + public StreamerIndexUpdater getUpdater() { + return updater; + } + + /** {@inheritDoc} */ + @Override public void dispose() { + // No-op. + } + + /** + * Add event to the index. + * + * @param sync Sync. + * @param evt Event. + */ + @Override public void add(StreamerIndexUpdateSync sync, E evt) throws GridException { + assert evt != null; + + if (threadLocKey.get() != null) + throw new IllegalStateException("Previous operation has not been finished: " + threadLocKey.get()); + + K key = updater.indexKey(evt); + + if (key == null) + return; // Ignore event. + + validateIndexKey(key); + + readLock(); + + threadLocKey.set(key); + + lockKey(key, sync); + + add(evt, key, sync); + } + + /** + * Remove event from the index. + * + * @param sync Sync. + * @param evt Event. + */ + @Override public void remove(StreamerIndexUpdateSync sync, E evt) throws GridException { + assert evt != null; + + if (threadLocKey.get() != null) + throw new IllegalStateException("Previous operation has not been finished: " + threadLocKey.get()); + + K key = updater.indexKey(evt); + + assert key != null; + + validateIndexKey(key); + + readLock(); + + threadLocKey.set(key); + + lockKey(key, sync); + + remove(evt, key, sync); + } + + /** {@inheritDoc} */ + @Override public void endUpdate(StreamerIndexUpdateSync sync, E evt, boolean rollback, boolean rmv) { + K key = threadLocKey.get(); + + if (key == null) + return; + + if (!rollback) { + if (rmv) + evtsCnt.decrement(); + else + evtsCnt.increment(); + } + + threadLocKey.remove(); + + endUpdate0(sync, evt, key, rollback); + + unlockKey(key, sync); + + readUnlock(); + } + + /** + * @param sync Sync. + * @param evt Event. + * @param key Key. + * @param rollback Rollback flag. + */ + protected abstract void endUpdate0(StreamerIndexUpdateSync sync, E evt, K key, boolean rollback); + + /** {@inheritDoc} */ + @Override public void reset() { + writeLock(); + + try { + reset0(); + } + finally { + writeUnlock(); + } + } + + /** {@inheritDoc} */ + @Override public StreamerIndex index() { + writeLock(); + + try { + return index0(); + } + finally { + writeUnlock(); + } + } + + /** + * Called on reset. + */ + protected abstract void reset0(); + + /** + * @return Index + */ + protected abstract StreamerIndex index0(); + + /** + * + */ + protected void readLock() { + rwLock.readLock(); + } + + /** + * + */ + protected void readUnlock() { + rwLock.readUnlock(); + } + + /** + * + */ + protected void writeLock() { + rwLock.writeLock(); + } + + /** + * + */ + protected void writeUnlock() { + rwLock.writeUnlock(); + } + + /** + * @return Events count, + */ + protected int eventsCount() { + return evtsCnt.intValue(); + } + + /** + * Add event to the index. + * + * @param evt Event. + * @param key key. + * @param sync Sync. + * @throws GridException If failed. + */ + protected abstract void add(E evt, K key, StreamerIndexUpdateSync sync) throws GridException; + + /** + * Remove event from the index. + * + * @param evt Event. + * @param key Key. + * @param sync Sync. + * @throws GridException If failed. + */ + protected abstract void remove(E evt, K key, StreamerIndexUpdateSync sync) throws GridException; + + /** + * Lock updates on particular key. + * + * @param key Key. + * @param sync Sync. + * @throws GridException If failed. + */ + private void lockKey(K key, StreamerIndexUpdateSync sync) throws GridException { + assert key != null; + assert sync != null; + + while (true) { + StreamerIndexUpdateSync old = locks.putIfAbsent(key, sync); + + if (old != null) { + try { + old.await(); + } + catch (InterruptedException e) { + throw new GridException("Failed to lock on key (thread has been interrupted): " + key, e); + } + + // No point to replace or remove sync here. + // Owner will first remove it, then will finish the sync. + } + else + break; + } + } + + /** + * Unlock updates on particular key. + * + * @param key Key. + * @param sync Sync. + */ + private void unlockKey(K key, StreamerIndexUpdateSync sync) { + assert key != null; + + locks.remove(key, sync); + } + + /** + * Lock updates on particular key. + * + * @param key Key. + * @param sync Sync. + * @throws GridException If failed. + */ + protected void lockIndexKey(IndexKey key, StreamerIndexUpdateSync sync) throws GridException { + assert key != null; + assert sync != null; + assert isUnique(); + + while (true) { + StreamerIndexUpdateSync old = idxLocks.putIfAbsent(key, sync); + + if (old != null) { + try { + old.await(); + } + catch (InterruptedException e) { + throw new GridException("Failed to lock on key (thread has been interrupted): " + key, e); + } + + // No point to replace or remove sync here. + // Owner will first remove it, then will finish the sync. + } + else + break; + } + } + + /** + * Unlock updates on particular key. + * + * @param key Key. + * @param sync Sync. + */ + protected void unlockIndexKey(IndexKey key, StreamerIndexUpdateSync sync) { + assert key != null; + assert isUnique(); + + idxLocks.remove(key, sync); + } + + /** + * @param key Key, + * @param val Value. + * @param idxKey Index key. + * @param evt Event. + * @return Entry. + */ + protected Entry newEntry(K key, V val, @Nullable IndexKey idxKey, E evt) { + StreamerIndexPolicy plc = getPolicy(); + + switch (plc) { + case EVENT_TRACKING_OFF: + return new NonTrackingEntry<>(key, val, idxKey); + + case EVENT_TRACKING_ON: + return new EventTrackingEntry<>(addToCollection(null, evt), key, val, idxKey); + + default: + assert plc == EVENT_TRACKING_ON_DEDUP : "Unknown policy: " + plc; + + return new DedupTrackingEntry<>(addToMap(null, evt), key, val, idxKey); + } + } + + /** + * @param oldEntry Old entry. + * @param key Key, + * @param val Value. + * @param idxKey Index key. + * @param evt Event. + * @return Entry. + */ + protected Entry addEvent(StreamerIndexEntry oldEntry, K key, V val, + @Nullable IndexKey idxKey, E evt) { + StreamerIndexPolicy plc = getPolicy(); + + switch (plc) { + case EVENT_TRACKING_OFF: + return new NonTrackingEntry<>(key, val, idxKey); + + case EVENT_TRACKING_ON: + return new EventTrackingEntry<>(addToCollection(oldEntry.events(), evt), key, val, idxKey); + + default: + assert plc == EVENT_TRACKING_ON_DEDUP : "Unknown policy: " + plc; + + return new DedupTrackingEntry<>(addToMap(((DedupTrackingEntry)oldEntry).rawEvents(), evt), + key, val, idxKey); + } + } + + /** + * @param oldEntry Old entry. + * @param key Key, + * @param val Value. + * @param idxKey Index key. + * @param evt Event. + * @return Entry. + */ + protected Entry removeEvent(StreamerIndexEntry oldEntry, K key, V val, + @Nullable IndexKey idxKey, E evt) { + StreamerIndexPolicy plc = getPolicy(); + + switch (plc) { + case EVENT_TRACKING_OFF: + return new NonTrackingEntry<>(key, val, idxKey); + + case EVENT_TRACKING_ON: + Collection oldEvts = oldEntry.events(); + + assert oldEvts != null; // Event tracking is on. + + Collection newEvts = removeFromCollection(oldEvts, evt); + + return new EventTrackingEntry<>(newEvts != null ? newEvts : oldEvts, key, val, idxKey); + + default: + assert plc == EVENT_TRACKING_ON_DEDUP : "Unknown policy: " + plc; + + Map oldMap = ((DedupTrackingEntry)oldEntry).rawEvents(); + + assert oldMap != null; // Event tracking is on. + + Map newMap = removeFromMap(oldMap, evt); + + return new DedupTrackingEntry<>(newMap != null ? newMap : oldMap, key, val, idxKey); + } + } + + /** + * @param col Collection. + * @param evt Event. + * @return Cloned collection. + */ + protected static Collection addToCollection(@Nullable Collection col, E evt) { + PVector res = col == null ? TreePVector.empty() : (PVector)col; + + return res.plus(evt); + } + + /** + * @param map Collection. + * @param evt Event. + * @return Cloned map. + */ + protected static Map addToMap(@Nullable Map map, E evt) { + HashPMap res = map == null ? HashTreePMap.empty() : (HashPMap)map; + + Integer cnt = res.get(evt); + + return cnt != null ? res.minus(evt).plus(evt, cnt + 1) : res.plus(evt, 1); + } + + /** + * @param col Collection. + * @param evt Event. + * @return Cloned collection. + */ + @Nullable protected static Collection removeFromCollection(@Nullable Collection col, E evt) { + if (col == null) + return null; + + PVector res = (PVector)col; + + res = res.minus(evt); + + return res.isEmpty() ? null : res; + } + + /** + * @param map Collection. + * @param evt Event. + * @return Cloned map. + */ + @Nullable protected static Map removeFromMap(@Nullable Map map, E evt) { + if (map == null) + return null; + + HashPMap res = (HashPMap)map; + + Integer cnt = res.get(evt); + + return cnt == null ? res : cnt == 1 ? res.minus(evt) : res.minus(evt).plus(evt, cnt - 1); + } + + /** {@inheritDoc} */ + @Override public String name() { + return name; + } + + /** {@inheritDoc} */ + @Override public String updaterClass() { + return updater.getClass().getName(); + } + + /** {@inheritDoc} */ + @Override public boolean unique() { + return unique; + } + + /** {@inheritDoc} */ + @Override public StreamerIndexPolicy policy() { + return plc; + } + + /** {@inheritDoc} */ + @Override public int size() { + return index0().size(); + } + + /** + * Validates that given index key has overridden equals and hashCode methods. + * + * @param key Index key. + * @throws IllegalArgumentException If validation fails. + */ + private void validateIndexKey(@Nullable Object key) { + if (keyCheck) { + keyCheck = false; + + if (key == null) + return; + + if (!U.overridesEqualsAndHashCode(key)) + throw new IllegalArgumentException("Index key must override hashCode() and equals() methods: " + + key.getClass().getName()); + } + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(StreamerIndexProviderAdapter.class, this); + } + + /** + * Streamer window index key. + */ + protected interface IndexKey { + /** + * @return Value associated with this key. + */ + public V value(); + } + + /** + * Utility method to safely get values from TrieMap. + * See: https://github.com/romix/java-concurrent-hash-trie-map/issues/4 + * + * @param key Key. + * @param map Trie map. + * @return Value from map. + */ + @SuppressWarnings({"IfMayBeConditional", "TypeMayBeWeakened"}) + protected static V trieGet(K key, TrieMap map) { + Object r = map.get(key); + + if(r instanceof Some) + return ((Some)r).get (); + else if(r instanceof None) + return null; + else + return (V)r; + } + + /** + * Streamer window index entry. + */ + protected abstract static class Entry implements StreamerIndexEntry { + /** */ + private final K key; + + /** */ + private final V val; + + /** */ + private final IndexKey idxKey; + + /** + * @param key Key. + * @param val Value. + * @param idxKey Key index. + */ + Entry(K key, V val, @Nullable IndexKey idxKey) { + assert key != null; + assert val != null; + assert idxKey == null || idxKey.value() == val : "Keys are invalid [idxKey=" + idxKey + ", val=" + val +']'; + + this.key = key; + this.val = val; + this.idxKey = idxKey; + } + + /** {@inheritDoc} */ + @Override public K key() { + return key; + } + + /** {@inheritDoc} */ + @Override public V value() { + return val; + } + + /** + * @return Internal key. + */ + @Nullable public IndexKey keyIndex() { + return idxKey; + } + + /** {@inheritDoc} */ + @Override public boolean equals(Object obj) { + if (!(obj instanceof Entry)) + return false; + + StreamerIndexEntry e = (StreamerIndexEntry)obj; + + return key.equals(e.key()); + } + + /** {@inheritDoc} */ + @Override public int hashCode() { + return key.hashCode(); + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(Entry.class, this, "identity", System.identityHashCode(this)); + } + } + + /** + * Entry with index policy {@link StreamerIndexPolicy#EVENT_TRACKING_OFF}. + */ + protected static class NonTrackingEntry extends Entry { + /** + * @param key Key. + * @param val Value. + * @param idxKey Key index. + */ + public NonTrackingEntry(K key, V val, @Nullable IndexKey idxKey) { + super(key, val, idxKey); + } + + /** {@inheritDoc} */ + @Override public Collection events() { + return null; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(NonTrackingEntry.class, this, super.toString()); + } + } + + /** + * Entry with index policy {@link StreamerIndexPolicy#EVENT_TRACKING_ON}. + */ + protected static class EventTrackingEntry extends Entry { + /** */ + private final Collection evts; + + /** + * @param evts Events. + * @param key Key. + * @param val Value. + * @param idxKey Key index. + */ + public EventTrackingEntry(Collection evts, K key, V val, @Nullable IndexKey idxKey) { + super(key, val, idxKey); + + assert evts == null || !evts.isEmpty() : "Invalid events: " + evts; + + this.evts = evts; + } + + /** {@inheritDoc} */ + @Override public Collection events() { + return evts; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(EventTrackingEntry.class, this, "evtCnt", evts.size(), "parent", super.toString()); + } + } + + /** + * Entry with index policy {@link StreamerIndexPolicy#EVENT_TRACKING_ON_DEDUP}. + */ + protected static class DedupTrackingEntry extends Entry { + /** */ + private final Map evts; + + /** + * @param evts Events. + * @param key Key. + * @param val Value. + * @param idxKey Key index. + */ + public DedupTrackingEntry(Map evts, K key, V val, @Nullable IndexKey idxKey) { + super(key, val, idxKey); + + assert evts == null || !evts.isEmpty() : "Invalid events: " + evts; + + this.evts = evts; + } + + /** {@inheritDoc} */ + @Override public Collection events() { + return Collections.unmodifiableSet(evts.keySet()); + } + + /** + * @return Events. + */ + @Nullable public Map rawEvents() { + return evts; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(DedupTrackingEntry.class, this, "evtCnt", evts.size(), "parent", super.toString()); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f54e7bac/modules/core/src/main/java/org/gridgain/grid/streamer/index/StreamerIndexProviderMBean.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/streamer/index/StreamerIndexProviderMBean.java b/modules/core/src/main/java/org/gridgain/grid/streamer/index/StreamerIndexProviderMBean.java new file mode 100644 index 0000000..cf89c29 --- /dev/null +++ b/modules/core/src/main/java/org/gridgain/grid/streamer/index/StreamerIndexProviderMBean.java @@ -0,0 +1,66 @@ +/* @java.file.header */ + +/* _________ _____ __________________ _____ + * __ ____/___________(_)______ /__ ____/______ ____(_)_______ + * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \ + * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / / + * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/ + */ + +package org.gridgain.grid.streamer.index; + +import org.apache.ignite.mbean.*; +import org.jetbrains.annotations.*; + +/** + * Streamer window index provider MBean. + */ +public interface StreamerIndexProviderMBean { + /** + * Index name. + * + * @return Index name. + */ + @IgniteMBeanDescription("Index name.") + @Nullable public String name(); + + /** + * Gets index updater class name. + * + * @return Index updater class. + */ + @IgniteMBeanDescription("Index updater class name.") + public String updaterClass(); + + /** + * Gets index unique flag. + * + * @return Index unique flag. + */ + @IgniteMBeanDescription("Index unique flag.") + public boolean unique(); + + /** + * Returns {@code true} if index supports sorting and therefore can perform range operations. + * + * @return Index sorted flag. + */ + @IgniteMBeanDescription("Index sorted flag.") + public boolean sorted(); + + /** + * Gets index policy. + * + * @return Index policy. + */ + @IgniteMBeanDescription("Index policy.") + public StreamerIndexPolicy policy(); + + /** + * Gets current index size. + * + * @return Current index size. + */ + @IgniteMBeanDescription("Current index size.") + public int size(); +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f54e7bac/modules/core/src/main/java/org/gridgain/grid/streamer/index/StreamerIndexUpdateSync.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/streamer/index/StreamerIndexUpdateSync.java b/modules/core/src/main/java/org/gridgain/grid/streamer/index/StreamerIndexUpdateSync.java new file mode 100644 index 0000000..dfa761c --- /dev/null +++ b/modules/core/src/main/java/org/gridgain/grid/streamer/index/StreamerIndexUpdateSync.java @@ -0,0 +1,69 @@ +/* @java.file.header */ + +/* _________ _____ __________________ _____ + * __ ____/___________(_)______ /__ ____/______ ____(_)_______ + * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \ + * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / / + * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/ + */ + +package org.gridgain.grid.streamer.index; + +import org.gridgain.grid.util.typedef.internal.*; + +/** + * Streamer index update synchronizer. + *

+ * Used in {@link StreamerIndexProvider} to synchronize + * operations on window index. + * + * @see StreamerIndexProvider + * + */ +public class StreamerIndexUpdateSync { + /** */ + private volatile int res; + + /** + * Waits for a notification from another thread, which + * should call {@link #finish(int)} with an operation result. + * That result is returned by this method. + * + * @return Operation results, passed to {@link #finish(int)}. + * @throws InterruptedException If wait was interrupted. + */ + public int await() throws InterruptedException { + int res0 = res; + + if (res0 == 0) { + synchronized (this) { + while ((res0 = res) == 0) + wait(); + } + } + + assert res0 != 0; + + return res0; + } + + /** + * Notifies all waiting threads to finish waiting. + * + * @param res Operation result to return from {@link #await()}. + */ + public void finish(int res) { + assert res != 0; + + synchronized (this) { + this.res = res; + + notifyAll(); + } + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(StreamerIndexUpdateSync.class, this); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f54e7bac/modules/core/src/main/java/org/gridgain/grid/streamer/index/StreamerIndexUpdater.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/streamer/index/StreamerIndexUpdater.java b/modules/core/src/main/java/org/gridgain/grid/streamer/index/StreamerIndexUpdater.java new file mode 100644 index 0000000..77177f9 --- /dev/null +++ b/modules/core/src/main/java/org/gridgain/grid/streamer/index/StreamerIndexUpdater.java @@ -0,0 +1,80 @@ +/* @java.file.header */ + +/* _________ _____ __________________ _____ + * __ ____/___________(_)______ /__ ____/______ ____(_)_______ + * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \ + * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / / + * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/ + */ + +package org.gridgain.grid.streamer.index; + +import org.gridgain.grid.*; +import org.jetbrains.annotations.*; + +/** + * Index updater. The main responsibility of index updater is to maintain index values + * up to date whenever events are added or removed from window. + *

+ * Updater is provided to index provider in configuration usually via + * {@link StreamerIndexProviderAdapter#setUpdater(StreamerIndexUpdater)} method. + */ +public interface StreamerIndexUpdater { + /** + * Given an event, extract index key. For example, if you have a 'Person' object + * with field 'age' and need to index based on this field, then this method + * should return the value of age field. + *

+ * If {@code null} is returned then event will be ignored by the index. + * + * @param evt Event being added or removed from the window. + * @return Index key for this event. + */ + @Nullable public K indexKey(E evt); + + /** + * Gets initial value for the index or {@code null} if event should be ignored. + * This method is called every time when an entry is added to the window in + * order to get initial value for given key. + * + * @param evt Event being added to or removed from window. + * @param key Index key return by {@link #indexKey(Object)} method. + * @return Initial value for given key, if {@code null} then event will be + * ignored and index entry will not be created. + */ + @Nullable public V initialValue(E evt, K key); + + /** + * Callback invoked whenever an event is being added to the window. Given a key and + * a current index value for this key, the implementation should return the new + * value for this key. If returned value is {@code null}, then current entry will + * be removed from the index. + *

+ * If index is sorted, then sorting happens based on the returned value. + * + * @param entry Current index entry. + * @param evt New event. + * @return New index value for given key, if {@code null}, then current + * index entry will be removed the index. + * @throws GridException If entry should not be added to index (e.g. if uniqueness is violated). + */ + @Nullable public V onAdded(StreamerIndexEntry entry, E evt) throws GridException; + + /** + * Callback invoked whenever an event is being removed from the window and has + * index entry for given key. If there was no entry for given key, then + * {@code onRemoved()} will not be called. + *

+ * Given a key and a current index value for this key, the implementation should return the new + * value for this key. If returned value is {@code null}, then current entry will + * be removed from the index. + *

+ * If index is sorted, then sorting happens based on the returned value. + * + * @param entry Current index entry. + * @param evt Event being removed from the window. + * @return New index value for given key, if {@code null}, then current + * index entry will be removed the index. + */ + @Nullable public V onRemoved(StreamerIndexEntry entry, E evt); +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f54e7bac/modules/core/src/main/java/org/gridgain/grid/streamer/index/hash/GridStreamerHashIndexProvider.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/streamer/index/hash/GridStreamerHashIndexProvider.java b/modules/core/src/main/java/org/gridgain/grid/streamer/index/hash/GridStreamerHashIndexProvider.java deleted file mode 100644 index e6a8618..0000000 --- a/modules/core/src/main/java/org/gridgain/grid/streamer/index/hash/GridStreamerHashIndexProvider.java +++ /dev/null @@ -1,493 +0,0 @@ -/* @java.file.header */ - -/* _________ _____ __________________ _____ - * __ ____/___________(_)______ /__ ____/______ ____(_)_______ - * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \ - * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / / - * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/ - */ - -package org.gridgain.grid.streamer.index.hash; - -import com.romix.scala.collection.concurrent.*; -import org.gridgain.grid.*; -import org.gridgain.grid.streamer.index.*; -import org.gridgain.grid.streamer.index.tree.*; -import org.gridgain.grid.util.typedef.*; -import org.gridgain.grid.util.typedef.internal.*; -import org.jetbrains.annotations.*; - -import java.util.*; - -import static org.gridgain.grid.streamer.index.GridStreamerIndexPolicy.*; - -/** - * Hash index implementation of a {@link GridStreamerIndexProvider}. - *

- * This implementation uses a concurrent hash map, which is extremely - * efficient for CRUD operations. It does not, however, maintain the - * ordering of entries, so, operations which imply ordering are not - * supported. - *

- * If ordering is required, consider using {@link GridStreamerTreeIndexProvider}. - * - * @see GridStreamerTreeIndexProvider - * - */ -public class GridStreamerHashIndexProvider extends GridStreamerIndexProviderAdapter { - /** */ - private TrieMap> key2Entry; - - /** */ - private final ThreadLocal> state = new ThreadLocal<>(); - - /** {@inheritDoc} */ - @Override protected GridStreamerIndex index0() { - return new Index<>(); - } - - /** {@inheritDoc} */ - @Override public void initialize() { - key2Entry = new TrieMap<>(); - } - - /** {@inheritDoc} */ - @Override public void reset0() { - // This will recreate maps. - initialize(); - } - - /** {@inheritDoc} */ - @Override protected void add(E evt, K key, GridStreamerIndexUpdateSync sync) throws GridException { - State state0 = state.get(); - - if (state0 != null) - throw new IllegalStateException("Previous operation has not been finished: " + state0); - - Entry oldEntry = trieGet(key, key2Entry); - - GridStreamerIndexUpdater updater = getUpdater(); - - if (oldEntry == null) { - V val = updater.initialValue(evt, key); - - if (val == null) - return; // Ignore event. - - state0 = new State<>(null, null, false); - - state.set(state0); - - Entry newEntry = newEntry(key, val, null, evt); - - // Save new entry to state. - state0.newEntry(newEntry); - - // Put new entry. - Entry rmv = key2Entry.put(key, newEntry); - - assert rmv == null; - - // Update passed. - state0.finished(true); - } - else { - if (isUnique()) - throw new GridException("Index unique key violation [evt=" + evt + ", key=" + key + ']'); - - V val = updater.onAdded(oldEntry, evt); - - if (val == null) { - remove(evt, key, sync); - - return; - } - - state0 = new State<>(oldEntry, null, false); - - state.set(state0); - - Entry newEntry = addEvent(oldEntry, key, val, null, evt); - - // Save new entry to state. - state0.newEntry(newEntry); - - // Replace former entry with the new one. - Entry rmv = key2Entry.put(key, newEntry); - - assert rmv != null; - - // Update passed. - state0.finished(true); - } - } - - /** {@inheritDoc} */ - @Override protected void remove(E evt, K key, GridStreamerIndexUpdateSync sync) throws GridException { - State state0 = state.get(); - - if (state0 != null) - throw new IllegalStateException("Previous operation has not been finished: " + state0); - - Entry oldEntry = trieGet(key, key2Entry); - - if (oldEntry == null) - return; - - GridStreamerIndexUpdater updater = getUpdater(); - - V val = updater.onRemoved(oldEntry, evt); - - if (val == null) { - state0 = new State<>(oldEntry, null, false); - - state.set(state0); - - boolean b = key2Entry.remove(key, oldEntry); - - assert b; - - state0.finished(true); - } - else { - state0 = new State<>(oldEntry, null, false); - - state.set(state0); - - Entry newEntry = removeEvent(oldEntry, key, val, null, evt); - - // Save new entry to state. - state0.newEntry(newEntry); - - // Replace former entry with the new one. - Entry rmv = key2Entry.put(key, newEntry); - - assert rmv != null; - - state0.finished(true); - } - } - - /** {@inheritDoc} */ - @Override protected void endUpdate0(GridStreamerIndexUpdateSync sync, E evt, K key, boolean rollback) { - State state0 = state.get(); - - if (state0 == null) - return; - - state.remove(); - - if (rollback && state0.finished()) { - Entry oldEntry = state0.oldEntry(); - Entry newEntry = state0.newEntry(); - - // Rollback after index was updated. - if (oldEntry != null && newEntry != null) { - boolean b = key2Entry.replace(key, newEntry, oldEntry); - - assert b; - } - else if (newEntry == null) { - // Old was removed. Need to put it back. - Entry old = key2Entry.put(key, oldEntry); - - assert old == null; - } - else { - assert oldEntry == null; - - // New entry was added. Remove it. - boolean b = key2Entry.remove(key, newEntry); - - assert b; - } - } - } - - /** {@inheritDoc} */ - @Override public boolean sorted() { - return false; - } - - /** - * - */ - private class Index> implements GridStreamerIndex { - /** */ - private final TrieMap> key2Entry0 = key2Entry.readOnlySnapshot(); - - /** */ - private final int evtsCnt = eventsCount(); - - /** {@inheritDoc} */ - @Nullable @Override public String name() { - return getName(); - } - - /** {@inheritDoc} */ - @Override public boolean unique() { - return isUnique(); - } - - /** {@inheritDoc} */ - @Override public boolean sorted() { - return false; - } - - /** {@inheritDoc} */ - @Override public GridStreamerIndexPolicy policy() { - return getPolicy(); - } - - /** {@inheritDoc} */ - @Override public int size() { - return key2Entry0.size(); - } - - /** {@inheritDoc} */ - @Nullable @Override public GridStreamerIndexEntry entry(K key) { - A.notNull(key, "key"); - - return trieGet(key, key2Entry0); - } - - /** {@inheritDoc} */ - @Override public Collection> entries(int cnt) { - A.ensure(cnt >= 0, "cnt >= 0"); - - Collection vals = Collections.unmodifiableCollection(key2Entry0.values()); - - return (Collection>)(cnt == 0 ? vals : F.limit(vals, cnt)); - } - - /** {@inheritDoc} */ - @Override public Set keySet(int cnt) { - A.ensure(cnt >= 0, "cnt >= 0"); - - return cnt == 0 ? Collections.unmodifiableSet(key2Entry0.keySet()) : - F.limit(key2Entry0.keySet(), cnt); - } - - /** {@inheritDoc} */ - @Override public Collection values(int cnt) { - Collection> col = entries(cnt); - - return F.viewReadOnly(col, entryToVal); - } - - /** {@inheritDoc} */ - @Override public Collection events(int cnt) { - A.ensure(cnt >= 0, "cnt >= 0"); - - if (getPolicy() == EVENT_TRACKING_OFF) - throw new IllegalStateException("Event tracking is off: " + this); - - Collection evts = new AbstractCollection() { - @NotNull @Override public Iterator iterator() { - return new Iterator() { - private final Iterator> entryIter = key2Entry0.values().iterator(); - - private Iterator evtIter; - - private boolean moved = true; - - private boolean more; - - @Override public boolean hasNext() { - if (!moved) - return more; - - moved = false; - - if (evtIter != null && evtIter.hasNext()) - return more = true; - - while (entryIter.hasNext()) { - evtIter = eventsIterator(entryIter.next()); - - if (evtIter.hasNext()) - return more = true; - } - - return more = false; - } - - @Override public E next() { - if (hasNext()) { - moved = true; - - return evtIter.next(); - } - - throw new NoSuchElementException(); - } - - @Override public void remove() { - assert false; - } - }; - } - - @Override public int size() { - return evtsCnt; - } - - /** - * @param entry Entry. - * @return Events iterator. - */ - @SuppressWarnings("fallthrough") - Iterator eventsIterator(GridStreamerIndexEntry entry) { - switch (getPolicy()) { - case EVENT_TRACKING_ON: - case EVENT_TRACKING_ON_DEDUP: - Collection evts = entry.events(); - - assert evts != null; - - return evts.iterator(); - - default: - assert false; - - throw new IllegalStateException("Event tracking is off: " + Index.this); - } - } - }; - - - return cnt == 0 ? evts : F.limit(evts, cnt); - } - - /** {@inheritDoc} */ - @Override public Set> entrySet(V val) { - return entrySet(true, val, true, val, true); - } - - /** {@inheritDoc} */ - @Override public Set> entrySet(final boolean asc, @Nullable final V fromVal, - final boolean fromIncl, @Nullable final V toVal, final boolean toIncl) { - throw new UnsupportedOperationException("Operation is not supported on hash index."); - } - - /** {@inheritDoc} */ - @Override public Set keySet(V val) { - throw new UnsupportedOperationException("Operation is not supported on hash index."); - } - - /** {@inheritDoc} */ - @Override public Set keySet(final boolean asc, @Nullable final V fromVal, final boolean fromIncl, - @Nullable final V toVal, final boolean toIncl) { - throw new UnsupportedOperationException("Operation is not supported on hash index."); - } - - /** {@inheritDoc} */ - @Override public Collection values(boolean asc, @Nullable V fromVal, boolean fromIncl, @Nullable V toVal, - boolean toIncl) { - throw new UnsupportedOperationException("Operation is not supported on hash index."); - } - - /** {@inheritDoc} */ - @Override public Collection events(V val) { - throw new UnsupportedOperationException("Operation is not supported on hash index."); - } - - /** {@inheritDoc} */ - @Override public Collection events(final boolean asc, @Nullable final V fromVal, final boolean fromIncl, - @Nullable final V toVal, final boolean toIncl) { - throw new UnsupportedOperationException("Operation is not supported on hash index."); - } - - /** {@inheritDoc} */ - @Nullable @Override public GridStreamerIndexEntry firstEntry() { - throw new UnsupportedOperationException("Operation is not supported on hash index."); - } - - /** {@inheritDoc} */ - @Nullable @Override public GridStreamerIndexEntry lastEntry() { - throw new UnsupportedOperationException("Operation is not supported on hash index."); - } - - /** {@inheritDoc} */ - @Override public Iterator> iterator() { - return entries(0).iterator(); - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(Index.class, this, "provider", GridStreamerHashIndexProvider.this); - } - } - - /** - * - */ - private static class State { - /** */ - private Entry oldEntry; - - /** */ - private Entry newEntry; - - /** */ - private boolean finished; - - /** - * @param oldEntry Old. - * @param newEntry New. - * @param finished Finished. - */ - private State(@Nullable Entry oldEntry, @Nullable Entry newEntry, boolean finished) { - this.oldEntry = oldEntry; - this.newEntry = newEntry; - this.finished = finished; - } - - /** - * @return Old. - */ - Entry oldEntry() { - return oldEntry; - } - - /** - * @param oldEntry Old. - */ - void oldEntry(Entry oldEntry) { - this.oldEntry = oldEntry; - } - - /** - * @return New. - */ - Entry newEntry() { - return newEntry; - } - - /** - * @param newEntry New. - */ - void newEntry(Entry newEntry) { - this.newEntry = newEntry; - } - - /** - * @return Finished. - */ - boolean finished() { - return finished; - } - - /** - * @param finished Finished. - */ - void finished(boolean finished) { - this.finished = finished; - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(State.class, this); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f54e7bac/modules/core/src/main/java/org/gridgain/grid/streamer/index/hash/StreamerHashIndexProvider.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/streamer/index/hash/StreamerHashIndexProvider.java b/modules/core/src/main/java/org/gridgain/grid/streamer/index/hash/StreamerHashIndexProvider.java new file mode 100644 index 0000000..8b7abc9 --- /dev/null +++ b/modules/core/src/main/java/org/gridgain/grid/streamer/index/hash/StreamerHashIndexProvider.java @@ -0,0 +1,492 @@ +/* @java.file.header */ + +/* _________ _____ __________________ _____ + * __ ____/___________(_)______ /__ ____/______ ____(_)_______ + * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \ + * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / / + * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/ + */ + +package org.gridgain.grid.streamer.index.hash; + +import com.romix.scala.collection.concurrent.*; +import org.gridgain.grid.*; +import org.gridgain.grid.streamer.index.*; +import org.gridgain.grid.util.typedef.*; +import org.gridgain.grid.util.typedef.internal.*; +import org.jetbrains.annotations.*; + +import java.util.*; + +import static org.gridgain.grid.streamer.index.StreamerIndexPolicy.*; + +/** + * Hash index implementation of a {@link org.gridgain.grid.streamer.index.StreamerIndexProvider}. + *

+ * This implementation uses a concurrent hash map, which is extremely + * efficient for CRUD operations. It does not, however, maintain the + * ordering of entries, so, operations which imply ordering are not + * supported. + *

+ * If ordering is required, consider using {@link org.gridgain.grid.streamer.index.tree.StreamerTreeIndexProvider}. + * + * @see org.gridgain.grid.streamer.index.tree.StreamerTreeIndexProvider + * + */ +public class StreamerHashIndexProvider extends StreamerIndexProviderAdapter { + /** */ + private TrieMap> key2Entry; + + /** */ + private final ThreadLocal> state = new ThreadLocal<>(); + + /** {@inheritDoc} */ + @Override protected StreamerIndex index0() { + return new Index<>(); + } + + /** {@inheritDoc} */ + @Override public void initialize() { + key2Entry = new TrieMap<>(); + } + + /** {@inheritDoc} */ + @Override public void reset0() { + // This will recreate maps. + initialize(); + } + + /** {@inheritDoc} */ + @Override protected void add(E evt, K key, StreamerIndexUpdateSync sync) throws GridException { + State state0 = state.get(); + + if (state0 != null) + throw new IllegalStateException("Previous operation has not been finished: " + state0); + + Entry oldEntry = trieGet(key, key2Entry); + + StreamerIndexUpdater updater = getUpdater(); + + if (oldEntry == null) { + V val = updater.initialValue(evt, key); + + if (val == null) + return; // Ignore event. + + state0 = new State<>(null, null, false); + + state.set(state0); + + Entry newEntry = newEntry(key, val, null, evt); + + // Save new entry to state. + state0.newEntry(newEntry); + + // Put new entry. + Entry rmv = key2Entry.put(key, newEntry); + + assert rmv == null; + + // Update passed. + state0.finished(true); + } + else { + if (isUnique()) + throw new GridException("Index unique key violation [evt=" + evt + ", key=" + key + ']'); + + V val = updater.onAdded(oldEntry, evt); + + if (val == null) { + remove(evt, key, sync); + + return; + } + + state0 = new State<>(oldEntry, null, false); + + state.set(state0); + + Entry newEntry = addEvent(oldEntry, key, val, null, evt); + + // Save new entry to state. + state0.newEntry(newEntry); + + // Replace former entry with the new one. + Entry rmv = key2Entry.put(key, newEntry); + + assert rmv != null; + + // Update passed. + state0.finished(true); + } + } + + /** {@inheritDoc} */ + @Override protected void remove(E evt, K key, StreamerIndexUpdateSync sync) throws GridException { + State state0 = state.get(); + + if (state0 != null) + throw new IllegalStateException("Previous operation has not been finished: " + state0); + + Entry oldEntry = trieGet(key, key2Entry); + + if (oldEntry == null) + return; + + StreamerIndexUpdater updater = getUpdater(); + + V val = updater.onRemoved(oldEntry, evt); + + if (val == null) { + state0 = new State<>(oldEntry, null, false); + + state.set(state0); + + boolean b = key2Entry.remove(key, oldEntry); + + assert b; + + state0.finished(true); + } + else { + state0 = new State<>(oldEntry, null, false); + + state.set(state0); + + Entry newEntry = removeEvent(oldEntry, key, val, null, evt); + + // Save new entry to state. + state0.newEntry(newEntry); + + // Replace former entry with the new one. + Entry rmv = key2Entry.put(key, newEntry); + + assert rmv != null; + + state0.finished(true); + } + } + + /** {@inheritDoc} */ + @Override protected void endUpdate0(StreamerIndexUpdateSync sync, E evt, K key, boolean rollback) { + State state0 = state.get(); + + if (state0 == null) + return; + + state.remove(); + + if (rollback && state0.finished()) { + Entry oldEntry = state0.oldEntry(); + Entry newEntry = state0.newEntry(); + + // Rollback after index was updated. + if (oldEntry != null && newEntry != null) { + boolean b = key2Entry.replace(key, newEntry, oldEntry); + + assert b; + } + else if (newEntry == null) { + // Old was removed. Need to put it back. + Entry old = key2Entry.put(key, oldEntry); + + assert old == null; + } + else { + assert oldEntry == null; + + // New entry was added. Remove it. + boolean b = key2Entry.remove(key, newEntry); + + assert b; + } + } + } + + /** {@inheritDoc} */ + @Override public boolean sorted() { + return false; + } + + /** + * + */ + private class Index> implements StreamerIndex { + /** */ + private final TrieMap> key2Entry0 = key2Entry.readOnlySnapshot(); + + /** */ + private final int evtsCnt = eventsCount(); + + /** {@inheritDoc} */ + @Nullable @Override public String name() { + return getName(); + } + + /** {@inheritDoc} */ + @Override public boolean unique() { + return isUnique(); + } + + /** {@inheritDoc} */ + @Override public boolean sorted() { + return false; + } + + /** {@inheritDoc} */ + @Override public StreamerIndexPolicy policy() { + return getPolicy(); + } + + /** {@inheritDoc} */ + @Override public int size() { + return key2Entry0.size(); + } + + /** {@inheritDoc} */ + @Nullable @Override public StreamerIndexEntry entry(K key) { + A.notNull(key, "key"); + + return trieGet(key, key2Entry0); + } + + /** {@inheritDoc} */ + @Override public Collection> entries(int cnt) { + A.ensure(cnt >= 0, "cnt >= 0"); + + Collection vals = Collections.unmodifiableCollection(key2Entry0.values()); + + return (Collection>)(cnt == 0 ? vals : F.limit(vals, cnt)); + } + + /** {@inheritDoc} */ + @Override public Set keySet(int cnt) { + A.ensure(cnt >= 0, "cnt >= 0"); + + return cnt == 0 ? Collections.unmodifiableSet(key2Entry0.keySet()) : + F.limit(key2Entry0.keySet(), cnt); + } + + /** {@inheritDoc} */ + @Override public Collection values(int cnt) { + Collection> col = entries(cnt); + + return F.viewReadOnly(col, entryToVal); + } + + /** {@inheritDoc} */ + @Override public Collection events(int cnt) { + A.ensure(cnt >= 0, "cnt >= 0"); + + if (getPolicy() == EVENT_TRACKING_OFF) + throw new IllegalStateException("Event tracking is off: " + this); + + Collection evts = new AbstractCollection() { + @NotNull @Override public Iterator iterator() { + return new Iterator() { + private final Iterator> entryIter = key2Entry0.values().iterator(); + + private Iterator evtIter; + + private boolean moved = true; + + private boolean more; + + @Override public boolean hasNext() { + if (!moved) + return more; + + moved = false; + + if (evtIter != null && evtIter.hasNext()) + return more = true; + + while (entryIter.hasNext()) { + evtIter = eventsIterator(entryIter.next()); + + if (evtIter.hasNext()) + return more = true; + } + + return more = false; + } + + @Override public E next() { + if (hasNext()) { + moved = true; + + return evtIter.next(); + } + + throw new NoSuchElementException(); + } + + @Override public void remove() { + assert false; + } + }; + } + + @Override public int size() { + return evtsCnt; + } + + /** + * @param entry Entry. + * @return Events iterator. + */ + @SuppressWarnings("fallthrough") + Iterator eventsIterator(StreamerIndexEntry entry) { + switch (getPolicy()) { + case EVENT_TRACKING_ON: + case EVENT_TRACKING_ON_DEDUP: + Collection evts = entry.events(); + + assert evts != null; + + return evts.iterator(); + + default: + assert false; + + throw new IllegalStateException("Event tracking is off: " + Index.this); + } + } + }; + + + return cnt == 0 ? evts : F.limit(evts, cnt); + } + + /** {@inheritDoc} */ + @Override public Set> entrySet(V val) { + return entrySet(true, val, true, val, true); + } + + /** {@inheritDoc} */ + @Override public Set> entrySet(final boolean asc, @Nullable final V fromVal, + final boolean fromIncl, @Nullable final V toVal, final boolean toIncl) { + throw new UnsupportedOperationException("Operation is not supported on hash index."); + } + + /** {@inheritDoc} */ + @Override public Set keySet(V val) { + throw new UnsupportedOperationException("Operation is not supported on hash index."); + } + + /** {@inheritDoc} */ + @Override public Set keySet(final boolean asc, @Nullable final V fromVal, final boolean fromIncl, + @Nullable final V toVal, final boolean toIncl) { + throw new UnsupportedOperationException("Operation is not supported on hash index."); + } + + /** {@inheritDoc} */ + @Override public Collection values(boolean asc, @Nullable V fromVal, boolean fromIncl, @Nullable V toVal, + boolean toIncl) { + throw new UnsupportedOperationException("Operation is not supported on hash index."); + } + + /** {@inheritDoc} */ + @Override public Collection events(V val) { + throw new UnsupportedOperationException("Operation is not supported on hash index."); + } + + /** {@inheritDoc} */ + @Override public Collection events(final boolean asc, @Nullable final V fromVal, final boolean fromIncl, + @Nullable final V toVal, final boolean toIncl) { + throw new UnsupportedOperationException("Operation is not supported on hash index."); + } + + /** {@inheritDoc} */ + @Nullable @Override public StreamerIndexEntry firstEntry() { + throw new UnsupportedOperationException("Operation is not supported on hash index."); + } + + /** {@inheritDoc} */ + @Nullable @Override public StreamerIndexEntry lastEntry() { + throw new UnsupportedOperationException("Operation is not supported on hash index."); + } + + /** {@inheritDoc} */ + @Override public Iterator> iterator() { + return entries(0).iterator(); + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(Index.class, this, "provider", StreamerHashIndexProvider.this); + } + } + + /** + * + */ + private static class State { + /** */ + private Entry oldEntry; + + /** */ + private Entry newEntry; + + /** */ + private boolean finished; + + /** + * @param oldEntry Old. + * @param newEntry New. + * @param finished Finished. + */ + private State(@Nullable Entry oldEntry, @Nullable Entry newEntry, boolean finished) { + this.oldEntry = oldEntry; + this.newEntry = newEntry; + this.finished = finished; + } + + /** + * @return Old. + */ + Entry oldEntry() { + return oldEntry; + } + + /** + * @param oldEntry Old. + */ + void oldEntry(Entry oldEntry) { + this.oldEntry = oldEntry; + } + + /** + * @return New. + */ + Entry newEntry() { + return newEntry; + } + + /** + * @param newEntry New. + */ + void newEntry(Entry newEntry) { + this.newEntry = newEntry; + } + + /** + * @return Finished. + */ + boolean finished() { + return finished; + } + + /** + * @param finished Finished. + */ + void finished(boolean finished) { + this.finished = finished; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(State.class, this); + } + } +}