Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 536D7200C79 for ; Fri, 5 May 2017 03:14:42 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 51C45160BC4; Fri, 5 May 2017 01:14:42 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id F40BC160BB0 for ; Fri, 5 May 2017 03:14:39 +0200 (CEST) Received: (qmail 19561 invoked by uid 500); 5 May 2017 01:14:39 -0000 Mailing-List: contact commits-help@beam.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@beam.apache.org Delivered-To: mailing list commits@beam.apache.org Received: (qmail 19548 invoked by uid 99); 5 May 2017 01:14:38 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 05 May 2017 01:14:38 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 3982ADFB91; Fri, 5 May 2017 01:14:38 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: dhalperi@apache.org To: commits@beam.apache.org Message-Id: <7a53f7d15294492e88203ef1e1f5087d@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: beam git commit: Revert "This closes #2905" Date: Fri, 5 May 2017 01:14:38 +0000 (UTC) archived-at: Fri, 05 May 2017 01:14:42 -0000 Repository: beam Updated Branches: refs/heads/master 9f27c33ec -> b130d7aac Revert "This closes #2905" I misread Jenkins and should not have merged. I am sorry. This reverts commit 9f27c33ec7e7c61afbca0395f932275b354eb428, reversing changes made to 5fc3d335919207c23bc6fd2047e9e38351754ff1. 
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/b130d7aa Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/b130d7aa Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/b130d7aa Branch: refs/heads/master Commit: b130d7aac466860ed5a7abec0bef33d9e0dd3c6d Parents: 9f27c33 Author: Dan Halperin Authored: Thu May 4 18:14:13 2017 -0700 Committer: Dan Halperin Committed: Thu May 4 18:14:13 2017 -0700 ---------------------------------------------------------------------- examples/java/pom.xml | 12 - .../beam/examples/WindowedWordCountIT.java | 8 +- .../org/apache/beam/examples/WordCountIT.java | 2 +- .../examples/testing/ExplicitShardedFile.java | 126 ----------- .../examples/testing/FileChecksumMatcher.java | 168 -------------- .../testing/FileChecksumMatcherTest.java | 148 ------------ .../examples/testing/NumberedShardedFile.java | 226 ------------------- .../testing/NumberedShardedFileTest.java | 182 --------------- .../beam/examples/testing/ShardedFile.java | 42 ---- .../beam/sdk/coders/StructuralByteArray.java | 5 +- .../beam/sdk/testing/FileChecksumMatcher.java | 169 ++++++++++++++ .../beam/sdk/testing/MatcherDeserializer.java | 4 +- .../beam/sdk/testing/MatcherSerializer.java | 4 +- .../org/apache/beam/sdk/util/CoderUtils.java | 8 +- .../beam/sdk/util/ExplicitShardedFile.java | 125 ++++++++++ .../beam/sdk/util/NumberedShardedFile.java | 225 ++++++++++++++++++ .../org/apache/beam/sdk/util/ShardedFile.java | 42 ++++ .../sdk/testing/FileChecksumMatcherTest.java | 147 ++++++++++++ .../beam/sdk/util/NumberedShardedFileTest.java | 182 +++++++++++++++ 19 files changed, 905 insertions(+), 920 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/b130d7aa/examples/java/pom.xml ---------------------------------------------------------------------- diff --git a/examples/java/pom.xml b/examples/java/pom.xml index 
09473cd..d673da2 100644 --- a/examples/java/pom.xml +++ b/examples/java/pom.xml @@ -492,11 +492,6 @@ - com.google.code.findbugs - jsr305 - - - org.apache.avro avro @@ -559,13 +554,6 @@ --> org.apache.beam - beam-sdks-java-core - tests - test - - - - org.apache.beam beam-runners-direct-java test http://git-wip-us.apache.org/repos/asf/beam/blob/b130d7aa/examples/java/src/test/java/org/apache/beam/examples/WindowedWordCountIT.java ---------------------------------------------------------------------- diff --git a/examples/java/src/test/java/org/apache/beam/examples/WindowedWordCountIT.java b/examples/java/src/test/java/org/apache/beam/examples/WindowedWordCountIT.java index 01bc402..b5eddb5 100644 --- a/examples/java/src/test/java/org/apache/beam/examples/WindowedWordCountIT.java +++ b/examples/java/src/test/java/org/apache/beam/examples/WindowedWordCountIT.java @@ -33,21 +33,21 @@ import java.util.concurrent.ThreadLocalRandom; import org.apache.beam.examples.common.ExampleUtils; import org.apache.beam.examples.common.WriteOneFilePerWindow.PerWindowFiles; -import org.apache.beam.examples.testing.ExplicitShardedFile; -import org.apache.beam.examples.testing.FileChecksumMatcher; -import org.apache.beam.examples.testing.NumberedShardedFile; -import org.apache.beam.examples.testing.ShardedFile; import org.apache.beam.sdk.PipelineResult; import org.apache.beam.sdk.io.FileSystems; import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.options.StreamingOptions; +import org.apache.beam.sdk.testing.FileChecksumMatcher; import org.apache.beam.sdk.testing.SerializableMatcher; import org.apache.beam.sdk.testing.StreamingIT; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.testing.TestPipelineOptions; import org.apache.beam.sdk.transforms.windowing.IntervalWindow; +import org.apache.beam.sdk.util.ExplicitShardedFile; import 
org.apache.beam.sdk.util.FluentBackoff; +import org.apache.beam.sdk.util.NumberedShardedFile; +import org.apache.beam.sdk.util.ShardedFile; import org.hamcrest.Description; import org.hamcrest.TypeSafeMatcher; import org.joda.time.Duration; http://git-wip-us.apache.org/repos/asf/beam/blob/b130d7aa/examples/java/src/test/java/org/apache/beam/examples/WordCountIT.java ---------------------------------------------------------------------- diff --git a/examples/java/src/test/java/org/apache/beam/examples/WordCountIT.java b/examples/java/src/test/java/org/apache/beam/examples/WordCountIT.java index 236ca9c..1660b61 100644 --- a/examples/java/src/test/java/org/apache/beam/examples/WordCountIT.java +++ b/examples/java/src/test/java/org/apache/beam/examples/WordCountIT.java @@ -20,10 +20,10 @@ package org.apache.beam.examples; import java.util.Date; import org.apache.beam.examples.WordCount.WordCountOptions; -import org.apache.beam.examples.testing.FileChecksumMatcher; import org.apache.beam.sdk.io.FileSystems; import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions; import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.apache.beam.sdk.testing.FileChecksumMatcher; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.testing.TestPipelineOptions; import org.junit.BeforeClass; http://git-wip-us.apache.org/repos/asf/beam/blob/b130d7aa/examples/java/src/test/java/org/apache/beam/examples/testing/ExplicitShardedFile.java ---------------------------------------------------------------------- diff --git a/examples/java/src/test/java/org/apache/beam/examples/testing/ExplicitShardedFile.java b/examples/java/src/test/java/org/apache/beam/examples/testing/ExplicitShardedFile.java deleted file mode 100644 index 1dc7a62..0000000 --- a/examples/java/src/test/java/org/apache/beam/examples/testing/ExplicitShardedFile.java +++ /dev/null @@ -1,126 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more 
contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.beam.examples.testing; - -import com.google.api.client.util.BackOff; -import com.google.api.client.util.BackOffUtils; -import com.google.api.client.util.Sleeper; -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Joiner; -import com.google.common.collect.Lists; -import com.google.common.io.CharStreams; -import java.io.IOException; -import java.io.Reader; -import java.nio.channels.Channels; -import java.nio.charset.StandardCharsets; -import java.util.Collection; -import java.util.Collections; -import java.util.LinkedList; -import java.util.List; -import org.apache.beam.sdk.io.FileSystems; -import org.apache.beam.sdk.io.fs.MatchResult.Metadata; -import org.apache.beam.sdk.util.FluentBackoff; -import org.joda.time.Duration; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** A sharded file where the file names are simply provided. 
*/ -public class ExplicitShardedFile implements ShardedFile { - - private static final Logger LOG = LoggerFactory.getLogger(ExplicitShardedFile.class); - - private static final int MAX_READ_RETRIES = 4; - private static final Duration DEFAULT_SLEEP_DURATION = Duration.standardSeconds(10L); - static final FluentBackoff BACK_OFF_FACTORY = - FluentBackoff.DEFAULT - .withInitialBackoff(DEFAULT_SLEEP_DURATION) - .withMaxRetries(MAX_READ_RETRIES); - - private final List files; - - /** Constructs an {@link ExplicitShardedFile} for the given files. */ - public ExplicitShardedFile(Collection files) throws IOException { - this.files = new LinkedList<>(); - for (String file: files) { - this.files.add(FileSystems.matchSingleFileSpec(file)); - } - } - - @Override - public List readFilesWithRetries(Sleeper sleeper, BackOff backOff) - throws IOException, InterruptedException { - if (files.isEmpty()) { - return Collections.emptyList(); - } - - IOException lastException = null; - - do { - try { - // Read data from file paths - return readLines(files); - } catch (IOException e) { - // Ignore and retry - lastException = e; - LOG.warn("Error in file reading. Ignore and retry."); - } - } while (BackOffUtils.next(sleeper, backOff)); - // Failed after max retries - throw new IOException( - String.format("Unable to read file(s) after retrying %d times", MAX_READ_RETRIES), - lastException); - } - - /** - * Discovers all shards of this file using the provided {@link Sleeper} and {@link BackOff}. - * - *

Because of eventual consistency, reads may discover no files or fewer files than the shard - * template implies. In this case, the read is considered to have failed. - */ - public List readFilesWithRetries() throws IOException, InterruptedException { - return readFilesWithRetries(Sleeper.DEFAULT, BACK_OFF_FACTORY.backoff()); - } - - @Override - public String toString() { - return String.format("explicit sharded file (%s)", Joiner.on(", ").join(files)); - } - - /** - * Reads all the lines of all the files. - * - *

Not suitable for use except in testing of small data, since the data size may be far more - * than can be reasonably processed serially, in-memory, by a single thread. - */ - @VisibleForTesting - List readLines(Collection files) throws IOException { - List allLines = Lists.newArrayList(); - int i = 1; - for (Metadata file : files) { - try (Reader reader = Channels.newReader(FileSystems.open(file.resourceId()), - StandardCharsets.UTF_8.name())) { - List lines = CharStreams.readLines(reader); - allLines.addAll(lines); - LOG.debug("[{} of {}] Read {} lines from file: {}", i, files.size(), lines.size(), file); - } - i++; - } - return allLines; - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/b130d7aa/examples/java/src/test/java/org/apache/beam/examples/testing/FileChecksumMatcher.java ---------------------------------------------------------------------- diff --git a/examples/java/src/test/java/org/apache/beam/examples/testing/FileChecksumMatcher.java b/examples/java/src/test/java/org/apache/beam/examples/testing/FileChecksumMatcher.java deleted file mode 100644 index 8a0af11..0000000 --- a/examples/java/src/test/java/org/apache/beam/examples/testing/FileChecksumMatcher.java +++ /dev/null @@ -1,168 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.beam.examples.testing; - -import static com.google.common.base.Preconditions.checkArgument; -import static com.google.common.base.Preconditions.checkNotNull; - -import com.google.api.client.util.Sleeper; -import com.google.common.base.Strings; -import com.google.common.hash.HashCode; -import com.google.common.hash.Hashing; -import java.nio.charset.StandardCharsets; -import java.util.ArrayList; -import java.util.List; -import java.util.regex.Pattern; -import javax.annotation.Nonnull; -import org.apache.beam.sdk.PipelineResult; -import org.apache.beam.sdk.testing.SerializableMatcher; -import org.apache.beam.sdk.util.FluentBackoff; -import org.hamcrest.Description; -import org.hamcrest.TypeSafeMatcher; -import org.joda.time.Duration; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Matcher to verify file checksum in E2E test. - * - *

For example: - *

{@code
- *   assertThat(job, new FileChecksumMatcher(checksumString, filePath));
- * }
- * or - *
{@code
- *   assertThat(job, new FileChecksumMatcher(checksumString, filePath, shardTemplate));
- * }
- * - *

Checksum of outputs is generated based on SHA-1 algorithm. If output file is empty, - * SHA-1 hash of empty string (da39a3ee5e6b4b0d3255bfef95601890afd80709) is used as expected. - */ -public class FileChecksumMatcher extends TypeSafeMatcher - implements SerializableMatcher { - - private static final Logger LOG = LoggerFactory.getLogger(FileChecksumMatcher.class); - - static final int MAX_READ_RETRIES = 4; - static final Duration DEFAULT_SLEEP_DURATION = Duration.standardSeconds(10L); - static final FluentBackoff BACK_OFF_FACTORY = - FluentBackoff.DEFAULT - .withInitialBackoff(DEFAULT_SLEEP_DURATION) - .withMaxRetries(MAX_READ_RETRIES); - - private static final Pattern DEFAULT_SHARD_TEMPLATE = - Pattern.compile("(?x) \\S* (? \\d+) -of- (? \\d+)"); - - private final String expectedChecksum; - private String actualChecksum; - private final ShardedFile shardedFile; - - /** - * Constructor that uses default shard template. - * - * @param checksum expected checksum string used to verify file content. - * @param filePath path of files that's to be verified. - */ - public FileChecksumMatcher(String checksum, String filePath) { - this(checksum, filePath, DEFAULT_SHARD_TEMPLATE); - } - - /** - * Constructor using a custom shard template. - * - * @param checksum expected checksum string used to verify file content. - * @param filePath path of files that's to be verified. - * @param shardTemplate template of shard name to parse out the total number of shards - * which is used in I/O retry to avoid inconsistency of filesystem. - * Customized template should assign name "numshards" to capturing - * group - total shard number. 
- */ - public FileChecksumMatcher(String checksum, String filePath, Pattern shardTemplate) { - checkArgument( - !Strings.isNullOrEmpty(checksum), - "Expected valid checksum, but received %s", checksum); - checkArgument( - !Strings.isNullOrEmpty(filePath), - "Expected valid file path, but received %s", filePath); - checkNotNull( - shardTemplate, - "Expected non-null shard pattern. " - + "Please call the other constructor to use default pattern: %s", - DEFAULT_SHARD_TEMPLATE); - - this.expectedChecksum = checksum; - this.shardedFile = new NumberedShardedFile(filePath, shardTemplate); - } - - /** - * Constructor using an entirely custom {@link ShardedFile} implementation. - * - *

For internal use only. - */ - public FileChecksumMatcher(String expectedChecksum, ShardedFile shardedFile) { - this.expectedChecksum = expectedChecksum; - this.shardedFile = shardedFile; - } - - @Override - public boolean matchesSafely(PipelineResult pipelineResult) { - // Load output data - List outputs; - try { - outputs = shardedFile.readFilesWithRetries(Sleeper.DEFAULT, BACK_OFF_FACTORY.backoff()); - } catch (Exception e) { - throw new RuntimeException( - String.format("Failed to read from: %s", shardedFile), e); - } - - // Verify outputs. Checksum is computed using SHA-1 algorithm - actualChecksum = computeHash(outputs); - LOG.debug("Generated checksum: {}", actualChecksum); - - return actualChecksum.equals(expectedChecksum); - } - - private String computeHash(@Nonnull List strs) { - if (strs.isEmpty()) { - return Hashing.sha1().hashString("", StandardCharsets.UTF_8).toString(); - } - - List hashCodes = new ArrayList<>(); - for (String str : strs) { - hashCodes.add(Hashing.sha1().hashString(str, StandardCharsets.UTF_8)); - } - return Hashing.combineUnordered(hashCodes).toString(); - } - - @Override - public void describeTo(Description description) { - description - .appendText("Expected checksum is (") - .appendText(expectedChecksum) - .appendText(")"); - } - - @Override - public void describeMismatchSafely(PipelineResult pResult, Description description) { - description - .appendText("was (") - .appendText(actualChecksum) - .appendText(")"); - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/b130d7aa/examples/java/src/test/java/org/apache/beam/examples/testing/FileChecksumMatcherTest.java ---------------------------------------------------------------------- diff --git a/examples/java/src/test/java/org/apache/beam/examples/testing/FileChecksumMatcherTest.java b/examples/java/src/test/java/org/apache/beam/examples/testing/FileChecksumMatcherTest.java deleted file mode 100644 index 4d6eb6b..0000000 --- 
a/examples/java/src/test/java/org/apache/beam/examples/testing/FileChecksumMatcherTest.java +++ /dev/null @@ -1,148 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.examples.testing; - -import static org.hamcrest.MatcherAssert.assertThat; -import static org.hamcrest.Matchers.containsString; - -import com.google.api.client.util.BackOff; -import com.google.common.io.Files; -import java.io.File; -import java.io.IOException; -import java.nio.charset.StandardCharsets; -import java.util.regex.Pattern; -import org.apache.beam.sdk.PipelineResult; -import org.apache.beam.sdk.testing.FastNanoClockAndSleeper; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.ExpectedException; -import org.junit.rules.TemporaryFolder; -import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; -import org.mockito.Mock; -import org.mockito.Mockito; - -/** Tests for {@link FileChecksumMatcher}. 
*/ -@RunWith(JUnit4.class) -public class FileChecksumMatcherTest { - @Rule - public TemporaryFolder tmpFolder = new TemporaryFolder(); - @Rule - public ExpectedException thrown = ExpectedException.none(); - @Rule - public FastNanoClockAndSleeper fastClock = new FastNanoClockAndSleeper(); - - @Mock - private PipelineResult pResult = Mockito.mock(PipelineResult.class); - - private BackOff backOff = FileChecksumMatcher.BACK_OFF_FACTORY.backoff(); - - @Test - public void testPreconditionChecksumIsNull() throws IOException { - String tmpPath = tmpFolder.newFile().getPath(); - - thrown.expect(IllegalArgumentException.class); - thrown.expectMessage(containsString("Expected valid checksum, but received")); - new FileChecksumMatcher(null, tmpPath); - } - - @Test - public void testPreconditionChecksumIsEmpty() throws IOException { - String tmpPath = tmpFolder.newFile().getPath(); - - thrown.expect(IllegalArgumentException.class); - thrown.expectMessage(containsString("Expected valid checksum, but received")); - new FileChecksumMatcher("", tmpPath); - } - - @Test - public void testPreconditionFilePathIsEmpty() { - thrown.expect(IllegalArgumentException.class); - thrown.expectMessage(containsString("Expected valid file path, but received")); - new FileChecksumMatcher("checksumString", ""); - } - - @Test - public void testPreconditionShardTemplateIsNull() throws IOException { - String tmpPath = tmpFolder.newFile().getPath(); - - thrown.expect(NullPointerException.class); - thrown.expectMessage( - containsString( - "Expected non-null shard pattern. 
" - + "Please call the other constructor to use default pattern:")); - new FileChecksumMatcher("checksumString", tmpPath, null); - } - - @Test - public void testMatcherThatVerifiesSingleFile() throws IOException{ - File tmpFile = tmpFolder.newFile("result-000-of-001"); - Files.write("Test for file checksum verifier.", tmpFile, StandardCharsets.UTF_8); - FileChecksumMatcher matcher = - new FileChecksumMatcher("a8772322f5d7b851777f820fc79d050f9d302915", tmpFile.getPath()); - - assertThat(pResult, matcher); - } - - @Test - public void testMatcherThatVerifiesMultipleFiles() throws IOException { - File tmpFile1 = tmpFolder.newFile("result-000-of-002"); - File tmpFile2 = tmpFolder.newFile("result-001-of-002"); - File tmpFile3 = tmpFolder.newFile("tmp"); - Files.write("To be or not to be, ", tmpFile1, StandardCharsets.UTF_8); - Files.write("it is not a question.", tmpFile2, StandardCharsets.UTF_8); - Files.write("tmp", tmpFile3, StandardCharsets.UTF_8); - - FileChecksumMatcher matcher = - new FileChecksumMatcher( - "90552392c28396935fe4f123bd0b5c2d0f6260c8", - tmpFolder.getRoot().toPath().resolve("result-*").toString()); - - assertThat(pResult, matcher); - } - - @Test - public void testMatcherThatVerifiesFileWithEmptyContent() throws IOException { - File emptyFile = tmpFolder.newFile("result-000-of-001"); - Files.write("", emptyFile, StandardCharsets.UTF_8); - FileChecksumMatcher matcher = - new FileChecksumMatcher( - "da39a3ee5e6b4b0d3255bfef95601890afd80709", - tmpFolder.getRoot().toPath().resolve("*").toString()); - - assertThat(pResult, matcher); - } - - @Test - public void testMatcherThatUsesCustomizedTemplate() throws Exception { - // Customized template: resultSSS-totalNNN - File tmpFile1 = tmpFolder.newFile("result0-total2"); - File tmpFile2 = tmpFolder.newFile("result1-total2"); - Files.write("To be or not to be, ", tmpFile1, StandardCharsets.UTF_8); - Files.write("it is not a question.", tmpFile2, StandardCharsets.UTF_8); - - Pattern customizedTemplate = - 
Pattern.compile("(?x) result (?\\d+) - total (?\\d+)"); - FileChecksumMatcher matcher = new FileChecksumMatcher( - "90552392c28396935fe4f123bd0b5c2d0f6260c8", - tmpFolder.getRoot().toPath().resolve("*").toString(), - customizedTemplate); - - assertThat(pResult, matcher); - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/b130d7aa/examples/java/src/test/java/org/apache/beam/examples/testing/NumberedShardedFile.java ---------------------------------------------------------------------- diff --git a/examples/java/src/test/java/org/apache/beam/examples/testing/NumberedShardedFile.java b/examples/java/src/test/java/org/apache/beam/examples/testing/NumberedShardedFile.java deleted file mode 100644 index f0b9c2d..0000000 --- a/examples/java/src/test/java/org/apache/beam/examples/testing/NumberedShardedFile.java +++ /dev/null @@ -1,226 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.beam.examples.testing; - -import static com.google.common.base.Preconditions.checkArgument; -import static com.google.common.base.Preconditions.checkNotNull; - -import com.google.api.client.util.BackOff; -import com.google.api.client.util.BackOffUtils; -import com.google.api.client.util.Sleeper; -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Strings; -import com.google.common.collect.Iterables; -import com.google.common.collect.Lists; -import com.google.common.hash.HashCode; -import com.google.common.hash.Hashing; -import com.google.common.io.CharStreams; -import java.io.IOException; -import java.io.Reader; -import java.nio.channels.Channels; -import java.nio.charset.StandardCharsets; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.List; -import java.util.regex.Matcher; -import java.util.regex.Pattern; -import javax.annotation.Nonnull; -import org.apache.beam.sdk.io.FileSystems; -import org.apache.beam.sdk.io.fs.MatchResult.Metadata; -import org.apache.beam.sdk.util.FluentBackoff; -import org.joda.time.Duration; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Utility methods for working with sharded files. For internal use only; many parameters - * are just hardcoded to allow existing uses to work OK. - */ -public class NumberedShardedFile implements ShardedFile { - - private static final Logger LOG = LoggerFactory.getLogger(NumberedShardedFile.class); - - static final int MAX_READ_RETRIES = 4; - static final Duration DEFAULT_SLEEP_DURATION = Duration.standardSeconds(10L); - static final FluentBackoff BACK_OFF_FACTORY = - FluentBackoff.DEFAULT - .withInitialBackoff(DEFAULT_SLEEP_DURATION) - .withMaxRetries(MAX_READ_RETRIES); - - private static final Pattern DEFAULT_SHARD_TEMPLATE = - Pattern.compile("(?x) \\S* (? \\d+) -of- (? 
\\d+)"); - - private final String filePattern; - private final Pattern shardTemplate; - - /** - * Constructor that uses default shard template. - * - * @param filePattern path or glob of files to include - */ - public NumberedShardedFile(String filePattern) { - this(filePattern, DEFAULT_SHARD_TEMPLATE); - } - - /** - * Constructor. - * - * @param filePattern path or glob of files to include - * @param shardTemplate template of shard name to parse out the total number of shards - * which is used in I/O retry to avoid inconsistency of filesystem. - * Customized template should assign name "numshards" to capturing - * group - total shard number. - */ - public NumberedShardedFile(String filePattern, Pattern shardTemplate) { - checkArgument( - !Strings.isNullOrEmpty(filePattern), - "Expected valid file path, but received %s", filePattern); - checkNotNull( - shardTemplate, - "Expected non-null shard pattern. " - + "Please call the other constructor to use default pattern: %s", - DEFAULT_SHARD_TEMPLATE); - - this.filePattern = filePattern; - this.shardTemplate = shardTemplate; - } - - public String getFilePattern() { - return filePattern; - } - - /** - * Discovers all shards of this file using the provided {@link Sleeper} and {@link BackOff}. - * - *

Because of eventual consistency, reads may discover no files or fewer files than - * the shard template implies. In this case, the read is considered to have failed. - */ - @Override - public List readFilesWithRetries(Sleeper sleeper, BackOff backOff) - throws IOException, InterruptedException { - IOException lastException = null; - - do { - try { - // Match inputPath which may contains glob - Collection files = Iterables.getOnlyElement( - FileSystems.match(Collections.singletonList(filePattern))).metadata(); - - LOG.debug("Found {} file(s) by matching the path: {}", files.size(), filePattern); - - if (files.isEmpty() || !checkTotalNumOfFiles(files)) { - continue; - } - - // Read data from file paths - return readLines(files); - } catch (IOException e) { - // Ignore and retry - lastException = e; - LOG.warn("Error in file reading. Ignore and retry."); - } - } while(BackOffUtils.next(sleeper, backOff)); - // Failed after max retries - throw new IOException( - String.format("Unable to read file(s) after retrying %d times", MAX_READ_RETRIES), - lastException); - } - - /** - * Discovers all shards of this file using the provided {@link Sleeper} and {@link BackOff}. - * - *

Because of eventual consistency, reads may discover no files or fewer files than - * the shard template implies. In this case, the read is considered to have failed. - */ - public List readFilesWithRetries() - throws IOException, InterruptedException { - return readFilesWithRetries(Sleeper.DEFAULT, BACK_OFF_FACTORY.backoff()); - } - - @Override - public String toString() { - return String.format("%s with shard template '%s'", filePattern, shardTemplate); - } - - /** - * Reads all the lines of all the files. - * - *

Not suitable for use except in testing of small data, since the data size may be far more - * than can be reasonably processed serially, in-memory, by a single thread. - */ - @VisibleForTesting - List readLines(Collection files) throws IOException { - List allLines = Lists.newArrayList(); - int i = 1; - for (Metadata file : files) { - try (Reader reader = - Channels.newReader(FileSystems.open(file.resourceId()), - StandardCharsets.UTF_8.name())) { - List lines = CharStreams.readLines(reader); - allLines.addAll(lines); - LOG.debug( - "[{} of {}] Read {} lines from file: {}", i, files.size(), lines.size(), file); - } - i++; - } - return allLines; - } - - /** - * Check if total number of files is correct by comparing with the number that - * is parsed from shard name using a name template. If no template is specified, - * "SSSS-of-NNNN" will be used as default, and "NNNN" will be the expected total - * number of files. - * - * @return {@code true} if at least one shard name matches template and total number - * of given files equals the number that is parsed from shard name. 
- */ - @VisibleForTesting - boolean checkTotalNumOfFiles(Collection files) { - for (Metadata fileMedadata : files) { - String fileName = fileMedadata.resourceId().getFilename(); - - if (fileName == null) { - // this path has zero elements - continue; - } - Matcher matcher = shardTemplate.matcher(fileName); - if (!matcher.matches()) { - // shard name doesn't match the pattern, check with the next shard - continue; - } - // once match, extract total number of shards and compare to file list - return files.size() == Integer.parseInt(matcher.group("numshards")); - } - return false; - } - - private String computeHash(@Nonnull List strs) { - if (strs.isEmpty()) { - return Hashing.sha1().hashString("", StandardCharsets.UTF_8).toString(); - } - - List hashCodes = new ArrayList<>(); - for (String str : strs) { - hashCodes.add(Hashing.sha1().hashString(str, StandardCharsets.UTF_8)); - } - return Hashing.combineUnordered(hashCodes).toString(); - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/b130d7aa/examples/java/src/test/java/org/apache/beam/examples/testing/NumberedShardedFileTest.java ---------------------------------------------------------------------- diff --git a/examples/java/src/test/java/org/apache/beam/examples/testing/NumberedShardedFileTest.java b/examples/java/src/test/java/org/apache/beam/examples/testing/NumberedShardedFileTest.java deleted file mode 100644 index 83b8a4f..0000000 --- a/examples/java/src/test/java/org/apache/beam/examples/testing/NumberedShardedFileTest.java +++ /dev/null @@ -1,182 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.examples.testing; - -import static org.hamcrest.MatcherAssert.assertThat; -import static org.hamcrest.Matchers.containsInAnyOrder; -import static org.hamcrest.Matchers.containsString; -import static org.hamcrest.Matchers.empty; -import static org.mockito.Matchers.anyCollection; -import static org.mockito.Mockito.doThrow; -import static org.mockito.Mockito.spy; - -import com.google.api.client.util.BackOff; -import com.google.common.io.Files; -import java.io.File; -import java.io.IOException; -import java.nio.charset.StandardCharsets; -import java.util.regex.Pattern; -import org.apache.beam.sdk.PipelineResult; -import org.apache.beam.sdk.io.LocalResources; -import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions; -import org.apache.beam.sdk.testing.FastNanoClockAndSleeper; -import org.junit.Before; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.ExpectedException; -import org.junit.rules.TemporaryFolder; -import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; -import org.mockito.Mock; -import org.mockito.Mockito; - -/** Tests for {@link NumberedShardedFile}. 
*/ -@RunWith(JUnit4.class) -public class NumberedShardedFileTest { - @Rule public TemporaryFolder tmpFolder = new TemporaryFolder(); - @Rule public ExpectedException thrown = ExpectedException.none(); - @Rule public FastNanoClockAndSleeper fastClock = new FastNanoClockAndSleeper(); - - @Mock private PipelineResult pResult = Mockito.mock(PipelineResult.class); - - private final BackOff backOff = NumberedShardedFile.BACK_OFF_FACTORY.backoff(); - private String filePattern; - - @Before - public void setup() throws IOException { - filePattern = LocalResources.fromFile(tmpFolder.getRoot(), true).resolve( - "*", StandardResolveOptions.RESOLVE_FILE).toString(); - } - - @Test - public void testPreconditionFilePathIsNull() { - thrown.expect(IllegalArgumentException.class); - thrown.expectMessage(containsString("Expected valid file path, but received")); - new NumberedShardedFile(null); - } - - @Test - public void testPreconditionFilePathIsEmpty() { - thrown.expect(IllegalArgumentException.class); - thrown.expectMessage(containsString("Expected valid file path, but received")); - new NumberedShardedFile(""); - } - - @Test - public void testReadMultipleShards() throws Exception { - String - contents1 = "To be or not to be, ", - contents2 = "it is not a question.", - contents3 = "should not be included"; - - File tmpFile1 = tmpFolder.newFile("result-000-of-002"); - File tmpFile2 = tmpFolder.newFile("result-001-of-002"); - File tmpFile3 = tmpFolder.newFile("tmp"); - Files.write(contents1, tmpFile1, StandardCharsets.UTF_8); - Files.write(contents2, tmpFile2, StandardCharsets.UTF_8); - Files.write(contents3, tmpFile3, StandardCharsets.UTF_8); - - filePattern = LocalResources.fromFile(tmpFolder.getRoot(), true).resolve( - "result-*", StandardResolveOptions.RESOLVE_FILE).toString(); - NumberedShardedFile shardedFile = new NumberedShardedFile(filePattern); - - assertThat(shardedFile.readFilesWithRetries(), containsInAnyOrder(contents1, contents2)); - } - - @Test - public void 
testReadEmpty() throws Exception { - File emptyFile = tmpFolder.newFile("result-000-of-001"); - Files.write("", emptyFile, StandardCharsets.UTF_8); - NumberedShardedFile shardedFile = new NumberedShardedFile(filePattern); - - assertThat(shardedFile.readFilesWithRetries(), empty()); - } - - @Test - public void testReadCustomTemplate() throws Exception { - String contents1 = "To be or not to be, ", contents2 = "it is not a question."; - - // Customized template: resultSSS-totalNNN - File tmpFile1 = tmpFolder.newFile("result0-total2"); - File tmpFile2 = tmpFolder.newFile("result1-total2"); - Files.write(contents1, tmpFile1, StandardCharsets.UTF_8); - Files.write(contents2, tmpFile2, StandardCharsets.UTF_8); - - Pattern customizedTemplate = - Pattern.compile("(?x) result (?\\d+) - total (?\\d+)"); - NumberedShardedFile shardedFile = new NumberedShardedFile(filePattern, customizedTemplate); - - assertThat(shardedFile.readFilesWithRetries(), containsInAnyOrder(contents1, contents2)); - } - - @Test - public void testReadWithRetriesFailsWhenTemplateIncorrect() throws Exception { - File tmpFile = tmpFolder.newFile(); - Files.write("Test for file checksum verifier.", tmpFile, StandardCharsets.UTF_8); - - NumberedShardedFile shardedFile = new NumberedShardedFile(filePattern, - Pattern.compile("incorrect-template")); - - thrown.expect(IOException.class); - thrown.expectMessage( - containsString( - "Unable to read file(s) after retrying " + NumberedShardedFile.MAX_READ_RETRIES)); - shardedFile.readFilesWithRetries(fastClock, backOff); - } - - @Test - public void testReadWithRetriesFailsSinceFilesystemError() throws Exception { - File tmpFile = tmpFolder.newFile(); - Files.write("Test for file checksum verifier.", tmpFile, StandardCharsets.UTF_8); - NumberedShardedFile shardedFile = spy(new NumberedShardedFile(filePattern)); - doThrow(IOException.class) - .when(shardedFile) - .readLines(anyCollection()); - - thrown.expect(IOException.class); - thrown.expectMessage( - 
containsString( - "Unable to read file(s) after retrying " + NumberedShardedFile.MAX_READ_RETRIES)); - shardedFile.readFilesWithRetries(fastClock, backOff); - } - - @Test - public void testReadWithRetriesFailsWhenOutputDirEmpty() throws Exception { - NumberedShardedFile shardedFile = new NumberedShardedFile(filePattern); - - thrown.expect(IOException.class); - thrown.expectMessage( - containsString( - "Unable to read file(s) after retrying " + NumberedShardedFile.MAX_READ_RETRIES)); - shardedFile.readFilesWithRetries(fastClock, backOff); - } - - @Test - public void testReadWithRetriesFailsWhenRedundantFileLoaded() throws Exception { - tmpFolder.newFile("result-000-of-001"); - tmpFolder.newFile("tmp-result-000-of-001"); - - NumberedShardedFile shardedFile = new NumberedShardedFile(filePattern); - - thrown.expect(IOException.class); - thrown.expectMessage( - containsString( - "Unable to read file(s) after retrying " + NumberedShardedFile.MAX_READ_RETRIES)); - shardedFile.readFilesWithRetries(fastClock, backOff); - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/b130d7aa/examples/java/src/test/java/org/apache/beam/examples/testing/ShardedFile.java ---------------------------------------------------------------------- diff --git a/examples/java/src/test/java/org/apache/beam/examples/testing/ShardedFile.java b/examples/java/src/test/java/org/apache/beam/examples/testing/ShardedFile.java deleted file mode 100644 index cd9537c..0000000 --- a/examples/java/src/test/java/org/apache/beam/examples/testing/ShardedFile.java +++ /dev/null @@ -1,42 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.beam.examples.testing; - -import com.google.api.client.util.BackOff; -import com.google.api.client.util.Sleeper; -import java.io.IOException; -import java.io.Serializable; -import java.util.List; -import org.apache.beam.sdk.testing.SerializableMatcher; - -/** - * Bare-bones class for using sharded files. - * - *

For internal use only; used only in SDK tests. Must be {@link Serializable} so it can be - * shipped as a {@link SerializableMatcher}. - */ -public interface ShardedFile extends Serializable { - - /** - * Reads the lines from all shards of this file using the provided {@link Sleeper} and {@link - * BackOff}. - */ - List readFilesWithRetries(Sleeper sleeper, BackOff backOff) - throws IOException, InterruptedException; -} http://git-wip-us.apache.org/repos/asf/beam/blob/b130d7aa/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/StructuralByteArray.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/StructuralByteArray.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/StructuralByteArray.java index 0ab0dea..226f79c 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/StructuralByteArray.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/StructuralByteArray.java @@ -17,7 +17,8 @@ */ package org.apache.beam.sdk.coders; -import com.google.common.io.BaseEncoding; +import static com.google.api.client.util.Base64.encodeBase64String; + import java.util.Arrays; /** @@ -52,6 +53,6 @@ public class StructuralByteArray { @Override public String toString() { - return "base64:" + BaseEncoding.base64().encode(value); + return "base64:" + encodeBase64String(value); } } http://git-wip-us.apache.org/repos/asf/beam/blob/b130d7aa/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/FileChecksumMatcher.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/FileChecksumMatcher.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/FileChecksumMatcher.java new file mode 100644 index 0000000..82a6b71 --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/FileChecksumMatcher.java @@ -0,0 +1,169 @@ +/* + * Licensed to the Apache 
Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.sdk.testing; + +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkNotNull; + +import com.google.api.client.util.Sleeper; +import com.google.common.base.Strings; +import com.google.common.hash.HashCode; +import com.google.common.hash.Hashing; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.List; +import java.util.regex.Pattern; +import javax.annotation.Nonnull; +import org.apache.beam.sdk.PipelineResult; +import org.apache.beam.sdk.util.FluentBackoff; +import org.apache.beam.sdk.util.NumberedShardedFile; +import org.apache.beam.sdk.util.ShardedFile; +import org.hamcrest.Description; +import org.hamcrest.TypeSafeMatcher; +import org.joda.time.Duration; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Matcher to verify file checksum in E2E test. + * + *

For example: + *

{@code
+ *   assertThat(job, new FileChecksumMatcher(checksumString, filePath));
+ * }
+ * or + *
{@code
+ *   assertThat(job, new FileChecksumMatcher(checksumString, filePath, shardTemplate));
+ * }
+ * + *

Checksum of outputs is generated based on SHA-1 algorithm. If output file is empty, + * SHA-1 hash of empty string (da39a3ee5e6b4b0d3255bfef95601890afd80709) is used as expected. + */ +public class FileChecksumMatcher extends TypeSafeMatcher + implements SerializableMatcher { + + private static final Logger LOG = LoggerFactory.getLogger(FileChecksumMatcher.class); + + static final int MAX_READ_RETRIES = 4; + static final Duration DEFAULT_SLEEP_DURATION = Duration.standardSeconds(10L); + static final FluentBackoff BACK_OFF_FACTORY = + FluentBackoff.DEFAULT + .withInitialBackoff(DEFAULT_SLEEP_DURATION) + .withMaxRetries(MAX_READ_RETRIES); + + private static final Pattern DEFAULT_SHARD_TEMPLATE = + Pattern.compile("(?x) \\S* (? \\d+) -of- (? \\d+)"); + + private final String expectedChecksum; + private String actualChecksum; + private final ShardedFile shardedFile; + + /** + * Constructor that uses default shard template. + * + * @param checksum expected checksum string used to verify file content. + * @param filePath path of files that's to be verified. + */ + public FileChecksumMatcher(String checksum, String filePath) { + this(checksum, filePath, DEFAULT_SHARD_TEMPLATE); + } + + /** + * Constructor using a custom shard template. + * + * @param checksum expected checksum string used to verify file content. + * @param filePath path of files that's to be verified. + * @param shardTemplate template of shard name to parse out the total number of shards + * which is used in I/O retry to avoid inconsistency of filesystem. + * Customized template should assign name "numshards" to capturing + * group - total shard number. 
+ */ + public FileChecksumMatcher(String checksum, String filePath, Pattern shardTemplate) { + checkArgument( + !Strings.isNullOrEmpty(checksum), + "Expected valid checksum, but received %s", checksum); + checkArgument( + !Strings.isNullOrEmpty(filePath), + "Expected valid file path, but received %s", filePath); + checkNotNull( + shardTemplate, + "Expected non-null shard pattern. " + + "Please call the other constructor to use default pattern: %s", + DEFAULT_SHARD_TEMPLATE); + + this.expectedChecksum = checksum; + this.shardedFile = new NumberedShardedFile(filePath, shardTemplate); + } + + /** + * Constructor using an entirely custom {@link ShardedFile} implementation. + * + *

For internal use only. + */ + public FileChecksumMatcher(String expectedChecksum, ShardedFile shardedFile) { + this.expectedChecksum = expectedChecksum; + this.shardedFile = shardedFile; + } + + @Override + public boolean matchesSafely(PipelineResult pipelineResult) { + // Load output data + List outputs; + try { + outputs = shardedFile.readFilesWithRetries(Sleeper.DEFAULT, BACK_OFF_FACTORY.backoff()); + } catch (Exception e) { + throw new RuntimeException( + String.format("Failed to read from: %s", shardedFile), e); + } + + // Verify outputs. Checksum is computed using SHA-1 algorithm + actualChecksum = computeHash(outputs); + LOG.debug("Generated checksum: {}", actualChecksum); + + return actualChecksum.equals(expectedChecksum); + } + + private String computeHash(@Nonnull List strs) { + if (strs.isEmpty()) { + return Hashing.sha1().hashString("", StandardCharsets.UTF_8).toString(); + } + + List hashCodes = new ArrayList<>(); + for (String str : strs) { + hashCodes.add(Hashing.sha1().hashString(str, StandardCharsets.UTF_8)); + } + return Hashing.combineUnordered(hashCodes).toString(); + } + + @Override + public void describeTo(Description description) { + description + .appendText("Expected checksum is (") + .appendText(expectedChecksum) + .appendText(")"); + } + + @Override + public void describeMismatchSafely(PipelineResult pResult, Description description) { + description + .appendText("was (") + .appendText(actualChecksum) + .appendText(")"); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/b130d7aa/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/MatcherDeserializer.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/MatcherDeserializer.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/MatcherDeserializer.java index e7aa5a7..6ca07ba 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/MatcherDeserializer.java +++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/MatcherDeserializer.java @@ -22,7 +22,7 @@ import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.DeserializationContext; import com.fasterxml.jackson.databind.JsonDeserializer; import com.fasterxml.jackson.databind.node.ObjectNode; -import com.google.common.io.BaseEncoding; +import com.google.api.client.util.Base64; import java.io.IOException; import org.apache.beam.sdk.util.SerializableUtils; @@ -36,7 +36,7 @@ class MatcherDeserializer extends JsonDeserializer> { throws IOException, JsonProcessingException { ObjectNode node = jsonParser.readValueAsTree(); String matcher = node.get("matcher").asText(); - byte[] in = BaseEncoding.base64().decode(matcher); + byte[] in = Base64.decodeBase64(matcher); return (SerializableMatcher) SerializableUtils .deserializeFromByteArray(in, "SerializableMatcher"); } http://git-wip-us.apache.org/repos/asf/beam/blob/b130d7aa/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/MatcherSerializer.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/MatcherSerializer.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/MatcherSerializer.java index 35375f6..2b4584c 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/MatcherSerializer.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/MatcherSerializer.java @@ -21,7 +21,7 @@ import com.fasterxml.jackson.core.JsonGenerator; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.JsonSerializer; import com.fasterxml.jackson.databind.SerializerProvider; -import com.google.common.io.BaseEncoding; +import com.google.api.client.util.Base64; import java.io.IOException; import org.apache.beam.sdk.util.SerializableUtils; @@ -33,7 +33,7 @@ class MatcherSerializer extends JsonSerializer> { public void 
serialize(SerializableMatcher matcher, JsonGenerator jsonGenerator, SerializerProvider serializerProvider) throws IOException, JsonProcessingException { byte[] out = SerializableUtils.serializeToByteArray(matcher); - String encodedString = BaseEncoding.base64().encode(out); + String encodedString = Base64.encodeBase64String(out); jsonGenerator.writeStartObject(); jsonGenerator.writeStringField("matcher", encodedString); jsonGenerator.writeEndObject(); http://git-wip-us.apache.org/repos/asf/beam/blob/b130d7aa/sdks/java/core/src/main/java/org/apache/beam/sdk/util/CoderUtils.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/CoderUtils.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/CoderUtils.java index e3ae485..3380a10 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/CoderUtils.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/CoderUtils.java @@ -17,8 +17,8 @@ */ package org.apache.beam.sdk.util; +import com.google.api.client.util.Base64; import com.google.common.base.Throwables; -import com.google.common.io.BaseEncoding; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.IOException; @@ -163,7 +163,7 @@ public final class CoderUtils { public static String encodeToBase64(Coder coder, T value) throws CoderException { byte[] rawValue = encodeToByteArray(coder, value); - return BaseEncoding.base64Url().omitPadding().encode(rawValue); + return Base64.encodeBase64URLSafeString(rawValue); } /** @@ -171,9 +171,7 @@ public final class CoderUtils { */ public static T decodeFromBase64(Coder coder, String encodedValue) throws CoderException { return decodeFromSafeStream( - coder, - new ByteArrayInputStream(BaseEncoding.base64Url().omitPadding().decode(encodedValue)), - Coder.Context.OUTER); + coder, new ByteArrayInputStream(Base64.decodeBase64(encodedValue)), Coder.Context.OUTER); } /** 
http://git-wip-us.apache.org/repos/asf/beam/blob/b130d7aa/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ExplicitShardedFile.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ExplicitShardedFile.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ExplicitShardedFile.java new file mode 100644 index 0000000..0f184de --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ExplicitShardedFile.java @@ -0,0 +1,125 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.beam.sdk.util; + +import com.google.api.client.util.BackOff; +import com.google.api.client.util.BackOffUtils; +import com.google.api.client.util.Sleeper; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Joiner; +import com.google.common.collect.Lists; +import com.google.common.io.CharStreams; +import java.io.IOException; +import java.io.Reader; +import java.nio.channels.Channels; +import java.nio.charset.StandardCharsets; +import java.util.Collection; +import java.util.Collections; +import java.util.LinkedList; +import java.util.List; +import org.apache.beam.sdk.io.FileSystems; +import org.apache.beam.sdk.io.fs.MatchResult.Metadata; +import org.joda.time.Duration; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** A sharded file where the file names are simply provided. */ +public class ExplicitShardedFile implements ShardedFile { + + private static final Logger LOG = LoggerFactory.getLogger(ExplicitShardedFile.class); + + private static final int MAX_READ_RETRIES = 4; + private static final Duration DEFAULT_SLEEP_DURATION = Duration.standardSeconds(10L); + static final FluentBackoff BACK_OFF_FACTORY = + FluentBackoff.DEFAULT + .withInitialBackoff(DEFAULT_SLEEP_DURATION) + .withMaxRetries(MAX_READ_RETRIES); + + private final List files; + + /** Constructs an {@link ExplicitShardedFile} for the given files. 
*/ + public ExplicitShardedFile(Collection files) throws IOException { + this.files = new LinkedList<>(); + for (String file: files) { + this.files.add(FileSystems.matchSingleFileSpec(file)); + } + } + + @Override + public List readFilesWithRetries(Sleeper sleeper, BackOff backOff) + throws IOException, InterruptedException { + if (files.isEmpty()) { + return Collections.emptyList(); + } + + IOException lastException = null; + + do { + try { + // Read data from file paths + return readLines(files); + } catch (IOException e) { + // Ignore and retry + lastException = e; + LOG.warn("Error in file reading. Ignore and retry."); + } + } while (BackOffUtils.next(sleeper, backOff)); + // Failed after max retries + throw new IOException( + String.format("Unable to read file(s) after retrying %d times", MAX_READ_RETRIES), + lastException); + } + + /** + * Discovers all shards of this file using the provided {@link Sleeper} and {@link BackOff}. + * + *

Because of eventual consistency, reads may discover no files or fewer files than the shard + * template implies. In this case, the read is considered to have failed. + */ + public List readFilesWithRetries() throws IOException, InterruptedException { + return readFilesWithRetries(Sleeper.DEFAULT, BACK_OFF_FACTORY.backoff()); + } + + @Override + public String toString() { + return String.format("explicit sharded file (%s)", Joiner.on(", ").join(files)); + } + + /** + * Reads all the lines of all the files. + * + *

Not suitable for use except in testing of small data, since the data size may be far more + * than can be reasonably processed serially, in-memory, by a single thread. + */ + @VisibleForTesting + List readLines(Collection files) throws IOException { + List allLines = Lists.newArrayList(); + int i = 1; + for (Metadata file : files) { + try (Reader reader = Channels.newReader(FileSystems.open(file.resourceId()), + StandardCharsets.UTF_8.name())) { + List lines = CharStreams.readLines(reader); + allLines.addAll(lines); + LOG.debug("[{} of {}] Read {} lines from file: {}", i, files.size(), lines.size(), file); + } + i++; + } + return allLines; + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/b130d7aa/sdks/java/core/src/main/java/org/apache/beam/sdk/util/NumberedShardedFile.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/NumberedShardedFile.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/NumberedShardedFile.java new file mode 100644 index 0000000..e18dd96 --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/NumberedShardedFile.java @@ -0,0 +1,225 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.sdk.util; + +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkNotNull; + +import com.google.api.client.util.BackOff; +import com.google.api.client.util.BackOffUtils; +import com.google.api.client.util.Sleeper; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Strings; +import com.google.common.collect.Iterables; +import com.google.common.collect.Lists; +import com.google.common.hash.HashCode; +import com.google.common.hash.Hashing; +import com.google.common.io.CharStreams; +import java.io.IOException; +import java.io.Reader; +import java.nio.channels.Channels; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.regex.Matcher; +import java.util.regex.Pattern; +import javax.annotation.Nonnull; +import org.apache.beam.sdk.io.FileSystems; +import org.apache.beam.sdk.io.fs.MatchResult.Metadata; +import org.joda.time.Duration; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Utility methods for working with sharded files. For internal use only; many parameters + * are just hardcoded to allow existing uses to work OK. + */ +public class NumberedShardedFile implements ShardedFile { + + private static final Logger LOG = LoggerFactory.getLogger(NumberedShardedFile.class); + + static final int MAX_READ_RETRIES = 4; + static final Duration DEFAULT_SLEEP_DURATION = Duration.standardSeconds(10L); + static final FluentBackoff BACK_OFF_FACTORY = + FluentBackoff.DEFAULT + .withInitialBackoff(DEFAULT_SLEEP_DURATION) + .withMaxRetries(MAX_READ_RETRIES); + + private static final Pattern DEFAULT_SHARD_TEMPLATE = + Pattern.compile("(?x) \\S* (? \\d+) -of- (? 
\\d+)"); + + private final String filePattern; + private final Pattern shardTemplate; + + /** + * Constructor that uses default shard template. + * + * @param filePattern path or glob of files to include + */ + public NumberedShardedFile(String filePattern) { + this(filePattern, DEFAULT_SHARD_TEMPLATE); + } + + /** + * Constructor. + * + * @param filePattern path or glob of files to include + * @param shardTemplate template of shard name to parse out the total number of shards + * which is used in I/O retry to avoid inconsistency of filesystem. + * Customized template should assign name "numshards" to capturing + * group - total shard number. + */ + public NumberedShardedFile(String filePattern, Pattern shardTemplate) { + checkArgument( + !Strings.isNullOrEmpty(filePattern), + "Expected valid file path, but received %s", filePattern); + checkNotNull( + shardTemplate, + "Expected non-null shard pattern. " + + "Please call the other constructor to use default pattern: %s", + DEFAULT_SHARD_TEMPLATE); + + this.filePattern = filePattern; + this.shardTemplate = shardTemplate; + } + + public String getFilePattern() { + return filePattern; + } + + /** + * Discovers all shards of this file using the provided {@link Sleeper} and {@link BackOff}. + * + *

Because of eventual consistency, reads may discover no files or fewer files than + * the shard template implies. In this case, the read is considered to have failed. + */ + @Override + public List readFilesWithRetries(Sleeper sleeper, BackOff backOff) + throws IOException, InterruptedException { + IOException lastException = null; + + do { + try { + // Match inputPath which may contains glob + Collection files = Iterables.getOnlyElement( + FileSystems.match(Collections.singletonList(filePattern))).metadata(); + + LOG.debug("Found {} file(s) by matching the path: {}", files.size(), filePattern); + + if (files.isEmpty() || !checkTotalNumOfFiles(files)) { + continue; + } + + // Read data from file paths + return readLines(files); + } catch (IOException e) { + // Ignore and retry + lastException = e; + LOG.warn("Error in file reading. Ignore and retry."); + } + } while(BackOffUtils.next(sleeper, backOff)); + // Failed after max retries + throw new IOException( + String.format("Unable to read file(s) after retrying %d times", MAX_READ_RETRIES), + lastException); + } + + /** + * Discovers all shards of this file using the provided {@link Sleeper} and {@link BackOff}. + * + *

Because of eventual consistency, reads may discover no files or fewer files than + * the shard template implies. In this case, the read is considered to have failed. + */ + public List readFilesWithRetries() + throws IOException, InterruptedException { + return readFilesWithRetries(Sleeper.DEFAULT, BACK_OFF_FACTORY.backoff()); + } + + @Override + public String toString() { + return String.format("%s with shard template '%s'", filePattern, shardTemplate); + } + + /** + * Reads all the lines of all the files. + * + *

Not suitable for use except in testing of small data, since the data size may be far more + * than can be reasonably processed serially, in-memory, by a single thread. + */ + @VisibleForTesting + List readLines(Collection files) throws IOException { + List allLines = Lists.newArrayList(); + int i = 1; + for (Metadata file : files) { + try (Reader reader = + Channels.newReader(FileSystems.open(file.resourceId()), + StandardCharsets.UTF_8.name())) { + List lines = CharStreams.readLines(reader); + allLines.addAll(lines); + LOG.debug( + "[{} of {}] Read {} lines from file: {}", i, files.size(), lines.size(), file); + } + i++; + } + return allLines; + } + + /** + * Check if total number of files is correct by comparing with the number that + * is parsed from shard name using a name template. If no template is specified, + * "SSSS-of-NNNN" will be used as default, and "NNNN" will be the expected total + * number of files. + * + * @return {@code true} if at least one shard name matches template and total number + * of given files equals the number that is parsed from shard name. 
+ */ + @VisibleForTesting + boolean checkTotalNumOfFiles(Collection files) { + for (Metadata fileMedadata : files) { + String fileName = fileMedadata.resourceId().getFilename(); + + if (fileName == null) { + // this path has zero elements + continue; + } + Matcher matcher = shardTemplate.matcher(fileName); + if (!matcher.matches()) { + // shard name doesn't match the pattern, check with the next shard + continue; + } + // once match, extract total number of shards and compare to file list + return files.size() == Integer.parseInt(matcher.group("numshards")); + } + return false; + } + + private String computeHash(@Nonnull List strs) { + if (strs.isEmpty()) { + return Hashing.sha1().hashString("", StandardCharsets.UTF_8).toString(); + } + + List hashCodes = new ArrayList<>(); + for (String str : strs) { + hashCodes.add(Hashing.sha1().hashString(str, StandardCharsets.UTF_8)); + } + return Hashing.combineUnordered(hashCodes).toString(); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/b130d7aa/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ShardedFile.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ShardedFile.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ShardedFile.java new file mode 100644 index 0000000..ec9ed64 --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ShardedFile.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */

package org.apache.beam.sdk.util;

import com.google.api.client.util.BackOff;
import com.google.api.client.util.Sleeper;
import java.io.IOException;
import java.io.Serializable;
import java.util.List;
import org.apache.beam.sdk.testing.SerializableMatcher;

/**
 * Bare-bones interface for reading files that are split into shards.
 *
 * For internal use only; used only in SDK tests. Must be {@link Serializable} so it can be
 * shipped as a {@link SerializableMatcher}.
 */
public interface ShardedFile extends Serializable {

  /**
   * Reads the lines from all shards of this file using the provided {@link Sleeper} and {@link
   * BackOff}.
   *
   * @param sleeper used to wait between read attempts
   * @param backOff decides how long to wait between attempts and when to give up
   * @return the lines of all shards
   * @throws IOException if the shards could not be read
   * @throws InterruptedException if interrupted while waiting between attempts
   */
  List<String> readFilesWithRetries(Sleeper sleeper, BackOff backOff)
      throws IOException, InterruptedException;
}
http://git-wip-us.apache.org/repos/asf/beam/blob/b130d7aa/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/FileChecksumMatcherTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/FileChecksumMatcherTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/FileChecksumMatcherTest.java new file mode 100644 index 0000000..4ee6750 --- /dev/null +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/FileChecksumMatcherTest.java @@ -0,0 +1,147 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */
package org.apache.beam.sdk.testing;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsString;

import com.google.api.client.util.BackOff;
import com.google.common.io.Files;
import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.regex.Pattern;
import org.apache.beam.sdk.PipelineResult;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.mockito.Mock;
import org.mockito.Mockito;

/** Tests for {@link FileChecksumMatcher}. */
@RunWith(JUnit4.class)
public class FileChecksumMatcherTest {
  @Rule
  public TemporaryFolder tmpFolder = new TemporaryFolder();
  @Rule
  public ExpectedException thrown = ExpectedException.none();
  @Rule
  public FastNanoClockAndSleeper fastClock = new FastNanoClockAndSleeper();

  // Created directly with Mockito.mock, so no Mockito runner or initMocks call is needed.
  @Mock
  private PipelineResult pResult = Mockito.mock(PipelineResult.class);

  private BackOff backOff = FileChecksumMatcher.BACK_OFF_FACTORY.backoff();

  /** A null checksum is rejected by the constructor. */
  @Test
  public void testPreconditionChecksumIsNull() throws IOException {
    String tmpPath = tmpFolder.newFile().getPath();

    thrown.expect(IllegalArgumentException.class);
    thrown.expectMessage(containsString("Expected valid checksum, but received"));
    new FileChecksumMatcher(null, tmpPath);
  }

  /** An empty checksum is rejected by the constructor. */
  @Test
  public void testPreconditionChecksumIsEmpty() throws IOException {
    String tmpPath = tmpFolder.newFile().getPath();

    thrown.expect(IllegalArgumentException.class);
    thrown.expectMessage(containsString("Expected valid checksum, but received"));
    new FileChecksumMatcher("", tmpPath);
  }

  /** An empty file path is rejected by the constructor. */
  @Test
  public void testPreconditionFilePathIsEmpty() {
    thrown.expect(IllegalArgumentException.class);
    thrown.expectMessage(containsString("Expected valid file path, but received"));
    new FileChecksumMatcher("checksumString", "");
  }

  /** A null shard template is rejected by the three-argument constructor. */
  @Test
  public void testPreconditionShardTemplateIsNull() throws IOException {
    String tmpPath = tmpFolder.newFile().getPath();

    thrown.expect(NullPointerException.class);
    thrown.expectMessage(
        containsString(
            "Expected non-null shard pattern. "
                + "Please call the other constructor to use default pattern:"));
    new FileChecksumMatcher("checksumString", tmpPath, null);
  }

  @Test
  public void testMatcherThatVerifiesSingleFile() throws IOException {
    File tmpFile = tmpFolder.newFile("result-000-of-001");
    Files.write("Test for file checksum verifier.", tmpFile, StandardCharsets.UTF_8);
    FileChecksumMatcher matcher =
        new FileChecksumMatcher("a8772322f5d7b851777f820fc79d050f9d302915", tmpFile.getPath());

    assertThat(pResult, matcher);
  }

  /** Only files matched by the "result-*" glob contribute; the "tmp" file is ignored. */
  @Test
  public void testMatcherThatVerifiesMultipleFiles() throws IOException {
    File tmpFile1 = tmpFolder.newFile("result-000-of-002");
    File tmpFile2 = tmpFolder.newFile("result-001-of-002");
    File tmpFile3 = tmpFolder.newFile("tmp");
    Files.write("To be or not to be, ", tmpFile1, StandardCharsets.UTF_8);
    Files.write("it is not a question.", tmpFile2, StandardCharsets.UTF_8);
    Files.write("tmp", tmpFile3, StandardCharsets.UTF_8);

    FileChecksumMatcher matcher =
        new FileChecksumMatcher(
            "90552392c28396935fe4f123bd0b5c2d0f6260c8",
            tmpFolder.getRoot().toPath().resolve("result-*").toString());

    assertThat(pResult, matcher);
  }

  @Test
  public void testMatcherThatVerifiesFileWithEmptyContent() throws IOException {
    File emptyFile = tmpFolder.newFile("result-000-of-001");
    Files.write("", emptyFile, StandardCharsets.UTF_8);
    FileChecksumMatcher matcher =
        new FileChecksumMatcher(
            "da39a3ee5e6b4b0d3255bfef95601890afd80709",
            tmpFolder.getRoot().toPath().resolve("*").toString());

    assertThat(pResult, matcher);
  }

  @Test
  public void testMatcherThatUsesCustomizedTemplate() throws Exception {
    // Customized template: resultSSS-totalNNN
    File tmpFile1 = tmpFolder.newFile("result0-total2");
    File tmpFile2 = tmpFolder.newFile("result1-total2");
    Files.write("To be or not to be, ", tmpFile1, StandardCharsets.UTF_8);
    Files.write("it is not a question.", tmpFile2, StandardCharsets.UTF_8);

    // The shard-count group must be named "numshards"; (?x) ignores the spaces in the pattern.
    Pattern customizedTemplate =
        Pattern.compile("(?x) result (?<shardnum>\\d+) - total (?<numshards>\\d+)");
    FileChecksumMatcher matcher =
        new FileChecksumMatcher(
            "90552392c28396935fe4f123bd0b5c2d0f6260c8",
            tmpFolder.getRoot().toPath().resolve("*").toString(),
            customizedTemplate);

    assertThat(pResult, matcher);
  }
}
http://git-wip-us.apache.org/repos/asf/beam/blob/b130d7aa/sdks/java/core/src/test/java/org/apache/beam/sdk/util/NumberedShardedFileTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/NumberedShardedFileTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/NumberedShardedFileTest.java new file mode 100644 index 0000000..43a9166 --- /dev/null +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/NumberedShardedFileTest.java @@ -0,0 +1,182 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */
package org.apache.beam.sdk.util;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.empty;
import static org.mockito.Matchers.anyCollection;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.spy;

import com.google.api.client.util.BackOff;
import com.google.common.io.Files;
import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.regex.Pattern;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.io.LocalResources;
import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions;
import org.apache.beam.sdk.testing.FastNanoClockAndSleeper;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.mockito.Mock;
import org.mockito.Mockito;

/** Tests for {@link NumberedShardedFile}. */
@RunWith(JUnit4.class)
public class NumberedShardedFileTest {
  @Rule public TemporaryFolder tmpFolder = new TemporaryFolder();
  @Rule public ExpectedException thrown = ExpectedException.none();
  @Rule public FastNanoClockAndSleeper fastClock = new FastNanoClockAndSleeper();

  // NOTE(review): not referenced by any test in this class.
  @Mock private PipelineResult pResult = Mockito.mock(PipelineResult.class);

  private final BackOff backOff = NumberedShardedFile.BACK_OFF_FACTORY.backoff();
  private String filePattern;

  /** Defaults the pattern to every file in the temporary folder. */
  @Before
  public void setup() throws IOException {
    filePattern =
        LocalResources.fromFile(tmpFolder.getRoot(), true)
            .resolve("*", StandardResolveOptions.RESOLVE_FILE)
            .toString();
  }

  /** Expects the next read to fail with the "retries exhausted" {@link IOException}. */
  private void expectRetriesExhausted() {
    thrown.expect(IOException.class);
    thrown.expectMessage(
        containsString(
            "Unable to read file(s) after retrying " + NumberedShardedFile.MAX_READ_RETRIES));
  }

  @Test
  public void testPreconditionFilePathIsNull() {
    thrown.expect(IllegalArgumentException.class);
    thrown.expectMessage(containsString("Expected valid file path, but received"));
    new NumberedShardedFile(null);
  }

  @Test
  public void testPreconditionFilePathIsEmpty() {
    thrown.expect(IllegalArgumentException.class);
    thrown.expectMessage(containsString("Expected valid file path, but received"));
    new NumberedShardedFile("");
  }

  /** Reads both numbered shards and ignores a file outside the "result-*" glob. */
  @Test
  public void testReadMultipleShards() throws Exception {
    String contents1 = "To be or not to be, ";
    String contents2 = "it is not a question.";
    String contents3 = "should not be included";

    File tmpFile1 = tmpFolder.newFile("result-000-of-002");
    File tmpFile2 = tmpFolder.newFile("result-001-of-002");
    File tmpFile3 = tmpFolder.newFile("tmp");
    Files.write(contents1, tmpFile1, StandardCharsets.UTF_8);
    Files.write(contents2, tmpFile2, StandardCharsets.UTF_8);
    Files.write(contents3, tmpFile3, StandardCharsets.UTF_8);

    filePattern =
        LocalResources.fromFile(tmpFolder.getRoot(), true)
            .resolve("result-*", StandardResolveOptions.RESOLVE_FILE)
            .toString();
    NumberedShardedFile shardedFile = new NumberedShardedFile(filePattern);

    assertThat(shardedFile.readFilesWithRetries(), containsInAnyOrder(contents1, contents2));
  }

  @Test
  public void testReadEmpty() throws Exception {
    File emptyFile = tmpFolder.newFile("result-000-of-001");
    Files.write("", emptyFile, StandardCharsets.UTF_8);
    NumberedShardedFile shardedFile = new NumberedShardedFile(filePattern);

    assertThat(shardedFile.readFilesWithRetries(), empty());
  }

  @Test
  public void testReadCustomTemplate() throws Exception {
    String contents1 = "To be or not to be, ";
    String contents2 = "it is not a question.";

    // Customized template: resultSSS-totalNNN
    File tmpFile1 = tmpFolder.newFile("result0-total2");
    File tmpFile2 = tmpFolder.newFile("result1-total2");
    Files.write(contents1, tmpFile1, StandardCharsets.UTF_8);
    Files.write(contents2, tmpFile2, StandardCharsets.UTF_8);

    // checkTotalNumOfFiles reads the shard count from the group named "numshards".
    Pattern customizedTemplate =
        Pattern.compile("(?x) result (?<shardnum>\\d+) - total (?<numshards>\\d+)");
    NumberedShardedFile shardedFile = new NumberedShardedFile(filePattern, customizedTemplate);

    assertThat(shardedFile.readFilesWithRetries(), containsInAnyOrder(contents1, contents2));
  }

  @Test
  public void testReadWithRetriesFailsWhenTemplateIncorrect() throws Exception {
    File tmpFile = tmpFolder.newFile();
    Files.write("Test for file checksum verifier.", tmpFile, StandardCharsets.UTF_8);

    NumberedShardedFile shardedFile =
        new NumberedShardedFile(filePattern, Pattern.compile("incorrect-template"));

    expectRetriesExhausted();
    shardedFile.readFilesWithRetries(fastClock, backOff);
  }

  @Test
  public void testReadWithRetriesFailsSinceFilesystemError() throws Exception {
    File tmpFile = tmpFolder.newFile();
    Files.write("Test for file checksum verifier.", tmpFile, StandardCharsets.UTF_8);
    NumberedShardedFile shardedFile = spy(new NumberedShardedFile(filePattern));
    doThrow(IOException.class)
        .when(shardedFile)
        .readLines(anyCollection());

    expectRetriesExhausted();
    shardedFile.readFilesWithRetries(fastClock, backOff);
  }

  @Test
  public void testReadWithRetriesFailsWhenOutputDirEmpty() throws Exception {
    NumberedShardedFile shardedFile = new NumberedShardedFile(filePattern);

    expectRetriesExhausted();
    shardedFile.readFilesWithRetries(fastClock, backOff);
  }

  /** Two files are matched but the shard name claims one, so the count check never passes. */
  @Test
  public void testReadWithRetriesFailsWhenRedundantFileLoaded() throws Exception {
    tmpFolder.newFile("result-000-of-001");
    tmpFolder.newFile("tmp-result-000-of-001");

    NumberedShardedFile shardedFile = new NumberedShardedFile(filePattern);

    expectRetriesExhausted();
    shardedFile.readFilesWithRetries(fastClock, backOff);
  }
}