Return-Path: X-Original-To: apmail-incubator-accumulo-commits-archive@minotaur.apache.org Delivered-To: apmail-incubator-accumulo-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id CF9247DDF for ; Fri, 23 Dec 2011 17:53:39 +0000 (UTC) Received: (qmail 3991 invoked by uid 500); 23 Dec 2011 17:53:39 -0000 Delivered-To: apmail-incubator-accumulo-commits-archive@incubator.apache.org Received: (qmail 3960 invoked by uid 500); 23 Dec 2011 17:53:39 -0000 Mailing-List: contact accumulo-commits-help@incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: accumulo-dev@incubator.apache.org Delivered-To: mailing list accumulo-commits@incubator.apache.org Received: (qmail 3953 invoked by uid 99); 23 Dec 2011 17:53:39 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 23 Dec 2011 17:53:39 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 23 Dec 2011 17:53:36 +0000 Received: from eris.apache.org (localhost [127.0.0.1]) by eris.apache.org (Postfix) with ESMTP id 5BAA52388A36; Fri, 23 Dec 2011 17:53:15 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1222766 [3/3] - in /incubator/accumulo/trunk: src/core/src/main/java/org/apache/accumulo/core/ src/core/src/main/java/org/apache/accumulo/core/client/impl/thrift/ src/core/src/main/java/org/apache/accumulo/core/file/ src/core/src/main/java... Date: Fri, 23 Dec 2011 17:53:13 -0000 To: accumulo-commits@incubator.apache.org From: vines@apache.org X-Mailer: svnmailer-1.0.8-patched Message-Id: <20111223175315.5BAA52388A36@eris.apache.org> Modified: incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/tabletserver/InMemoryMap.java URL: http://svn.apache.org/viewvc/incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/tabletserver/InMemoryMap.java?rev=1222766&r1=1222765&r2=1222766&view=diff ============================================================================== --- incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/tabletserver/InMemoryMap.java (original) +++ incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/tabletserver/InMemoryMap.java Fri Dec 23 17:53:12 2011 @@ -40,9 +40,9 @@ import org.apache.accumulo.core.data.Mut import org.apache.accumulo.core.data.Range; import org.apache.accumulo.core.data.Value; import org.apache.accumulo.core.file.FileSKVIterator; -import org.apache.accumulo.core.file.map.MapFileOperations; -import org.apache.accumulo.core.file.map.MyMapFile; -import org.apache.accumulo.core.file.map.MySequenceFile; +import org.apache.accumulo.core.file.FileSKVWriter; +import org.apache.accumulo.core.file.rfile.RFile; +import org.apache.accumulo.core.file.rfile.RFileOperations; import org.apache.accumulo.core.iterators.IteratorEnvironment; import org.apache.accumulo.core.iterators.SkippingIterator; import org.apache.accumulo.core.iterators.SortedKeyValueIterator; @@ -71,7 +71,7 @@ class MemKeyComparator implements Compar if (cmp == 0) { if (k1 instanceof MemKey) if (k2 instanceof MemKey) - cmp = ((MemKey) k2).mutationCount - ((MemKey) k1).mutationCount; + cmp = ((MemKey) k2).kvCount - ((MemKey) k1).kvCount; else cmp = 1; else if (k2 instanceof MemKey) @@ -84,22 +84,22 @@ class MemKeyComparator implements Compar class PartialMutationSkippingIterator extends SkippingIterator implements InterruptibleIterator { - int maxMutationCount; + int kvCount; - public PartialMutationSkippingIterator(SortedKeyValueIterator source, int maxMutationCount) { + public PartialMutationSkippingIterator(SortedKeyValueIterator source, int maxKVCount) { setSource(source); - this.maxMutationCount = maxMutationCount; + this.kvCount = maxKVCount; } @Override protected void consume() throws IOException { - while (getSource().hasTop() && ((MemKey) getSource().getTopKey()).mutationCount > maxMutationCount) + while (getSource().hasTop() && ((MemKey) getSource().getTopKey()).kvCount > kvCount) getSource().next(); } @Override public SortedKeyValueIterator deepCopy(IteratorEnvironment env) { - return new PartialMutationSkippingIterator(getSource().deepCopy(env), maxMutationCount); + return new PartialMutationSkippingIterator(getSource().deepCopy(env), kvCount); } @Override @@ -109,6 +109,77 @@ class PartialMutationSkippingIterator ex } +class MemKeyConversionIterator extends SkippingIterator implements InterruptibleIterator { + MemKey currKey = null; + Value currVal = null; + + public MemKeyConversionIterator(SortedKeyValueIterator source) { + super(); + setSource(source); + } + + public MemKeyConversionIterator(SortedKeyValueIterator source, MemKey startKey) { + this(source); + try { + if (currKey != null) + currKey = (MemKey) startKey.clone(); + } catch (CloneNotSupportedException e) { + // MemKey is supported + } + } + + @Override + public SortedKeyValueIterator deepCopy(IteratorEnvironment env) { + return new MemKeyConversionIterator(getSource().deepCopy(env), currKey); + } + + @Override + public Key getTopKey() { + return currKey; + } + + @Override + public Value getTopValue() { + return currVal; + } + + private void getTopKeyVal() { + Key k = super.getTopKey(); + Value v = super.getTopValue(); + if (k instanceof MemKey || k == null) { + currKey = (MemKey) k; + currVal = v; + return; + } + currVal = new Value(v); + int mc = MemValue.splitKVCount(currVal); + currKey = new MemKey(k, mc); + + } + + public void next() throws IOException { + super.next(); + getTopKeyVal(); + } + + @Override + protected void consume() throws IOException { + MemKey stopPoint = currKey; + if (hasTop()) + getTopKeyVal(); + if (stopPoint == null) + return; + while (getSource().hasTop() && currKey.compareTo(stopPoint) <= 0) + next(); + } + + @Override + public void setInterruptFlag(AtomicBoolean flag) { + ((InterruptibleIterator) getSource()).setInterruptFlag(flag); + } + +} + public class InMemoryMap { MutationLog mutationLog; @@ -152,7 +223,7 @@ public class InMemoryMap { public long getMemoryUsed(); - public void mutate(List mutations, int mutationCount); + public void mutate(List mutations, int kvCount); } private static class DefaultMap implements SimpleMap { @@ -203,15 +274,14 @@ public class InMemoryMap { } @Override - public void mutate(List mutations, int mutationCount) { + public void mutate(List mutations, int kvCount) { for (Mutation m : mutations) { for (ColumnUpdate cvp : m.getUpdates()) { Key newKey = new MemKey(m.getRow(), cvp.getColumnFamily(), cvp.getColumnQualifier(), cvp.getColumnVisibility(), cvp.getTimestamp(), cvp.isDeleted(), - false, mutationCount); + false, kvCount++); Value value = new Value(cvp.getValue()); put(newKey, value); } - mutationCount++; } } @@ -253,22 +323,25 @@ public class InMemoryMap { } @Override - public void mutate(List mutations, int mutationCount) { - nativeMap.mutate(mutations, mutationCount); + public void mutate(List mutations, int kvCount) { + nativeMap.mutate(mutations, kvCount); } } - private AtomicInteger nextMutationCount = new AtomicInteger(1); - private AtomicInteger mutationCount = new AtomicInteger(0); + private AtomicInteger nextKVCount = new AtomicInteger(1); + private AtomicInteger kvCount = new AtomicInteger(0); /** * Applies changes to a row in the InMemoryMap * */ public void mutate(List mutations) { - int mc = nextMutationCount.getAndAdd(mutations.size()); + int numKVs = 0; + for (int i = 0; i < mutations.size(); i++) + numKVs += mutations.get(i).size(); + int kv = nextKVCount.getAndAdd(numKVs); try { - map.mutate(mutations, mc); + map.mutate(mutations, kv); } finally { synchronized (this) { // Can not update mutationCount while writes that started before @@ -277,14 +350,14 @@ public class InMemoryMap { // a read may not see a successful write. Therefore writes must // wait for writes that started before to finish. - while (mutationCount.get() != mc - 1) { + while (kvCount.get() != kv - 1) { try { wait(); } catch (InterruptedException ex) { // ignored } } - mutationCount.set(mc + mutations.size() - 1); + kvCount.set(kv + numKVs - 1); notifyAll(); } } @@ -357,8 +430,8 @@ public class InMemoryMap { Configuration conf = CachedConfiguration.getInstance(); FileSystem fs = TraceFileSystem.wrap(FileSystem.getLocal(conf)); - FileSKVIterator reader = new MapFileOperations.RangeIterator(new MyMapFile.Reader(fs, memDumpFile, conf)); - + FileSKVIterator reader = new RFileOperations().openReader(memDumpFile, true, fs, conf, ServerConfiguration.getSiteConfiguration()); + readers.add(reader); iter = reader; @@ -447,10 +520,10 @@ public class InMemoryMap { if (deleted) throw new IllegalStateException("Can not obtain iterator after map deleted"); - int mc = mutationCount.get(); + int mc = kvCount.get(); MemoryDataSource mds = new MemoryDataSource(); SourceSwitchingIterator ssi = new SourceSwitchingIterator(new MemoryDataSource()); - MemoryIterator mi = new MemoryIterator(new ColumnFamilySkippingIterator(new PartialMutationSkippingIterator(ssi, mc))); + MemoryIterator mi = new MemoryIterator(new ColumnFamilySkippingIterator(new PartialMutationSkippingIterator(new MemKeyConversionIterator(ssi), mc))); mi.setSSI(ssi); mi.setMDS(mds); activeIters.add(mi); @@ -459,9 +532,9 @@ public class InMemoryMap { public SortedKeyValueIterator compactionIterator() { - if (nextMutationCount.get() - 1 != mutationCount.get()) - throw new IllegalStateException("Memory map in unexpected state : nextMutationCount = " + nextMutationCount.get() + " mutationCount = " - + mutationCount.get()); + if (nextKVCount.get() - 1 != kvCount.get()) + throw new IllegalStateException("Memory map in unexpected state : nextKVCount = " + nextKVCount.get() + " kvCount = " + + kvCount.get()); return new ColumnFamilySkippingIterator(map.skvIterator()); } @@ -489,17 +562,21 @@ public class InMemoryMap { Configuration conf = CachedConfiguration.getInstance(); FileSystem fs = TraceFileSystem.wrap(FileSystem.getLocal(conf)); - String tmpFile = memDumpDir + "/memDump" + UUID.randomUUID() + ".map"; + String tmpFile = memDumpDir + "/memDump" + UUID.randomUUID() + "." + RFile.EXTENSION; Configuration newConf = new Configuration(conf); newConf.setInt("io.seqfile.compress.blocksize", 100000); - MyMapFile.Writer out = new MyMapFile.Writer(newConf, fs, tmpFile, MemKey.class, Value.class, MySequenceFile.CompressionType.BLOCK); + FileSKVWriter out = new RFileOperations().openWriter(tmpFile, fs, newConf, ServerConfiguration.getSiteConfiguration()); + out.startDefaultLocalityGroup(); InterruptibleIterator iter = map.skvIterator(); iter.seek(new Range(), LocalityGroupUtil.EMPTY_CF_SET, false); while (iter.hasTop() && activeIters.size() > 0) { - out.append(iter.getTopKey(), iter.getTopValue()); + // RFile does not support MemKey, so we move the kv count into the value only for the RFile. + // There is no need to change the MemKey to a normal key because the kvCount info gets lost when it is written + Value newValue = new MemValue(iter.getTopValue(), ((MemKey) iter.getTopKey()).kvCount); + out.append(iter.getTopKey(), newValue); iter.next(); } Modified: incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/tabletserver/MemKey.java URL: http://svn.apache.org/viewvc/incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/tabletserver/MemKey.java?rev=1222766&r1=1222765&r2=1222766&view=diff ============================================================================== --- incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/tabletserver/MemKey.java (original) +++ incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/tabletserver/MemKey.java Fri Dec 23 17:53:12 2011 @@ -24,25 +24,25 @@ import org.apache.accumulo.core.data.Key class MemKey extends Key { - int mutationCount; + int kvCount; public MemKey(byte[] row, byte[] cf, byte[] cq, byte[] cv, long ts, boolean del, boolean copy, int mc) { super(row, cf, cq, cv, ts, del, copy); - this.mutationCount = mc; + this.kvCount = mc; } public MemKey() { super(); - this.mutationCount = Integer.MAX_VALUE; + this.kvCount = Integer.MAX_VALUE; } public MemKey(Key key, int mc) { super(key); - this.mutationCount = mc; + this.kvCount = mc; } public String toString() { - return super.toString() + " mc=" + mutationCount; + return super.toString() + " mc=" + kvCount; } @Override @@ -53,13 +53,13 @@ class MemKey extends Key { @Override public void write(DataOutput out) throws IOException { super.write(out); - out.writeInt(mutationCount); + out.writeInt(kvCount); } @Override public void readFields(DataInput in) throws IOException { super.readFields(in); - mutationCount = in.readInt(); + kvCount = in.readInt(); } @Override @@ -68,7 +68,7 @@ class MemKey extends Key { int cmp = super.compareTo(k); if (cmp == 0 && k instanceof MemKey) { - cmp = ((MemKey) k).mutationCount - mutationCount; + cmp = ((MemKey) k).kvCount - kvCount; } return cmp; Added: incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/tabletserver/MemValue.java URL: http://svn.apache.org/viewvc/incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/tabletserver/MemValue.java?rev=1222766&view=auto ============================================================================== --- incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/tabletserver/MemValue.java (added) +++ incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/tabletserver/MemValue.java Fri Dec 23 17:53:12 2011 @@ -0,0 +1,93 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.server.tabletserver; + +import java.io.DataOutput; +import java.io.IOException; + +import org.apache.accumulo.core.data.Value; + +/** + * + */ +public class MemValue extends Value { + int kvCount; + boolean merged = false; + + /** + * @param value + * Value + * @param kv + * kv count + */ + public MemValue(byte[] value, int kv) { + super(value); + this.kvCount = kv; + } + + public MemValue() { + super(); + this.kvCount = Integer.MAX_VALUE; + } + + public MemValue(Value value, int kv) { + super(value); + this.kvCount = kv; + } + + // Override + public void write(final DataOutput out) throws IOException { + if (!merged) { + byte[] combinedBytes = new byte[getSize() + 4]; + System.arraycopy(value, 0, combinedBytes, 4, getSize()); + combinedBytes[0] = (byte) (kvCount >>> 24); + combinedBytes[1] = (byte) (kvCount >>> 16); + combinedBytes[2] = (byte) (kvCount >>> 8); + combinedBytes[3] = (byte) (kvCount); + value = combinedBytes; + merged = true; + } + super.write(out); + } + + public void set(final byte[] b) { + super.set(b); + merged = false; + } + + public void copy(byte[] b) { + super.copy(b); + merged = false; + } + + /** + * Takes a Value and will take out the embedded kvCount, and then return that value while replacing the Value with the original unembedded version + * + * @param v + * @return + */ + public static int splitKVCount(Value v) { + if (v instanceof MemValue) + return ((MemValue) v).kvCount; + + byte[] originalBytes = new byte[v.getSize() - 4]; + byte[] combined = v.get(); + System.arraycopy(combined, 4, originalBytes, 0, originalBytes.length); + v.set(originalBytes); + return (combined[0] << 24) + ((combined[1] & 0xFF) << 16) + ((combined[2] & 0xFF) << 8) + (combined[3] & 0xFF); + } +} Propchange: incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/tabletserver/MemValue.java ------------------------------------------------------------------------------ svn:mime-type = text/plain Modified: incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/tabletserver/Tablet.java URL: http://svn.apache.org/viewvc/incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/tabletserver/Tablet.java?rev=1222766&r1=1222765&r2=1222766&view=diff ============================================================================== --- incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/tabletserver/Tablet.java (original) +++ incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/tabletserver/Tablet.java Fri Dec 23 17:53:12 2011 @@ -66,7 +66,6 @@ import org.apache.accumulo.core.data.thr import org.apache.accumulo.core.file.FileOperations; import org.apache.accumulo.core.file.FileSKVIterator; import org.apache.accumulo.core.file.FileUtil; -import org.apache.accumulo.core.file.map.MyMapFile; import org.apache.accumulo.core.iterators.IterationInterruptedException; import org.apache.accumulo.core.iterators.IteratorEnvironment; import org.apache.accumulo.core.iterators.IteratorUtil; @@ -1548,7 +1547,7 @@ public class Tablet { continue; } - if (!filename.startsWith(MyMapFile.EXTENSION + "_") && !FileOperations.getValidExtensions().contains(filename.split("\\.")[1])) { + if (!filename.startsWith(Constants.MAPFILE_EXTENSION + "_") && !FileOperations.getValidExtensions().contains(filename.split("\\.")[1])) { log.error("unknown file in tablet" + path); continue; } Copied: incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/test/CreateRandomRFile.java (from r1215244, incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/test/CreateRandomMapFile.java) URL: http://svn.apache.org/viewvc/incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/test/CreateRandomRFile.java?p2=incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/test/CreateRandomRFile.java&p1=incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/test/CreateRandomMapFile.java&r1=1215244&r2=1222766&rev=1222766&view=diff ============================================================================== --- incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/test/CreateRandomMapFile.java (original) +++ incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/test/CreateRandomRFile.java Fri Dec 23 17:53:12 2011 @@ -20,17 +20,17 @@ import java.io.IOException; import java.util.Arrays; import java.util.Random; +import org.apache.accumulo.core.conf.AccumuloConfiguration; import org.apache.accumulo.core.data.Key; import org.apache.accumulo.core.data.Value; -import org.apache.accumulo.core.file.map.MyMapFile; -import org.apache.accumulo.core.file.map.MyMapFile.Writer; -import org.apache.accumulo.core.file.map.MySequenceFile.CompressionType; +import org.apache.accumulo.core.file.FileSKVWriter; +import org.apache.accumulo.core.file.rfile.RFileOperations; import org.apache.accumulo.core.util.CachedConfiguration; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.io.Text; -public class CreateRandomMapFile { +public class CreateRandomRFile { private static int num; private static String file; @@ -62,10 +62,10 @@ public class CreateRandomMapFile { Arrays.sort(rands); Configuration conf = CachedConfiguration.getInstance(); - Writer mfw; + FileSKVWriter mfw; try { FileSystem fs = FileSystem.get(conf); - mfw = new MyMapFile.Writer(conf, fs, file, Key.class, Value.class, CompressionType.BLOCK); + mfw = new RFileOperations().openWriter(file, fs, conf, AccumuloConfiguration.getDefaultConfiguration()); } catch (IOException e) { throw new RuntimeException(e); } Propchange: incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/test/CreateRandomRFile.java ------------------------------------------------------------------------------ svn:eol-style = native Modified: incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/test/MidPointPerfTest2.java URL: http://svn.apache.org/viewvc/incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/test/MidPointPerfTest2.java?rev=1222766&r1=1222765&r2=1222766&view=diff ============================================================================== --- incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/test/MidPointPerfTest2.java (original) +++ incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/test/MidPointPerfTest2.java Fri Dec 23 17:53:12 2011 @@ -23,18 +23,20 @@ import java.util.Comparator; import java.util.List; import java.util.Random; +import org.apache.accumulo.core.conf.AccumuloConfiguration; import org.apache.accumulo.core.data.Key; -import org.apache.accumulo.core.file.map.MyMapFile; +import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.file.FileSKVWriter; import org.apache.accumulo.core.file.map.MySequenceFile; import org.apache.accumulo.core.file.map.MySequenceFile.Reader; +import org.apache.accumulo.core.file.rfile.RFile; +import org.apache.accumulo.core.file.rfile.RFileOperations; import org.apache.accumulo.core.util.CachedConfiguration; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; -import org.apache.hadoop.io.WritableComparable; class MultipleIndexIterator2 { @@ -83,12 +85,12 @@ class MultipleIndexIterator2 { return currentMin >= 0; } - WritableComparable next() { + Key next() { if (currentMin < 0) { throw new RuntimeException("There is no next"); } - WritableComparable ret = nextKey[currentMin]; + Key ret = nextKey[currentMin]; try { nextKey[currentMin] = (Key) readers[currentMin].getKeyClass().newInstance(); @@ -214,16 +216,16 @@ public class MidPointPerfTest2 { start = end; - Path outFile = new Path(String.format("%s/index_%04d", newDir, count++)); - outFiles.add(outFile); + String outFile = String.format("%s/index_%04d", newDir, count++); + outFiles.add(new Path(outFile)); long t1 = System.currentTimeMillis(); - MySequenceFile.Writer writer = MySequenceFile.createWriter(fs, conf, outFile, Key.class, LongWritable.class, MySequenceFile.CompressionType.BLOCK); + FileSKVWriter writer = new RFileOperations().openWriter(outFile, fs, conf, AccumuloConfiguration.getDefaultConfiguration()); MultipleIndexIterator2 mii = new MultipleIndexIterator2(conf, fs, inFiles); while (mii.hasNext()) { - writer.append(mii.next(), new LongWritable(0)); + writer.append(mii.next(), new Value(new byte[0])); } mii.close(); @@ -254,7 +256,7 @@ public class MidPointPerfTest2 { FileSystem fs = FileSystem.get(conf); for (int i = 0; i < numFiles; i++) { - String newDir = String.format("%s/" + MyMapFile.EXTENSION + "_%06d", dir, i); + String newDir = String.format("%s/" + RFile.EXTENSION + "_%06d", dir, i); fs.mkdirs(new Path(newDir)); List keys = new ArrayList(); @@ -267,13 +269,12 @@ public class MidPointPerfTest2 { Collections.sort(keys, new CompareKeys()); - MySequenceFile.Writer writer = MySequenceFile.createWriter(fs, conf, new Path(newDir + "/index"), Key.class, LongWritable.class, - MySequenceFile.CompressionType.BLOCK); + FileSKVWriter writer = new RFileOperations().openWriter(newDir, fs, conf, AccumuloConfiguration.getDefaultConfiguration()); System.out.println(new Path(newDir + "/index")); for (Key key : keys) { - writer.append(key, new LongWritable(0)); + writer.append(key, new Value(new byte[0])); } writer.close(); Modified: incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/test/TestIngest.java URL: http://svn.apache.org/viewvc/incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/test/TestIngest.java?rev=1222766&r1=1222765&r2=1222766&view=diff ============================================================================== --- incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/test/TestIngest.java (original) +++ incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/test/TestIngest.java Fri Dec 23 17:53:12 2011 @@ -38,7 +38,6 @@ import org.apache.accumulo.core.data.Mut import org.apache.accumulo.core.data.Value; import org.apache.accumulo.core.file.FileOperations; import org.apache.accumulo.core.file.FileSKVWriter; -import org.apache.accumulo.core.file.map.MyMapFile; import org.apache.accumulo.core.file.rfile.RFile; import org.apache.accumulo.core.security.Authorizations; import org.apache.accumulo.core.security.ColumnVisibility; @@ -295,7 +294,7 @@ public class TestIngest { if (ingestArgs.outputToMapFile) { Configuration conf = CachedConfiguration.getInstance(); FileSystem fs = FileSystem.get(conf); - writer = FileOperations.getInstance().openWriter(ingestArgs.outputFile + "." + MyMapFile.EXTENSION, fs, conf, + writer = FileOperations.getInstance().openWriter(ingestArgs.outputFile + "." + RFile.EXTENSION, fs, conf, AccumuloConfiguration.getDefaultConfiguration()); writer.startDefaultLocalityGroup(); } else if (ingestArgs.outputToRFile) { Modified: incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/test/functional/BadIteratorMincTest.java URL: http://svn.apache.org/viewvc/incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/test/functional/BadIteratorMincTest.java?rev=1222766&r1=1222765&r2=1222766&view=diff ============================================================================== --- incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/test/functional/BadIteratorMincTest.java (original) +++ incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/test/functional/BadIteratorMincTest.java Fri Dec 23 17:53:12 2011 @@ -65,7 +65,7 @@ public class BadIteratorMincTest extends UtilWaitThread.sleep(1000); // minc should fail, so there should be no files - checkMapFiles("foo", 1, 1, 0, 0); + checkRFiles("foo", 1, 1, 0, 0); // try to scan table Scanner scanner = getConnector().createScanner("foo", Constants.NO_AUTHS); @@ -85,7 +85,7 @@ public class BadIteratorMincTest extends UtilWaitThread.sleep(5000); // minc should complete - checkMapFiles("foo", 1, 1, 1, 1); + checkRFiles("foo", 1, 1, 1, 1); count = 0; for (@SuppressWarnings("unused") Modified: incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/test/functional/BloomFilterTest.java URL: http://svn.apache.org/viewvc/incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/test/functional/BloomFilterTest.java?rev=1222766&r1=1222765&r2=1222766&view=diff ============================================================================== --- incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/test/functional/BloomFilterTest.java (original) +++ incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/test/functional/BloomFilterTest.java Fri Dec 23 17:53:12 2011 @@ -83,10 +83,10 @@ public class BloomFilterTest extends Fun getConnector().tableOperations().flush("bt4", null, null, true); // ensure minor compactions are finished - super.checkMapFiles("bt1", 1, 1, 1, 1); - super.checkMapFiles("bt2", 1, 1, 1, 1); - super.checkMapFiles("bt3", 1, 1, 1, 1); - super.checkMapFiles("bt4", 1, 1, 1, 1); + super.checkRFiles("bt1", 1, 1, 1, 1); + super.checkRFiles("bt2", 1, 1, 1, 1); + super.checkRFiles("bt3", 1, 1, 1, 1); + super.checkRFiles("bt4", 1, 1, 1, 1); // these queries should only run quickly if bloom // filters are working Modified: incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/test/functional/BulkFileTest.java URL: http://svn.apache.org/viewvc/incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/test/functional/BulkFileTest.java?rev=1222766&r1=1222765&r2=1222766&view=diff ============================================================================== --- incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/test/functional/BulkFileTest.java (original) +++ incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/test/functional/BulkFileTest.java Fri Dec 23 17:53:12 2011 @@ -29,7 +29,6 @@ import org.apache.accumulo.core.data.Val import org.apache.accumulo.core.file.FileOperations; import org.apache.accumulo.core.file.FileSKVWriter; import org.apache.accumulo.core.file.FileUtil; -import org.apache.accumulo.core.file.map.MyMapFile; import org.apache.accumulo.core.file.rfile.RFile; import org.apache.accumulo.server.conf.ServerConfiguration; import org.apache.accumulo.server.trace.TraceFileSystem; @@ -62,14 +61,14 @@ public class BulkFileTest extends Functi fs.delete(new Path(dir), true); - FileSKVWriter writer1 = FileOperations.getInstance().openWriter(dir + "/f1." + MyMapFile.EXTENSION, fs, conf, ServerConfiguration.getSystemConfiguration()); + FileSKVWriter writer1 = FileOperations.getInstance().openWriter(dir + "/f1." + RFile.EXTENSION, fs, conf, + ServerConfiguration.getSystemConfiguration()); writer1.startDefaultLocalityGroup(); writeData(writer1, 0, 333); writer1.close(); - fs.rename(new Path(dir + "/f1." + MyMapFile.EXTENSION), new Path(dir + "/f1")); - - FileSKVWriter writer2 = FileOperations.getInstance().openWriter(dir + "/f2." + MyMapFile.EXTENSION, fs, conf, ServerConfiguration.getSystemConfiguration()); + FileSKVWriter writer2 = FileOperations.getInstance().openWriter(dir + "/f2." + RFile.EXTENSION, fs, conf, + ServerConfiguration.getSystemConfiguration()); writer2.startDefaultLocalityGroup(); writeData(writer2, 334, 999); writer2.close(); @@ -81,7 +80,7 @@ public class BulkFileTest extends Functi bulkImport(fs, "bulkFile", dir); - checkMapFiles("bulkFile", 6, 6, 1, 1); + checkRFiles("bulkFile", 6, 6, 1, 1); verifyData("bulkFile", 0, 1999); Modified: incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/test/functional/BulkSplitOptimizationTest.java URL: http://svn.apache.org/viewvc/incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/test/functional/BulkSplitOptimizationTest.java?rev=1222766&r1=1222765&r2=1222766&view=diff ============================================================================== --- incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/test/functional/BulkSplitOptimizationTest.java (original) +++ incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/test/functional/BulkSplitOptimizationTest.java Fri Dec 23 17:53:12 2011 @@ -68,7 +68,7 @@ public class BulkSplitOptimizationTest e bulkImport(fs, TABLE_NAME, "/testmf"); checkSplits(TABLE_NAME, 0, 0); - checkMapFiles(TABLE_NAME, 1, 1, 100, 100); + checkRFiles(TABLE_NAME, 1, 1, 100, 100); // initiate splits getConnector().tableOperations().setProperty(TABLE_NAME, Property.TABLE_SPLIT_THRESHOLD.getKey(), "100K"); @@ -85,6 +85,6 @@ public class BulkSplitOptimizationTest e VerifyIngest.main(new String[] {"-timestamp", "1", "-size", "50", "-random", "56", "100000", "0", "1"}); // ensure each tablet does not have all map files - checkMapFiles(TABLE_NAME, 50, 100, 1, 4); + checkRFiles(TABLE_NAME, 50, 100, 1, 4); } } Modified: incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/test/functional/DeleteEverythingTest.java URL: http://svn.apache.org/viewvc/incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/test/functional/DeleteEverythingTest.java?rev=1222766&r1=1222765&r2=1222766&view=diff ============================================================================== --- incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/test/functional/DeleteEverythingTest.java (original) +++ incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/test/functional/DeleteEverythingTest.java Fri Dec 23 17:53:12 2011 @@ -60,7 +60,7 @@ public class DeleteEverythingTest extend getConnector().tableOperations().flush("de", null, null, true); - checkMapFiles("de", 1, 1, 1, 1); + checkRFiles("de", 1, 1, 1, 1); m = new Mutation(new Text("foo")); m.putDelete(new Text("bar"), new Text("1910")); @@ -84,7 +84,7 @@ public class DeleteEverythingTest extend getConnector().tableOperations().setProperty("de", Property.TABLE_MAJC_RATIO.getKey(), "1.0"); UtilWaitThread.sleep(4000); - checkMapFiles("de", 1, 1, 0, 0); + checkRFiles("de", 1, 1, 0, 0); bw.close(); Modified: incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/test/functional/FunctionalTest.java URL: http://svn.apache.org/viewvc/incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/test/functional/FunctionalTest.java?rev=1222766&r1=1222765&r2=1222766&view=diff ============================================================================== --- incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/test/functional/FunctionalTest.java (original) +++ incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/test/functional/FunctionalTest.java Fri Dec 23 17:53:12 2011 @@ -218,11 +218,11 @@ public abstract class FunctionalTest { } /** - * A utility function that checks that each tablet has an expected number of map files. + * A utility function that checks that each tablet has an expected number of rfiles. * */ - protected void checkMapFiles(String tableName, int minTablets, int maxTablets, int minMapFiles, int maxMapFiles) throws Exception { + protected void checkRFiles(String tableName, int minTablets, int maxTablets, int minRFiles, int maxRFiles) throws Exception { Scanner scanner = getConnector().createScanner(Constants.METADATA_TABLE_NAME, Constants.NO_AUTHS); String tableId = Tables.getNameToIdMap(getInstance()).get(tableName); scanner.setRange(new Range(new Text(tableId + ";"), true, new Text(tableId + "<"), true)); @@ -251,7 +251,7 @@ public abstract class FunctionalTest { Set> es = tabletFileCounts.entrySet(); for (Entry entry : es) { - if (entry.getValue() > maxMapFiles || entry.getValue() < minMapFiles) { + if (entry.getValue() > maxRFiles || entry.getValue() < minRFiles) { throw new Exception("tablet " + entry.getKey() + " has " + entry.getValue() + " map files"); } } Modified: incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/test/functional/MaxOpenTest.java URL: http://svn.apache.org/viewvc/incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/test/functional/MaxOpenTest.java?rev=1222766&r1=1222765&r2=1222766&view=diff ============================================================================== --- incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/test/functional/MaxOpenTest.java (original) +++ incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/test/functional/MaxOpenTest.java Fri Dec 23 17:53:12 2011 @@ -66,7 +66,7 @@ public class MaxOpenTest extends Functio TestIngest.main(new String[] {"-random", "" + i, "-timestamp", "" + i, "-size", "" + 50, "" + NUM_TO_INGEST, "0", "1"}); getConnector().tableOperations().flush("test_ingest", null, null, true); - checkMapFiles("test_ingest", NUM_TABLETS, NUM_TABLETS, i + 1, i + 1); + checkRFiles("test_ingest", NUM_TABLETS, NUM_TABLETS, i + 1, i + 1); } List ranges = new ArrayList(NUM_TO_INGEST); Modified: incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/test/functional/RowDeleteTest.java URL: http://svn.apache.org/viewvc/incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/test/functional/RowDeleteTest.java?rev=1222766&r1=1222765&r2=1222766&view=diff ============================================================================== --- incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/test/functional/RowDeleteTest.java (original) +++ incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/test/functional/RowDeleteTest.java Fri Dec 23 17:53:12 2011 @@ -62,7 +62,7 @@ public class RowDeleteTest extends Funct bw.flush(); getConnector().tableOperations().flush("rdel1", null, null, true); - checkMapFiles("rdel1", 1, 1, 1, 1); + checkRFiles("rdel1", 1, 1, 1, 1); int count = 0; Scanner scanner = getConnector().createScanner("rdel1", Constants.NO_AUTHS); @@ -81,7 +81,7 @@ public class RowDeleteTest extends Funct // Wait for the files in HDFS to be older than the future compaction date UtilWaitThread.sleep(2000); - checkMapFiles("rdel1", 1, 1, 2, 2); + checkRFiles("rdel1", 1, 1, 2, 2); count = 0; scanner = getConnector().createScanner("rdel1", Constants.NO_AUTHS); @@ -94,7 +94,7 @@ public class RowDeleteTest extends Funct getConnector().tableOperations().compact("rdel1", null, null, false, true); - checkMapFiles("rdel1", 1, 1, 0, 0); + checkRFiles("rdel1", 1, 1, 0, 0); count = 0; scanner = getConnector().createScanner("rdel1", Constants.NO_AUTHS); Modified: incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/test/functional/SplitRecoveryTest.java URL: http://svn.apache.org/viewvc/incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/test/functional/SplitRecoveryTest.java?rev=1222766&r1=1222765&r2=1222766&view=diff ============================================================================== --- incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/test/functional/SplitRecoveryTest.java (original) +++ incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/test/functional/SplitRecoveryTest.java Fri Dec 23 17:53:12 2011 @@ -34,7 +34,7 @@ import org.apache.accumulo.core.data.Key import org.apache.accumulo.core.data.Mutation; import org.apache.accumulo.core.data.Value; import org.apache.accumulo.core.file.FileUtil; -import org.apache.accumulo.core.file.map.MyMapFile; +import org.apache.accumulo.core.file.rfile.RFile; import org.apache.accumulo.core.util.ColumnFQ; import org.apache.accumulo.core.util.MetadataTable.DataFileValue; import org.apache.accumulo.core.zookeeper.ZooUtil; @@ -48,9 +48,9 @@ import org.apache.accumulo.server.tablet import org.apache.accumulo.server.util.MetadataTable; import org.apache.accumulo.server.zookeeper.IZooReaderWriter; import org.apache.accumulo.server.zookeeper.ZooLock; -import org.apache.accumulo.server.zookeeper.ZooReaderWriter; import org.apache.accumulo.server.zookeeper.ZooLock.LockLossReason; import org.apache.accumulo.server.zookeeper.ZooLock.LockWatcher; +import org.apache.accumulo.server.zookeeper.ZooReaderWriter; import org.apache.hadoop.io.Text; public class SplitRecoveryTest extends FunctionalTest { @@ -128,7 +128,7 @@ public class SplitRecoveryTest extends F String tdir = "/dir_" + i; MetadataTable.addTablet(extent, tdir, SecurityConstants.getSystemCredentials(), TabletTime.LOGICAL_TIME_ID, zl); SortedMap mapFiles = new TreeMap(); - mapFiles.put(tdir + "/" + MyMapFile.EXTENSION + "_000_000", new DataFileValue(1000017 + i, 10000 + i)); + mapFiles.put(tdir + "/" + RFile.EXTENSION + "_000_000", new DataFileValue(1000017 + i, 10000 + i)); if (i == extentToSplit) { splitMapFiles = mapFiles; Modified: incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/util/MapFilePerformanceTest.java URL: http://svn.apache.org/viewvc/incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/util/MapFilePerformanceTest.java?rev=1222766&r1=1222765&r2=1222766&view=diff ============================================================================== --- incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/util/MapFilePerformanceTest.java (original) +++ incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/util/MapFilePerformanceTest.java Fri Dec 23 17:53:12 2011 @@ -27,6 +27,7 @@ import java.util.concurrent.ExecutorServ import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; +import org.apache.accumulo.core.Constants; import org.apache.accumulo.core.data.Key; import org.apache.accumulo.core.data.KeyExtent; import org.apache.accumulo.core.data.Range; @@ -217,7 +218,7 @@ public class MapFilePerformanceTest { System.out.println("Thread " + Thread.currentThread().getName() + " creating map files blocksize = " + blocksize + " num = " + num); String[] filenames; try { - filenames = createMapFiles(args[0], args[1] + "/" + MyMapFile.EXTENSION + "_" + blocksize, blocksize, num); + filenames = createMapFiles(args[0], args[1] + "/" + Constants.MAPFILE_EXTENSION + "_" + blocksize, blocksize, num); synchronized (tests) { Map map = tests.get(num); Modified: incubator/accumulo/trunk/test/system/auto/simple/compaction.py URL: http://svn.apache.org/viewvc/incubator/accumulo/trunk/test/system/auto/simple/compaction.py?rev=1222766&r1=1222765&r2=1222766&view=diff ============================================================================== --- incubator/accumulo/trunk/test/system/auto/simple/compaction.py (original) +++ incubator/accumulo/trunk/test/system/auto/simple/compaction.py Fri Dec 23 17:53:12 2011 @@ -44,7 +44,7 @@ class CompactionTest(SimpleBulkTest): handle = self.runClassOn( self.masterHost(), 'org.apache.accumulo.server.test.CreateMapFiles', - "testmf 4 0 500000 59".split()) + "testrf 4 0 500000 59".split()) out, err = handle.communicate() self.assert_(handle.returncode == 0) @@ -52,8 +52,8 @@ class CompactionTest(SimpleBulkTest): # initialize the database self.createTable('test_ingest') - self.execute(self.masterHost(), 'hadoop dfs -rmr /testmf'.split()) - self.execute(self.masterHost(), 'hadoop dfs -rmr /testmfFail'.split()) + self.execute(self.masterHost(), 'hadoop dfs -rmr /testrf'.split()) + self.execute(self.masterHost(), 'hadoop dfs -rmr /testrfFail'.split()) # insert some data self.createMapFiles(self.masterHost())