Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 3FDCF200B4C for ; Fri, 22 Jul 2016 19:06:09 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 3E672160A5A; Fri, 22 Jul 2016 17:06:09 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 8F46A160A8F for ; Fri, 22 Jul 2016 19:06:07 +0200 (CEST) Received: (qmail 20006 invoked by uid 500); 22 Jul 2016 17:06:06 -0000 Mailing-List: contact commits-help@cxf.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@cxf.apache.org Delivered-To: mailing list commits@cxf.apache.org Received: (qmail 19802 invoked by uid 99); 22 Jul 2016 17:06:06 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 22 Jul 2016 17:06:06 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 79F18EAD93; Fri, 22 Jul 2016 17:06:06 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: dkulp@apache.org To: commits@cxf.apache.org Date: Fri, 22 Jul 2016 17:06:11 -0000 Message-Id: In-Reply-To: <9315493051f7496e8e959b24d8c2a950@git.apache.org> References: <9315493051f7496e8e959b24d8c2a950@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [6/9] cxf git commit: [CXF-4209] Server side message redelivery support for WS-RM archived-at: Fri, 22 Jul 2016 17:06:09 -0000 http://git-wip-us.apache.org/repos/asf/cxf/blob/0dd29509/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/soap/RedeliveryQueueImpl.java ---------------------------------------------------------------------- diff --git a/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/soap/RedeliveryQueueImpl.java b/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/soap/RedeliveryQueueImpl.java new file mode 100644 index 0000000..e2ff271 --- /dev/null +++ b/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/soap/RedeliveryQueueImpl.java @@ -0,0 +1,699 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.cxf.ws.rm.soap; + +import java.io.Closeable; +import java.io.IOException; +import java.io.InputStream; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Date; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.ListIterator; +import java.util.Map; +import java.util.SortedSet; +import java.util.TimerTask; +import java.util.TreeSet; +import java.util.concurrent.Executor; +import java.util.concurrent.RejectedExecutionException; +import java.util.logging.Level; +import java.util.logging.Logger; + +import javax.xml.stream.XMLStreamReader; + +import org.w3c.dom.Node; + +import org.apache.cxf.Bus; +import org.apache.cxf.binding.soap.SoapMessage; +import org.apache.cxf.common.logging.LogUtils; +import org.apache.cxf.endpoint.Endpoint; +import org.apache.cxf.interceptor.Interceptor; +import org.apache.cxf.interceptor.InterceptorChain; +import org.apache.cxf.interceptor.InterceptorProvider; +import org.apache.cxf.io.CachedOutputStream; +import org.apache.cxf.message.Exchange; +import org.apache.cxf.message.Message; +import org.apache.cxf.phase.Phase; +import org.apache.cxf.phase.PhaseInterceptorChain; +import org.apache.cxf.phase.PhaseManager; +import org.apache.cxf.staxutils.StaxSource; +import org.apache.cxf.ws.rm.DestinationSequence; +import org.apache.cxf.ws.rm.RMCaptureInInterceptor; +import org.apache.cxf.ws.rm.RMContextUtils; +import org.apache.cxf.ws.rm.RMManager; +import org.apache.cxf.ws.rm.RMMessageConstants; +import org.apache.cxf.ws.rm.RMProperties; +import org.apache.cxf.ws.rm.RedeliveryQueue; +import org.apache.cxf.ws.rm.RetryStatus; +import org.apache.cxf.ws.rm.manager.RetryPolicyType; +import org.apache.cxf.ws.rm.persistence.RMStore; +import org.apache.cxf.ws.rm.v200702.Identifier; +import org.apache.cxf.ws.rm.v200702.SequenceType; + +/** + * + */ +public class RedeliveryQueueImpl implements RedeliveryQueue { + private static final Logger LOG = LogUtils.getL7dLogger(RedeliveryQueueImpl.class); + + private Map> candidates = + new HashMap>(); + private Map> suspendedCandidates = + new HashMap>(); + + private RMManager manager; + + private int undeliveredCount; + + public RedeliveryQueueImpl(RMManager m) { + manager = m; + } + + public RMManager getManager() { + return manager; + } + + public void setManager(RMManager m) { + manager = m; + } + + public void addUndelivered(Message message) { + cacheUndelivered(message); + } + + /** + * @param seq the sequence under consideration + * @return the number of undelivered messages for that sequence + */ + public synchronized int countUndelivered(DestinationSequence seq) { + List sequenceCandidates = getSequenceCandidates(seq); + return sequenceCandidates == null ? 0 : sequenceCandidates.size(); + } + + public int countUndelivered() { + return undeliveredCount; + } + + public boolean isEmpty() { + return 0 == getUndelivered().size(); + } + public void purgeAll(DestinationSequence seq) { + Collection purged = new ArrayList(); + synchronized (this) { + LOG.fine("Start purging redeliver candidates."); + List sequenceCandidates = getSequenceCandidates(seq); + if (null != sequenceCandidates) { + for (int i = sequenceCandidates.size() - 1; i >= 0; i--) { + RedeliverCandidate candidate = sequenceCandidates.get(i); + long m = candidate.getNumber(); + sequenceCandidates.remove(i); + candidate.resolved(); + undeliveredCount--; + purged.add(m); + } + if (sequenceCandidates.isEmpty()) { + candidates.remove(seq.getIdentifier().getValue()); + } + } + LOG.fine("Completed purging redeliver candidates."); + } + if (purged.size() > 0) { + RMStore store = manager.getStore(); + if (null != store) { + store.removeMessages(seq.getIdentifier(), purged, false); + } + } + } + + public List getUndeliveredMessageNumbers(DestinationSequence seq) { + List undelivered = new ArrayList(); + List sequenceCandidates = getSequenceCandidates(seq); + if (null != sequenceCandidates) { + for (int i = 0; i < sequenceCandidates.size(); i++) { + RedeliverCandidate candidate = sequenceCandidates.get(i); + RMProperties properties = RMContextUtils.retrieveRMProperties(candidate.getMessage(), + false); + SequenceType st = properties.getSequence(); + undelivered.add(st.getMessageNumber()); + } + } + return undelivered; + } + + /** + * @param seq the sequence under consideration + * @return the list of resend candidates for that sequence + * @pre called with mutex held + */ + protected List getSequenceCandidates(DestinationSequence seq) { + return getSequenceCandidates(seq.getIdentifier().getValue()); + } + + /** + * @param key the sequence identifier under consideration + * @return the list of resend candidates for that sequence + * @pre called with mutex held + */ + protected List getSequenceCandidates(String key) { + List sc = candidates.get(key); + if (null == sc) { + sc = suspendedCandidates.get(key); + } + return sc; + } + + /** + * @param key the sequence identifier under consideration + * @return true if the sequence is currently suspended; false otherwise + * @pre called with mutex held + */ + protected boolean isSequenceSuspended(String key) { + return suspendedCandidates.containsKey(key); + } + + public RetryStatus getRedeliveryStatus(DestinationSequence seq, long num) { + List sequenceCandidates = getSequenceCandidates(seq); + if (null != sequenceCandidates) { + for (int i = 0; i < sequenceCandidates.size(); i++) { + RedeliverCandidate candidate = sequenceCandidates.get(i); + RMProperties properties = RMContextUtils.retrieveRMProperties(candidate.getMessage(), + false); + SequenceType st = properties.getSequence(); + if (num == st.getMessageNumber()) { + return candidate; + } + } + } + return null; + } + + + public Map getRedeliveryStatuses(DestinationSequence seq) { + Map cp = new HashMap(); + List sequenceCandidates = getSequenceCandidates(seq); + if (null != sequenceCandidates) { + for (int i = 0; i < sequenceCandidates.size(); i++) { + RedeliverCandidate candidate = sequenceCandidates.get(i); + RMProperties properties = RMContextUtils.retrieveRMProperties(candidate.getMessage(), + false); + SequenceType st = properties.getSequence(); + cp.put(st.getMessageNumber(), candidate); + } + } + return cp; + } + + + public void start() { + // TODO Auto-generated method stub + + } + + + public void stop(DestinationSequence seq) { + synchronized (this) { + List sequenceCandidates = getSequenceCandidates(seq); + if (null != sequenceCandidates) { + for (int i = sequenceCandidates.size() - 1; i >= 0; i--) { + RedeliverCandidate candidate = sequenceCandidates.get(i); + candidate.cancel(); + } + LOG.log(Level.FINE, "Cancelled redeliveriss for sequence {0}.", + seq.getIdentifier().getValue()); + } + } + } + + + public void suspend(DestinationSequence seq) { + synchronized (this) { + String key = seq.getIdentifier().getValue(); + List sequenceCandidates = candidates.remove(key); + if (null != sequenceCandidates) { + for (int i = sequenceCandidates.size() - 1; i >= 0; i--) { + RedeliverCandidate candidate = sequenceCandidates.get(i); + candidate.suspend(); + } + suspendedCandidates.put(key, sequenceCandidates); + LOG.log(Level.FINE, "Suspended redeliveris for sequence {0}.", key); + } + } + } + + + public void resume(DestinationSequence seq) { + synchronized (this) { + String key = seq.getIdentifier().getValue(); + List sequenceCandidates = suspendedCandidates.remove(key); + if (null != sequenceCandidates) { + for (int i = 0; i < sequenceCandidates.size(); i++) { + RedeliverCandidate candidate = sequenceCandidates.get(i); + candidate.resume(); + } + candidates.put(key, sequenceCandidates); + LOG.log(Level.FINE, "Resumed redeliveries for sequence {0}.", key); + } + } + } + + /** + * Accepts a new resend candidate. + * + * @param ctx the message context. + * @return ResendCandidate + */ + protected RedeliverCandidate cacheUndelivered(Message message) { + RMProperties rmps = RMContextUtils.retrieveRMProperties(message, false); + SequenceType st = rmps.getSequence(); + Identifier sid = st.getIdentifier(); + String key = sid.getValue(); + + RedeliverCandidate candidate = null; + + synchronized (this) { + List sequenceCandidates = getSequenceCandidates(key); + if (null == sequenceCandidates) { + sequenceCandidates = new ArrayList(); + candidates.put(key, sequenceCandidates); + } + candidate = getRedeliverCandidate(st, sequenceCandidates); + if (candidate == null) { + candidate = new RedeliverCandidate(message); + if (isSequenceSuspended(key)) { + candidate.suspend(); + } + sequenceCandidates.add(candidate); + undeliveredCount++; + } + } + LOG.fine("Cached undelivered message."); + return candidate; + } + + private RedeliverCandidate getRedeliverCandidate(SequenceType st, List rcs) { + // assume the size of candidates to be relatively small; otherwise we should use message numbers as keys + for (RedeliverCandidate rc : rcs) { + if (st.getMessageNumber() == rc.getNumber()) { + return rc; + } + } + return null; + } + + protected void purgeDelivered(RedeliverCandidate candidate) { + RMProperties rmps = RMContextUtils.retrieveRMProperties(candidate.getMessage(), false); + SequenceType st = rmps.getSequence(); + Identifier sid = st.getIdentifier(); + String key = sid.getValue(); + + synchronized (this) { + List sequenceCandidates = getSequenceCandidates(key); + if (null != sequenceCandidates) { + // TODO use a constant op instead of this inefficient linear op + sequenceCandidates.remove(candidate); + undeliveredCount--; + } + if (sequenceCandidates.isEmpty()) { + candidates.remove(sid.getValue()); + } + + } + LOG.fine("Purged delivered message."); + + } + + /** + * @return a map relating sequence ID to a lists of un-acknowledged messages + * for that sequence + */ + protected Map> getUndelivered() { + return candidates; + } + + private static InterceptorChain getRedeliveryInterceptorChain(Message m, String phase) { + Exchange exchange = m.getExchange(); + Endpoint ep = exchange.getEndpoint(); + Bus bus = exchange.getBus(); + + PhaseManager pm = bus.getExtension(PhaseManager.class); + SortedSet phases = new TreeSet(pm.getInPhases()); + for (Iterator it = phases.iterator(); it.hasNext();) { + Phase p = it.next(); + if (phase.equals(p.getName())) { + break; + } + it.remove(); + } + PhaseInterceptorChain chain = new PhaseInterceptorChain(phases); + List> il = ep.getInInterceptors(); + addInterceptors(chain, il); + il = ep.getService().getInInterceptors(); + addInterceptors(chain, il); + il = ep.getBinding().getInInterceptors(); + addInterceptors(chain, il); + il = bus.getInInterceptors(); + addInterceptors(chain, il); + if (ep.getService().getDataBinding() instanceof InterceptorProvider) { + il = ((InterceptorProvider)ep.getService().getDataBinding()).getInInterceptors(); + addInterceptors(chain, il); + } + + return chain; + } + + private static void addInterceptors(PhaseInterceptorChain chain, + List> il) { + for (Interceptor i : il) { + final String iname = i.getClass().getSimpleName(); + if ("OneWayProcessorInterceptor".equals(iname) + || "MAPAggregatorImpl".equals(iname) + || "RMInInterceptor".equals(iname)) { + continue; + } + chain.add(i); + } + } + + //TODO refactor this class to unify its functionality with that of ResendCandidate + protected class RedeliverCandidate implements Runnable, RetryStatus { + private Message message; + private long number; + private Date next; + private TimerTask nextTask; + private int retries; + private int maxRetries; + private long nextInterval; + private long backoff; + private boolean pending; + private boolean suspended; + + protected RedeliverCandidate(Message m) { + message = m; + if (message instanceof SoapMessage) { + // remove old message headers like WSS headers + ((SoapMessage)message).getHeaders().clear(); + } + RetryPolicyType rmrp = null != manager.getDestinationPolicy() + ? manager.getDestinationPolicy().getRetryPolicy() : null; + long baseRedeliveryInterval = Long.parseLong(DEFAULT_BASE_REDELIVERY_INTERVAL); + if (null != rmrp && rmrp.getInterval() > 0L) { + baseRedeliveryInterval = rmrp.getInterval(); + } + backoff = RedeliveryQueue.DEFAULT_EXPONENTIAL_BACKOFF; + next = new Date(System.currentTimeMillis() + baseRedeliveryInterval); + nextInterval = baseRedeliveryInterval * backoff; + maxRetries = null != rmrp ? rmrp.getMaxRetries() : 0; + + RMProperties rmprops = RMContextUtils.retrieveRMProperties(message, false); + if (null != rmprops) { + number = rmprops.getSequence().getMessageNumber(); + } + + if (null != manager.getTimer() && maxRetries != 0) { + schedule(); + } + + } + + /** + * Initiate redelivery asynchronsly. + * + */ + protected void initiate() { + pending = true; + Endpoint ep = message.getExchange().get(Endpoint.class); + Executor executor = ep.getExecutor(); + if (null == executor) { + executor = ep.getService().getExecutor(); + LOG.log(Level.FINE, "Using service executor {0}", executor.getClass().getName()); + } else { + LOG.log(Level.FINE, "Using endpoint executor {0}", executor.getClass().getName()); + } + + try { + executor.execute(this); + } catch (RejectedExecutionException ex) { + LOG.log(Level.SEVERE, "RESEND_INITIATION_FAILED_MSG", ex); + } + } + + public void run() { + try { + if (isPending()) { + // redeliver + redeliver(); + purgeDelivered(this); + resolved(); + } + } catch (Exception ex) { + LOG.log(Level.WARNING, "redelivery failed", ex); + } finally { + attempted(); + } + } + + + private void redeliver() throws Exception { + LOG.log(Level.INFO, "Redelivering ... for " + (1 + retries)); + String restartingPhase; + if (message.getContent(Exception.class) != null) { + message.removeContent(Exception.class); + message.getExchange().put(Exception.class, null); + + // clean-up message for redelivery + closeStreamResources(); + message.removeContent(Node.class); + } + + + InputStream is = null; + CachedOutputStream cos = (CachedOutputStream)message.get(RMMessageConstants.SAVED_CONTENT); + is = cos.getInputStream(); + message.setContent(InputStream.class, is); + message = message.getExchange().getEndpoint().getBinding().createMessage(message); + restartingPhase = Phase.POST_STREAM; + // skip some interceptor chain phases for redelivery + InterceptorChain chain = getRedeliveryInterceptorChain(message, restartingPhase); + ListIterator> iterator = chain.getIterator(); + while (iterator.hasNext()) { + Interceptor incept = iterator.next(); + if (incept.getClass().getName().equals(RMCaptureInInterceptor.class.getName())) { + chain.remove(incept); + } + } + message.getExchange().setInMessage(message); + message.setInterceptorChain(chain); + chain.doIntercept(message); + Exception ex = message.getContent(Exception.class); + if (null != ex) { + throw ex; + } + } + + public long getNumber() { + return number; + } + + public Date getNext() { + return next; + } + + public Date getPrevious() { + if (retries > 0) { + return new Date(next.getTime() - nextInterval / backoff); + } + return null; + } + + public int getRetries() { + return retries; + } + + public int getMaxRetries() { + return maxRetries; + } + + public long getNextInterval() { + return nextInterval; + } + + public long getBackoff() { + return backoff; + } + + public boolean isPending() { + return pending; + } + + public boolean isSuspended() { + return suspended; + } + + /** + * the message has been delivered to the application + */ + protected synchronized void resolved() { + pending = false; + next = null; + if (null != nextTask) { + nextTask.cancel(); + } + } + + /** + * Cancel further redelivery (although not successfully delivered). + */ + protected void cancel() { + if (null != nextTask) { + nextTask.cancel(); + closeStreamResources(); + releaseSavedMessage(); + } + } + + protected void suspend() { + suspended = true; + pending = false; + //TODO release the message and later reload it upon resume + //cancel(); + if (null != nextTask) { + nextTask.cancel(); + } + + } + + protected void resume() { + suspended = false; + next = new Date(System.currentTimeMillis()); + attempted(); + } + + private void releaseSavedMessage() { + CachedOutputStream saved = (CachedOutputStream)message.remove(RMMessageConstants.SAVED_CONTENT); + if (saved != null) { + saved.releaseTempFileHold(); + try { + saved.close(); + } catch (IOException e) { + // ignore + } + } + // Any unclosed resources must be closed to release the temp files. + Closeable closeable = (Closeable)message.get(RMMessageConstants.ATTACHMENTS_CLOSEABLE); + if (closeable != null) { + try { + closeable.close(); + } catch (IOException e) { + // ignore + } + } + } + + /* + * Close all stream-like resources stored in the message + */ + private void closeStreamResources() { + InputStream oin = message.getContent(InputStream.class); + if (oin != null) { + try { + oin.close(); + } catch (Exception e) { + // ignore + } + message.removeContent(InputStream.class); + } + XMLStreamReader oreader = message.getContent(XMLStreamReader.class); + if (oreader != null) { + try { + oreader.close(); + } catch (Exception e) { + // ignore + } + message.removeContent(XMLStreamReader.class); + } + List olist = message.getContent(List.class); + if (olist != null && olist.size() == 1) { + Object o = olist.get(0); + if (o instanceof XMLStreamReader) { + oreader = (XMLStreamReader)o; + } else if (o instanceof StaxSource) { + oreader = ((StaxSource)o).getXMLStreamReader(); + } + + if (oreader != null) { + try { + oreader.close(); + } catch (Exception e) { + // ignore + } + } + message.removeContent(List.class); + } + } + + /** + * @return associated message context + */ + protected Message getMessage() { + return message; + } + + /** + * A resend has been attempted. Schedule the next attempt. + */ + protected synchronized void attempted() { + pending = false; + retries++; + if (null != next && maxRetries != retries) { + next = new Date(next.getTime() + nextInterval); + nextInterval *= backoff; + schedule(); + } + } + + protected final synchronized void schedule() { + if (null == manager.getTimer()) { + return; + } + class RedeliverTask extends TimerTask { + RedeliverCandidate candidate; + + RedeliverTask(RedeliverCandidate c) { + candidate = c; + } + + @Override + public void run() { + if (!candidate.isPending()) { + candidate.initiate(); + } + } + } + nextTask = new RedeliverTask(this); + try { + manager.getTimer().schedule(nextTask, next); + } catch (IllegalStateException ex) { + LOG.log(Level.WARNING, "SCHEDULE_RESEND_FAILED_MSG", ex); + } + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/cxf/blob/0dd29509/rt/ws/rm/src/main/resources/schemas/configuration/wsrm-manager-types.xsd ---------------------------------------------------------------------- diff --git a/rt/ws/rm/src/main/resources/schemas/configuration/wsrm-manager-types.xsd b/rt/ws/rm/src/main/resources/schemas/configuration/wsrm-manager-types.xsd index 99b60c4..e5425cd 100644 --- a/rt/ws/rm/src/main/resources/schemas/configuration/wsrm-manager-types.xsd +++ b/rt/ws/rm/src/main/resources/schemas/configuration/wsrm-manager-types.xsd @@ -214,6 +214,21 @@ + + + + The time interval between each retry in milliseconds. + + + + + + + The name of the interval adjustment algorithm. + The default algorithm is ExponentialBackoff. + + + http://git-wip-us.apache.org/repos/asf/cxf/blob/0dd29509/rt/ws/rm/src/main/resources/schemas/configuration/wsrm-policy.xjb ---------------------------------------------------------------------- diff --git a/rt/ws/rm/src/main/resources/schemas/configuration/wsrm-policy.xjb b/rt/ws/rm/src/main/resources/schemas/configuration/wsrm-policy.xjb index 5bfd418..a049c3f 100644 --- a/rt/ws/rm/src/main/resources/schemas/configuration/wsrm-policy.xjb +++ b/rt/ws/rm/src/main/resources/schemas/configuration/wsrm-policy.xjb @@ -53,4 +53,13 @@ + + + + + + + + http://git-wip-us.apache.org/repos/asf/cxf/blob/0dd29509/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/DestinationSequenceTest.java ---------------------------------------------------------------------- diff --git a/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/DestinationSequenceTest.java b/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/DestinationSequenceTest.java index a189932..546f5f3 100644 --- a/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/DestinationSequenceTest.java +++ b/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/DestinationSequenceTest.java @@ -567,7 +567,7 @@ public class DestinationSequenceTest extends Assert { DestinationSequence seq = new DestinationSequence(id, ref, destination, ProtocolVariation.RM10WSA200408); - destination.removeSequence(seq); + destination.terminateSequence(seq); EasyMock.expectLastCall(); Message message = setUpMessage("1"); @@ -601,7 +601,7 @@ public class DestinationSequenceTest extends Assert { long lastAppMessage = System.currentTimeMillis() - 30000L; EasyMock.expect(rme.getLastControlMessage()).andReturn(0L); EasyMock.expect(rme.getLastApplicationMessage()).andReturn(lastAppMessage); - destination.removeSequence(seq); + destination.terminateSequence(seq); EasyMock.expectLastCall(); control.replay(); st.run(); http://git-wip-us.apache.org/repos/asf/cxf/blob/0dd29509/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/RMEndpointTest.java ---------------------------------------------------------------------- diff --git a/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/RMEndpointTest.java b/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/RMEndpointTest.java index 7600cd5..acd179c 100644 --- a/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/RMEndpointTest.java +++ b/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/RMEndpointTest.java @@ -292,6 +292,10 @@ public class RMEndpointTest extends Assert { EasyMock.expect(manager.getRetransmissionQueue()).andReturn(queue).anyTimes(); queue.stop(ss); EasyMock.expectLastCall().anyTimes(); + RedeliveryQueue dqueue = control.createMock(RedeliveryQueue.class); + EasyMock.expect(manager.getRedeliveryQueue()).andReturn(dqueue).anyTimes(); + dqueue.stop(ds); + EasyMock.expectLastCall().anyTimes(); control.replay(); rme.getDestination().addSequence(ds, false); rme.getSource().addSequence(ss, false); http://git-wip-us.apache.org/repos/asf/cxf/blob/0dd29509/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/RMInInterceptorTest.java ---------------------------------------------------------------------- diff --git a/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/RMInInterceptorTest.java b/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/RMInInterceptorTest.java index a68d821..c9411d8 100644 --- a/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/RMInInterceptorTest.java +++ b/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/RMInInterceptorTest.java @@ -40,6 +40,8 @@ import org.apache.cxf.ws.addressing.JAXWSAConstants; import org.apache.cxf.ws.addressing.MAPAggregator; import org.apache.cxf.ws.addressing.VersionTransformer.Names200408; import org.apache.cxf.ws.policy.AssertionInfoMap; +import org.apache.cxf.ws.rm.manager.DestinationPolicyType; +import org.apache.cxf.ws.rm.manager.RetryPolicyType; import org.apache.cxf.ws.rm.v200702.CreateSequenceResponseType; import org.apache.cxf.ws.rm.v200702.Identifier; import org.apache.cxf.ws.rm.v200702.SequenceAcknowledgement; @@ -367,6 +369,29 @@ public class RMInInterceptorTest extends Assert { } catch (Exception e) { fail("unexpected exception thrown from handleFault: " + e); } + + control.reset(); + org.apache.cxf.transport.Destination td = control.createMock(org.apache.cxf.transport.Destination.class); + EasyMock.expect(exchange.getDestination()).andReturn(td).anyTimes(); + EasyMock.expect(message.getExchange()).andReturn(exchange).anyTimes(); + EasyMock.expect(message.get(RMMessageConstants.RM_PROTOCOL_VARIATION)) + .andReturn(ProtocolVariation.RM10WSA200408).anyTimes(); + EasyMock.expect(message.getContent(Exception.class)).andReturn(new SequenceFault("no sequence")).anyTimes(); + DestinationPolicyType dp = new DestinationPolicyType(); + RetryPolicyType rp = new RetryPolicyType(); + dp.setRetryPolicy(rp); + EasyMock.expect(manager.getDestinationPolicy()).andReturn(dp).anyTimes(); + RedeliveryQueue rq = control.createMock(RedeliveryQueue.class); + EasyMock.expect(manager.getRedeliveryQueue()).andReturn(rq).anyTimes(); + rq.addUndelivered(message); + EasyMock.expectLastCall().andThrow(new RuntimeException("shouldn't be queued")).anyTimes(); + control.replay(); + + try { + interceptor.handleFault(message); + } catch (Exception e) { + fail("unexpected exception thrown from handleFault: " + e); + } } @Test http://git-wip-us.apache.org/repos/asf/cxf/blob/0dd29509/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/RMManagerTest.java ---------------------------------------------------------------------- diff --git a/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/RMManagerTest.java b/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/RMManagerTest.java index 37fb6ac..5231bd4 100644 --- a/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/RMManagerTest.java +++ b/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/RMManagerTest.java @@ -519,7 +519,7 @@ public class RMManagerTest extends Assert { InterfaceInfo ii = control.createMock(InterfaceInfo.class); setUpEndpointForRecovery(endpoint, ei, si, bi, ii); Conduit conduit = control.createMock(Conduit.class); - setUpRecoverReliableEndpoint(endpoint, conduit, null, null, null, null); + setUpRecoverReliableEndpoint(endpoint, conduit, null, null, null); control.replay(); manager.recoverReliableEndpoint(endpoint, conduit); control.verify(); @@ -528,7 +528,7 @@ public class RMManagerTest extends Assert { setUpEndpointForRecovery(endpoint, ei, si, bi, ii); SourceSequence ss = control.createMock(SourceSequence.class); DestinationSequence ds = control.createMock(DestinationSequence.class); - setUpRecoverReliableEndpoint(endpoint, conduit, ss, ds, null, null); + setUpRecoverReliableEndpoint(endpoint, conduit, ss, ds, null); control.replay(); manager.recoverReliableEndpoint(endpoint, conduit); control.verify(); @@ -536,16 +536,10 @@ public class RMManagerTest extends Assert { control.reset(); setUpEndpointForRecovery(endpoint, ei, si, bi, ii); RMMessage m = control.createMock(RMMessage.class); - Capture mc = Capture.newInstance(); - setUpRecoverReliableEndpoint(endpoint, conduit, ss, ds, m, mc); + setUpRecoverReliableEndpoint(endpoint, conduit, ss, ds, m); control.replay(); manager.recoverReliableEndpoint(endpoint, conduit); control.verify(); - - Message msg = mc.getValue(); - assertNotNull(msg); - assertNotNull(msg.getExchange()); - assertSame(msg, msg.getExchange().getOutMessage()); } @Test @@ -595,9 +589,11 @@ public class RMManagerTest extends Assert { DestinationSequence ds, RMMessage m, Capture mc) throws IOException { RMStore store = control.createMock(RMStore.class); - RetransmissionQueue queue = control.createMock(RetransmissionQueue.class); + RetransmissionQueue oqueue = control.createMock(RetransmissionQueue.class); + RedeliveryQueue iqueue = control.createMock(RedeliveryQueue.class); manager.setStore(store); - manager.setRetransmissionQueue(queue); + manager.setRetransmissionQueue(oqueue); + manager.setRedeliveryQueue(iqueue); Collection sss = new ArrayList(); if (null != ss) { @@ -652,10 +648,11 @@ public class RMManagerTest extends Assert { return; } - queue.addUnacknowledged(EasyMock.capture(mc)); - + oqueue.addUnacknowledged(EasyMock.capture(mc)); EasyMock.expectLastCall(); - queue.start(); + oqueue.start(); + EasyMock.expectLastCall(); + iqueue.start(); EasyMock.expectLastCall(); } @@ -678,12 +675,14 @@ public class RMManagerTest extends Assert { void setUpRecoverReliableEndpoint(Endpoint endpoint, Conduit conduit, SourceSequence ss, - DestinationSequence ds, RMMessage m, Capture mc) + DestinationSequence ds, RMMessage m) throws IOException { RMStore store = control.createMock(RMStore.class); - RetransmissionQueue queue = control.createMock(RetransmissionQueue.class); + RetransmissionQueue oqueue = control.createMock(RetransmissionQueue.class); + RedeliveryQueue iqueue = control.createMock(RedeliveryQueue.class); manager.setStore(store); - manager.setRetransmissionQueue(queue); + manager.setRetransmissionQueue(oqueue); + manager.setRedeliveryQueue(iqueue); Collection sss = new ArrayList(); if (null != ss) { @@ -747,13 +746,11 @@ public class RMManagerTest extends Assert { is.close(); EasyMock.expect(m.getContent()).andReturn(cos).anyTimes(); - if (mc != null) { - queue.addUnacknowledged(EasyMock.capture(mc)); - } else { - queue.addUnacknowledged(EasyMock.isA(Message.class)); - } + oqueue.addUnacknowledged(EasyMock.isA(Message.class)); + EasyMock.expectLastCall(); + oqueue.start(); EasyMock.expectLastCall(); - queue.start(); + iqueue.start(); EasyMock.expectLastCall(); } http://git-wip-us.apache.org/repos/asf/cxf/blob/0dd29509/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/persistence/jdbc/RMTxStoreTestBase.java ---------------------------------------------------------------------- diff --git a/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/persistence/jdbc/RMTxStoreTestBase.java b/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/persistence/jdbc/RMTxStoreTestBase.java index a5f9723..87d8e6e 100644 --- a/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/persistence/jdbc/RMTxStoreTestBase.java +++ b/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/persistence/jdbc/RMTxStoreTestBase.java @@ -68,6 +68,8 @@ public abstract class RMTxStoreTestBase extends Assert { private static SequenceAcknowledgement ack1; private static SequenceAcknowledgement ack2; + private static final long TIME = System.currentTimeMillis(); + protected IMocksControl control; public static void setUpOnce() { @@ -867,6 +869,7 @@ public abstract class RMTxStoreTestBase extends Assert { EasyMock.expect(msg.getTo()).andReturn(to).anyTimes(); EasyMock.expect(msg.getContentType()).andReturn("text/xml").anyTimes(); + EasyMock.expect(msg.getCreatedTime()).andReturn(TIME); byte[] value = ("Message " + mn.longValue()).getBytes(); ByteArrayInputStream bais = new ByteArrayInputStream(value); CachedOutputStream cos = new CachedOutputStream(); @@ -936,6 +939,7 @@ public abstract class RMTxStoreTestBase extends Assert { } else { assertNull(msg.getTo()); } + assertEquals(TIME, msg.getCreatedTime()); try { InputStream actual = msg.getContent().getInputStream(); assertEquals(new String("Message " + mn), IOUtils.readStringFromStream(actual)); http://git-wip-us.apache.org/repos/asf/cxf/blob/0dd29509/systests/ws-rm/src/test/java/org/apache/cxf/systest/ws/rm/RedeliveryTest.java ---------------------------------------------------------------------- diff --git a/systests/ws-rm/src/test/java/org/apache/cxf/systest/ws/rm/RedeliveryTest.java b/systests/ws-rm/src/test/java/org/apache/cxf/systest/ws/rm/RedeliveryTest.java new file mode 100644 index 0000000..91ada49 --- /dev/null +++ b/systests/ws-rm/src/test/java/org/apache/cxf/systest/ws/rm/RedeliveryTest.java @@ -0,0 +1,186 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.cxf.systest.ws.rm; + +import java.util.logging.Logger; + +import javax.xml.ws.Endpoint; + +import org.apache.cxf.Bus; +import org.apache.cxf.BusFactory; +import org.apache.cxf.bus.spring.SpringBusFactory; +import org.apache.cxf.common.logging.LogUtils; +import org.apache.cxf.greeter_control.Greeter; +import org.apache.cxf.greeter_control.GreeterService; +import org.apache.cxf.testutil.common.AbstractBusClientServerTestBase; +import org.apache.cxf.testutil.common.AbstractBusTestServerBase; +import org.apache.cxf.ws.rm.RMConfiguration; +import org.apache.cxf.ws.rm.RMManager; +import org.apache.cxf.ws.rm.manager.RetryPolicyType; +import org.apache.cxf.ws.rm.persistence.jdbc.RMTxStore; + +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; + +/** + * Tests the redelivery of the message upon a delivery error. + */ +public class RedeliveryTest extends AbstractBusClientServerTestBase { + public static final String PORT = allocatePort(Server.class); + private static final Logger LOG = LogUtils.getLogger(RedeliveryTest.class); + + private static GreeterRecorderImpl serverGreeter; + private static Bus serverBus; + private Greeter greeter; + + + public static class Server extends AbstractBusTestServerBase { + String port; + String pfx; + Endpoint ep; + + public Server(String args[]) { + port = args[0]; + pfx = args[1]; + } + + protected void run() { + SpringBusFactory bf = new SpringBusFactory(); + // use a at-most-once server with sync ack processing + System.setProperty("db.name", pfx + "-server"); + serverBus = bf.createBus("/org/apache/cxf/systest/ws/rm/sync-ack-persistent-server.xml"); + System.clearProperty("db.name"); + BusFactory.setDefaultBus(serverBus); + RMManager manager = serverBus.getExtension(RMManager.class); + RMConfiguration cfg = manager.getConfiguration(); + cfg.setAcknowledgementInterval(0L); + + RetryPolicyType rp = new RetryPolicyType(); + rp.setMaxRetries(-1); + serverBus.getExtension(RMManager.class).getDestinationPolicy().setRetryPolicy(rp); + serverGreeter = new GreeterRecorderImpl(); + String address = "http://localhost:" + port + "/SoapContext/GreeterPort"; + + // publish this robust oneway endpoint + ep = Endpoint.create(serverGreeter); + ep.publish(address); + LOG.info("Published greeter endpoint."); + BusFactory.setDefaultBus(null); + BusFactory.setThreadDefaultBus(null); + } + public void tearDown() { + ep.stop(); + ep = null; + } + + public static void main(String[] args) { + try { + Server s = new Server(args); + s.start(); + } catch (Exception ex) { + ex.printStackTrace(); + System.exit(-1); + } finally { + System.out.println("done!"); + } + } + } + + @BeforeClass + public static void startServers() throws Exception { + RMTxStore.deleteDatabaseFiles("redlv-server", true); + assertTrue("server did not launch correctly", + launchServer(Server.class, null, new String[]{PORT, "redlv"}, true)); + } + + @AfterClass + public static void cleanUpDerby() throws Exception { + RMTxStore.deleteDatabaseFiles("redlv-server", true); + } + + @Test + public void testAutomaticRedeliveryAfterError() throws Exception { + LOG.fine("Creating greeter client"); + SpringBusFactory bf = new SpringBusFactory(); + bus = bf.createBus("/org/apache/cxf/systest/ws/rm/rminterceptors.xml"); + // set the client retry interval much shorter than the slow processing delay + RMManager manager = bus.getExtension(RMManager.class); + RMConfiguration cfg = manager.getConfiguration(); + cfg.setBaseRetransmissionInterval(3000L); + + BusFactory.setDefaultBus(bus); + GreeterService gs = new GreeterService(); + greeter = gs.getGreeterPort(); + updateAddressPort(greeter, PORT); + + assertNull("last greeted by none", serverGreeter.getValue()); + + LOG.fine("Invoking greeter for one"); + greeter.greetMeOneWay("one"); + LOG.fine("Wait for 4 secs ..."); + Thread.sleep(4000); + + assertEquals("last greeted by one", "one", serverGreeter.getValue()); + assertTrue("retransmission running", manager.getRetransmissionQueue().isEmpty()); + + LOG.fine("Activating the error trigger and invoking greeter for two"); + serverGreeter.setThrowAlways(true); + greeter.greetMeOneWay("two"); + LOG.fine("Wait for 4 secs ..."); + Thread.sleep(4000); + + RMManager serverManager = serverBus.getExtension(RMManager.class); + + assertEquals("last greeted by one", "one", serverGreeter.getValue()); + assertTrue("retransmission running", manager.getRetransmissionQueue().isEmpty()); + assertFalse("redelivery not running", serverManager.getRedeliveryQueue().isEmpty()); + + LOG.fine("Deactivating the error trigger and wait for 9 secs ..."); + serverGreeter.setThrowAlways(false); + Thread.sleep(9000); + + assertEquals("last greeted by two", "two", serverGreeter.getValue()); + assertTrue("redelivery running", serverManager.getRedeliveryQueue().isEmpty()); + } + + private static class GreeterRecorderImpl extends GreeterImpl { + private String value; + private boolean ex; + + public void greetMeOneWay(String arg0) { + if (ex) { + throw new RuntimeException("intentional exception"); + } + super.greetMeOneWay(arg0); + value = arg0; + } + + public String getValue() { + return value; + } + + @Override + public void setThrowAlways(boolean b) { + super.setThrowAlways(b); + ex = b; + } + } +}