Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id D1D72200C4C for ; Tue, 4 Apr 2017 18:53:54 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id D08AB160B90; Tue, 4 Apr 2017 16:53:54 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id CD19C160B77 for ; Tue, 4 Apr 2017 18:53:53 +0200 (CEST) Received: (qmail 6704 invoked by uid 500); 4 Apr 2017 16:53:53 -0000 Mailing-List: contact commits-help@cxf.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@cxf.apache.org Delivered-To: mailing list commits@cxf.apache.org Received: (qmail 6694 invoked by uid 99); 4 Apr 2017 16:53:53 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 04 Apr 2017 16:53:53 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id E5719DFE34; Tue, 4 Apr 2017 16:53:52 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: dkulp@apache.org To: commits@cxf.apache.org Date: Tue, 04 Apr 2017 16:53:52 -0000 Message-Id: <938ff7d4e880498682fd4d24a95a645c@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [1/3] cxf git commit: [CXF-7096] Make sure everything is cleaned up if the destination sequence times out archived-at: Tue, 04 Apr 2017 16:53:55 -0000 Repository: cxf Updated Branches: refs/heads/3.1.x-fixes 8d6b79f20 -> 4406683df [CXF-7096] Make sure everything is cleaned up if the destination sequence times out Project: http://git-wip-us.apache.org/repos/asf/cxf/repo Commit: http://git-wip-us.apache.org/repos/asf/cxf/commit/15ed5d4f 
Tree: http://git-wip-us.apache.org/repos/asf/cxf/tree/15ed5d4f Diff: http://git-wip-us.apache.org/repos/asf/cxf/diff/15ed5d4f Branch: refs/heads/3.1.x-fixes Commit: 15ed5d4f8d4e2443a668c29d96626debcd01a8db Parents: 8d6b79f Author: Daniel Kulp Authored: Tue Apr 4 12:07:45 2017 -0400 Committer: Daniel Kulp Committed: Tue Apr 4 12:53:40 2017 -0400 ---------------------------------------------------------------------- .../java/org/apache/cxf/ws/rm/Destination.java | 5 +- .../apache/cxf/ws/rm/DestinationSequence.java | 15 +- .../java/org/apache/cxf/ws/rm/RMManager.java | 8 + .../cxf/ws/rm/DestinationSequenceTest.java | 4 +- .../cxf/systest/ws/rm/SequenceTimeoutTest.java | 213 +++++++++++++++++++ 5 files changed, 237 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cxf/blob/15ed5d4f/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/Destination.java ---------------------------------------------------------------------- diff --git a/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/Destination.java b/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/Destination.java index 3d3489b..ef1ea1a 100644 --- a/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/Destination.java +++ b/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/Destination.java @@ -78,8 +78,11 @@ public class Destination extends AbstractEndpoint { // this method ensures to keep the sequence until all the messages are delivered public void terminateSequence(DestinationSequence seq) { + terminateSequence(seq, false); + } + public void terminateSequence(DestinationSequence seq, boolean forceRemove) { seq.terminate(); - if (seq.allAcknowledgedMessagesDelivered()) { + if (forceRemove || seq.allAcknowledgedMessagesDelivered()) { removeSequence(seq); } } http://git-wip-us.apache.org/repos/asf/cxf/blob/15ed5d4f/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/DestinationSequence.java ---------------------------------------------------------------------- diff --git 
a/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/DestinationSequence.java b/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/DestinationSequence.java index 3442fc5..d186194 100644 --- a/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/DestinationSequence.java +++ b/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/DestinationSequence.java @@ -590,11 +590,16 @@ public class DestinationSequence extends AbstractSequence { // terminate regardless outstanding acknowledgments - as we assume that the client is // gone there is no point in sending a SequenceAcknowledgment - - LogUtils.log(LOG, Level.WARNING, "TERMINATING_INACTIVE_SEQ_MSG", + LogUtils.log(LOG, Level.WARNING, "TERMINATING_INACTIVE_SEQ_MSG", DestinationSequence.this.getIdentifier().getValue()); - DestinationSequence.this.destination.terminateSequence(DestinationSequence.this); - + DestinationSequence.this.destination.terminateSequence(DestinationSequence.this, true); + Source source = rme.getSource(); + if (source != null) { + SourceSequence ss = source.getAssociatedSequence(DestinationSequence.this.getIdentifier()); + if (ss != null) { + source.removeSequence(ss); + } + } } else { // reschedule SequenceTermination st = new SequenceTermination(); @@ -605,4 +610,4 @@ public class DestinationSequence extends AbstractSequence { } } } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/cxf/blob/15ed5d4f/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMManager.java ---------------------------------------------------------------------- diff --git a/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMManager.java b/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMManager.java index a29a6a7..603e8d6 100644 --- a/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMManager.java +++ b/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMManager.java @@ -404,6 +404,14 @@ public class RMManager { } return rme; } + public RMEndpoint findReliableEndpoint(QName qn) { + for (RMEndpoint rpe : reliableEndpoints.values()) { + if 
(qn.equals(rpe.getApplicationEndpoint().getService().getName())) { + return rpe; + } + } + return null; + } public Destination getDestination(Message message) throws RMException { RMEndpoint rme = getReliableEndpoint(message); http://git-wip-us.apache.org/repos/asf/cxf/blob/15ed5d4f/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/DestinationSequenceTest.java ---------------------------------------------------------------------- diff --git a/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/DestinationSequenceTest.java b/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/DestinationSequenceTest.java index 546f5f3..72666b8 100644 --- a/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/DestinationSequenceTest.java +++ b/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/DestinationSequenceTest.java @@ -567,7 +567,7 @@ public class DestinationSequenceTest extends Assert { DestinationSequence seq = new DestinationSequence(id, ref, destination, ProtocolVariation.RM10WSA200408); - destination.terminateSequence(seq); + destination.terminateSequence(seq, true); EasyMock.expectLastCall(); Message message = setUpMessage("1"); @@ -601,7 +601,7 @@ public class DestinationSequenceTest extends Assert { long lastAppMessage = System.currentTimeMillis() - 30000L; EasyMock.expect(rme.getLastControlMessage()).andReturn(0L); EasyMock.expect(rme.getLastApplicationMessage()).andReturn(lastAppMessage); - destination.terminateSequence(seq); + destination.terminateSequence(seq, true); EasyMock.expectLastCall(); control.replay(); st.run(); http://git-wip-us.apache.org/repos/asf/cxf/blob/15ed5d4f/systests/ws-rm/src/test/java/org/apache/cxf/systest/ws/rm/SequenceTimeoutTest.java ---------------------------------------------------------------------- diff --git a/systests/ws-rm/src/test/java/org/apache/cxf/systest/ws/rm/SequenceTimeoutTest.java b/systests/ws-rm/src/test/java/org/apache/cxf/systest/ws/rm/SequenceTimeoutTest.java new file mode 100644 index 0000000..0a04564 --- /dev/null +++ 
b/systests/ws-rm/src/test/java/org/apache/cxf/systest/ws/rm/SequenceTimeoutTest.java @@ -0,0 +1,213 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.cxf.systest.ws.rm; + +import java.io.InputStream; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.Executor; + +import javax.xml.namespace.QName; +import javax.xml.parsers.DocumentBuilder; +import javax.xml.parsers.DocumentBuilderFactory; +import javax.xml.transform.dom.DOMSource; +import javax.xml.ws.BindingProvider; +import javax.xml.ws.Dispatch; +import javax.xml.ws.Endpoint; +import javax.xml.ws.Service; +import javax.xml.ws.handler.MessageContext; + +import org.w3c.dom.Document; + +import org.apache.cxf.Bus; +import org.apache.cxf.BusFactory; +import org.apache.cxf.bus.spring.SpringBusFactory; +import org.apache.cxf.greeter_control.Greeter; +import org.apache.cxf.greeter_control.GreeterService; +import org.apache.cxf.systest.ws.util.ConnectionHelper; +import org.apache.cxf.testutil.common.AbstractBusClientServerTestBase; +import org.apache.cxf.testutil.common.AbstractBusTestServerBase; +import org.apache.cxf.testutil.common.TestUtil; +import org.apache.cxf.ws.rm.RMEndpoint; +import 
org.apache.cxf.ws.rm.RMManager; +import org.apache.cxf.ws.rm.manager.AcksPolicyType; + +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; + + +/** + * + */ +public class SequenceTimeoutTest extends AbstractBusClientServerTestBase { + public static final String PORT = TestUtil.getPortNumber(SequenceTimeoutTest.class); + private static final String ADDRESS = "http://localhost:" + PORT + "/SoapContext/GreeterPort"; + private static final QName GREETME_NAME + = new QName("http://cxf.apache.org/greeter_control", "greetMe"); + private static final QName GREETME_SERVICE_NAME + = new QName("http://cxf.apache.org/greeter_control", "GreeterService"); + + private static RMManager rmManager; + + private Bus greeterBus; + private Greeter greeter; + + + public static class Server extends AbstractBusTestServerBase { + Endpoint endpoint; + + protected void run() { + SpringBusFactory bf = new SpringBusFactory(); + System.setProperty("db.name", "rdbm"); + Bus bus = bf.createBus("org/apache/cxf/systest/ws/rm/rminterceptors.xml"); + System.clearProperty("db.name"); + BusFactory.setDefaultBus(bus); + + setBus(bus); + + rmManager = bus.getExtension(RMManager.class); + rmManager.getConfiguration().setInactivityTimeout(1000L); + + //System.out.println("Created control bus " + bus); + GreeterImpl greeterImplementor = new GreeterImpl(); + endpoint = Endpoint.publish(ADDRESS, greeterImplementor); + + BusFactory.setDefaultBus(null); + BusFactory.setThreadDefaultBus(null); + } + + public void tearDown() throws Exception { + endpoint.stop(); + } + } + + + @BeforeClass + public static void setUpBeforeClass() throws Exception { + assertTrue("server did not launch correctly", launchServer(Server.class, true)); + } + + + @AfterClass + public static void tearDownAfterClass() throws Exception { + } + + private void init(String cfgResource, boolean useDecoupledEndpoint, boolean useDispatchClient) { + init(cfgResource, useDecoupledEndpoint, 
useDispatchClient, null); + } + + private void init(String cfgResource, + boolean useDecoupledEndpoint, + boolean useDispatchClient, + Executor executor) { + + SpringBusFactory bf = new SpringBusFactory(); + initGreeterBus(bf, cfgResource); + if (useDispatchClient) { + initDispatch(); + } else { + initProxy(useDecoupledEndpoint, executor); + } + } + private void initGreeterBus(SpringBusFactory bf, + String cfgResource) { + greeterBus = bf.createBus(cfgResource); + BusFactory.setDefaultBus(greeterBus); + } + + + private Dispatch initDispatch() { + GreeterService gs = new GreeterService(); + Dispatch dispatch = gs.createDispatch(GreeterService.GreeterPort, + DOMSource.class, + Service.Mode.MESSAGE); + try { + updateAddressPort(dispatch, PORT); + } catch (Exception e) { + //ignore + } + dispatch.getRequestContext().put(BindingProvider.SOAPACTION_USE_PROPERTY, Boolean.FALSE); + dispatch.getRequestContext().put(MessageContext.WSDL_OPERATION, GREETME_NAME); + + return dispatch; + } + + private void initProxy(boolean useDecoupledEndpoint, Executor executor) { + GreeterService gs = new GreeterService(); + + if (null != executor) { + gs.setExecutor(executor); + } + + greeter = gs.getGreeterPort(); + try { + updateAddressPort(greeter, PORT); + } catch (Exception e) { + //ignore + } + + ConnectionHelper.setKeepAliveConnection(greeter, true); + } + @Test + public void testTimeout() throws Exception { + init("org/apache/cxf/systest/ws/rm/rminterceptors.xml", true, true); + + List> dispatches = new ArrayList<>(5); + int count = 5; + for (int x = 0; x < count; x++) { + Dispatch dispatch = initDispatch(); + AcksPolicyType ap = new AcksPolicyType(); + //don't send the acks to cause a memory leak - CXF-7096 + ap.setImmediaAcksTimeout(500000L); + greeterBus.getExtension(RMManager.class).getDestinationPolicy().setAcksPolicy(ap); + dispatch.invoke(getDOMRequest("One")); + dispatches.add(dispatch); + } + RMEndpoint ep = rmManager.findReliableEndpoint(GREETME_SERVICE_NAME); + 
Assert.assertNotNull(ep); + Assert.assertEquals(count, ep.getDestination().getAllSequences().size()); + Assert.assertEquals(count, ep.getSource().getAllSequences().size()); + Thread.sleep(2500); + System.gc(); + Assert.assertEquals(0, ep.getDestination().getAllSequences().size()); + Assert.assertEquals(0, ep.getSource().getAllSequences().size()); + try { + dispatches.get(0).invoke(getDOMRequest("One")); + fail("The sequence should have been terminated"); + } catch (Throwable t) { + //expected + Assert.assertTrue(t.getMessage().contains("not a known Sequence identifier")); + } + rmManager.getStore(); + } + private DOMSource getDOMRequest(String n) + throws Exception { + InputStream is = + getClass().getResourceAsStream("twoway" + + "Req" + n + ".xml"); + DocumentBuilderFactory factory = DocumentBuilderFactory.newInstance(); + factory.setNamespaceAware(true); + DocumentBuilder builder = factory.newDocumentBuilder(); + Document newDoc = builder.parse(is); + return new DOMSource(newDoc); + } +}