flink-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-5969) Add savepoint backwards compatibility tests from 1.2 to 1.3
Date Tue, 02 May 2017 15:54:04 GMT

    [ https://issues.apache.org/jira/browse/FLINK-5969?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15993146#comment-15993146
] 

ASF GitHub Bot commented on FLINK-5969:
---------------------------------------

Github user aljoscha commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3778#discussion_r114353483
  
    --- Diff: flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkFrom12MigrationTest.java
---
    @@ -0,0 +1,224 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.streaming.connectors.fs.bucketing;
    +
    +import static org.junit.Assert.assertTrue;
    +
    +import java.io.File;
    +import java.io.IOException;
    +import java.util.List;
    +import java.util.Map;
    +import org.apache.commons.io.FileUtils;
    +import org.apache.flink.api.common.state.ListState;
    +import org.apache.flink.api.common.state.OperatorStateStore;
    +import org.apache.flink.runtime.state.FunctionInitializationContext;
    +import org.apache.flink.streaming.api.operators.StreamSink;
    +import org.apache.flink.streaming.connectors.fs.StringWriter;
    +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
    +import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles;
    +import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
    +import org.apache.flink.streaming.util.OperatorSnapshotUtil;
    +import org.apache.hadoop.fs.Path;
    +import org.junit.Assert;
    +import org.junit.ClassRule;
    +import org.junit.Ignore;
    +import org.junit.Test;
    +import org.junit.rules.TemporaryFolder;
    +
    +/**
    + * Tests for checking whether {@link BucketingSink} can restore from snapshots that were
done
    + * using the Flink 1.2 {@link BucketingSink}.
    + *
    + * <p>For regenerating the binary snapshot file you have to run the {@code write*()}
method on
    + * the Flink 1.2 branch.
    + */
    +
    +public class BucketingSinkFrom12MigrationTest {
    +
    +	@ClassRule
    +	public static TemporaryFolder tempFolder = new TemporaryFolder();
    +
    +	private static final String PART_PREFIX = "part";
    +	private static final String PENDING_SUFFIX = ".pending";
    +	private static final String IN_PROGRESS_SUFFIX = ".in-progress";
    +	private static final String VALID_LENGTH_SUFFIX = ".valid";
    +
    +	/**
    +	 * Manually run this to write binary snapshot data. Remove @Ignore to run.
    +	 */
    +	@Ignore
    +	@Test
    +	public void writeSnapshot() throws Exception {
    +
    +		final File outDir = tempFolder.newFolder();
    +
    +		BucketingSink<String> sink = new BucketingSink<String>(outDir.getAbsolutePath())
    +			.setWriter(new StringWriter<String>())
    +			.setBatchSize(5)
    +			.setPartPrefix(PART_PREFIX)
    +			.setInProgressPrefix("")
    +			.setPendingPrefix("")
    +			.setValidLengthPrefix("")
    +			.setInProgressSuffix(IN_PROGRESS_SUFFIX)
    +			.setPendingSuffix(PENDING_SUFFIX)
    +			.setValidLengthSuffix(VALID_LENGTH_SUFFIX);
    +
    +		OneInputStreamOperatorTestHarness<String, Object> testHarness =
    +			new OneInputStreamOperatorTestHarness<>(new StreamSink<>(sink));
    +
    +		testHarness.setup();
    +		testHarness.open();
    +
    +		testHarness.processElement(new StreamRecord<>("test1", 0L));
    +		testHarness.processElement(new StreamRecord<>("test2", 0L));
    +
    +		checkFs(outDir, 1, 1, 0, 0);
    +
    +		testHarness.processElement(new StreamRecord<>("test3", 0L));
    +		testHarness.processElement(new StreamRecord<>("test4", 0L));
    +		testHarness.processElement(new StreamRecord<>("test5", 0L));
    +
    +		checkFs(outDir, 1, 4, 0, 0);
    +
    +		OperatorStateHandles snapshot = testHarness.snapshot(0L, 0L);
    +
    +		OperatorSnapshotUtil.writeStateHandle(snapshot, "src/test/resources/bucketing-sink-migration-test-flink1.2-snapshot");
    +		testHarness.close();
    +	}
    +
    +	@Test
    +	public void testRestore() throws Exception {
    +		final File outDir = tempFolder.newFolder();
    +
    +		ValidatingBucketingSink<String> sink = (ValidatingBucketingSink<String>)
new ValidatingBucketingSink<String>(outDir.getAbsolutePath())
    +			.setWriter(new StringWriter<String>())
    +			.setBatchSize(5)
    +			.setPartPrefix(PART_PREFIX)
    +			.setInProgressPrefix("")
    +			.setPendingPrefix("")
    +			.setValidLengthPrefix("")
    +			.setInProgressSuffix(IN_PROGRESS_SUFFIX)
    +			.setPendingSuffix(PENDING_SUFFIX)
    +			.setValidLengthSuffix(VALID_LENGTH_SUFFIX);
    +
    +		OneInputStreamOperatorTestHarness<String, Object> testHarness = new OneInputStreamOperatorTestHarness<>(
    +			new StreamSink<>(sink), 10, 1, 0);
    +		testHarness.setup();
    +		testHarness.initializeState(
    +				OperatorSnapshotUtil.readStateHandle(
    +						OperatorSnapshotUtil.getResourceFilename("bucketing-sink-migration-test-flink1.2-snapshot")));
    +		testHarness.open();
    +
    +		assertTrue(sink.initializeCalled);
    +
    +		testHarness.processElement(new StreamRecord<>("test1", 0L));
    +		testHarness.processElement(new StreamRecord<>("test2", 0L));
    +
    +		checkFs(outDir, 1, 1, 0, 0);
    +
    +		testHarness.close();
    +	}
    +
    +	private void checkFs(File outDir, int inprogress, int pending, int completed, int valid)
throws IOException {
    +		int inProg = 0;
    +		int pend = 0;
    +		int compl = 0;
    +		int val = 0;
    +
    +		for (File file: FileUtils.listFiles(outDir, null, true)) {
    +			if (file.getAbsolutePath().endsWith("crc")) {
    +				continue;
    +			}
    +			String path = file.getPath();
    +			if (path.endsWith(IN_PROGRESS_SUFFIX)) {
    +				inProg++;
    +			} else if (path.endsWith(PENDING_SUFFIX)) {
    +				pend++;
    +			} else if (path.endsWith(VALID_LENGTH_SUFFIX)) {
    +				val++;
    +			} else if (path.contains(PART_PREFIX)) {
    +				compl++;
    +			}
    +		}
    +
    +		Assert.assertEquals(inprogress, inProg);
    +		Assert.assertEquals(pending, pend);
    +		Assert.assertEquals(completed, compl);
    +		Assert.assertEquals(valid, val);
    +	}
    +
    +	static class ValidatingBucketingSink<T> extends BucketingSink<T> {
    +
    +		private static final long serialVersionUID = -4263974081712009141L;
    +
    +		public boolean initializeCalled = false;
    +
    +		ValidatingBucketingSink(String basePath) {
    +			super(basePath);
    +		}
    +
    +		/**
    +		 * The actual paths in this depend on the binary checkpoint so it you update this the
paths
    +		 * here have to be updated as well.
    +		 */
    +		@Override
    +		public void initializeState(FunctionInitializationContext context) throws Exception
{
    +			OperatorStateStore stateStore = context.getOperatorStateStore();
    +
    +			ListState<State<T>> restoredBucketStates = stateStore.getSerializableListState("bucket-states");
    +
    +			int subtaskIndex = getRuntimeContext().getIndexOfThisSubtask();
    --- End diff --
    
    Fixing


> Add savepoint backwards compatibility tests from 1.2 to 1.3
> -----------------------------------------------------------
>
>                 Key: FLINK-5969
>                 URL: https://issues.apache.org/jira/browse/FLINK-5969
>             Project: Flink
>          Issue Type: Improvement
>          Components: Tests
>    Affects Versions: 1.3.0
>            Reporter: Aljoscha Krettek
>            Assignee: Aljoscha Krettek
>            Priority: Blocker
>             Fix For: 1.3.0
>
>
> We currently only have tests that test migration from 1.1 to 1.3, because we added these
tests when releasing Flink 1.2.
> We have to copy/migrate those tests:
>  - {{StatefulUDFSavepointMigrationITCase}}
>  - {{*MigrationTest}}
>  - {{AbstractKeyedCEPPatternOperator}}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

Mime
View raw message