incubator-accumulo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From vi...@apache.org
Subject svn commit: r1222766 [3/3] - in /incubator/accumulo/trunk: src/core/src/main/java/org/apache/accumulo/core/ src/core/src/main/java/org/apache/accumulo/core/client/impl/thrift/ src/core/src/main/java/org/apache/accumulo/core/file/ src/core/src/main/java...
Date Fri, 23 Dec 2011 17:53:13 GMT
Modified: incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/tabletserver/InMemoryMap.java
URL: http://svn.apache.org/viewvc/incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/tabletserver/InMemoryMap.java?rev=1222766&r1=1222765&r2=1222766&view=diff
==============================================================================
--- incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/tabletserver/InMemoryMap.java
(original)
+++ incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/tabletserver/InMemoryMap.java
Fri Dec 23 17:53:12 2011
@@ -40,9 +40,9 @@ import org.apache.accumulo.core.data.Mut
 import org.apache.accumulo.core.data.Range;
 import org.apache.accumulo.core.data.Value;
 import org.apache.accumulo.core.file.FileSKVIterator;
-import org.apache.accumulo.core.file.map.MapFileOperations;
-import org.apache.accumulo.core.file.map.MyMapFile;
-import org.apache.accumulo.core.file.map.MySequenceFile;
+import org.apache.accumulo.core.file.FileSKVWriter;
+import org.apache.accumulo.core.file.rfile.RFile;
+import org.apache.accumulo.core.file.rfile.RFileOperations;
 import org.apache.accumulo.core.iterators.IteratorEnvironment;
 import org.apache.accumulo.core.iterators.SkippingIterator;
 import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
@@ -71,7 +71,7 @@ class MemKeyComparator implements Compar
     if (cmp == 0) {
       if (k1 instanceof MemKey)
         if (k2 instanceof MemKey)
-          cmp = ((MemKey) k2).mutationCount - ((MemKey) k1).mutationCount;
+          cmp = ((MemKey) k2).kvCount - ((MemKey) k1).kvCount;
         else
           cmp = 1;
       else if (k2 instanceof MemKey)
@@ -84,22 +84,22 @@ class MemKeyComparator implements Compar
 
 class PartialMutationSkippingIterator extends SkippingIterator implements InterruptibleIterator
{
   
-  int maxMutationCount;
+  int kvCount;
   
-  public PartialMutationSkippingIterator(SortedKeyValueIterator<Key,Value> source,
int maxMutationCount) {
+  public PartialMutationSkippingIterator(SortedKeyValueIterator<Key,Value> source,
int maxKVCount) {
     setSource(source);
-    this.maxMutationCount = maxMutationCount;
+    this.kvCount = maxKVCount;
   }
   
   @Override
   protected void consume() throws IOException {
-    while (getSource().hasTop() && ((MemKey) getSource().getTopKey()).mutationCount
> maxMutationCount)
+    while (getSource().hasTop() && ((MemKey) getSource().getTopKey()).kvCount >
kvCount)
       getSource().next();
   }
   
   @Override
   public SortedKeyValueIterator<Key,Value> deepCopy(IteratorEnvironment env) {
-    return new PartialMutationSkippingIterator(getSource().deepCopy(env), maxMutationCount);
+    return new PartialMutationSkippingIterator(getSource().deepCopy(env), kvCount);
   }
   
   @Override
@@ -109,6 +109,77 @@ class PartialMutationSkippingIterator ex
   
 }
 
+class MemKeyConversionIterator extends SkippingIterator implements InterruptibleIterator
{
+  MemKey currKey = null;
+  Value currVal = null;
+
+  public MemKeyConversionIterator(SortedKeyValueIterator<Key,Value> source) {
+    super();
+    setSource(source);
+  }
+
+  public MemKeyConversionIterator(SortedKeyValueIterator<Key,Value> source, MemKey
startKey) {
+    this(source);
+    try {
+      if (currKey != null)
+        currKey = (MemKey) startKey.clone();
+    } catch (CloneNotSupportedException e) {
+      // MemKey is supported
+    }
+  }
+
+  @Override
+  public SortedKeyValueIterator<Key,Value> deepCopy(IteratorEnvironment env) {
+    return new MemKeyConversionIterator(getSource().deepCopy(env), currKey);
+  }
+  
+  @Override
+  public Key getTopKey() {
+    return currKey;
+  }
+  
+  @Override
+  public Value getTopValue() {
+    return currVal;
+  }
+  
+  private void getTopKeyVal() {
+    Key k = super.getTopKey();
+    Value v = super.getTopValue();
+    if (k instanceof MemKey || k == null) {
+      currKey = (MemKey) k;
+      currVal = v;
+      return;
+    }
+    currVal = new Value(v);
+    int mc = MemValue.splitKVCount(currVal);
+    currKey = new MemKey(k, mc);
+
+  }
+  
+  public void next() throws IOException {
+    super.next();
+    getTopKeyVal();
+  }
+
+  @Override
+  protected void consume() throws IOException {
+    MemKey stopPoint = currKey;
+    if (hasTop())
+      getTopKeyVal();
+    if (stopPoint == null)
+      return;
+    while (getSource().hasTop() && currKey.compareTo(stopPoint) <= 0)
+      next();
+  }
+
+  @Override
+  public void setInterruptFlag(AtomicBoolean flag) {
+    ((InterruptibleIterator) getSource()).setInterruptFlag(flag);
+  }
+
+}
+
 public class InMemoryMap {
   MutationLog mutationLog;
   
@@ -152,7 +223,7 @@ public class InMemoryMap {
     
     public long getMemoryUsed();
     
-    public void mutate(List<Mutation> mutations, int mutationCount);
+    public void mutate(List<Mutation> mutations, int kvCount);
   }
   
   private static class DefaultMap implements SimpleMap {
@@ -203,15 +274,14 @@ public class InMemoryMap {
     }
     
     @Override
-    public void mutate(List<Mutation> mutations, int mutationCount) {
+    public void mutate(List<Mutation> mutations, int kvCount) {
       for (Mutation m : mutations) {
         for (ColumnUpdate cvp : m.getUpdates()) {
           Key newKey = new MemKey(m.getRow(), cvp.getColumnFamily(), cvp.getColumnQualifier(),
cvp.getColumnVisibility(), cvp.getTimestamp(), cvp.isDeleted(),
-              false, mutationCount);
+              false, kvCount++);
           Value value = new Value(cvp.getValue());
           put(newKey, value);
         }
-        mutationCount++;
       }
     }
     
@@ -253,22 +323,25 @@ public class InMemoryMap {
     }
     
     @Override
-    public void mutate(List<Mutation> mutations, int mutationCount) {
-      nativeMap.mutate(mutations, mutationCount);
+    public void mutate(List<Mutation> mutations, int kvCount) {
+      nativeMap.mutate(mutations, kvCount);
     }
   }
   
-  private AtomicInteger nextMutationCount = new AtomicInteger(1);
-  private AtomicInteger mutationCount = new AtomicInteger(0);
+  private AtomicInteger nextKVCount = new AtomicInteger(1);
+  private AtomicInteger kvCount = new AtomicInteger(0);
   
   /**
    * Applies changes to a row in the InMemoryMap
    * 
    */
   public void mutate(List<Mutation> mutations) {
-    int mc = nextMutationCount.getAndAdd(mutations.size());
+    int numKVs = 0;
+    for (int i = 0; i < mutations.size(); i++)
+      numKVs += mutations.get(i).size();
+    int kv = nextKVCount.getAndAdd(numKVs);
     try {
-      map.mutate(mutations, mc);
+      map.mutate(mutations, kv);
     } finally {
       synchronized (this) {
         // Can not update mutationCount while writes that started before
@@ -277,14 +350,14 @@ public class InMemoryMap {
         // a read may not see a successful write. Therefore writes must
         // wait for writes that started before to finish.
         
-        while (mutationCount.get() != mc - 1) {
+        while (kvCount.get() != kv - 1) {
           try {
             wait();
           } catch (InterruptedException ex) {
             // ignored
           }
         }
-        mutationCount.set(mc + mutations.size() - 1);
+        kvCount.set(kv + numKVs - 1);
         notifyAll();
       }
     }
@@ -357,8 +430,8 @@ public class InMemoryMap {
           Configuration conf = CachedConfiguration.getInstance();
           FileSystem fs = TraceFileSystem.wrap(FileSystem.getLocal(conf));
           
-          FileSKVIterator reader = new MapFileOperations.RangeIterator(new MyMapFile.Reader(fs,
memDumpFile, conf));
-          
+          FileSKVIterator reader = new RFileOperations().openReader(memDumpFile, true, fs,
conf, ServerConfiguration.getSiteConfiguration());
+
           readers.add(reader);
           
           iter = reader;
@@ -447,10 +520,10 @@ public class InMemoryMap {
     if (deleted)
       throw new IllegalStateException("Can not obtain iterator after map deleted");
     
-    int mc = mutationCount.get();
+    int mc = kvCount.get();
     MemoryDataSource mds = new MemoryDataSource();
     SourceSwitchingIterator ssi = new SourceSwitchingIterator(new MemoryDataSource());
-    MemoryIterator mi = new MemoryIterator(new ColumnFamilySkippingIterator(new PartialMutationSkippingIterator(ssi,
mc)));
+    MemoryIterator mi = new MemoryIterator(new ColumnFamilySkippingIterator(new PartialMutationSkippingIterator(new
MemKeyConversionIterator(ssi), mc)));
     mi.setSSI(ssi);
     mi.setMDS(mds);
     activeIters.add(mi);
@@ -459,9 +532,9 @@ public class InMemoryMap {
   
   public SortedKeyValueIterator<Key,Value> compactionIterator() {
     
-    if (nextMutationCount.get() - 1 != mutationCount.get())
-      throw new IllegalStateException("Memory map in unexpected state : nextMutationCount
= " + nextMutationCount.get() + " mutationCount = "
-          + mutationCount.get());
+    if (nextKVCount.get() - 1 != kvCount.get())
+      throw new IllegalStateException("Memory map in unexpected state : nextKVCount = " +
nextKVCount.get() + " kvCount = "
+          + kvCount.get());
     
     return new ColumnFamilySkippingIterator(map.skvIterator());
   }
@@ -489,17 +562,21 @@ public class InMemoryMap {
         Configuration conf = CachedConfiguration.getInstance();
         FileSystem fs = TraceFileSystem.wrap(FileSystem.getLocal(conf));
         
-        String tmpFile = memDumpDir + "/memDump" + UUID.randomUUID() + ".map";
+        String tmpFile = memDumpDir + "/memDump" + UUID.randomUUID() + "." + RFile.EXTENSION;
         
         Configuration newConf = new Configuration(conf);
         newConf.setInt("io.seqfile.compress.blocksize", 100000);
         
-        MyMapFile.Writer out = new MyMapFile.Writer(newConf, fs, tmpFile, MemKey.class, Value.class,
MySequenceFile.CompressionType.BLOCK);
+        FileSKVWriter out = new RFileOperations().openWriter(tmpFile, fs, newConf, ServerConfiguration.getSiteConfiguration());
+        out.startDefaultLocalityGroup();
         InterruptibleIterator iter = map.skvIterator();
         iter.seek(new Range(), LocalityGroupUtil.EMPTY_CF_SET, false);
         
         while (iter.hasTop() && activeIters.size() > 0) {
-          out.append(iter.getTopKey(), iter.getTopValue());
+          // RFile does not support MemKey, so we move the kv count into the value only for
the RFile.
+          // There is no need to change the MemKey to a normal key because the kvCount info
gets lost when it is written
+          Value newValue = new MemValue(iter.getTopValue(), ((MemKey) iter.getTopKey()).kvCount);
+          out.append(iter.getTopKey(), newValue);
           iter.next();
         }
         

Modified: incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/tabletserver/MemKey.java
URL: http://svn.apache.org/viewvc/incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/tabletserver/MemKey.java?rev=1222766&r1=1222765&r2=1222766&view=diff
==============================================================================
--- incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/tabletserver/MemKey.java
(original)
+++ incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/tabletserver/MemKey.java
Fri Dec 23 17:53:12 2011
@@ -24,25 +24,25 @@ import org.apache.accumulo.core.data.Key
 
 class MemKey extends Key {
   
-  int mutationCount;
+  int kvCount;
   
   public MemKey(byte[] row, byte[] cf, byte[] cq, byte[] cv, long ts, boolean del, boolean
copy, int mc) {
     super(row, cf, cq, cv, ts, del, copy);
-    this.mutationCount = mc;
+    this.kvCount = mc;
   }
   
   public MemKey() {
     super();
-    this.mutationCount = Integer.MAX_VALUE;
+    this.kvCount = Integer.MAX_VALUE;
   }
   
   public MemKey(Key key, int mc) {
     super(key);
-    this.mutationCount = mc;
+    this.kvCount = mc;
   }
   
   public String toString() {
-    return super.toString() + " mc=" + mutationCount;
+    return super.toString() + " mc=" + kvCount;
   }
   
   @Override
@@ -53,13 +53,13 @@ class MemKey extends Key {
   @Override
   public void write(DataOutput out) throws IOException {
     super.write(out);
-    out.writeInt(mutationCount);
+    out.writeInt(kvCount);
   }
   
   @Override
   public void readFields(DataInput in) throws IOException {
     super.readFields(in);
-    mutationCount = in.readInt();
+    kvCount = in.readInt();
   }
   
   @Override
@@ -68,7 +68,7 @@ class MemKey extends Key {
     int cmp = super.compareTo(k);
     
     if (cmp == 0 && k instanceof MemKey) {
-      cmp = ((MemKey) k).mutationCount - mutationCount;
+      cmp = ((MemKey) k).kvCount - kvCount;
     }
     
     return cmp;

Added: incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/tabletserver/MemValue.java
URL: http://svn.apache.org/viewvc/incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/tabletserver/MemValue.java?rev=1222766&view=auto
==============================================================================
--- incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/tabletserver/MemValue.java
(added)
+++ incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/tabletserver/MemValue.java
Fri Dec 23 17:53:12 2011
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.tabletserver;
+
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.accumulo.core.data.Value;
+
+/**
+ * 
+ */
+public class MemValue extends Value {
+  int kvCount;
+  boolean merged = false;
+  
+  /**
+   * @param value
+   *          Value
+   * @param kv
+   *          kv count
+   */
+  public MemValue(byte[] value, int kv) {
+    super(value);
+    this.kvCount = kv;
+  }
+  
+  public MemValue() {
+    super();
+    this.kvCount = Integer.MAX_VALUE;
+  }
+  
+  public MemValue(Value value, int kv) {
+    super(value);
+    this.kvCount = kv;
+  }
+  
+  // Override
+  public void write(final DataOutput out) throws IOException {
+    if (!merged) {
+      byte[] combinedBytes = new byte[getSize() + 4];
+      System.arraycopy(value, 0, combinedBytes, 4, getSize());
+      combinedBytes[0] = (byte) (kvCount >>> 24);
+      combinedBytes[1] = (byte) (kvCount >>> 16);
+      combinedBytes[2] = (byte) (kvCount >>> 8);
+      combinedBytes[3] = (byte) (kvCount);
+      value = combinedBytes;
+      merged = true;
+    }
+    super.write(out);
+  }
+  
+  public void set(final byte[] b) {
+    super.set(b);
+    merged = false;
+  }
+
+  public void copy(byte[] b) {
+    super.copy(b);
+    merged = false;
+  }
+  
+  /**
+   * Takes a Value and will take out the embedded kvCount, and then return that value while
replacing the Value with the original unembedded version
+   * 
+   * @param v
+   * @return
+   */
+  public static int splitKVCount(Value v) {
+    if (v instanceof MemValue)
+      return ((MemValue) v).kvCount;
+    
+    byte[] originalBytes = new byte[v.getSize() - 4];
+    byte[] combined = v.get();
+    System.arraycopy(combined, 4, originalBytes, 0, originalBytes.length);
+    v.set(originalBytes);
+    return (combined[0] << 24) + ((combined[1] & 0xFF) << 16) + ((combined[2]
& 0xFF) << 8) + (combined[3] & 0xFF);
+  }
+}

Propchange: incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/tabletserver/MemValue.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Modified: incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/tabletserver/Tablet.java
URL: http://svn.apache.org/viewvc/incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/tabletserver/Tablet.java?rev=1222766&r1=1222765&r2=1222766&view=diff
==============================================================================
--- incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/tabletserver/Tablet.java
(original)
+++ incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/tabletserver/Tablet.java
Fri Dec 23 17:53:12 2011
@@ -66,7 +66,6 @@ import org.apache.accumulo.core.data.thr
 import org.apache.accumulo.core.file.FileOperations;
 import org.apache.accumulo.core.file.FileSKVIterator;
 import org.apache.accumulo.core.file.FileUtil;
-import org.apache.accumulo.core.file.map.MyMapFile;
 import org.apache.accumulo.core.iterators.IterationInterruptedException;
 import org.apache.accumulo.core.iterators.IteratorEnvironment;
 import org.apache.accumulo.core.iterators.IteratorUtil;
@@ -1548,7 +1547,7 @@ public class Tablet {
         continue;
       }
       
-      if (!filename.startsWith(MyMapFile.EXTENSION + "_") && !FileOperations.getValidExtensions().contains(filename.split("\\.")[1]))
{
+      if (!filename.startsWith(Constants.MAPFILE_EXTENSION + "_") && !FileOperations.getValidExtensions().contains(filename.split("\\.")[1]))
{
         log.error("unknown file in tablet" + path);
         continue;
       }

Copied: incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/test/CreateRandomRFile.java
(from r1215244, incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/test/CreateRandomMapFile.java)
URL: http://svn.apache.org/viewvc/incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/test/CreateRandomRFile.java?p2=incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/test/CreateRandomRFile.java&p1=incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/test/CreateRandomMapFile.java&r1=1215244&r2=1222766&rev=1222766&view=diff
==============================================================================
--- incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/test/CreateRandomMapFile.java
(original)
+++ incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/test/CreateRandomRFile.java
Fri Dec 23 17:53:12 2011
@@ -20,17 +20,17 @@ import java.io.IOException;
 import java.util.Arrays;
 import java.util.Random;
 
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
 import org.apache.accumulo.core.data.Key;
 import org.apache.accumulo.core.data.Value;
-import org.apache.accumulo.core.file.map.MyMapFile;
-import org.apache.accumulo.core.file.map.MyMapFile.Writer;
-import org.apache.accumulo.core.file.map.MySequenceFile.CompressionType;
+import org.apache.accumulo.core.file.FileSKVWriter;
+import org.apache.accumulo.core.file.rfile.RFileOperations;
 import org.apache.accumulo.core.util.CachedConfiguration;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.io.Text;
 
-public class CreateRandomMapFile {
+public class CreateRandomRFile {
   private static int num;
   private static String file;
   
@@ -62,10 +62,10 @@ public class CreateRandomMapFile {
     Arrays.sort(rands);
     
     Configuration conf = CachedConfiguration.getInstance();
-    Writer mfw;
+    FileSKVWriter mfw;
     try {
       FileSystem fs = FileSystem.get(conf);
-      mfw = new MyMapFile.Writer(conf, fs, file, Key.class, Value.class, CompressionType.BLOCK);
+      mfw = new RFileOperations().openWriter(file, fs, conf, AccumuloConfiguration.getDefaultConfiguration());
     } catch (IOException e) {
       throw new RuntimeException(e);
     }

Propchange: incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/test/CreateRandomRFile.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/test/MidPointPerfTest2.java
URL: http://svn.apache.org/viewvc/incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/test/MidPointPerfTest2.java?rev=1222766&r1=1222765&r2=1222766&view=diff
==============================================================================
--- incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/test/MidPointPerfTest2.java
(original)
+++ incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/test/MidPointPerfTest2.java
Fri Dec 23 17:53:12 2011
@@ -23,18 +23,20 @@ import java.util.Comparator;
 import java.util.List;
 import java.util.Random;
 
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
 import org.apache.accumulo.core.data.Key;
-import org.apache.accumulo.core.file.map.MyMapFile;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.file.FileSKVWriter;
 import org.apache.accumulo.core.file.map.MySequenceFile;
 import org.apache.accumulo.core.file.map.MySequenceFile.Reader;
+import org.apache.accumulo.core.file.rfile.RFile;
+import org.apache.accumulo.core.file.rfile.RFileOperations;
 import org.apache.accumulo.core.util.CachedConfiguration;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.WritableComparable;
 
 class MultipleIndexIterator2 {
   
@@ -83,12 +85,12 @@ class MultipleIndexIterator2 {
     return currentMin >= 0;
   }
   
-  WritableComparable<?> next() {
+  Key next() {
     if (currentMin < 0) {
       throw new RuntimeException("There is no next");
     }
     
-    WritableComparable<?> ret = nextKey[currentMin];
+    Key ret = nextKey[currentMin];
     
     try {
       nextKey[currentMin] = (Key) readers[currentMin].getKeyClass().newInstance();
@@ -214,16 +216,16 @@ public class MidPointPerfTest2 {
       
       start = end;
       
-      Path outFile = new Path(String.format("%s/index_%04d", newDir, count++));
-      outFiles.add(outFile);
+      String outFile = String.format("%s/index_%04d", newDir, count++);
+      outFiles.add(new Path(outFile));
       
       long t1 = System.currentTimeMillis();
       
-      MySequenceFile.Writer writer = MySequenceFile.createWriter(fs, conf, outFile, Key.class,
LongWritable.class, MySequenceFile.CompressionType.BLOCK);
+      FileSKVWriter writer = new RFileOperations().openWriter(outFile, fs, conf, AccumuloConfiguration.getDefaultConfiguration());
       MultipleIndexIterator2 mii = new MultipleIndexIterator2(conf, fs, inFiles);
       
       while (mii.hasNext()) {
-        writer.append(mii.next(), new LongWritable(0));
+        writer.append(mii.next(), new Value(new byte[0]));
       }
       
       mii.close();
@@ -254,7 +256,7 @@ public class MidPointPerfTest2 {
     FileSystem fs = FileSystem.get(conf);
     
     for (int i = 0; i < numFiles; i++) {
-      String newDir = String.format("%s/" + MyMapFile.EXTENSION + "_%06d", dir, i);
+      String newDir = String.format("%s/" + RFile.EXTENSION + "_%06d", dir, i);
       fs.mkdirs(new Path(newDir));
       
       List<Key> keys = new ArrayList<Key>();
@@ -267,13 +269,12 @@ public class MidPointPerfTest2 {
       
       Collections.sort(keys, new CompareKeys());
       
-      MySequenceFile.Writer writer = MySequenceFile.createWriter(fs, conf, new Path(newDir
+ "/index"), Key.class, LongWritable.class,
-          MySequenceFile.CompressionType.BLOCK);
+      FileSKVWriter writer = new RFileOperations().openWriter(newDir, fs, conf, AccumuloConfiguration.getDefaultConfiguration());
       
       System.out.println(new Path(newDir + "/index"));
       
       for (Key key : keys) {
-        writer.append(key, new LongWritable(0));
+        writer.append(key, new Value(new byte[0]));
       }
       
       writer.close();

Modified: incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/test/TestIngest.java
URL: http://svn.apache.org/viewvc/incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/test/TestIngest.java?rev=1222766&r1=1222765&r2=1222766&view=diff
==============================================================================
--- incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/test/TestIngest.java
(original)
+++ incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/test/TestIngest.java
Fri Dec 23 17:53:12 2011
@@ -38,7 +38,6 @@ import org.apache.accumulo.core.data.Mut
 import org.apache.accumulo.core.data.Value;
 import org.apache.accumulo.core.file.FileOperations;
 import org.apache.accumulo.core.file.FileSKVWriter;
-import org.apache.accumulo.core.file.map.MyMapFile;
 import org.apache.accumulo.core.file.rfile.RFile;
 import org.apache.accumulo.core.security.Authorizations;
 import org.apache.accumulo.core.security.ColumnVisibility;
@@ -295,7 +294,7 @@ public class TestIngest {
       if (ingestArgs.outputToMapFile) {
         Configuration conf = CachedConfiguration.getInstance();
         FileSystem fs = FileSystem.get(conf);
-        writer = FileOperations.getInstance().openWriter(ingestArgs.outputFile + "." + MyMapFile.EXTENSION,
fs, conf,
+        writer = FileOperations.getInstance().openWriter(ingestArgs.outputFile + "." + RFile.EXTENSION,
fs, conf,
             AccumuloConfiguration.getDefaultConfiguration());
         writer.startDefaultLocalityGroup();
       } else if (ingestArgs.outputToRFile) {

Modified: incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/test/functional/BadIteratorMincTest.java
URL: http://svn.apache.org/viewvc/incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/test/functional/BadIteratorMincTest.java?rev=1222766&r1=1222765&r2=1222766&view=diff
==============================================================================
--- incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/test/functional/BadIteratorMincTest.java
(original)
+++ incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/test/functional/BadIteratorMincTest.java
Fri Dec 23 17:53:12 2011
@@ -65,7 +65,7 @@ public class BadIteratorMincTest extends
     UtilWaitThread.sleep(1000);
     
     // minc should fail, so there should be no files
-    checkMapFiles("foo", 1, 1, 0, 0);
+    checkRFiles("foo", 1, 1, 0, 0);
     
     // try to scan table
     Scanner scanner = getConnector().createScanner("foo", Constants.NO_AUTHS);
@@ -85,7 +85,7 @@ public class BadIteratorMincTest extends
     UtilWaitThread.sleep(5000);
     
     // minc should complete
-    checkMapFiles("foo", 1, 1, 1, 1);
+    checkRFiles("foo", 1, 1, 1, 1);
     
     count = 0;
     for (@SuppressWarnings("unused")

Modified: incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/test/functional/BloomFilterTest.java
URL: http://svn.apache.org/viewvc/incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/test/functional/BloomFilterTest.java?rev=1222766&r1=1222765&r2=1222766&view=diff
==============================================================================
--- incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/test/functional/BloomFilterTest.java
(original)
+++ incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/test/functional/BloomFilterTest.java
Fri Dec 23 17:53:12 2011
@@ -83,10 +83,10 @@ public class BloomFilterTest extends Fun
     getConnector().tableOperations().flush("bt4", null, null, true);
     
     // ensure minor compactions are finished
-    super.checkMapFiles("bt1", 1, 1, 1, 1);
-    super.checkMapFiles("bt2", 1, 1, 1, 1);
-    super.checkMapFiles("bt3", 1, 1, 1, 1);
-    super.checkMapFiles("bt4", 1, 1, 1, 1);
+    super.checkRFiles("bt1", 1, 1, 1, 1);
+    super.checkRFiles("bt2", 1, 1, 1, 1);
+    super.checkRFiles("bt3", 1, 1, 1, 1);
+    super.checkRFiles("bt4", 1, 1, 1, 1);
     
     // these queries should only run quickly if bloom
     // filters are working

Modified: incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/test/functional/BulkFileTest.java
URL: http://svn.apache.org/viewvc/incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/test/functional/BulkFileTest.java?rev=1222766&r1=1222765&r2=1222766&view=diff
==============================================================================
--- incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/test/functional/BulkFileTest.java
(original)
+++ incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/test/functional/BulkFileTest.java
Fri Dec 23 17:53:12 2011
@@ -29,7 +29,6 @@ import org.apache.accumulo.core.data.Val
 import org.apache.accumulo.core.file.FileOperations;
 import org.apache.accumulo.core.file.FileSKVWriter;
 import org.apache.accumulo.core.file.FileUtil;
-import org.apache.accumulo.core.file.map.MyMapFile;
 import org.apache.accumulo.core.file.rfile.RFile;
 import org.apache.accumulo.server.conf.ServerConfiguration;
 import org.apache.accumulo.server.trace.TraceFileSystem;
@@ -62,14 +61,14 @@ public class BulkFileTest extends Functi
     
     fs.delete(new Path(dir), true);
     
-    FileSKVWriter writer1 = FileOperations.getInstance().openWriter(dir + "/f1." + MyMapFile.EXTENSION,
fs, conf, ServerConfiguration.getSystemConfiguration());
+    FileSKVWriter writer1 = FileOperations.getInstance().openWriter(dir + "/f1." + RFile.EXTENSION,
fs, conf,
+        ServerConfiguration.getSystemConfiguration());
     writer1.startDefaultLocalityGroup();
     writeData(writer1, 0, 333);
     writer1.close();
     
-    fs.rename(new Path(dir + "/f1." + MyMapFile.EXTENSION), new Path(dir + "/f1"));
-    
-    FileSKVWriter writer2 = FileOperations.getInstance().openWriter(dir + "/f2." + MyMapFile.EXTENSION,
fs, conf, ServerConfiguration.getSystemConfiguration());
+    FileSKVWriter writer2 = FileOperations.getInstance().openWriter(dir + "/f2." + RFile.EXTENSION,
fs, conf,
+        ServerConfiguration.getSystemConfiguration());
     writer2.startDefaultLocalityGroup();
     writeData(writer2, 334, 999);
     writer2.close();
@@ -81,7 +80,7 @@ public class BulkFileTest extends Functi
     
     bulkImport(fs, "bulkFile", dir);
     
-    checkMapFiles("bulkFile", 6, 6, 1, 1);
+    checkRFiles("bulkFile", 6, 6, 1, 1);
     
     verifyData("bulkFile", 0, 1999);
     

Modified: incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/test/functional/BulkSplitOptimizationTest.java
URL: http://svn.apache.org/viewvc/incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/test/functional/BulkSplitOptimizationTest.java?rev=1222766&r1=1222765&r2=1222766&view=diff
==============================================================================
--- incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/test/functional/BulkSplitOptimizationTest.java
(original)
+++ incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/test/functional/BulkSplitOptimizationTest.java
Fri Dec 23 17:53:12 2011
@@ -68,7 +68,7 @@ public class BulkSplitOptimizationTest e
     bulkImport(fs, TABLE_NAME, "/testmf");
     
     checkSplits(TABLE_NAME, 0, 0);
-    checkMapFiles(TABLE_NAME, 1, 1, 100, 100);
+    checkRFiles(TABLE_NAME, 1, 1, 100, 100);
     
     // initiate splits
     getConnector().tableOperations().setProperty(TABLE_NAME, Property.TABLE_SPLIT_THRESHOLD.getKey(),
"100K");
@@ -85,6 +85,6 @@ public class BulkSplitOptimizationTest e
     VerifyIngest.main(new String[] {"-timestamp", "1", "-size", "50", "-random", "56", "100000",
"0", "1"});
     
     // ensure each tablet does not have all map files
-    checkMapFiles(TABLE_NAME, 50, 100, 1, 4);
+    checkRFiles(TABLE_NAME, 50, 100, 1, 4);
   }
 }

Modified: incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/test/functional/DeleteEverythingTest.java
URL: http://svn.apache.org/viewvc/incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/test/functional/DeleteEverythingTest.java?rev=1222766&r1=1222765&r2=1222766&view=diff
==============================================================================
--- incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/test/functional/DeleteEverythingTest.java
(original)
+++ incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/test/functional/DeleteEverythingTest.java
Fri Dec 23 17:53:12 2011
@@ -60,7 +60,7 @@ public class DeleteEverythingTest extend
     
     getConnector().tableOperations().flush("de", null, null, true);
     
-    checkMapFiles("de", 1, 1, 1, 1);
+    checkRFiles("de", 1, 1, 1, 1);
     
     m = new Mutation(new Text("foo"));
     m.putDelete(new Text("bar"), new Text("1910"));
@@ -84,7 +84,7 @@ public class DeleteEverythingTest extend
     getConnector().tableOperations().setProperty("de", Property.TABLE_MAJC_RATIO.getKey(),
"1.0");
     UtilWaitThread.sleep(4000);
     
-    checkMapFiles("de", 1, 1, 0, 0);
+    checkRFiles("de", 1, 1, 0, 0);
     
     bw.close();
     

Modified: incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/test/functional/FunctionalTest.java
URL: http://svn.apache.org/viewvc/incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/test/functional/FunctionalTest.java?rev=1222766&r1=1222765&r2=1222766&view=diff
==============================================================================
--- incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/test/functional/FunctionalTest.java
(original)
+++ incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/test/functional/FunctionalTest.java
Fri Dec 23 17:53:12 2011
@@ -218,11 +218,11 @@ public abstract class FunctionalTest {
   }
   
   /**
-   * A utility function that checks that each tablet has an expected number of map files.
+   * A utility function that checks that each tablet has an expected number of rfiles.
    * 
    */
   
-  protected void checkMapFiles(String tableName, int minTablets, int maxTablets, int minMapFiles,
int maxMapFiles) throws Exception {
+  protected void checkRFiles(String tableName, int minTablets, int maxTablets, int minRFiles,
int maxRFiles) throws Exception {
     Scanner scanner = getConnector().createScanner(Constants.METADATA_TABLE_NAME, Constants.NO_AUTHS);
     String tableId = Tables.getNameToIdMap(getInstance()).get(tableName);
     scanner.setRange(new Range(new Text(tableId + ";"), true, new Text(tableId + "<"),
true));
@@ -251,7 +251,7 @@ public abstract class FunctionalTest {
     
     Set<Entry<Text,Integer>> es = tabletFileCounts.entrySet();
     for (Entry<Text,Integer> entry : es) {
-      if (entry.getValue() > maxMapFiles || entry.getValue() < minMapFiles) {
+      if (entry.getValue() > maxRFiles || entry.getValue() < minRFiles) {
         throw new Exception("tablet " + entry.getKey() + " has " + entry.getValue() + " map
files");
       }
     }

Modified: incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/test/functional/MaxOpenTest.java
URL: http://svn.apache.org/viewvc/incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/test/functional/MaxOpenTest.java?rev=1222766&r1=1222765&r2=1222766&view=diff
==============================================================================
--- incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/test/functional/MaxOpenTest.java
(original)
+++ incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/test/functional/MaxOpenTest.java
Fri Dec 23 17:53:12 2011
@@ -66,7 +66,7 @@ public class MaxOpenTest extends Functio
       TestIngest.main(new String[] {"-random", "" + i, "-timestamp", "" + i, "-size", ""
+ 50, "" + NUM_TO_INGEST, "0", "1"});
       
       getConnector().tableOperations().flush("test_ingest", null, null, true);
-      checkMapFiles("test_ingest", NUM_TABLETS, NUM_TABLETS, i + 1, i + 1);
+      checkRFiles("test_ingest", NUM_TABLETS, NUM_TABLETS, i + 1, i + 1);
     }
     
     List<Range> ranges = new ArrayList<Range>(NUM_TO_INGEST);

Modified: incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/test/functional/RowDeleteTest.java
URL: http://svn.apache.org/viewvc/incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/test/functional/RowDeleteTest.java?rev=1222766&r1=1222765&r2=1222766&view=diff
==============================================================================
--- incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/test/functional/RowDeleteTest.java
(original)
+++ incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/test/functional/RowDeleteTest.java
Fri Dec 23 17:53:12 2011
@@ -62,7 +62,7 @@ public class RowDeleteTest extends Funct
     bw.flush();
     getConnector().tableOperations().flush("rdel1", null, null, true);
     
-    checkMapFiles("rdel1", 1, 1, 1, 1);
+    checkRFiles("rdel1", 1, 1, 1, 1);
     
     int count = 0;
     Scanner scanner = getConnector().createScanner("rdel1", Constants.NO_AUTHS);
@@ -81,7 +81,7 @@ public class RowDeleteTest extends Funct
     // Wait for the files in HDFS to be older than the future compaction date
     UtilWaitThread.sleep(2000);
     
-    checkMapFiles("rdel1", 1, 1, 2, 2);
+    checkRFiles("rdel1", 1, 1, 2, 2);
     
     count = 0;
     scanner = getConnector().createScanner("rdel1", Constants.NO_AUTHS);
@@ -94,7 +94,7 @@ public class RowDeleteTest extends Funct
     
     getConnector().tableOperations().compact("rdel1", null, null, false, true);
     
-    checkMapFiles("rdel1", 1, 1, 0, 0);
+    checkRFiles("rdel1", 1, 1, 0, 0);
     
     count = 0;
     scanner = getConnector().createScanner("rdel1", Constants.NO_AUTHS);

Modified: incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/test/functional/SplitRecoveryTest.java
URL: http://svn.apache.org/viewvc/incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/test/functional/SplitRecoveryTest.java?rev=1222766&r1=1222765&r2=1222766&view=diff
==============================================================================
--- incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/test/functional/SplitRecoveryTest.java
(original)
+++ incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/test/functional/SplitRecoveryTest.java
Fri Dec 23 17:53:12 2011
@@ -34,7 +34,7 @@ import org.apache.accumulo.core.data.Key
 import org.apache.accumulo.core.data.Mutation;
 import org.apache.accumulo.core.data.Value;
 import org.apache.accumulo.core.file.FileUtil;
-import org.apache.accumulo.core.file.map.MyMapFile;
+import org.apache.accumulo.core.file.rfile.RFile;
 import org.apache.accumulo.core.util.ColumnFQ;
 import org.apache.accumulo.core.util.MetadataTable.DataFileValue;
 import org.apache.accumulo.core.zookeeper.ZooUtil;
@@ -48,9 +48,9 @@ import org.apache.accumulo.server.tablet
 import org.apache.accumulo.server.util.MetadataTable;
 import org.apache.accumulo.server.zookeeper.IZooReaderWriter;
 import org.apache.accumulo.server.zookeeper.ZooLock;
-import org.apache.accumulo.server.zookeeper.ZooReaderWriter;
 import org.apache.accumulo.server.zookeeper.ZooLock.LockLossReason;
 import org.apache.accumulo.server.zookeeper.ZooLock.LockWatcher;
+import org.apache.accumulo.server.zookeeper.ZooReaderWriter;
 import org.apache.hadoop.io.Text;
 
 public class SplitRecoveryTest extends FunctionalTest {
@@ -128,7 +128,7 @@ public class SplitRecoveryTest extends F
       String tdir = "/dir_" + i;
       MetadataTable.addTablet(extent, tdir, SecurityConstants.getSystemCredentials(), TabletTime.LOGICAL_TIME_ID,
zl);
       SortedMap<String,DataFileValue> mapFiles = new TreeMap<String,DataFileValue>();
-      mapFiles.put(tdir + "/" + MyMapFile.EXTENSION + "_000_000", new DataFileValue(1000017
+ i, 10000 + i));
+      mapFiles.put(tdir + "/" + RFile.EXTENSION + "_000_000", new DataFileValue(1000017 +
i, 10000 + i));
       
       if (i == extentToSplit) {
         splitMapFiles = mapFiles;

Modified: incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/util/MapFilePerformanceTest.java
URL: http://svn.apache.org/viewvc/incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/util/MapFilePerformanceTest.java?rev=1222766&r1=1222765&r2=1222766&view=diff
==============================================================================
--- incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/util/MapFilePerformanceTest.java
(original)
+++ incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/util/MapFilePerformanceTest.java
Fri Dec 23 17:53:12 2011
@@ -27,6 +27,7 @@ import java.util.concurrent.ExecutorServ
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.accumulo.core.Constants;
 import org.apache.accumulo.core.data.Key;
 import org.apache.accumulo.core.data.KeyExtent;
 import org.apache.accumulo.core.data.Range;
@@ -217,7 +218,7 @@ public class MapFilePerformanceTest {
             System.out.println("Thread " + Thread.currentThread().getName() + " creating
map files blocksize = " + blocksize + " num = " + num);
             String[] filenames;
             try {
-              filenames = createMapFiles(args[0], args[1] + "/" + MyMapFile.EXTENSION + "_"
+ blocksize, blocksize, num);
+              filenames = createMapFiles(args[0], args[1] + "/" + Constants.MAPFILE_EXTENSION
+ "_" + blocksize, blocksize, num);
               
               synchronized (tests) {
                 Map<Integer,String[]> map = tests.get(num);

Modified: incubator/accumulo/trunk/test/system/auto/simple/compaction.py
URL: http://svn.apache.org/viewvc/incubator/accumulo/trunk/test/system/auto/simple/compaction.py?rev=1222766&r1=1222765&r2=1222766&view=diff
==============================================================================
--- incubator/accumulo/trunk/test/system/auto/simple/compaction.py (original)
+++ incubator/accumulo/trunk/test/system/auto/simple/compaction.py Fri Dec 23 17:53:12 2011
@@ -44,7 +44,7 @@ class CompactionTest(SimpleBulkTest):
         handle = self.runClassOn(
             self.masterHost(),
             'org.apache.accumulo.server.test.CreateMapFiles',
-            "testmf 4 0 500000 59".split())
+            "testrf 4 0 500000 59".split())
         out, err = handle.communicate()
         self.assert_(handle.returncode == 0)
 
@@ -52,8 +52,8 @@ class CompactionTest(SimpleBulkTest):
 
         # initialize the database
         self.createTable('test_ingest')
-        self.execute(self.masterHost(), 'hadoop dfs -rmr /testmf'.split())
-        self.execute(self.masterHost(), 'hadoop dfs -rmr /testmfFail'.split())
+        self.execute(self.masterHost(), 'hadoop dfs -rmr /testrf'.split())
+        self.execute(self.masterHost(), 'hadoop dfs -rmr /testrfFail'.split())
 
         # insert some data
         self.createMapFiles(self.masterHost())



Mime
View raw message