Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 8825B200BB1 for ; Thu, 20 Oct 2016 01:36:00 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 86B9F160AEA; Wed, 19 Oct 2016 23:36:00 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id A384E160AFC for ; Thu, 20 Oct 2016 01:35:59 +0200 (CEST) Received: (qmail 84194 invoked by uid 500); 19 Oct 2016 23:35:58 -0000 Mailing-List: contact commits-help@beam.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@beam.incubator.apache.org Delivered-To: mailing list commits@beam.incubator.apache.org Received: (qmail 84185 invoked by uid 99); 19 Oct 2016 23:35:58 -0000 Received: from pnap-us-west-generic-nat.apache.org (HELO spamd2-us-west.apache.org) (209.188.14.142) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 19 Oct 2016 23:35:58 +0000 Received: from localhost (localhost [127.0.0.1]) by spamd2-us-west.apache.org (ASF Mail Server at spamd2-us-west.apache.org) with ESMTP id 6BB0D1A083F for ; Wed, 19 Oct 2016 23:35:58 +0000 (UTC) X-Virus-Scanned: Debian amavisd-new at spamd2-us-west.apache.org X-Spam-Flag: NO X-Spam-Score: -5.969 X-Spam-Level: X-Spam-Status: No, score=-5.969 tagged_above=-999 required=6.31 tests=[KAM_ASCII_DIVIDERS=0.8, KAM_LAZY_DOMAIN_SECURITY=1, KAM_LOTSOFHASH=0.25, RCVD_IN_DNSWL_HI=-5, RCVD_IN_MSPIKE_H3=-0.01, RCVD_IN_MSPIKE_WL=-0.01, RP_MATCHES_RCVD=-2.999] autolearn=disabled Received: from mx1-lw-us.apache.org ([10.40.0.8]) by localhost (spamd2-us-west.apache.org [10.40.0.9]) (amavisd-new, port 10024) with ESMTP id fvsr4dO6K7vE for ; Wed, 19 Oct 2016 23:35:56 +0000 (UTC) Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx1-lw-us.apache.org (ASF Mail Server at mx1-lw-us.apache.org) with SMTP id 6AD685FC9E for ; Wed, 19 Oct 2016 23:35:56 +0000 (UTC) Received: (qmail 63876 invoked by uid 99); 19 Oct 2016 23:29:16 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 19 Oct 2016 23:29:16 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id A0D69E0FC4; Wed, 19 Oct 2016 23:29:16 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: lcwik@apache.org To: commits@beam.incubator.apache.org Date: Wed, 19 Oct 2016 23:29:17 -0000 Message-Id: <881f67ec4a6b423481e7bbc8da3e5541@git.apache.org> In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [2/3] incubator-beam git commit: Add BigQuery Verifier to WindowedWordCountIT archived-at: Wed, 19 Oct 2016 23:36:00 -0000 Add BigQuery Verifier to WindowedWordCountIT Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/dd46523d Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/dd46523d Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/dd46523d Branch: refs/heads/master Commit: dd46523dc2bca4aee11265a2fb065cc137920b1d Parents: 8e225d7 Author: Mark Liu Authored: Thu Oct 6 14:34:55 2016 -0700 Committer: Luke Cwik Committed: Wed Oct 19 16:22:57 2016 -0700 ---------------------------------------------------------------------- .../apache/beam/examples/WindowedWordCountIT.java | 11 +++++++++++ .../beam/examples/cookbook/BigQueryTornadoesIT.java | 2 +- .../apache/beam/sdk/testing/BigqueryMatcher.java | 16 ++++++++++------ .../beam/sdk/testing/BigqueryMatcherTest.java | 2 +- 4 files changed, 23 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/dd46523d/examples/java/src/test/java/org/apache/beam/examples/WindowedWordCountIT.java ---------------------------------------------------------------------- diff --git a/examples/java/src/test/java/org/apache/beam/examples/WindowedWordCountIT.java b/examples/java/src/test/java/org/apache/beam/examples/WindowedWordCountIT.java index 379d1b0..6742654 100644 --- a/examples/java/src/test/java/org/apache/beam/examples/WindowedWordCountIT.java +++ b/examples/java/src/test/java/org/apache/beam/examples/WindowedWordCountIT.java @@ -19,8 +19,10 @@ package org.apache.beam.examples; import java.io.IOException; import org.apache.beam.examples.WindowedWordCount.Options; +import org.apache.beam.sdk.options.Default; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.options.StreamingOptions; +import org.apache.beam.sdk.testing.BigqueryMatcher; import org.apache.beam.sdk.testing.StreamingIT; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.testing.TestPipelineOptions; @@ -40,6 +42,9 @@ public class WindowedWordCountIT { */ public interface WindowedWordCountITOptions extends Options, TestPipelineOptions, StreamingOptions { + @Default.String("ff54f6f42b2afeb146206c1e8e915deaee0362b4") + String getChecksum(); + void setChecksum(String value); } @Test @@ -59,6 +64,12 @@ public class WindowedWordCountIT { TestPipeline.testingPipelineOptions().as(WindowedWordCountITOptions.class); options.setStreaming(isStreaming); + String query = String.format("SELECT word, SUM(count) FROM [%s:%s.%s] GROUP BY word", + options.getProject(), options.getBigQueryDataset(), options.getBigQueryTable()); + options.setOnSuccessMatcher( + new BigqueryMatcher( + options.getAppName(), options.getProject(), query, options.getChecksum())); + WindowedWordCount.main(TestPipeline.convertToArgs(options)); } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/dd46523d/examples/java/src/test/java/org/apache/beam/examples/cookbook/BigQueryTornadoesIT.java ---------------------------------------------------------------------- diff --git a/examples/java/src/test/java/org/apache/beam/examples/cookbook/BigQueryTornadoesIT.java b/examples/java/src/test/java/org/apache/beam/examples/cookbook/BigQueryTornadoesIT.java index 7e15389..27a5a8f 100644 --- a/examples/java/src/test/java/org/apache/beam/examples/cookbook/BigQueryTornadoesIT.java +++ b/examples/java/src/test/java/org/apache/beam/examples/cookbook/BigQueryTornadoesIT.java @@ -39,7 +39,7 @@ public class BigQueryTornadoesIT { */ public interface BigQueryTornadoesITOptions extends TestPipelineOptions, BigQueryTornadoes.Options, BigQueryOptions { - @Default.String("043e8e6ee32384df0cda4c241b8ab897f2ce0f2f") + @Default.String("1ab4c7ec460b94bbb3c3885b178bf0e6bed56e1f") String getChecksum(); void setChecksum(String value); } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/dd46523d/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/BigqueryMatcher.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/BigqueryMatcher.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/BigqueryMatcher.java index 7646caa..95208ce 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/BigqueryMatcher.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/BigqueryMatcher.java @@ -40,8 +40,10 @@ import com.google.common.hash.Hashing; import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.Collection; +import java.util.Collections; import java.util.List; import java.util.Objects; +import javax.annotation.Nonnull; import javax.annotation.concurrent.NotThreadSafe; import org.apache.beam.sdk.PipelineResult; import org.apache.beam.sdk.util.FluentBackoff; @@ -182,14 +184,16 @@ public class BigqueryMatcher extends TypeSafeMatcher return credential; } - private String generateHash(List rows) { + private String generateHash(@Nonnull List rows) { List rowHashes = Lists.newArrayList(); for (TableRow row : rows) { - List cellHashes = Lists.newArrayList(); + List cellsInOneRow = Lists.newArrayList(); for (TableCell cell : row.getF()) { - cellHashes.add(Hashing.sha1().hashString(cell.toString(), StandardCharsets.UTF_8)); + cellsInOneRow.add(Objects.toString(cell.getV())); + Collections.sort(cellsInOneRow); } - rowHashes.add(Hashing.combineUnordered(cellHashes)); + rowHashes.add( + Hashing.sha1().hashString(cellsInOneRow.toString(), StandardCharsets.UTF_8)); } return Hashing.combineUnordered(rowHashes).toString(); } @@ -222,13 +226,13 @@ public class BigqueryMatcher extends TypeSafeMatcher StringBuilder samples = new StringBuilder(); List rows = response.getRows(); for (int i = 0; i < totalNumRows && i < rows.size(); i++) { - samples.append("\n\t\t"); + samples.append(String.format("%n\t\t")); for (TableCell field : rows.get(i).getF()) { samples.append(String.format("%-10s", field.getV())); } } if (rows.size() > totalNumRows) { - samples.append("\n\t\t...."); + samples.append(String.format("%n\t\t...")); } return samples.toString(); } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/dd46523d/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/BigqueryMatcherTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/BigqueryMatcherTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/BigqueryMatcherTest.java index 1c427f8..d0ae765 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/BigqueryMatcherTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/BigqueryMatcherTest.java @@ -74,7 +74,7 @@ public class BigqueryMatcherTest { public void testBigqueryMatcherThatSucceeds() throws Exception { BigqueryMatcher matcher = spy( new BigqueryMatcher( - appName, projectId, query, "8d1bbbf1f523f924b98c88b00c5811e041c2f855")); + appName, projectId, query, "9bb47f5c90d2a99cad526453dff5ed5ec74650dc")); doReturn(mockBigqueryClient).when(matcher).newBigqueryClient(anyString()); when(mockQuery.execute()).thenReturn(createResponseContainingTestData());