accumulo-commits mailing list archives

From ctubb...@apache.org
Subject [44/45] accumulo git commit: Merge branch '1.7' into 1.8
Date Fri, 09 Sep 2016 23:38:03 GMT
Merge branch '1.7' into 1.8


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/7dca4ab2
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/7dca4ab2
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/7dca4ab2

Branch: refs/heads/1.8
Commit: 7dca4ab2920037386518a36aa1f651e10872fe0b
Parents: ba0f0d4 f21c11c
Author: Christopher Tubbs <ctubbsii@apache.org>
Authored: Fri Sep 9 19:22:41 2016 -0400
Committer: Christopher Tubbs <ctubbsii@apache.org>
Committed: Fri Sep 9 19:22:41 2016 -0400

----------------------------------------------------------------------
 .../file/blockfile/impl/CachableBlockFile.java  |   5 +-
 .../accumulo/core/file/rfile/bcfile/BCFile.java |  22 +-
 .../streams/BoundedRangeFileInputStream.java    |  35 +-
 .../core/file/rfile/MultiThreadedRFileTest.java | 390 +++++++++++++++++++
 .../accumulo/core/file/rfile/RFileTest.java     |   7 -
 core/src/test/resources/log4j.properties        |   2 +
 6 files changed, 433 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/7dca4ab2/core/src/main/java/org/apache/accumulo/core/file/blockfile/impl/CachableBlockFile.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/accumulo/blob/7dca4ab2/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/BCFile.java
----------------------------------------------------------------------
diff --cc core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/BCFile.java
index 77de47e,85a833e..bba8aeb
--- a/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/BCFile.java
+++ b/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/BCFile.java
@@@ -615,9 -612,10 +615,10 @@@ public final class BCFile 
        private Decompressor decompressor;
        private final BlockRegion region;
        private final InputStream in;
+       private volatile boolean closed;
  
 -      public RBlockState(Algorithm compressionAlgo, FSDataInputStream fsin, BlockRegion region, Configuration conf, CryptoModule cryptoModule,
 -          Version bcFileVersion, CryptoModuleParameters cryptoParams) throws IOException {
 +      public <InputStreamType extends InputStream & Seekable> RBlockState(Algorithm compressionAlgo, InputStreamType fsin, BlockRegion region,
 +          Configuration conf, CryptoModule cryptoModule, Version bcFileVersion, CryptoModuleParameters cryptoParams) throws IOException {
          this.compressAlgo = compressionAlgo;
          this.region = region;
          this.decompressor = compressionAlgo.getDecompressor();
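
Aside: the widened constructor above replaces the concrete FSDataInputStream parameter with the intersection-type bound <InputStreamType extends InputStream & Seekable>, so any stream that is both a java.io.InputStream and a Hadoop Seekable can back a block. A minimal, self-contained sketch of that pattern follows; the SeekableByteArrayInputStream class is hypothetical, for illustration only, and is not part of this commit.

    import java.io.ByteArrayInputStream;
    import java.io.IOException;
    import java.io.InputStream;

    import org.apache.hadoop.fs.Seekable;

    // Hypothetical stand-in: an in-memory stream that also implements Hadoop's
    // Seekable, so it satisfies the same bound as the new RBlockState constructor.
    class SeekableByteArrayInputStream extends ByteArrayInputStream implements Seekable {
      SeekableByteArrayInputStream(byte[] data) {
        super(data);
      }

      @Override
      public void seek(long desired) throws IOException {
        if (desired < 0 || desired > count)
          throw new IOException("Cannot seek to " + desired);
        pos = (int) desired;
      }

      @Override
      public long getPos() {
        return pos;
      }

      @Override
      public boolean seekToNewSource(long targetPos) {
        return false; // only one in-memory copy of the data exists
      }
    }

    class IntersectionTypeDemo {
      // Accepts anything that is BOTH an InputStream and a Seekable, mirroring the
      // generalization away from FSDataInputStream in the hunk above.
      static <T extends InputStream & Seekable> int readAt(T in, long offset) throws IOException {
        in.seek(offset);
        return in.read();
      }
    }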

http://git-wip-us.apache.org/repos/asf/accumulo/blob/7dca4ab2/core/src/main/java/org/apache/accumulo/core/file/streams/BoundedRangeFileInputStream.java
----------------------------------------------------------------------
diff --cc core/src/main/java/org/apache/accumulo/core/file/streams/BoundedRangeFileInputStream.java
index 1c01843,0000000..a033ad4
mode 100644,000000..100644
--- a/core/src/main/java/org/apache/accumulo/core/file/streams/BoundedRangeFileInputStream.java
+++ b/core/src/main/java/org/apache/accumulo/core/file/streams/BoundedRangeFileInputStream.java
@@@ -1,152 -1,0 +1,157 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements. See the NOTICE file distributed with this
 + * work for additional information regarding copyright ownership. The ASF
 + * licenses this file to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance with the License.
 + * You may obtain a copy of the License at
 + *
 + * http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 + * License for the specific language governing permissions and limitations under
 + * the License.
 + */
 +package org.apache.accumulo.core.file.streams;
 +
 +import java.io.IOException;
 +import java.io.InputStream;
 +import java.security.AccessController;
 +import java.security.PrivilegedActionException;
 +import java.security.PrivilegedExceptionAction;
 +
 +import org.apache.hadoop.fs.Seekable;
 +
 +/**
 + * BoundedRangeFileInputStream abstracts a contiguous region of a Hadoop FSDataInputStream as a regular input stream. One can create multiple
 + * BoundedRangeFileInputStream instances on top of the same FSDataInputStream, and they will not interfere with each other.
 + */
 +public class BoundedRangeFileInputStream extends InputStream {
-   private InputStream in;
++
++  private volatile boolean closed = false;
++  private final InputStream in;
 +  private long pos;
 +  private long end;
 +  private long mark;
 +  private final byte[] oneByte = new byte[1];
 +
 +  /**
 +   * Constructor
 +   *
 +   * @param in
 +   *          The FSDataInputStream we connect to.
 +   * @param offset
 +   *          Beginning offset of the region.
 +   * @param length
 +   *          Length of the region.
 +   *
 +   *          The actual length of the region may be smaller if (off_begin + length) goes beyond the end of the FS input stream.
 +   */
 +  public <StreamType extends InputStream & Seekable> BoundedRangeFileInputStream(StreamType in, long offset, long length) {
 +    if (offset < 0 || length < 0) {
 +      throw new IndexOutOfBoundsException("Invalid offset/length: " + offset + "/" + length);
 +    }
 +
 +    this.in = in;
 +    this.pos = offset;
 +    this.end = offset + length;
 +    this.mark = -1;
 +  }
 +
 +  @Override
 +  public int available() throws IOException {
-     int avail = in.available();
-     if (pos + avail > end) {
-       avail = (int) (end - pos);
-     }
- 
-     return avail;
++    return (int) (end - pos);
 +  }
 +
 +  @Override
 +  public int read() throws IOException {
 +    int ret = read(oneByte);
 +    if (ret == 1)
 +      return oneByte[0] & 0xff;
 +    return -1;
 +  }
 +
 +  @Override
 +  public int read(byte[] b) throws IOException {
 +    return read(b, 0, b.length);
 +  }
 +
 +  @Override
 +  public int read(final byte[] b, final int off, int len) throws IOException {
 +    if ((off | len | (off + len) | (b.length - (off + len))) < 0) {
 +      throw new IndexOutOfBoundsException();
 +    }
 +
 +    final int n = (int) Math.min(Integer.MAX_VALUE, Math.min(len, (end - pos)));
 +    if (n == 0)
 +      return -1;
 +    Integer ret = 0;
-     final InputStream inLocal = in;
-     synchronized (inLocal) {
-       ((Seekable) inLocal).seek(pos);
++    synchronized (in) {
++      // ensuring we are not closed which would be followed by someone else reusing the decompressor
++      if (closed) {
++        throw new IOException("Stream closed");
++      }
++      ((Seekable) in).seek(pos);
 +      try {
 +        ret = AccessController.doPrivileged(new PrivilegedExceptionAction<Integer>()
{
 +          @Override
 +          public Integer run() throws IOException {
 +            int ret = 0;
-             ret = inLocal.read(b, off, n);
++            ret = in.read(b, off, n);
 +            return ret;
 +          }
 +        });
 +      } catch (PrivilegedActionException e) {
 +        throw (IOException) e.getException();
 +      }
 +    }
 +    if (ret < 0) {
 +      end = pos;
 +      return -1;
 +    }
 +    pos += ret;
 +    return ret;
 +  }
 +
 +  @Override
 +  /*
 +   * We may skip beyond the end of the file.
 +   */
 +  public long skip(long n) throws IOException {
 +    long len = Math.min(n, end - pos);
 +    pos += len;
 +    return len;
 +  }
 +
 +  @Override
 +  public void mark(int readlimit) {
 +    mark = pos;
 +  }
 +
 +  @Override
 +  public void reset() throws IOException {
 +    if (mark < 0)
 +      throw new IOException("Resetting to invalid mark");
 +    pos = mark;
 +  }
 +
 +  @Override
 +  public boolean markSupported() {
 +    return true;
 +  }
 +
 +  @Override
 +  public void close() {
-     // Invalidate the state of the stream.
-     in = null;
-     pos = end;
-     mark = -1;
++    // Synchronize on the FSDataInputStream to ensure we are blocked if in the read method:
++    // Once this close completes, the underlying decompression stream may be returned to
++    // the pool and subsequently used. Turns out this is a problem if currently using it to read.
++    if (!closed) {
++      synchronized (in) {
++        // Invalidate the state of the stream.
++        closed = true;
++      }
++    }
 +  }
 +}
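
Aside: the interplay between read() and close() above is the heart of this fix. Many BoundedRangeFileInputStream instances share one Seekable stream whose decompressor is pooled; if close() could return while another thread was inside read(), the pooled decompressor might be handed out and used concurrently. Stripped to its essentials, the guard is: reads take a lock on the shared stream and re-check a volatile closed flag, and close() takes the same lock before marking the stream closed. A minimal sketch of just that pattern (class and field names are illustrative, not from the commit):

    import java.io.IOException;

    class GuardedRangeReader {
      private final Object sharedStream; // stands in for the shared Seekable stream
      private volatile boolean closed = false;

      GuardedRangeReader(Object sharedStream) {
        this.sharedStream = sharedStream;
      }

      int read() throws IOException {
        synchronized (sharedStream) {
          // re-check under the lock: a close() cannot interleave past this point
          if (closed)
            throw new IOException("Stream closed");
          // ... seek and read under the lock, as in read(byte[], int, int) above ...
          return -1;
        }
      }

      void close() {
        if (!closed) {
          synchronized (sharedStream) {
            // once the lock is held, no read is in flight; it is now safe for the
            // pooled decompressor to be returned and reused elsewhere
            closed = true;
          }
        }
      }
    }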

http://git-wip-us.apache.org/repos/asf/accumulo/blob/7dca4ab2/core/src/test/java/org/apache/accumulo/core/file/rfile/MultiThreadedRFileTest.java
----------------------------------------------------------------------
diff --cc core/src/test/java/org/apache/accumulo/core/file/rfile/MultiThreadedRFileTest.java
index 0000000,a0efeed..16f2349
mode 000000,100644..100644
--- a/core/src/test/java/org/apache/accumulo/core/file/rfile/MultiThreadedRFileTest.java
+++ b/core/src/test/java/org/apache/accumulo/core/file/rfile/MultiThreadedRFileTest.java
@@@ -1,0 -1,380 +1,390 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ package org.apache.accumulo.core.file.rfile;
+ 
+ import java.io.File;
+ import java.io.IOException;
+ import java.io.PrintWriter;
+ import java.io.StringWriter;
+ import java.util.ArrayList;
+ import java.util.Collection;
+ import java.util.Collections;
+ import java.util.HashMap;
+ import java.util.List;
+ import java.util.Map;
+ import java.util.Random;
+ import java.util.concurrent.LinkedBlockingQueue;
+ import java.util.concurrent.ThreadPoolExecutor;
+ import java.util.concurrent.TimeUnit;
++
++import org.apache.accumulo.core.client.sample.Sampler;
+ import org.apache.accumulo.core.conf.AccumuloConfiguration;
+ import org.apache.accumulo.core.data.ArrayByteSequence;
+ import org.apache.accumulo.core.data.ByteSequence;
+ import org.apache.accumulo.core.data.Key;
+ import org.apache.accumulo.core.data.Range;
+ import org.apache.accumulo.core.data.Value;
+ import org.apache.accumulo.core.file.FileSKVIterator;
+ import org.apache.accumulo.core.file.blockfile.impl.CachableBlockFile;
+ import org.apache.accumulo.core.file.rfile.RFile.Reader;
++import org.apache.accumulo.core.file.streams.PositionedOutputs;
+ import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
+ import org.apache.accumulo.core.iterators.system.ColumnFamilySkippingIterator;
++import org.apache.accumulo.core.sample.impl.SamplerConfigurationImpl;
++import org.apache.accumulo.core.sample.impl.SamplerFactory;
+ import org.apache.accumulo.core.util.CachedConfiguration;
+ import org.apache.accumulo.core.util.NamingThreadFactory;
 -import org.apache.commons.lang3.mutable.MutableInt;
++import org.apache.commons.lang.mutable.MutableInt;
+ import org.apache.hadoop.conf.Configuration;
+ import org.apache.hadoop.fs.FSDataOutputStream;
+ import org.apache.hadoop.fs.FileSystem;
+ import org.apache.hadoop.fs.Path;
+ import org.apache.log4j.Logger;
+ import org.junit.Rule;
+ import org.junit.Test;
+ import org.junit.rules.TemporaryFolder;
+ 
+ import static org.junit.Assert.assertEquals;
+ import static org.junit.Assert.assertFalse;
+ import static org.junit.Assert.assertTrue;
+ 
+ public class MultiThreadedRFileTest {
+ 
+   private static final Logger LOG = Logger.getLogger(MultiThreadedRFileTest.class);
 -  private static final Collection<ByteSequence> EMPTY_COL_FAMS = new ArrayList<ByteSequence>();
++  private static final Collection<ByteSequence> EMPTY_COL_FAMS = new ArrayList<>();
+ 
+   @Rule
+   public TemporaryFolder tempFolder = new TemporaryFolder(new File(System.getProperty("user.dir") + "/target"));
+ 
+   private static void checkIndex(Reader reader) throws IOException {
+     FileSKVIterator indexIter = reader.getIndex();
+ 
+     if (indexIter.hasTop()) {
+       Key lastKey = new Key(indexIter.getTopKey());
+ 
+       if (reader.getFirstKey().compareTo(lastKey) > 0)
+         throw new RuntimeException("First key out of order " + reader.getFirstKey() + "
" + lastKey);
+ 
+       indexIter.next();
+ 
+       while (indexIter.hasTop()) {
+         if (lastKey.compareTo(indexIter.getTopKey()) > 0)
+           throw new RuntimeException("Indext out of order " + lastKey + " " + indexIter.getTopKey());
+ 
+         lastKey = new Key(indexIter.getTopKey());
+         indexIter.next();
+ 
+       }
+ 
+       if (!reader.getLastKey().equals(lastKey)) {
+         throw new RuntimeException("Last key out of order " + reader.getLastKey() + " "
+ lastKey);
+       }
+     }
+   }
+ 
+   public static class TestRFile {
+ 
+     private Configuration conf = CachedConfiguration.getInstance();
+     public RFile.Writer writer;
+     private FSDataOutputStream dos;
+     private AccumuloConfiguration accumuloConfiguration;
+     public Reader reader;
+     public SortedKeyValueIterator<Key,Value> iter;
+     public File rfile = null;
+     public boolean deepCopy = false;
+ 
+     public TestRFile(AccumuloConfiguration accumuloConfiguration) {
+       this.accumuloConfiguration = accumuloConfiguration;
+       if (this.accumuloConfiguration == null)
+         this.accumuloConfiguration = AccumuloConfiguration.getDefaultConfiguration();
+     }
+ 
+     public void close() throws IOException {
+       if (rfile != null) {
+         FileSystem fs = FileSystem.newInstance(conf);
+         Path path = new Path("file://" + rfile.toString());
+         fs.delete(path, false);
+       }
+     }
+ 
+     public TestRFile deepCopy() throws IOException {
+       TestRFile copy = new TestRFile(accumuloConfiguration);
+       // does not copy any writer resources. This would be for read only.
+       copy.reader = (Reader) reader.deepCopy(null);
+       copy.rfile = rfile;
+       copy.iter = new ColumnFamilySkippingIterator(copy.reader);
+       copy.deepCopy = true;
+ 
+       checkIndex(copy.reader);
+       return copy;
+     }
+ 
+     public void openWriter(boolean startDLG) throws IOException {
+       if (deepCopy) {
+         throw new IOException("Cannot open writer on a deep copy");
+       }
+       if (rfile == null) {
+         rfile = File.createTempFile("TestRFile", ".rf");
+       }
+       FileSystem fs = FileSystem.newInstance(conf);
+       Path path = new Path("file://" + rfile.toString());
+       dos = fs.create(path, true);
 -      CachableBlockFile.Writer _cbw = new CachableBlockFile.Writer(dos, "gz", conf, accumuloConfiguration);
 -      writer = new RFile.Writer(_cbw, 1000, 1000);
++      CachableBlockFile.Writer _cbw = new CachableBlockFile.Writer(PositionedOutputs.wrap(dos), "gz", conf, accumuloConfiguration);
++      SamplerConfigurationImpl samplerConfig = SamplerConfigurationImpl.newSamplerConfig(accumuloConfiguration);
++      Sampler sampler = null;
++      if (samplerConfig != null) {
++        sampler = SamplerFactory.newSampler(samplerConfig, accumuloConfiguration);
++      }
++      writer = new RFile.Writer(_cbw, 1000, 1000, samplerConfig, sampler);
+ 
+       if (startDLG)
+         writer.startDefaultLocalityGroup();
+     }
+ 
+     public void openWriter() throws IOException {
+       openWriter(true);
+     }
+ 
+     public void closeWriter() throws IOException {
+       if (deepCopy) {
+         throw new IOException("Cannot open writer on a deepcopy");
+       }
+       dos.flush();
+       writer.close();
+       dos.flush();
+       dos.close();
+     }
+ 
+     public void openReader() throws IOException {
+       FileSystem fs = FileSystem.newInstance(conf);
+       Path path = new Path("file://" + rfile.toString());
+ 
+       // the caches previously obscured the multithreaded issues, so none are used here
+       CachableBlockFile.Reader _cbr = new CachableBlockFile.Reader(fs, path, conf, null, null, AccumuloConfiguration.getDefaultConfiguration());
+       reader = new RFile.Reader(_cbr);
+       iter = new ColumnFamilySkippingIterator(reader);
+ 
+       checkIndex(reader);
+     }
+ 
+     public void closeReader() throws IOException {
+       reader.close();
+     }
+ 
+     public void seek(Key nk) throws IOException {
+       iter.seek(new Range(nk, null), EMPTY_COL_FAMS, false);
+     }
+   }
+ 
+   static Key nk(String row, String cf, String cq, String cv, long ts) {
+     return new Key(row.getBytes(), cf.getBytes(), cq.getBytes(), cv.getBytes(), ts);
+   }
+ 
+   static Value nv(String val) {
+     return new Value(val.getBytes());
+   }
+ 
+   public AccumuloConfiguration conf = null;
+ 
+   @Test
+   public void testMultipleReaders() throws IOException {
+     final List<Throwable> threadExceptions = Collections.synchronizedList(new ArrayList<Throwable>());
 -    Map<String,MutableInt> messages = new HashMap<String,MutableInt>();
 -    Map<String,String> stackTrace = new HashMap<String,String>();
++    Map<String,MutableInt> messages = new HashMap<>();
++    Map<String,String> stackTrace = new HashMap<>();
+ 
+     final TestRFile trfBase = new TestRFile(conf);
+ 
+     writeData(trfBase);
+ 
+     trfBase.openReader();
+ 
+     try {
+ 
+       validate(trfBase);
+ 
+       final TestRFile trfBaseCopy = trfBase.deepCopy();
+ 
+       validate(trfBaseCopy);
+ 
+       // now start up multiple RFile deepcopies
+       int maxThreads = 10;
+       String name = "MultiThreadedRFileTestThread";
+       ThreadPoolExecutor pool = new ThreadPoolExecutor(maxThreads + 1, maxThreads + 1, 5 * 60, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(),
+           new NamingThreadFactory(name));
+       pool.allowCoreThreadTimeOut(true);
+       try {
+         Runnable runnable = new Runnable() {
+           @Override
+           public void run() {
+             try {
+               TestRFile trf = trfBase;
+               synchronized (trfBaseCopy) {
+                 trf = trfBaseCopy.deepCopy();
+               }
+               validate(trf);
+             } catch (Throwable t) {
+               threadExceptions.add(t);
+             }
+           }
+         };
+         for (int i = 0; i < maxThreads; i++) {
+           pool.submit(runnable);
+         }
+       } finally {
+         pool.shutdown();
+         try {
+           pool.awaitTermination(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
+         } catch (InterruptedException e) {
+           e.printStackTrace();
+         }
+       }
+ 
+       for (Throwable t : threadExceptions) {
+         String msg = t.getClass() + " : " + t.getMessage();
+         if (!messages.containsKey(msg)) {
+           messages.put(msg, new MutableInt(1));
+         } else {
+           messages.get(msg).increment();
+         }
+         StringWriter string = new StringWriter();
+         PrintWriter writer = new PrintWriter(string);
+         t.printStackTrace(writer);
+         writer.flush();
+         stackTrace.put(msg, string.getBuffer().toString());
+       }
+     } finally {
+       trfBase.closeReader();
+       trfBase.close();
+     }
+ 
+     for (String message : messages.keySet()) {
+       LOG.error(messages.get(message) + ": " + message);
+       LOG.error(stackTrace.get(message));
+     }
+ 
+     assertTrue(threadExceptions.isEmpty());
+   }
+ 
+   private void validate(TestRFile trf) throws IOException {
+     Random random = new Random();
+     for (int iteration = 0; iteration < 10; iteration++) {
+       int part = random.nextInt(4);
+ 
+       Range range = new Range(getKey(part, 0, 0), true, getKey(part, 4, 2048), true);
+       trf.iter.seek(range, EMPTY_COL_FAMS, false);
+ 
+       Key last = null;
+       for (int locality = 0; locality < 4; locality++) {
+         for (int i = 0; i < 2048; i++) {
+           Key key = getKey(part, locality, i);
+           Value value = getValue(i);
+           assertTrue("No record found for row " + part + " locality " + locality + " index
" + i, trf.iter.hasTop());
+           assertEquals("Invalid key found for row " + part + " locality " + locality + "
index " + i, key, trf.iter.getTopKey());
+           assertEquals("Invalie value found for row " + part + " locality " + locality +
" index " + i, value, trf.iter.getTopValue());
+           last = trf.iter.getTopKey();
+           trf.iter.next();
+         }
+       }
+       if (trf.iter.hasTop()) {
+         assertFalse("Found " + trf.iter.getTopKey() + " after " + last + " in " + range,
trf.iter.hasTop());
+       }
+ 
+       range = new Range(getKey(4, 4, 0), true, null, true);
+       trf.iter.seek(range, EMPTY_COL_FAMS, false);
+       if (trf.iter.hasTop()) {
+         assertFalse("Found " + trf.iter.getTopKey() + " in " + range, trf.iter.hasTop());
+       }
+     }
+ 
+     Range range = new Range((Key) null, null);
+     trf.iter.seek(range, EMPTY_COL_FAMS, false);
+ 
+     Key last = null;
+     for (int part = 0; part < 4; part++) {
+       for (int locality = 0; locality < 4; locality++) {
+         for (int i = 0; i < 2048; i++) {
+           Key key = getKey(part, locality, i);
+           Value value = getValue(i);
+           assertTrue("No record found for row " + part + " locality " + locality + " index
" + i, trf.iter.hasTop());
+           assertEquals("Invalid key found for row " + part + " locality " + locality + "
index " + i, key, trf.iter.getTopKey());
+           assertEquals("Invalie value found for row " + part + " locality " + locality +
" index " + i, value, trf.iter.getTopValue());
+           last = trf.iter.getTopKey();
+           trf.iter.next();
+         }
+       }
+     }
+ 
+     if (trf.iter.hasTop()) {
+       assertFalse("Found " + trf.iter.getTopKey() + " after " + last + " in " + range, trf.iter.hasTop());
+     }
+   }
+ 
+   private void writeData(TestRFile trfBase) throws IOException {
+     trfBase.openWriter(false);
+ 
+     try {
+       for (int locality = 1; locality < 4; locality++) {
+         trfBase.writer.startNewLocalityGroup("locality" + locality, Collections.singleton((ByteSequence) (new ArrayByteSequence(getCf(locality)))));
+         for (int part = 0; part < 4; part++) {
+           for (int i = 0; i < 2048; i++) {
+             trfBase.writer.append(getKey(part, locality, i), getValue(i));
+           }
+         }
+       }
+ 
+       trfBase.writer.startDefaultLocalityGroup();
+       for (int part = 0; part < 4; part++) {
+         for (int i = 0; i < 2048; i++) {
+           trfBase.writer.append(getKey(part, 0, i), getValue(i));
+         }
+       }
+     } finally {
+       trfBase.closeWriter();
+     }
+   }
+ 
+   private Key getKey(int part, int locality, int index) {
+     String row = "r000" + part;
+     String cf = getCf(locality);
+     String cq = "cq" + pad(index);
+ 
+     return nk(row, cf, cq, "", 1);
+   }
+ 
+   private String pad(int val) {
+     String valStr = String.valueOf(val);
+     switch (valStr.length()) {
+       case 1:
+         return "000" + valStr;
+       case 2:
+         return "00" + valStr;
+       case 3:
+         return "0" + valStr;
+       default:
+         return valStr;
+     }
+   }
+ 
+   private Value getValue(int index) {
+     return nv("" + index);
+   }
+ 
+   private String getCf(int locality) {
+     return "cf" + locality;
+   }
+ 
+ }
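
Aside: the test harness above follows a reusable shape for multi-threaded regression tests: submit N identical tasks to a bounded pool, funnel every Throwable from the worker threads into a synchronized list (assertions thrown on worker threads would otherwise be invisible to JUnit), await termination, then assert the list is empty. A trimmed, self-contained sketch of that shape, with the task body left as a placeholder:

    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.List;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.TimeUnit;

    class ConcurrencyHarnessSketch {
      public static void main(String[] args) throws InterruptedException {
        final List<Throwable> failures = Collections.synchronizedList(new ArrayList<Throwable>());
        ExecutorService pool = Executors.newFixedThreadPool(10);
        for (int i = 0; i < 10; i++) {
          pool.submit(new Runnable() {
            @Override
            public void run() {
              try {
                // placeholder for the real work (deepCopy() + validate() in the test above)
              } catch (Throwable t) {
                failures.add(t); // record instead of losing the failure on this thread
              }
            }
          });
        }
        pool.shutdown();
        pool.awaitTermination(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
        if (!failures.isEmpty())
          throw new AssertionError(failures.size() + " worker task(s) failed, first: " + failures.get(0));
      }
    }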

http://git-wip-us.apache.org/repos/asf/accumulo/blob/7dca4ab2/core/src/test/java/org/apache/accumulo/core/file/rfile/RFileTest.java
----------------------------------------------------------------------
diff --cc core/src/test/java/org/apache/accumulo/core/file/rfile/RFileTest.java
index 069077c,15729c5..3833564
--- a/core/src/test/java/org/apache/accumulo/core/file/rfile/RFileTest.java
+++ b/core/src/test/java/org/apache/accumulo/core/file/rfile/RFileTest.java
@@@ -77,9 -67,6 +77,7 @@@ import org.apache.hadoop.fs.FileSystem
  import org.apache.hadoop.fs.PositionedReadable;
  import org.apache.hadoop.fs.Seekable;
  import org.apache.hadoop.io.Text;
- import org.apache.log4j.Level;
- import org.apache.log4j.Logger;
 +import org.junit.Assert;
  import org.junit.Rule;
  import org.junit.Test;
  import org.junit.rules.TemporaryFolder;

