Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 0D8E9200AC0 for ; Mon, 9 May 2016 20:46:59 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 0C51B1609A8; Mon, 9 May 2016 18:46:59 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 30AA216099C for ; Mon, 9 May 2016 20:46:58 +0200 (CEST) Received: (qmail 51597 invoked by uid 500); 9 May 2016 18:46:57 -0000 Mailing-List: contact commits-help@beam.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@beam.incubator.apache.org Delivered-To: mailing list commits@beam.incubator.apache.org Received: (qmail 51588 invoked by uid 99); 9 May 2016 18:46:57 -0000 Received: from pnap-us-west-generic-nat.apache.org (HELO spamd1-us-west.apache.org) (209.188.14.142) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 09 May 2016 18:46:57 +0000 Received: from localhost (localhost [127.0.0.1]) by spamd1-us-west.apache.org (ASF Mail Server at spamd1-us-west.apache.org) with ESMTP id A5EFDC7355 for ; Mon, 9 May 2016 18:46:56 +0000 (UTC) X-Virus-Scanned: Debian amavisd-new at spamd1-us-west.apache.org X-Spam-Flag: NO X-Spam-Score: -5.299 X-Spam-Level: X-Spam-Status: No, score=-5.299 tagged_above=-999 required=6.31 tests=[KAM_ASCII_DIVIDERS=0.8, KAM_LAZY_DOMAIN_SECURITY=1, RCVD_IN_DNSWL_HI=-5, RCVD_IN_MSPIKE_H3=-0.01, RCVD_IN_MSPIKE_WL=-0.01, RP_MATCHES_RCVD=-2.079] autolearn=disabled Received: from mx2-lw-eu.apache.org ([10.40.0.8]) by localhost (spamd1-us-west.apache.org [10.40.0.7]) (amavisd-new, port 10024) with ESMTP id SCFuvTIneB9l for ; Mon, 9 May 2016 18:46:55 +0000 (UTC) Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx2-lw-eu.apache.org (ASF Mail Server at mx2-lw-eu.apache.org) with SMTP id 65DF65F3DA for ; Mon, 9 May 2016 18:46:55 +0000 (UTC) Received: (qmail 51570 invoked by uid 99); 9 May 2016 18:46:54 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 09 May 2016 18:46:54 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 898AFDFA6C; Mon, 9 May 2016 18:46:54 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: kenn@apache.org To: commits@beam.incubator.apache.org Date: Mon, 09 May 2016 18:46:54 -0000 Message-Id: X-Mailer: ASF-Git Admin Mailer Subject: [1/4] incubator-beam git commit: Refactor CompletionCallbacks archived-at: Mon, 09 May 2016 18:46:59 -0000 Repository: incubator-beam Updated Branches: refs/heads/master da1b7556b -> 272493ed7 Refactor CompletionCallbacks The default and timerful completion callbacks are identical, excepting their calls to evaluationContext.commitResult; factor that code into a common location. Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/e7df160a Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/e7df160a Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/e7df160a Branch: refs/heads/master Commit: e7df160a2cde6dead6c4f7e0ec0aaa5e4808239d Parents: 9b9d73f Author: Thomas Groh Authored: Tue May 3 13:22:13 2016 -0700 Committer: Thomas Groh Committed: Tue May 3 14:27:22 2016 -0700 ---------------------------------------------------------------------- .../direct/ExecutorServiceParallelExecutor.java | 49 ++++++++++++-------- 1 file changed, 29 insertions(+), 20 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e7df160a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java index 18af363..6f26b6b 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java @@ -210,16 +210,20 @@ final class ExecutorServiceParallelExecutor implements InProcessExecutor { } /** - * The default {@link CompletionCallback}. The default completion callback is used to complete - * transform evaluations that are triggered due to the arrival of elements from an upstream - * transform, or for a source transform. + * The base implementation of {@link CompletionCallback} that provides implementations for + * {@link #handleResult(CommittedBundle, InProcessTransformResult)} and + * {@link #handleThrowable(CommittedBundle, Throwable)}, given an implementation of + * {@link #getCommittedResult(CommittedBundle, InProcessTransformResult)}. */ - private class DefaultCompletionCallback implements CompletionCallback { + private abstract class CompletionCallbackBase implements CompletionCallback { + protected abstract CommittedResult getCommittedResult( + CommittedBundle inputBundle, + InProcessTransformResult result); + @Override - public CommittedResult handleResult( + public final CommittedResult handleResult( CommittedBundle inputBundle, InProcessTransformResult result) { - CommittedResult committedResult = - evaluationContext.handleResult(inputBundle, Collections.emptyList(), result); + CommittedResult committedResult = getCommittedResult(inputBundle, result); for (CommittedBundle outputBundle : committedResult.getOutputs()) { allUpdates.offer(ExecutorUpdate.fromBundle(outputBundle)); } @@ -233,12 +237,27 @@ final class ExecutorServiceParallelExecutor implements InProcessExecutor { } /** + * The default {@link CompletionCallback}. The default completion callback is used to complete + * transform evaluations that are triggered due to the arrival of elements from an upstream + * transform, or for a source transform. + */ + private class DefaultCompletionCallback extends CompletionCallbackBase { + @Override + public CommittedResult getCommittedResult( + CommittedBundle inputBundle, InProcessTransformResult result) { + return evaluationContext.handleResult(inputBundle, + Collections.emptyList(), + result); + } + } + + /** * A {@link CompletionCallback} where the completed bundle was produced to deliver some collection * of {@link TimerData timers}. When the evaluator completes successfully, reports all of the * timers used to create the input to the {@link InProcessEvaluationContext evaluation context} * as part of the result. */ - private class TimerCompletionCallback implements CompletionCallback { + private class TimerCompletionCallback extends CompletionCallbackBase { private final Iterable timers; private TimerCompletionCallback(Iterable timers) { @@ -246,19 +265,9 @@ final class ExecutorServiceParallelExecutor implements InProcessExecutor { } @Override - public CommittedResult handleResult( + public CommittedResult getCommittedResult( CommittedBundle inputBundle, InProcessTransformResult result) { - CommittedResult committedResult = - evaluationContext.handleResult(inputBundle, timers, result); - for (CommittedBundle outputBundle : committedResult.getOutputs()) { - allUpdates.offer(ExecutorUpdate.fromBundle(outputBundle)); - } - return committedResult; - } - - @Override - public void handleThrowable(CommittedBundle inputBundle, Throwable t) { - allUpdates.offer(ExecutorUpdate.fromThrowable(t)); + return evaluationContext.handleResult(inputBundle, timers, result); } }