Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 10FE7200C78 for ; Wed, 12 Apr 2017 21:56:38 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 0F5A7160B95; Wed, 12 Apr 2017 19:56:38 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 8FA5C160BA8 for ; Wed, 12 Apr 2017 21:56:35 +0200 (CEST) Received: (qmail 61817 invoked by uid 500); 12 Apr 2017 19:56:33 -0000 Mailing-List: contact commits-help@beam.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@beam.apache.org Delivered-To: mailing list commits@beam.apache.org Received: (qmail 60741 invoked by uid 99); 12 Apr 2017 19:56:33 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 12 Apr 2017 19:56:33 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 3BCBEE9809; Wed, 12 Apr 2017 19:56:33 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: jbonofre@apache.org To: commits@beam.apache.org Date: Wed, 12 Apr 2017 19:57:02 -0000 Message-Id: In-Reply-To: <75c8ab1209c4423a85fad7497d6c2c70@git.apache.org> References: <75c8ab1209c4423a85fad7497d6c2c70@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [31/50] [abbrv] beam git commit: [BEAM-1722] Move PubsubIO into the google-cloud-platform module archived-at: Wed, 12 Apr 2017 19:56:38 -0000 http://git-wip-us.apache.org/repos/asf/beam/blob/5bcb8c57/sdks/java/core/src/test/java/org/apache/beam/sdk/util/PubsubGrpcClientTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/PubsubGrpcClientTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/PubsubGrpcClientTest.java deleted file mode 100644 index 6d4cf4e..0000000 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/PubsubGrpcClientTest.java +++ /dev/null @@ -1,207 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.beam.sdk.util; - -import static org.junit.Assert.assertEquals; - -import com.google.auth.Credentials; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableMap; -import com.google.common.collect.Iterables; -import com.google.protobuf.ByteString; -import com.google.protobuf.Timestamp; -import com.google.pubsub.v1.PublishRequest; -import com.google.pubsub.v1.PublishResponse; -import com.google.pubsub.v1.PublisherGrpc.PublisherImplBase; -import com.google.pubsub.v1.PubsubMessage; -import com.google.pubsub.v1.PullRequest; -import com.google.pubsub.v1.PullResponse; -import com.google.pubsub.v1.ReceivedMessage; -import com.google.pubsub.v1.SubscriberGrpc.SubscriberImplBase; -import io.grpc.ManagedChannel; -import io.grpc.Server; -import io.grpc.inprocess.InProcessChannelBuilder; -import io.grpc.inprocess.InProcessServerBuilder; -import io.grpc.stub.StreamObserver; -import java.io.IOException; -import java.util.ArrayList; -import java.util.List; -import java.util.Map; -import java.util.concurrent.ThreadLocalRandom; -import org.apache.beam.sdk.util.PubsubClient.IncomingMessage; -import org.apache.beam.sdk.util.PubsubClient.OutgoingMessage; -import org.apache.beam.sdk.util.PubsubClient.SubscriptionPath; -import org.apache.beam.sdk.util.PubsubClient.TopicPath; -import org.junit.After; -import org.junit.Before; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; - -/** - * Tests for PubsubGrpcClient. - */ -@RunWith(JUnit4.class) -public class PubsubGrpcClientTest { - private ManagedChannel inProcessChannel; - private Credentials testCredentials; - - private PubsubClient client; - private String channelName; - - private static final TopicPath TOPIC = PubsubClient.topicPathFromName("testProject", "testTopic"); - private static final SubscriptionPath SUBSCRIPTION = - PubsubClient.subscriptionPathFromName("testProject", "testSubscription"); - private static final long REQ_TIME = 1234L; - private static final long PUB_TIME = 3456L; - private static final long MESSAGE_TIME = 6789L; - private static final String TIMESTAMP_LABEL = "timestamp"; - private static final String ID_LABEL = "id"; - private static final String MESSAGE_ID = "testMessageId"; - private static final String DATA = "testData"; - private static final String RECORD_ID = "testRecordId"; - private static final String ACK_ID = "testAckId"; - private static final Map ATTRIBUTES = - ImmutableMap.builder().put("a", "b").put("c", "d").build(); - - @Before - public void setup() { - channelName = String.format("%s-%s", - PubsubGrpcClientTest.class.getName(), ThreadLocalRandom.current().nextInt()); - inProcessChannel = InProcessChannelBuilder.forName(channelName).directExecutor().build(); - testCredentials = new TestCredential(); - client = new PubsubGrpcClient(TIMESTAMP_LABEL, ID_LABEL, 10, inProcessChannel, testCredentials); - } - - @After - public void teardown() throws IOException { - client.close(); - inProcessChannel.shutdownNow(); - } - - @Test - public void pullOneMessage() throws IOException { - String expectedSubscription = SUBSCRIPTION.getPath(); - final PullRequest expectedRequest = - PullRequest.newBuilder() - .setSubscription(expectedSubscription) - .setReturnImmediately(true) - .setMaxMessages(10) - .build(); - Timestamp timestamp = Timestamp.newBuilder() - .setSeconds(PUB_TIME / 1000) - .setNanos((int) (PUB_TIME % 1000) * 1000) - .build(); - PubsubMessage expectedPubsubMessage = - PubsubMessage.newBuilder() - .setMessageId(MESSAGE_ID) - .setData( - ByteString.copyFrom(DATA.getBytes())) - .setPublishTime(timestamp) - .putAllAttributes(ATTRIBUTES) - .putAllAttributes( - ImmutableMap.of(TIMESTAMP_LABEL, - String.valueOf(MESSAGE_TIME), - ID_LABEL, RECORD_ID)) - .build(); - ReceivedMessage expectedReceivedMessage = - ReceivedMessage.newBuilder() - .setMessage(expectedPubsubMessage) - .setAckId(ACK_ID) - .build(); - final PullResponse response = - PullResponse.newBuilder() - .addAllReceivedMessages(ImmutableList.of(expectedReceivedMessage)) - .build(); - - final List requestsReceived = new ArrayList<>(); - SubscriberImplBase subscriberImplBase = new SubscriberImplBase() { - @Override - public void pull(PullRequest request, StreamObserver responseObserver) { - requestsReceived.add(request); - responseObserver.onNext(response); - responseObserver.onCompleted(); - } - }; - Server server = InProcessServerBuilder.forName(channelName) - .addService(subscriberImplBase) - .build() - .start(); - try { - List acutalMessages = client.pull(REQ_TIME, SUBSCRIPTION, 10, true); - assertEquals(1, acutalMessages.size()); - IncomingMessage actualMessage = acutalMessages.get(0); - assertEquals(ACK_ID, actualMessage.ackId); - assertEquals(DATA, new String(actualMessage.elementBytes)); - assertEquals(RECORD_ID, actualMessage.recordId); - assertEquals(REQ_TIME, actualMessage.requestTimeMsSinceEpoch); - assertEquals(MESSAGE_TIME, actualMessage.timestampMsSinceEpoch); - assertEquals(expectedRequest, Iterables.getOnlyElement(requestsReceived)); - } finally { - server.shutdownNow(); - } - } - - @Test - public void publishOneMessage() throws IOException { - String expectedTopic = TOPIC.getPath(); - PubsubMessage expectedPubsubMessage = - PubsubMessage.newBuilder() - .setData(ByteString.copyFrom(DATA.getBytes())) - .putAllAttributes(ATTRIBUTES) - .putAllAttributes( - ImmutableMap.of(TIMESTAMP_LABEL, String.valueOf(MESSAGE_TIME), - ID_LABEL, RECORD_ID)) - .build(); - final PublishRequest expectedRequest = - PublishRequest.newBuilder() - .setTopic(expectedTopic) - .addAllMessages( - ImmutableList.of(expectedPubsubMessage)) - .build(); - final PublishResponse response = - PublishResponse.newBuilder() - .addAllMessageIds(ImmutableList.of(MESSAGE_ID)) - .build(); - - final List requestsReceived = new ArrayList<>(); - PublisherImplBase publisherImplBase = new PublisherImplBase() { - @Override - public void publish( - PublishRequest request, StreamObserver responseObserver) { - requestsReceived.add(request); - responseObserver.onNext(response); - responseObserver.onCompleted(); - } - }; - Server server = InProcessServerBuilder.forName(channelName) - .addService(publisherImplBase) - .build() - .start(); - try { - OutgoingMessage actualMessage = new OutgoingMessage( - DATA.getBytes(), ATTRIBUTES, MESSAGE_TIME, RECORD_ID); - int n = client.publish(TOPIC, ImmutableList.of(actualMessage)); - assertEquals(1, n); - assertEquals(expectedRequest, Iterables.getOnlyElement(requestsReceived)); - } finally { - server.shutdownNow(); - } - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/5bcb8c57/sdks/java/core/src/test/java/org/apache/beam/sdk/util/PubsubJsonClientTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/PubsubJsonClientTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/PubsubJsonClientTest.java deleted file mode 100644 index 019190b..0000000 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/PubsubJsonClientTest.java +++ /dev/null @@ -1,140 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.beam.sdk.util; - -import static org.junit.Assert.assertEquals; - -import com.google.api.services.pubsub.Pubsub; -import com.google.api.services.pubsub.model.PublishRequest; -import com.google.api.services.pubsub.model.PublishResponse; -import com.google.api.services.pubsub.model.PubsubMessage; -import com.google.api.services.pubsub.model.PullRequest; -import com.google.api.services.pubsub.model.PullResponse; -import com.google.api.services.pubsub.model.ReceivedMessage; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableMap; -import java.io.IOException; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -import org.apache.beam.sdk.util.PubsubClient.IncomingMessage; -import org.apache.beam.sdk.util.PubsubClient.OutgoingMessage; -import org.apache.beam.sdk.util.PubsubClient.SubscriptionPath; -import org.apache.beam.sdk.util.PubsubClient.TopicPath; -import org.junit.After; -import org.junit.Before; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; -import org.mockito.Mockito; - -/** - * Tests for PubsubJsonClient. - */ -@RunWith(JUnit4.class) -public class PubsubJsonClientTest { - private Pubsub mockPubsub; - private PubsubClient client; - - private static final TopicPath TOPIC = PubsubClient.topicPathFromName("testProject", "testTopic"); - private static final SubscriptionPath SUBSCRIPTION = - PubsubClient.subscriptionPathFromName("testProject", "testSubscription"); - private static final long REQ_TIME = 1234L; - private static final long PUB_TIME = 3456L; - private static final long MESSAGE_TIME = 6789L; - private static final String TIMESTAMP_LABEL = "timestamp"; - private static final String ID_LABEL = "id"; - private static final String MESSAGE_ID = "testMessageId"; - private static final String DATA = "testData"; - private static final String RECORD_ID = "testRecordId"; - private static final String ACK_ID = "testAckId"; - - @Before - public void setup() throws IOException { - mockPubsub = Mockito.mock(Pubsub.class, Mockito.RETURNS_DEEP_STUBS); - client = new PubsubJsonClient(TIMESTAMP_LABEL, ID_LABEL, mockPubsub); - } - - @After - public void teardown() throws IOException { - client.close(); - client = null; - mockPubsub = null; - } - - @Test - public void pullOneMessage() throws IOException { - String expectedSubscription = SUBSCRIPTION.getPath(); - PullRequest expectedRequest = - new PullRequest().setReturnImmediately(true).setMaxMessages(10); - PubsubMessage expectedPubsubMessage = new PubsubMessage() - .setMessageId(MESSAGE_ID) - .encodeData(DATA.getBytes()) - .setPublishTime(String.valueOf(PUB_TIME)) - .setAttributes( - ImmutableMap.of(TIMESTAMP_LABEL, String.valueOf(MESSAGE_TIME), - ID_LABEL, RECORD_ID)); - ReceivedMessage expectedReceivedMessage = - new ReceivedMessage().setMessage(expectedPubsubMessage) - .setAckId(ACK_ID); - PullResponse expectedResponse = - new PullResponse().setReceivedMessages(ImmutableList.of(expectedReceivedMessage)); - Mockito.when((Object) (mockPubsub.projects() - .subscriptions() - .pull(expectedSubscription, expectedRequest) - .execute())) - .thenReturn(expectedResponse); - List acutalMessages = client.pull(REQ_TIME, SUBSCRIPTION, 10, true); - assertEquals(1, acutalMessages.size()); - IncomingMessage actualMessage = acutalMessages.get(0); - assertEquals(ACK_ID, actualMessage.ackId); - assertEquals(DATA, new String(actualMessage.elementBytes)); - assertEquals(RECORD_ID, actualMessage.recordId); - assertEquals(REQ_TIME, actualMessage.requestTimeMsSinceEpoch); - assertEquals(MESSAGE_TIME, actualMessage.timestampMsSinceEpoch); - } - - @Test - public void publishOneMessage() throws IOException { - String expectedTopic = TOPIC.getPath(); - PubsubMessage expectedPubsubMessage = new PubsubMessage() - .encodeData(DATA.getBytes()) - .setAttributes( - ImmutableMap. builder() - .put(TIMESTAMP_LABEL, String.valueOf(MESSAGE_TIME)) - .put(ID_LABEL, RECORD_ID) - .put("k", "v").build()); - PublishRequest expectedRequest = new PublishRequest() - .setMessages(ImmutableList.of(expectedPubsubMessage)); - PublishResponse expectedResponse = new PublishResponse() - .setMessageIds(ImmutableList.of(MESSAGE_ID)); - Mockito.when((Object) (mockPubsub.projects() - .topics() - .publish(expectedTopic, expectedRequest) - .execute())) - .thenReturn(expectedResponse); - Map attrs = new HashMap<>(); - attrs.put("k", "v"); - OutgoingMessage actualMessage = new OutgoingMessage( - DATA.getBytes(), attrs, MESSAGE_TIME, RECORD_ID); - int n = client.publish(TOPIC, ImmutableList.of(actualMessage)); - assertEquals(1, n); - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/5bcb8c57/sdks/java/core/src/test/java/org/apache/beam/sdk/util/PubsubTestClientTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/PubsubTestClientTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/PubsubTestClientTest.java deleted file mode 100644 index a1b7daf..0000000 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/PubsubTestClientTest.java +++ /dev/null @@ -1,114 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.beam.sdk.util; - -import static org.junit.Assert.assertEquals; - -import com.google.api.client.util.Clock; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.Lists; -import com.google.common.collect.Sets; -import java.io.IOException; -import java.util.List; -import java.util.concurrent.atomic.AtomicLong; -import org.apache.beam.sdk.util.PubsubClient.IncomingMessage; -import org.apache.beam.sdk.util.PubsubClient.OutgoingMessage; -import org.apache.beam.sdk.util.PubsubClient.SubscriptionPath; -import org.apache.beam.sdk.util.PubsubClient.TopicPath; -import org.apache.beam.sdk.util.PubsubTestClient.PubsubTestClientFactory; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; - -/** - * Tests for PubsubTestClient. - */ -@RunWith(JUnit4.class) -public class PubsubTestClientTest { - private static final TopicPath TOPIC = PubsubClient.topicPathFromName("testProject", "testTopic"); - private static final SubscriptionPath SUBSCRIPTION = - PubsubClient.subscriptionPathFromName("testProject", "testSubscription"); - private static final long REQ_TIME = 1234L; - private static final long MESSAGE_TIME = 6789L; - private static final String MESSAGE_ID = "testMessageId"; - private static final String DATA = "testData"; - private static final String ACK_ID = "testAckId"; - private static final int ACK_TIMEOUT_S = 60; - - @Test - public void pullOneMessage() throws IOException { - final AtomicLong now = new AtomicLong(); - Clock clock = new Clock() { - @Override - public long currentTimeMillis() { - return now.get(); - } - }; - IncomingMessage expectedIncomingMessage = - new IncomingMessage(DATA.getBytes(), null, MESSAGE_TIME, REQ_TIME, ACK_ID, MESSAGE_ID); - try (PubsubTestClientFactory factory = - PubsubTestClient.createFactoryForPull(clock, SUBSCRIPTION, ACK_TIMEOUT_S, - Lists.newArrayList(expectedIncomingMessage))) { - try (PubsubTestClient client = (PubsubTestClient) factory.newClient(null, null, null)) { - now.set(REQ_TIME); - client.advance(); - List incomingMessages = client.pull(now.get(), SUBSCRIPTION, 1, true); - assertEquals(1, incomingMessages.size()); - assertEquals(expectedIncomingMessage, incomingMessages.get(0)); - // Timeout on ACK. - now.addAndGet((ACK_TIMEOUT_S + 10) * 1000); - client.advance(); - incomingMessages = client.pull(now.get(), SUBSCRIPTION, 1, true); - assertEquals(1, incomingMessages.size()); - assertEquals(expectedIncomingMessage.withRequestTime(now.get()), incomingMessages.get(0)); - now.addAndGet(10 * 1000); - client.advance(); - // Extend ack - client.modifyAckDeadline(SUBSCRIPTION, ImmutableList.of(ACK_ID), 20); - // Timeout on extended ACK - now.addAndGet(30 * 1000); - client.advance(); - incomingMessages = client.pull(now.get(), SUBSCRIPTION, 1, true); - assertEquals(1, incomingMessages.size()); - assertEquals(expectedIncomingMessage.withRequestTime(now.get()), incomingMessages.get(0)); - // Extend ack - client.modifyAckDeadline(SUBSCRIPTION, ImmutableList.of(ACK_ID), 20); - // Ack - now.addAndGet(15 * 1000); - client.advance(); - client.acknowledge(SUBSCRIPTION, ImmutableList.of(ACK_ID)); - } - } - } - - @Test - public void publishOneMessage() throws IOException { - OutgoingMessage expectedOutgoingMessage = - new OutgoingMessage(DATA.getBytes(), null, MESSAGE_TIME, MESSAGE_ID); - try (PubsubTestClientFactory factory = - PubsubTestClient.createFactoryForPublish( - TOPIC, - Sets.newHashSet(expectedOutgoingMessage), - ImmutableList.of())) { - try (PubsubTestClient client = (PubsubTestClient) factory.newClient(null, null, null)) { - client.publish(TOPIC, ImmutableList.of(expectedOutgoingMessage)); - } - } - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/5bcb8c57/sdks/java/io/google-cloud-platform/pom.xml ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/pom.xml b/sdks/java/io/google-cloud-platform/pom.xml index 1a2e100..d22c6c5 100644 --- a/sdks/java/io/google-cloud-platform/pom.xml +++ b/sdks/java/io/google-cloud-platform/pom.xml @@ -84,6 +84,16 @@ + com.google.apis + google-api-services-pubsub + + + + com.google.api.grpc + grpc-google-pubsub-v1 + + + com.google.auto.service auto-service true @@ -106,10 +116,44 @@ io.grpc + grpc-auth + + + + io.grpc grpc-core + io.grpc + grpc-netty + + + + io.netty + netty-handler + + + + io.grpc + grpc-stub + + + + + io.grpc + grpc-all + runtime + + + + io.grpc + grpc-protobuf + runtime + + + joda-time joda-time http://git-wip-us.apache.org/repos/asf/beam/blob/5bcb8c57/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubClient.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubClient.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubClient.java new file mode 100644 index 0000000..750178c --- /dev/null +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubClient.java @@ -0,0 +1,544 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.sdk.io.gcp.pubsub; + +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkState; + +import com.google.api.client.util.DateTime; +import com.google.common.base.Objects; +import com.google.common.base.Strings; +import java.io.Closeable; +import java.io.IOException; +import java.io.Serializable; +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ThreadLocalRandom; +import javax.annotation.Nullable; +import org.apache.beam.sdk.options.PubsubOptions; + +/** + * An (abstract) helper class for talking to Pubsub via an underlying transport. + */ +abstract class PubsubClient implements Closeable { + /** + * Factory for creating clients. + */ + public interface PubsubClientFactory extends Serializable { + /** + * Construct a new Pubsub client. It should be closed via {@link #close} in order + * to ensure tidy cleanup of underlying netty resources (or use the try-with-resources + * construct). Uses {@code options} to derive pubsub endpoints and application credentials. + * If non-{@literal null}, use {@code timestampLabel} and {@code idLabel} to store custom + * timestamps/ids within message metadata. + */ + PubsubClient newClient( + @Nullable String timestampLabel, + @Nullable String idLabel, + PubsubOptions options) throws IOException; + + /** + * Return the display name for this factory. Eg "Json", "gRPC". + */ + String getKind(); + } + + /** + * Return timestamp as ms-since-unix-epoch corresponding to {@code timestamp}. + * Return {@literal null} if no timestamp could be found. Throw {@link IllegalArgumentException} + * if timestamp cannot be recognized. + */ + @Nullable + private static Long asMsSinceEpoch(@Nullable String timestamp) { + if (Strings.isNullOrEmpty(timestamp)) { + return null; + } + try { + // Try parsing as milliseconds since epoch. Note there is no way to parse a + // string in RFC 3339 format here. + // Expected IllegalArgumentException if parsing fails; we use that to fall back + // to RFC 3339. + return Long.parseLong(timestamp); + } catch (IllegalArgumentException e1) { + // Try parsing as RFC3339 string. DateTime.parseRfc3339 will throw an + // IllegalArgumentException if parsing fails, and the caller should handle. + return DateTime.parseRfc3339(timestamp).getValue(); + } + } + + /** + * Return the timestamp (in ms since unix epoch) to use for a Pubsub message with {@code + * attributes} and {@code pubsubTimestamp}. + * + *

If {@code timestampLabel} is non-{@literal null} then the message attributes must contain + * that label, and the value of that label will be taken as the timestamp. + * Otherwise the timestamp will be taken from the Pubsub publish timestamp {@code + * pubsubTimestamp}. + * + * @throws IllegalArgumentException if the timestamp cannot be recognized as a ms-since-unix-epoch + * or RFC3339 time. + */ + protected static long extractTimestamp( + @Nullable String timestampLabel, + @Nullable String pubsubTimestamp, + @Nullable Map attributes) { + Long timestampMsSinceEpoch; + if (Strings.isNullOrEmpty(timestampLabel)) { + timestampMsSinceEpoch = asMsSinceEpoch(pubsubTimestamp); + checkArgument(timestampMsSinceEpoch != null, + "Cannot interpret PubSub publish timestamp: %s", + pubsubTimestamp); + } else { + String value = attributes == null ? null : attributes.get(timestampLabel); + checkArgument(value != null, + "PubSub message is missing a value for timestamp label %s", + timestampLabel); + timestampMsSinceEpoch = asMsSinceEpoch(value); + checkArgument(timestampMsSinceEpoch != null, + "Cannot interpret value of label %s as timestamp: %s", + timestampLabel, value); + } + return timestampMsSinceEpoch; + } + + /** + * Path representing a cloud project id. + */ + static class ProjectPath implements Serializable { + private final String projectId; + + /** + * Creates a {@link ProjectPath} from a {@link String} representation, which + * must be of the form {@code "projects/" + projectId}. + */ + ProjectPath(String path) { + String[] splits = path.split("/"); + checkArgument( + splits.length == 2 && splits[0].equals("projects"), + "Malformed project path \"%s\": must be of the form \"projects/\" + ", + path); + this.projectId = splits[1]; + } + + public String getPath() { + return String.format("projects/%s", projectId); + } + + public String getId() { + return projectId; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + ProjectPath that = (ProjectPath) o; + + return projectId.equals(that.projectId); + } + + @Override + public int hashCode() { + return projectId.hashCode(); + } + + @Override + public String toString() { + return getPath(); + } + } + + public static ProjectPath projectPathFromPath(String path) { + return new ProjectPath(path); + } + + public static ProjectPath projectPathFromId(String projectId) { + return new ProjectPath(String.format("projects/%s", projectId)); + } + + /** + * Path representing a Pubsub subscription. + */ + public static class SubscriptionPath implements Serializable { + private final String projectId; + private final String subscriptionName; + + SubscriptionPath(String path) { + String[] splits = path.split("/"); + checkState( + splits.length == 4 && splits[0].equals("projects") && splits[2].equals("subscriptions"), + "Malformed subscription path %s: " + + "must be of the form \"projects/\" + + \"subscriptions\"", path); + this.projectId = splits[1]; + this.subscriptionName = splits[3]; + } + + public String getPath() { + return String.format("projects/%s/subscriptions/%s", projectId, subscriptionName); + } + + public String getName() { + return subscriptionName; + } + + public String getV1Beta1Path() { + return String.format("/subscriptions/%s/%s", projectId, subscriptionName); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + SubscriptionPath that = (SubscriptionPath) o; + return this.subscriptionName.equals(that.subscriptionName) + && this.projectId.equals(that.projectId); + } + + @Override + public int hashCode() { + return Objects.hashCode(projectId, subscriptionName); + } + + @Override + public String toString() { + return getPath(); + } + } + + public static SubscriptionPath subscriptionPathFromPath(String path) { + return new SubscriptionPath(path); + } + + public static SubscriptionPath subscriptionPathFromName( + String projectId, String subscriptionName) { + return new SubscriptionPath(String.format("projects/%s/subscriptions/%s", + projectId, subscriptionName)); + } + + /** + * Path representing a Pubsub topic. + */ + public static class TopicPath implements Serializable { + private final String path; + + TopicPath(String path) { + this.path = path; + } + + public String getPath() { + return path; + } + + public String getName() { + String[] splits = path.split("/"); + checkState(splits.length == 4, "Malformed topic path %s", path); + return splits[3]; + } + + public String getV1Beta1Path() { + String[] splits = path.split("/"); + checkState(splits.length == 4, "Malformed topic path %s", path); + return String.format("/topics/%s/%s", splits[1], splits[3]); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + TopicPath topicPath = (TopicPath) o; + return path.equals(topicPath.path); + } + + @Override + public int hashCode() { + return path.hashCode(); + } + + @Override + public String toString() { + return path; + } + } + + public static TopicPath topicPathFromPath(String path) { + return new TopicPath(path); + } + + public static TopicPath topicPathFromName(String projectId, String topicName) { + return new TopicPath(String.format("projects/%s/topics/%s", projectId, topicName)); + } + + /** + * A message to be sent to Pubsub. + * + *

NOTE: This class is {@link Serializable} only to support the {@link PubsubTestClient}. + * Java serialization is never used for non-test clients. + */ + static class OutgoingMessage implements Serializable { + /** + * Underlying (encoded) element. + */ + public final byte[] elementBytes; + + public final Map attributes; + + /** + * Timestamp for element (ms since epoch). + */ + public final long timestampMsSinceEpoch; + + /** + * If using an id label, the record id to associate with this record's metadata so the receiver + * can reject duplicates. Otherwise {@literal null}. + */ + @Nullable + public final String recordId; + + public OutgoingMessage(byte[] elementBytes, Map attributes, + long timestampMsSinceEpoch, @Nullable String recordId) { + this.elementBytes = elementBytes; + this.attributes = attributes; + this.timestampMsSinceEpoch = timestampMsSinceEpoch; + this.recordId = recordId; + } + + @Override + public String toString() { + return String.format("OutgoingMessage(%db, %dms)", + elementBytes.length, timestampMsSinceEpoch); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + OutgoingMessage that = (OutgoingMessage) o; + + return timestampMsSinceEpoch == that.timestampMsSinceEpoch + && Arrays.equals(elementBytes, that.elementBytes) + && Objects.equal(attributes, that.attributes) + && Objects.equal(recordId, that.recordId); + } + + @Override + public int hashCode() { + return Objects.hashCode(Arrays.hashCode(elementBytes), attributes, timestampMsSinceEpoch, + recordId); + } + } + + /** + * A message received from Pubsub. + * + *

NOTE: This class is {@link Serializable} only to support the {@link PubsubTestClient}. + * Java serialization is never used for non-test clients. + */ + static class IncomingMessage implements Serializable { + /** + * Underlying (encoded) element. + */ + public final byte[] elementBytes; + + public Map attributes; + + /** + * Timestamp for element (ms since epoch). Either Pubsub's processing time, + * or the custom timestamp associated with the message. + */ + public final long timestampMsSinceEpoch; + + /** + * Timestamp (in system time) at which we requested the message (ms since epoch). + */ + public final long requestTimeMsSinceEpoch; + + /** + * Id to pass back to Pubsub to acknowledge receipt of this message. + */ + public final String ackId; + + /** + * Id to pass to the runner to distinguish this message from all others. + */ + public final String recordId; + + public IncomingMessage( + byte[] elementBytes, + Map attributes, + long timestampMsSinceEpoch, + long requestTimeMsSinceEpoch, + String ackId, + String recordId) { + this.elementBytes = elementBytes; + this.attributes = attributes; + this.timestampMsSinceEpoch = timestampMsSinceEpoch; + this.requestTimeMsSinceEpoch = requestTimeMsSinceEpoch; + this.ackId = ackId; + this.recordId = recordId; + } + + public IncomingMessage withRequestTime(long requestTimeMsSinceEpoch) { + return new IncomingMessage(elementBytes, attributes, timestampMsSinceEpoch, + requestTimeMsSinceEpoch, ackId, recordId); + } + + @Override + public String toString() { + return String.format("IncomingMessage(%db, %dms)", + elementBytes.length, timestampMsSinceEpoch); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + IncomingMessage that = (IncomingMessage) o; + + return timestampMsSinceEpoch == that.timestampMsSinceEpoch + && requestTimeMsSinceEpoch == that.requestTimeMsSinceEpoch + && ackId.equals(that.ackId) + && recordId.equals(that.recordId) + && Arrays.equals(elementBytes, that.elementBytes) + && Objects.equal(attributes, that.attributes); + } + + @Override + public int hashCode() { + return Objects.hashCode(Arrays.hashCode(elementBytes), attributes, timestampMsSinceEpoch, + requestTimeMsSinceEpoch, + ackId, recordId); + } + } + + /** + * Publish {@code outgoingMessages} to Pubsub {@code topic}. Return number of messages + * published. + */ + public abstract int publish(TopicPath topic, List outgoingMessages) + throws IOException; + + /** + * Request the next batch of up to {@code batchSize} messages from {@code subscription}. + * Return the received messages, or empty collection if none were available. Does not + * wait for messages to arrive if {@code returnImmediately} is {@literal true}. + * Returned messages will record their request time as {@code requestTimeMsSinceEpoch}. + */ + public abstract List pull( + long requestTimeMsSinceEpoch, + SubscriptionPath subscription, + int batchSize, + boolean returnImmediately) + throws IOException; + + /** + * Acknowldege messages from {@code subscription} with {@code ackIds}. + */ + public abstract void acknowledge(SubscriptionPath subscription, List ackIds) + throws IOException; + + /** + * Modify the ack deadline for messages from {@code subscription} with {@code ackIds} to + * be {@code deadlineSeconds} from now. + */ + public abstract void modifyAckDeadline( + SubscriptionPath subscription, List ackIds, + int deadlineSeconds) throws IOException; + + /** + * Create {@code topic}. + */ + public abstract void createTopic(TopicPath topic) throws IOException; + + /* + * Delete {@code topic}. + */ + public abstract void deleteTopic(TopicPath topic) throws IOException; + + /** + * Return a list of topics for {@code project}. + */ + public abstract List listTopics(ProjectPath project) throws IOException; + + /** + * Create {@code subscription} to {@code topic}. + */ + public abstract void createSubscription( + TopicPath topic, SubscriptionPath subscription, int ackDeadlineSeconds) throws IOException; + + /** + * Create a random subscription for {@code topic}. Return the {@link SubscriptionPath}. It + * is the responsibility of the caller to later delete the subscription. + */ + public SubscriptionPath createRandomSubscription( + ProjectPath project, TopicPath topic, int ackDeadlineSeconds) throws IOException { + // Create a randomized subscription derived from the topic name. + String subscriptionName = topic.getName() + "_beam_" + ThreadLocalRandom.current().nextLong(); + SubscriptionPath subscription = + PubsubClient.subscriptionPathFromName(project.getId(), subscriptionName); + createSubscription(topic, subscription, ackDeadlineSeconds); + return subscription; + } + + /** + * Delete {@code subscription}. + */ + public abstract void deleteSubscription(SubscriptionPath subscription) throws IOException; + + /** + * Return a list of subscriptions for {@code topic} in {@code project}. + */ + public abstract List listSubscriptions(ProjectPath project, TopicPath topic) + throws IOException; + + /** + * Return the ack deadline, in seconds, for {@code subscription}. + */ + public abstract int ackDeadlineSeconds(SubscriptionPath subscription) throws IOException; + + /** + * Return {@literal true} if {@link #pull} will always return empty list. Actual clients + * will return {@literal false}. Test clients may return {@literal true} to signal that all + * expected messages have been pulled and the test may complete. + */ + public abstract boolean isEOF(); +} http://git-wip-us.apache.org/repos/asf/beam/blob/5bcb8c57/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubGrpcClient.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubGrpcClient.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubGrpcClient.java new file mode 100644 index 0000000..912d59c --- /dev/null +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubGrpcClient.java @@ -0,0 +1,424 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.sdk.io.gcp.pubsub; + +import static com.google.common.base.Preconditions.checkState; + +import com.google.auth.Credentials; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Strings; +import com.google.common.collect.ImmutableList; +import com.google.protobuf.ByteString; +import com.google.protobuf.Timestamp; +import com.google.pubsub.v1.AcknowledgeRequest; +import com.google.pubsub.v1.DeleteSubscriptionRequest; +import com.google.pubsub.v1.DeleteTopicRequest; +import com.google.pubsub.v1.GetSubscriptionRequest; +import com.google.pubsub.v1.ListSubscriptionsRequest; +import com.google.pubsub.v1.ListSubscriptionsResponse; +import com.google.pubsub.v1.ListTopicsRequest; +import com.google.pubsub.v1.ListTopicsResponse; +import com.google.pubsub.v1.ModifyAckDeadlineRequest; +import com.google.pubsub.v1.PublishRequest; +import com.google.pubsub.v1.PublishResponse; +import com.google.pubsub.v1.PublisherGrpc; +import com.google.pubsub.v1.PublisherGrpc.PublisherBlockingStub; +import com.google.pubsub.v1.PubsubMessage; +import com.google.pubsub.v1.PullRequest; +import com.google.pubsub.v1.PullResponse; +import com.google.pubsub.v1.ReceivedMessage; +import com.google.pubsub.v1.SubscriberGrpc; +import com.google.pubsub.v1.SubscriberGrpc.SubscriberBlockingStub; +import com.google.pubsub.v1.Subscription; +import com.google.pubsub.v1.Topic; +import io.grpc.Channel; +import io.grpc.ClientInterceptors; +import io.grpc.ManagedChannel; +import io.grpc.auth.ClientAuthInterceptor; +import io.grpc.netty.GrpcSslContexts; +import io.grpc.netty.NegotiationType; +import io.grpc.netty.NettyChannelBuilder; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import javax.annotation.Nullable; +import org.apache.beam.sdk.options.GcpOptions; +import org.apache.beam.sdk.options.PubsubOptions; + +/** + * A helper class for talking to Pubsub via grpc. + * + *

CAUTION: Currently uses the application default credentials and does not respect any + * credentials-related arguments in {@link GcpOptions}. + */ +class PubsubGrpcClient extends PubsubClient { + private static final String PUBSUB_ADDRESS = "pubsub.googleapis.com"; + private static final int PUBSUB_PORT = 443; + private static final int LIST_BATCH_SIZE = 1000; + + private static final int DEFAULT_TIMEOUT_S = 15; + + private static class PubsubGrpcClientFactory implements PubsubClientFactory { + @Override + public PubsubClient newClient( + @Nullable String timestampLabel, @Nullable String idLabel, PubsubOptions options) + throws IOException { + ManagedChannel channel = NettyChannelBuilder + .forAddress(PUBSUB_ADDRESS, PUBSUB_PORT) + .negotiationType(NegotiationType.TLS) + .sslContext(GrpcSslContexts.forClient().ciphers(null).build()) + .build(); + + return new PubsubGrpcClient(timestampLabel, + idLabel, + DEFAULT_TIMEOUT_S, + channel, + options.getGcpCredential()); + } + + @Override + public String getKind() { + return "Grpc"; + } + } + + /** + * Factory for creating Pubsub clients using gRCP transport. + */ + public static final PubsubClientFactory FACTORY = new PubsubGrpcClientFactory(); + + /** + * Timeout for grpc calls (in s). + */ + private final int timeoutSec; + + /** + * Underlying netty channel, or {@literal null} if closed. + */ + @Nullable + private ManagedChannel publisherChannel; + + /** + * Credentials determined from options and environment. + */ + private final Credentials credentials; + + /** + * Label to use for custom timestamps, or {@literal null} if should use Pubsub publish time + * instead. + */ + @Nullable + private final String timestampLabel; + + /** + * Label to use for custom ids, or {@literal null} if should use Pubsub provided ids. + */ + @Nullable + private final String idLabel; + + + /** + * Cached stubs, or null if not cached. + */ + @Nullable + private PublisherGrpc.PublisherBlockingStub cachedPublisherStub; + private SubscriberGrpc.SubscriberBlockingStub cachedSubscriberStub; + + @VisibleForTesting + PubsubGrpcClient( + @Nullable String timestampLabel, + @Nullable String idLabel, + int timeoutSec, + ManagedChannel publisherChannel, + Credentials credentials) { + this.timestampLabel = timestampLabel; + this.idLabel = idLabel; + this.timeoutSec = timeoutSec; + this.publisherChannel = publisherChannel; + this.credentials = credentials; + } + + /** + * Gracefully close the underlying netty channel. + */ + @Override + public void close() { + if (publisherChannel == null) { + // Already closed. + return; + } + // Can gc the underlying stubs. + cachedPublisherStub = null; + cachedSubscriberStub = null; + // Mark the client as having been closed before going further + // in case we have an exception from the channel. + ManagedChannel publisherChannel = this.publisherChannel; + this.publisherChannel = null; + // Gracefully shutdown the channel. + publisherChannel.shutdown(); + try { + publisherChannel.awaitTermination(timeoutSec, TimeUnit.SECONDS); + } catch (InterruptedException e) { + // Ignore. + Thread.currentThread().interrupt(); + } + } + + /** + * Return channel with interceptor for returning credentials. + */ + private Channel newChannel() throws IOException { + checkState(publisherChannel != null, "PubsubGrpcClient has been closed"); + ClientAuthInterceptor interceptor = + new ClientAuthInterceptor(credentials, Executors.newSingleThreadExecutor()); + return ClientInterceptors.intercept(publisherChannel, interceptor); + } + + /** + * Return a stub for making a publish request with a timeout. + */ + private PublisherBlockingStub publisherStub() throws IOException { + if (cachedPublisherStub == null) { + cachedPublisherStub = PublisherGrpc.newBlockingStub(newChannel()); + } + return cachedPublisherStub.withDeadlineAfter(timeoutSec, TimeUnit.SECONDS); + } + + /** + * Return a stub for making a subscribe request with a timeout. + */ + private SubscriberBlockingStub subscriberStub() throws IOException { + if (cachedSubscriberStub == null) { + cachedSubscriberStub = SubscriberGrpc.newBlockingStub(newChannel()); + } + return cachedSubscriberStub.withDeadlineAfter(timeoutSec, TimeUnit.SECONDS); + } + + @Override + public int publish(TopicPath topic, List outgoingMessages) + throws IOException { + PublishRequest.Builder request = PublishRequest.newBuilder() + .setTopic(topic.getPath()); + for (OutgoingMessage outgoingMessage : outgoingMessages) { + PubsubMessage.Builder message = + PubsubMessage.newBuilder() + .setData(ByteString.copyFrom(outgoingMessage.elementBytes)); + + if (outgoingMessage.attributes != null) { + message.putAllAttributes(outgoingMessage.attributes); + } + + if (timestampLabel != null) { + message.getMutableAttributes() + .put(timestampLabel, String.valueOf(outgoingMessage.timestampMsSinceEpoch)); + } + + if (idLabel != null && !Strings.isNullOrEmpty(outgoingMessage.recordId)) { + message.getMutableAttributes().put(idLabel, outgoingMessage.recordId); + } + + request.addMessages(message); + } + + PublishResponse response = publisherStub().publish(request.build()); + return response.getMessageIdsCount(); + } + + @Override + public List pull( + long requestTimeMsSinceEpoch, + SubscriptionPath subscription, + int batchSize, + boolean returnImmediately) throws IOException { + PullRequest request = PullRequest.newBuilder() + .setSubscription(subscription.getPath()) + .setReturnImmediately(returnImmediately) + .setMaxMessages(batchSize) + .build(); + PullResponse response = subscriberStub().pull(request); + if (response.getReceivedMessagesCount() == 0) { + return ImmutableList.of(); + } + List incomingMessages = new ArrayList<>(response.getReceivedMessagesCount()); + for (ReceivedMessage message : response.getReceivedMessagesList()) { + PubsubMessage pubsubMessage = message.getMessage(); + @Nullable Map attributes = pubsubMessage.getAttributes(); + + // Payload. + byte[] elementBytes = pubsubMessage.getData().toByteArray(); + + // Timestamp. + String pubsubTimestampString = null; + Timestamp timestampProto = pubsubMessage.getPublishTime(); + if (timestampProto != null) { + pubsubTimestampString = String.valueOf(timestampProto.getSeconds() + + timestampProto.getNanos() / 1000L); + } + long timestampMsSinceEpoch = + extractTimestamp(timestampLabel, pubsubTimestampString, attributes); + + // Ack id. + String ackId = message.getAckId(); + checkState(!Strings.isNullOrEmpty(ackId)); + + // Record id, if any. + @Nullable String recordId = null; + if (idLabel != null && attributes != null) { + recordId = attributes.get(idLabel); + } + if (Strings.isNullOrEmpty(recordId)) { + // Fall back to the Pubsub provided message id. + recordId = pubsubMessage.getMessageId(); + } + + incomingMessages.add(new IncomingMessage(elementBytes, attributes, timestampMsSinceEpoch, + requestTimeMsSinceEpoch, ackId, recordId)); + } + return incomingMessages; + } + + @Override + public void acknowledge(SubscriptionPath subscription, List ackIds) + throws IOException { + AcknowledgeRequest request = AcknowledgeRequest.newBuilder() + .setSubscription(subscription.getPath()) + .addAllAckIds(ackIds) + .build(); + subscriberStub().acknowledge(request); // ignore Empty result. + } + + @Override + public void modifyAckDeadline( + SubscriptionPath subscription, List ackIds, int deadlineSeconds) + throws IOException { + ModifyAckDeadlineRequest request = + ModifyAckDeadlineRequest.newBuilder() + .setSubscription(subscription.getPath()) + .addAllAckIds(ackIds) + .setAckDeadlineSeconds(deadlineSeconds) + .build(); + subscriberStub().modifyAckDeadline(request); // ignore Empty result. + } + + @Override + public void createTopic(TopicPath topic) throws IOException { + Topic request = Topic.newBuilder() + .setName(topic.getPath()) + .build(); + publisherStub().createTopic(request); // ignore Topic result. + } + + @Override + public void deleteTopic(TopicPath topic) throws IOException { + DeleteTopicRequest request = DeleteTopicRequest.newBuilder() + .setTopic(topic.getPath()) + .build(); + publisherStub().deleteTopic(request); // ignore Empty result. + } + + @Override + public List listTopics(ProjectPath project) throws IOException { + ListTopicsRequest.Builder request = + ListTopicsRequest.newBuilder() + .setProject(project.getPath()) + .setPageSize(LIST_BATCH_SIZE); + ListTopicsResponse response = publisherStub().listTopics(request.build()); + if (response.getTopicsCount() == 0) { + return ImmutableList.of(); + } + List topics = new ArrayList<>(response.getTopicsCount()); + while (true) { + for (Topic topic : response.getTopicsList()) { + topics.add(topicPathFromPath(topic.getName())); + } + if (response.getNextPageToken().isEmpty()) { + break; + } + request.setPageToken(response.getNextPageToken()); + response = publisherStub().listTopics(request.build()); + } + return topics; + } + + @Override + public void createSubscription( + TopicPath topic, SubscriptionPath subscription, + int ackDeadlineSeconds) throws IOException { + Subscription request = Subscription.newBuilder() + .setTopic(topic.getPath()) + .setName(subscription.getPath()) + .setAckDeadlineSeconds(ackDeadlineSeconds) + .build(); + subscriberStub().createSubscription(request); // ignore Subscription result. + } + + @Override + public void deleteSubscription(SubscriptionPath subscription) throws IOException { + DeleteSubscriptionRequest request = + DeleteSubscriptionRequest.newBuilder() + .setSubscription(subscription.getPath()) + .build(); + subscriberStub().deleteSubscription(request); // ignore Empty result. + } + + @Override + public List listSubscriptions(ProjectPath project, TopicPath topic) + throws IOException { + ListSubscriptionsRequest.Builder request = + ListSubscriptionsRequest.newBuilder() + .setProject(project.getPath()) + .setPageSize(LIST_BATCH_SIZE); + ListSubscriptionsResponse response = subscriberStub().listSubscriptions(request.build()); + if (response.getSubscriptionsCount() == 0) { + return ImmutableList.of(); + } + List subscriptions = new ArrayList<>(response.getSubscriptionsCount()); + while (true) { + for (Subscription subscription : response.getSubscriptionsList()) { + if (subscription.getTopic().equals(topic.getPath())) { + subscriptions.add(subscriptionPathFromPath(subscription.getName())); + } + } + if (response.getNextPageToken().isEmpty()) { + break; + } + request.setPageToken(response.getNextPageToken()); + response = subscriberStub().listSubscriptions(request.build()); + } + return subscriptions; + } + + @Override + public int ackDeadlineSeconds(SubscriptionPath subscription) throws IOException { + GetSubscriptionRequest request = + GetSubscriptionRequest.newBuilder() + .setSubscription(subscription.getPath()) + .build(); + Subscription response = subscriberStub().getSubscription(request); + return response.getAckDeadlineSeconds(); + } + + @Override + public boolean isEOF() { + return false; + } +}