cxf-issues mailing list archives

From "avidd (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (CXF-6776) MTOM client receives bogus
Date Wed, 24 Feb 2016 08:00:28 GMT

    [ https://issues.apache.org/jira/browse/CXF-6776?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15160347#comment-15160347 ]

avidd commented on CXF-6776:
----------------------------

The issue is fixed. 
I had refactored the "minimal" SOAP example attached here into an equivalent REST example using the same streamed QueryResponse, which required some refactoring. In the refactored REST version none of the errors occurred. As a cross-check, I moved the refactored QueryResponse back into the SOAP example, and that did in fact fix all the issues. I cannot say exactly what caused the problems. Most notably, I moved some state from the QueryResponse class into the inner classes Encoder and Decoder. I could imagine that jaxws-spring (which we used earlier without problems) recreated some objects for every query, whereas CXF reuses some of them. In any case, the problems are solved now.

> MTOM client receives bogus
> --------------------------
>
>                 Key: CXF-6776
>                 URL: https://issues.apache.org/jira/browse/CXF-6776
>             Project: CXF
>          Issue Type: Bug
>    Affects Versions: 3.1.1, 3.1.5
>         Environment: Windows 7 Professional 64 bit, java 1.8.0_45
>            Reporter: avidd
>         Attachments: CXF-6776-example.zip
>
>
> We have a SOAP web service that transmits large structured result sets (a list of rows, each of which is a List<String>) using MTOM. We have now migrated to CXF due to another bug in jaxws-spring. This (part of the) code seemed to work fine with jaxws-spring, although we had strange problems there too, which we also traced down to the lower layers. Now we are in the situation that some single bytes received by the client are not the bytes that were sent by the server. This can show up as a byte with a completely wrong value, or as an apparently inserted byte *and* the adjacent byte having a wrong value.
> There are test cases that return result sets of several thousand rows, each of which has several columns. The result set is "streamed" via MTOM. These tests fail sporadically, roughly every 10th or 20th run.
> This is what I have found out so far:
> * The error is reproducible on different machines.
> * I first saw this error with CXF 3.1.1. Then I upgraded to 3.1.5 and it still occurs. As said before, there were similar bugs with jaxws-spring, which showed different failures.
> * The server cleanly writes all results to the output and flushes the output stream before closing it.
> * Both the client (a JUnit test) and the server (a Jetty server started by that test) run on the same machine, actually in the same VM.
> * If I enable some of the logging in the class below (QueryResponse), this seems to reduce the probability of the error; I was not able to reproduce the error with logging enabled. This is very frustrating, and I think it hints at a race condition.
> Our streaming result encodes rows like this:
> * 4 bytes stating the number of columns in that row as an int (trailing nulls are cut off)
> * for each column: 4 bytes stating the number of bytes in that column's value as an int
> * that number of bytes, which will be converted to a string
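The framing described above can be sketched with plain java.io streams. This is a minimal, self-contained illustration, not code taken from the attachment; the `RowFraming` class name and the UTF-8 charset are assumptions.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class RowFraming {

    // Writes one row: a 4-byte column count, then for each column a 4-byte length and the value bytes.
    static byte[] encodeRow(List<String> row) throws IOException {
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(buf);
        out.writeInt(row.size());
        for (String col : row) {
            byte[] bytes = col.getBytes(StandardCharsets.UTF_8);
            out.writeInt(bytes.length);
            out.write(bytes);
        }
        return buf.toByteArray();
    }

    // Reads one row back using the same framing.
    static List<String> decodeRow(byte[] data) throws IOException {
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(data));
        int cols = in.readInt();
        List<String> row = new ArrayList<>(cols);
        for (int i = 0; i < cols; i++) {
            byte[] bytes = new byte[in.readInt()];
            in.readFully(bytes);
            row.add(new String(bytes, StandardCharsets.UTF_8));
        }
        return row;
    }

    public static void main(String[] args) throws IOException {
        List<String> row = Arrays.asList("BA", "1", "0033");
        byte[] encoded = encodeRow(row);
        // First 4 bytes: 3 columns; next 4 bytes: "BA" has 2 bytes; then 66, 65 ("BA").
        System.out.println(Arrays.toString(Arrays.copyOf(encoded, 10)));
        // → [0, 0, 0, 3, 0, 0, 0, 2, 66, 65]
        System.out.println(decodeRow(encoded)); // → [BA, 1, 0033]
    }
}
```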
> *The test*
> * If the test runs fine, the client receives 2077 rows containing 15 columns each.
> * Each column is a double or a string; it is basically the result of a simple SQL query. (Of course, if it actually were SQL we would use JDBC. It is actually a SAP data warehouse.)
> * If the test fails, which it does only sporadically, it is always the 14th row that fails.
> *Example 1 of failure*
> When the row is received correctly on the client side, these are the bytes of row 14 of test 2.
> * first 4 bytes: we have 15 columns
> * next 4 bytes: the first column has 2 bytes
> * next 2 bytes: "BA"
> {code}
> [0, 0, 0, 15, 0, 0, 0, 2, 66, 65, 0, 0, 0, 1, 49, 0, 0, 0, 4, ...]
> {code}
> These are the erroneous bytes as received on the client side:
> * first 4 bytes: we have 15 columns
> * next 4 bytes: the first column has 2 bytes
> * next 2 bytes: "BM"
> {code}
> [0, 0, 0, 15, 0, 0, 0, 2, 66, 77, 71, 0, 0, 0, ...]
> {code}
> At first glance, it looks like a byte was added (because of the three consecutive 0s), so I thought it might be an off-by-one error. But looking at the two's complement I see:
> {code}
> good: 65:     01000001
> bad:  77, 71: 01001101 01000111
> {code}
> I can't see where a byte would have been inserted. It also doesn't look like only bits were flipped. It rather looks like total bogus to me.
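The two's-complement bit strings quoted above can be reproduced with a small check (again an illustrative sketch, not part of the report; the `BitCheck` name is made up):

```java
public class BitCheck {

    // Formats a byte value as an 8-bit two's-complement binary string.
    static String bits(int b) {
        return String.format("%8s", Integer.toBinaryString(b & 0xFF)).replace(' ', '0');
    }

    public static void main(String[] args) {
        System.out.println("good 65 ('A'): " + bits(65)); // → 01000001
        System.out.println("bad  77 ('M'): " + bits(77)); // → 01001101
        System.out.println("bad  71 ('G'): " + bits(71)); // → 01000111
    }
}
```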
> *Example 2 of Failure*
> * The bytes sent
> {code}
> [0, 0, 0, 15, 0, 0, 0, 2, 66, 65, 0, 0, 0, 1, 49, 0, 0, 0, 4, 48, 48, 51, 51, 0, 0, 0, 2, ...]
> {code}
> * The bytes received
> {code}
>                                                                           XX
> [0, 0, 0, 15, 0, 0, 0, 2, 66, 65, 0, 0, 0, 1, 49, 0, 0, 0, 4, 48, 48, 51, 48, 0, 0, 0, 2, ...]
> {code}
> Similar but different problems first occurred when we upgraded from Java 7 to Java 8 a year ago. Before that, everything was fine. At that time we were using jaxws-spring and had the situation that sometimes the MIME boundary at the end of the attachment was missing. We hoped that an upgrade to CXF would fix the problem, but now we even have issues with the data itself.
> To me it looks like something is totally broken in the JRE, but this only shows up when using MTOM, so it may be an integration problem. I wonder whether we are the only ones seeing this behavior.
> This is my "streaming result set" class:
> {code}
> @XmlAccessorType ( XmlAccessType.NONE )
> @XmlRootElement ( name = "streamResponse" )
> @XmlType ( name = "streamResponseType" )
> public class QueryResponse {
>   private static final String MIME_TYPE_OCTET_STREAM = "application/octet-stream";
>   private static final Logger LOG = LoggerFactory.getLogger(QueryResponse.class);
>   private static final Marker FATAL = MarkerFactory.getMarker("FATAL");
>   private static final ObjectPool<String> STRINGS = ObjectPool.stringPool();
>   private static final int BUFFER_SIZE = 100;
>   private static final int PREMATURE_END_OF_RESULT = -1;
>   private static final byte[] PREMATURE_END_OF_RESULT_BYTES = 
>       ByteBuffer.allocate(4).putInt(PREMATURE_END_OF_RESULT).array();
>   private volatile int totalBytesRead = 0;
>   private volatile int rowsRead = 0;
>   private volatile int rowsWritten = 0;
>   private DataHandler results;
>   private BlockingQueue<List<String>> resultSet;
>   private ResultSetIterator<String> resultSetIterator;
>   private boolean receiving = true;
>   /** Create a new response. This constructor is used for unmarshalling the response. */
>   QueryResponse() { }
>   
>   /**
>    * Create a new result set encapsulating the given components.
>    * @param aResults the result iterator
>    */
>   QueryResponse(AutoCloseableIterator<List<String>> aResults) {
>     results = encode(aResults);
>   }
>   
>   @XmlElement ( required = true )
>   @XmlMimeType ( MIME_TYPE_OCTET_STREAM )
>   DataHandler getResults() {
>     return results;
>   }
>   
>   /**
>    * Set the result set, called by JAXB.
>    * @param aDataHandler the data handler
>    */
>   void setResults(DataHandler aDataHandler) {
>     if ( aDataHandler == null ) { throw new NullPointerException("aDataHandler"); }
>     if ( resultSet != null ) { throw new IllegalStateException("Result set already exists."); }
>     results = aDataHandler;
>     // Pipelining
>     /* parse results and fill queue while loading from the network */
>     resultSet = new ArrayBlockingQueue<List<String>>(BUFFER_SIZE);
>     resultSetIterator = new ResultSetIterator<String>(resultSet);
>     DataHandler dataHandler = results;
>     try {
>       decode(dataHandler, resultSet, resultSetIterator);
>     } catch ( InterruptedException e ) {
>       Thread.currentThread().interrupt();
>     }
>   }
>   
>   /**
>    * Called on the client side to get a streaming and blocking iterator.
>    * @return the result set as a blocking iterator
>    */
>   public Iterator<List<String>> getResultSet() {
>     return resultSetIterator; 
>   }
>   
>   private int fill(byte[] bytes, InputStream in) throws IOException {
>     int off = 0;
>     int readCount = 0;
>     do {
>       // reads at least one byte ( readCount > 0 ) if ( bytes.length - off ) > 0 and not EOF
>       readCount = in.read(bytes, off, bytes.length - off);  
>       off += readCount;
>       if ( readCount > 0 ) {
>         totalBytesRead += readCount;
>       }
>     } while ( readCount > 0 && off < bytes.length );
>     if ( off > 0 && off < bytes.length ) { // end of stream is a correct termination
>       try {
>         readCount = in.read(bytes, off, bytes.length - off);
>         String exception = readException(in);
>         throw new RuntimeException(exception);
>       } catch ( EOFException e ) {
>         // There was no exception written by the server, just a premature end of the stream causing the client-side EOF.
>         throw new RuntimeException("Premature end of stream, total bytes read: " + totalBytesRead);
>       }
>     }
>     return off;
>   }
>   private static void checkException(int len, InputStream in) throws ClassNotFoundException, IOException {
>     if ( len == PREMATURE_END_OF_RESULT ) {
>       String exception = readException(in);
>       throw new RuntimeException(exception);
>     }
>   }
>   private static String readException(InputStream in) throws IOException {
>     ObjectInputStream objIn = new ObjectInputStream(in);
>     try {
>       Object object = objIn.readObject();
>       if ( object != null ) {
>         return object.toString();
>       }
>     } catch ( ClassNotFoundException e ) {
>       throw new RuntimeException("Could not read exception.", e);
>     }
>     return "No exception received from service after premature end of result";
>   }
>   private DataHandler encode(AutoCloseableIterator<List<String>> aResults) {
>     assert ( aResults != null );
>     final PipedOutputStream out = new PipedOutputStream();
>     DataHandler dh = new DataHandler(new StreamDataSource(out, MIME_TYPE_OCTET_STREAM));
>     Encoder encoder = new Encoder(out, aResults, new ServerExceptionHandler());
>     new Thread(encoder).start();
>     return dh;
>   }
>   
>   private void decode(
>       DataHandler dataHandler, final BlockingQueue<List<String>> aResultSet, ExceptionHandler exceptionHandler) 
>       throws InterruptedException {
>     Decoder decoder = new Decoder(dataHandler, aResultSet, exceptionHandler);
>     new Thread(decoder).start();
>   }
>   
>   private void awaitIteratorBufferNotFull() throws InterruptedException {
>     while ( resultSet.remainingCapacity() == 0 ) { resultSet.wait(); }
>   }
>   private void awaitElements() throws InterruptedException {
>     while ( receiving && resultSet.isEmpty() ) { resultSet.wait(); }
>   }
>   private static final class StreamDataSource implements DataSource {
>     private final String name = UUID.randomUUID().toString();
>     private final InputStream in;
>     private final String mimeType;
>     private StreamDataSource(PipedOutputStream aOut, String aMimeType) {
>       ArgumentChecks.checkNotNull(aOut, "aOut");
>       ArgumentChecks.checkNotNull(aMimeType, "aMimeType");
>       try {
>         in = new PipedInputStream(aOut);
>       } catch ( IOException e ) {
>         throw new RuntimeException("Could not create input stream.", e);
>       }
>       mimeType = aMimeType;
>     }
>     @Override public String getName() { return name; }
>     @Override public String getContentType() { return mimeType; }
>     /**
>      * {@inheritDoc}
>      * 
>      * This implementation violates the specification in that it is destructive. Only the first call will
>      * return an appropriate input stream.
>      */
>     @Override public InputStream getInputStream() { return in; }
>     @Override public OutputStream getOutputStream() { throw new UnsupportedOperationException(); }
>   }
>   
>   /**
>    * Decodes the contents of an input stream as written by the {@link com.tn_ag.sap.QueryResponse.Encoder}
>    * and writes parsed rows to a {@link java.util.Queue}.
>    */
>   private class Decoder implements Runnable {
>     private final DataHandler dataHandler;
>     private final BlockingQueue<List<String>> resultSet;
>     private final ExceptionHandler exceptionHandler;
>     private final byte[] lenBytes = new byte[4];
>     Decoder(DataHandler aDataHandler, BlockingQueue<List<String>> aResultSet, ExceptionHandler aHandler) {
>       ArgumentChecks.checkNotNull(aDataHandler, "aDataHandler");
>       ArgumentChecks.checkNotNull(aResultSet, "aResultSet");
>       ArgumentChecks.checkNotNull(aHandler, "aHandler");
>       dataHandler = aDataHandler;
>       resultSet = aResultSet;
>       exceptionHandler = aHandler;
>     }
>     
>     @Override
>     public void run() {
>       InputStream in = null;
>       List<String> row;
>       int len;
>       try { 
>         in = dataHandler.getInputStream();
>         
>         while ( receiving ) {
>           synchronized ( resultSet ) {
>             receiving = fill(lenBytes, in) > 0;       // read next row's length in number of columns
>             len = ByteBuffer.wrap(lenBytes).getInt(); // convert row length to integer
>             if ( !receiving || len == 0 ) { break; }
>             checkException(len, in);                  // -1 signals an exception
>             row = readRow(in, len);
>             awaitIteratorBufferNotFull();
>             resultSet.put(row);
>             rowsRead++;
>             if ( rowsRead % 1000 == 0 ) {
>               LOG.debug("already received {} rows", rowsRead);
>             }
>             resultSet.notifyAll();                    // notify waiting consumer threads
>           }
>         }
>         stopReception();
>         LOG.debug("received a total of {} rows.", rowsRead);
>       } catch ( InterruptedException e ) {
>         LOG.info("Result reception interrupted.");
>       } catch ( Exception e ) {
>         exceptionHandler.handle(e);
>         stopReception();
>       } finally {
>         receiving = false;
>         try {
>           if ( in != null ) { in.close(); }
>         } catch ( IOException e ) {
>           exceptionHandler.handle(e);
>         }
>       }
>     }
>     private List<String> readRow(InputStream in, int len) throws IOException {
>       List<String> row = new ArrayList<>(len);           // create list of appropriate length
>       for ( int col = 0; col < len; col++ ) {            // for each column
>         fill(lenBytes, in);                              // read the value length (fixed for some types if schema known)
>         int valLen = ByteBuffer.wrap(lenBytes).getInt(); // convert the length of the value as bytes to an int
>         final byte[] bytes = new byte[valLen];           // allocate a buffer of exactly the required size
>         fill(bytes, in);
>         row.add(STRINGS.internalize(new String(bytes)));
>       }
>       return row;
>     }
>     private void stopReception() {
>       synchronized ( resultSet ) {
>         receiving = false;           // we will stop parsing now
>         resultSet.notifyAll();       // consumers can now consume the remaining elements from the queue
>         LOG.debug("Read " + rowsRead + " rows from binary stream");
>       }                
>     }
>   }
>   
>   /**
>    * Encodes the given result set and writes the result to an output stream. 
>    */
>   private class Encoder implements Runnable {
>     private final OutputStream out;
>     private final AutoCloseableIterator<List<String>> iterator;
>     private final ExceptionHandler handler;
>     Encoder(OutputStream aOut, AutoCloseableIterator<List<String>> aResults, ExceptionHandler aHandler) {
>       ArgumentChecks.checkNotNull(aOut, "aOut");
>       ArgumentChecks.checkNotNull(aResults, "aResults");
>       ArgumentChecks.checkNotNull(aHandler, "aHandler");
>       out = aOut;
>       iterator = aResults;
>       handler = aHandler;
>     }
>     @Override
>     public void run() {
>       try ( AutoCloseableIterator<List<String>> iter = this.iterator; OutputStream out = this.out ) {
>         writeResultSet(iter, out);
>         out.flush();
>       } catch ( Exception e ) {
>         handler.handle(e);
>       }
>     }
>     private void writeResultSet(AutoCloseableIterator<List<String>> iter, OutputStream out2) throws IOException {
>       List<String> row = null;
>       while ( this.iterator.hasNext() ) {
>         try {
>           row = this.iterator.next();
>           writeRow(row);
>           rowsWritten++;
>         } catch ( Exception e ) {
>           out.write(PREMATURE_END_OF_RESULT_BYTES); // write -1, signaling an exception instead of row size
>           writeException(out, e);                   // send exception to client for rethrowing it there
>           throw e;                                  // stop transmission, handle exception on server-side (logging)
>         }
>       }
>       LOG.info("wrote {} rows to binary stream, closing output stream", rowsWritten);
>     }
>     private void writeRow(List<String> row) throws IOException {
>       out.write(ByteBuffer.allocate(4).putInt(row.size()).array());     // write row size (in number of columns)
>       for ( String s : row ) {
>         if ( s == null ) { s = ""; }
>         byte[] bytes = s.getBytes();
>         out.write(ByteBuffer.allocate(4).putInt(bytes.length).array()); // write size of column in bytes
>         out.write(bytes);                                               // write column value
>       }
>     }
>     private void writeException(OutputStream output, Exception e) throws IOException {
>       ObjectOutputStream objOut = new ObjectOutputStream(output);
>       objOut.writeObject(toString(e));
>     }
>     private String toString(Throwable e) {
>       if ( e instanceof UncheckedConnectionException && e.getCause() != null ) {
>         e = e.getCause();
>       }
>       ByteArrayOutputStream stackTraceOut = new ByteArrayOutputStream();
>       PrintWriter writer = new PrintWriter(stackTraceOut);
>       e.printStackTrace(writer);
>       writer.flush();
>       return new String(stackTraceOut.toByteArray());
>     }
>   }
>   
>   private final class ResultSetIterator<T> extends ExceptionHandler implements Iterator<List<T>> {
>     private final BlockingQueue<List<T>> queue;
>     private Exception exception;
>     private int returned = 0;
>     private ResultSetIterator(BlockingQueue<List<T>> aQueue) {
>       queue = aQueue;
>     }
>     /**
>      * {@inheritDoc}
>      * 
>      * This implementation marks the current thread as interrupted if streaming could not be commenced.
>      */
>     @Override
>     public boolean hasNext() {
>       if ( exception != null ) { throw new UncheckedConnectionException("Exception while reading results", exception); }
>       try {
>         synchronized ( resultSet ) {
>           awaitElements();
>           if ( exception != null ) { 
>             throw new UncheckedConnectionException("Exception while reading results", exception); 
>           }
>           if ( resultSet.isEmpty() ) {
>             LOG.debug("iterator returned " + returned + " rows");
>           }
>           return !resultSet.isEmpty();
>         }
>       } catch ( InterruptedException e ) {
>         Thread.currentThread().interrupt();
>         return false;
>       }
>     
>     }
>     /**
>      * {@inheritDoc}
>      * 
>      * If the current thread is interrupted during this call, it is marked as interrupted and the method
>      * returns {@code null}.
>      * 
>      * @throws NoSuchElementException if called while {@link #hasNext()} returns {@code false}
>      */
>     @Override
>     public List<T> next() throws NoSuchElementException {
>       if ( exception != null ) { throw new IllegalStateException("Exception while reading results", exception); }
>       if ( !hasNext() ) { throw new NoSuchElementException(); }
>       try {
>         List<T> result = queue.take();
>         synchronized ( resultSet ) { resultSet.notify(); }
>         returned++;
>         return result;
>       } catch ( InterruptedException e ) {
>         Thread.currentThread().interrupt();
>         return null;
>       }
>     }
>     /**
>      * This method is not supported.
>      */
>     @Override public void remove() { throw new UnsupportedOperationException("RTFM"); }
>     @Override void handle(Exception aException) {
>       if ( exception != null ) { 
>         LOG.warn("There was another exception.", aException);
>       }
>       exception = aException;
>     }
>   }
>   
>   private abstract static class ExceptionHandler {
>     abstract void handle(Exception exception);
>   }
>   
>   private static class ServerExceptionHandler extends ExceptionHandler {
>     @Override
>     void handle(Exception aException) {
>       LOG.error(FATAL, "Exception while writing response.", aException);
>     }
>   }
> }
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
