Return-Path: X-Original-To: apmail-flink-commits-archive@minotaur.apache.org Delivered-To: apmail-flink-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 3E8CD188B0 for ; Sun, 2 Aug 2015 15:14:30 +0000 (UTC) Received: (qmail 25069 invoked by uid 500); 2 Aug 2015 15:14:30 -0000 Delivered-To: apmail-flink-commits-archive@flink.apache.org Received: (qmail 24984 invoked by uid 500); 2 Aug 2015 15:14:30 -0000 Mailing-List: contact commits-help@flink.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@flink.apache.org Delivered-To: mailing list commits@flink.apache.org Received: (qmail 24951 invoked by uid 99); 2 Aug 2015 15:14:30 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Sun, 02 Aug 2015 15:14:30 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id D39EFE0522; Sun, 2 Aug 2015 15:14:29 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: sewen@apache.org To: commits@flink.apache.org Date: Sun, 02 Aug 2015 15:14:32 -0000 Message-Id: <0304ce6c17da4595963abccdfea499cf@git.apache.org> In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [4/8] flink git commit: [FLINK-2438] [runtime] Improve channel event serialization performance. http://git-wip-us.apache.org/repos/asf/flink/blob/af88aa09/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/CheckpointBarrier.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/CheckpointBarrier.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/CheckpointBarrier.java deleted file mode 100644 index d94b5b4..0000000 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/CheckpointBarrier.java +++ /dev/null @@ -1,97 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.runtime.tasks; - -import java.io.IOException; - -import org.apache.flink.core.memory.DataInputView; -import org.apache.flink.core.memory.DataOutputView; -import org.apache.flink.runtime.event.task.TaskEvent; - -/** - * Checkpoint barriers are used to synchronize checkpoints throughout the streaming topology. The - * barriers are emitted by the sources when instructed to do so by the JobManager. When - * operators receive a {@link CheckpointBarrier} on one of its inputs it must block processing - * of further elements on this input until all inputs received the checkpoint barrier - * corresponding to to that checkpoint. Once all inputs received the checkpoint barrier for - * a checkpoint the operator is to perform the checkpoint and then broadcast the barrier to - * downstream operators. - * - *

- * The checkpoint barrier IDs are advancing. Once an operator receives a {@link CheckpointBarrier} - * for a checkpoint with a higher id it is to discard all barriers that it received from previous - * checkpoints and unblock all other inputs. - */ -public class CheckpointBarrier extends TaskEvent { - - protected long id; - protected long timestamp; - - public CheckpointBarrier() {} - - public CheckpointBarrier(long id, long timestamp) { - this.id = id; - this.timestamp = timestamp; - } - - public long getId() { - return id; - } - - public long getTimestamp() { - return id; - } - - // ------------------------------------------------------------------------ - - @Override - public void write(DataOutputView out) throws IOException { - out.writeLong(id); - out.writeLong(timestamp); - } - - @Override - public void read(DataInputView in) throws IOException { - id = in.readLong(); - timestamp = in.readLong(); - } - - // ------------------------------------------------------------------------ - - @Override - public int hashCode() { - return (int) (id ^ (id >>> 32) ^ timestamp ^(timestamp >>> 32)); - } - - @Override - public boolean equals(Object other) { - if (other == null || !(other instanceof CheckpointBarrier)) { - return false; - } - else { - CheckpointBarrier that = (CheckpointBarrier) other; - return that.id == this.id && that.timestamp == this.timestamp; - } - } - - @Override - public String toString() { - return String.format("CheckpointBarrier %d @ %d", id, timestamp); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/af88aa09/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/OutputHandler.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/OutputHandler.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/OutputHandler.java index 84614bf..c8fa9e3 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/OutputHandler.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/OutputHandler.java @@ -27,6 +27,7 @@ import java.util.Map; import org.apache.flink.api.common.accumulators.Accumulator; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.runtime.accumulators.AccumulatorRegistry; +import org.apache.flink.runtime.io.network.api.CheckpointBarrier; import org.apache.flink.runtime.io.network.api.writer.RecordWriter; import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter; import org.apache.flink.runtime.plugable.SerializationDelegate; http://git-wip-us.apache.org/repos/asf/flink/blob/af88aa09/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java index 2098da8..aabc95d 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java @@ -32,6 +32,7 @@ import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.GlobalConfiguration; import org.apache.flink.runtime.accumulators.AccumulatorRegistry; import org.apache.flink.runtime.execution.Environment; +import org.apache.flink.runtime.io.network.api.CheckpointBarrier; import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; import org.apache.flink.runtime.jobgraph.tasks.CheckpointNotificationOperator; import org.apache.flink.runtime.jobgraph.tasks.CheckpointedOperator; http://git-wip-us.apache.org/repos/asf/flink/blob/af88aa09/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/StreamTestSingleInputGate.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/StreamTestSingleInputGate.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/StreamTestSingleInputGate.java index 4007da8..173e894 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/StreamTestSingleInputGate.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/StreamTestSingleInputGate.java @@ -21,7 +21,7 @@ package org.apache.flink.runtime.io.network.partition.consumer; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.core.memory.MemorySegment; -import org.apache.flink.runtime.event.task.AbstractEvent; +import org.apache.flink.runtime.event.AbstractEvent; import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent; import org.apache.flink.runtime.io.network.api.serialization.EventSerializer; import org.apache.flink.runtime.io.network.api.serialization.RecordSerializer; @@ -31,7 +31,6 @@ import org.apache.flink.runtime.io.network.buffer.BufferRecycler; import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID; import org.apache.flink.runtime.plugable.SerializationDelegate; import org.apache.flink.streaming.runtime.streamrecord.MultiplexingStreamRecordSerializer; -import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; http://git-wip-us.apache.org/repos/asf/flink/blob/af88aa09/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferMassiveRandomTest.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferMassiveRandomTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferMassiveRandomTest.java index 7350516..678b145 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferMassiveRandomTest.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferMassiveRandomTest.java @@ -22,7 +22,7 @@ import static org.junit.Assert.fail; import java.io.IOException; import java.util.Random; -import org.apache.flink.runtime.event.task.TaskEvent; +import org.apache.flink.runtime.event.TaskEvent; import org.apache.flink.runtime.io.disk.iomanager.IOManager; import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync; import org.apache.flink.runtime.io.network.buffer.Buffer; @@ -31,7 +31,7 @@ import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool; import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent; import org.apache.flink.runtime.io.network.partition.consumer.InputGate; import org.apache.flink.runtime.util.event.EventListener; -import org.apache.flink.streaming.runtime.tasks.CheckpointBarrier; +import org.apache.flink.runtime.io.network.api.CheckpointBarrier; import org.junit.Test; http://git-wip-us.apache.org/repos/asf/flink/blob/af88aa09/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java index b8b3a8c..872e226 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java @@ -19,15 +19,16 @@ package org.apache.flink.streaming.runtime.io; import org.apache.flink.core.memory.MemorySegment; -import org.apache.flink.runtime.event.task.TaskEvent; +import org.apache.flink.runtime.event.TaskEvent; import org.apache.flink.runtime.io.disk.iomanager.IOManager; import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync; import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent; import org.apache.flink.runtime.io.network.buffer.Buffer; +import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler; import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent; import org.apache.flink.runtime.io.network.partition.consumer.InputGate; import org.apache.flink.runtime.util.event.EventListener; -import org.apache.flink.streaming.runtime.tasks.CheckpointBarrier; +import org.apache.flink.runtime.io.network.api.CheckpointBarrier; import org.junit.AfterClass; import org.junit.BeforeClass; http://git-wip-us.apache.org/repos/asf/flink/blob/af88aa09/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/BarrierTrackerTest.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/BarrierTrackerTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/BarrierTrackerTest.java index 532078c..fb61633 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/BarrierTrackerTest.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/BarrierTrackerTest.java @@ -19,12 +19,13 @@ package org.apache.flink.streaming.runtime.io; import org.apache.flink.core.memory.MemorySegment; -import org.apache.flink.runtime.event.task.TaskEvent; +import org.apache.flink.runtime.event.TaskEvent; import org.apache.flink.runtime.io.network.buffer.Buffer; +import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler; import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent; import org.apache.flink.runtime.io.network.partition.consumer.InputGate; import org.apache.flink.runtime.util.event.EventListener; -import org.apache.flink.streaming.runtime.tasks.CheckpointBarrier; +import org.apache.flink.runtime.io.network.api.CheckpointBarrier; import org.junit.Test; http://git-wip-us.apache.org/repos/asf/flink/blob/af88aa09/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/BufferSpillerTest.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/BufferSpillerTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/BufferSpillerTest.java index ae384e1..05ce4be 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/BufferSpillerTest.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/BufferSpillerTest.java @@ -22,6 +22,7 @@ import org.apache.flink.core.memory.MemorySegment; import org.apache.flink.runtime.io.disk.iomanager.IOManager; import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync; import org.apache.flink.runtime.io.network.buffer.Buffer; +import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler; import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent; import org.junit.After; http://git-wip-us.apache.org/repos/asf/flink/blob/af88aa09/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/StreamRecordWriterTest.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/StreamRecordWriterTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/StreamRecordWriterTest.java index fb3beea..f07e3a5 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/StreamRecordWriterTest.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/StreamRecordWriterTest.java @@ -25,6 +25,7 @@ import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter; import org.apache.flink.runtime.io.network.api.writer.RoundRobinChannelSelector; import org.apache.flink.runtime.io.network.buffer.Buffer; import org.apache.flink.runtime.io.network.buffer.BufferProvider; +import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler; import org.apache.flink.types.LongValue; import org.junit.Test; http://git-wip-us.apache.org/repos/asf/flink/blob/af88aa09/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/TestEvent.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/TestEvent.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/TestEvent.java index 4a77757..286477a 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/TestEvent.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/TestEvent.java @@ -20,7 +20,7 @@ package org.apache.flink.streaming.runtime.io; import org.apache.flink.core.memory.DataInputView; import org.apache.flink.core.memory.DataOutputView; -import org.apache.flink.runtime.event.task.AbstractEvent; +import org.apache.flink.runtime.event.AbstractEvent; import org.apache.flink.util.StringUtils; import java.io.IOException; http://git-wip-us.apache.org/repos/asf/flink/blob/af88aa09/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java index 4399a10..296324a 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java @@ -22,6 +22,7 @@ import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.functions.RichMapFunction; import org.apache.flink.api.common.typeinfo.BasicTypeInfo; import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.io.network.api.CheckpointBarrier; import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter; import org.apache.flink.streaming.api.graph.StreamConfig; import org.apache.flink.streaming.api.operators.StreamMap; http://git-wip-us.apache.org/repos/asf/flink/blob/af88aa09/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTestHarness.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTestHarness.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTestHarness.java index a8029e6..4f07fdb 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTestHarness.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTestHarness.java @@ -19,7 +19,7 @@ package org.apache.flink.streaming.runtime.tasks; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.flink.runtime.event.task.AbstractEvent; +import org.apache.flink.runtime.event.AbstractEvent; import org.apache.flink.runtime.io.network.partition.consumer.StreamTestSingleInputGate; import java.io.IOException; http://git-wip-us.apache.org/repos/asf/flink/blob/af88aa09/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java index df0c9ee..bbc64f1 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java @@ -18,14 +18,13 @@ package org.apache.flink.streaming.runtime.tasks; import org.apache.flink.api.common.JobID; -import org.apache.flink.api.common.accumulators.Accumulator; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.configuration.Configuration; import org.apache.flink.core.fs.Path; import org.apache.flink.core.memory.MemorySegment; import org.apache.flink.runtime.accumulators.AccumulatorRegistry; import org.apache.flink.runtime.broadcast.BroadcastVariableManager; -import org.apache.flink.runtime.event.task.AbstractEvent; +import org.apache.flink.runtime.event.AbstractEvent; import org.apache.flink.runtime.execution.Environment; import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; import org.apache.flink.runtime.io.disk.iomanager.IOManager; http://git-wip-us.apache.org/repos/asf/flink/blob/af88aa09/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java index 435831f..0f372cb 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java @@ -21,7 +21,7 @@ import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.configuration.Configuration; -import org.apache.flink.runtime.event.task.AbstractEvent; +import org.apache.flink.runtime.event.AbstractEvent; import org.apache.flink.runtime.io.network.partition.consumer.StreamTestSingleInputGate; import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; import org.apache.flink.runtime.memorymanager.MemoryManager; http://git-wip-us.apache.org/repos/asf/flink/blob/af88aa09/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTest.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTest.java index 3c7204d..f87d7ea 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTest.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTest.java @@ -20,6 +20,7 @@ package org.apache.flink.streaming.runtime.tasks; import org.apache.flink.api.common.typeinfo.BasicTypeInfo; import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.io.network.api.CheckpointBarrier; import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter; import org.apache.flink.streaming.api.functions.co.CoMapFunction; import org.apache.flink.streaming.api.functions.co.RichCoMapFunction; @@ -28,7 +29,6 @@ import org.apache.flink.streaming.api.operators.co.CoStreamMap; import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.streaming.util.TestHarnessUtil; -import org.joda.time.Instant; import org.junit.Assert; import org.junit.Test; import org.junit.runner.RunWith; @@ -36,7 +36,6 @@ import org.powermock.core.classloader.annotations.PrepareForTest; import org.powermock.modules.junit4.PowerMockRunner; import java.util.List; -import java.util.Queue; import java.util.concurrent.ConcurrentLinkedQueue; /** http://git-wip-us.apache.org/repos/asf/flink/blob/af88aa09/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTestHarness.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTestHarness.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTestHarness.java index f37eb66..2b20101 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTestHarness.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTestHarness.java @@ -45,7 +45,7 @@ import java.util.List; * a new Thread to execute the Task. Use {@link #waitForTaskCompletion()} to wait for the Task * thread to finish. Use {@link #processElement} * to send elements to the task. Use - * {@link #processEvent(org.apache.flink.runtime.event.task.AbstractEvent)} to send events to the task. + * {@link #processEvent(org.apache.flink.runtime.event.AbstractEvent)} to send events to the task. * Before waiting for the task to finish you must call {@link #endInput()} to signal to the task * that data entry is finished. * http://git-wip-us.apache.org/repos/asf/flink/blob/af88aa09/flink-tests/src/test/java/org/apache/flink/test/manual/StreamingScalabilityAndLatency.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/manual/StreamingScalabilityAndLatency.java b/flink-tests/src/test/java/org/apache/flink/test/manual/StreamingScalabilityAndLatency.java index a34ec15..8d41292 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/manual/StreamingScalabilityAndLatency.java +++ b/flink-tests/src/test/java/org/apache/flink/test/manual/StreamingScalabilityAndLatency.java @@ -24,6 +24,7 @@ import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.StreamingMode; import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster; +import org.apache.flink.streaming.api.CheckpointingMode; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.sink.SinkFunction; import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction; @@ -74,7 +75,7 @@ public class StreamingScalabilityAndLatency { env.getConfig().enableObjectReuse(); env.setBufferTimeout(5L); -// env.enableCheckpointing(1000); + env.enableCheckpointing(1000, CheckpointingMode.AT_LEAST_ONCE); env .addSource(new TimeStampingSource())