accumulo-user mailing list archives

From Keys Botzum <kbot...@maprtech.com>
Subject Re: Accumulo on MapR - Compaction Test
Date Wed, 02 May 2012 17:34:32 GMT
I need a bit more help. I really appreciate the assistance Kevin and Eric have already provided.

We've been testing Accumulo 1.4.0 on additional hardware platforms and have hit an unexpected
issue. The compaction auto test (test/system/auto) fails. Interestingly, it fails every time
on one machine and intermittently on another, which makes me suspect some kind of race
condition. At this point I can easily reproduce the problem, and what I observe is that when
the failure occurs, it is always in the same block of code but not on the same file.

To be clear, when I run the following test:

	./run.py -t compact -d

I get this exception in the tserver log:

02 08:41:15,944 [tabletserver.TabletServer] WARN : exception while scanning tablet 1<<
java.io.IOException: invalid distance too far back
        at org.apache.hadoop.io.compress.zlib.ZlibDecompressor.inflateBytesDirect(Native Method)
        at org.apache.hadoop.io.compress.zlib.ZlibDecompressor.decompress(ZlibDecompressor.java:221)
        at org.apache.hadoop.io.compress.DecompressorStream.decompress(DecompressorStream.java:81)
        at org.apache.hadoop.io.compress.DecompressorStream.read(DecompressorStream.java:75)
        at org.apache.hadoop.io.compress.DecompressorStream.read(DecompressorStream.java:63)
        at java.io.DataInputStream.readInt(DataInputStream.java:370)
        at org.apache.accumulo.core.data.Value.readFields(Value.java:161)
        at org.apache.accumulo.core.file.map.MySequenceFile$Reader.getCurrentValue(MySequenceFile.java:1773)
        at org.apache.accumulo.core.file.map.MySequenceFile$Reader.next(MySequenceFile.java:1893)
        at org.apache.accumulo.core.file.map.MyMapFile$Reader.next(MyMapFile.java:678)
        at org.apache.accumulo.core.file.map.MyMapFile$Reader.next(MyMapFile.java:799)
        at org.apache.accumulo.core.file.map.MapFileOperations$RangeIterator.next(MapFileOperations.java:111)
        at org.apache.accumulo.core.iterators.WrappingIterator.next(WrappingIterator.java:87)
        at org.apache.accumulo.core.iterators.SkippingIterator.next(SkippingIterator.java:29)
        at org.apache.accumulo.server.problems.ProblemReportingIterator.next(ProblemReportingIterator.java:77)
        at org.apache.accumulo.core.iterators.system.HeapIterator.next(HeapIterator.java:88)
        at org.apache.accumulo.core.iterators.system.DeletingIterator.next(DeletingIterator.java:58)
        at org.apache.accumulo.core.iterators.WrappingIterator.next(WrappingIterator.java:87)
        at org.apache.accumulo.core.iterators.Filter.next(Filter.java:58)
        at org.apache.accumulo.core.iterators.WrappingIterator.next(WrappingIterator.java:87)
        at org.apache.accumulo.core.iterators.Filter.next(Filter.java:58)
        at org.apache.accumulo.core.iterators.WrappingIterator.next(WrappingIterator.java:87)
        at org.apache.accumulo.core.iterators.user.VersioningIterator.skipRowColumn(VersioningIterator.java:103)
        at org.apache.accumulo.core.iterators.user.VersioningIterator.next(VersioningIterator.java:53)
        at org.apache.accumulo.core.iterators.system.SourceSwitchingIterator.readNext(SourceSwitchingIterator.java:120)
        at org.apache.accumulo.core.iterators.system.SourceSwitchingIterator.next(SourceSwitchingIterator.java:105)
        at org.apache.accumulo.server.tabletserver.Tablet.nextBatch(Tablet.java:1766)
        at org.apache.accumulo.server.tabletserver.Tablet.access$3200(Tablet.java:143)
        at org.apache.accumulo.server.tabletserver.Tablet$Scanner.read(Tablet.java:1883)
        at org.apache.accumulo.server.tabletserver.TabletServer$ThriftClientHandler$NextBatchTask.run(TabletServer.java:905)
        at org.apache.accumulo.cloudtrace.instrument.TraceRunnable.run(TraceRunnable.java:47)
        at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
        at java.lang.Thread.run(Thread.java:662)

Don't worry about the line numbers being slightly off from the official source; that's an
artifact of my having added a lot of comments and trace code to figure out what is happening.
I've added debug statements to the code in MySequenceFile.java and Value.java as follows.

Value

  public void readFields(final DataInput in) throws IOException {
    System.err.println(new java.util.Date() + "KDB: " + java.lang.Thread.currentThread().getId()
        + ": object id " + in.hashCode() + ", readInt()...");
    int len;
    try {
      len = in.readInt();
    } catch (IOException e) {
      System.err.println(new java.util.Date() + "KDB: " + java.lang.Thread.currentThread().getId()
          + ": readInt() resulted in FAILURE");
      throw e;
    }
    System.err.println(new java.util.Date() + "KDB: " + java.lang.Thread.currentThread().getId()
        + ": readInt() resulted in SUCCESS");
    this.value = new byte[len];
    System.err.println("KDB: " + java.lang.Thread.currentThread().getId()
        + ": readFully() value of this many bytes: " + len);
    System.err.flush();
    in.readFully(this.value, 0, this.value.length);
  }

MySequenceFile
    /** Read a compressed buffer */
    private synchronized void readBuffer(DataInputBuffer buffer, CompressionInputStream filter)
        throws IOException {
      // Read data into a temporary buffer
      DataOutputBuffer dataBuffer = new DataOutputBuffer();

      try {
        int dataBufferLength = WritableUtils.readVInt(in);
        dataBuffer.write(in, dataBufferLength);

        // Set up 'buffer' connected to the input-stream:
        buffer.reset(dataBuffer.getData(), 0, dataBuffer.getLength());
        System.err.println("KDB: " + java.lang.Thread.currentThread().getId()
            + ": dataBuffer.getLength() " + dataBuffer.getLength()
            + ", buffer.getLength() = " + buffer.getLength()
            + ", buffer.getPosition() = " + buffer.getPosition());
      } finally {
        dataBuffer.close();
      }

      // Reset the codec
      System.err.println("KDB: " + java.lang.Thread.currentThread().getId() + ": resetState()");
      filter.resetState();
    }

    private synchronized void seekToCurrentValue() throws IOException {
      if (!blockCompressed) {
        if (decompress) {
          valInFilter.resetState();
        }
        valBuffer.reset();
      } else {
        // Check if this is the first value in the 'block' to be read
        if (lazyDecompress && !valuesDecompressed) {
          System.err.println("KDB: " + java.lang.Thread.currentThread().getId()
              + ": seekToCurrentValue calling readBuffer ");
          // Read the value lengths and values
          readBuffer(valLenBuffer, valLenInFilter);
          readBuffer(valBuffer, valInFilter);
          noBufferedValues = noBufferedRecords;
          valuesDecompressed = true;
          // KDB hack
          //valIn.reset();
        }

        // Calculate the no. of bytes to skip
        // Note: 'current' key has already been read!
        int skipValBytes = 0;
        int currentKey = noBufferedKeys + 1;
        if (noBufferedValues <= noBufferedKeys) {
          throw new IOException("Cannot seek to current value twice");
        }

        for (int i = noBufferedValues; i > currentKey; --i) {
          skipValBytes += WritableUtils.readVInt(valLenIn);
          --noBufferedValues;
        }
        System.err.println("KDB: " + java.lang.Thread.currentThread().getId()
            + ": seekToCurrentValue skipValBytes = " + skipValBytes
            + ", valBuffer.getPosition() = " + valBuffer.getPosition());

        // Skip to the 'val' corresponding to 'current' key
        if (skipValBytes > 0) {
          if (valIn.skipBytes(skipValBytes) != skipValBytes) {
            throw new IOException("Failed to seek to " + currentKey + "(th) value!");
          }
        }
      }
      System.err.println("KDB: " + java.lang.Thread.currentThread().getId()
          + ": seekToCurrentValue valIn.available() = " + valIn.available()
          + ", valBuffer.getPosition() = " + valBuffer.getPosition()
          + ", getLength() = " + valBuffer.getLength());
    }

    public synchronized void getCurrentValue(Writable val) throws IOException {
      if (val instanceof Configurable) {
        ((Configurable) val).setConf(this.conf);
      }

      // Position stream to 'current' value
      seekToCurrentValue();

      if (!blockCompressed) {
        val.readFields(valIn);

        if (valIn.read() > 0) {
          LOG.info("available bytes: " + valIn.available());
          throw new IOException(val + " read " + (valBuffer.getPosition() - keyLength)
              + " bytes, should read " + (valBuffer.getLength() - keyLength));
        }
      } else {
        // Get the value
        int valLength = WritableUtils.readVInt(valLenIn);
        String str = "KDB: " + java.lang.Thread.currentThread().getId()
            + ": Attempt readFields(). valLength = " + valLength
            + ", valIn.available() is " + valIn.available();
        System.err.println(str);
        System.err.println("KDB: " + java.lang.Thread.currentThread().getId() + ": " + file);
        System.err.println("KDB: " + java.lang.Thread.currentThread().getId()
            + " valBuffer.getPosition() = " + valBuffer.getPosition()
            + ", getLength() = " + valBuffer.getLength());

        val.readFields(new DataInputStream(valInFilter));

        //val.readFields(valIn);

        // Read another compressed 'value'
        --noBufferedValues;

        // Sanity check
        if (valLength < 0) {
          LOG.debug(val + " is a zero-length value");
        }
      }
    }



When I run the test, this is the sequence that yields a failure; the thread and file change from run to run.

KDB: 195: dataBuffer.getLength() 18864, buffer.getLength() = 18864, buffer.getPosition() = 0
KDB: 195: resetState()
KDB: 195: seekToCurrentValue skipValBytes = 0, valBuffer.getPosition() = 0
KDB: 195: seekToCurrentValue valIn.available() = 1, valBuffer.getPosition() = 0, getLength() = 18864
Wed May 02 08:41:15 PDT 2012KDB: 174: object id 152878657, readInt()...
KDB: 195: Attempt readFields(). valLength = 54, valIn.available() is 1
KDB: 195: /user/mapr/accumulo-SE-test-04-19165/tables/1/b-0000000/I000001j.map/data
KDB: 195 valBuffer.getPosition() = 0, getLength() = 18864
Wed May 02 08:41:15 PDT 2012KDB: 174: readInt() resulted in SUCCESS
KDB: 174: readFully() value of this many bytes: 50
Wed May 02 08:41:15 PDT 2012KDB: 195: object id 1041146387, readInt()...
KDB: 174: call next(key)
KDB: 174: readVInt
KDB: 174: readFields KeyIn
KDB: 174: done with next(key), more = true key=row_0000387268 colf:col_00000 [L1&L2&G1&GROUP2] 1 false
KDB: 174: seekToCurrentValue skipValBytes = 0, valBuffer.getPosition() = 40960
KDB: 174: seekToCurrentValue valIn.available() = 1, valBuffer.getPosition() = 40960, getLength() = 42744
KDB: 174: Attempt readFields(). valLength = 54, valIn.available() is 1
KDB: 174: /user/mapr/accumulo-SE-test-04-19165/tables/1/b-0000000/I0000017.map/data
KDB: 174 valBuffer.getPosition() = 40960, getLength() = 42744
Wed May 02 08:41:15 PDT 2012KDB: 174: object id 850570553, readInt()...
Wed May 02 08:41:15 PDT 2012KDB: 174: readInt() resulted in SUCCESS
KDB: 174: readFully() value of this many bytes: 50
KDB: 174: call next(key)
KDB: 174: readVInt
KDB: 174: readFields KeyIn
KDB: 174: done with next(key), more = true key=row_0000387269 colf:col_00000 [L1&L2&G1&GROUP2] 1 false
KDB: 174: seekToCurrentValue skipValBytes = 0, valBuffer.getPosition() = 40960
KDB: 174: seekToCurrentValue valIn.available() = 1, valBuffer.getPosition() = 40960, getLength() = 42744
KDB: 174: Attempt readFields(). valLength = 54, valIn.available() is 1
KDB: 174: /user/mapr/accumulo-SE-test-04-19165/tables/1/b-0000000/I0000017.map/data
KDB: 174 valBuffer.getPosition() = 40960, getLength() = 42744
Wed May 02 08:41:15 PDT 2012KDB: 174: object id 1888129839, readInt()...
Wed May 02 08:41:15 PDT 2012KDB: 174: readInt() resulted in SUCCESS
KDB: 174: readFully() value of this many bytes: 50
KDB: 174: call next(key)
Wed May 02 08:41:15 PDT 2012KDB: 195: readInt() resulted in FAILURE
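
Note the interleaving just before the failure: thread 195 starts its readInt(), thread 174 completes several reads against a different file, and only then does 195's readInt() blow up. That pattern is part of why I suspect shared decompressor or buffer-position state. As an illustration only (this is plain java.util.zip, not Accumulo or Hadoop code, and the class name is mine), picking up a deflate stream at the wrong offset fails with exactly this family of zlib errors:

import java.util.zip.DataFormatException;
import java.util.zip.Deflater;
import java.util.zip.Inflater;

public class MispositionedReadDemo {
  public static void main(String[] args) throws Exception {
    // Highly compressible data, so the deflate stream is full of
    // back-references (distance codes).
    byte[] data = new byte[65536];
    for (int i = 0; i < data.length; i++)
      data[i] = (byte) (i % 17);

    // Raw deflate (nowrap = true) so there is no two-byte zlib header
    // to trip over before the distance codes are reached.
    Deflater def = new Deflater(Deflater.DEFAULT_COMPRESSION, true);
    def.setInput(data);
    def.finish();
    byte[] comp = new byte[data.length + 64];
    int clen = def.deflate(comp);
    def.end();

    // Start inflating from the middle of the stream, as if another
    // reader had advanced a shared position underneath us.
    Inflater inf = new Inflater(true);
    inf.setInput(comp, clen / 2, clen - clen / 2);
    byte[] out = new byte[data.length];
    try {
      inf.inflate(out);
      System.err.println("inflate unexpectedly succeeded");
    } catch (DataFormatException e) {
      System.err.println("inflate failed: " + e.getMessage());
    } finally {
      inf.end();
    }
  }
}

Which message zlib reports depends on what the misread bits happen to decode to, but "invalid distance too far back" is a typical one when a back-reference points outside the window the inflater has actually seen.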

I'm trying to rule compression out as part of the issue, but so far I haven't been able to. I disabled
compression by editing TestUtils.py, and I can see this in the tserver log:

02 08:40:48,103 [server.Accumulo] INFO : table.file.compress.blocksize = 100K
02 08:40:48,103 [server.Accumulo] INFO : table.file.compress.blocksize.index = 128K
02 08:40:48,103 [server.Accumulo] INFO : table.file.compress.type = none

But it's pretty clear the files are still compressed. If you think disabling compression would give
us a clearer picture of the issue, I'd like to do that. Is there a way?
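
For reference, my assumption is that the per-table override from the Accumulo shell would look something like this (the table name here is hypothetical):

	config -t test_ingest -s table.file.compress.type=none

Though even then I would expect files written before the change to stay compressed until a compaction rewrites them, which may be why the data I'm reading still looks compressed.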

I've tried a number of things that have led to dead ends, but basically I think this comes
down to the following:
- next() is being used to get the next chunk of data
- we successfully obtain the keys
- we successfully read the value data from the input stream as raw (still compressed) bytes into
the internal buffer (valBuffer), which is indirectly referenced by the valIn stream
- as soon as we try to read the first integer from the valIn stream, we get the exception
above
- I can tell from my trace that valBuffer seems to have plenty of bytes, so I don't
think this is something as simple as reading past the end of the buffer, but I could be wrong
- the failure occurs "randomly" in the sense that the thread, the buffer size, and even the file being
read when it fails all vary

It's not clear to me whether the problem is that the file is already corrupted on disk or that
the read path is hitting a timing condition.
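
For comparison, the same java.util.zip setup as the earlier sketch, but with genuine corruption (flipping a single byte in the middle of an otherwise valid zlib stream), fails the same way, so the exception alone doesn't distinguish the two cases. Again, this is an illustration only and the class name is mine:

import java.util.zip.DataFormatException;
import java.util.zip.Deflater;
import java.util.zip.Inflater;

public class CorruptedStreamDemo {
  public static void main(String[] args) throws Exception {
    byte[] data = new byte[65536];
    for (int i = 0; i < data.length; i++)
      data[i] = (byte) (i % 17);

    // Produce a valid zlib stream.
    Deflater def = new Deflater();
    def.setInput(data);
    def.finish();
    byte[] comp = new byte[data.length + 64];
    int clen = def.deflate(comp);
    def.end();

    // Simulate corruption on disk or in a buffer: flip one byte in the
    // middle of the compressed stream, past the two-byte zlib header.
    comp[clen / 2] ^= 0xFF;

    Inflater inf = new Inflater();
    inf.setInput(comp, 0, clen);
    byte[] out = new byte[data.length];
    try {
      inf.inflate(out);
      System.err.println("inflate unexpectedly succeeded");
    } catch (DataFormatException e) {
      // The exact message depends on which bits were hit; "invalid
      // distance too far back" is typical for damaged distance codes.
      System.err.println("inflate failed: " + e.getMessage());
    } finally {
      inf.end();
    }
  }
}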


Any ideas are much appreciated. I can of course provide any tracing you'd like or make code changes.

Thank you,
Keys
________________________________
Keys Botzum
Senior Principal Technologist
WW Systems Engineering
kbotzum@maprtech.com
443-718-0098
MapR Technologies
http://www.mapr.com
