cxf-issues mailing list archives

From "avidd (JIRA)" <j...@apache.org>
Subject [jira] [Created] (CXF-6776) MTOM client receives bogus
Date Mon, 15 Feb 2016 14:45:18 GMT
avidd created CXF-6776:
--------------------------

             Summary: MTOM client receives bogus
                 Key: CXF-6776
                 URL: https://issues.apache.org/jira/browse/CXF-6776
             Project: CXF
          Issue Type: Bug
    Affects Versions: 3.1.5, 3.1.1
         Environment: Windows 7 Professional 64 bit, java 1.8.0_45
            Reporter: avidd


We have a SOAP web service that transmits large structured result sets (a list of rows, each
of which is a List<String>) using MTOM. We have now migrated to CXF because of another
bug in jaxws-spring. This part of the code seemed to work fine with jaxws-spring, but we had
strange problems there too, which we also traced to the lower layers. Now we have the following
situation:

There are test cases that return result sets of several thousand rows, each with several
columns. The result set is "streamed" via MTOM. These tests fail sporadically, roughly every
10th or 20th run.
This is what I have found out so far:
* This is reproducible on different machines.
* I first saw this with CXF 3.1.1. Then I upgraded to 3.1.5 and it still occurs. As said before,
there were similar bugs with jaxws-spring which showed different failures.
* The server cleanly writes all results into the output and flushes the output stream before
closing it.
* Both client (a JUnit test) and server (a Jetty server started by that test) are running
on the same machine, actually in the same VM.
* If I enable some of the logging in the class below (QueryResponse), this seems to reduce
the probability of the error. I was not able to reproduce the error with the logging enabled.
This is very frustrating and I think it hints at some race condition.

Our streaming result encodes rows like this:
* 4 bytes stating the number of columns in that row as an int (trailing nulls are cut off)
* for each column: 4 bytes stating the number of bytes in that column's value as an int
* that many bytes, which are then converted to a string
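
The row format above can be sketched as follows. This is only a minimal illustration, not the QueryResponse class shown later: the class name RowCodecSketch and its methods are hypothetical, UTF-8 is an assumption (the real code uses the platform default charset), and the trimming of trailing nulls is left out.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper illustrating the wire format of a single row.
final class RowCodecSketch {

  /** Encodes one row: column count, then (byte length, bytes) for each column. */
  static byte[] encodeRow(List<String> row) {
    ByteArrayOutputStream buffer = new ByteArrayOutputStream();
    try (DataOutputStream out = new DataOutputStream(buffer)) {
      out.writeInt(row.size());                 // 4 bytes, big-endian
      for (String value : row) {
        // Assumption: UTF-8; null values are written as empty strings.
        byte[] bytes = (value == null ? "" : value).getBytes(StandardCharsets.UTF_8);
        out.writeInt(bytes.length);             // 4 bytes: length of the value
        out.write(bytes);                       // the value itself
      }
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    return buffer.toByteArray();
  }

  /** Decodes one row written by encodeRow. */
  static List<String> decodeRow(byte[] encoded) {
    try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(encoded))) {
      int columns = in.readInt();
      List<String> row = new ArrayList<>(columns);
      for (int col = 0; col < columns; col++) {
        byte[] bytes = new byte[in.readInt()];
        in.readFully(bytes);                    // blocks until all bytes are read or throws EOFException
        row.add(new String(bytes, StandardCharsets.UTF_8));
      }
      return row;
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  public static void main(String[] args) {
    byte[] encoded = encodeRow(Arrays.asList("BA", "1"));
    System.out.println(Arrays.toString(encoded)); // [0, 0, 0, 2, 0, 0, 0, 2, 66, 65, 0, 0, 0, 1, 49]
    System.out.println(decodeRow(encoded));       // [BA, 1]
  }
}
```

DataOutputStream.writeInt and ByteBuffer.putInt (with the default byte order) both produce big-endian integers, so the layout matches what the real encoder writes.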

*The test*
* If the test runs fine, the client receives 2077 rows containing 15 columns each.
* Each column is a double or a string, it's basically the result of a simple SQL query. (Of
course, if it actually were SQL we would use JDBC. It's actually a SAP data warehouse.)
* If the test fails, which it only does sporadically, it is always the 14th row that fails.

Example of failure: 
When it is received correctly on the client side, these are the bytes of row 14 of test
2.
* first 4 bytes: we have 15 columns
* next 4 bytes: the first column has 2 bytes
* next 2 bytes: "BA"
{code}
[0, 0, 0, 15, 0, 0, 0, 2, 66, 65, 0, 0, 0, 1, 49, 0, 0, 0, 4, ...]
{code}

These are the erroneous bytes as received on the client side:
* first 4 bytes: we have 15 columns
* next 4 bytes: the first column has 2 bytes
* next 2 bytes: "BM"
{code}
[0, 0, 0, 15, 0, 0, 0, 2, 66, 77, 71, 0, 0, 0, ...]
{code}
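
To make the difference concrete, the two prefixes can be parsed the same way readRow in the class below does: column count, length of the first value, the value, then the next length field. This is a sketch only; PrefixInspector and describe are hypothetical names, and it uses just the bytes quoted above (the elided remainder is not guessed).

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical helper: interprets the first 14 bytes of a row.
final class PrefixInspector {

  /** Returns the column count, the first column value, and the next length field. */
  static String describe(byte[] prefix) {
    ByteBuffer buf = ByteBuffer.wrap(prefix);   // big-endian by default
    int columns = buf.getInt();                 // 4 bytes: number of columns
    byte[] first = new byte[buf.getInt()];      // 4 bytes: length of the first value
    buf.get(first);                             // the first value
    int nextLength = buf.getInt();              // 4 bytes: length of the second value
    return "columns=" + columns
        + ", first=" + new String(first, StandardCharsets.US_ASCII)
        + ", nextLength=" + nextLength;
  }

  public static void main(String[] args) {
    byte[] good = {0, 0, 0, 15, 0, 0, 0, 2, 66, 65, 0, 0, 0, 1};
    byte[] bad  = {0, 0, 0, 15, 0, 0, 0, 2, 66, 77, 71, 0, 0, 0};
    System.out.println(describe(good)); // columns=15, first=BA, nextLength=1
    System.out.println(describe(bad));  // columns=15, first=BM, nextLength=1191182336
  }
}
```

In the bad case the decoder would treat 71, 0, 0, 0 as the length of the second column, i.e. more than a billion bytes, so everything after the first value is misaligned.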

At first glance, it looks like a byte was added (because of the three consecutive 0s). So
I thought it may be an off-by-one error. But when looking at the binary representation I see:
{code}
good: 65:     01000001
bad:  77, 71: 01001101 01000111
{code}
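
The bit patterns can be reproduced with a few lines of Java. This is just a sketch for checking the comparison; BitView and toBits are hypothetical names.

```java
// Hypothetical helper: prints byte values as 8-digit binary strings.
final class BitView {

  /** Formats the low 8 bits of the value as a zero-padded binary string. */
  static String toBits(int value) {
    String bits = Integer.toBinaryString(value & 0xFF);
    return "00000000".substring(bits.length()) + bits;
  }

  public static void main(String[] args) {
    System.out.println("good 65:  " + toBits(65));      // 01000001
    System.out.println("bad  77:  " + toBits(77));      // 01001101
    System.out.println("bad  71:  " + toBits(71));      // 01000111
    System.out.println("65 ^ 77:  " + toBits(65 ^ 77)); // 00001100
  }
}
```

The XOR line shows that 65 and 77 differ in two adjacent bits, while the extra 71 has no counterpart in the good data at all.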
I can't see where a byte would have been inserted. It also doesn't look like only a few bits
were flipped. It looks more like completely bogus data to me.

Similar but different problems first occurred when we upgraded from Java 7 to Java 8 a year
ago. Before, everything was fine. At that time we were using jaxws-spring and had the situation
that sometimes the mime-boundary at the end of the attachment was missing. We hoped that an
upgrade to CXF would fix the problem but now we even have issues with the data.
To me it looks like something is totally broken in the JRE but this only shows up when using
MTOM, so it may be some integration problem. I wonder whether we are the only ones seeing
this behavior.

This is my "streaming result set" class:

{code}
@XmlAccessorType ( XmlAccessType.NONE )
@XmlRootElement ( name = "streamResponse" )
@XmlType ( name = "streamResponseType" )
public class QueryResponse {
  private static final String MIME_TYPE_OCTET_STREAM = "application/octet-stream";
  private static final Logger LOG = LoggerFactory.getLogger(QueryResponse.class);
  private static final Marker FATAL = MarkerFactory.getMarker("FATAL");
  private static final ObjectPool<String> STRINGS = ObjectPool.stringPool();
  private static final int BUFFER_SIZE = 100;
  private static final int PREMATURE_END_OF_RESULT = -1;
  private static final byte[] PREMATURE_END_OF_RESULT_BYTES = 
      ByteBuffer.allocate(4).putInt(PREMATURE_END_OF_RESULT).array();
  private volatile int totalBytesRead = 0;
  private volatile int rowsRead = 0;
  private volatile int rowsWritten = 0;

  private DataHandler results;
  private BlockingQueue<List<String>> resultSet;
  private ResultSetIterator<String> resultSetIterator;
  private boolean receiving = true;

  /** Create a new response. This constructor is used for unmarshalling the response. */
  QueryResponse() { }
  
  /**
   * Create a new result set encapsulating the given components.
   * @param aResults the result iterator
   */
  QueryResponse(AutoCloseableIterator<List<String>> aResults) {
    results = encode(aResults);
  }
  
  @XmlElement ( required = true )
  @XmlMimeType ( MIME_TYPE_OCTET_STREAM )
  DataHandler getResults() {
    return results;
  }
  
  /**
   * Set the result set, called by JAXB.
   * @param aDataHandler the data handler
   */
  void setResults(DataHandler aDataHandler) {
    if ( aDataHandler == null ) { throw new NullPointerException("aDataHandler"); }
    if ( resultSet != null ) { throw new IllegalStateException("Result set already exists."); }
    results = aDataHandler;

    // Pipelining
    /* parse results and fill queue while loading from the network */
    resultSet = new ArrayBlockingQueue<List<String>>(BUFFER_SIZE);
    resultSetIterator = new ResultSetIterator<String>(resultSet);
    DataHandler dataHandler = results;
    try {
      decode(dataHandler, resultSet, resultSetIterator);
    } catch ( InterruptedException e ) {
      Thread.currentThread().interrupt();
    }
  }
  
  /**
   * Called on the client side to get a streaming and blocking iterator.
   * @return the result set as a blocking iterator
   */
  public Iterator<List<String>> getResultSet() {
    return resultSetIterator; 
  }
  
  private int fill(byte[] bytes, InputStream in) throws IOException {
    int off = 0;
    int readCount = 0;
    do {
      // read() returns at least one byte unless ( bytes.length - off ) == 0 or the stream is at EOF (-1)
      readCount = in.read(bytes, off, bytes.length - off);
      if ( readCount > 0 ) {
        off += readCount;              // advance only on a successful read; -1 (EOF) must not move off backwards
        totalBytesRead += readCount;
      }
    } while ( readCount > 0 && off < bytes.length );
    if ( off > 0 && off < bytes.length ) { // end of stream before the first byte is a correct termination
      try {
        readCount = in.read(bytes, off, bytes.length - off);
        String exception = readException(in);
        throw new RuntimeException(exception);
      } catch ( EOFException e ) {
        // There was no exception written by the server, just a premature end of the stream causing the client-side EOF.
        throw new RuntimeException("Premature end of stream, total bytes read: " + totalBytesRead);
      }
    }
    return off;
  }

  private static void checkException(int len, InputStream in) throws ClassNotFoundException, IOException {
    if ( len == PREMATURE_END_OF_RESULT ) {
      String exception = readException(in);
      throw new RuntimeException(exception);
    }
  }

  private static String readException(InputStream in) throws IOException {
    ObjectInputStream objIn = new ObjectInputStream(in);
    try {
      Object object = objIn.readObject();
      if ( object != null ) {
        return object.toString();
      }
    } catch ( ClassNotFoundException e ) {
      throw new RuntimeException("Could not read exception.", e);
    }
    return "No exception received from service after premature end of result";
  }

  private DataHandler encode(AutoCloseableIterator<List<String>> aResults) {
    assert ( aResults != null );
    final PipedOutputStream out = new PipedOutputStream();
    DataHandler dh = new DataHandler(new StreamDataSource(out, MIME_TYPE_OCTET_STREAM));
    Encoder encoder = new Encoder(out, aResults, new ServerExceptionHandler());
    new Thread(encoder).start();
    return dh;
  }
  
  private void decode(
      DataHandler dataHandler, final BlockingQueue<List<String>> aResultSet, ExceptionHandler exceptionHandler)
      throws InterruptedException {
    Decoder decoder = new Decoder(dataHandler, aResultSet, exceptionHandler);
    new Thread(decoder).start();
  }
  
  private void awaitIteratorBufferNotFull() throws InterruptedException {
    while ( resultSet.remainingCapacity() == 0 ) { resultSet.wait(); }
  }

  private void awaitElements() throws InterruptedException {
    while ( receiving && resultSet.isEmpty() ) { resultSet.wait(); }
  }

  private static final class StreamDataSource implements DataSource {
    private final String name = UUID.randomUUID().toString();
    private final InputStream in;
    private final String mimeType;

    private StreamDataSource(PipedOutputStream aOut, String aMimeType) {
      ArgumentChecks.checkNotNull(aOut, "aOut");
      ArgumentChecks.checkNotNull(aMimeType, "aMimeType");
      try {
        in = new PipedInputStream(aOut);
      } catch ( IOException e ) {
        throw new RuntimeException("Could not create input stream.", e);
      }
      mimeType = aMimeType;
    }

    @Override public String getName() { return name; }

    @Override public String getContentType() { return mimeType; }

    /**
     * {@inheritDoc}
     * 
     * This implementation violates the specification in that it is destructive. Only the first call will return an
     * appropriate input stream.
     */
    @Override public InputStream getInputStream() { return in; }

    @Override public OutputStream getOutputStream() { throw new UnsupportedOperationException(); }
  }
  
  /**
   * Decodes the contents of an input stream as written by the {@link com.tn_ag.sap.QueryResponse.Encoder} and writes
   * parsed rows to a {@link java.util.Queue}.
   */
  private class Decoder implements Runnable {
    private final DataHandler dataHandler;
    private final BlockingQueue<List<String>> resultSet;
    private final ExceptionHandler exceptionHandler;
    private final byte[] lenBytes = new byte[4];

    Decoder(DataHandler aDataHandler, BlockingQueue<List<String>> aResultSet, ExceptionHandler aHandler) {
      ArgumentChecks.checkNotNull(aDataHandler, "aDataHandler");
      ArgumentChecks.checkNotNull(aResultSet, "aResultSet");
      ArgumentChecks.checkNotNull(aHandler, "aHandler");
      dataHandler = aDataHandler;
      resultSet = aResultSet;
      exceptionHandler = aHandler;
    }
    
    @Override
    public void run() {
      InputStream in = null;
      List<String> row;
      int len;
      try { 
        in = dataHandler.getInputStream();
        
        while ( receiving ) {
          synchronized ( resultSet ) {
            receiving = fill(lenBytes, in) > 0;       // read next row's length in number of columns
            len = ByteBuffer.wrap(lenBytes).getInt(); // convert row length to integer
            if ( !receiving || len == 0 ) { break; }
            checkException(len, in);                  // -1 signals an exception

            row = readRow(in, len);
            awaitIteratorBufferNotFull();
            resultSet.put(row);
            rowsRead++;
            if ( rowsRead % 1000 == 0 ) {
              LOG.debug("already received {} rows", rowsRead);
            }
            resultSet.notifyAll();                    // notify waiting consumer threads
          }
        }
        stopReception();
        LOG.debug("received a total of {} rows.", rowsRead);
      } catch ( InterruptedException e ) {
        LOG.info("Result reception interrupted.");
      } catch ( Exception e ) {
        exceptionHandler.handle(e);
        stopReception();
      } finally {
        receiving = false;
        try {
          if ( in != null ) { in.close(); }
        } catch ( IOException e ) {
          exceptionHandler.handle(e);
        }
      }
    }

    private List<String> readRow(InputStream in, int len) throws IOException {
      List<String> row = new ArrayList<>(len);           // create list of appropriate length
      for ( int col = 0; col < len; col++ ) {            // for each column
        fill(lenBytes, in);                              // read the value length (fixed for some types if schema known)
        int valLen = ByteBuffer.wrap(lenBytes).getInt(); // convert the length of the value as bytes to an int
        final byte[] bytes = new byte[valLen];           // allocate a buffer of exactly the required size
        fill(bytes, in);              
        row.add(STRINGS.internalize(new String(bytes)));
      }
      return row;
    }

    private void stopReception() {
      synchronized ( resultSet ) {
        receiving = false;           // we will stop parsing now
        resultSet.notifyAll();       // consumers can now consume the remaining elements from the queue
        LOG.debug("Read " + rowsRead + " rows from binary stream");
      }                
    }
  }
  
  /**
   * Encodes the given result set and writes the result to an output stream. 
   */
  private class Encoder implements Runnable {
    private final OutputStream out;
    private final AutoCloseableIterator<List<String>> iterator;
    private final ExceptionHandler handler;

    Encoder(OutputStream aOut, AutoCloseableIterator<List<String>> aResults, ExceptionHandler aHandler) {
      ArgumentChecks.checkNotNull(aOut, "aOut");
      ArgumentChecks.checkNotNull(aResults, "aResults");
      ArgumentChecks.checkNotNull(aHandler, "aHandler");
      out = aOut;
      iterator = aResults;
      handler = aHandler;
    }

    @Override
    public void run() {
      try ( AutoCloseableIterator<List<String>> iter = this.iterator; OutputStream out = this.out ) {
        writeResultSet(iter, out);
        out.flush();
      } catch ( Exception e ) {
        handler.handle(e);
      }
    }

    private void writeResultSet(AutoCloseableIterator<List<String>> iter, OutputStream out2) throws IOException {
      List<String> row = null;
      while ( this.iterator.hasNext() ) {
        try {
          row = this.iterator.next();
          writeRow(row);
          rowsWritten++;
        } catch ( Exception e ) {
          out.write(PREMATURE_END_OF_RESULT_BYTES); // write -1, signaling an exception instead of row size
          writeException(out, e);                   // send exception to client for rethrowing it there
          throw e;                                  // stop transmission, handle exception on server-side (logging)
        }
      }
      LOG.info("wrote {} rows to binary stream, closing output stream", rowsWritten);
    }

    private void writeRow(List<String> row) throws IOException {
      out.write(ByteBuffer.allocate(4).putInt(row.size()).array());     // write row size (in number of columns)
      for ( String s : row ) {
        if ( s == null ) { s = ""; }
        byte[] bytes = s.getBytes();
        out.write(ByteBuffer.allocate(4).putInt(bytes.length).array()); // write size of column in bytes
        out.write(bytes);                                               // write column value
      }
    }

    private void writeException(OutputStream output, Exception e) throws IOException {
      ObjectOutputStream objOut = new ObjectOutputStream(output);
      objOut.writeObject(toString(e));
    }

    private String toString(Throwable e) {
      if ( e instanceof UncheckedConnectionException && e.getCause() != null ) {
        e = e.getCause();
      }
      ByteArrayOutputStream stackTraceOut = new ByteArrayOutputStream();
      PrintWriter writer = new PrintWriter(stackTraceOut);
      e.printStackTrace(writer);
      writer.flush();
      return new String(stackTraceOut.toByteArray());
    }
  }
  
  private final class ResultSetIterator<T> extends ExceptionHandler implements Iterator<List<T>> {
    private final BlockingQueue<List<T>> queue;
    private Exception exception;
    private int returned = 0;

    private ResultSetIterator(BlockingQueue<List<T>> aQueue) {
      queue = aQueue;
    }

    /**
     * {@inheritDoc}
     * 
     * This implementation marks the current thread as interrupted if streaming could not be commenced.
     */
    @Override
    public boolean hasNext() {
      if ( exception != null ) { throw new UncheckedConnectionException("Exception while reading results", exception); }
      try {
        synchronized ( resultSet ) {
          awaitElements();
          if ( exception != null ) { 
            throw new UncheckedConnectionException("Exception while reading results", exception);
          }
          if ( resultSet.isEmpty() ) {
            LOG.debug("iterator returned " + returned + " rows");
          }
          return !resultSet.isEmpty();
        }
      } catch ( InterruptedException e ) {
        Thread.currentThread().interrupt();
        return false;
      }
    
    }

    /**
     * {@inheritDoc}
     * 
     * If the current thread is interrupted during this call, it is marked as interrupted and the method returns
     * {@code null}.
     * 
     * @throws NoSuchElementException if called while {@link #hasNext()} returns {@code false}
     */
    @Override
    public List<T> next() throws NoSuchElementException {
      if ( exception != null ) { throw new IllegalStateException("Exception while reading results", exception); }
      if ( !hasNext() ) { throw new NoSuchElementException(); }
      try {
        List<T> result = queue.take();
        synchronized ( resultSet ) { resultSet.notify(); }
        returned++;
        return result;
      } catch ( InterruptedException e ) {
        Thread.currentThread().interrupt();
        return null;
      }
    }

    /**
     * This method is not supported.
     */
    @Override public void remove() { throw new UnsupportedOperationException("RTFM"); }

    @Override void handle(Exception aException) {
      if ( exception != null ) { 
        LOG.warn("There was another exception.", aException);
      }
      exception = aException;
    }
  }
  
  private abstract static class ExceptionHandler {
    abstract void handle(Exception exception);
  }
  
  private static class ServerExceptionHandler extends ExceptionHandler {
    @Override
    void handle(Exception aException) {
      LOG.error(FATAL, "Exception while writing response.", aException);
    }
  }
}
{code}
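
Since InputStream.read may return fewer bytes than requested, and returns -1 at end of stream, the read loop in fill() is the part of the class above that is most sensitive to partial reads. As a point of comparison, here is a minimal sketch of a read-fully loop that separates a clean end of stream from a truncated one. FillSketch, its fill method and the trickle helper are hypothetical and only mimic a stream that delivers one byte per call; they are not part of CXF or of the class above.

```java
import java.io.ByteArrayInputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;

// Hypothetical sketch of a read loop that tolerates short reads.
final class FillSketch {

  /**
   * Fills the buffer completely.
   * Returns false if the stream ends before the first byte (clean termination),
   * throws EOFException if it ends in the middle of the buffer.
   */
  static boolean fill(byte[] buffer, InputStream in) throws IOException {
    int off = 0;
    while (off < buffer.length) {
      int n = in.read(buffer, off, buffer.length - off);
      if (n < 0) {                               // end of stream
        if (off == 0) { return false; }
        throw new EOFException("Stream ended after " + off + " of " + buffer.length + " bytes");
      }
      off += n;                                  // n may be smaller than requested
    }
    return true;
  }

  /** Test stream that returns at most one byte per read call. */
  static InputStream trickle(byte[] data) {
    return new ByteArrayInputStream(data) {
      @Override public synchronized int read(byte[] b, int off, int len) {
        return super.read(b, off, Math.min(len, 1));
      }
    };
  }

  public static void main(String[] args) throws IOException {
    InputStream in = trickle(new byte[] {0, 0, 0, 15, 0, 0});
    byte[] four = new byte[4];
    System.out.println(fill(four, in));          // true
    try {
      fill(four, in);                            // only 2 bytes are left
    } catch (EOFException e) {
      System.out.println(e.getMessage());        // Stream ended after 2 of 4 bytes
    }
  }
}
```

java.io.DataInputStream.readFully behaves similarly for the truncated case, but it also throws EOFException on a clean end of stream, so the zero-byte case would need separate handling there.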





--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
