accumulo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ktur...@apache.org
Subject [1/2] accumulo git commit: ACCUMULO-3745 simplify locking and added comments
Date Fri, 24 Apr 2015 23:49:59 GMT
Repository: accumulo
Updated Branches:
  refs/heads/1.6 dd9cc1cfb -> 0bcbab7d6


ACCUMULO-3745 simplify locking and added comments

changes from [~elserj] CR on issue


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/5ac1b52e
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/5ac1b52e
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/5ac1b52e

Branch: refs/heads/1.6
Commit: 5ac1b52efcd94b939773ef88f9f4f0bfa4fccaa2
Parents: 95e234c
Author: Keith Turner <keith@deenlo.com>
Authored: Thu Apr 23 12:00:36 2015 -0400
Committer: Keith Turner <kturner@apache.org>
Committed: Fri Apr 24 19:09:37 2015 -0400

----------------------------------------------------------------------
 .../system/SourceSwitchingIterator.java         | 58 +++++++++++---------
 1 file changed, 33 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/5ac1b52e/core/src/main/java/org/apache/accumulo/core/iterators/system/SourceSwitchingIterator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/iterators/system/SourceSwitchingIterator.java
b/core/src/main/java/org/apache/accumulo/core/iterators/system/SourceSwitchingIterator.java
index 7684352..ec73c27 100644
--- a/core/src/main/java/org/apache/accumulo/core/iterators/system/SourceSwitchingIterator.java
+++ b/core/src/main/java/org/apache/accumulo/core/iterators/system/SourceSwitchingIterator.java
@@ -19,7 +19,6 @@ package org.apache.accumulo.core.iterators.system;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -63,17 +62,28 @@ public class SourceSwitchingIterator implements SortedKeyValueIterator<Key,Value
 
   private boolean onlySwitchAfterRow;
 
+  // Synchronization on copies synchronizes operations across all deep copies of this instance.
+  //
+  // This implementation assumes that there is one thread reading data (a scan) from all
deep copies
+  // and that another thread may call switch at any point. A single scan may have multiple
deep
+  // copies of this iterator if other iterators above this one duplicate their source. For
example,
+  // if an IntersectingIterator over two columns was configured, `copies` would contain two
SSIs
+  // instead of just one SSI. The two instances in `copies` would both be at the same "level"
+  // in the tree of iterators for the scan. If multiple instances of SSI are configure in
the iterator
+  // tree (e.g. priority 8 and priority 12), each instance would share their own `copies`
e.g.
+  // SSI@priority8:copies1[...], SSI@priority12:copies2[...]
+
   private final List<SourceSwitchingIterator> copies;
 
   private SourceSwitchingIterator(DataSource source, boolean onlySwitchAfterRow, List<SourceSwitchingIterator>
copies) {
     this.source = source;
     this.onlySwitchAfterRow = onlySwitchAfterRow;
     this.copies = copies;
+    copies.add(this);
   }
 
   public SourceSwitchingIterator(DataSource source, boolean onlySwitchAfterRow) {
-    this(source, onlySwitchAfterRow, Collections.synchronizedList(new ArrayList<SourceSwitchingIterator>()));
-    copies.add(this);
+    this(source, onlySwitchAfterRow, new ArrayList<SourceSwitchingIterator>());
   }
 
   public SourceSwitchingIterator(DataSource source) {
@@ -83,11 +93,7 @@ public class SourceSwitchingIterator implements SortedKeyValueIterator<Key,Value
   @Override
   public SortedKeyValueIterator<Key,Value> deepCopy(IteratorEnvironment env) {
     synchronized (copies) {
-      synchronized(this){
-        SourceSwitchingIterator ssi = new SourceSwitchingIterator(source.getDeepCopyDataSource(env),
onlySwitchAfterRow, copies);
-        copies.add(ssi);
-        return ssi;
-      }
+      return new SourceSwitchingIterator(source.getDeepCopyDataSource(env), onlySwitchAfterRow,
copies);
     }
   }
 
@@ -113,10 +119,12 @@ public class SourceSwitchingIterator implements SortedKeyValueIterator<Key,Value
 
   @Override
   public void next() throws IOException {
-    readNext(false);
+    synchronized (copies) {
+      readNext(false);
+    }
   }
 
-  private synchronized void readNext(boolean initialSeek) throws IOException {
+  private void readNext(boolean initialSeek) throws IOException {
 
     // check of initialSeek second is intentional so that it does not short
     // circuit the call to switchSource
@@ -162,18 +170,20 @@ public class SourceSwitchingIterator implements SortedKeyValueIterator<Key,Value
   }
 
   @Override
-  public synchronized void seek(Range range, Collection<ByteSequence> columnFamilies,
boolean inclusive) throws IOException {
-    this.range = range;
-    this.inclusive = inclusive;
-    this.columnFamilies = columnFamilies;
+  public void seek(Range range, Collection<ByteSequence> columnFamilies, boolean inclusive)
throws IOException {
+    synchronized (copies) {
+      this.range = range;
+      this.inclusive = inclusive;
+      this.columnFamilies = columnFamilies;
 
-    if (iter == null)
-      iter = source.iterator();
+      if (iter == null)
+        iter = source.iterator();
 
-    readNext(true);
+      readNext(true);
+    }
   }
 
-  private synchronized void _switchNow() throws IOException {
+  private void _switchNow() throws IOException {
     if (onlySwitchAfterRow)
       throw new IllegalStateException("Can only switch on row boundries");
 
@@ -194,15 +204,13 @@ public class SourceSwitchingIterator implements SortedKeyValueIterator<Key,Value
   @Override
   public void setInterruptFlag(AtomicBoolean flag) {
     synchronized (copies) {
-      synchronized (this) {
-        if (copies.size() != 1)
-          throw new IllegalStateException("setInterruptFlag() called after deep copies made
" + copies.size());
+      if (copies.size() != 1)
+        throw new IllegalStateException("setInterruptFlag() called after deep copies made
" + copies.size());
 
-        if (iter != null)
-          ((InterruptibleIterator) iter).setInterruptFlag(flag);
+      if (iter != null)
+        ((InterruptibleIterator) iter).setInterruptFlag(flag);
 
-        source.setInterruptFlag(flag);
-      }
+      source.setInterruptFlag(flag);
     }
   }
 }


Mime
View raw message