Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 5E41D200C41 for ; Fri, 24 Mar 2017 19:28:13 +0100 (CET) Received: by cust-asf.ponee.io (Postfix) id 5CFE3160B9B; Fri, 24 Mar 2017 18:28:13 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 85E6C160B75 for ; Fri, 24 Mar 2017 19:28:12 +0100 (CET) Received: (qmail 70798 invoked by uid 500); 24 Mar 2017 18:28:10 -0000 Mailing-List: contact issues-help@flink.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@flink.apache.org Delivered-To: mailing list issues@flink.apache.org Received: (qmail 70787 invoked by uid 99); 24 Mar 2017 18:28:10 -0000 Received: from pnap-us-west-generic-nat.apache.org (HELO spamd2-us-west.apache.org) (209.188.14.142) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 24 Mar 2017 18:28:10 +0000 Received: from localhost (localhost [127.0.0.1]) by spamd2-us-west.apache.org (ASF Mail Server at spamd2-us-west.apache.org) with ESMTP id EDB2D1A04BE for ; Fri, 24 Mar 2017 18:28:09 +0000 (UTC) X-Virus-Scanned: Debian amavisd-new at spamd2-us-west.apache.org X-Spam-Flag: NO X-Spam-Score: -4.02 X-Spam-Level: X-Spam-Status: No, score=-4.02 tagged_above=-999 required=6.31 tests=[KAM_LAZY_DOMAIN_SECURITY=1, RCVD_IN_DNSWL_HI=-5, RCVD_IN_MSPIKE_H3=-0.01, RCVD_IN_MSPIKE_WL=-0.01, RP_MATCHES_RCVD=-0.001, URIBL_BLOCKED=0.001] autolearn=disabled Received: from mx1-lw-eu.apache.org ([10.40.0.8]) by localhost (spamd2-us-west.apache.org [10.40.0.9]) (amavisd-new, port 10024) with ESMTP id UEELKYKL8e3j for ; Fri, 24 Mar 2017 18:28:08 +0000 (UTC) Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx1-lw-eu.apache.org (ASF Mail Server 
at mx1-lw-eu.apache.org) with SMTP id D572A5FAD1 for ; Fri, 24 Mar 2017 18:28:07 +0000 (UTC) Received: (qmail 70271 invoked by uid 99); 24 Mar 2017 18:28:07 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 24 Mar 2017 18:28:07 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 3F49ADF984; Fri, 24 Mar 2017 18:28:06 +0000 (UTC) From: gyfora To: issues@flink.incubator.apache.org Reply-To: issues@flink.incubator.apache.org References: In-Reply-To: Subject: [GitHub] flink pull request #1668: [FLINK-3257] Add Exactly-Once Processing Guarantee... Content-Type: text/plain Message-Id: <20170324182806.3F49ADF984@git1-us-west.apache.org> Date: Fri, 24 Mar 2017 18:28:06 +0000 (UTC) archived-at: Fri, 24 Mar 2017 18:28:13 -0000 Github user gyfora commented on a diff in the pull request: https://github.com/apache/flink/pull/1668#discussion_r107969496 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationHead.java --- @@ -17,100 +17,164 @@ package org.apache.flink.streaming.runtime.tasks; -import java.util.concurrent.ArrayBlockingQueue; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.TimeUnit; - import org.apache.flink.annotation.Internal; import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.state.ListState; +import org.apache.flink.api.common.state.ListStateDescriptor; +import org.apache.flink.runtime.checkpoint.CheckpointMetaData; +import org.apache.flink.runtime.checkpoint.CheckpointMetrics; +import org.apache.flink.runtime.checkpoint.CheckpointOptions; +import org.apache.flink.runtime.io.network.api.CheckpointBarrier; +import org.apache.flink.streaming.api.graph.StreamConfig; +import org.apache.flink.streaming.api.operators.AbstractStreamOperator; +import org.apache.flink.streaming.api.operators.OneInputStreamOperator; +import 
org.apache.flink.streaming.api.operators.Output; import org.apache.flink.streaming.api.watermark.Watermark; -import org.apache.flink.streaming.runtime.io.RecordWriterOutput; import org.apache.flink.streaming.runtime.io.BlockingQueueBroker; +import org.apache.flink.streaming.runtime.io.RecordWriterOutput; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.types.Either; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; +import java.util.Iterator; +import java.util.List; +import java.util.LinkedList; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.TimeUnit; + +/** + * TODO write javadoc + *

+ * - open a list state per snapshot process + * - book-keep snapshot logs + * - Clean up state when a savepoint is complete - ONLY in-transit records who do NOT belong in other snapshots + * + * @param + */ @Internal -public class StreamIterationHead extends OneInputStreamTask { +public class StreamIterationHead extends OneInputStreamTask { private static final Logger LOG = LoggerFactory.getLogger(StreamIterationHead.class); private volatile boolean running = true; - // ------------------------------------------------------------------------ - + private volatile RecordWriterOutput[] outputs; + + private UpstreamLogger upstreamLogger; + + private Object lock; + + @Override + public void init() throws Exception { + this.lock = getCheckpointLock(); + getConfiguration().setStreamOperator(new UpstreamLogger(getConfiguration())); + operatorChain = new OperatorChain<>(this); + this.upstreamLogger = (UpstreamLogger) operatorChain.getHeadOperator(); + } + @Override protected void run() throws Exception { - + final String iterationId = getConfiguration().getIterationId(); if (iterationId == null || iterationId.length() == 0) { throw new Exception("Missing iteration ID in the task configuration"); } - - final String brokerID = createBrokerIdString(getEnvironment().getJobID(), iterationId , - getEnvironment().getTaskInfo().getIndexOfThisSubtask()); - + final String brokerID = createBrokerIdString(getEnvironment().getJobID(), iterationId, + getEnvironment().getTaskInfo().getIndexOfThisSubtask()); final long iterationWaitTime = getConfiguration().getIterationWaitTime(); final boolean shouldWait = iterationWaitTime > 0; - final BlockingQueue> dataChannel = new ArrayBlockingQueue>(1); + final BlockingQueue, CheckpointBarrier>> dataChannel + = new ArrayBlockingQueue<>(1); // offer the queue for the tail BlockingQueueBroker.INSTANCE.handIn(brokerID, dataChannel); LOG.info("Iteration head {} added feedback queue under {}", getName(), brokerID); // do the work try { - 
@SuppressWarnings("unchecked") - RecordWriterOutput[] outputs = (RecordWriterOutput[]) getStreamOutputs(); + outputs = (RecordWriterOutput[]) getStreamOutputs(); // If timestamps are enabled we make sure to remove cyclic watermark dependencies if (isSerializingTimestamps()) { - for (RecordWriterOutput output : outputs) { + for (RecordWriterOutput output : outputs) { output.emitWatermark(new Watermark(Long.MAX_VALUE)); } } + synchronized (lock) { + //emit in-flight events in the upstream log upon recovery + for (StreamRecord rec : upstreamLogger.getReplayLog()) { --- End diff -- Maybe it would make sense to turn this replay loop into a while loop that also checks "running" on each iteration, so that the replay is cancelled early if the job is cancelled while the log is still being replayed. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes to have it, or if the feature is enabled but not working, please contact infrastructure at infrastructure@apache.org or file a JIRA ticket with INFRA. ---