camel-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From acosent...@apache.org
Subject [1/2] camel git commit: CAMEL-10486: The consumer threading/message undelivered issue fix
Date Thu, 01 Dec 2016 07:36:30 GMT
Repository: camel
Updated Branches:
  refs/heads/master 7561e4766 -> 2310011d1


CAMEL-10486: The consumer threading/message undelivered issue fix


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/dc2138a9
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/dc2138a9
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/dc2138a9

Branch: refs/heads/master
Commit: dc2138a9d39f15ec4a7c657edb4bd608115fcbf8
Parents: 7561e47
Author: Evgeny Minkevich <evgeny.minkevich@gmail.com>
Authored: Thu Nov 17 14:50:55 2016 +1100
Committer: Andrea Cosentino <ancosen@gmail.com>
Committed: Thu Dec 1 08:21:45 2016 +0100

----------------------------------------------------------------------
 .../src/main/docs/google-pubsub-component.adoc  | 23 +++++++++-
 .../pubsub/GooglePubsubConnectionFactory.java   | 38 ++++++++++------
 .../google/pubsub/GooglePubsubConstants.java    |  1 +
 .../google/pubsub/GooglePubsubConsumer.java     | 46 +++++++++++---------
 .../google/pubsub/GooglePubsubEndpoint.java     | 22 ++++++----
 .../pubsub/consumer/ExchangeAckTransaction.java | 22 +++++++++-
 .../pubsub/consumer/PubsubAcknowledgement.java  |  8 ++--
 .../google/pubsub/PubsubTestSupport.java        |  2 +-
 .../PubsubConnectionFactoryTest.java            |  2 +-
 9 files changed, 113 insertions(+), 51 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/dc2138a9/components/camel-google-pubsub/src/main/docs/google-pubsub-component.adoc
----------------------------------------------------------------------
diff --git a/components/camel-google-pubsub/src/main/docs/google-pubsub-component.adoc b/components/camel-google-pubsub/src/main/docs/google-pubsub-component.adoc
index 38fefef..0eb947f 100644
--- a/components/camel-google-pubsub/src/main/docs/google-pubsub-component.adoc
+++ b/components/camel-google-pubsub/src/main/docs/google-pubsub-component.adoc
@@ -87,7 +87,7 @@ The Google Pubsub component supports 11 endpoint options which are listed
below:
 | destinationName | common |  | String | *Required* Destination Name
 | ackMode | common | AUTO | AckMode | AUTO = exchange gets ack'ed/nack'ed on completion.
NONE = downstream process has to ack/nack explicitly
 | concurrentConsumers | common | 1 | Integer | The number of parallel streams consuming from
the subscription
-| connectionFactory | common |  | GooglePubsubConnectionFactory | ConnectionFactory to obtain
connection to PubSub Service. If non provided the default one will be used
+| connectionFactory | common |  | GooglePubsubConnectionFactory | ConnectionFactory to obtain
connection to PubSub Service. If none is provided, the default will be used.
 | loggerId | common |  | String | Logger ID to use when a match to the parent route required
 | maxMessagesPerPoll | common | 1 | Integer | The max number of messages to receive from
the server in a single API call
 | bridgeErrorHandler | consumer | false | boolean | Allows for bridging the consumer to the
Camel routing Error Handler which mean any exceptions occurred while the consumer is trying
to pickup incoming messages or the likes will now be processed as a message and handled by
the routing Error Handler. By default the consumer will use the org.apache.camel.spi.ExceptionHandler
to deal with exceptions that will be logged at WARN/ERROR level and ignored.
@@ -146,3 +146,24 @@ Message Body
 
 The consumer endpoint returns the content of the message as byte[] - exactly as the underlying
system sends it.
 It is up for the route to convert/unmarshall the contents.
+
+[[GooglePubsub-RollbackRedelivery]]
+Rollback and Redelivery
+^^^^^^^^^^^^^^^^^^^^^^^
+
+The rollback for Google PubSub relies on the idea of the Acknowledgement Deadline - the time
period where Google PubSub expects to receive the acknowledgement.
+If the acknowledgement has not been received, the message is redelivered.
+
+Google provides an API to extend the deadline for a message.
+
+More information in https://cloud.google.com/pubsub/docs/subscriber#ack_deadline[Google PubSub
Documentation]
+
+So, rollback is effectively a deadline extension API call with zero value - i.e. deadline
is reached now and message can
+be redelivered to the next consumer.
+
+It is possible to delay the message redelivery by setting the acknowledgement deadline explicitly
for the rollback by
+setting the message header
+
+* GooglePubsubConstants.ACK_DEADLINE
+
+to the value in seconds.

http://git-wip-us.apache.org/repos/asf/camel/blob/dc2138a9/components/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubConnectionFactory.java
----------------------------------------------------------------------
diff --git a/components/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubConnectionFactory.java
b/components/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubConnectionFactory.java
index 104bd63..59bdb8b 100644
--- a/components/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubConnectionFactory.java
+++ b/components/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubConnectionFactory.java
@@ -26,6 +26,7 @@ import java.util.Collections;
 import com.google.api.client.googleapis.auth.oauth2.GoogleCredential;
 import com.google.api.client.googleapis.javanet.GoogleNetHttpTransport;
 import com.google.api.client.http.HttpTransport;
+import com.google.api.client.http.apache.ApacheHttpTransport;
 import com.google.api.client.json.JsonFactory;
 import com.google.api.client.json.jackson2.JacksonFactory;
 import com.google.api.client.util.Base64;
@@ -33,13 +34,15 @@ import com.google.api.client.util.Strings;
 import com.google.api.services.pubsub.Pubsub;
 import com.google.api.services.pubsub.PubsubScopes;
 
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClients;
+import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class GooglePubsubConnectionFactory {
 
-    private static JsonFactory jsonFactory;
-    private static HttpTransport transport;
+    private static JsonFactory jsonFactory = new JacksonFactory();
 
     private final Logger logger = LoggerFactory.getLogger(GooglePubsubConnectionFactory.class);
 
@@ -51,23 +54,30 @@ public class GooglePubsubConnectionFactory {
     private Pubsub client;
 
     public GooglePubsubConnectionFactory() {
-        jsonFactory = new JacksonFactory();
-
-        try {
-            transport = GoogleNetHttpTransport.newTrustedTransport();
-        } catch (Exception e) {
-            throw new RuntimeException(e);
-        }
     }
 
-    public synchronized Pubsub getClient() throws Exception {
+    public synchronized Pubsub getDefaultClient() throws Exception {
         if (this.client == null) {
             this.client = buildClient();
         }
         return this.client;
     }
 
+    public Pubsub getMultiThreadClient(int parallelThreads) throws Exception {
+
+        PoolingHttpClientConnectionManager cm=new PoolingHttpClientConnectionManager();
+        cm.setDefaultMaxPerRoute(parallelThreads);
+        cm.setMaxTotal(parallelThreads);
+        CloseableHttpClient httpClient = HttpClients.createMinimal(cm);
+
+        return buildClient(new ApacheHttpTransport(httpClient));
+    }
+
     private Pubsub buildClient() throws Exception {
+        return buildClient(GoogleNetHttpTransport.newTrustedTransport());
+    };
+
+    private Pubsub buildClient(HttpTransport httpTransport) throws Exception {
 
         GoogleCredential credential = null;
 
@@ -75,7 +85,7 @@ public class GooglePubsubConnectionFactory {
             if (logger.isDebugEnabled()) {
                 logger.debug("Service Account and Key have been set explicitly. Initialising
PubSub using Service Account " + serviceAccount);
             }
-            credential = createFromAccountKeyPair();
+            credential = createFromAccountKeyPair(httpTransport);
         }
 
         if (credential == null && !Strings.isNullOrEmpty(credentialsFileLocation))
{
@@ -92,7 +102,7 @@ public class GooglePubsubConnectionFactory {
             credential = createDefault();
         }
 
-        Pubsub.Builder builder = new Pubsub.Builder(transport, jsonFactory, credential)
+        Pubsub.Builder builder = new Pubsub.Builder(httpTransport, jsonFactory, credential)
                 .setApplicationName("camel-google-pubsub");
 
         // Local emulator, SOCKS proxy, etc.
@@ -126,10 +136,10 @@ public class GooglePubsubConnectionFactory {
         return credential;
     }
 
-    private GoogleCredential createFromAccountKeyPair() {
+    private GoogleCredential createFromAccountKeyPair(HttpTransport httpTransport) {
         try {
             GoogleCredential credential = new GoogleCredential.Builder()
-                    .setTransport(transport)
+                    .setTransport(httpTransport)
                     .setJsonFactory(jsonFactory)
                     .setServiceAccountId(serviceAccount)
                     .setServiceAccountScopes(PubsubScopes.all())

http://git-wip-us.apache.org/repos/asf/camel/blob/dc2138a9/components/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubConstants.java
----------------------------------------------------------------------
diff --git a/components/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubConstants.java
b/components/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubConstants.java
index 669a521..f3e8785 100644
--- a/components/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubConstants.java
+++ b/components/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubConstants.java
@@ -22,6 +22,7 @@ public final class GooglePubsubConstants {
     public static final String ACK_ID = "CamelGooglePubsub.MsgAckId";
     public static final String PUBLISH_TIME = "CamelGooglePubsub.PublishTime";
     public static final String ATTRIBUTES = "CamelGooglePubsub.Attributes";
+    public static final String ACK_DEADLINE = "CamelGooglePubsub.AckDeadline";
 
     public enum AckMode {
         AUTO, NONE

http://git-wip-us.apache.org/repos/asf/camel/blob/dc2138a9/components/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubConsumer.java
b/components/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubConsumer.java
index 24390ae..dffc63c 100644
--- a/components/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubConsumer.java
+++ b/components/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubConsumer.java
@@ -6,7 +6,7 @@
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
  *
- *      http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -21,6 +21,7 @@ import java.util.List;
 import java.util.concurrent.ExecutorService;
 
 import com.google.api.client.repackaged.com.google.common.base.Strings;
+import com.google.api.services.pubsub.Pubsub;
 import com.google.api.services.pubsub.model.PubsubMessage;
 import com.google.api.services.pubsub.model.PullRequest;
 import com.google.api.services.pubsub.model.PullResponse;
@@ -40,17 +41,21 @@ class GooglePubsubConsumer extends DefaultConsumer {
 
     private Logger localLog;
 
-    private ExecutorService executor;
     private final GooglePubsubEndpoint endpoint;
     private final Processor processor;
     private final Synchronization ackStrategy;
 
-    GooglePubsubConsumer(GooglePubsubEndpoint endpoint, Processor processor) {
+    private ExecutorService executor;
+    private Pubsub pubsub;
+
+    GooglePubsubConsumer(GooglePubsubEndpoint endpoint, Processor processor) throws Exception
{
         super(endpoint, processor);
         this.endpoint = endpoint;
         this.processor = processor;
         this.ackStrategy = new ExchangeAckTransaction(this.endpoint);
 
+        pubsub = endpoint.getConnectionFactory().getMultiThreadClient(this.endpoint.getConcurrentConsumers());
+
         String loggerId = endpoint.getLoggerId();
 
         if (Strings.isNullOrEmpty(loggerId)) {
@@ -63,7 +68,7 @@ class GooglePubsubConsumer extends DefaultConsumer {
     @Override
     protected void doStart() throws Exception {
         super.doStart();
-        localLog.info("Starting Google PubSub consumer");
+        localLog.info("Starting Google PubSub consumer for {}/{}", endpoint.getProjectId(),
endpoint.getDestinationName());
         executor = endpoint.createExecutor();
         for (int i = 0; i < endpoint.getConcurrentConsumers(); i++) {
 
@@ -75,7 +80,7 @@ class GooglePubsubConsumer extends DefaultConsumer {
     @Override
     protected void doStop() throws Exception {
         super.doStop();
-        localLog.info("Stopping Google PubSub consumer");
+        localLog.info("Stopping Google PubSub consumer for {}/{}", endpoint.getProjectId(),
endpoint.getDestinationName());
 
         if (executor != null) {
             if (getEndpoint() != null && getEndpoint().getCamelContext() != null)
{
@@ -101,24 +106,23 @@ class GooglePubsubConsumer extends DefaultConsumer {
 
         @Override
         public void run() {
-            try {
-                if (localLog.isDebugEnabled()) {
-                    localLog.debug("Subscribing {} to {}", threadId, subscriptionFullName);
-                }
+            if (localLog.isDebugEnabled()) {
+                localLog.debug("Subscribing {} to {}", threadId, subscriptionFullName);
+            }
 
-                while (isRunAllowed() && !isSuspendingOrSuspended()) {
+            while (isRunAllowed() && !isSuspendingOrSuspended()) {
+                try {
                     PullRequest pullRequest = new PullRequest().setMaxMessages(endpoint.getMaxMessagesPerPoll());
                     PullResponse pullResponse;
                     try {
                         if (localLog.isTraceEnabled()) {
                             localLog.trace("Polling : {}", threadId);
                         }
-                        pullResponse = GooglePubsubConsumer.this.endpoint
-                                               .getPubsub()
-                                               .projects()
-                                               .subscriptions()
-                                               .pull(subscriptionFullName, pullRequest)
-                                               .execute();
+                        pullResponse = GooglePubsubConsumer.this.pubsub
+                                .projects()
+                                .subscriptions()
+                                .pull(subscriptionFullName, pullRequest)
+                                .execute();
                     } catch (SocketTimeoutException ste) {
                         if (localLog.isTraceEnabled()) {
                             localLog.trace("Socket timeout : {}", threadId);
@@ -126,6 +130,10 @@ class GooglePubsubConsumer extends DefaultConsumer {
                         continue;
                     }
 
+                    if (null == pullResponse.getReceivedMessages()) {
+                        continue;
+                    }
+
                     List<ReceivedMessage> receivedMessages = pullResponse.getReceivedMessages();
 
                     for (ReceivedMessage receivedMessage : receivedMessages) {
@@ -158,11 +166,9 @@ class GooglePubsubConsumer extends DefaultConsumer {
                             exchange.setException(e);
                         }
                     }
+                } catch (Exception e) {
+                    localLog.error("Failure getting messages from PubSub : ", e);
                 }
-            } catch (Exception e) {
-                localLog.error("Requesting messages from PubSub Failed:", e);
-                RuntimeCamelException rce = wrapRuntimeCamelException(e);
-                throw rce;
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/camel/blob/dc2138a9/components/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubEndpoint.java
----------------------------------------------------------------------
diff --git a/components/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubEndpoint.java
b/components/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubEndpoint.java
index 8163b61..03c7b93 100644
--- a/components/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubEndpoint.java
+++ b/components/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubEndpoint.java
@@ -6,7 +6,7 @@
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
  *
- *      http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -33,7 +33,7 @@ import org.apache.camel.spi.UriPath;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
- /**
+/**
  * Messaging client for Google Cloud Platform PubSub Service:
  * https://cloud.google.com/pubsub/
  *
@@ -91,15 +91,14 @@ public class GooglePubsubEndpoint extends DefaultEndpoint {
             log = LoggerFactory.getLogger(loggerId);
         }
 
-        GooglePubsubConnectionFactory cf = (null == connectionFactory)
-                ? getComponent().getConnectionFactory()
-                : connectionFactory;
-
-        pubsub = cf.getClient();
+        // Default pubsub connection.
+        // With the publisher endpoints - the main publisher
+        // with the consumer endpoints  - the ack client
+        pubsub = getConnectionFactory().getDefaultClient();
 
+        log.trace("Credential file location : {}", getConnectionFactory().getCredentialsFileLocation());
         log.trace("Project ID: {}", this.projectId);
         log.trace("Destination Name: {}", this.destinationName);
-        log.trace("From file : {}", cf.getCredentialsFileLocation());
     }
 
     public Producer createProducer() throws Exception {
@@ -177,8 +176,13 @@ public class GooglePubsubEndpoint extends DefaultEndpoint {
         return pubsub;
     }
 
+    /**
+     * ConnectionFactory to obtain connection to PubSub Service. If none is provided, the default
will be used.
+     */
     public GooglePubsubConnectionFactory getConnectionFactory() {
-        return connectionFactory;
+        return (null == connectionFactory)
+                ? getComponent().getConnectionFactory()
+                : connectionFactory;
     }
 
     public void setConnectionFactory(GooglePubsubConnectionFactory connectionFactory) {

http://git-wip-us.apache.org/repos/asf/camel/blob/dc2138a9/components/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/consumer/ExchangeAckTransaction.java
----------------------------------------------------------------------
diff --git a/components/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/consumer/ExchangeAckTransaction.java
b/components/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/consumer/ExchangeAckTransaction.java
index b1e7c41..b807496 100644
--- a/components/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/consumer/ExchangeAckTransaction.java
+++ b/components/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/consumer/ExchangeAckTransaction.java
@@ -37,7 +37,27 @@ public class ExchangeAckTransaction extends PubsubAcknowledgement implements
Syn
 
     @Override
     public void onFailure(Exchange exchange) {
-        resetAckDeadline(getAckIdList(exchange));
+
+        Integer deadline = 0;
+        Object configuredDeadline = exchange.getIn().getHeader(GooglePubsubConstants.ACK_DEADLINE);
+
+        if ( configuredDeadline != null && Integer.class.isInstance(configuredDeadline)
) {
+            deadline = (Integer) configuredDeadline;
+        }
+
+        if ( configuredDeadline != null && String.class.isInstance(configuredDeadline)
) {
+            try {
+                deadline = Integer.valueOf((String) configuredDeadline);
+            } catch (Exception e) {
+                logger.warn("Unable to parse ACK Deadline header value", e);
+            }
+        }
+
+        if (deadline != 0) {
+            logger.trace(" Exchange {} : Ack deadline : {}", exchange.getExchangeId(), deadline);
+        }
+
+        resetAckDeadline(getAckIdList(exchange), deadline);
     }
 
     private List<String> getAckIdList(Exchange exchange) {

http://git-wip-us.apache.org/repos/asf/camel/blob/dc2138a9/components/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/consumer/PubsubAcknowledgement.java
----------------------------------------------------------------------
diff --git a/components/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/consumer/PubsubAcknowledgement.java
b/components/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/consumer/PubsubAcknowledgement.java
index 742c469..267abc0 100644
--- a/components/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/consumer/PubsubAcknowledgement.java
+++ b/components/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/consumer/PubsubAcknowledgement.java
@@ -28,11 +28,11 @@ import org.slf4j.LoggerFactory;
 
 public abstract class PubsubAcknowledgement {
 
-    private Logger logger;
     private final String subscriptionFullName;
-
     private final GooglePubsubEndpoint endpoint;
 
+    protected Logger logger;
+
     public PubsubAcknowledgement(GooglePubsubEndpoint endpoint) {
         super();
         this.endpoint = endpoint;
@@ -61,11 +61,11 @@ public abstract class PubsubAcknowledgement {
         }
     }
 
-    void resetAckDeadline(List<String> ackIdList) {
+    void resetAckDeadline(List<String> ackIdList, Integer seconds) {
 
         ModifyAckDeadlineRequest nackRequest = new ModifyAckDeadlineRequest()
                 .setAckIds(ackIdList)
-                .setAckDeadlineSeconds(0);
+                .setAckDeadlineSeconds(seconds);
 
         try {
             endpoint.getPubsub()

http://git-wip-us.apache.org/repos/asf/camel/blob/dc2138a9/components/camel-google-pubsub/src/test/java/org/apache/camel/component/google/pubsub/PubsubTestSupport.java
----------------------------------------------------------------------
diff --git a/components/camel-google-pubsub/src/test/java/org/apache/camel/component/google/pubsub/PubsubTestSupport.java
b/components/camel-google-pubsub/src/test/java/org/apache/camel/component/google/pubsub/PubsubTestSupport.java
index 47ed6cc..7aa2800 100644
--- a/components/camel-google-pubsub/src/test/java/org/apache/camel/component/google/pubsub/PubsubTestSupport.java
+++ b/components/camel-google-pubsub/src/test/java/org/apache/camel/component/google/pubsub/PubsubTestSupport.java
@@ -94,7 +94,7 @@ public class PubsubTestSupport extends CamelTestSupport {
             .setServiceAccount(SERVICE_ACCOUNT)
             .setServiceAccountKey(SERVICE_KEY)
             .setServiceURL(SERVICE_URL)
-            .getClient();
+            .getDefaultClient();
 
         String topicFullName = String.format("projects/%s/topics/%s",
                                          PubsubTestSupport.PROJECT_ID,

http://git-wip-us.apache.org/repos/asf/camel/blob/dc2138a9/components/camel-google-pubsub/src/test/java/org/apache/camel/component/google/pubsub/integration/PubsubConnectionFactoryTest.java
----------------------------------------------------------------------
diff --git a/components/camel-google-pubsub/src/test/java/org/apache/camel/component/google/pubsub/integration/PubsubConnectionFactoryTest.java
b/components/camel-google-pubsub/src/test/java/org/apache/camel/component/google/pubsub/integration/PubsubConnectionFactoryTest.java
index 60dbacd..99128c2 100644
--- a/components/camel-google-pubsub/src/test/java/org/apache/camel/component/google/pubsub/integration/PubsubConnectionFactoryTest.java
+++ b/components/camel-google-pubsub/src/test/java/org/apache/camel/component/google/pubsub/integration/PubsubConnectionFactoryTest.java
@@ -50,7 +50,7 @@ public class PubsubConnectionFactoryTest extends PubsubTestSupport {
                 .setCredentialsFileLocation(file.getAbsolutePath())
                 .setServiceURL(SERVICE_URL);
 
-        Pubsub pubsub = cf.getClient();
+        Pubsub pubsub = cf.getDefaultClient();
 
         String query = String.format("projects/%s", PROJECT_ID);
         // [ DEPENDS on actual project being available]


Mime
View raw message