Return-Path: X-Original-To: apmail-cxf-commits-archive@www.apache.org Delivered-To: apmail-cxf-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 576609805 for ; Fri, 6 Apr 2012 16:21:51 +0000 (UTC) Received: (qmail 88027 invoked by uid 500); 6 Apr 2012 16:21:51 -0000 Delivered-To: apmail-cxf-commits-archive@cxf.apache.org Received: (qmail 87961 invoked by uid 500); 6 Apr 2012 16:21:51 -0000 Mailing-List: contact commits-help@cxf.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@cxf.apache.org Delivered-To: mailing list commits@cxf.apache.org Received: (qmail 87954 invoked by uid 99); 6 Apr 2012 16:21:51 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 06 Apr 2012 16:21:51 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 06 Apr 2012 16:21:40 +0000 Received: from eris.apache.org (localhost [127.0.0.1]) by eris.apache.org (Postfix) with ESMTP id 91BE52388B3A; Fri, 6 Apr 2012 16:21:17 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1310446 [2/4] - in /cxf/trunk: rt/bindings/soap/src/main/java/org/apache/cxf/binding/soap/wsdl/extensions/ rt/core/src/main/java/org/apache/cxf/bus/extension/ rt/core/src/main/java/org/apache/cxf/bus/osgi/ rt/core/src/main/java/org/apache/... Date: Fri, 06 Apr 2012 16:21:15 -0000 To: commits@cxf.apache.org From: dkulp@apache.org X-Mailer: svnmailer-1.0.8-patched Message-Id: <20120406162117.91BE52388B3A@eris.apache.org> Modified: cxf/trunk/rt/core/src/test/java/org/apache/cxf/transport/common/gzip/GZIPAcceptEncodingTest.java URL: http://svn.apache.org/viewvc/cxf/trunk/rt/core/src/test/java/org/apache/cxf/transport/common/gzip/GZIPAcceptEncodingTest.java?rev=1310446&r1=1310445&r2=1310446&view=diff ============================================================================== --- cxf/trunk/rt/core/src/test/java/org/apache/cxf/transport/common/gzip/GZIPAcceptEncodingTest.java (original) +++ cxf/trunk/rt/core/src/test/java/org/apache/cxf/transport/common/gzip/GZIPAcceptEncodingTest.java Fri Apr 6 16:21:14 2012 @@ -1,139 +1,139 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.cxf.transport.common.gzip; - -import java.io.ByteArrayInputStream; -import java.io.ByteArrayOutputStream; -import java.io.InputStream; -import java.io.OutputStream; -import java.util.Collections; -import java.util.List; -import java.util.Map; -import java.util.TreeMap; - -import org.apache.cxf.helpers.HttpHeaderHelper; -import org.apache.cxf.interceptor.Fault; -import org.apache.cxf.interceptor.InterceptorChain; -import org.apache.cxf.message.Exchange; -import org.apache.cxf.message.ExchangeImpl; -import org.apache.cxf.message.Message; -import org.apache.cxf.message.MessageImpl; -import org.easymock.EasyMock; - -import org.junit.Assert; -import org.junit.Before; -import org.junit.Test; - -import static org.apache.cxf.transport.common.gzip.GZIPOutInterceptor.UseGzip.FORCE; -import static org.apache.cxf.transport.common.gzip.GZIPOutInterceptor.UseGzip.YES; - -/** - * Test for the parsing of Accept-Encoding by the GZIPOutInterceptor. For - * Accept-Encoding values that enable gzip we expect an extra interceptor to be - * added to the out message, and the {@link GZIPOutInterceptor#USE_GZIP_KEY} to - * be set correctly. For Accept-Encoding values that do not enable gzip the - * interceptor should not be added. - */ -public class GZIPAcceptEncodingTest extends Assert { - - private GZIPOutInterceptor interceptor; - private Message inMessage; - private Message outMessage; - private InterceptorChain outInterceptors; - - @Before - public void setUp() throws Exception { - interceptor = new GZIPOutInterceptor(); - inMessage = new MessageImpl(); - outMessage = new MessageImpl(); - Exchange exchange = new ExchangeImpl(); - exchange.setInMessage(inMessage); - inMessage.setExchange(exchange); - inMessage.setContent(InputStream.class, new ByteArrayInputStream(new byte[0])); - exchange.setOutMessage(outMessage); - outMessage.setExchange(exchange); - outMessage.setContent(OutputStream.class, new ByteArrayOutputStream()); - outInterceptors = EasyMock.createMock(InterceptorChain.class); - outMessage.setInterceptorChain(outInterceptors); - } - - @Test - public void testNoAcceptEncoding() throws Exception { - EasyMock.replay(outInterceptors); - interceptor.handleMessage(outMessage); - } - - @Test - public void testAcceptGzip() throws Exception { - singleTest("gzip", true, YES, "gzip"); - } - - @Test - public void testAcceptXGzip() throws Exception { - singleTest("x-gzip, x-compress", true, YES, "x-gzip"); - } - - @Test - public void testAcceptStar() throws Exception { - singleTest("*", true, YES, "gzip"); - } - - @Test - public void testAcceptOnlyGzip() throws Exception { - singleTest("gzip, identity; q=0", true, FORCE, "gzip"); - } - - @Test - public void testOnlyIdentitySupported() throws Exception { - singleTest("deflate", false, null, null); - } - - @Test - public void testGzipExplicitlyDisabled() throws Exception { - singleTest("gzip; q=0.00", false, null, null); - } - - @Test(expected = Fault.class) - public void testNoValidEncodings() throws Exception { - EasyMock.replay(); - setAcceptEncoding("*;q=0, deflate;q=0.5"); - interceptor.handleMessage(outMessage); - } - - private void singleTest(String encoding, boolean expectEndingInterceptor, - GZIPOutInterceptor.UseGzip expectedUseGzip, String expectedGzipEncoding) - throws Exception { - - EasyMock.replay(outInterceptors); - setAcceptEncoding(encoding); - interceptor.handleMessage(outMessage); - assertSame("Wrong value of " + GZIPOutInterceptor.USE_GZIP_KEY, expectedUseGzip, outMessage - .get(GZIPOutInterceptor.USE_GZIP_KEY)); - assertEquals("Wrong value of " + GZIPOutInterceptor.GZIP_ENCODING_KEY, expectedGzipEncoding, - outMessage.get(GZIPOutInterceptor.GZIP_ENCODING_KEY)); - } - - private void setAcceptEncoding(String enc) { - Map> protocolHeaders - = new TreeMap>(String.CASE_INSENSITIVE_ORDER); - protocolHeaders.put(HttpHeaderHelper.getHeaderKey(HttpHeaderHelper.ACCEPT_ENCODING), Collections - .singletonList(enc)); - inMessage.put(Message.PROTOCOL_HEADERS, protocolHeaders); - } -} +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.cxf.transport.common.gzip; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.TreeMap; + +import org.apache.cxf.helpers.HttpHeaderHelper; +import org.apache.cxf.interceptor.Fault; +import org.apache.cxf.interceptor.InterceptorChain; +import org.apache.cxf.message.Exchange; +import org.apache.cxf.message.ExchangeImpl; +import org.apache.cxf.message.Message; +import org.apache.cxf.message.MessageImpl; +import org.easymock.EasyMock; + +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import static org.apache.cxf.transport.common.gzip.GZIPOutInterceptor.UseGzip.FORCE; +import static org.apache.cxf.transport.common.gzip.GZIPOutInterceptor.UseGzip.YES; + +/** + * Test for the parsing of Accept-Encoding by the GZIPOutInterceptor. For + * Accept-Encoding values that enable gzip we expect an extra interceptor to be + * added to the out message, and the {@link GZIPOutInterceptor#USE_GZIP_KEY} to + * be set correctly. For Accept-Encoding values that do not enable gzip the + * interceptor should not be added. + */ +public class GZIPAcceptEncodingTest extends Assert { + + private GZIPOutInterceptor interceptor; + private Message inMessage; + private Message outMessage; + private InterceptorChain outInterceptors; + + @Before + public void setUp() throws Exception { + interceptor = new GZIPOutInterceptor(); + inMessage = new MessageImpl(); + outMessage = new MessageImpl(); + Exchange exchange = new ExchangeImpl(); + exchange.setInMessage(inMessage); + inMessage.setExchange(exchange); + inMessage.setContent(InputStream.class, new ByteArrayInputStream(new byte[0])); + exchange.setOutMessage(outMessage); + outMessage.setExchange(exchange); + outMessage.setContent(OutputStream.class, new ByteArrayOutputStream()); + outInterceptors = EasyMock.createMock(InterceptorChain.class); + outMessage.setInterceptorChain(outInterceptors); + } + + @Test + public void testNoAcceptEncoding() throws Exception { + EasyMock.replay(outInterceptors); + interceptor.handleMessage(outMessage); + } + + @Test + public void testAcceptGzip() throws Exception { + singleTest("gzip", true, YES, "gzip"); + } + + @Test + public void testAcceptXGzip() throws Exception { + singleTest("x-gzip, x-compress", true, YES, "x-gzip"); + } + + @Test + public void testAcceptStar() throws Exception { + singleTest("*", true, YES, "gzip"); + } + + @Test + public void testAcceptOnlyGzip() throws Exception { + singleTest("gzip, identity; q=0", true, FORCE, "gzip"); + } + + @Test + public void testOnlyIdentitySupported() throws Exception { + singleTest("deflate", false, null, null); + } + + @Test + public void testGzipExplicitlyDisabled() throws Exception { + singleTest("gzip; q=0.00", false, null, null); + } + + @Test(expected = Fault.class) + public void testNoValidEncodings() throws Exception { + EasyMock.replay(); + setAcceptEncoding("*;q=0, deflate;q=0.5"); + interceptor.handleMessage(outMessage); + } + + private void singleTest(String encoding, boolean expectEndingInterceptor, + GZIPOutInterceptor.UseGzip expectedUseGzip, String expectedGzipEncoding) + throws Exception { + + EasyMock.replay(outInterceptors); + setAcceptEncoding(encoding); + interceptor.handleMessage(outMessage); + assertSame("Wrong value of " + GZIPOutInterceptor.USE_GZIP_KEY, expectedUseGzip, outMessage + .get(GZIPOutInterceptor.USE_GZIP_KEY)); + assertEquals("Wrong value of " + GZIPOutInterceptor.GZIP_ENCODING_KEY, expectedGzipEncoding, + outMessage.get(GZIPOutInterceptor.GZIP_ENCODING_KEY)); + } + + private void setAcceptEncoding(String enc) { + Map> protocolHeaders + = new TreeMap>(String.CASE_INSENSITIVE_ORDER); + protocolHeaders.put(HttpHeaderHelper.getHeaderKey(HttpHeaderHelper.ACCEPT_ENCODING), Collections + .singletonList(enc)); + inMessage.put(Message.PROTOCOL_HEADERS, protocolHeaders); + } +} Modified: cxf/trunk/rt/features/clustering/src/main/java/org/apache/cxf/clustering/AbstractStaticFailoverStrategy.java URL: http://svn.apache.org/viewvc/cxf/trunk/rt/features/clustering/src/main/java/org/apache/cxf/clustering/AbstractStaticFailoverStrategy.java?rev=1310446&r1=1310445&r2=1310446&view=diff ============================================================================== --- cxf/trunk/rt/features/clustering/src/main/java/org/apache/cxf/clustering/AbstractStaticFailoverStrategy.java (original) +++ cxf/trunk/rt/features/clustering/src/main/java/org/apache/cxf/clustering/AbstractStaticFailoverStrategy.java Fri Apr 6 16:21:14 2012 @@ -1,165 +1,165 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.cxf.clustering; - -import java.util.ArrayList; -import java.util.Collection; -import java.util.List; -import java.util.logging.Level; -import java.util.logging.Logger; - -import javax.xml.namespace.QName; - -import org.apache.cxf.common.logging.LogUtils; -import org.apache.cxf.endpoint.Endpoint; -import org.apache.cxf.message.Exchange; -import org.apache.cxf.service.model.EndpointInfo; -import org.apache.cxf.service.model.ServiceInfo; - -/** - * Failover strategy based on a static cluster represented by - * multiple endpoints associated with the same service instance. - */ -public abstract class AbstractStaticFailoverStrategy implements FailoverStrategy { - private static final Logger LOG = - LogUtils.getL7dLogger(AbstractStaticFailoverStrategy.class); - - private List alternateAddresses; - private long delayBetweenRetries; - - public void setDelayBetweenRetries(long delay) { - this.delayBetweenRetries = delay; - } - - public long getDelayBetweenRetries() { - return this.delayBetweenRetries; - } - - public void setAlternateAddresses(List alternateAddresses) { - this.alternateAddresses = alternateAddresses; - } - - /** - * Get the alternate addresses for this invocation. - * - * @param exchange the current Exchange - * @return a List of alternate addresses if available - */ - public List getAlternateAddresses(Exchange exchange) { - return alternateAddresses != null - ? new ArrayList(alternateAddresses) - : null; - } - - /** - * Select one of the alternate addresses for a retried invocation. - * - * @param a List of alternate addresses if available - * @return the selected address - */ - public String selectAlternateAddress(List alternates) { - String selected = null; - if (alternates != null && alternates.size() > 0) { - selected = getNextAlternate(alternates); - LOG.log(Level.WARNING, - "FAILING_OVER_TO_ADDRESS_OVERRIDE", - selected); - } else { - LOG.warning("NO_ALTERNATE_TARGETS_REMAIN"); - } - return selected; - } - - /** - * Get the alternate endpoints for this invocation. - * - * @param exchange the current Exchange - * @return a List of alternate endpoints if available - */ - public List getAlternateEndpoints(Exchange exchange) { - return getEndpoints(exchange, false); - } - - /** - * Select one of the alternate endpoints for a retried invocation. - * - * @param a List of alternate endpoints if available - * @return the selected endpoint - */ - public Endpoint selectAlternateEndpoint(List alternates) { - Endpoint selected = null; - if (alternates != null && alternates.size() > 0) { - selected = getNextAlternate(alternates); - LOG.log(Level.WARNING, - "FAILING_OVER_TO_ALTERNATE_ENDPOINT", - new Object[] {selected.getEndpointInfo().getName(), - selected.getEndpointInfo().getAddress()}); - } else { - LOG.warning("NO_ALTERNATE_TARGETS_REMAIN"); - } - return selected; - } - - /** - * Get the endpoints for this invocation. - * - * @param exchange the current Exchange - * @param acceptCandidatesWithSameAddress true to accept candidates with the same address - * @return a List of alternate endpoints if available - */ - protected List getEndpoints(Exchange exchange, boolean acceptCandidatesWithSameAddress) { - Endpoint endpoint = exchange.get(Endpoint.class); - Collection services = endpoint.getService().getServiceInfos(); - QName currentBinding = endpoint.getBinding().getBindingInfo().getName(); - List alternates = new ArrayList(); - for (ServiceInfo service : services) { - Collection candidates = service.getEndpoints(); - for (EndpointInfo candidate : candidates) { - QName candidateBinding = candidate.getBinding().getName(); - if (candidateBinding.equals(currentBinding)) { - if (acceptCandidatesWithSameAddress || !candidate.getAddress().equals( - endpoint.getEndpointInfo().getAddress())) { - Endpoint alternate = - endpoint.getService().getEndpoints().get(candidate.getName()); - if (alternate != null) { - LOG.log(Level.INFO, - "FAILOVER_CANDIDATE_ACCEPTED", - candidate.getName()); - alternates.add(alternate); - } - } - } else { - LOG.log(Level.INFO, - "FAILOVER_CANDIDATE_REJECTED", - new Object[] {candidate.getName(), candidateBinding}); - } - } - } - return alternates; - } - - /** - * Get next alternate endpoint. - * - * @param alternates non-empty List of alternate endpoints - * @return - */ - protected abstract T getNextAlternate(List alternates); -} +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.cxf.clustering; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.logging.Level; +import java.util.logging.Logger; + +import javax.xml.namespace.QName; + +import org.apache.cxf.common.logging.LogUtils; +import org.apache.cxf.endpoint.Endpoint; +import org.apache.cxf.message.Exchange; +import org.apache.cxf.service.model.EndpointInfo; +import org.apache.cxf.service.model.ServiceInfo; + +/** + * Failover strategy based on a static cluster represented by + * multiple endpoints associated with the same service instance. + */ +public abstract class AbstractStaticFailoverStrategy implements FailoverStrategy { + private static final Logger LOG = + LogUtils.getL7dLogger(AbstractStaticFailoverStrategy.class); + + private List alternateAddresses; + private long delayBetweenRetries; + + public void setDelayBetweenRetries(long delay) { + this.delayBetweenRetries = delay; + } + + public long getDelayBetweenRetries() { + return this.delayBetweenRetries; + } + + public void setAlternateAddresses(List alternateAddresses) { + this.alternateAddresses = alternateAddresses; + } + + /** + * Get the alternate addresses for this invocation. + * + * @param exchange the current Exchange + * @return a List of alternate addresses if available + */ + public List getAlternateAddresses(Exchange exchange) { + return alternateAddresses != null + ? new ArrayList(alternateAddresses) + : null; + } + + /** + * Select one of the alternate addresses for a retried invocation. + * + * @param a List of alternate addresses if available + * @return the selected address + */ + public String selectAlternateAddress(List alternates) { + String selected = null; + if (alternates != null && alternates.size() > 0) { + selected = getNextAlternate(alternates); + LOG.log(Level.WARNING, + "FAILING_OVER_TO_ADDRESS_OVERRIDE", + selected); + } else { + LOG.warning("NO_ALTERNATE_TARGETS_REMAIN"); + } + return selected; + } + + /** + * Get the alternate endpoints for this invocation. + * + * @param exchange the current Exchange + * @return a List of alternate endpoints if available + */ + public List getAlternateEndpoints(Exchange exchange) { + return getEndpoints(exchange, false); + } + + /** + * Select one of the alternate endpoints for a retried invocation. + * + * @param a List of alternate endpoints if available + * @return the selected endpoint + */ + public Endpoint selectAlternateEndpoint(List alternates) { + Endpoint selected = null; + if (alternates != null && alternates.size() > 0) { + selected = getNextAlternate(alternates); + LOG.log(Level.WARNING, + "FAILING_OVER_TO_ALTERNATE_ENDPOINT", + new Object[] {selected.getEndpointInfo().getName(), + selected.getEndpointInfo().getAddress()}); + } else { + LOG.warning("NO_ALTERNATE_TARGETS_REMAIN"); + } + return selected; + } + + /** + * Get the endpoints for this invocation. + * + * @param exchange the current Exchange + * @param acceptCandidatesWithSameAddress true to accept candidates with the same address + * @return a List of alternate endpoints if available + */ + protected List getEndpoints(Exchange exchange, boolean acceptCandidatesWithSameAddress) { + Endpoint endpoint = exchange.get(Endpoint.class); + Collection services = endpoint.getService().getServiceInfos(); + QName currentBinding = endpoint.getBinding().getBindingInfo().getName(); + List alternates = new ArrayList(); + for (ServiceInfo service : services) { + Collection candidates = service.getEndpoints(); + for (EndpointInfo candidate : candidates) { + QName candidateBinding = candidate.getBinding().getName(); + if (candidateBinding.equals(currentBinding)) { + if (acceptCandidatesWithSameAddress || !candidate.getAddress().equals( + endpoint.getEndpointInfo().getAddress())) { + Endpoint alternate = + endpoint.getService().getEndpoints().get(candidate.getName()); + if (alternate != null) { + LOG.log(Level.INFO, + "FAILOVER_CANDIDATE_ACCEPTED", + candidate.getName()); + alternates.add(alternate); + } + } + } else { + LOG.log(Level.INFO, + "FAILOVER_CANDIDATE_REJECTED", + new Object[] {candidate.getName(), candidateBinding}); + } + } + } + return alternates; + } + + /** + * Get next alternate endpoint. + * + * @param alternates non-empty List of alternate endpoints + * @return + */ + protected abstract T getNextAlternate(List alternates); +} Modified: cxf/trunk/rt/features/clustering/src/main/java/org/apache/cxf/clustering/FailoverFeature.java URL: http://svn.apache.org/viewvc/cxf/trunk/rt/features/clustering/src/main/java/org/apache/cxf/clustering/FailoverFeature.java?rev=1310446&r1=1310445&r2=1310446&view=diff ============================================================================== --- cxf/trunk/rt/features/clustering/src/main/java/org/apache/cxf/clustering/FailoverFeature.java (original) +++ cxf/trunk/rt/features/clustering/src/main/java/org/apache/cxf/clustering/FailoverFeature.java Fri Apr 6 16:21:14 2012 @@ -1,84 +1,84 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.cxf.clustering; - -import org.apache.cxf.Bus; -import org.apache.cxf.annotations.EvaluateAllEndpoints; -import org.apache.cxf.common.injection.NoJSR250Annotations; -import org.apache.cxf.endpoint.Client; -import org.apache.cxf.endpoint.ConduitSelector; -import org.apache.cxf.endpoint.ConduitSelectorHolder; -import org.apache.cxf.endpoint.Endpoint; -import org.apache.cxf.feature.AbstractFeature; -import org.apache.cxf.interceptor.InterceptorProvider; - -/** - * This feature may be applied to a Client so as to enable - * failover from the initial target endpoint to any other - * compatible endpoint for the target service. - */ -@NoJSR250Annotations -@EvaluateAllEndpoints -public class FailoverFeature extends AbstractFeature { - - private FailoverStrategy failoverStrategy; - private FailoverTargetSelector targetSelector; - - @Override - protected void initializeProvider(InterceptorProvider provider, Bus bus) { - if (provider instanceof ConduitSelectorHolder) { - ConduitSelectorHolder csHolder = (ConduitSelectorHolder) provider; - Endpoint endpoint = csHolder.getConduitSelector().getEndpoint(); - ConduitSelector conduitSelector = initTargetSelector(endpoint); - csHolder.setConduitSelector(conduitSelector); - } - } - - @Override - public void initialize(Client client, Bus bus) { - ConduitSelector selector = initTargetSelector(client.getConduitSelector().getEndpoint()); - client.setConduitSelector(selector); - } - - protected ConduitSelector initTargetSelector(Endpoint endpoint) { - FailoverTargetSelector selector = getTargetSelector(); - selector.setEndpoint(endpoint); - selector.setStrategy(getStrategy()); - return selector; - } - - public FailoverTargetSelector getTargetSelector() { - if (this.targetSelector == null) { - this.targetSelector = new FailoverTargetSelector(); - } - return this.targetSelector; - } - - public void setTargetSelector(FailoverTargetSelector selector) { - this.targetSelector = selector; - } - - public void setStrategy(FailoverStrategy strategy) { - failoverStrategy = strategy; - } - - public FailoverStrategy getStrategy() { - return failoverStrategy; - } -} +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.cxf.clustering; + +import org.apache.cxf.Bus; +import org.apache.cxf.annotations.EvaluateAllEndpoints; +import org.apache.cxf.common.injection.NoJSR250Annotations; +import org.apache.cxf.endpoint.Client; +import org.apache.cxf.endpoint.ConduitSelector; +import org.apache.cxf.endpoint.ConduitSelectorHolder; +import org.apache.cxf.endpoint.Endpoint; +import org.apache.cxf.feature.AbstractFeature; +import org.apache.cxf.interceptor.InterceptorProvider; + +/** + * This feature may be applied to a Client so as to enable + * failover from the initial target endpoint to any other + * compatible endpoint for the target service. + */ +@NoJSR250Annotations +@EvaluateAllEndpoints +public class FailoverFeature extends AbstractFeature { + + private FailoverStrategy failoverStrategy; + private FailoverTargetSelector targetSelector; + + @Override + protected void initializeProvider(InterceptorProvider provider, Bus bus) { + if (provider instanceof ConduitSelectorHolder) { + ConduitSelectorHolder csHolder = (ConduitSelectorHolder) provider; + Endpoint endpoint = csHolder.getConduitSelector().getEndpoint(); + ConduitSelector conduitSelector = initTargetSelector(endpoint); + csHolder.setConduitSelector(conduitSelector); + } + } + + @Override + public void initialize(Client client, Bus bus) { + ConduitSelector selector = initTargetSelector(client.getConduitSelector().getEndpoint()); + client.setConduitSelector(selector); + } + + protected ConduitSelector initTargetSelector(Endpoint endpoint) { + FailoverTargetSelector selector = getTargetSelector(); + selector.setEndpoint(endpoint); + selector.setStrategy(getStrategy()); + return selector; + } + + public FailoverTargetSelector getTargetSelector() { + if (this.targetSelector == null) { + this.targetSelector = new FailoverTargetSelector(); + } + return this.targetSelector; + } + + public void setTargetSelector(FailoverTargetSelector selector) { + this.targetSelector = selector; + } + + public void setStrategy(FailoverStrategy strategy) { + failoverStrategy = strategy; + } + + public FailoverStrategy getStrategy() { + return failoverStrategy; + } +} Modified: cxf/trunk/rt/features/clustering/src/main/java/org/apache/cxf/clustering/FailoverStrategy.java URL: http://svn.apache.org/viewvc/cxf/trunk/rt/features/clustering/src/main/java/org/apache/cxf/clustering/FailoverStrategy.java?rev=1310446&r1=1310445&r2=1310446&view=diff ============================================================================== --- cxf/trunk/rt/features/clustering/src/main/java/org/apache/cxf/clustering/FailoverStrategy.java (original) +++ cxf/trunk/rt/features/clustering/src/main/java/org/apache/cxf/clustering/FailoverStrategy.java Fri Apr 6 16:21:14 2012 @@ -1,64 +1,64 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.cxf.clustering; - -import java.util.List; - -import org.apache.cxf.endpoint.Endpoint; -import org.apache.cxf.message.Exchange; - -/** - * Supports pluggable strategies for alternate endpoint selection on - * failover. - */ -public interface FailoverStrategy { - /** - * Get the alternate endpoints for this invocation. - * - * @param exchange the current Exchange - * @return a failover endpoint if one is available - */ - List getAlternateEndpoints(Exchange exchange); - - /** - * Select one of the alternate endpoints for a retried invocation. - * - * @param alternates List of alternate endpoints if available - * @return the selected endpoint - */ - Endpoint selectAlternateEndpoint(List alternates); - - /** - * Get the alternate addresses for this invocation. - * These addresses over-ride any addresses specified in the WSDL. - * - * @param exchange the current Exchange - * @return a failover endpoint if one is available - */ - List getAlternateAddresses(Exchange exchange); - - /** - * Select one of the alternate addresses for a retried invocation. - * - * @param addresses List of alternate addresses if available - * @return the selected address - */ - String selectAlternateAddress(List addresses); -} +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.cxf.clustering; + +import java.util.List; + +import org.apache.cxf.endpoint.Endpoint; +import org.apache.cxf.message.Exchange; + +/** + * Supports pluggable strategies for alternate endpoint selection on + * failover. + */ +public interface FailoverStrategy { + /** + * Get the alternate endpoints for this invocation. + * + * @param exchange the current Exchange + * @return a failover endpoint if one is available + */ + List getAlternateEndpoints(Exchange exchange); + + /** + * Select one of the alternate endpoints for a retried invocation. + * + * @param alternates List of alternate endpoints if available + * @return the selected endpoint + */ + Endpoint selectAlternateEndpoint(List alternates); + + /** + * Get the alternate addresses for this invocation. + * These addresses over-ride any addresses specified in the WSDL. + * + * @param exchange the current Exchange + * @return a failover endpoint if one is available + */ + List getAlternateAddresses(Exchange exchange); + + /** + * Select one of the alternate addresses for a retried invocation. + * + * @param addresses List of alternate addresses if available + * @return the selected address + */ + String selectAlternateAddress(List addresses); +} Modified: cxf/trunk/rt/features/clustering/src/main/java/org/apache/cxf/clustering/FailoverTargetSelector.java URL: http://svn.apache.org/viewvc/cxf/trunk/rt/features/clustering/src/main/java/org/apache/cxf/clustering/FailoverTargetSelector.java?rev=1310446&r1=1310445&r2=1310446&view=diff ============================================================================== --- cxf/trunk/rt/features/clustering/src/main/java/org/apache/cxf/clustering/FailoverTargetSelector.java (original) +++ cxf/trunk/rt/features/clustering/src/main/java/org/apache/cxf/clustering/FailoverTargetSelector.java Fri Apr 6 16:21:14 2012 @@ -1,436 +1,436 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.cxf.clustering; - -import java.util.List; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; -import java.util.logging.Level; -import java.util.logging.Logger; - -import org.apache.cxf.common.logging.LogUtils; -import org.apache.cxf.endpoint.AbstractConduitSelector; -import org.apache.cxf.endpoint.Client; -import org.apache.cxf.endpoint.Endpoint; -import org.apache.cxf.endpoint.Retryable; -import org.apache.cxf.helpers.CastUtils; -import org.apache.cxf.message.Exchange; -import org.apache.cxf.message.Message; -import org.apache.cxf.service.model.BindingOperationInfo; -import org.apache.cxf.transport.Conduit; - - -/** - * Implements a target selection strategy based on failover to an - * alternate target endpoint when a transport level failure is - * encountered. - * Note that this feature changes the conduit on the fly and thus makes - * the Client not thread safe. - */ -public class FailoverTargetSelector extends AbstractConduitSelector { - - private static final Logger LOG = - LogUtils.getL7dLogger(FailoverTargetSelector.class); - protected Map inProgress - = new ConcurrentHashMap();; - protected FailoverStrategy failoverStrategy; - - /** - * Normal constructor. - */ - public FailoverTargetSelector() { - super(); - } - - /** - * Constructor, allowing a specific conduit to override normal selection. - * - * @param c specific conduit - */ - public FailoverTargetSelector(Conduit c) { - super(c); - } - - /** - * Called prior to the interceptor chain being traversed. - * - * @param message the current Message - */ - public synchronized void prepare(Message message) { - Exchange exchange = message.getExchange(); - InvocationKey key = new InvocationKey(exchange); - if (!inProgress.containsKey(key)) { - Endpoint endpoint = exchange.get(Endpoint.class); - BindingOperationInfo bindingOperationInfo = - exchange.getBindingOperationInfo(); - Object[] params = message.getContent(List.class).toArray(); - Map context = - CastUtils.cast((Map)message.get(Message.INVOCATION_CONTEXT)); - InvocationContext invocation = - new InvocationContext(endpoint, - bindingOperationInfo, - params, - context); - inProgress.put(key, invocation); - } - } - - /** - * Called when a Conduit is actually required. - * - * @param message - * @return the Conduit to use for mediation of the message - */ - public Conduit selectConduit(Message message) { - Conduit c = message.get(Conduit.class); - if (c != null) { - return c; - } - return getSelectedConduit(message); - } - - /** - * Called on completion of the MEP for which the Conduit was required. - * - * @param exchange represents the completed MEP - */ - public void complete(Exchange exchange) { - InvocationKey key = new InvocationKey(exchange); - InvocationContext invocation = null; - synchronized (this) { - invocation = inProgress.get(key); - } - boolean failover = false; - if (requiresFailover(exchange)) { - Conduit old = (Conduit)exchange.getOutMessage().remove(Conduit.class.getName()); - - Endpoint failoverTarget = getFailoverTarget(exchange, invocation); - if (failoverTarget != null) { - setEndpoint(failoverTarget); - if (old != null) { - old.close(); - conduits.remove(old); - } - Exception prevExchangeFault = - (Exception)exchange.remove(Exception.class.getName()); - Message outMessage = exchange.getOutMessage(); - Exception prevMessageFault = - outMessage.getContent(Exception.class); - outMessage.setContent(Exception.class, null); - overrideAddressProperty(invocation.getContext()); - Retryable retry = exchange.get(Retryable.class); - exchange.clear(); - if (retry != null) { - try { - failover = true; - long delay = getDelayBetweenRetries(); - if (delay > 0) { - Thread.sleep(delay); - } - retry.invoke(invocation.getBindingOperationInfo(), - invocation.getParams(), - invocation.getContext(), - exchange); - } catch (Exception e) { - if (exchange.get(Exception.class) != null) { - exchange.put(Exception.class, prevExchangeFault); - } - if (outMessage.getContent(Exception.class) != null) { - outMessage.setContent(Exception.class, - prevMessageFault); - } - } - } - } else { - setEndpoint(invocation.retrieveOriginalEndpoint(endpoint)); - } - } - if (!failover) { - getLogger().fine("FAILOVER_NOT_REQUIRED"); - synchronized (this) { - inProgress.remove(key); - } - super.complete(exchange); - } - } - - /** - * @param strategy the FailoverStrategy to use - */ - public synchronized void setStrategy(FailoverStrategy strategy) { - getLogger().log(Level.INFO, "USING_STRATEGY", new Object[] {strategy}); - failoverStrategy = strategy; - } - - /** - * @return strategy the FailoverStrategy to use - */ - public synchronized FailoverStrategy getStrategy() { - if (failoverStrategy == null) { - failoverStrategy = new SequentialStrategy(); - getLogger().log(Level.INFO, - "USING_STRATEGY", - new Object[] {failoverStrategy}); - } - return failoverStrategy; - } - - /** - * @return the logger to use - */ - protected Logger getLogger() { - return LOG; - } - - /** - * Returns delay (in milliseconds) between retries - * @return delay, 0 means no delay - */ - protected long getDelayBetweenRetries() { - FailoverStrategy strategy = getStrategy(); - if (strategy instanceof AbstractStaticFailoverStrategy) { - return ((AbstractStaticFailoverStrategy)strategy).getDelayBetweenRetries(); - } - //perhaps supporting FailoverTargetSelector specific property can make sense too - return 0; - } - - /** - * Check if the exchange is suitable for a failover. - * - * @param exchange the current Exchange - * @return boolean true if a failover should be attempted - */ - protected boolean requiresFailover(Exchange exchange) { - Message outMessage = exchange.getOutMessage(); - Exception ex = outMessage.get(Exception.class) != null - ? outMessage.get(Exception.class) - : exchange.get(Exception.class); - getLogger().log(Level.FINE, - "CHECK_LAST_INVOKE_FAILED", - new Object[] {ex != null}); - Throwable curr = ex; - boolean failover = false; - while (curr != null) { - failover = curr instanceof java.io.IOException; - curr = curr.getCause(); - } - if (ex != null) { - getLogger().log(Level.INFO, - "CHECK_FAILURE_IN_TRANSPORT", - new Object[] {ex, failover}); - } - return failover; - } - - /** - * Get the failover target endpoint, if a suitable one is available. - * - * @param exchange the current Exchange - * @param invocation the current InvocationContext - * @return a failover endpoint if one is available - */ - protected Endpoint getFailoverTarget(Exchange exchange, - InvocationContext invocation) { - List alternateAddresses = null; - if (!invocation.hasAlternates()) { - // no previous failover attempt on this invocation - // - alternateAddresses = - getStrategy().getAlternateAddresses(exchange); - if (alternateAddresses != null) { - invocation.setAlternateAddresses(alternateAddresses); - } else { - invocation.setAlternateEndpoints( - getStrategy().getAlternateEndpoints(exchange)); - } - } else { - alternateAddresses = invocation.getAlternateAddresses(); - } - - Endpoint failoverTarget = null; - if (alternateAddresses != null) { - String alternateAddress = - getStrategy().selectAlternateAddress(alternateAddresses); - if (alternateAddress != null) { - // re-use current endpoint - // - failoverTarget = getEndpoint(); - - failoverTarget.getEndpointInfo().setAddress(alternateAddress); - } - } else { - failoverTarget = getStrategy().selectAlternateEndpoint( - invocation.getAlternateEndpoints()); - } - return failoverTarget; - } - - /** - * Override the ENDPOINT_ADDRESS property in the request context - * - * @param context the request context - */ - protected void overrideAddressProperty(Map context) { - overrideAddressProperty(context, getEndpoint().getEndpointInfo().getAddress()); - } - - protected void overrideAddressProperty(Map context, - String address) { - Map requestContext = - CastUtils.cast((Map)context.get(Client.REQUEST_CONTEXT)); - if (requestContext != null) { - requestContext.put(Message.ENDPOINT_ADDRESS, address); - requestContext.put("javax.xml.ws.service.endpoint.address", address); - } - } - - // Some conduits may replace the endpoint address after it has already been prepared - // but before the invocation has been done (ex, org.apache.cxf.clustering.LoadDistributorTargetSelector) - // which may affect JAX-RS clients where actual endpoint address property may include additional path - // segments. - protected boolean replaceEndpointAddressPropertyIfNeeded(Message message, - String endpointAddress, - Conduit cond) { - String requestURI = (String)message.get(Message.REQUEST_URI); - if (requestURI != null && endpointAddress != null && !requestURI.startsWith(endpointAddress)) { - String basePath = (String)message.get(Message.BASE_PATH); - if (basePath != null && requestURI.startsWith(basePath)) { - String pathInfo = requestURI.substring(basePath.length()); - message.put(Message.BASE_PATH, endpointAddress); - final String slash = "/"; - boolean startsWithSlash = pathInfo.startsWith(slash); - if (endpointAddress.endsWith(slash)) { - endpointAddress = endpointAddress + (startsWithSlash ? pathInfo.substring(1) : pathInfo); - } else { - endpointAddress = endpointAddress + (startsWithSlash ? pathInfo : (slash + pathInfo)); - } - message.put(Message.ENDPOINT_ADDRESS, endpointAddress); - - Exchange exchange = message.getExchange(); - InvocationKey key = new InvocationKey(exchange); - InvocationContext invocation = inProgress.get(key); - if (invocation != null) { - overrideAddressProperty(invocation.getContext(), - cond.getTarget().getAddress().getValue()); - } - return true; - } - } - return false; - } - - /** - * Used to wrap an Exchange for usage as a Map key. The raw Exchange - * is not a suitable key type, as the hashCode is computed from its - * current contents, which may obviously change over the lifetime of - * an invocation. - */ - protected static class InvocationKey { - private Exchange exchange; - - InvocationKey(Exchange ex) { - exchange = ex; - } - - @Override - public int hashCode() { - return System.identityHashCode(exchange); - } - - @Override - public boolean equals(Object o) { - return o instanceof InvocationKey - && exchange == ((InvocationKey)o).exchange; - } - } - - - /** - * Records the context of an invocation. - */ - protected class InvocationContext { - private Endpoint originalEndpoint; - private String originalAddress; - private BindingOperationInfo bindingOperationInfo; - private Object[] params; - private Map context; - private List alternateEndpoints; - private List alternateAddresses; - - InvocationContext(Endpoint endpoint, - BindingOperationInfo boi, - Object[] prms, - Map ctx) { - originalEndpoint = endpoint; - originalAddress = endpoint.getEndpointInfo().getAddress(); - bindingOperationInfo = boi; - params = prms; - context = ctx; - } - - Endpoint retrieveOriginalEndpoint(Endpoint endpoint) { - if (endpoint != originalEndpoint) { - getLogger().log(Level.INFO, - "REVERT_TO_ORIGINAL_TARGET", - endpoint.getEndpointInfo().getName()); - } - if (!endpoint.getEndpointInfo().getAddress().equals(originalAddress)) { - endpoint.getEndpointInfo().setAddress(originalAddress); - getLogger().log(Level.INFO, - "REVERT_TO_ORIGINAL_ADDRESS", - endpoint.getEndpointInfo().getAddress()); - } - return originalEndpoint; - } - - BindingOperationInfo getBindingOperationInfo() { - return bindingOperationInfo; - } - - Object[] getParams() { - return params; - } - - Map getContext() { - return context; - } - - List getAlternateEndpoints() { - return alternateEndpoints; - } - - List getAlternateAddresses() { - return alternateAddresses; - } - - void setAlternateEndpoints(List alternates) { - alternateEndpoints = alternates; - } - - void setAlternateAddresses(List alternates) { - alternateAddresses = alternates; - } - - boolean hasAlternates() { - return !(alternateEndpoints == null && alternateAddresses == null); - } - } -} +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.cxf.clustering; + +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.logging.Level; +import java.util.logging.Logger; + +import org.apache.cxf.common.logging.LogUtils; +import org.apache.cxf.endpoint.AbstractConduitSelector; +import org.apache.cxf.endpoint.Client; +import org.apache.cxf.endpoint.Endpoint; +import org.apache.cxf.endpoint.Retryable; +import org.apache.cxf.helpers.CastUtils; +import org.apache.cxf.message.Exchange; +import org.apache.cxf.message.Message; +import org.apache.cxf.service.model.BindingOperationInfo; +import org.apache.cxf.transport.Conduit; + + +/** + * Implements a target selection strategy based on failover to an + * alternate target endpoint when a transport level failure is + * encountered. + * Note that this feature changes the conduit on the fly and thus makes + * the Client not thread safe. + */ +public class FailoverTargetSelector extends AbstractConduitSelector { + + private static final Logger LOG = + LogUtils.getL7dLogger(FailoverTargetSelector.class); + protected Map inProgress + = new ConcurrentHashMap();; + protected FailoverStrategy failoverStrategy; + + /** + * Normal constructor. + */ + public FailoverTargetSelector() { + super(); + } + + /** + * Constructor, allowing a specific conduit to override normal selection. + * + * @param c specific conduit + */ + public FailoverTargetSelector(Conduit c) { + super(c); + } + + /** + * Called prior to the interceptor chain being traversed. + * + * @param message the current Message + */ + public synchronized void prepare(Message message) { + Exchange exchange = message.getExchange(); + InvocationKey key = new InvocationKey(exchange); + if (!inProgress.containsKey(key)) { + Endpoint endpoint = exchange.get(Endpoint.class); + BindingOperationInfo bindingOperationInfo = + exchange.getBindingOperationInfo(); + Object[] params = message.getContent(List.class).toArray(); + Map context = + CastUtils.cast((Map)message.get(Message.INVOCATION_CONTEXT)); + InvocationContext invocation = + new InvocationContext(endpoint, + bindingOperationInfo, + params, + context); + inProgress.put(key, invocation); + } + } + + /** + * Called when a Conduit is actually required. + * + * @param message + * @return the Conduit to use for mediation of the message + */ + public Conduit selectConduit(Message message) { + Conduit c = message.get(Conduit.class); + if (c != null) { + return c; + } + return getSelectedConduit(message); + } + + /** + * Called on completion of the MEP for which the Conduit was required. + * + * @param exchange represents the completed MEP + */ + public void complete(Exchange exchange) { + InvocationKey key = new InvocationKey(exchange); + InvocationContext invocation = null; + synchronized (this) { + invocation = inProgress.get(key); + } + boolean failover = false; + if (requiresFailover(exchange)) { + Conduit old = (Conduit)exchange.getOutMessage().remove(Conduit.class.getName()); + + Endpoint failoverTarget = getFailoverTarget(exchange, invocation); + if (failoverTarget != null) { + setEndpoint(failoverTarget); + if (old != null) { + old.close(); + conduits.remove(old); + } + Exception prevExchangeFault = + (Exception)exchange.remove(Exception.class.getName()); + Message outMessage = exchange.getOutMessage(); + Exception prevMessageFault = + outMessage.getContent(Exception.class); + outMessage.setContent(Exception.class, null); + overrideAddressProperty(invocation.getContext()); + Retryable retry = exchange.get(Retryable.class); + exchange.clear(); + if (retry != null) { + try { + failover = true; + long delay = getDelayBetweenRetries(); + if (delay > 0) { + Thread.sleep(delay); + } + retry.invoke(invocation.getBindingOperationInfo(), + invocation.getParams(), + invocation.getContext(), + exchange); + } catch (Exception e) { + if (exchange.get(Exception.class) != null) { + exchange.put(Exception.class, prevExchangeFault); + } + if (outMessage.getContent(Exception.class) != null) { + outMessage.setContent(Exception.class, + prevMessageFault); + } + } + } + } else { + setEndpoint(invocation.retrieveOriginalEndpoint(endpoint)); + } + } + if (!failover) { + getLogger().fine("FAILOVER_NOT_REQUIRED"); + synchronized (this) { + inProgress.remove(key); + } + super.complete(exchange); + } + } + + /** + * @param strategy the FailoverStrategy to use + */ + public synchronized void setStrategy(FailoverStrategy strategy) { + getLogger().log(Level.INFO, "USING_STRATEGY", new Object[] {strategy}); + failoverStrategy = strategy; + } + + /** + * @return strategy the FailoverStrategy to use + */ + public synchronized FailoverStrategy getStrategy() { + if (failoverStrategy == null) { + failoverStrategy = new SequentialStrategy(); + getLogger().log(Level.INFO, + "USING_STRATEGY", + new Object[] {failoverStrategy}); + } + return failoverStrategy; + } + + /** + * @return the logger to use + */ + protected Logger getLogger() { + return LOG; + } + + /** + * Returns delay (in milliseconds) between retries + * @return delay, 0 means no delay + */ + protected long getDelayBetweenRetries() { + FailoverStrategy strategy = getStrategy(); + if (strategy instanceof AbstractStaticFailoverStrategy) { + return ((AbstractStaticFailoverStrategy)strategy).getDelayBetweenRetries(); + } + //perhaps supporting FailoverTargetSelector specific property can make sense too + return 0; + } + + /** + * Check if the exchange is suitable for a failover. + * + * @param exchange the current Exchange + * @return boolean true if a failover should be attempted + */ + protected boolean requiresFailover(Exchange exchange) { + Message outMessage = exchange.getOutMessage(); + Exception ex = outMessage.get(Exception.class) != null + ? outMessage.get(Exception.class) + : exchange.get(Exception.class); + getLogger().log(Level.FINE, + "CHECK_LAST_INVOKE_FAILED", + new Object[] {ex != null}); + Throwable curr = ex; + boolean failover = false; + while (curr != null) { + failover = curr instanceof java.io.IOException; + curr = curr.getCause(); + } + if (ex != null) { + getLogger().log(Level.INFO, + "CHECK_FAILURE_IN_TRANSPORT", + new Object[] {ex, failover}); + } + return failover; + } + + /** + * Get the failover target endpoint, if a suitable one is available. + * + * @param exchange the current Exchange + * @param invocation the current InvocationContext + * @return a failover endpoint if one is available + */ + protected Endpoint getFailoverTarget(Exchange exchange, + InvocationContext invocation) { + List alternateAddresses = null; + if (!invocation.hasAlternates()) { + // no previous failover attempt on this invocation + // + alternateAddresses = + getStrategy().getAlternateAddresses(exchange); + if (alternateAddresses != null) { + invocation.setAlternateAddresses(alternateAddresses); + } else { + invocation.setAlternateEndpoints( + getStrategy().getAlternateEndpoints(exchange)); + } + } else { + alternateAddresses = invocation.getAlternateAddresses(); + } + + Endpoint failoverTarget = null; + if (alternateAddresses != null) { + String alternateAddress = + getStrategy().selectAlternateAddress(alternateAddresses); + if (alternateAddress != null) { + // re-use current endpoint + // + failoverTarget = getEndpoint(); + + failoverTarget.getEndpointInfo().setAddress(alternateAddress); + } + } else { + failoverTarget = getStrategy().selectAlternateEndpoint( + invocation.getAlternateEndpoints()); + } + return failoverTarget; + } + + /** + * Override the ENDPOINT_ADDRESS property in the request context + * + * @param context the request context + */ + protected void overrideAddressProperty(Map context) { + overrideAddressProperty(context, getEndpoint().getEndpointInfo().getAddress()); + } + + protected void overrideAddressProperty(Map context, + String address) { + Map requestContext = + CastUtils.cast((Map)context.get(Client.REQUEST_CONTEXT)); + if (requestContext != null) { + requestContext.put(Message.ENDPOINT_ADDRESS, address); + requestContext.put("javax.xml.ws.service.endpoint.address", address); + } + } + + // Some conduits may replace the endpoint address after it has already been prepared + // but before the invocation has been done (ex, org.apache.cxf.clustering.LoadDistributorTargetSelector) + // which may affect JAX-RS clients where actual endpoint address property may include additional path + // segments. + protected boolean replaceEndpointAddressPropertyIfNeeded(Message message, + String endpointAddress, + Conduit cond) { + String requestURI = (String)message.get(Message.REQUEST_URI); + if (requestURI != null && endpointAddress != null && !requestURI.startsWith(endpointAddress)) { + String basePath = (String)message.get(Message.BASE_PATH); + if (basePath != null && requestURI.startsWith(basePath)) { + String pathInfo = requestURI.substring(basePath.length()); + message.put(Message.BASE_PATH, endpointAddress); + final String slash = "/"; + boolean startsWithSlash = pathInfo.startsWith(slash); + if (endpointAddress.endsWith(slash)) { + endpointAddress = endpointAddress + (startsWithSlash ? pathInfo.substring(1) : pathInfo); + } else { + endpointAddress = endpointAddress + (startsWithSlash ? pathInfo : (slash + pathInfo)); + } + message.put(Message.ENDPOINT_ADDRESS, endpointAddress); + + Exchange exchange = message.getExchange(); + InvocationKey key = new InvocationKey(exchange); + InvocationContext invocation = inProgress.get(key); + if (invocation != null) { + overrideAddressProperty(invocation.getContext(), + cond.getTarget().getAddress().getValue()); + } + return true; + } + } + return false; + } + + /** + * Used to wrap an Exchange for usage as a Map key. The raw Exchange + * is not a suitable key type, as the hashCode is computed from its + * current contents, which may obviously change over the lifetime of + * an invocation. + */ + protected static class InvocationKey { + private Exchange exchange; + + InvocationKey(Exchange ex) { + exchange = ex; + } + + @Override + public int hashCode() { + return System.identityHashCode(exchange); + } + + @Override + public boolean equals(Object o) { + return o instanceof InvocationKey + && exchange == ((InvocationKey)o).exchange; + } + } + + + /** + * Records the context of an invocation. + */ + protected class InvocationContext { + private Endpoint originalEndpoint; + private String originalAddress; + private BindingOperationInfo bindingOperationInfo; + private Object[] params; + private Map context; + private List alternateEndpoints; + private List alternateAddresses; + + InvocationContext(Endpoint endpoint, + BindingOperationInfo boi, + Object[] prms, + Map ctx) { + originalEndpoint = endpoint; + originalAddress = endpoint.getEndpointInfo().getAddress(); + bindingOperationInfo = boi; + params = prms; + context = ctx; + } + + Endpoint retrieveOriginalEndpoint(Endpoint endpoint) { + if (endpoint != originalEndpoint) { + getLogger().log(Level.INFO, + "REVERT_TO_ORIGINAL_TARGET", + endpoint.getEndpointInfo().getName()); + } + if (!endpoint.getEndpointInfo().getAddress().equals(originalAddress)) { + endpoint.getEndpointInfo().setAddress(originalAddress); + getLogger().log(Level.INFO, + "REVERT_TO_ORIGINAL_ADDRESS", + endpoint.getEndpointInfo().getAddress()); + } + return originalEndpoint; + } + + BindingOperationInfo getBindingOperationInfo() { + return bindingOperationInfo; + } + + Object[] getParams() { + return params; + } + + Map getContext() { + return context; + } + + List getAlternateEndpoints() { + return alternateEndpoints; + } + + List getAlternateAddresses() { + return alternateAddresses; + } + + void setAlternateEndpoints(List alternates) { + alternateEndpoints = alternates; + } + + void setAlternateAddresses(List alternates) { + alternateAddresses = alternates; + } + + boolean hasAlternates() { + return !(alternateEndpoints == null && alternateAddresses == null); + } + } +} Modified: cxf/trunk/rt/features/clustering/src/main/java/org/apache/cxf/clustering/LoadDistributorFeature.java URL: http://svn.apache.org/viewvc/cxf/trunk/rt/features/clustering/src/main/java/org/apache/cxf/clustering/LoadDistributorFeature.java?rev=1310446&r1=1310445&r2=1310446&view=diff ============================================================================== --- cxf/trunk/rt/features/clustering/src/main/java/org/apache/cxf/clustering/LoadDistributorFeature.java (original) +++ cxf/trunk/rt/features/clustering/src/main/java/org/apache/cxf/clustering/LoadDistributorFeature.java Fri Apr 6 16:21:14 2012 @@ -1,36 +1,36 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.cxf.clustering; - -import org.apache.cxf.common.injection.NoJSR250Annotations; - -/** - * This feature may be applied to a Client so as to enable - * load distribution amongst a set of target endpoints or addresses - * Note that this feature changes the conduit on the fly and thus makes - * the Client not thread safe. - */ -@NoJSR250Annotations -public class LoadDistributorFeature extends FailoverFeature { - - @Override - public FailoverTargetSelector getTargetSelector() { - return new LoadDistributorTargetSelector(); - } -} +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.cxf.clustering; + +import org.apache.cxf.common.injection.NoJSR250Annotations; + +/** + * This feature may be applied to a Client so as to enable + * load distribution amongst a set of target endpoints or addresses + * Note that this feature changes the conduit on the fly and thus makes + * the Client not thread safe. + */ +@NoJSR250Annotations +public class LoadDistributorFeature extends FailoverFeature { + + @Override + public FailoverTargetSelector getTargetSelector() { + return new LoadDistributorTargetSelector(); + } +}