camel-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "Damian Malczyk (JIRA)" <j...@apache.org>
Subject [jira] [Created] (CAMEL-10171) Camel CXF expired continuations cause memory leak
Date Thu, 21 Jul 2016 23:32:20 GMT
Damian Malczyk created CAMEL-10171:
--------------------------------------

             Summary: Camel CXF expired continuations cause memory leak
                 Key: CAMEL-10171
                 URL: https://issues.apache.org/jira/browse/CAMEL-10171
             Project: Camel
          Issue Type: Bug
          Components: camel-cxf
    Affects Versions: 2.17.1
            Reporter: Damian Malczyk


Looks like exchanges expired by CXF continuation timeout are being accumulated in InflightRepository.
Tested with Camel 2.17.1 and cxf-rt-transports-http-jetty:

Dependencies:
{code}<dependencies>
        <dependency>
            <groupId>org.apache.camel</groupId>
            <artifactId>camel-core</artifactId>
            <version>2.17.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.camel</groupId>
            <artifactId>camel-cxf</artifactId>
            <version>2.17.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.cxf</groupId>
            <artifactId>cxf-rt-transports-http-jetty</artifactId>
            <version>3.1.5</version>
        </dependency>
    </dependencies>{code}

Reproducer:
{code}import org.apache.camel.CamelContext;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.cxf.CxfEndpoint;
import org.apache.camel.component.cxf.DataFormat;
import org.apache.camel.impl.DefaultCamelContext;
import org.springframework.util.StreamUtils;
import org.w3c.dom.Document;

import javax.xml.parsers.DocumentBuilder;
import javax.xml.parsers.DocumentBuilderFactory;
import javax.xml.soap.MessageFactory;
import javax.xml.soap.SOAPMessage;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;

public class Sample {

    private final static String URI = "http://127.0.0.1:8080/";
    private final static long CONTINUATION_TIMEOUT = 100L;
    private final static long DELAYER_VALUE = 200L;
    private final static int SENDER_THREADS = Runtime.getRuntime().availableProcessors();
    private final static int MESSAGES_PER_SENDER = 10000;

    private static void setupCamel() throws Exception {
        final CamelContext camelContext = new DefaultCamelContext();
        final CxfEndpoint endpoint = (CxfEndpoint)camelContext.getEndpoint( "cxf://" + URI
);
        endpoint.setContinuationTimeout( CONTINUATION_TIMEOUT );
        endpoint.setDataFormat( DataFormat.PAYLOAD );
        camelContext.addRoutes( new RouteBuilder() {
            public void configure() throws Exception {
                from( endpoint )
                .threads()
                .setBody( constant( "<ok />" ) )
                .delay( DELAYER_VALUE )
                .end();
            }
        });
        final TimerTask repoSizeReporter = new TimerTask() {
            public void run() {
                System.out.println( "Inflight repository size: " + camelContext.getInflightRepository().size()
);
                System.gc();
                System.out.println( "Memory usage: " + (Runtime.getRuntime().totalMemory()
- Runtime.getRuntime().freeMemory())/(1024*1024) + "MB" );
            }
        };
        final Timer repoSizeReporterTimer = new Timer();
        repoSizeReporterTimer.schedule( repoSizeReporter, 1000, 1000 );
        camelContext.start();
    }

    private static byte[] createSoapMessage() throws Exception {
        final StringBuilder payloadBuilder = new StringBuilder( "<payload>" );
        for( int i = 0; i < 5000; i++ ) {
            payloadBuilder.append( "<payloadElement />" );
        }
        final String payload = payloadBuilder.append( "</payload>" ).toString();
        final DocumentBuilder documentBuilder = DocumentBuilderFactory.newInstance().newDocumentBuilder();
        final Document payloadDocument = documentBuilder.parse( new ByteArrayInputStream(
payload.getBytes() ) );
        final ByteArrayOutputStream soapOutStream = new ByteArrayOutputStream();
        final SOAPMessage message = MessageFactory.newInstance().createMessage();
        message.getSOAPBody().addDocument( payloadDocument );
        message.writeTo( soapOutStream );
        return soapOutStream.toByteArray();
    }

    private static Runnable soapSender() {
        return () -> {
            try {
                final byte[] soapMessage = createSoapMessage();
                for( int i = 0; i < MESSAGES_PER_SENDER; i++ ) {
                    final HttpURLConnection connection = (HttpURLConnection)new URL( URI ).openConnection();
                    connection.setDoOutput( true );
                    connection.setRequestProperty( "Content-Type", "text/xml" );
                    connection.setRequestProperty( "SOAPAction", "\"\"" );
                    connection.setRequestMethod( "POST" );
                    connection.setRequestProperty( "Accept", "*/*" );
                    connection.connect();
                    StreamUtils.copy( soapMessage, connection.getOutputStream() );
                    connection.getResponseCode();
                    connection.disconnect();
                }
            } catch ( final Exception ex ) {
                ex.printStackTrace();
            }
        };
    }

    public static void main(String[] args) throws Exception {
        setupCamel();
        final Executor executor = Executors.newFixedThreadPool( SENDER_THREADS );
        for( int i = 0; i < SENDER_THREADS; i++ ) {
            executor.execute( soapSender() );
        }
    }
}{code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Mime
View raw message