accumulo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ktur...@apache.org
Subject svn commit: r1438837 - in /accumulo/trunk/server/src/main/java/org/apache/accumulo/server: master/tableOps/CompactRange.java tabletserver/Tablet.java
Date Sat, 26 Jan 2013 03:54:44 GMT
Author: kturner
Date: Sat Jan 26 03:54:43 2013
New Revision: 1438837

URL: http://svn.apache.org/viewvc?rev=1438837&view=rev
Log:
ACCUMULO-989 ensured that per compaction iterators are not used for tablets outside of compaction range

Modified:
    accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/tableOps/CompactRange.java
    accumulo/trunk/server/src/main/java/org/apache/accumulo/server/tabletserver/Tablet.java

Modified: accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/tableOps/CompactRange.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/tableOps/CompactRange.java?rev=1438837&r1=1438836&r2=1438837&view=diff
==============================================================================
--- accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/tableOps/CompactRange.java	(original)
+++ accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/tableOps/CompactRange.java	Sat Jan 26 03:54:43 2013
@@ -16,6 +16,10 @@
  */
 package org.apache.accumulo.server.master.tableOps;
 
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
@@ -36,10 +40,7 @@ import org.apache.accumulo.core.data.Key
 import org.apache.accumulo.core.data.KeyExtent;
 import org.apache.accumulo.core.data.Range;
 import org.apache.accumulo.core.data.Value;
-import org.apache.accumulo.core.iterators.IteratorUtil;
 import org.apache.accumulo.core.master.state.tables.TableState;
-import org.apache.accumulo.core.tabletserver.thrift.IteratorConfig;
-import org.apache.accumulo.core.tabletserver.thrift.TIteratorSetting;
 import org.apache.accumulo.fate.Repo;
 import org.apache.accumulo.fate.zookeeper.IZooReaderWriter;
 import org.apache.accumulo.fate.zookeeper.ZooReaderWriter.Mutator;
@@ -51,6 +52,8 @@ import org.apache.accumulo.server.util.M
 import org.apache.accumulo.server.zookeeper.ZooReaderWriter;
 import org.apache.commons.codec.binary.Hex;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableUtils;
 import org.apache.log4j.Logger;
 import org.apache.thrift.TException;
 import org.apache.zookeeper.KeeperException.NoNodeException;
@@ -187,21 +190,104 @@ class CompactionDriver extends MasterRep
   
 }
 
+
 public class CompactRange extends MasterRepo {
   
   private static final long serialVersionUID = 1L;
   private String tableId;
   private byte[] startRow;
   private byte[] endRow;
-  private IteratorConfig iterators;
+  private byte[] iterators;
   
+  public static class CompactionIterators implements Writable {
+    byte[] startRow;
+    byte[] endRow;
+    List<IteratorSetting> iterators;
+    
+    public CompactionIterators(byte[] startRow, byte[] endRow, List<IteratorSetting> iterators) {
+      this.startRow = startRow;
+      this.endRow = endRow;
+      this.iterators = iterators;
+    }
+    
+    public CompactionIterators() {
+      startRow = null;
+      endRow = null;
+      iterators = Collections.emptyList();
+    }
+
+    @Override
+    public void write(DataOutput out) throws IOException {
+      out.writeBoolean(startRow != null);
+      if (startRow != null) {
+        out.writeInt(startRow.length);
+        out.write(startRow);
+      }
+      
+      out.writeBoolean(endRow != null);
+      if (endRow != null) {
+        out.writeInt(endRow.length);
+        out.write(endRow);
+      }
+      
+      out.writeInt(iterators.size());
+      for (IteratorSetting is : iterators) {
+        is.write(out);
+      }
+    }
+    
+    @Override
+    public void readFields(DataInput in) throws IOException {
+      if (in.readBoolean()) {
+        startRow = new byte[in.readInt()];
+        in.readFully(startRow);
+      } else {
+        startRow = null;
+      }
+      
+      if (in.readBoolean()) {
+        endRow = new byte[in.readInt()];
+        in.readFully(endRow);
+      } else {
+        endRow = null;
+      }
+      
+      int num = in.readInt();
+      iterators = new ArrayList<IteratorSetting>(num);
+      
+      for (int i = 0; i < num; i++) {
+        iterators.add(new IteratorSetting(in));
+      }
+    }
+    
+    public Text getEndRow() {
+      if (endRow == null)
+        return null;
+      return new Text(endRow);
+    }
+    
+    public Text getStartRow() {
+      if (startRow == null)
+        return null;
+      return new Text(startRow);
+    }
+    
+    public List<IteratorSetting> getIterators() {
+      return iterators;
+    }
+  }
+
   public CompactRange(String tableId, byte[] startRow, byte[] endRow, List<IteratorSetting> iterators) throws ThriftTableOperationException {
     this.tableId = tableId;
     this.startRow = startRow.length == 0 ? null : startRow;
     this.endRow = endRow.length == 0 ? null : endRow;
-    // store as IteratorConfig because its serializable
-    this.iterators = IteratorUtil.toIteratorConfig(iterators);
     
+    if (iterators.size() > 0) {
+      this.iterators = WritableUtils.toByteArray(new CompactionIterators(this.startRow, this.endRow, iterators));
+    } else {
+      iterators = null;
+    }
+
     if (this.startRow != null && this.endRow != null && new Text(startRow).compareTo(new Text(endRow)) >= 0)
       throw new ThriftTableOperationException(tableId, null, TableOperation.COMPACT, TableOperationExceptionType.BAD_RANGE,
           "start row must be less than end row");
@@ -239,16 +325,12 @@ public class CompactRange extends Master
 
           StringBuilder encodedIterators = new StringBuilder();
 
-          if (iterators != null && iterators.getIterators().size() > 0) {
+          if (iterators != null) {
             Hex hex = new Hex();
             encodedIterators.append(",");
             encodedIterators.append(txidString);
             encodedIterators.append("=");
-            for (TIteratorSetting tis : iterators.getIterators()) {
-              if (tis.iteratorClass != null)
-                tis.name = txidString + tis.name; // give a unique name to avoid collisions with other running compactions
-            }
-            encodedIterators.append(new String(hex.encode(IteratorUtil.encodeIteratorSettings(iterators))));
+            encodedIterators.append(new String(hex.encode(iterators)));
           }
           
           return ("" + flushID + encodedIterators).getBytes();

Modified: accumulo/trunk/server/src/main/java/org/apache/accumulo/server/tabletserver/Tablet.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/server/src/main/java/org/apache/accumulo/server/tabletserver/Tablet.java?rev=1438837&r1=1438836&r2=1438837&view=diff
==============================================================================
--- accumulo/trunk/server/src/main/java/org/apache/accumulo/server/tabletserver/Tablet.java	(original)
+++ accumulo/trunk/server/src/main/java/org/apache/accumulo/server/tabletserver/Tablet.java	Sat Jan 26 03:54:43 2013
@@ -23,6 +23,8 @@ package org.apache.accumulo.server.table
  * 
  */
 
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -99,6 +101,7 @@ import org.apache.accumulo.server.conf.S
 import org.apache.accumulo.server.conf.TableConfiguration;
 import org.apache.accumulo.server.constraints.ConstraintChecker;
 import org.apache.accumulo.server.master.state.TServerInstance;
+import org.apache.accumulo.server.master.tableOps.CompactRange.CompactionIterators;
 import org.apache.accumulo.server.problems.ProblemReport;
 import org.apache.accumulo.server.problems.ProblemReports;
 import org.apache.accumulo.server.problems.ProblemType;
@@ -2415,14 +2418,29 @@ public class Tablet {
       String[] tokens = new String(ZooReaderWriter.getRetryingInstance().getData(zTablePath, null)).split(",");
       long compactID = Long.parseLong(tokens[0]);
       
-      List<IteratorSetting> allIters = new ArrayList<IteratorSetting>();
-      for (int i = 1; i < tokens.length; i++) {
+      CompactionIterators iters = new CompactionIterators();
+
+      if (tokens.length > 1) {
         Hex hex = new Hex();
-        List<IteratorSetting> iters = IteratorUtil.decodeIteratorSettings(hex.decode(tokens[i].split("=")[1].getBytes()));
-        allIters.addAll(iters);
+        ByteArrayInputStream bais = new ByteArrayInputStream(hex.decode(tokens[1].split("=")[1].getBytes()));
+        DataInputStream dis = new DataInputStream(bais);
+        
+        try {
+          iters.readFields(dis);
+        } catch (IOException e) {
+          throw new RuntimeException(e);
+        }
+        
+        KeyExtent ke = new KeyExtent(extent.getTableId(), iters.getEndRow(), iters.getStartRow());
+        
+        if (!ke.overlaps(extent)) {
+          // only use iterators if compaction range overlaps
+          iters = new CompactionIterators();
+        }
       }
+
       
-      return new Pair<Long,List<IteratorSetting>>(compactID, allIters);
+      return new Pair<Long,List<IteratorSetting>>(compactID, iters.getIterators());
     } catch (InterruptedException e) {
       throw new RuntimeException(e);
     } catch (NumberFormatException nfe) {
@@ -3264,7 +3282,7 @@ public class Tablet {
           copy.keySet().retainAll(smallestFiles);
           
           log.debug("Starting MajC " + extent + " (" + reason + ") " + datafileManager.abs2rel(datafileManager.string2path(copy.keySet())) + " --> "
-              + datafileManager.abs2rel(new Path(compactTmpName)));
+              + datafileManager.abs2rel(new Path(compactTmpName)) + "  " + compactionIterators);
 
           // always propagate deletes, unless last batch
           Compactor compactor = new Compactor(conf, fs, copy, null, compactTmpName, filesToCompact.size() == 0 ? propogateDeletes : true, acuTableConf, extent,



Mime
View raw message