accumulo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From els...@apache.org
Subject [03/10] git commit: ACCUMULO-1628 Fixes issue after previous changes which interrupted a deep-copy
Date Fri, 12 Sep 2014 07:46:32 GMT
ACCUMULO-1628 Fixes issue after previous changes which interrupted a deep-copy

Pushes the interrupt flag from the SourceSwitchingIterator down to the
FileManager and InMemoryMap. This should avoid passing the interrupt
into a deep copy which isn't supported. Adds some more tests which
previously caused the edge case which is now fixed.

Signed-off-by: Josh Elser <elserj@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/7699e1f4
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/7699e1f4
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/7699e1f4

Branch: refs/heads/master
Commit: 7699e1f43c4ee51bfa4be1e9e73ea722f934a3d6
Parents: c335fca
Author: Keith Turner <kturner@apache.org>
Authored: Thu Sep 11 16:31:08 2014 -0700
Committer: Josh Elser <elserj@apache.org>
Committed: Thu Sep 11 16:31:08 2014 -0700

----------------------------------------------------------------------
 .../system/SourceSwitchingIterator.java         | 20 ++++-------
 .../system/SourceSwitchingIteratorTest.java     | 38 +++++++++++++++++++-
 .../server/tabletserver/FileManager.java        | 13 +++++++
 .../server/tabletserver/InMemoryMap.java        | 21 ++++++++---
 .../accumulo/server/tabletserver/Tablet.java    |  5 +++
 .../server/tabletserver/InMemoryMapTest.java    | 38 ++++++++++++++++----
 6 files changed, 110 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/7699e1f4/core/src/main/java/org/apache/accumulo/core/iterators/system/SourceSwitchingIterator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/iterators/system/SourceSwitchingIterator.java
b/core/src/main/java/org/apache/accumulo/core/iterators/system/SourceSwitchingIterator.java
index 33d0ebf..6c40176 100644
--- a/core/src/main/java/org/apache/accumulo/core/iterators/system/SourceSwitchingIterator.java
+++ b/core/src/main/java/org/apache/accumulo/core/iterators/system/SourceSwitchingIterator.java
@@ -47,6 +47,8 @@ public class SourceSwitchingIterator implements SortedKeyValueIterator<Key,Value
     DataSource getDeepCopyDataSource(IteratorEnvironment env);
 
     SortedKeyValueIterator<Key,Value> iterator() throws IOException;
+
+    void setInterruptFlag(AtomicBoolean flag);
   }
 
   private DataSource source;
@@ -60,20 +62,18 @@ public class SourceSwitchingIterator implements SortedKeyValueIterator<Key,Value
   private Collection<ByteSequence> columnFamilies;
 
   private boolean onlySwitchAfterRow;
-  private AtomicBoolean iflag;
 
   private final List<SourceSwitchingIterator> copies;
 
-  private SourceSwitchingIterator(DataSource source, boolean onlySwitchAfterRow, List<SourceSwitchingIterator>
copies, AtomicBoolean iflag) {
+  private SourceSwitchingIterator(DataSource source, boolean onlySwitchAfterRow, List<SourceSwitchingIterator>
copies) {
     this.source = source;
     this.onlySwitchAfterRow = onlySwitchAfterRow;
     this.copies = copies;
-    this.iflag = iflag;
     copies.add(this);
   }
 
   public SourceSwitchingIterator(DataSource source, boolean onlySwitchAfterRow) {
-    this(source, onlySwitchAfterRow, Collections.synchronizedList(new ArrayList<SourceSwitchingIterator>()),
null);
+    this(source, onlySwitchAfterRow, Collections.synchronizedList(new ArrayList<SourceSwitchingIterator>()));
   }
 
   public SourceSwitchingIterator(DataSource source) {
@@ -82,7 +82,7 @@ public class SourceSwitchingIterator implements SortedKeyValueIterator<Key,Value
 
   @Override
   public synchronized SortedKeyValueIterator<Key,Value> deepCopy(IteratorEnvironment
env) {
-    return new SourceSwitchingIterator(source.getDeepCopyDataSource(env), onlySwitchAfterRow,
copies, iflag);
+    return new SourceSwitchingIterator(source.getDeepCopyDataSource(env), onlySwitchAfterRow,
copies);
   }
 
   @Override
@@ -149,9 +149,6 @@ public class SourceSwitchingIterator implements SortedKeyValueIterator<Key,Value
     while (!source.isCurrent()) {
       source = source.getNewDataSource();
       iter = source.iterator();
-      if (iflag != null)
-        ((InterruptibleIterator) iter).setInterruptFlag(iflag);
-
       return true;
     }
 
@@ -164,11 +161,8 @@ public class SourceSwitchingIterator implements SortedKeyValueIterator<Key,Value
     this.inclusive = inclusive;
     this.columnFamilies = columnFamilies;
 
-    if (iter == null) {
+    if (iter == null)
       iter = source.iterator();
-      if (iflag != null)
-        ((InterruptibleIterator) iter).setInterruptFlag(iflag);
-    }
 
     readNext(true);
   }
@@ -196,10 +190,10 @@ public class SourceSwitchingIterator implements SortedKeyValueIterator<Key,Value
     if (copies.size() != 1)
       throw new IllegalStateException("setInterruptFlag() called after deep copies made "
+ copies.size());
 
-    this.iflag = flag;
     if (iter != null)
       ((InterruptibleIterator) iter).setInterruptFlag(flag);
 
+    source.setInterruptFlag(flag);
   }
 
 }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/7699e1f4/core/src/test/java/org/apache/accumulo/core/iterators/system/SourceSwitchingIteratorTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/accumulo/core/iterators/system/SourceSwitchingIteratorTest.java
b/core/src/test/java/org/apache/accumulo/core/iterators/system/SourceSwitchingIteratorTest.java
index a52b141..23f08a8 100644
--- a/core/src/test/java/org/apache/accumulo/core/iterators/system/SourceSwitchingIteratorTest.java
+++ b/core/src/test/java/org/apache/accumulo/core/iterators/system/SourceSwitchingIteratorTest.java
@@ -19,6 +19,7 @@ package org.apache.accumulo.core.iterators.system;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.TreeMap;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import junit.framework.TestCase;
 
@@ -26,6 +27,7 @@ import org.apache.accumulo.core.data.ByteSequence;
 import org.apache.accumulo.core.data.Key;
 import org.apache.accumulo.core.data.Range;
 import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.iterators.IterationInterruptedException;
 import org.apache.accumulo.core.iterators.IteratorEnvironment;
 import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
 import org.apache.accumulo.core.iterators.SortedMapIterator;
@@ -59,6 +61,7 @@ public class SourceSwitchingIteratorTest extends TestCase {
     DataSource next;
     SortedKeyValueIterator<Key,Value> iter;
     List<TestDataSource> copies = new ArrayList<TestDataSource>();
+    AtomicBoolean iflag;
     
     TestDataSource(SortedKeyValueIterator<Key,Value> iter) {
       this(iter, new ArrayList<TestDataSource>());
@@ -82,6 +85,8 @@ public class SourceSwitchingIteratorTest extends TestCase {
     
     @Override
     public SortedKeyValueIterator<Key,Value> iterator() {
+      if (iflag != null)
+        ((InterruptibleIterator) iter).setInterruptFlag(iflag);
       return iter;
     }
     
@@ -98,7 +103,11 @@ public class SourceSwitchingIteratorTest extends TestCase {
           tds.next = new TestDataSource(next.iter.deepCopy(null), next.copies);
       }
     }
-    
+
+    @Override
+    public void setInterruptFlag(AtomicBoolean flag) {
+      this.iflag = flag;
+    }
   }
   
   public void test1() throws Exception {
@@ -236,4 +245,31 @@ public class SourceSwitchingIteratorTest extends TestCase {
     ane(dc1, "r2", "cf1", "cq2", 6, "v4", true);
     assertFalse(dc1.hasTop());
   }
+
+  public void testSetInterrupt() throws Exception {
+
+    TreeMap<Key,Value> tm1 = new TreeMap<Key,Value>();
+    put(tm1, "r1", "cf1", "cq1", 5, "v1");
+
+    SortedMapIterator smi = new SortedMapIterator(tm1);
+    TestDataSource tds = new TestDataSource(smi);
+    SourceSwitchingIterator ssi = new SourceSwitchingIterator(tds, false);
+
+    AtomicBoolean flag = new AtomicBoolean();
+    ssi.setInterruptFlag(flag);
+
+    assertSame(flag, tds.iflag);
+
+    ssi.seek(new Range("r1"), new ArrayList<ByteSequence>(), false);
+    ane(ssi, "r1", "cf1", "cq1", 5, "v1", true);
+    assertFalse(ssi.hasTop());
+
+    flag.set(true);
+
+    try {
+      ssi.seek(new Range("r1"), new ArrayList<ByteSequence>(), false);
+      fail("expected to see IterationInterruptedException");
+    } catch (IterationInterruptedException iie) {}
+
+  }
 }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/7699e1f4/server/src/main/java/org/apache/accumulo/server/tabletserver/FileManager.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/accumulo/server/tabletserver/FileManager.java
b/server/src/main/java/org/apache/accumulo/server/tabletserver/FileManager.java
index 9613cca..cd5ca9c 100644
--- a/server/src/main/java/org/apache/accumulo/server/tabletserver/FileManager.java
+++ b/server/src/main/java/org/apache/accumulo/server/tabletserver/FileManager.java
@@ -27,6 +27,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.concurrent.Semaphore;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.accumulo.core.conf.Property;
 import org.apache.accumulo.core.data.Key;
@@ -379,6 +380,7 @@ public class FileManager {
     private boolean current = true;
     private IteratorEnvironment env;
     private String file;
+    private AtomicBoolean iflag;
     
     FileDataSource(String file, SortedKeyValueIterator<Key,Value> iter) {
       this.file = file;
@@ -411,6 +413,8 @@ public class FileManager {
     
     @Override
     public SortedKeyValueIterator<Key,Value> iterator() throws IOException {
+      if (iflag != null)
+        ((InterruptibleIterator) this.iter).setInterruptFlag(iflag);
       return iter;
     }
     
@@ -426,11 +430,20 @@ public class FileManager {
     void setIterator(SortedKeyValueIterator<Key,Value> iter) {
       current = false;
       this.iter = iter;
+
+      if (iflag != null)
+        ((InterruptibleIterator) this.iter).setInterruptFlag(iflag);
+
       for (FileDataSource fds : deepCopies) {
         fds.current = false;
         fds.iter = iter.deepCopy(fds.env);
       }
     }
+
+    @Override
+    public void setInterruptFlag(AtomicBoolean flag) {
+      this.iflag = flag;
+    }
     
   }
   

http://git-wip-us.apache.org/repos/asf/accumulo/blob/7699e1f4/server/src/main/java/org/apache/accumulo/server/tabletserver/InMemoryMap.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/accumulo/server/tabletserver/InMemoryMap.java
b/server/src/main/java/org/apache/accumulo/server/tabletserver/InMemoryMap.java
index 43cf3c1..b696ff4 100644
--- a/server/src/main/java/org/apache/accumulo/server/tabletserver/InMemoryMap.java
+++ b/server/src/main/java/org/apache/accumulo/server/tabletserver/InMemoryMap.java
@@ -384,15 +384,17 @@ public class InMemoryMap {
     private FileSKVIterator reader;
     private MemoryDataSource parent;
     private IteratorEnvironment env;
+    private AtomicBoolean iflag;
     
     MemoryDataSource() {
-      this(null, false, null);
+      this(null, false, null, null);
     }
     
-    public MemoryDataSource(MemoryDataSource parent, boolean switched, IteratorEnvironment
env) {
+    public MemoryDataSource(MemoryDataSource parent, boolean switched, IteratorEnvironment
env, AtomicBoolean iflag) {
       this.parent = parent;
       this.switched = switched;
       this.env = env;
+      this.iflag = iflag;
     }
     
     @Override
@@ -428,6 +430,8 @@ public class InMemoryMap {
         FileSystem fs = TraceFileSystem.wrap(FileSystem.getLocal(conf));
         
         reader = new RFileOperations().openReader(memDumpFile, true, fs, conf, ServerConfiguration.getSiteConfiguration());
+        if (iflag != null)
+          reader.setInterruptFlag(iflag);
       }
 
       return reader;
@@ -436,9 +440,11 @@ public class InMemoryMap {
     @Override
     public SortedKeyValueIterator<Key,Value> iterator() throws IOException {
       if (iter == null)
-        if (!switched)
+        if (!switched) {
           iter = map.skvIterator();
-        else {
+          if (iflag != null)
+            iter.setInterruptFlag(iflag);
+        } else {
           if (parent == null)
             iter = new MemKeyConversionIterator(getReader());
           else
@@ -454,7 +460,12 @@ public class InMemoryMap {
     
     @Override
     public DataSource getDeepCopyDataSource(IteratorEnvironment env) {
-      return new MemoryDataSource(parent == null ? this : parent, switched, env);
+      return new MemoryDataSource(parent == null ? this : parent, switched, env, iflag);
+    }
+
+    @Override
+    public void setInterruptFlag(AtomicBoolean flag) {
+      this.iflag = flag;
     }
     
   }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/7699e1f4/server/src/main/java/org/apache/accumulo/server/tabletserver/Tablet.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/accumulo/server/tabletserver/Tablet.java b/server/src/main/java/org/apache/accumulo/server/tabletserver/Tablet.java
index a1fc707..bb13ff8 100644
--- a/server/src/main/java/org/apache/accumulo/server/tabletserver/Tablet.java
+++ b/server/src/main/java/org/apache/accumulo/server/tabletserver/Tablet.java
@@ -2139,6 +2139,11 @@ public class Tablet {
     public DataSource getDeepCopyDataSource(IteratorEnvironment env) {
       throw new UnsupportedOperationException();
     }
+
+    @Override
+    public void setInterruptFlag(AtomicBoolean flag) {
+      throw new UnsupportedOperationException();
+    }
     
   }
   

http://git-wip-us.apache.org/repos/asf/accumulo/blob/7699e1f4/server/src/test/java/org/apache/accumulo/server/tabletserver/InMemoryMapTest.java
----------------------------------------------------------------------
diff --git a/server/src/test/java/org/apache/accumulo/server/tabletserver/InMemoryMapTest.java
b/server/src/test/java/org/apache/accumulo/server/tabletserver/InMemoryMapTest.java
index c905bb8..683adf4 100644
--- a/server/src/test/java/org/apache/accumulo/server/tabletserver/InMemoryMapTest.java
+++ b/server/src/test/java/org/apache/accumulo/server/tabletserver/InMemoryMapTest.java
@@ -23,6 +23,7 @@ import java.util.List;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import junit.framework.TestCase;
 
@@ -32,6 +33,7 @@ import org.apache.accumulo.core.data.Key;
 import org.apache.accumulo.core.data.Mutation;
 import org.apache.accumulo.core.data.Range;
 import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.iterators.IterationInterruptedException;
 import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
 import org.apache.accumulo.core.iterators.system.ColumnFamilySkippingIterator;
 import org.apache.accumulo.core.util.LocalityGroupUtil;
@@ -237,7 +239,7 @@ public class InMemoryMapTest extends TestCase {
     ski1.close();
   }
   
-  private void deepCopyAndDelete(int interleaving) throws Exception {
+  private void deepCopyAndDelete(int interleaving, boolean interrupt) throws Exception {
     // interleaving == 0 intentionally omitted, this runs the test w/o deleting in mem map
 
     InMemoryMap imm = new InMemoryMap(false, System.getProperty("user.dir") + "/target");
@@ -247,37 +249,61 @@ public class InMemoryMapTest extends TestCase {
     
     MemoryIterator ski1 = imm.skvIterator();
     
-    if (interleaving == 1)
+    AtomicBoolean iflag = new AtomicBoolean(false);
+    ski1.setInterruptFlag(iflag);
+
+    if (interleaving == 1) {
       imm.delete(0);
+      if (interrupt)
+        iflag.set(true);
+    }
     
     SortedKeyValueIterator<Key,Value> dc = ski1.deepCopy(null);
 
-    if (interleaving == 2)
+    if (interleaving == 2) {
       imm.delete(0);
+      if (interrupt)
+        iflag.set(true);
+    }
 
     dc.seek(new Range(), LocalityGroupUtil.EMPTY_CF_SET, false);
     ski1.seek(new Range(), LocalityGroupUtil.EMPTY_CF_SET, false);
 
-    if (interleaving == 3)
+    if (interleaving == 3) {
       imm.delete(0);
+      if (interrupt)
+        iflag.set(true);
+    }
 
     ae(dc, "r1", "foo:cq1", 3, "bar1");
     ae(ski1, "r1", "foo:cq1", 3, "bar1");
     dc.seek(new Range(), LocalityGroupUtil.EMPTY_CF_SET, false);
 
-    if (interleaving == 4)
+    if (interleaving == 4) {
       imm.delete(0);
+      if (interrupt)
+        iflag.set(true);
+    }
 
     ae(ski1, "r1", "foo:cq2", 3, "bar2");
     ae(dc, "r1", "foo:cq1", 3, "bar1");
     ae(dc, "r1", "foo:cq2", 3, "bar2");
     assertFalse(dc.hasTop());
     assertFalse(ski1.hasTop());
+
+    if (interrupt)
+      dc.seek(new Range(), LocalityGroupUtil.EMPTY_CF_SET, false);
   }
 
   public void testDeepCopyAndDelete() throws Exception {
     for (int i = 0; i <= 4; i++)
-      deepCopyAndDelete(i);
+      deepCopyAndDelete(i, false);
+
+    for (int i = 1; i <= 4; i++)
+      try {
+        deepCopyAndDelete(i, true);
+        fail("i = " + i);
+      } catch (IterationInterruptedException iie) {}
   }
 
   public void testBug1() throws Exception {


Mime
View raw message