accumulo-user mailing list archives

From Adam Fuchs <adam.p.fu...@ugov.gov>
Subject Re: Accumulo on MapR - Compaction Test
Date Wed, 02 May 2012 22:14:30 GMT
Keys,

There's not really a way to change the compression type used by MyMapFile
within Accumulo -- it always uses block compression with the DefaultCodec
(gzip). However, if you want to write a separate standalone test that
mimics this behavior, you can use one of the MyMapFile.Writer constructors
to specify the compression type you want. Incidentally, MyMapFile is legacy
code and we're getting rid of it in version 1.5, but it is well tested and
we wouldn't expect to see this type of problem.
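
For example, a quick standalone writer along these lines would let you compare
compression types. Treat it as a rough sketch: I'm assuming MyMapFile.Writer and
MySequenceFile.CompressionType mirror Hadoop's MapFile/SequenceFile APIs, so
check the constructor signature against the 1.4 source before relying on it.

import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.file.map.MyMapFile;
import org.apache.accumulo.core.file.map.MySequenceFile;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.io.Text;

public class MyMapFileCompressionTest {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(conf);
    // BLOCK + DefaultCodec (gzip) is what Accumulo uses today; try RECORD or NONE
    // to see whether the failure follows the compression type
    MyMapFile.Writer writer = new MyMapFile.Writer(conf, fs, "/tmp/mymapfile-test",
        Key.class, Value.class, MySequenceFile.CompressionType.BLOCK);
    byte[] data = new byte[50];
    for (int i = 0; i < 1000000; i++) {
      // keys must be appended in sorted order, hence the zero-padded row ids
      Key k = new Key(new Text(String.format("row_%010d", i)), new Text("colf"),
          new Text("col_00000"));
      writer.append(k, new Value(data));
    }
    writer.close();
  }
}

Reading that file back with MyMapFile.Reader on the MapR cluster should tell you
whether the "invalid distance too far back" error reproduces outside the tablet
server.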

Cheers,
Adam



On Wed, May 2, 2012 at 1:35 PM, Keys Botzum <kbotzum@maprtech.com> wrote:

> I need a bit more help. I really appreciate the help already provided by
> Kevin and Eric.
>
> We've been testing Accumulo 1.4.0 on additional hardware platforms and
> have hit an unexpected issue. The compaction auto test (test/system/auto)
> fails. Interestingly, it fails every time on one machine and intermittently
> on another, which makes me suspect some kind of race condition. At this
> point I can easily reproduce the problem, and what I observe is that when
> the failure occurs, it always occurs in the same block of code but not on
> the same file.
>
> To be clear, when I run the following test:
>
> ./run.py -t compact -d
>
> I get this exception in the tserver log:
>
> 02 08:41:15,944 [tabletserver.TabletServer] WARN : exception while scanning tablet 1<<
> java.io.IOException: invalid distance too far back
>         at org.apache.hadoop.io.compress.zlib.ZlibDecompressor.inflateBytesDirect(Native Method)
>         at org.apache.hadoop.io.compress.zlib.ZlibDecompressor.decompress(ZlibDecompressor.java:221)
>         at org.apache.hadoop.io.compress.DecompressorStream.decompress(DecompressorStream.java:81)
>         at org.apache.hadoop.io.compress.DecompressorStream.read(DecompressorStream.java:75)
>         at org.apache.hadoop.io.compress.DecompressorStream.read(DecompressorStream.java:63)
>         at java.io.DataInputStream.readInt(DataInputStream.java:370)
>         at org.apache.accumulo.core.data.Value.readFields(Value.java:161)
>         at org.apache.accumulo.core.file.map.MySequenceFile$Reader.getCurrentValue(MySequenceFile.java:1773)
>         at org.apache.accumulo.core.file.map.MySequenceFile$Reader.next(MySequenceFile.java:1893)
>         at org.apache.accumulo.core.file.map.MyMapFile$Reader.next(MyMapFile.java:678)
>         at org.apache.accumulo.core.file.map.MyMapFile$Reader.next(MyMapFile.java:799)
>         at org.apache.accumulo.core.file.map.MapFileOperations$RangeIterator.next(MapFileOperations.java:111)
>         at org.apache.accumulo.core.iterators.WrappingIterator.next(WrappingIterator.java:87)
>         at org.apache.accumulo.core.iterators.SkippingIterator.next(SkippingIterator.java:29)
>         at org.apache.accumulo.server.problems.ProblemReportingIterator.next(ProblemReportingIterator.java:77)
>         at org.apache.accumulo.core.iterators.system.HeapIterator.next(HeapIterator.java:88)
>         at org.apache.accumulo.core.iterators.system.DeletingIterator.next(DeletingIterator.java:58)
>         at org.apache.accumulo.core.iterators.WrappingIterator.next(WrappingIterator.java:87)
>         at org.apache.accumulo.core.iterators.Filter.next(Filter.java:58)
>         at org.apache.accumulo.core.iterators.WrappingIterator.next(WrappingIterator.java:87)
>         at org.apache.accumulo.core.iterators.Filter.next(Filter.java:58)
>         at org.apache.accumulo.core.iterators.WrappingIterator.next(WrappingIterator.java:87)
>         at org.apache.accumulo.core.iterators.user.VersioningIterator.skipRowColumn(VersioningIterator.java:103)
>         at org.apache.accumulo.core.iterators.user.VersioningIterator.next(VersioningIterator.java:53)
>         at org.apache.accumulo.core.iterators.system.SourceSwitchingIterator.readNext(SourceSwitchingIterator.java:120)
>         at org.apache.accumulo.core.iterators.system.SourceSwitchingIterator.next(SourceSwitchingIterator.java:105)
>         at org.apache.accumulo.server.tabletserver.Tablet.nextBatch(Tablet.java:1766)
>         at org.apache.accumulo.server.tabletserver.Tablet.access$3200(Tablet.java:143)
>         at org.apache.accumulo.server.tabletserver.Tablet$Scanner.read(Tablet.java:1883)
>         at org.apache.accumulo.server.tabletserver.TabletServer$ThriftClientHandler$NextBatchTask.run(TabletServer.java:905)
>         at org.apache.accumulo.cloudtrace.instrument.TraceRunnable.run(TraceRunnable.java:47)
>         at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
>         at java.lang.Thread.run(Thread.java:662)
>
> Don't worry about the line numbers being slightly off from the official
> source; that's an artifact of my having added a lot of comments and trace
> code to figure out what is happening. I've added debug statements as
> follows to the code in MySequenceFile.java and Value.java.
>
> *Value*
>   public void readFields(final DataInput in) throws IOException {
>     System.err.println(new java.util.Date() + "KDB: " + java.lang.Thread.currentThread().getId()
>         + ": object id " + in.hashCode() + ", readInt()...");
>     int len;
>     try {
>       len = in.readInt();
>     } catch (IOException e) {
>       System.err.println(new java.util.Date() + "KDB: " + java.lang.Thread.currentThread().getId()
>           + ": readInt() resulted in *FAILURE*");
>       throw e;
>     }
>     System.err.println(new java.util.Date() + "KDB: " + java.lang.Thread.currentThread().getId()
>         + ": readInt() resulted in SUCCESS");
>     this.value = new byte[len];
>     System.err.println("KDB: " + java.lang.Thread.currentThread().getId()
>         + ": readFully() value of this many bytes: " + len);
>     System.err.flush();
>     in.readFully(this.value, 0, this.value.length);
>   }
>
> *MySequenceFile*
>     /** Read a compressed buffer */
>     private synchronized void readBuffer(DataInputBuffer buffer, CompressionInputStream filter) throws IOException {
>       // Read data into a temporary buffer
>       DataOutputBuffer dataBuffer = new DataOutputBuffer();
>
>       try {
>         int dataBufferLength = WritableUtils.readVInt(in);
>         dataBuffer.write(in, dataBufferLength);
>
>         // Set up 'buffer' connected to the input-stream:
>         buffer.reset(dataBuffer.getData(), 0, dataBuffer.getLength());
>         System.err.println("KDB: " + java.lang.Thread.currentThread().getId()
>             + ": dataBuffer.getLength() " + dataBuffer.getLength() + ", buffer.getLength() = " + buffer.getLength()
>             + ", buffer.getPosition() = " + buffer.getPosition());
>       } finally {
>         dataBuffer.close();
>       }
>
>       // Reset the codec
>       System.err.println("KDB: " + java.lang.Thread.currentThread().getId() + ": resetState()");
>       filter.resetState();
>     }
>
>     private synchronized void seekToCurrentValue() throws IOException {
>       if (!blockCompressed) {
>         if (decompress) {
>           valInFilter.resetState();
>         }
>         valBuffer.reset();
>       } else {
>         // Check if this is the first value in the 'block' to be read
>         if (lazyDecompress && !valuesDecompressed) {
>           System.err.println("KDB: " + java.lang.Thread.currentThread().getId()
>               + ": seekToCurrentValue calling readBuffer ");
>           // Read the value lengths and values
>           readBuffer(valLenBuffer, valLenInFilter);
>           readBuffer(valBuffer, valInFilter);
>           noBufferedValues = noBufferedRecords;
>           valuesDecompressed = true;
>           // KDB hack
>           //valIn.reset();
>         }
>
>         // Calculate the no. of bytes to skip
>         // Note: 'current' key has already been read!
>         int skipValBytes = 0;
>         int currentKey = noBufferedKeys + 1;
>         if (noBufferedValues <= noBufferedKeys) {
>           throw new IOException("Cannot seek to current value twice");
>         }
>
>         for (int i = noBufferedValues; i > currentKey; --i) {
>           skipValBytes += WritableUtils.readVInt(valLenIn);
>           --noBufferedValues;
>         }
>         System.err.println("KDB: " + java.lang.Thread.currentThread().getId()
>             + ": seekToCurrentValue skipValBytes = " + skipValBytes
>             + ", valBuffer.getPosition() = " + valBuffer.getPosition());
>
>         // Skip to the 'val' corresponding to 'current' key
>         if (skipValBytes > 0) {
>           if (valIn.skipBytes(skipValBytes) != skipValBytes) {
>             throw new IOException("Failed to seek to " + currentKey + "(th) value!");
>           }
>         }
>       }
>       System.err.println("KDB: " + java.lang.Thread.currentThread().getId()
>           + ": seekToCurrentValue valIn.available() = " + valIn.available()
>           + ", valBuffer.getPosition() = " + valBuffer.getPosition() + ", getLength() = " + valBuffer.getLength());
>     }
>
>     public synchronized void getCurrentValue(Writable val) throws IOException {
>       if (val instanceof Configurable) {
>         ((Configurable) val).setConf(this.conf);
>       }
>
>       // Position stream to 'current' value
>       seekToCurrentValue();
>
>       if (!blockCompressed) {
>         val.readFields(valIn);
>
>         if (valIn.read() > 0) {
>           LOG.info("available bytes: " + valIn.available());
>           throw new IOException(val + " read " + (valBuffer.getPosition() - keyLength)
>               + " bytes, should read " + (valBuffer.getLength() - keyLength));
>         }
>       } else {
>         // Get the value
>         int valLength = WritableUtils.readVInt(valLenIn);
>         String str = "KDB: " + java.lang.Thread.currentThread().getId()
>             + ": Attempt readFields(). valLength = " + valLength + ", valIn.available() is " + valIn.available();
>         System.err.println(str);
>         System.err.println("KDB: " + java.lang.Thread.currentThread().getId() + ": " + file);
>         System.err.println("KDB: " + java.lang.Thread.currentThread().getId()
>             + " valBuffer.getPosition() = " + valBuffer.getPosition() + ", getLength() = " + valBuffer.getLength());
>
>         val.readFields(new DataInputStream(valInFilter));
>
>         //val.readFields(valIn);
>
>         // Read another compressed 'value'
>         --noBufferedValues;
>
>         // Sanity check
>         if (valLength < 0) {
>           LOG.debug(val + " is a zero-length value");
>         }
>       }
>     }
>
>
>
> When run, this is the sequence that yields a failure. The thread and file
> change every time.
>
> KDB: 195: dataBuffer.getLength() 18864, buffer.getLength() = 18864, buffer.getPosition() = 0
> KDB: 195: resetState()
> KDB: 195: seekToCurrentValue skipValBytes = 0, valBuffer.getPosition() = 0
> KDB: 195: seekToCurrentValue valIn.available() = 1, valBuffer.getPosition() = 0, getLength() = 18864
> Wed May 02 08:41:15 PDT 2012KDB: 174: object id 152878657, readInt()...
> KDB: 195: Attempt readFields(). valLength = 54, valIn.available() is 1
> KDB: 195: /user/mapr/accumulo-SE-test-04-19165/tables/1/b-0000000/I000001j.map/data
> KDB: 195 valBuffer.getPosition() = 0, getLength() = 18864
> Wed May 02 08:41:15 PDT 2012KDB: 174: readInt() resulted in SUCCESS
> KDB: 174: readFully() value of this many bytes: 50
> Wed May 02 08:41:15 PDT 2012KDB: 195: object id 1041146387, readInt()...
> KDB: 174: call next(key)
> KDB: 174: readVInt
> KDB: 174: readFields KeyIn
> KDB: 174: done with next(key), more = true key=row_0000387268 colf:col_00000 [L1&L2&G1&GROUP2] 1 false
> KDB: 174: seekToCurrentValue skipValBytes = 0, valBuffer.getPosition() = 40960
> KDB: 174: seekToCurrentValue valIn.available() = 1, valBuffer.getPosition() = 40960, getLength() = 42744
> KDB: 174: Attempt readFields(). valLength = 54, valIn.available() is 1
> KDB: 174: /user/mapr/accumulo-SE-test-04-19165/tables/1/b-0000000/I0000017.map/data
> KDB: 174 valBuffer.getPosition() = 40960, getLength() = 42744
> Wed May 02 08:41:15 PDT 2012KDB: 174: object id 850570553, readInt()...
> Wed May 02 08:41:15 PDT 2012KDB: 174: readInt() resulted in SUCCESS
> KDB: 174: readFully() value of this many bytes: 50
> KDB: 174: call next(key)
> KDB: 174: readVInt
> KDB: 174: readFields KeyIn
> KDB: 174: done with next(key), more = true key=row_0000387269 colf:col_00000 [L1&L2&G1&GROUP2] 1 false
> KDB: 174: seekToCurrentValue skipValBytes = 0, valBuffer.getPosition() = 40960
> KDB: 174: seekToCurrentValue valIn.available() = 1, valBuffer.getPosition() = 40960, getLength() = 42744
> KDB: 174: Attempt readFields(). valLength = 54, valIn.available() is 1
> KDB: 174: /user/mapr/accumulo-SE-test-04-19165/tables/1/b-0000000/I0000017.map/data
> KDB: 174 valBuffer.getPosition() = 40960, getLength() = 42744
> Wed May 02 08:41:15 PDT 2012KDB: 174: object id 1888129839, readInt()...
> Wed May 02 08:41:15 PDT 2012KDB: 174: readInt() resulted in SUCCESS
> KDB: 174: readFully() value of this many bytes: 50
> KDB: 174: call next(key)
> Wed May 02 08:41:15 PDT 2012KDB: 195: readInt() resulted in *FAILURE*
>
> I'm trying to eliminate compression as a factor, but I haven't been able
> to. I disabled compression by editing TestUtils.py, and I can see this in
> the tserver log:
>
> 02 08:40:48,103 [server.Accumulo] INFO : table.file.compress.blocksize = 100K
> 02 08:40:48,103 [server.Accumulo] INFO : table.file.compress.blocksize.index = 128K
> 02 08:40:48,103 [server.Accumulo] INFO : table.file.compress.type = none
>
> But it's pretty clear the files are still compressed. I'd like to disable
> compression if you think it would be useful to get us a clearer picture of
> the issue. Is there a way to do that?
>
> I've tried a number of things that have led to dead ends, but basically I
> think this comes down to the following:
> - next() is being used to get the next chunk of data
> - we successfully obtain the keys
> - we successfully read value data from the input stream as raw bytes
> (still compressed) into the internal buffer (valBuffer), which is
> indirectly referenced by the valIn stream
> - as soon as we try to read the first integer from the valIn stream, we
> get the exception above
> - I can tell from my trace that valBuffer seems to have plenty of bytes,
> so I don't think this is something as simple as reading past the end of
> the buffer, but I could be wrong
> - the failure occurs "randomly" in the sense that the thread, buffer size,
> and even the file being read when this fails all vary
>
> It's not clear to me whether the file is already corrupted or whether
> reading the file is hitting a timing condition.
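>
> One thing I may try, to separate those two cases, is reading the suspect
> file back standalone, outside the tserver. A rough sketch (assuming
> MyMapFile.Reader keeps the same constructor and next() signature as
> Hadoop's MapFile.Reader -- I haven't verified this against the source yet):
>
> import org.apache.accumulo.core.data.Key;
> import org.apache.accumulo.core.data.Value;
> import org.apache.accumulo.core.file.map.MyMapFile;
> import org.apache.hadoop.conf.Configuration;
> import org.apache.hadoop.fs.FileSystem;
>
> public class ReadMapFileCheck {
>   public static void main(String[] args) throws Exception {
>     Configuration conf = new Configuration();
>     FileSystem fs = FileSystem.get(conf);
>     // args[0] is the map file directory, e.g. .../tables/1/b-0000000/I0000017.map
>     MyMapFile.Reader reader = new MyMapFile.Reader(fs, args[0], conf);
>     Key key = new Key();
>     Value value = new Value();
>     long entries = 0;
>     // next() should throw the same IOException if a block fails to inflate,
>     // which would point at the file rather than at a race in the read path
>     while (reader.next(key, value)) {
>       entries++;
>     }
>     reader.close();
>     System.out.println("read " + entries + " entries cleanly");
>   }
> }
>
> If that reads the whole file cleanly every time, the corruption theory gets
> a lot weaker.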
>
>
> Any ideas much appreciated. I can of course provide any tracing you desire
> or make code changes.
>
> Thank you,
> Keys
> ________________________________
> Keys Botzum
> Senior Principal Technologist
> WW Systems Engineering
> kbotzum@maprtech.com
> 443-718-0098
> MapR Technologies
> http://www.mapr.com
>
