accumulo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ktur...@apache.org
Subject svn commit: r1342122 [2/2] - in /accumulo/trunk: core/src/main/java/org/apache/accumulo/core/client/admin/ core/src/main/java/org/apache/accumulo/core/client/mock/ core/src/main/java/org/apache/accumulo/core/iterators/ core/src/main/java/org/apache/acc...
Date Thu, 24 May 2012 03:05:15 GMT
Modified: accumulo/trunk/core/src/test/java/org/apache/accumulo/core/client/admin/TableOperationsHelperTest.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/core/src/test/java/org/apache/accumulo/core/client/admin/TableOperationsHelperTest.java?rev=1342122&r1=1342121&r2=1342122&view=diff
==============================================================================
--- accumulo/trunk/core/src/test/java/org/apache/accumulo/core/client/admin/TableOperationsHelperTest.java (original)
+++ accumulo/trunk/core/src/test/java/org/apache/accumulo/core/client/admin/TableOperationsHelperTest.java Thu May 24 03:05:14 2012
@@ -21,6 +21,7 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.EnumSet;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
@@ -97,6 +98,10 @@ public class TableOperationsHelperTest {
         AccumuloException {}
     
     @Override
+    public void compact(String tableName, Text start, Text end, List<IteratorSetting> iterators, boolean flush, boolean wait) throws AccumuloSecurityException,
+        TableNotFoundException, AccumuloException {}
+    
+    @Override
     public void delete(String tableName) throws AccumuloException, AccumuloSecurityException, TableNotFoundException {}
     
     @Override
@@ -177,7 +182,6 @@ public class TableOperationsHelperTest {
       }
       Assert.assertEquals(expected, settings.get(tablename));
     }
-    
   }
   
   @Test

Modified: accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/Master.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/Master.java?rev=1342122&r1=1342121&r2=1342122&view=diff
==============================================================================
--- accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/Master.java (original)
+++ accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/Master.java Thu May 24 03:05:14 2012
@@ -47,6 +47,7 @@ import org.apache.accumulo.core.client.B
 import org.apache.accumulo.core.client.Connector;
 import org.apache.accumulo.core.client.Instance;
 import org.apache.accumulo.core.client.IsolatedScanner;
+import org.apache.accumulo.core.client.IteratorSetting;
 import org.apache.accumulo.core.client.MutationsRejectedException;
 import org.apache.accumulo.core.client.RowIterator;
 import org.apache.accumulo.core.client.Scanner;
@@ -66,6 +67,7 @@ import org.apache.accumulo.core.data.Ran
 import org.apache.accumulo.core.data.Value;
 import org.apache.accumulo.core.data.thrift.TKeyExtent;
 import org.apache.accumulo.core.file.FileUtil;
+import org.apache.accumulo.core.iterators.IteratorUtil;
 import org.apache.accumulo.core.master.state.tables.TableState;
 import org.apache.accumulo.core.master.thrift.LoggerStatus;
 import org.apache.accumulo.core.master.thrift.MasterClientService;
@@ -1043,11 +1045,12 @@ public class Master implements LiveTServ
           String tableId = ByteBufferUtil.toString(arguments.get(0));
           byte[] startRow = ByteBufferUtil.toBytes(arguments.get(1));
           byte[] endRow = ByteBufferUtil.toBytes(arguments.get(2));
+          List<IteratorSetting> iterators = IteratorUtil.decodeIteratorSettings(ByteBufferUtil.toBytes(arguments.get(3)));
           
           verify(c, tableId, TableOperation.COMPACT,
               check(c, tableId, TablePermission.WRITE) || check(c, tableId, TablePermission.ALTER_TABLE)
|| check(c, SystemPermission.ALTER_TABLE));
           
-          fate.seedTransaction(opid, new TraceRepo<Master>(new CompactRange(tableId, startRow, endRow)), autoCleanup);
+          fate.seedTransaction(opid, new TraceRepo<Master>(new CompactRange(tableId, startRow, endRow, iterators)), autoCleanup);
           break;
         }
         default:

Modified: accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/tableOps/CompactRange.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/tableOps/CompactRange.java?rev=1342122&r1=1342121&r2=1342122&view=diff
==============================================================================
--- accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/tableOps/CompactRange.java (original)
+++ accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/tableOps/CompactRange.java Thu May 24 03:05:14 2012
@@ -18,12 +18,14 @@ package org.apache.accumulo.server.maste
 
 import java.util.Collections;
 import java.util.Iterator;
+import java.util.List;
 import java.util.Map.Entry;
 
 import org.apache.accumulo.core.Constants;
 import org.apache.accumulo.core.client.Connector;
 import org.apache.accumulo.core.client.Instance;
 import org.apache.accumulo.core.client.IsolatedScanner;
+import org.apache.accumulo.core.client.IteratorSetting;
 import org.apache.accumulo.core.client.RowIterator;
 import org.apache.accumulo.core.client.Scanner;
 import org.apache.accumulo.core.client.impl.Tables;
@@ -34,7 +36,10 @@ import org.apache.accumulo.core.data.Key
 import org.apache.accumulo.core.data.KeyExtent;
 import org.apache.accumulo.core.data.Range;
 import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.iterators.IteratorUtil;
 import org.apache.accumulo.core.master.state.tables.TableState;
+import org.apache.accumulo.core.tabletserver.thrift.IteratorConfig;
+import org.apache.accumulo.core.tabletserver.thrift.TIteratorSetting;
 import org.apache.accumulo.core.util.ColumnFQ;
 import org.apache.accumulo.server.client.HdfsZooInstance;
 import org.apache.accumulo.server.fate.Repo;
@@ -46,6 +51,7 @@ import org.apache.accumulo.server.util.M
 import org.apache.accumulo.server.zookeeper.IZooReaderWriter;
 import org.apache.accumulo.server.zookeeper.ZooReaderWriter;
 import org.apache.accumulo.server.zookeeper.ZooReaderWriter.Mutator;
+import org.apache.commons.codec.binary.Hex;
 import org.apache.hadoop.io.Text;
 import org.apache.log4j.Logger;
 import org.apache.thrift.TException;
@@ -161,6 +167,7 @@ class CompactionDriver extends MasterRep
   
   @Override
   public Repo<Master> call(long tid, Master environment) throws Exception {
+    CompactRange.removeIterators(tid, tableId);
     Utils.getReadLock(tableId, tid).unlock();
     return null;
   }
@@ -178,11 +185,14 @@ public class CompactRange extends Master
   private String tableId;
   private byte[] startRow;
   private byte[] endRow;
+  private IteratorConfig iterators;
   
-  public CompactRange(String tableId, byte[] startRow, byte[] endRow) throws ThriftTableOperationException {
+  public CompactRange(String tableId, byte[] startRow, byte[] endRow, List<IteratorSetting> iterators) throws ThriftTableOperationException {
     this.tableId = tableId;
     this.startRow = startRow.length == 0 ? null : startRow;
     this.endRow = endRow.length == 0 ? null : endRow;
+    // store as IteratorConfig because its serializable
+    this.iterators = IteratorUtil.toIteratorConfig(iterators);
     
     if (this.startRow != null && this.endRow != null && new Text(startRow).compareTo(new
Text(endRow)) >= 0)
       throw new ThriftTableOperationException(tableId, null, TableOperation.COMPACT, TableOperationExceptionType.BAD_RANGE,
@@ -195,7 +205,7 @@ public class CompactRange extends Master
   }
   
   @Override
-  public Repo<Master> call(long tid, Master environment) throws Exception {
+  public Repo<Master> call(final long tid, Master environment) throws Exception {
     String zTablePath = Constants.ZROOT + "/" + HdfsZooInstance.getInstance().getInstanceID()
+ Constants.ZTABLES + "/" + tableId + Constants.ZTABLE_COMPACT_ID;
     
     IZooReaderWriter zoo = ZooReaderWriter.getRetryingInstance();
@@ -204,22 +214,78 @@ public class CompactRange extends Master
       cid = zoo.mutate(zTablePath, null, null, new Mutator() {
         @Override
         public byte[] mutate(byte[] currentValue) throws Exception {
-          long flushID = Long.parseLong(new String(currentValue));
+          String cvs = new String(currentValue);
+          String[] tokens = cvs.split(",");
+          long flushID = Long.parseLong(new String(tokens[0]));
           flushID++;
-          return ("" + flushID).getBytes();
+          
+          String txidString = String.format("%016x", tid);
+          StringBuilder encodedIterators = new StringBuilder();
+          for (int i = 1; i < tokens.length; i++) {
+            if (tokens[i].startsWith(txidString))
+              continue; // skip self
+            encodedIterators.append(",");
+            encodedIterators.append(tokens[i]);
+          }
+
+          if (iterators != null && iterators.getIterators().size() > 0) {
+            Hex hex = new Hex();
+            encodedIterators.append(",");
+            encodedIterators.append(txidString);
+            encodedIterators.append("=");
+            for (TIteratorSetting tis : iterators.getIterators()) {
+              if (tis.iteratorClass != null)
+                tis.name = txidString + tis.name; // give a unique name to avoid collisions with other running compactions
+            }
+            encodedIterators.append(new String(hex.encode(IteratorUtil.encodeIteratorSettings(iterators))));
+          }
+          
+          return ("" + flushID + encodedIterators).getBytes();
         }
       });
       
-      return new CompactionDriver(Long.parseLong(new String(cid)), tableId, startRow, endRow);
+      return new CompactionDriver(Long.parseLong(new String(cid).split(",")[0]), tableId, startRow, endRow);
     } catch (NoNodeException nne) {
       throw new ThriftTableOperationException(tableId, null, TableOperation.COMPACT, TableOperationExceptionType.NOTFOUND,
null);
     }
     
   }
   
+  static void removeIterators(final long txid, String tableId) throws Exception {
+    String zTablePath = Constants.ZROOT + "/" + HdfsZooInstance.getInstance().getInstanceID()
+ Constants.ZTABLES + "/" + tableId + Constants.ZTABLE_COMPACT_ID;
+    
+    IZooReaderWriter zoo = ZooReaderWriter.getRetryingInstance();
+    
+    zoo.mutate(zTablePath, null, null, new Mutator() {
+      @Override
+      public byte[] mutate(byte[] currentValue) throws Exception {
+        String cvs = new String(currentValue);
+        String[] tokens = cvs.split(",");
+        long flushID = Long.parseLong(new String(tokens[0]));
+
+        String txidString = String.format("%016x", txid);
+        
+        StringBuilder encodedIterators = new StringBuilder();
+        for (int i = 1; i < tokens.length; i++) {
+          if (tokens[i].startsWith(txidString))
+            continue;
+          encodedIterators.append(",");
+          encodedIterators.append(tokens[i]);
+        }
+        
+        return ("" + flushID + encodedIterators).getBytes();
+      }
+    });
+
+  }
+
   @Override
   public void undo(long tid, Master environment) throws Exception {
-    Utils.unreserveTable(tableId, tid, false);
+    try {
+      removeIterators(tid, tableId);
+    } finally {
+      Utils.unreserveTable(tableId, tid, false);
+    }
   }
   
 }

Modified: accumulo/trunk/server/src/main/java/org/apache/accumulo/server/tabletserver/Compactor.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/server/src/main/java/org/apache/accumulo/server/tabletserver/Compactor.java?rev=1342122&r1=1342121&r2=1342122&view=diff
==============================================================================
--- accumulo/trunk/server/src/main/java/org/apache/accumulo/server/tabletserver/Compactor.java (original)
+++ accumulo/trunk/server/src/main/java/org/apache/accumulo/server/tabletserver/Compactor.java Thu May 24 03:05:14 2012
@@ -27,6 +27,7 @@ import java.util.concurrent.Callable;
 
 import org.apache.accumulo.cloudtrace.instrument.Span;
 import org.apache.accumulo.cloudtrace.instrument.Trace;
+import org.apache.accumulo.core.client.IteratorSetting;
 import org.apache.accumulo.core.data.ByteSequence;
 import org.apache.accumulo.core.data.Key;
 import org.apache.accumulo.core.data.KeyExtent;
@@ -38,7 +39,6 @@ import org.apache.accumulo.core.iterator
 import org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope;
 import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
 import org.apache.accumulo.core.iterators.system.ColumnFamilySkippingIterator;
-import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
 import org.apache.accumulo.core.iterators.system.CountingIterator;
 import org.apache.accumulo.core.iterators.system.DeletingIterator;
 import org.apache.accumulo.core.iterators.system.MultiIterator;
@@ -80,9 +80,10 @@ public class Compactor implements Callab
   private Configuration conf;
   private FileSystem fs;
   private KeyExtent extent;
+  private List<IteratorSetting> iterators;
   
   Compactor(Configuration conf, FileSystem fs, Map<String,DataFileValue> files, InMemoryMap
imm, String outputFile, boolean propogateDeletes,
-      TableConfiguration acuTableConf, KeyExtent extent, CompactionEnv env) {
+      TableConfiguration acuTableConf, KeyExtent extent, CompactionEnv env, List<IteratorSetting>
iterators) {
     this.extent = extent;
     this.conf = conf;
     this.fs = fs;
@@ -92,6 +93,12 @@ public class Compactor implements Callab
     this.propogateDeletes = propogateDeletes;
     this.acuTableConf = acuTableConf;
     this.env = env;
+    this.iterators = iterators;
+  }
+  
+  Compactor(Configuration conf, FileSystem fs, Map<String,DataFileValue> files, InMemoryMap
imm, String outputFile, boolean propogateDeletes,
+      TableConfiguration acuTableConf, KeyExtent extent, CompactionEnv env) {
+    this(conf, fs, files, imm, outputFile, propogateDeletes, acuTableConf, extent, env, new
ArrayList<IteratorSetting>());
   }
   
   public FileSystem getFileSystem() {
@@ -258,7 +265,7 @@ public class Compactor implements Callab
         throw new IllegalArgumentException();
       
       SortedKeyValueIterator<Key,Value> itr = iterEnv.getTopLevelIterator(IteratorUtil.loadIterators(env.getIteratorScope(),
cfsi, extent, acuTableConf,
-          iterEnv));
+          iterators, iterEnv));
       
       itr.seek(extent.toDataRange(), columnFamilies, inclusive);
       

Modified: accumulo/trunk/server/src/main/java/org/apache/accumulo/server/tabletserver/Tablet.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/server/src/main/java/org/apache/accumulo/server/tabletserver/Tablet.java?rev=1342122&r1=1342121&r2=1342122&view=diff
==============================================================================
--- accumulo/trunk/server/src/main/java/org/apache/accumulo/server/tabletserver/Tablet.java (original)
+++ accumulo/trunk/server/src/main/java/org/apache/accumulo/server/tabletserver/Tablet.java Thu May 24 03:05:14 2012
@@ -49,6 +49,7 @@ import org.apache.accumulo.cloudtrace.in
 import org.apache.accumulo.cloudtrace.instrument.Trace;
 import org.apache.accumulo.core.Constants;
 import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.IteratorSetting;
 import org.apache.accumulo.core.client.impl.ScannerImpl;
 import org.apache.accumulo.core.conf.AccumuloConfiguration;
 import org.apache.accumulo.core.conf.ConfigurationObserver;
@@ -121,6 +122,8 @@ import org.apache.accumulo.server.util.M
 import org.apache.accumulo.server.util.TabletOperations;
 import org.apache.accumulo.server.zookeeper.IZooReaderWriter;
 import org.apache.accumulo.server.zookeeper.ZooReaderWriter;
+import org.apache.commons.codec.DecoderException;
+import org.apache.commons.codec.binary.Hex;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
@@ -2391,11 +2394,22 @@ public class Tablet {
     }
   }
   
-  long getCompactionID() throws NoNodeException {
+  Pair<Long,List<IteratorSetting>> getCompactionID() throws NoNodeException {
     try {
       String zTablePath = Constants.ZROOT + "/" + HdfsZooInstance.getInstance().getInstanceID()
+ Constants.ZTABLES + "/" + extent.getTableId()
           + Constants.ZTABLE_COMPACT_ID;
-      return Long.parseLong(new String(ZooReaderWriter.getRetryingInstance().getData(zTablePath, null)));
+      
+      String[] tokens = new String(ZooReaderWriter.getRetryingInstance().getData(zTablePath, null)).split(",");
+      long compactID = Long.parseLong(tokens[0]);
+      
+      List<IteratorSetting> allIters = new ArrayList<IteratorSetting>();
+      for (int i = 1; i < tokens.length; i++) {
+        Hex hex = new Hex();
+        List<IteratorSetting> iters = IteratorUtil.decodeIteratorSettings(hex.decode(tokens[i].split("=")[1].getBytes()));
+        allIters.addAll(iters);
+      }
+      
+      return new Pair<Long,List<IteratorSetting>>(compactID, allIters);
     } catch (InterruptedException e) {
       throw new RuntimeException(e);
     } catch (NumberFormatException nfe) {
@@ -2406,6 +2420,8 @@ public class Tablet {
       } else {
         throw new RuntimeException(ke);
       }
+    } catch (DecoderException e) {
+      throw new RuntimeException(e);
     }
   }
   
@@ -3167,7 +3183,7 @@ public class Tablet {
       
       log.debug(String.format("MajC initiate lock %.2f secs, wait %.2f secs", (t3 - t2) /
1000.0, (t2 - t1) / 1000.0));
       
-      Long compactionId = null;
+      Pair<Long,List<IteratorSetting>> compactionId = null;
       if (!propogateDeletes) {
         // compacting everything, so update the compaction id in !METADATA
         try {
@@ -3177,6 +3193,11 @@ public class Tablet {
         }
       }
       
+      List<IteratorSetting> compactionIterators = new ArrayList<IteratorSetting>();
+      if (compactionId != null) {
+        compactionIterators = compactionId.getSecond();
+      }
+
       // need to handle case where only one file is being major compacted
       while (filesToCompact.size() > 0) {
         
@@ -3222,7 +3243,7 @@ public class Tablet {
                                                                                         
                                                 // unless
                                                                                         
                                                 // last
                                                                                         
                                                 // batch
-              acuTableConf, extent, cenv);
+              acuTableConf, extent, cenv, compactionIterators);
           
           CompactionStats mcs = compactor.call();
           
@@ -3231,7 +3252,7 @@ public class Tablet {
           span.data("written", "" + mcs.getEntriesWritten());
           majCStats.add(mcs);
           
-          datafileManager.bringMajorCompactionOnline(smallestFiles, compactTmpName, fileName, filesToCompact.size() == 0 ? compactionId : null,
+          datafileManager.bringMajorCompactionOnline(smallestFiles, compactTmpName, fileName, filesToCompact.size() == 0 ? compactionId.getFirst() : null,
               new DataFileValue(mcs.getFileSize(), mcs.getEntriesWritten()));
           
           // when major compaction produces a file w/ zero entries, it will be deleted... do not want

Modified: accumulo/trunk/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java?rev=1342122&r1=1342121&r2=1342122&view=diff
==============================================================================
--- accumulo/trunk/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java (original)
+++ accumulo/trunk/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java Thu May 24 03:05:14 2012
@@ -2031,7 +2031,7 @@ public class TabletServer extends Abstra
         // compaction id once
         if (compactionId == null)
           try {
-            compactionId = tablet.getCompactionID();
+            compactionId = tablet.getCompactionID().getFirst();
           } catch (NoNodeException e) {
             log.info("Asked to compact table with no compaction id " + ke + " " + e.getMessage());
             return;



Mime
View raw message