Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 05166200D07 for ; Wed, 23 Aug 2017 19:09:17 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 037821632E0; Wed, 23 Aug 2017 17:09:17 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 41379161F86 for ; Wed, 23 Aug 2017 19:09:15 +0200 (CEST) Received: (qmail 25144 invoked by uid 500); 23 Aug 2017 17:09:14 -0000 Mailing-List: contact commits-help@beam.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@beam.apache.org Delivered-To: mailing list commits@beam.apache.org Received: (qmail 23745 invoked by uid 99); 23 Aug 2017 17:09:11 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 23 Aug 2017 17:09:11 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 8715BF5EFD; Wed, 23 Aug 2017 17:09:10 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: iemejia@apache.org To: commits@beam.apache.org Date: Wed, 23 Aug 2017 17:09:34 -0000 Message-Id: In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [26/55] [abbrv] beam git commit: Remove NexmarkDrivers and make execution runner-agnostic archived-at: Wed, 23 Aug 2017 17:09:17 -0000 http://git-wip-us.apache.org/repos/asf/beam/blob/a6dbdfa5/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/io/PubsubJsonClient.java ---------------------------------------------------------------------- diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/io/PubsubJsonClient.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/io/PubsubJsonClient.java deleted file mode 100644 index afddbd8..0000000 --- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/io/PubsubJsonClient.java +++ /dev/null @@ -1,318 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.beam.integration.nexmark.io; - -import static com.google.common.base.Preconditions.checkState; - -import com.google.api.client.http.HttpRequestInitializer; -import com.google.api.services.pubsub.Pubsub; -import com.google.api.services.pubsub.Pubsub.Builder; -import com.google.api.services.pubsub.model.AcknowledgeRequest; -import com.google.api.services.pubsub.model.ListSubscriptionsResponse; -import com.google.api.services.pubsub.model.ListTopicsResponse; -import com.google.api.services.pubsub.model.ModifyAckDeadlineRequest; -import com.google.api.services.pubsub.model.PublishRequest; -import com.google.api.services.pubsub.model.PublishResponse; -import com.google.api.services.pubsub.model.PubsubMessage; -import com.google.api.services.pubsub.model.PullRequest; -import com.google.api.services.pubsub.model.PullResponse; -import com.google.api.services.pubsub.model.ReceivedMessage; -import com.google.api.services.pubsub.model.Subscription; -import com.google.api.services.pubsub.model.Topic; -import com.google.auth.Credentials; -import com.google.auth.http.HttpCredentialsAdapter; -import com.google.cloud.hadoop.util.ChainingHttpRequestInitializer; -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Strings; -import com.google.common.collect.ImmutableList; -import java.io.IOException; -import java.util.ArrayList; -import java.util.List; -import java.util.Map; -import java.util.TreeMap; -import javax.annotation.Nullable; -import org.apache.beam.sdk.io.gcp.pubsub.PubsubOptions; -import org.apache.beam.sdk.util.RetryHttpRequestInitializer; -import org.apache.beam.sdk.util.Transport; - -/** - * A Pubsub client using JSON transport. - */ -class PubsubJsonClient extends PubsubClient { - - private static class PubsubJsonClientFactory implements PubsubClientFactory { - private static HttpRequestInitializer chainHttpRequestInitializer( - Credentials credential, HttpRequestInitializer httpRequestInitializer) { - if (credential == null) { - return httpRequestInitializer; - } else { - return new ChainingHttpRequestInitializer( - new HttpCredentialsAdapter(credential), - httpRequestInitializer); - } - } - - @Override - public PubsubClient newClient( - @Nullable String timestampLabel, @Nullable String idLabel, PubsubOptions options) - throws IOException { - Pubsub pubsub = new Builder( - Transport.getTransport(), - Transport.getJsonFactory(), - chainHttpRequestInitializer( - options.getGcpCredential(), - // Do not log 404. It clutters the output and is possibly even required by the caller. - new RetryHttpRequestInitializer(ImmutableList.of(404)))) - .setRootUrl(options.getPubsubRootUrl()) - .setApplicationName(options.getAppName()) - .setGoogleClientRequestInitializer(options.getGoogleApiTrace()) - .build(); - return new PubsubJsonClient(timestampLabel, idLabel, pubsub); - } - - @Override - public String getKind() { - return "Json"; - } - } - - /** - * Factory for creating Pubsub clients using Json transport. - */ - public static final PubsubClientFactory FACTORY = new PubsubJsonClientFactory(); - - /** - * Label to use for custom timestamps, or {@literal null} if should use Pubsub publish time - * instead. - */ - @Nullable - private final String timestampLabel; - - /** - * Label to use for custom ids, or {@literal null} if should use Pubsub provided ids. - */ - @Nullable - private final String idLabel; - - /** - * Underlying JSON transport. - */ - private Pubsub pubsub; - - @VisibleForTesting PubsubJsonClient( - @Nullable String timestampLabel, - @Nullable String idLabel, - Pubsub pubsub) { - this.timestampLabel = timestampLabel; - this.idLabel = idLabel; - this.pubsub = pubsub; - } - - @Override - public void close() { - // Nothing to close. - } - - @Override - public int publish(TopicPath topic, List outgoingMessages) - throws IOException { - List pubsubMessages = new ArrayList<>(outgoingMessages.size()); - for (OutgoingMessage outgoingMessage : outgoingMessages) { - PubsubMessage pubsubMessage = new PubsubMessage().encodeData(outgoingMessage.elementBytes); - - Map attributes = outgoingMessage.attributes; - if ((timestampLabel != null || idLabel != null) && attributes == null) { - attributes = new TreeMap<>(); - } - if (attributes != null) { - pubsubMessage.setAttributes(attributes); - } - - if (timestampLabel != null) { - attributes.put(timestampLabel, String.valueOf(outgoingMessage.timestampMsSinceEpoch)); - } - - if (idLabel != null && !Strings.isNullOrEmpty(outgoingMessage.recordId)) { - attributes.put(idLabel, outgoingMessage.recordId); - } - - pubsubMessages.add(pubsubMessage); - } - PublishRequest request = new PublishRequest().setMessages(pubsubMessages); - PublishResponse response = pubsub.projects() - .topics() - .publish(topic.getPath(), request) - .execute(); - return response.getMessageIds().size(); - } - - @Override - public List pull( - long requestTimeMsSinceEpoch, - SubscriptionPath subscription, - int batchSize, - boolean returnImmediately) throws IOException { - PullRequest request = new PullRequest() - .setReturnImmediately(returnImmediately) - .setMaxMessages(batchSize); - PullResponse response = pubsub.projects() - .subscriptions() - .pull(subscription.getPath(), request) - .execute(); - if (response.getReceivedMessages() == null || response.getReceivedMessages().size() == 0) { - return ImmutableList.of(); - } - List incomingMessages = new ArrayList<>(response.getReceivedMessages().size()); - for (ReceivedMessage message : response.getReceivedMessages()) { - PubsubMessage pubsubMessage = message.getMessage(); - @Nullable Map attributes = pubsubMessage.getAttributes(); - - // Payload. - byte[] elementBytes = pubsubMessage.decodeData(); - - // Timestamp. - long timestampMsSinceEpoch = - extractTimestamp(timestampLabel, message.getMessage().getPublishTime(), attributes); - - // Ack id. - String ackId = message.getAckId(); - checkState(!Strings.isNullOrEmpty(ackId)); - - // Record id, if any. - @Nullable String recordId = null; - if (idLabel != null && attributes != null) { - recordId = attributes.get(idLabel); - } - if (Strings.isNullOrEmpty(recordId)) { - // Fall back to the Pubsub provided message id. - recordId = pubsubMessage.getMessageId(); - } - - incomingMessages.add(new IncomingMessage(elementBytes, attributes, timestampMsSinceEpoch, - requestTimeMsSinceEpoch, ackId, recordId)); - } - - return incomingMessages; - } - - @Override - public void acknowledge(SubscriptionPath subscription, List ackIds) throws IOException { - AcknowledgeRequest request = new AcknowledgeRequest().setAckIds(ackIds); - pubsub.projects() - .subscriptions() - .acknowledge(subscription.getPath(), request) - .execute(); // ignore Empty result. - } - - @Override - public void modifyAckDeadline( - SubscriptionPath subscription, List ackIds, int deadlineSeconds) - throws IOException { - ModifyAckDeadlineRequest request = - new ModifyAckDeadlineRequest().setAckIds(ackIds) - .setAckDeadlineSeconds(deadlineSeconds); - pubsub.projects() - .subscriptions() - .modifyAckDeadline(subscription.getPath(), request) - .execute(); // ignore Empty result. - } - - @Override - public void createTopic(TopicPath topic) throws IOException { - pubsub.projects() - .topics() - .create(topic.getPath(), new Topic()) - .execute(); // ignore Topic result. - } - - @Override - public void deleteTopic(TopicPath topic) throws IOException { - pubsub.projects() - .topics() - .delete(topic.getPath()) - .execute(); // ignore Empty result. - } - - @Override - public List listTopics(ProjectPath project) throws IOException { - ListTopicsResponse response = pubsub.projects() - .topics() - .list(project.getPath()) - .execute(); - if (response.getTopics() == null || response.getTopics().isEmpty()) { - return ImmutableList.of(); - } - List topics = new ArrayList<>(response.getTopics().size()); - for (Topic topic : response.getTopics()) { - topics.add(topicPathFromPath(topic.getName())); - } - return topics; - } - - @Override - public void createSubscription( - TopicPath topic, SubscriptionPath subscription, - int ackDeadlineSeconds) throws IOException { - Subscription request = new Subscription() - .setTopic(topic.getPath()) - .setAckDeadlineSeconds(ackDeadlineSeconds); - pubsub.projects() - .subscriptions() - .create(subscription.getPath(), request) - .execute(); // ignore Subscription result. - } - - @Override - public void deleteSubscription(SubscriptionPath subscription) throws IOException { - pubsub.projects() - .subscriptions() - .delete(subscription.getPath()) - .execute(); // ignore Empty result. - } - - @Override - public List listSubscriptions(ProjectPath project, TopicPath topic) - throws IOException { - ListSubscriptionsResponse response = pubsub.projects() - .subscriptions() - .list(project.getPath()) - .execute(); - if (response.getSubscriptions() == null || response.getSubscriptions().isEmpty()) { - return ImmutableList.of(); - } - List subscriptions = new ArrayList<>(response.getSubscriptions().size()); - for (Subscription subscription : response.getSubscriptions()) { - if (subscription.getTopic().equals(topic.getPath())) { - subscriptions.add(subscriptionPathFromPath(subscription.getName())); - } - } - return subscriptions; - } - - @Override - public int ackDeadlineSeconds(SubscriptionPath subscription) throws IOException { - Subscription response = pubsub.projects().subscriptions().get(subscription.getPath()).execute(); - return response.getAckDeadlineSeconds(); - } - - @Override - public boolean isEOF() { - return false; - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/a6dbdfa5/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/io/PubsubTestClient.java ---------------------------------------------------------------------- diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/io/PubsubTestClient.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/io/PubsubTestClient.java deleted file mode 100644 index 69ba2b0..0000000 --- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/io/PubsubTestClient.java +++ /dev/null @@ -1,436 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.beam.integration.nexmark.io; - -import static com.google.common.base.Preconditions.checkState; - -import com.google.api.client.util.Clock; -import com.google.common.collect.Lists; -import com.google.common.collect.Sets; -import java.io.Closeable; -import java.io.IOException; -import java.io.Serializable; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.Set; -import javax.annotation.Nullable; -import org.apache.beam.sdk.io.gcp.pubsub.PubsubOptions; - -/** - * A (partial) implementation of {@link PubsubClient} for use by unit tests. Only suitable for - * testing {@link #publish}, {@link #pull}, {@link #acknowledge} and {@link #modifyAckDeadline} - * methods. Relies on statics to mimic the Pubsub service, though we try to hide that. - */ -class PubsubTestClient extends PubsubClient implements Serializable { - /** - * Mimic the state of the simulated Pubsub 'service'. - * - *

Note that the {@link PubsubTestClientFactory} is serialized/deserialized even when running - * test pipelines. Meanwhile it is valid for multiple {@link PubsubTestClient}s to be created - * from the same client factory and run in parallel. Thus we can't enforce aliasing of the - * following data structures over all clients and must resort to a static. - */ - private static class State { - /** - * True if has been primed for a test but not yet validated. - */ - boolean isActive; - - /** - * Publish mode only: Only publish calls for this topic are allowed. - */ - @Nullable - TopicPath expectedTopic; - - /** - * Publish mode only: Messages yet to seen in a {@link #publish} call. - */ - @Nullable - Set remainingExpectedOutgoingMessages; - - /** - * Publish mode only: Messages which should throw when first sent to simulate transient publish - * failure. - */ - @Nullable - Set remainingFailingOutgoingMessages; - - /** - * Pull mode only: Clock from which to get current time. - */ - @Nullable - Clock clock; - - /** - * Pull mode only: Only pull calls for this subscription are allowed. - */ - @Nullable - SubscriptionPath expectedSubscription; - - /** - * Pull mode only: Timeout to simulate. - */ - int ackTimeoutSec; - - /** - * Pull mode only: Messages waiting to be received by a {@link #pull} call. - */ - @Nullable - List remainingPendingIncomingMessages; - - /** - * Pull mode only: Messages which have been returned from a {@link #pull} call and - * not yet ACKed by an {@link #acknowledge} call. - */ - @Nullable - Map pendingAckIncomingMessages; - - /** - * Pull mode only: When above messages are due to have their ACK deadlines expire. - */ - @Nullable - Map ackDeadline; - } - - private static final State STATE = new State(); - - /** Closing the factory will validate all expected messages were processed. */ - public interface PubsubTestClientFactory - extends PubsubClientFactory, Closeable, Serializable { - } - - /** - * Return a factory for testing publishers. Only one factory may be in-flight at a time. - * The factory must be closed when the test is complete, at which point final validation will - * occur. - */ - static PubsubTestClientFactory createFactoryForPublish( - final TopicPath expectedTopic, - final Iterable expectedOutgoingMessages, - final Iterable failingOutgoingMessages) { - synchronized (STATE) { - checkState(!STATE.isActive, "Test still in flight"); - STATE.expectedTopic = expectedTopic; - STATE.remainingExpectedOutgoingMessages = Sets.newHashSet(expectedOutgoingMessages); - STATE.remainingFailingOutgoingMessages = Sets.newHashSet(failingOutgoingMessages); - STATE.isActive = true; - } - return new PubsubTestClientFactory() { - @Override - public PubsubClient newClient( - @Nullable String timestampLabel, @Nullable String idLabel, PubsubOptions options) - throws IOException { - return new PubsubTestClient(); - } - - @Override - public String getKind() { - return "PublishTest"; - } - - @Override - public void close() { - synchronized (STATE) { - checkState(STATE.isActive, "No test still in flight"); - checkState(STATE.remainingExpectedOutgoingMessages.isEmpty(), - "Still waiting for %s messages to be published", - STATE.remainingExpectedOutgoingMessages.size()); - STATE.isActive = false; - STATE.remainingExpectedOutgoingMessages = null; - } - } - }; - } - - /** - * Return a factory for testing subscribers. Only one factory may be in-flight at a time. - * The factory must be closed when the test in complete - */ - public static PubsubTestClientFactory createFactoryForPull( - final Clock clock, - final SubscriptionPath expectedSubscription, - final int ackTimeoutSec, - final Iterable expectedIncomingMessages) { - synchronized (STATE) { - checkState(!STATE.isActive, "Test still in flight"); - STATE.clock = clock; - STATE.expectedSubscription = expectedSubscription; - STATE.ackTimeoutSec = ackTimeoutSec; - STATE.remainingPendingIncomingMessages = Lists.newArrayList(expectedIncomingMessages); - STATE.pendingAckIncomingMessages = new HashMap<>(); - STATE.ackDeadline = new HashMap<>(); - STATE.isActive = true; - } - return new PubsubTestClientFactory() { - @Override - public PubsubClient newClient( - @Nullable String timestampLabel, @Nullable String idLabel, PubsubOptions options) - throws IOException { - return new PubsubTestClient(); - } - - @Override - public String getKind() { - return "PullTest"; - } - - @Override - public void close() { - synchronized (STATE) { - checkState(STATE.isActive, "No test still in flight"); - checkState(STATE.remainingPendingIncomingMessages.isEmpty(), - "Still waiting for %s messages to be pulled", - STATE.remainingPendingIncomingMessages.size()); - checkState(STATE.pendingAckIncomingMessages.isEmpty(), - "Still waiting for %s messages to be ACKed", - STATE.pendingAckIncomingMessages.size()); - checkState(STATE.ackDeadline.isEmpty(), - "Still waiting for %s messages to be ACKed", - STATE.ackDeadline.size()); - STATE.isActive = false; - STATE.remainingPendingIncomingMessages = null; - STATE.pendingAckIncomingMessages = null; - STATE.ackDeadline = null; - } - } - }; - } - - public static PubsubTestClientFactory createFactoryForCreateSubscription() { - return new PubsubTestClientFactory() { - int numCalls = 0; - - @Override - public void close() throws IOException { - checkState( - numCalls == 1, "Expected exactly one subscription to be created, got %s", numCalls); - } - - @Override - public PubsubClient newClient( - @Nullable String timestampLabel, @Nullable String idLabel, PubsubOptions options) - throws IOException { - return new PubsubTestClient() { - @Override - public void createSubscription( - TopicPath topic, SubscriptionPath subscription, int ackDeadlineSeconds) - throws IOException { - checkState(numCalls == 0, "Expected at most one subscription to be created"); - numCalls++; - } - }; - } - - @Override - public String getKind() { - return "CreateSubscriptionTest"; - } - }; - } - - /** - * Return true if in pull mode. - */ - private boolean inPullMode() { - checkState(STATE.isActive, "No test is active"); - return STATE.expectedSubscription != null; - } - - /** - * Return true if in publish mode. - */ - private boolean inPublishMode() { - checkState(STATE.isActive, "No test is active"); - return STATE.expectedTopic != null; - } - - /** - * For subscription mode only: - * Track progression of time according to the {@link Clock} passed . This will simulate Pubsub - * expiring - * outstanding ACKs. - */ - public void advance() { - synchronized (STATE) { - checkState(inPullMode(), "Can only advance in pull mode"); - // Any messages who's ACKs timed out are available for re-pulling. - Iterator> deadlineItr = STATE.ackDeadline.entrySet().iterator(); - while (deadlineItr.hasNext()) { - Map.Entry entry = deadlineItr.next(); - if (entry.getValue() <= STATE.clock.currentTimeMillis()) { - STATE.remainingPendingIncomingMessages.add( - STATE.pendingAckIncomingMessages.remove(entry.getKey())); - deadlineItr.remove(); - } - } - } - } - - @Override - public void close() { - } - - @Override - public int publish( - TopicPath topic, List outgoingMessages) throws IOException { - synchronized (STATE) { - checkState(inPublishMode(), "Can only publish in publish mode"); - checkState(topic.equals(STATE.expectedTopic), "Topic %s does not match expected %s", topic, - STATE.expectedTopic); - for (OutgoingMessage outgoingMessage : outgoingMessages) { - if (STATE.remainingFailingOutgoingMessages.remove(outgoingMessage)) { - throw new RuntimeException("Simulating failure for " + outgoingMessage); - } - checkState(STATE.remainingExpectedOutgoingMessages.remove(outgoingMessage), - "Unexpected outgoing message %s", outgoingMessage); - } - return outgoingMessages.size(); - } - } - - @Override - public List pull( - long requestTimeMsSinceEpoch, SubscriptionPath subscription, int batchSize, - boolean returnImmediately) throws IOException { - synchronized (STATE) { - checkState(inPullMode(), "Can only pull in pull mode"); - long now = STATE.clock.currentTimeMillis(); - checkState(requestTimeMsSinceEpoch == now, - "Simulated time %s does not match request time %s", now, requestTimeMsSinceEpoch); - checkState(subscription.equals(STATE.expectedSubscription), - "Subscription %s does not match expected %s", subscription, - STATE.expectedSubscription); - checkState(returnImmediately, "Pull only supported if returning immediately"); - - List incomingMessages = new ArrayList<>(); - Iterator pendItr = STATE.remainingPendingIncomingMessages.iterator(); - while (pendItr.hasNext()) { - IncomingMessage incomingMessage = pendItr.next(); - pendItr.remove(); - IncomingMessage incomingMessageWithRequestTime = - incomingMessage.withRequestTime(requestTimeMsSinceEpoch); - incomingMessages.add(incomingMessageWithRequestTime); - STATE.pendingAckIncomingMessages.put(incomingMessageWithRequestTime.ackId, - incomingMessageWithRequestTime); - STATE.ackDeadline.put(incomingMessageWithRequestTime.ackId, - requestTimeMsSinceEpoch + STATE.ackTimeoutSec * 1000); - if (incomingMessages.size() >= batchSize) { - break; - } - } - return incomingMessages; - } - } - - @Override - public void acknowledge( - SubscriptionPath subscription, - List ackIds) throws IOException { - synchronized (STATE) { - checkState(inPullMode(), "Can only acknowledge in pull mode"); - checkState(subscription.equals(STATE.expectedSubscription), - "Subscription %s does not match expected %s", subscription, - STATE.expectedSubscription); - - for (String ackId : ackIds) { - checkState(STATE.ackDeadline.remove(ackId) != null, - "No message with ACK id %s is waiting for an ACK", ackId); - checkState(STATE.pendingAckIncomingMessages.remove(ackId) != null, - "No message with ACK id %s is waiting for an ACK", ackId); - } - } - } - - @Override - public void modifyAckDeadline( - SubscriptionPath subscription, List ackIds, int deadlineSeconds) throws IOException { - synchronized (STATE) { - checkState(inPullMode(), "Can only modify ack deadline in pull mode"); - checkState(subscription.equals(STATE.expectedSubscription), - "Subscription %s does not match expected %s", subscription, - STATE.expectedSubscription); - - for (String ackId : ackIds) { - if (deadlineSeconds > 0) { - checkState(STATE.ackDeadline.remove(ackId) != null, - "No message with ACK id %s is waiting for an ACK", ackId); - checkState(STATE.pendingAckIncomingMessages.containsKey(ackId), - "No message with ACK id %s is waiting for an ACK", ackId); - STATE.ackDeadline.put(ackId, STATE.clock.currentTimeMillis() + deadlineSeconds * 1000); - } else { - checkState(STATE.ackDeadline.remove(ackId) != null, - "No message with ACK id %s is waiting for an ACK", ackId); - IncomingMessage message = STATE.pendingAckIncomingMessages.remove(ackId); - checkState(message != null, "No message with ACK id %s is waiting for an ACK", ackId); - STATE.remainingPendingIncomingMessages.add(message); - } - } - } - } - - @Override - public void createTopic(TopicPath topic) throws IOException { - throw new UnsupportedOperationException(); - } - - @Override - public void deleteTopic(TopicPath topic) throws IOException { - throw new UnsupportedOperationException(); - } - - @Override - public List listTopics(ProjectPath project) throws IOException { - throw new UnsupportedOperationException(); - } - - @Override - public void createSubscription( - TopicPath topic, SubscriptionPath subscription, int ackDeadlineSeconds) throws IOException { - throw new UnsupportedOperationException(); - } - - @Override - public void deleteSubscription(SubscriptionPath subscription) throws IOException { - throw new UnsupportedOperationException(); - } - - @Override - public List listSubscriptions( - ProjectPath project, TopicPath topic) throws IOException { - throw new UnsupportedOperationException(); - } - - @Override - public int ackDeadlineSeconds(SubscriptionPath subscription) throws IOException { - synchronized (STATE) { - return STATE.ackTimeoutSec; - } - } - - @Override - public boolean isEOF() { - synchronized (STATE) { - checkState(inPullMode(), "Can only check EOF in pull mode"); - return STATE.remainingPendingIncomingMessages.isEmpty(); - } - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/a6dbdfa5/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/io/package-info.java ---------------------------------------------------------------------- diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/io/package-info.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/io/package-info.java deleted file mode 100644 index 1161f3e..0000000 --- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/io/package-info.java +++ /dev/null @@ -1,22 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -/** - * Nexmark Beam IO related utilities. - */ -package org.apache.beam.integration.nexmark.io; http://git-wip-us.apache.org/repos/asf/beam/blob/a6dbdfa5/integration/java/nexmark/src/test/java/org/apache/beam/integration/nexmark/sources/BoundedEventSourceTest.java ---------------------------------------------------------------------- diff --git a/integration/java/nexmark/src/test/java/org/apache/beam/integration/nexmark/sources/BoundedEventSourceTest.java b/integration/java/nexmark/src/test/java/org/apache/beam/integration/nexmark/sources/BoundedEventSourceTest.java index c5d7725..d95461a 100644 --- a/integration/java/nexmark/src/test/java/org/apache/beam/integration/nexmark/sources/BoundedEventSourceTest.java +++ b/integration/java/nexmark/src/test/java/org/apache/beam/integration/nexmark/sources/BoundedEventSourceTest.java @@ -18,7 +18,7 @@ package org.apache.beam.integration.nexmark.sources; import org.apache.beam.integration.nexmark.NexmarkConfiguration; -import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions; +import org.apache.beam.integration.nexmark.NexmarkOptions; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.testing.SourceTestUtils; @@ -38,7 +38,7 @@ public class BoundedEventSourceTest { @Test public void sourceAndReadersWork() throws Exception { - DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class); + NexmarkOptions options = PipelineOptionsFactory.as(NexmarkOptions.class); long n = 200L; BoundedEventSource source = new BoundedEventSource(makeConfig(n), 1); @@ -48,7 +48,7 @@ public class BoundedEventSourceTest { @Test public void splitAtFractionRespectsContract() throws Exception { - DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class); + NexmarkOptions options = PipelineOptionsFactory.as(NexmarkOptions.class); long n = 20L; BoundedEventSource source = new BoundedEventSource(makeConfig(n), 1); @@ -62,7 +62,7 @@ public class BoundedEventSourceTest { @Test public void splitIntoBundlesRespectsContract() throws Exception { - DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class); + NexmarkOptions options = PipelineOptionsFactory.as(NexmarkOptions.class); long n = 200L; BoundedEventSource source = new BoundedEventSource(makeConfig(n), 1); SourceTestUtils.assertSourcesEqualReferenceSource(