Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id B6ADA200BB1 for ; Thu, 20 Oct 2016 01:36:02 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id B530D160AFB; Wed, 19 Oct 2016 23:36:02 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 846CD160AEA for ; Thu, 20 Oct 2016 01:36:01 +0200 (CEST) Received: (qmail 84302 invoked by uid 500); 19 Oct 2016 23:36:00 -0000 Mailing-List: contact commits-help@beam.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@beam.incubator.apache.org Delivered-To: mailing list commits@beam.incubator.apache.org Received: (qmail 84293 invoked by uid 99); 19 Oct 2016 23:36:00 -0000 Received: from pnap-us-west-generic-nat.apache.org (HELO spamd3-us-west.apache.org) (209.188.14.142) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 19 Oct 2016 23:36:00 +0000 Received: from localhost (localhost [127.0.0.1]) by spamd3-us-west.apache.org (ASF Mail Server at spamd3-us-west.apache.org) with ESMTP id 3843718068A for ; Wed, 19 Oct 2016 23:36:00 +0000 (UTC) X-Virus-Scanned: Debian amavisd-new at spamd3-us-west.apache.org X-Spam-Flag: NO X-Spam-Score: -6.219 X-Spam-Level: X-Spam-Status: No, score=-6.219 tagged_above=-999 required=6.31 tests=[KAM_ASCII_DIVIDERS=0.8, KAM_LAZY_DOMAIN_SECURITY=1, RCVD_IN_DNSWL_HI=-5, RCVD_IN_MSPIKE_H3=-0.01, RCVD_IN_MSPIKE_WL=-0.01, RP_MATCHES_RCVD=-2.999] autolearn=disabled Received: from mx1-lw-us.apache.org ([10.40.0.8]) by localhost (spamd3-us-west.apache.org [10.40.0.10]) (amavisd-new, port 10024) with ESMTP id cb7SaL1P_7Vt for ; Wed, 19 Oct 2016 23:35:57 +0000 (UTC) Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx1-lw-us.apache.org (ASF Mail Server at mx1-lw-us.apache.org) with SMTP id 6ACEC5FBCD for ; Wed, 19 Oct 2016 23:35:56 +0000 (UTC) Received: (qmail 63874 invoked by uid 99); 19 Oct 2016 23:29:16 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 19 Oct 2016 23:29:16 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 9D8A8E098D; Wed, 19 Oct 2016 23:29:16 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: lcwik@apache.org To: commits@beam.incubator.apache.org Date: Wed, 19 Oct 2016 23:29:16 -0000 Message-Id: X-Mailer: ASF-Git Admin Mailer Subject: [1/3] incubator-beam git commit: [BEAM-605] Create Bigquery Verifier archived-at: Wed, 19 Oct 2016 23:36:02 -0000 Repository: incubator-beam Updated Branches: refs/heads/master c472e1227 -> 6c6f824aa [BEAM-605] Create Bigquery Verifier Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/8e225d7c Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/8e225d7c Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/8e225d7c Branch: refs/heads/master Commit: 8e225d7c36812cbcc831269d76279700e29131f7 Parents: c472e12 Author: Mark Liu Authored: Thu Sep 22 14:34:20 2016 -0700 Committer: Luke Cwik Committed: Wed Oct 19 16:22:56 2016 -0700 ---------------------------------------------------------------------- .../examples/cookbook/BigQueryTornadoesIT.java | 14 +- .../beam/sdk/testing/BigqueryMatcher.java | 235 +++++++++++++++++++ .../beam/sdk/testing/BigqueryMatcherTest.java | 176 ++++++++++++++ 3 files changed, 424 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8e225d7c/examples/java/src/test/java/org/apache/beam/examples/cookbook/BigQueryTornadoesIT.java ---------------------------------------------------------------------- diff --git a/examples/java/src/test/java/org/apache/beam/examples/cookbook/BigQueryTornadoesIT.java b/examples/java/src/test/java/org/apache/beam/examples/cookbook/BigQueryTornadoesIT.java index 8bcab4a..7e15389 100644 --- a/examples/java/src/test/java/org/apache/beam/examples/cookbook/BigQueryTornadoesIT.java +++ b/examples/java/src/test/java/org/apache/beam/examples/cookbook/BigQueryTornadoesIT.java @@ -18,7 +18,10 @@ package org.apache.beam.examples.cookbook; +import org.apache.beam.sdk.options.BigQueryOptions; +import org.apache.beam.sdk.options.Default; import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.apache.beam.sdk.testing.BigqueryMatcher; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.testing.TestPipelineOptions; import org.junit.Test; @@ -35,7 +38,10 @@ public class BigQueryTornadoesIT { * Options for the BigQueryTornadoes Integration Test. */ public interface BigQueryTornadoesITOptions - extends TestPipelineOptions, BigQueryTornadoes.Options { + extends TestPipelineOptions, BigQueryTornadoes.Options, BigQueryOptions { + @Default.String("043e8e6ee32384df0cda4c241b8ab897f2ce0f2f") + String getChecksum(); + void setChecksum(String value); } @Test @@ -46,6 +52,12 @@ public class BigQueryTornadoesIT { options.setOutput(String.format("%s.%s", "BigQueryTornadoesIT", "monthly_tornadoes_" + System.currentTimeMillis())); + String query = + String.format("SELECT month, tornado_count FROM [%s]", options.getOutput()); + options.setOnSuccessMatcher( + new BigqueryMatcher( + options.getAppName(), options.getProject(), query, options.getChecksum())); + BigQueryTornadoes.main(TestPipeline.convertToArgs(options)); } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8e225d7c/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/BigqueryMatcher.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/BigqueryMatcher.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/BigqueryMatcher.java new file mode 100644 index 0000000..7646caa --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/BigqueryMatcher.java @@ -0,0 +1,235 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.testing; + +import static com.google.common.base.Preconditions.checkArgument; + +import com.google.api.client.auth.oauth2.Credential; +import com.google.api.client.googleapis.auth.oauth2.GoogleCredential; +import com.google.api.client.http.HttpTransport; +import com.google.api.client.json.JsonFactory; +import com.google.api.client.util.BackOff; +import com.google.api.client.util.BackOffUtils; +import com.google.api.client.util.Sleeper; +import com.google.api.services.bigquery.Bigquery; +import com.google.api.services.bigquery.BigqueryScopes; +import com.google.api.services.bigquery.model.QueryRequest; +import com.google.api.services.bigquery.model.QueryResponse; +import com.google.api.services.bigquery.model.TableCell; +import com.google.api.services.bigquery.model.TableRow; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Strings; +import com.google.common.collect.Lists; +import com.google.common.hash.HashCode; +import com.google.common.hash.Hashing; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.Collection; +import java.util.List; +import java.util.Objects; +import javax.annotation.concurrent.NotThreadSafe; +import org.apache.beam.sdk.PipelineResult; +import org.apache.beam.sdk.util.FluentBackoff; +import org.apache.beam.sdk.util.Transport; +import org.hamcrest.Description; +import org.hamcrest.TypeSafeMatcher; +import org.joda.time.Duration; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A matcher to verify data in BigQuery by processing given query + * and comparing with content's checksum. + * + *

Example: + *

{@code [
+ *   assertThat(job, new BigqueryMatcher(appName, projectId, queryString, expectedChecksum));
+ * ]}
+ */ +@NotThreadSafe +public class BigqueryMatcher extends TypeSafeMatcher + implements SerializableMatcher { + private static final Logger LOG = LoggerFactory.getLogger(BigqueryMatcher.class); + + // The maximum number of retries to execute a BigQuery RPC + static final int MAX_QUERY_RETRIES = 4; + + // The initial backoff for executing a BigQuery RPC + private static final Duration INITIAL_BACKOFF = Duration.standardSeconds(1L); + + // The total number of rows in query response to be formatted for debugging purpose + private static final int TOTAL_FORMATTED_ROWS = 20; + + // The backoff factory with initial configs + static final FluentBackoff BACKOFF_FACTORY = + FluentBackoff.DEFAULT + .withMaxRetries(MAX_QUERY_RETRIES) + .withInitialBackoff(INITIAL_BACKOFF); + + private final String applicationName; + private final String projectId; + private final String query; + private final String expectedChecksum; + private String actualChecksum; + private transient QueryResponse response; + + public BigqueryMatcher( + String applicationName, String projectId, String query, String expectedChecksum) { + validateArgument("applicationName", applicationName); + validateArgument("projectId", projectId); + validateArgument("query", query); + validateArgument("expectedChecksum", expectedChecksum); + + this.applicationName = applicationName; + this.projectId = projectId; + this.query = query; + this.expectedChecksum = expectedChecksum; + } + + @Override + protected boolean matchesSafely(PipelineResult pipelineResult) { + LOG.info("Verifying Bigquery data"); + Bigquery bigqueryClient = newBigqueryClient(applicationName); + + // execute query + LOG.debug("Executing query: {}", query); + try { + QueryRequest queryContent = new QueryRequest(); + queryContent.setQuery(query); + + response = queryWithRetries( + bigqueryClient, queryContent, Sleeper.DEFAULT, BACKOFF_FACTORY.backoff()); + } catch (Exception e) { + throw new RuntimeException("Failed to fetch BigQuery data.", e); + } + + // validate BigQuery response + if (response == null || response.getRows() == null || response.getRows().isEmpty()) { + return false; + } + + // compute checksum + actualChecksum = generateHash(response.getRows()); + LOG.debug("Generated a SHA1 checksum based on queried data: {}", actualChecksum); + + return expectedChecksum.equals(actualChecksum); + } + + @VisibleForTesting + Bigquery newBigqueryClient(String applicationName) { + HttpTransport transport = Transport.getTransport(); + JsonFactory jsonFactory = Transport.getJsonFactory(); + Credential credential = getDefaultCredential(transport, jsonFactory); + + return new Bigquery.Builder(transport, jsonFactory, credential) + .setApplicationName(applicationName) + .build(); + } + + @VisibleForTesting + QueryResponse queryWithRetries(Bigquery bigqueryClient, QueryRequest queryContent, + Sleeper sleeper, BackOff backOff) + throws IOException, InterruptedException { + IOException lastException = null; + do { + try { + return bigqueryClient.jobs().query(projectId, queryContent).execute(); + } catch (IOException e) { + // ignore and retry + LOG.warn("Ignore the error and retry the query."); + lastException = e; + } + } while(BackOffUtils.next(sleeper, backOff)); + throw new IOException( + String.format( + "Unable to get BigQuery response after retrying %d times", MAX_QUERY_RETRIES), + lastException); + } + + private void validateArgument(String name, String value) { + checkArgument( + !Strings.isNullOrEmpty(value), "Expected valid %s, but was %s", name, value); + } + + private Credential getDefaultCredential(HttpTransport transport, JsonFactory jsonFactory) { + GoogleCredential credential; + try { + credential = GoogleCredential.getApplicationDefault(transport, jsonFactory); + } catch (IOException e) { + throw new RuntimeException("Failed to get application default credential.", e); + } + + if (credential.createScopedRequired()) { + Collection bigqueryScope = + Lists.newArrayList(BigqueryScopes.CLOUD_PLATFORM_READ_ONLY); + credential = credential.createScoped(bigqueryScope); + } + return credential; + } + + private String generateHash(List rows) { + List rowHashes = Lists.newArrayList(); + for (TableRow row : rows) { + List cellHashes = Lists.newArrayList(); + for (TableCell cell : row.getF()) { + cellHashes.add(Hashing.sha1().hashString(cell.toString(), StandardCharsets.UTF_8)); + } + rowHashes.add(Hashing.combineUnordered(cellHashes)); + } + return Hashing.combineUnordered(rowHashes).toString(); + } + + @Override + public void describeTo(Description description) { + description + .appendText("Expected checksum is (") + .appendText(expectedChecksum) + .appendText(")"); + } + + @Override + public void describeMismatchSafely(PipelineResult pResult, Description description) { + String info; + if (response == null || response.getRows() == null || response.getRows().isEmpty()) { + // invalid query response + info = String.format("Invalid BigQuery response: %s", Objects.toString(response)); + } else { + // checksum mismatch + info = String.format("was (%s).%n" + + "\tTotal number of rows are: %d.%n" + + "\tQueried data details:%s", + actualChecksum, response.getTotalRows(), formatRows(TOTAL_FORMATTED_ROWS)); + } + description.appendText(info); + } + + private String formatRows(int totalNumRows) { + StringBuilder samples = new StringBuilder(); + List rows = response.getRows(); + for (int i = 0; i < totalNumRows && i < rows.size(); i++) { + samples.append("\n\t\t"); + for (TableCell field : rows.get(i).getF()) { + samples.append(String.format("%-10s", field.getV())); + } + } + if (rows.size() > totalNumRows) { + samples.append("\n\t\t...."); + } + return samples.toString(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8e225d7c/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/BigqueryMatcherTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/BigqueryMatcherTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/BigqueryMatcherTest.java new file mode 100644 index 0000000..1c427f8 --- /dev/null +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/BigqueryMatcherTest.java @@ -0,0 +1,176 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.testing; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.containsString; +import static org.junit.Assert.fail; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyString; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import com.google.api.services.bigquery.Bigquery; +import com.google.api.services.bigquery.model.QueryRequest; +import com.google.api.services.bigquery.model.QueryResponse; +import com.google.api.services.bigquery.model.TableCell; +import com.google.api.services.bigquery.model.TableRow; +import com.google.common.collect.Lists; +import java.io.IOException; +import java.math.BigInteger; +import org.apache.beam.sdk.PipelineResult; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; + +/** + * Tests for {@link BigqueryMatcher}. + */ +@RunWith(JUnit4.class) +public class BigqueryMatcherTest { + private final String appName = "test-app"; + private final String projectId = "test-project"; + private final String query = "test-query"; + + @Rule public ExpectedException thrown = ExpectedException.none(); + @Rule public FastNanoClockAndSleeper fastClock = new FastNanoClockAndSleeper(); + @Mock private Bigquery mockBigqueryClient; + @Mock private Bigquery.Jobs mockJobs; + @Mock private Bigquery.Jobs.Query mockQuery; + @Mock private PipelineResult mockResult; + + @Before + public void setUp() throws IOException { + MockitoAnnotations.initMocks(this); + when(mockBigqueryClient.jobs()).thenReturn(mockJobs); + when(mockJobs.query(anyString(), any(QueryRequest.class))).thenReturn(mockQuery); + } + + @Test + public void testBigqueryMatcherThatSucceeds() throws Exception { + BigqueryMatcher matcher = spy( + new BigqueryMatcher( + appName, projectId, query, "8d1bbbf1f523f924b98c88b00c5811e041c2f855")); + doReturn(mockBigqueryClient).when(matcher).newBigqueryClient(anyString()); + when(mockQuery.execute()).thenReturn(createResponseContainingTestData()); + + assertThat(mockResult, matcher); + verify(matcher).newBigqueryClient(eq(appName)); + verify(mockJobs).query(eq(projectId), eq(new QueryRequest().setQuery(query))); + } + + @Test + public void testBigqueryMatcherFailsForChecksumMismatch() throws IOException { + BigqueryMatcher matcher = spy( + new BigqueryMatcher(appName, projectId, query, "incorrect-checksum")); + doReturn(mockBigqueryClient).when(matcher).newBigqueryClient(anyString()); + when(mockQuery.execute()).thenReturn(createResponseContainingTestData()); + + try { + assertThat(mockResult, matcher); + } catch (AssertionError expected) { + assertThat(expected.getMessage(), containsString("Total number of rows are: 1")); + assertThat(expected.getMessage(), containsString("abc")); + verify(matcher).newBigqueryClient(eq(appName)); + verify(mockJobs).query(eq(projectId), eq(new QueryRequest().setQuery(query))); + } + } + + @Test + public void testBigqueryMatcherFailsWhenResponseIsNull() throws IOException { + testMatcherFailsSinceInvalidQueryResponse(null); + } + + @Test + public void testBigqueryMatcherFailsWhenNullRowsInResponse() throws IOException { + testMatcherFailsSinceInvalidQueryResponse(new QueryResponse()); + } + + @Test + public void testBigqueryMatcherFailsWhenEmptyRowsInResponse() throws IOException { + QueryResponse response = new QueryResponse(); + response.setRows(Lists.newArrayList()); + + testMatcherFailsSinceInvalidQueryResponse(response); + } + + private void testMatcherFailsSinceInvalidQueryResponse(QueryResponse response) + throws IOException { + BigqueryMatcher matcher = spy( + new BigqueryMatcher(appName, projectId, query, "some-checksum")); + doReturn(mockBigqueryClient).when(matcher).newBigqueryClient(anyString()); + when(mockQuery.execute()).thenReturn(response); + + try { + assertThat(mockResult, matcher); + } catch (AssertionError expected) { + assertThat(expected.getMessage(), containsString("Invalid BigQuery response:")); + verify(matcher).newBigqueryClient(eq(appName)); + verify(mockJobs).query(eq(projectId), eq(new QueryRequest().setQuery(query))); + return; + } + // Note that fail throws an AssertionError which is why it is placed out here + // instead of inside the try-catch block. + fail("AssertionError is expected."); + } + + @Test + public void testQueryWithRetriesWhenServiceFails() throws Exception { + BigqueryMatcher matcher = spy( + new BigqueryMatcher(appName, projectId, query, "some-checksum")); + when(mockQuery.execute()).thenThrow(new IOException()); + + thrown.expect(IOException.class); + thrown.expectMessage("Unable to get BigQuery response after retrying"); + + matcher.queryWithRetries( + mockBigqueryClient, + new QueryRequest(), + fastClock, + BigqueryMatcher.BACKOFF_FACTORY.backoff()); + + verify(matcher).newBigqueryClient(eq(appName)); + verify(mockJobs, times(BigqueryMatcher.MAX_QUERY_RETRIES)) + .query(eq(projectId), eq(new QueryRequest().setQuery(query))); + } + + private QueryResponse createResponseContainingTestData() { + TableCell field1 = new TableCell(); + field1.setV("abc"); + TableCell field2 = new TableCell(); + field2.setV("2"); + TableCell field3 = new TableCell(); + field3.setV("testing BigQuery matcher."); + TableRow row = new TableRow(); + row.setF(Lists.newArrayList(field1, field2, field3)); + + QueryResponse response = new QueryResponse(); + response.setRows(Lists.newArrayList(row)); + response.setTotalRows(BigInteger.ONE); + return response; + } +}