Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 0FFE3200B0F for ; Fri, 17 Jun 2016 13:33:28 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 0E7FE160A61; Fri, 17 Jun 2016 11:33:28 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 30B00160A50 for ; Fri, 17 Jun 2016 13:33:27 +0200 (CEST) Received: (qmail 46506 invoked by uid 500); 17 Jun 2016 11:33:26 -0000 Mailing-List: contact commits-help@flink.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@flink.apache.org Delivered-To: mailing list commits@flink.apache.org Received: (qmail 46497 invoked by uid 99); 17 Jun 2016 11:33:26 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 17 Jun 2016 11:33:26 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 32352E049D; Fri, 17 Jun 2016 11:33:26 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: aljoscha@apache.org To: commits@flink.apache.org Message-Id: <7dafb9a5e64846a6b0297a815c55aa06@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: flink git commit: [FLINK-4075] ContinuousFileProcessingCheckpointITCase failed on Travis Date: Fri, 17 Jun 2016 11:33:26 +0000 (UTC) archived-at: Fri, 17 Jun 2016 11:33:28 -0000 Repository: flink Updated Branches: refs/heads/master ec6d97528 -> fc4abd7ff [FLINK-4075] ContinuousFileProcessingCheckpointITCase failed on Travis Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/fc4abd7f Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/fc4abd7f Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/fc4abd7f Branch: refs/heads/master Commit: fc4abd7fff5fa9bbfbd2196e61bf696a1dd57ad7 Parents: ec6d975 Author: kl0u Authored: Thu Jun 16 18:18:28 2016 +0200 Committer: Aljoscha Krettek Committed: Fri Jun 17 13:32:02 2016 +0200 ---------------------------------------------------------------------- .../flink/api/java/io/AvroInputFormat.java | 23 +++++++++++--------- .../flink/api/common/io/BinaryInputFormat.java | 17 +++++++++------ .../api/common/io/DelimitedInputFormat.java | 12 +++++++--- .../source/ContinuousFileReaderOperator.java | 16 +++++++++++--- 4 files changed, 45 insertions(+), 23 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/fc4abd7f/flink-batch-connectors/flink-avro/src/main/java/org/apache/flink/api/java/io/AvroInputFormat.java ---------------------------------------------------------------------- diff --git a/flink-batch-connectors/flink-avro/src/main/java/org/apache/flink/api/java/io/AvroInputFormat.java b/flink-batch-connectors/flink-avro/src/main/java/org/apache/flink/api/java/io/AvroInputFormat.java index a920275..73067c1 100644 --- a/flink-batch-connectors/flink-avro/src/main/java/org/apache/flink/api/java/io/AvroInputFormat.java +++ b/flink-batch-connectors/flink-avro/src/main/java/org/apache/flink/api/java/io/AvroInputFormat.java @@ -68,7 +68,7 @@ public class AvroInputFormat extends FileInputFormat implements ResultType private transient long recordsReadSinceLastSync; - private transient long lastSync = -1l; + private long lastSync = -1l; public AvroInputFormat(Path filePath, Class type) { super(filePath); @@ -186,18 +186,21 @@ public class AvroInputFormat extends FileInputFormat implements ResultType Preconditions.checkNotNull(split, "reopen() cannot be called on a null split."); Preconditions.checkNotNull(state, "reopen() cannot be called with a null initial state."); - this.open(split); - if (state.f0 != -1) { + try { + this.open(split); + } finally { + if (state.f0 != -1) { + lastSync = state.f0; + recordsReadSinceLastSync = state.f1; + } + } - // go to the block we stopped - lastSync = state.f0; + if (lastSync != -1) { + // open and read until the record we were before + // the checkpoint and discard the values dataFileReader.seek(lastSync); - - // read until the record we were before the checkpoint and discard the values - long recordsToDiscard = state.f1; - for(int i = 0; i < recordsToDiscard; i++) { + for(int i = 0; i < recordsReadSinceLastSync; i++) { dataFileReader.next(null); - recordsReadSinceLastSync++; } } } http://git-wip-us.apache.org/repos/asf/flink/blob/fc4abd7f/flink-core/src/main/java/org/apache/flink/api/common/io/BinaryInputFormat.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/io/BinaryInputFormat.java b/flink-core/src/main/java/org/apache/flink/api/common/io/BinaryInputFormat.java index eb83bda..96e0e0d 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/io/BinaryInputFormat.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/io/BinaryInputFormat.java @@ -390,14 +390,17 @@ public abstract class BinaryInputFormat extends FileInputFormat Preconditions.checkNotNull(split, "reopen() cannot be called on a null split."); Preconditions.checkNotNull(state, "reopen() cannot be called with a null initial state."); - this.open(split); - this.blockInfo = this.createAndReadBlockInfo(); + try { + this.open(split); + } finally { + this.blockInfo = this.createAndReadBlockInfo(); - long blockPos = state.f0; - this.readRecords = state.f1; + long blockPos = state.f0; + this.readRecords = state.f1; - this.stream.seek(this.splitStart + blockPos); - this.blockBasedInput = new BlockBasedInput(this.stream, (int) blockPos, this.splitLength); - this.dataInputStream = new DataInputViewStreamWrapper(blockBasedInput); + this.stream.seek(this.splitStart + blockPos); + this.blockBasedInput = new BlockBasedInput(this.stream, (int) blockPos, this.splitLength); + this.dataInputStream = new DataInputViewStreamWrapper(blockBasedInput); + } } } http://git-wip-us.apache.org/repos/asf/flink/blob/fc4abd7f/flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java b/flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java index 3a77200..4cd200d 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java @@ -144,7 +144,7 @@ public abstract class DelimitedInputFormat extends FileInputFormat imple private transient boolean end; - private transient long offset = -1; + private long offset = -1; // -------------------------------------------------------------------------------------------- // The configuration parameters. Configured on the instance and serialized to be shipped. @@ -638,9 +638,15 @@ public abstract class DelimitedInputFormat extends FileInputFormat imple public void reopen(FileInputSplit split, Long state) throws IOException { Preconditions.checkNotNull(split, "reopen() cannot be called on a null split."); Preconditions.checkNotNull(state, "reopen() cannot be called with a null initial state."); + Preconditions.checkArgument(state == -1 || state >= split.getStart(), + " Illegal offset "+ state +", smaller than the splits start=" + split.getStart()); + + try { + this.open(split); + } finally { + this.offset = state; + } - this.open(split); - this.offset = state; if (state > this.splitStart + split.getLength()) { this.end = true; } else if (state > split.getStart()) { http://git-wip-us.apache.org/repos/asf/flink/blob/fc4abd7f/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java index e26c534..9319338 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java @@ -188,6 +188,8 @@ public class ContinuousFileReaderOperator extends A private S restoredFormatState = null; + private volatile boolean isSplitOpen = false; + SplitReader(FileInputFormat format, TypeSerializer serializer, TimestampedCollector collector, @@ -271,6 +273,7 @@ public class ContinuousFileReaderOperator extends A } this.format.open(currentSplit); } + this.isSplitOpen = true; } LOG.info("Reading split: " + currentSplit); @@ -290,8 +293,11 @@ public class ContinuousFileReaderOperator extends A } finally { // close and prepare for the next iteration - this.format.close(); - this.currentSplit = null; + synchronized (checkpointLock) { + this.format.close(); + this.isSplitOpen = false; + this.currentSplit = null; + } } } @@ -303,8 +309,12 @@ public class ContinuousFileReaderOperator extends A } finally { synchronized (checkpointLock) { LOG.info("Reader terminated, and exiting..."); + this.format.closeInputFormat(); + this.isSplitOpen = false; + this.currentSplit = null; this.isRunning = false; + checkpointLock.notifyAll(); } } @@ -321,7 +331,7 @@ public class ContinuousFileReaderOperator extends A this.pendingSplits.remove(); } - if (this.format instanceof CheckpointableInputFormat) { + if (this.format instanceof CheckpointableInputFormat && this.isSplitOpen) { S formatState = (S) ((CheckpointableInputFormat) format).getCurrentState(); return new Tuple3<>(snapshot, currentSplit, currentSplit == null ? null : formatState); } else {