From: acosentino@apache.org
To: commits@camel.apache.org
Date: Thu, 01 Dec 2016 07:36:30 -0000
Message-Id: <7bf5f0f455a34c70b33b9cc44e1674ad@git.apache.org>
Subject: [1/2] camel git commit: CAMEL-10486: The consumer threading/message undelelivered issue fix

Repository: camel
Updated Branches:
  refs/heads/master 7561e4766 -> 2310011d1

CAMEL-10486: The consumer threading/message undelelivered issue fix

Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/dc2138a9
Tree:
http://git-wip-us.apache.org/repos/asf/camel/tree/dc2138a9 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/dc2138a9 Branch: refs/heads/master Commit: dc2138a9d39f15ec4a7c657edb4bd608115fcbf8 Parents: 7561e47 Author: Evgeny Minkevich Authored: Thu Nov 17 14:50:55 2016 +1100 Committer: Andrea Cosentino Committed: Thu Dec 1 08:21:45 2016 +0100 ---------------------------------------------------------------------- .../src/main/docs/google-pubsub-component.adoc | 23 +++++++++- .../pubsub/GooglePubsubConnectionFactory.java | 38 ++++++++++------ .../google/pubsub/GooglePubsubConstants.java | 1 + .../google/pubsub/GooglePubsubConsumer.java | 46 +++++++++++--------- .../google/pubsub/GooglePubsubEndpoint.java | 22 ++++++---- .../pubsub/consumer/ExchangeAckTransaction.java | 22 +++++++++- .../pubsub/consumer/PubsubAcknowledgement.java | 8 ++-- .../google/pubsub/PubsubTestSupport.java | 2 +- .../PubsubConnectionFactoryTest.java | 2 +- 9 files changed, 113 insertions(+), 51 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/dc2138a9/components/camel-google-pubsub/src/main/docs/google-pubsub-component.adoc ---------------------------------------------------------------------- diff --git a/components/camel-google-pubsub/src/main/docs/google-pubsub-component.adoc b/components/camel-google-pubsub/src/main/docs/google-pubsub-component.adoc index 38fefef..0eb947f 100644 --- a/components/camel-google-pubsub/src/main/docs/google-pubsub-component.adoc +++ b/components/camel-google-pubsub/src/main/docs/google-pubsub-component.adoc @@ -87,7 +87,7 @@ The Google Pubsub component supports 11 endpoint options which are listed below: | destinationName | common | | String | *Required* Destination Name | ackMode | common | AUTO | AckMode | AUTO = exchange gets ack'ed/nack'ed on completion. 
NONE = downstream process has to ack/nack explicitly
 | concurrentConsumers | common | 1 | Integer | The number of parallel streams consuming from the subscription
-| connectionFactory | common | | GooglePubsubConnectionFactory | ConnectionFactory to obtain connection to PubSub Service. If non provided the default one will be used
+| connectionFactory | common | | GooglePubsubConnectionFactory | ConnectionFactory to obtain connection to PubSub Service. If non provided the default will be used.
 | loggerId | common | | String | Logger ID to use when a match to the parent route required
 | maxMessagesPerPoll | common | 1 | Integer | The max number of messages to receive from the server in a single API call
 | bridgeErrorHandler | consumer | false | boolean | Allows for bridging the consumer to the Camel routing Error Handler which mean any exceptions occurred while the consumer is trying to pickup incoming messages or the likes will now be processed as a message and handled by the routing Error Handler. By default the consumer will use the org.apache.camel.spi.ExceptionHandler to deal with exceptions that will be logged at WARN/ERROR level and ignored.
@@ -146,3 +146,24 @@ Message Body
 The consumer endpoint returns the content of the message as byte[] - exactly as the underlying system sends it. It is up for the route to convert/unmarshall the contents.
+
+[[GooglePubsub-RollbackRedelivery]]
+Rollback and Redelivery
+^^^^^^^^^^^^
+
+The rollback for Google PubSub relies on the idea of the Acknowledgement Deadline - the time period where Google PubSub expects to receive the acknowledgement.
+If the acknowledgement has not been received, the message is redelivered.
+
+Google provides an API to extend the deadline for a message.
+
+More information in https://cloud.google.com/pubsub/docs/subscriber#ack_deadline[Google PubSub Documentation]
+
+So, rollback is effectively a deadline extension API call with zero value - i.e.
deadline is reached now and message can +be redelivered to the next consumer. + +It is possible to delay the message redelivery by setting the acknowledgement deadline explicitly for the rollback by +setting the message header + +* GooglePubsubConstants.ACK_DEADLINE + +to the value in seconds. http://git-wip-us.apache.org/repos/asf/camel/blob/dc2138a9/components/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubConnectionFactory.java ---------------------------------------------------------------------- diff --git a/components/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubConnectionFactory.java b/components/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubConnectionFactory.java index 104bd63..59bdb8b 100644 --- a/components/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubConnectionFactory.java +++ b/components/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubConnectionFactory.java @@ -26,6 +26,7 @@ import java.util.Collections; import com.google.api.client.googleapis.auth.oauth2.GoogleCredential; import com.google.api.client.googleapis.javanet.GoogleNetHttpTransport; import com.google.api.client.http.HttpTransport; +import com.google.api.client.http.apache.ApacheHttpTransport; import com.google.api.client.json.JsonFactory; import com.google.api.client.json.jackson2.JacksonFactory; import com.google.api.client.util.Base64; @@ -33,13 +34,15 @@ import com.google.api.client.util.Strings; import com.google.api.services.pubsub.Pubsub; import com.google.api.services.pubsub.PubsubScopes; +import org.apache.http.impl.client.CloseableHttpClient; +import org.apache.http.impl.client.HttpClients; +import org.apache.http.impl.conn.PoolingHttpClientConnectionManager; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class GooglePubsubConnectionFactory { - private static 
JsonFactory jsonFactory; - private static HttpTransport transport; + private static JsonFactory jsonFactory = new JacksonFactory(); private final Logger logger = LoggerFactory.getLogger(GooglePubsubConnectionFactory.class); @@ -51,23 +54,30 @@ public class GooglePubsubConnectionFactory { private Pubsub client; public GooglePubsubConnectionFactory() { - jsonFactory = new JacksonFactory(); - - try { - transport = GoogleNetHttpTransport.newTrustedTransport(); - } catch (Exception e) { - throw new RuntimeException(e); - } } - public synchronized Pubsub getClient() throws Exception { + public synchronized Pubsub getDefaultClient() throws Exception { if (this.client == null) { this.client = buildClient(); } return this.client; } + public Pubsub getMultiThreadClient(int parallelThreads) throws Exception { + + PoolingHttpClientConnectionManager cm=new PoolingHttpClientConnectionManager(); + cm.setDefaultMaxPerRoute(parallelThreads); + cm.setMaxTotal(parallelThreads); + CloseableHttpClient httpClient = HttpClients.createMinimal(cm); + + return buildClient(new ApacheHttpTransport(httpClient)); + } + private Pubsub buildClient() throws Exception { + return buildClient(GoogleNetHttpTransport.newTrustedTransport()); + }; + + private Pubsub buildClient(HttpTransport httpTransport) throws Exception { GoogleCredential credential = null; @@ -75,7 +85,7 @@ public class GooglePubsubConnectionFactory { if (logger.isDebugEnabled()) { logger.debug("Service Account and Key have been set explicitly. 
Initialising PubSub using Service Account " + serviceAccount); } - credential = createFromAccountKeyPair(); + credential = createFromAccountKeyPair(httpTransport); } if (credential == null && !Strings.isNullOrEmpty(credentialsFileLocation)) { @@ -92,7 +102,7 @@ public class GooglePubsubConnectionFactory { credential = createDefault(); } - Pubsub.Builder builder = new Pubsub.Builder(transport, jsonFactory, credential) + Pubsub.Builder builder = new Pubsub.Builder(httpTransport, jsonFactory, credential) .setApplicationName("camel-google-pubsub"); // Local emulator, SOCKS proxy, etc. @@ -126,10 +136,10 @@ public class GooglePubsubConnectionFactory { return credential; } - private GoogleCredential createFromAccountKeyPair() { + private GoogleCredential createFromAccountKeyPair(HttpTransport httpTransport) { try { GoogleCredential credential = new GoogleCredential.Builder() - .setTransport(transport) + .setTransport(httpTransport) .setJsonFactory(jsonFactory) .setServiceAccountId(serviceAccount) .setServiceAccountScopes(PubsubScopes.all()) http://git-wip-us.apache.org/repos/asf/camel/blob/dc2138a9/components/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubConstants.java ---------------------------------------------------------------------- diff --git a/components/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubConstants.java b/components/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubConstants.java index 669a521..f3e8785 100644 --- a/components/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubConstants.java +++ b/components/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubConstants.java @@ -22,6 +22,7 @@ public final class GooglePubsubConstants { public static final String ACK_ID = "CamelGooglePubsub.MsgAckId"; public static final String PUBLISH_TIME = 
"CamelGooglePubsub.PublishTime"; public static final String ATTRIBUTES = "CamelGooglePubsub.Attributes"; + public static final String ACK_DEADLINE = "CamelGooglePubsub.AckDeadline"; public enum AckMode { AUTO, NONE http://git-wip-us.apache.org/repos/asf/camel/blob/dc2138a9/components/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubConsumer.java ---------------------------------------------------------------------- diff --git a/components/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubConsumer.java b/components/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubConsumer.java index 24390ae..dffc63c 100644 --- a/components/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubConsumer.java +++ b/components/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubConsumer.java @@ -6,7 +6,7 @@ * (the "License"); you may not use this file except in compliance with * the License. 
You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -21,6 +21,7 @@ import java.util.List; import java.util.concurrent.ExecutorService; import com.google.api.client.repackaged.com.google.common.base.Strings; +import com.google.api.services.pubsub.Pubsub; import com.google.api.services.pubsub.model.PubsubMessage; import com.google.api.services.pubsub.model.PullRequest; import com.google.api.services.pubsub.model.PullResponse; @@ -40,17 +41,21 @@ class GooglePubsubConsumer extends DefaultConsumer { private Logger localLog; - private ExecutorService executor; private final GooglePubsubEndpoint endpoint; private final Processor processor; private final Synchronization ackStrategy; - GooglePubsubConsumer(GooglePubsubEndpoint endpoint, Processor processor) { + private ExecutorService executor; + private Pubsub pubsub; + + GooglePubsubConsumer(GooglePubsubEndpoint endpoint, Processor processor) throws Exception { super(endpoint, processor); this.endpoint = endpoint; this.processor = processor; this.ackStrategy = new ExchangeAckTransaction(this.endpoint); + pubsub = endpoint.getConnectionFactory().getMultiThreadClient(this.endpoint.getConcurrentConsumers()); + String loggerId = endpoint.getLoggerId(); if (Strings.isNullOrEmpty(loggerId)) { @@ -63,7 +68,7 @@ class GooglePubsubConsumer extends DefaultConsumer { @Override protected void doStart() throws Exception { super.doStart(); - localLog.info("Starting Google PubSub consumer"); + localLog.info("Starting Google PubSub consumer for {}/{}", endpoint.getProjectId(), endpoint.getDestinationName()); executor = endpoint.createExecutor(); for (int i = 0; i < endpoint.getConcurrentConsumers(); i++) { @@ -75,7 +80,7 @@ class GooglePubsubConsumer extends DefaultConsumer { @Override protected void doStop() 
throws Exception { super.doStop(); - localLog.info("Stopping Google PubSub consumer"); + localLog.info("Stopping Google PubSub consumer for {}/{}", endpoint.getProjectId(), endpoint.getDestinationName()); if (executor != null) { if (getEndpoint() != null && getEndpoint().getCamelContext() != null) { @@ -101,24 +106,23 @@ class GooglePubsubConsumer extends DefaultConsumer { @Override public void run() { - try { - if (localLog.isDebugEnabled()) { - localLog.debug("Subscribing {} to {}", threadId, subscriptionFullName); - } + if (localLog.isDebugEnabled()) { + localLog.debug("Subscribing {} to {}", threadId, subscriptionFullName); + } - while (isRunAllowed() && !isSuspendingOrSuspended()) { + while (isRunAllowed() && !isSuspendingOrSuspended()) { + try { PullRequest pullRequest = new PullRequest().setMaxMessages(endpoint.getMaxMessagesPerPoll()); PullResponse pullResponse; try { if (localLog.isTraceEnabled()) { localLog.trace("Polling : {}", threadId); } - pullResponse = GooglePubsubConsumer.this.endpoint - .getPubsub() - .projects() - .subscriptions() - .pull(subscriptionFullName, pullRequest) - .execute(); + pullResponse = GooglePubsubConsumer.this.pubsub + .projects() + .subscriptions() + .pull(subscriptionFullName, pullRequest) + .execute(); } catch (SocketTimeoutException ste) { if (localLog.isTraceEnabled()) { localLog.trace("Socket timeout : {}", threadId); @@ -126,6 +130,10 @@ class GooglePubsubConsumer extends DefaultConsumer { continue; } + if (null == pullResponse.getReceivedMessages()) { + continue; + } + List receivedMessages = pullResponse.getReceivedMessages(); for (ReceivedMessage receivedMessage : receivedMessages) { @@ -158,11 +166,9 @@ class GooglePubsubConsumer extends DefaultConsumer { exchange.setException(e); } } + } catch (Exception e) { + localLog.error("Failure getting messages from PubSub : ", e); } - } catch (Exception e) { - localLog.error("Requesting messages from PubSub Failed:", e); - RuntimeCamelException rce = 
wrapRuntimeCamelException(e); - throw rce; } } } http://git-wip-us.apache.org/repos/asf/camel/blob/dc2138a9/components/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubEndpoint.java ---------------------------------------------------------------------- diff --git a/components/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubEndpoint.java b/components/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubEndpoint.java index 8163b61..03c7b93 100644 --- a/components/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubEndpoint.java +++ b/components/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubEndpoint.java @@ -6,7 +6,7 @@ * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -33,7 +33,7 @@ import org.apache.camel.spi.UriPath; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - /** +/** * Messaging client for Google Cloud Platform PubSub Service: * https://cloud.google.com/pubsub/ * @@ -91,15 +91,14 @@ public class GooglePubsubEndpoint extends DefaultEndpoint { log = LoggerFactory.getLogger(loggerId); } - GooglePubsubConnectionFactory cf = (null == connectionFactory) - ? getComponent().getConnectionFactory() - : connectionFactory; - - pubsub = cf.getClient(); + // Default pubsub connection. 
+ // With the publisher endpoints - the main publisher + // with the consumer endpoints - the ack client + pubsub = getConnectionFactory().getDefaultClient(); + log.trace("Credential file location : {}", getConnectionFactory().getCredentialsFileLocation()); log.trace("Project ID: {}", this.projectId); log.trace("Destination Name: {}", this.destinationName); - log.trace("From file : {}", cf.getCredentialsFileLocation()); } public Producer createProducer() throws Exception { @@ -177,8 +176,13 @@ public class GooglePubsubEndpoint extends DefaultEndpoint { return pubsub; } + /** + * ConnectionFactory to obtain connection to PubSub Service. If non provided the default will be used. + */ public GooglePubsubConnectionFactory getConnectionFactory() { - return connectionFactory; + return (null == connectionFactory) + ? getComponent().getConnectionFactory() + : connectionFactory; } public void setConnectionFactory(GooglePubsubConnectionFactory connectionFactory) { http://git-wip-us.apache.org/repos/asf/camel/blob/dc2138a9/components/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/consumer/ExchangeAckTransaction.java ---------------------------------------------------------------------- diff --git a/components/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/consumer/ExchangeAckTransaction.java b/components/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/consumer/ExchangeAckTransaction.java index b1e7c41..b807496 100644 --- a/components/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/consumer/ExchangeAckTransaction.java +++ b/components/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/consumer/ExchangeAckTransaction.java @@ -37,7 +37,27 @@ public class ExchangeAckTransaction extends PubsubAcknowledgement implements Syn @Override public void onFailure(Exchange exchange) { - resetAckDeadline(getAckIdList(exchange)); + + Integer deadline = 0; + 
Object configuredDeadline = exchange.getIn().getHeader(GooglePubsubConstants.ACK_DEADLINE);
+
+        if ( configuredDeadline != null && Integer.class.isInstance(configuredDeadline) ) {
+            deadline = (Integer) configuredDeadline;
+        }
+
+        if ( configuredDeadline != null && String.class.isInstance(configuredDeadline) ) {
+            try {
+                deadline = Integer.valueOf((String) configuredDeadline);
+            } catch (Exception e) {
+                logger.warn("Unable to parse ACK Deadline header value", e);
+            }
+        }
+
+        if (deadline != 0) {
+            logger.trace(" Exchange {} : Ack deadline : {}", exchange.getExchangeId(), deadline);
+        }
+
+        resetAckDeadline(getAckIdList(exchange), deadline);
     }

     private List getAckIdList(Exchange exchange) {

http://git-wip-us.apache.org/repos/asf/camel/blob/dc2138a9/components/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/consumer/PubsubAcknowledgement.java
----------------------------------------------------------------------
diff --git a/components/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/consumer/PubsubAcknowledgement.java b/components/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/consumer/PubsubAcknowledgement.java
index 742c469..267abc0 100644
--- a/components/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/consumer/PubsubAcknowledgement.java
+++ b/components/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/consumer/PubsubAcknowledgement.java
@@ -28,11 +28,11 @@ import org.slf4j.LoggerFactory;
 public abstract class PubsubAcknowledgement {
-    private Logger logger;
     private final String subscriptionFullName;
-
     private final GooglePubsubEndpoint endpoint;
+    protected Logger logger;
+
     public PubsubAcknowledgement(GooglePubsubEndpoint endpoint) {
         super();
         this.endpoint = endpoint;
@@ -61,11 +61,11 @@ public abstract class PubsubAcknowledgement {
         }
     }
-    void resetAckDeadline(List ackIdList) {
+    void resetAckDeadline(List ackIdList, Integer
seconds) { ModifyAckDeadlineRequest nackRequest = new ModifyAckDeadlineRequest() .setAckIds(ackIdList) - .setAckDeadlineSeconds(0); + .setAckDeadlineSeconds(seconds); try { endpoint.getPubsub() http://git-wip-us.apache.org/repos/asf/camel/blob/dc2138a9/components/camel-google-pubsub/src/test/java/org/apache/camel/component/google/pubsub/PubsubTestSupport.java ---------------------------------------------------------------------- diff --git a/components/camel-google-pubsub/src/test/java/org/apache/camel/component/google/pubsub/PubsubTestSupport.java b/components/camel-google-pubsub/src/test/java/org/apache/camel/component/google/pubsub/PubsubTestSupport.java index 47ed6cc..7aa2800 100644 --- a/components/camel-google-pubsub/src/test/java/org/apache/camel/component/google/pubsub/PubsubTestSupport.java +++ b/components/camel-google-pubsub/src/test/java/org/apache/camel/component/google/pubsub/PubsubTestSupport.java @@ -94,7 +94,7 @@ public class PubsubTestSupport extends CamelTestSupport { .setServiceAccount(SERVICE_ACCOUNT) .setServiceAccountKey(SERVICE_KEY) .setServiceURL(SERVICE_URL) - .getClient(); + .getDefaultClient(); String topicFullName = String.format("projects/%s/topics/%s", PubsubTestSupport.PROJECT_ID, http://git-wip-us.apache.org/repos/asf/camel/blob/dc2138a9/components/camel-google-pubsub/src/test/java/org/apache/camel/component/google/pubsub/integration/PubsubConnectionFactoryTest.java ---------------------------------------------------------------------- diff --git a/components/camel-google-pubsub/src/test/java/org/apache/camel/component/google/pubsub/integration/PubsubConnectionFactoryTest.java b/components/camel-google-pubsub/src/test/java/org/apache/camel/component/google/pubsub/integration/PubsubConnectionFactoryTest.java index 60dbacd..99128c2 100644 --- a/components/camel-google-pubsub/src/test/java/org/apache/camel/component/google/pubsub/integration/PubsubConnectionFactoryTest.java +++ 
b/components/camel-google-pubsub/src/test/java/org/apache/camel/component/google/pubsub/integration/PubsubConnectionFactoryTest.java @@ -50,7 +50,7 @@ public class PubsubConnectionFactoryTest extends PubsubTestSupport { .setCredentialsFileLocation(file.getAbsolutePath()) .setServiceURL(SERVICE_URL); - Pubsub pubsub = cf.getClient(); + Pubsub pubsub = cf.getDefaultClient(); String query = String.format("projects/%s", PROJECT_ID); // [ DEPENDS on actual project being available]
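
Illustrative note on the rollback change above: the new onFailure logic in ExchangeAckTransaction reads the CamelGooglePubsub.AckDeadline header and falls back to 0 (immediate redelivery) when the header is absent or cannot be parsed. The following is only a minimal, dependency-free sketch of that resolution rule. The class AckDeadlineResolver and its resolve method are hypothetical names introduced for illustration and are not part of camel-google-pubsub, and a plain Map stands in for the Camel message headers.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper for illustration only; not part of camel-google-pubsub.
public class AckDeadlineResolver {

    // Header name as defined by GooglePubsubConstants.ACK_DEADLINE in this commit.
    static final String ACK_DEADLINE = "CamelGooglePubsub.AckDeadline";

    // Mirrors the rule in onFailure: an Integer is used as-is, a String is
    // parsed, and anything else (including null or an unparsable String)
    // yields 0, which means "redeliver now".
    static int resolve(Object headerValue) {
        if (headerValue instanceof Integer) {
            return (Integer) headerValue;
        }
        if (headerValue instanceof String) {
            try {
                return Integer.parseInt((String) headerValue);
            } catch (NumberFormatException e) {
                return 0; // the commit logs a warning here and keeps the default
            }
        }
        return 0;
    }

    public static void main(String[] args) {
        // A plain map stands in for the exchange's message headers (assumption).
        Map<String, Object> headers = new HashMap<>();
        headers.put(ACK_DEADLINE, "30");
        System.out.println("Deadline seconds: " + resolve(headers.get(ACK_DEADLINE)));
    }
}
```

In a real route the header would presumably be set before the failure, for example with exchange.getIn().setHeader(GooglePubsubConstants.ACK_DEADLINE, 30), so that the rollback delays redelivery by roughly that many seconds instead of releasing the message immediately.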