Repository: accumulo
Updated Branches:
refs/heads/1.6 dd9cc1cfb -> 0bcbab7d6
ACCUMULO-3745 simplify locking and added comments
changes from [~elserj] CR on issue
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/5ac1b52e
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/5ac1b52e
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/5ac1b52e
Branch: refs/heads/1.6
Commit: 5ac1b52efcd94b939773ef88f9f4f0bfa4fccaa2
Parents: 95e234c
Author: Keith Turner <keith@deenlo.com>
Authored: Thu Apr 23 12:00:36 2015 -0400
Committer: Keith Turner <kturner@apache.org>
Committed: Fri Apr 24 19:09:37 2015 -0400
----------------------------------------------------------------------
.../system/SourceSwitchingIterator.java | 58 +++++++++++---------
1 file changed, 33 insertions(+), 25 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/accumulo/blob/5ac1b52e/core/src/main/java/org/apache/accumulo/core/iterators/system/SourceSwitchingIterator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/iterators/system/SourceSwitchingIterator.java
b/core/src/main/java/org/apache/accumulo/core/iterators/system/SourceSwitchingIterator.java
index 7684352..ec73c27 100644
--- a/core/src/main/java/org/apache/accumulo/core/iterators/system/SourceSwitchingIterator.java
+++ b/core/src/main/java/org/apache/accumulo/core/iterators/system/SourceSwitchingIterator.java
@@ -19,7 +19,6 @@ package org.apache.accumulo.core.iterators.system;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
-import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -63,17 +62,28 @@ public class SourceSwitchingIterator implements SortedKeyValueIterator<Key,Value
private boolean onlySwitchAfterRow;
+ // Synchronization on copies synchronizes operations across all deep copies of this instance.
+ //
+ // This implementation assumes that there is one thread reading data (a scan) from all
deep copies
+ // and that another thread may call switch at any point. A single scan may have multiple
deep
+ // copies of this iterator if other iterators above this one duplicate their source. For
example,
+ // if an IntersectingIterator over two columns was configured, `copies` would contain two
SSIs
+ // instead of just one SSI. The two instances in `copies` would both be at the same "level"
+ // in the tree of iterators for the scan. If multiple instances of SSI are configure in
the iterator
+ // tree (e.g. priority 8 and priority 12), each instance would share their own `copies`
e.g.
+ // SSI@priority8:copies1[...], SSI@priority12:copies2[...]
+
private final List<SourceSwitchingIterator> copies;
private SourceSwitchingIterator(DataSource source, boolean onlySwitchAfterRow, List<SourceSwitchingIterator>
copies) {
this.source = source;
this.onlySwitchAfterRow = onlySwitchAfterRow;
this.copies = copies;
+ copies.add(this);
}
public SourceSwitchingIterator(DataSource source, boolean onlySwitchAfterRow) {
- this(source, onlySwitchAfterRow, Collections.synchronizedList(new ArrayList<SourceSwitchingIterator>()));
- copies.add(this);
+ this(source, onlySwitchAfterRow, new ArrayList<SourceSwitchingIterator>());
}
public SourceSwitchingIterator(DataSource source) {
@@ -83,11 +93,7 @@ public class SourceSwitchingIterator implements SortedKeyValueIterator<Key,Value
@Override
public SortedKeyValueIterator<Key,Value> deepCopy(IteratorEnvironment env) {
synchronized (copies) {
- synchronized(this){
- SourceSwitchingIterator ssi = new SourceSwitchingIterator(source.getDeepCopyDataSource(env),
onlySwitchAfterRow, copies);
- copies.add(ssi);
- return ssi;
- }
+ return new SourceSwitchingIterator(source.getDeepCopyDataSource(env), onlySwitchAfterRow,
copies);
}
}
@@ -113,10 +119,12 @@ public class SourceSwitchingIterator implements SortedKeyValueIterator<Key,Value
@Override
public void next() throws IOException {
- readNext(false);
+ synchronized (copies) {
+ readNext(false);
+ }
}
- private synchronized void readNext(boolean initialSeek) throws IOException {
+ private void readNext(boolean initialSeek) throws IOException {
// check of initialSeek second is intentional so that it does not short
// circuit the call to switchSource
@@ -162,18 +170,20 @@ public class SourceSwitchingIterator implements SortedKeyValueIterator<Key,Value
}
@Override
- public synchronized void seek(Range range, Collection<ByteSequence> columnFamilies,
boolean inclusive) throws IOException {
- this.range = range;
- this.inclusive = inclusive;
- this.columnFamilies = columnFamilies;
+ public void seek(Range range, Collection<ByteSequence> columnFamilies, boolean inclusive)
throws IOException {
+ synchronized (copies) {
+ this.range = range;
+ this.inclusive = inclusive;
+ this.columnFamilies = columnFamilies;
- if (iter == null)
- iter = source.iterator();
+ if (iter == null)
+ iter = source.iterator();
- readNext(true);
+ readNext(true);
+ }
}
- private synchronized void _switchNow() throws IOException {
+ private void _switchNow() throws IOException {
if (onlySwitchAfterRow)
throw new IllegalStateException("Can only switch on row boundries");
@@ -194,15 +204,13 @@ public class SourceSwitchingIterator implements SortedKeyValueIterator<Key,Value
@Override
public void setInterruptFlag(AtomicBoolean flag) {
synchronized (copies) {
- synchronized (this) {
- if (copies.size() != 1)
- throw new IllegalStateException("setInterruptFlag() called after deep copies made
" + copies.size());
+ if (copies.size() != 1)
+ throw new IllegalStateException("setInterruptFlag() called after deep copies made
" + copies.size());
- if (iter != null)
- ((InterruptibleIterator) iter).setInterruptFlag(flag);
+ if (iter != null)
+ ((InterruptibleIterator) iter).setInterruptFlag(flag);
- source.setInterruptFlag(flag);
- }
+ source.setInterruptFlag(flag);
}
}
}
|