Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id F3C09200CFC for ; Wed, 23 Aug 2017 19:09:21 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id F2259167459; Wed, 23 Aug 2017 17:09:21 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 652291672C9 for ; Wed, 23 Aug 2017 19:09:18 +0200 (CEST) Received: (qmail 28498 invoked by uid 500); 23 Aug 2017 17:09:16 -0000 Mailing-List: contact commits-help@beam.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@beam.apache.org Delivered-To: mailing list commits@beam.apache.org Received: (qmail 24203 invoked by uid 99); 23 Aug 2017 17:09:13 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 23 Aug 2017 17:09:13 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id BAA99F5F18; Wed, 23 Aug 2017 17:09:10 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 8bit From: iemejia@apache.org To: commits@beam.apache.org Date: Wed, 23 Aug 2017 17:09:39 -0000 Message-Id: <08fe8de2e95e4171b8becb4617d0c4de@git.apache.org> In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [31/55] [abbrv] beam git commit: Fix static analysis issues archived-at: Wed, 23 Aug 2017 17:09:22 -0000 Fix static analysis issues Restrict access level on classes + other static analysis fixes Fix findbugs issues (issue #33) Fix compile after AvroIO, TextIO, PubsubIO and State refactor Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/1541fad0 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/1541fad0 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/1541fad0 Branch: refs/heads/master Commit: 1541fad077e47df1d47636fd186a72aa827bbc42 Parents: a39cb80 Author: Ismaël Mejía Authored: Mon May 1 00:54:08 2017 +0200 Committer: Ismaël Mejía Committed: Wed Aug 23 19:07:28 2017 +0200 ---------------------------------------------------------------------- integration/java/nexmark/pom.xml | 2 +- .../beam/integration/nexmark/Monitor.java | 4 +- .../beam/integration/nexmark/NexmarkDriver.java | 12 +- .../beam/integration/nexmark/NexmarkRunner.java | 124 +++++++++++-------- .../beam/integration/nexmark/NexmarkUtils.java | 34 +++-- .../beam/integration/nexmark/model/Auction.java | 8 +- .../integration/nexmark/model/AuctionCount.java | 6 +- .../integration/nexmark/model/AuctionPrice.java | 4 +- .../nexmark/model/BidsPerSession.java | 4 +- .../beam/integration/nexmark/model/Done.java | 2 +- .../beam/integration/nexmark/model/Event.java | 13 -- .../nexmark/model/IdNameReserve.java | 6 +- .../nexmark/model/NameCityStateId.java | 8 +- .../beam/integration/nexmark/model/Person.java | 6 +- .../integration/nexmark/model/SellerPrice.java | 2 +- .../nexmark/queries/AbstractSimulator.java | 10 +- .../nexmark/queries/NexmarkQuery.java | 34 ++--- .../nexmark/queries/NexmarkQueryModel.java | 17 +-- .../nexmark/queries/Query0Model.java | 2 +- .../integration/nexmark/queries/Query10.java | 6 +- .../integration/nexmark/queries/Query11.java | 3 +- .../nexmark/queries/Query1Model.java | 2 +- .../integration/nexmark/queries/Query3.java | 24 ++-- .../nexmark/queries/Query3Model.java | 2 +- .../nexmark/queries/Query4Model.java | 5 +- .../integration/nexmark/queries/Query5.java | 4 +- .../integration/nexmark/queries/Query6.java | 4 +- .../nexmark/queries/Query6Model.java | 5 +- .../nexmark/queries/WinningBids.java | 30 +++-- .../integration/nexmark/sources/Generator.java | 11 +- .../nexmark/sources/GeneratorConfig.java | 26 ++-- .../nexmark/sources/UnboundedEventSource.java | 2 +- .../sources/UnboundedEventSourceTest.java | 5 +- integration/pom.xml | 14 +++ 34 files changed, 221 insertions(+), 220 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/1541fad0/integration/java/nexmark/pom.xml ---------------------------------------------------------------------- diff --git a/integration/java/nexmark/pom.xml b/integration/java/nexmark/pom.xml index fb213e9..8a65c0f 100644 --- a/integration/java/nexmark/pom.xml +++ b/integration/java/nexmark/pom.xml @@ -210,7 +210,7 @@ org.apache.beam - beam-sdks-java-extensions-gcp-core + beam-sdks-java-extensions-google-cloud-platform-core http://git-wip-us.apache.org/repos/asf/beam/blob/1541fad0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Monitor.java ---------------------------------------------------------------------- diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Monitor.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Monitor.java index cb4d71c..2f0c56a 100644 --- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Monitor.java +++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Monitor.java @@ -63,8 +63,8 @@ public class Monitor implements Serializable { public final String name; public final String prefix; - final MonitorDoFn doFn; - final PTransform, PCollection> transform; + private final MonitorDoFn doFn; + private final PTransform, PCollection> transform; public Monitor(String name, String prefix) { this.name = name; http://git-wip-us.apache.org/repos/asf/beam/blob/1541fad0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkDriver.java ---------------------------------------------------------------------- diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkDriver.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkDriver.java index 7d532cc..a982a8d 100644 --- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkDriver.java +++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkDriver.java @@ -57,7 +57,7 @@ public class NexmarkDriver { /** * Entry point. */ - public void runAll(OptionT options, NexmarkRunner runner) { + void runAll(OptionT options, NexmarkRunner runner) { Instant start = Instant.now(); Map baseline = loadBaseline(options.getBaselineFilename()); Map actual = new LinkedHashMap<>(); @@ -87,7 +87,7 @@ public class NexmarkDriver { } if (!successful) { - System.exit(1); + throw new RuntimeException("Execution was not successful"); } } @@ -149,8 +149,6 @@ public class NexmarkDriver { /** * Print summary of {@code actual} vs (if non-null) {@code baseline}. - * - * @throws IOException */ private static void saveSummary( @Nullable String summaryFilename, @@ -227,7 +225,7 @@ public class NexmarkDriver { if (actualPerf != null) { List errors = actualPerf.errors; if (errors == null) { - errors = new ArrayList(); + errors = new ArrayList<>(); errors.add("NexmarkGoogleRunner returned null errors list"); } for (String error : errors) { @@ -300,7 +298,7 @@ public class NexmarkDriver { NexmarkOptions options = PipelineOptionsFactory.fromArgs(args) .withValidation() .as(NexmarkOptions.class); - NexmarkRunner runner = new NexmarkRunner(options); - new NexmarkDriver().runAll(options, runner); + NexmarkRunner runner = new NexmarkRunner<>(options); + new NexmarkDriver<>().runAll(options, runner); } } http://git-wip-us.apache.org/repos/asf/beam/blob/1541fad0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkRunner.java ---------------------------------------------------------------------- diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkRunner.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkRunner.java index a3c4d33..6df76f0 100644 --- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkRunner.java +++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkRunner.java @@ -29,6 +29,7 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; +import java.util.HashMap; import java.util.List; import java.util.NoSuchElementException; import java.util.concurrent.ThreadLocalRandom; @@ -65,10 +66,12 @@ import org.apache.beam.integration.nexmark.queries.Query9; import org.apache.beam.integration.nexmark.queries.Query9Model; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.PipelineResult; +import org.apache.beam.sdk.coders.CoderException; import org.apache.beam.sdk.io.AvroIO; import org.apache.beam.sdk.io.TextIO; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO; import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO; +import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage; import org.apache.beam.sdk.metrics.DistributionResult; import org.apache.beam.sdk.metrics.MetricNameFilter; import org.apache.beam.sdk.metrics.MetricQueryResults; @@ -77,6 +80,7 @@ import org.apache.beam.sdk.metrics.MetricsFilter; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.util.CoderUtils; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionTuple; import org.apache.beam.sdk.values.TimestampedValue; @@ -91,15 +95,15 @@ public class NexmarkRunner { /** * Minimum number of samples needed for 'stead-state' rate calculation. */ - protected static final int MIN_SAMPLES = 9; + private static final int MIN_SAMPLES = 9; /** * Minimum length of time over which to consider samples for 'steady-state' rate calculation. */ - protected static final Duration MIN_WINDOW = Duration.standardMinutes(2); + private static final Duration MIN_WINDOW = Duration.standardMinutes(2); /** * Delay between perf samples. */ - protected static final Duration PERF_DELAY = Duration.standardSeconds(15); + private static final Duration PERF_DELAY = Duration.standardSeconds(15); /** * How long to let streaming pipeline run after all events have been generated and we've * seen no activity. @@ -117,37 +121,37 @@ public class NexmarkRunner { /** * NexmarkOptions shared by all runs. */ - protected final OptionT options; + private final OptionT options; /** * Which configuration we are running. */ @Nullable - protected NexmarkConfiguration configuration; + private NexmarkConfiguration configuration; /** * If in --pubsubMode=COMBINED, the event monitor for the publisher pipeline. Otherwise null. */ @Nullable - protected Monitor publisherMonitor; + private Monitor publisherMonitor; /** * If in --pubsubMode=COMBINED, the pipeline result for the publisher pipeline. Otherwise null. */ @Nullable - protected PipelineResult publisherResult; + private PipelineResult publisherResult; /** * Result for the main pipeline. */ @Nullable - protected PipelineResult mainResult; + private PipelineResult mainResult; /** * Query name we are running. */ @Nullable - protected String queryName; + private String queryName; public NexmarkRunner(OptionT options) { this.options = options; @@ -160,7 +164,7 @@ public class NexmarkRunner { /** * Is this query running in streaming mode? */ - protected boolean isStreaming() { + private boolean isStreaming() { return options.isStreaming(); } @@ -174,7 +178,7 @@ public class NexmarkRunner { /** * Return maximum number of workers. */ - protected int maxNumWorkers() { + private int maxNumWorkers() { return 5; } @@ -182,7 +186,7 @@ public class NexmarkRunner { * Return the current value for a long counter, or a default value if can't be retrieved. * Note this uses only attempted metrics because some runners don't support committed metrics. */ - protected long getCounterMetric(PipelineResult result, String namespace, String name, + private long getCounterMetric(PipelineResult result, String namespace, String name, long defaultValue) { //TODO Ismael calc this only once MetricQueryResults metrics = result.metrics().queryMetrics( @@ -201,7 +205,7 @@ public class NexmarkRunner { * Return the current value for a long counter, or a default value if can't be retrieved. * Note this uses only attempted metrics because some runners don't support committed metrics. */ - protected long getDistributionMetric(PipelineResult result, String namespace, String name, + private long getDistributionMetric(PipelineResult result, String namespace, String name, DistributionType distType, long defaultValue) { MetricQueryResults metrics = result.metrics().queryMetrics( MetricsFilter.builder().addNameFilter(MetricNameFilter.named(namespace, name)).build()); @@ -226,7 +230,7 @@ public class NexmarkRunner { /** * Return the current value for a time counter, or -1 if can't be retrieved. */ - protected long getTimestampMetric(long now, long value) { + private long getTimestampMetric(long now, long value) { //TODO Ismael improve doc if (Math.abs(value - now) > Duration.standardDays(10000).getMillis()) { return -1; @@ -238,8 +242,7 @@ public class NexmarkRunner { * Find a 'steady state' events/sec from {@code snapshots} and * store it in {@code perf} if found. */ - protected void captureSteadyState(NexmarkPerf perf, - List snapshots) { + private void captureSteadyState(NexmarkPerf perf, List snapshots) { if (!options.isStreaming()) { return; } @@ -426,7 +429,7 @@ public class NexmarkRunner { return perf; } - protected String getJobId(PipelineResult job) { + private String getJobId(PipelineResult job) { return ""; } @@ -528,15 +531,14 @@ public class NexmarkRunner { /** * Build and run a pipeline using specified options. */ - protected interface PipelineBuilder { + interface PipelineBuilder { void build(OptionT publishOnlyOptions); } /** * Invoke the builder with options suitable for running a publish-only child pipeline. */ - protected void invokeBuilderForPublishOnlyPipeline( - PipelineBuilder builder) { + private void invokeBuilderForPublishOnlyPipeline(PipelineBuilder builder) { builder.build(options); // throw new UnsupportedOperationException( // "Cannot use --pubSubMode=COMBINED with DirectRunner"); @@ -546,7 +548,7 @@ public class NexmarkRunner { * If monitoring, wait until the publisher pipeline has run long enough to establish * a backlog on the Pubsub topic. Otherwise, return immediately. */ - protected void waitForPublisherPreload() { + private void waitForPublisherPreload() { throw new UnsupportedOperationException(); } @@ -555,7 +557,7 @@ public class NexmarkRunner { * it was measured. */ @Nullable - protected NexmarkPerf monitor(NexmarkQuery query) { + private NexmarkPerf monitor(NexmarkQuery query) { if (!options.getMonitorJobs()) { return null; } @@ -841,14 +843,28 @@ public class NexmarkRunner { private PCollection sourceEventsFromPubsub(Pipeline p, long now) { String shortSubscription = shortSubscription(now); NexmarkUtils.console("Reading events from Pubsub %s", shortSubscription); - PubsubIO.Read io = - PubsubIO.read().fromSubscription(shortSubscription) - .withIdAttribute(NexmarkUtils.PUBSUB_ID) - .withCoder(Event.CODER); + + PubsubIO.Read io = + PubsubIO.readPubsubMessagesWithAttributes().fromSubscription(shortSubscription) + .withIdAttribute(NexmarkUtils.PUBSUB_ID); if (!configuration.usePubsubPublishTime) { io = io.withTimestampAttribute(NexmarkUtils.PUBSUB_TIMESTAMP); } - return p.apply(queryName + ".ReadPubsubEvents", io); + + return p + .apply(queryName + ".ReadPubsubEvents", io) + .apply(queryName + ".PubsubMessageToEvent", ParDo.of(new DoFn() { + @ProcessElement + public void processElement(ProcessContext c) { + byte[] payload = c.element().getPayload(); + try { + Event event = CoderUtils.decodeFromByteArray(Event.CODER, payload); + c.output(event); + } catch (CoderException e) { + // TODO Log decoding Event error + } + } + })); } /** @@ -861,9 +877,8 @@ public class NexmarkRunner { } NexmarkUtils.console("Reading events from Avro files at %s", filename); return p - .apply(queryName + ".ReadAvroEvents", AvroIO.Read - .from(filename + "*.avro") - .withSchema(Event.class)) + .apply(queryName + ".ReadAvroEvents", AvroIO.read(Event.class) + .from(filename + "*.avro")) .apply("OutputWithTimestamp", NexmarkQuery.EVENT_TIMESTAMP_FROM_DATA); } @@ -873,14 +888,28 @@ public class NexmarkRunner { private void sinkEventsToPubsub(PCollection events, long now) { String shortTopic = shortTopic(now); NexmarkUtils.console("Writing events to Pubsub %s", shortTopic); - PubsubIO.Write io = - PubsubIO.write().to(shortTopic) - .withIdAttribute(NexmarkUtils.PUBSUB_ID) - .withCoder(Event.CODER); + + PubsubIO.Write io = + PubsubIO.writePubsubMessages().to(shortTopic) + .withIdAttribute(NexmarkUtils.PUBSUB_ID); if (!configuration.usePubsubPublishTime) { io = io.withTimestampAttribute(NexmarkUtils.PUBSUB_TIMESTAMP); } - events.apply(queryName + ".WritePubsubEvents", io); + + events.apply(queryName + ".EventToPubsubMessage", + ParDo.of(new DoFn() { + @ProcessElement + public void processElement(ProcessContext c) { + try { + byte[] payload = CoderUtils.encodeToByteArray(Event.CODER, c.element()); + c.output(new PubsubMessage(payload, new HashMap())); + } catch (CoderException e1) { + // TODO Log encoding Event error + } + } + }) + ) + .apply(queryName + ".WritePubsubEvents", io); } /** @@ -890,7 +919,7 @@ public class NexmarkRunner { String shortTopic = shortTopic(now); NexmarkUtils.console("Writing results to Pubsub %s", shortTopic); PubsubIO.Write io = - PubsubIO.write().to(shortTopic) + PubsubIO.writeStrings().to(shortTopic) .withIdAttribute(NexmarkUtils.PUBSUB_ID); if (!configuration.usePubsubPublishTime) { io = io.withTimestampAttribute(NexmarkUtils.PUBSUB_TIMESTAMP); @@ -917,18 +946,16 @@ public class NexmarkRunner { } NexmarkUtils.console("Writing events to Avro files at %s", filename); source.apply(queryName + ".WriteAvroEvents", - AvroIO.Write.to(filename + "/event").withSuffix(".avro").withSchema(Event.class)); + AvroIO.write(Event.class).to(filename + "/event").withSuffix(".avro")); source.apply(NexmarkQuery.JUST_BIDS) .apply(queryName + ".WriteAvroBids", - AvroIO.Write.to(filename + "/bid").withSuffix(".avro").withSchema(Bid.class)); + AvroIO.write(Bid.class).to(filename + "/bid").withSuffix(".avro")); source.apply(NexmarkQuery.JUST_NEW_AUCTIONS) .apply(queryName + ".WriteAvroAuctions", - AvroIO.Write.to(filename + "/auction").withSuffix(".avro") - .withSchema(Auction.class)); + AvroIO.write(Auction.class).to(filename + "/auction").withSuffix(".avro")); source.apply(NexmarkQuery.JUST_NEW_PERSONS) .apply(queryName + ".WriteAvroPeople", - AvroIO.Write.to(filename + "/person").withSuffix(".avro") - .withSchema(Person.class)); + AvroIO.write(Person.class).to(filename + "/person").withSuffix(".avro")); } /** @@ -938,7 +965,7 @@ public class NexmarkRunner { String filename = textFilename(now); NexmarkUtils.console("Writing results to text files at %s", filename); formattedResults.apply(queryName + ".WriteTextResults", - TextIO.Write.to(filename)); + TextIO.write().to(filename)); } private static class StringToTableRow extends DoFn { @@ -1010,12 +1037,12 @@ public class NexmarkRunner { // Send synthesized events to Pubsub in separate publisher job. // We won't start the main pipeline until the publisher has sent the pre-load events. // We'll shutdown the publisher job when we notice the main job has finished. - invokeBuilderForPublishOnlyPipeline(new PipelineBuilder() { + invokeBuilderForPublishOnlyPipeline(new PipelineBuilder() { @Override public void build(NexmarkOptions publishOnlyOptions) { Pipeline sp = Pipeline.create(options); NexmarkUtils.setupPipeline(configuration.coderStrategy, sp); - publisherMonitor = new Monitor(queryName, "publisher"); + publisherMonitor = new Monitor<>(queryName, "publisher"); sinkEventsToPubsub( sourceEventsFromSynthetic(sp) .apply(queryName + ".Monitor", publisherMonitor.getTransform()), @@ -1140,9 +1167,6 @@ public class NexmarkRunner { checkState(queryName == null); configuration = runConfiguration; - // GCS URI patterns to delete on exit. - List pathsToDelete = new ArrayList<>(); - try { NexmarkUtils.console("Running %s", configuration.toShortString()); @@ -1220,9 +1244,6 @@ public class NexmarkRunner { } ((Query10) query).setOutputPath(path); ((Query10) query).setMaxNumWorkers(maxNumWorkers()); - if (path != null && options.getManageResources()) { - pathsToDelete.add(path + "/**"); - } } // Apply query. @@ -1252,7 +1273,6 @@ public class NexmarkRunner { } finally { configuration = null; queryName = null; - // TODO: Cleanup pathsToDelete } } } http://git-wip-us.apache.org/repos/asf/beam/blob/1541fad0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkUtils.java ---------------------------------------------------------------------- diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkUtils.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkUtils.java index 18589c4..f6215e9 100644 --- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkUtils.java +++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkUtils.java @@ -55,6 +55,9 @@ import org.apache.beam.sdk.coders.SerializableCoder; import org.apache.beam.sdk.io.Read; import org.apache.beam.sdk.metrics.Counter; import org.apache.beam.sdk.metrics.Metrics; +import org.apache.beam.sdk.state.StateSpec; +import org.apache.beam.sdk.state.StateSpecs; +import org.apache.beam.sdk.state.ValueState; import org.apache.beam.sdk.transforms.Combine; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.PTransform; @@ -63,9 +66,6 @@ import org.apache.beam.sdk.transforms.windowing.AfterPane; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.GlobalWindows; import org.apache.beam.sdk.transforms.windowing.Window; -import org.apache.beam.sdk.util.state.StateSpec; -import org.apache.beam.sdk.util.state.StateSpecs; -import org.apache.beam.sdk.util.state.ValueState; import org.apache.beam.sdk.values.PBegin; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.TimestampedValue; @@ -178,7 +178,7 @@ public class NexmarkUtils { /** Names are suffixed with the query being run. */ QUERY, /** Names are suffixed with the query being run and a random number. */ - QUERY_AND_SALT; + QUERY_AND_SALT } /** @@ -310,7 +310,7 @@ public class NexmarkUtils { * Log message to console. For client side only. */ public static void console(String format, Object... args) { - System.out.printf("%s %s\n", Instant.now(), String.format(format, args)); + System.out.printf("%s %s%n", Instant.now(), String.format(format, args)); } /** @@ -326,7 +326,7 @@ public class NexmarkUtils { /** * All events will be given a timestamp relative to this time (ms since epoch). */ - public static final long BASE_TIME = Instant.parse("2015-07-15T00:00:00.000Z").getMillis(); + private static final long BASE_TIME = Instant.parse("2015-07-15T00:00:00.000Z").getMillis(); /** * Instants guaranteed to be strictly before and after all event timestamps, and which won't @@ -377,7 +377,7 @@ public class NexmarkUtils { /** * Return a generator config to match the given {@code options}. */ - public static GeneratorConfig standardGeneratorConfig(NexmarkConfiguration configuration) { + private static GeneratorConfig standardGeneratorConfig(NexmarkConfiguration configuration) { return new GeneratorConfig(configuration, configuration.useWallclockEventTime ? System.currentTimeMillis() : BASE_TIME, 0, @@ -558,15 +558,14 @@ public class NexmarkUtils { } p++; } - long next = System.currentTimeMillis(); - now = next; + now = System.currentTimeMillis(); } c.output(c.element()); } }); } - private static final StateSpec> DUMMY_TAG = + private static final StateSpec> DUMMY_TAG = StateSpecs.value(ByteArrayCoder.of()); private static final int MAX_BUFFER_SIZE = 1 << 24; @@ -578,20 +577,19 @@ public class NexmarkUtils { @ProcessElement public void processElement(ProcessContext c) { long remain = bytes; - long start = System.currentTimeMillis(); - long now = start; +// long now = System.currentTimeMillis(); while (remain > 0) { + //TODO Ismael google on state long thisBytes = Math.min(remain, MAX_BUFFER_SIZE); remain -= thisBytes; - byte[] arr = new byte[(int) thisBytes]; - for (int i = 0; i < thisBytes; i++) { - arr[i] = (byte) now; - } - //TODO Ismael google on state +// byte[] arr = new byte[(int) thisBytes]; +// for (int i = 0; i < thisBytes; i++) { +// arr[i] = (byte) now; +// } // ValueState state = c.windowingInternals().stateInternals().state( // StateNamespaces.global(), DUMMY_TAG); // state.write(arr); - now = System.currentTimeMillis(); +// now = System.currentTimeMillis(); } c.output(c.element()); } http://git-wip-us.apache.org/repos/asf/beam/blob/1541fad0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/Auction.java ---------------------------------------------------------------------- diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/Auction.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/Auction.java index 4b1a848..5c018dc 100644 --- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/Auction.java +++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/Auction.java @@ -81,14 +81,14 @@ public class Auction implements KnownSize, Serializable { /** Extra auction properties. */ @JsonProperty - public final String itemName; + private final String itemName; @JsonProperty - public final String description; + private final String description; /** Initial bid price, in cents. */ @JsonProperty - public final long initialBid; + private final long initialBid; /** Reserve price, in cents. */ @JsonProperty @@ -111,7 +111,7 @@ public class Auction implements KnownSize, Serializable { /** Additional arbitrary payload for performance testing. */ @JsonProperty - public final String extra; + private final String extra; // For Avro only. http://git-wip-us.apache.org/repos/asf/beam/blob/1541fad0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/AuctionCount.java ---------------------------------------------------------------------- diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/AuctionCount.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/AuctionCount.java index e6d3450..c83a455 100644 --- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/AuctionCount.java +++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/AuctionCount.java @@ -54,11 +54,9 @@ public class AuctionCount implements KnownSize, Serializable { } }; - @JsonProperty - public final long auction; + @JsonProperty private final long auction; - @JsonProperty - public final long count; + @JsonProperty private final long count; // For Avro only. @SuppressWarnings("unused") http://git-wip-us.apache.org/repos/asf/beam/blob/1541fad0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/AuctionPrice.java ---------------------------------------------------------------------- diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/AuctionPrice.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/AuctionPrice.java index cb971e2..43d0b27 100644 --- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/AuctionPrice.java +++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/AuctionPrice.java @@ -55,11 +55,11 @@ public class AuctionPrice implements KnownSize, Serializable { }; @JsonProperty - public final long auction; + private final long auction; /** Price in cents. */ @JsonProperty - public final long price; + private final long price; // For Avro only. @SuppressWarnings("unused") http://git-wip-us.apache.org/repos/asf/beam/blob/1541fad0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/BidsPerSession.java ---------------------------------------------------------------------- diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/BidsPerSession.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/BidsPerSession.java index 26b6a41..6dddf34 100644 --- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/BidsPerSession.java +++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/BidsPerSession.java @@ -56,10 +56,10 @@ public class BidsPerSession implements KnownSize, Serializable { }; @JsonProperty - public final long personId; + private final long personId; @JsonProperty - public final long bidsPerSession; + private final long bidsPerSession; public BidsPerSession() { personId = 0; http://git-wip-us.apache.org/repos/asf/beam/blob/1541fad0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/Done.java ---------------------------------------------------------------------- diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/Done.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/Done.java index 42999cd..0c14e8f 100644 --- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/Done.java +++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/Done.java @@ -54,7 +54,7 @@ public class Done implements KnownSize, Serializable { }; @JsonProperty - public final String message; + private final String message; // For Avro only. @SuppressWarnings("unused") http://git-wip-us.apache.org/repos/asf/beam/blob/1541fad0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/Event.java ---------------------------------------------------------------------- diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/Event.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/Event.java index e2130c9..1f1f096 100644 --- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/Event.java +++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/Event.java @@ -138,19 +138,6 @@ public class Event implements KnownSize, Serializable { } } - /** - * Remove {@code annotation} from event. (Used for debugging.) - */ - public Event withoutAnnotation(String annotation) { - if (newPerson != null) { - return new Event(newPerson.withoutAnnotation(annotation)); - } else if (newAuction != null) { - return new Event(newAuction.withoutAnnotation(annotation)); - } else { - return new Event(bid.withoutAnnotation(annotation)); - } - } - @Override public long sizeInBytes() { if (newPerson != null) { http://git-wip-us.apache.org/repos/asf/beam/blob/1541fad0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/IdNameReserve.java ---------------------------------------------------------------------- diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/IdNameReserve.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/IdNameReserve.java index cf1e571..17b8c4a 100644 --- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/IdNameReserve.java +++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/IdNameReserve.java @@ -60,14 +60,14 @@ public class IdNameReserve implements KnownSize, Serializable { }; @JsonProperty - public final long id; + private final long id; @JsonProperty - public final String name; + private final String name; /** Reserve price in cents. */ @JsonProperty - public final long reserve; + private final long reserve; // For Avro only. @SuppressWarnings("unused") http://git-wip-us.apache.org/repos/asf/beam/blob/1541fad0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/NameCityStateId.java ---------------------------------------------------------------------- diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/NameCityStateId.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/NameCityStateId.java index 86d1738..28f25cd 100644 --- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/NameCityStateId.java +++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/NameCityStateId.java @@ -62,16 +62,16 @@ public class NameCityStateId implements KnownSize, Serializable { }; @JsonProperty - public final String name; + private final String name; @JsonProperty - public final String city; + private final String city; @JsonProperty - public final String state; + private final String state; @JsonProperty - public final long id; + private final long id; // For Avro only. @SuppressWarnings("unused") http://git-wip-us.apache.org/repos/asf/beam/blob/1541fad0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/Person.java ---------------------------------------------------------------------- diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/Person.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/Person.java index 906df94..c690fd4 100644 --- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/Person.java +++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/Person.java @@ -77,10 +77,10 @@ public class Person implements KnownSize, Serializable { public final String name; @JsonProperty - public final String emailAddress; + private final String emailAddress; @JsonProperty - public final String creditCard; + private final String creditCard; @JsonProperty public final String city; @@ -93,7 +93,7 @@ public class Person implements KnownSize, Serializable { /** Additional arbitrary payload for performance testing. */ @JsonProperty - public final String extra; + private final String extra; // For Avro only. @SuppressWarnings("unused") http://git-wip-us.apache.org/repos/asf/beam/blob/1541fad0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/SellerPrice.java ---------------------------------------------------------------------- diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/SellerPrice.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/SellerPrice.java index 68f2697..52ff540 100644 --- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/SellerPrice.java +++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/SellerPrice.java @@ -60,7 +60,7 @@ public class SellerPrice implements KnownSize, Serializable { /** Price in cents. */ @JsonProperty - public final long price; + private final long price; // For Avro only. @SuppressWarnings("unused") http://git-wip-us.apache.org/repos/asf/beam/blob/1541fad0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/AbstractSimulator.java ---------------------------------------------------------------------- diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/AbstractSimulator.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/AbstractSimulator.java index 270b5c3..1395182 100644 --- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/AbstractSimulator.java +++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/AbstractSimulator.java @@ -37,7 +37,7 @@ import org.joda.time.Instant; */ public abstract class AbstractSimulator { /** Window size for action bucket sampling. */ - public static final Duration WINDOW_SIZE = Duration.standardMinutes(1); + private static final Duration WINDOW_SIZE = Duration.standardMinutes(1); /** Input event stream we should draw from. */ private final Iterator> input; @@ -77,7 +77,7 @@ public abstract class AbstractSimulator { /** Called by implementors of {@link #run}: Fetch the next input element. */ @Nullable - protected TimestampedValue nextInput() { + TimestampedValue nextInput() { if (!input.hasNext()) { return null; } @@ -90,7 +90,7 @@ public abstract class AbstractSimulator { * Called by implementors of {@link #run}: Capture an intermediate result, for the purpose of * recording the expected activity of the query over time. */ - protected void addIntermediateResult(TimestampedValue result) { + void addIntermediateResult(TimestampedValue result) { NexmarkUtils.info("intermediate result: %s", result); updateCounts(result.getTimestamp()); } @@ -99,7 +99,7 @@ public abstract class AbstractSimulator { * Called by implementors of {@link #run}: Capture a final result, for the purpose of checking * semantic correctness. */ - protected void addResult(TimestampedValue result) { + void addResult(TimestampedValue result) { NexmarkUtils.info("result: %s", result); pendingResults.add(result); updateCounts(result.getTimestamp()); @@ -121,7 +121,7 @@ public abstract class AbstractSimulator { } /** Called by implementors of {@link #run}: Record that no more results will be emitted. */ - protected void allDone() { + void allDone() { isDone = true; } http://git-wip-us.apache.org/repos/asf/beam/blob/1541fad0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/NexmarkQuery.java ---------------------------------------------------------------------- diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/NexmarkQuery.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/NexmarkQuery.java index 0796ce5..09415c0 100644 --- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/NexmarkQuery.java +++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/NexmarkQuery.java @@ -46,10 +46,10 @@ public abstract class NexmarkQuery extends PTransform, PCollection>> { public static final TupleTag AUCTION_TAG = new TupleTag<>("auctions"); public static final TupleTag BID_TAG = new TupleTag<>("bids"); - protected static final TupleTag PERSON_TAG = new TupleTag<>("person"); + static final TupleTag PERSON_TAG = new TupleTag<>("person"); /** Predicate to detect a new person event. */ - protected static final SerializableFunction IS_NEW_PERSON = + private static final SerializableFunction IS_NEW_PERSON = new SerializableFunction() { @Override public Boolean apply(Event event) { @@ -58,7 +58,7 @@ public abstract class NexmarkQuery }; /** DoFn to convert a new person event to a person. */ - protected static final DoFn AS_PERSON = new DoFn() { + private static final DoFn AS_PERSON = new DoFn() { @ProcessElement public void processElement(ProcessContext c) { c.output(c.element().newPerson); @@ -66,7 +66,7 @@ public abstract class NexmarkQuery }; /** Predicate to detect a new auction event. */ - protected static final SerializableFunction IS_NEW_AUCTION = + private static final SerializableFunction IS_NEW_AUCTION = new SerializableFunction() { @Override public Boolean apply(Event event) { @@ -75,7 +75,7 @@ public abstract class NexmarkQuery }; /** DoFn to convert a new auction event to an auction. */ - protected static final DoFn AS_AUCTION = new DoFn() { + private static final DoFn AS_AUCTION = new DoFn() { @ProcessElement public void processElement(ProcessContext c) { c.output(c.element().newAuction); @@ -83,7 +83,7 @@ public abstract class NexmarkQuery }; /** Predicate to detect a new bid event. */ - protected static final SerializableFunction IS_BID = + private static final SerializableFunction IS_BID = new SerializableFunction() { @Override public Boolean apply(Event event) { @@ -92,7 +92,7 @@ public abstract class NexmarkQuery }; /** DoFn to convert a bid event to a bid. */ - protected static final DoFn AS_BID = new DoFn() { + private static final DoFn AS_BID = new DoFn() { @ProcessElement public void processElement(ProcessContext c) { c.output(c.element().bid); @@ -100,7 +100,7 @@ public abstract class NexmarkQuery }; /** Transform to key each person by their id. */ - protected static final ParDo.SingleOutput> PERSON_BY_ID = + static final ParDo.SingleOutput> PERSON_BY_ID = ParDo.of(new DoFn>() { @ProcessElement public void processElement(ProcessContext c) { @@ -109,7 +109,7 @@ public abstract class NexmarkQuery }); /** Transform to key each auction by its id. */ - protected static final ParDo.SingleOutput> AUCTION_BY_ID = + static final ParDo.SingleOutput> AUCTION_BY_ID = ParDo.of(new DoFn>() { @ProcessElement public void processElement(ProcessContext c) { @@ -118,7 +118,7 @@ public abstract class NexmarkQuery }); /** Transform to key each auction by its seller id. */ - protected static final ParDo.SingleOutput> AUCTION_BY_SELLER = + static final ParDo.SingleOutput> AUCTION_BY_SELLER = ParDo.of(new DoFn>() { @ProcessElement public void processElement(ProcessContext c) { @@ -127,7 +127,7 @@ public abstract class NexmarkQuery }); /** Transform to key each bid by it's auction id. */ - protected static final ParDo.SingleOutput> BID_BY_AUCTION = + static final ParDo.SingleOutput> BID_BY_AUCTION = ParDo.of(new DoFn>() { @ProcessElement public void processElement(ProcessContext c) { @@ -136,7 +136,7 @@ public abstract class NexmarkQuery }); /** Transform to project the auction id from each bid. */ - protected static final ParDo.SingleOutput BID_TO_AUCTION = + static final ParDo.SingleOutput BID_TO_AUCTION = ParDo.of(new DoFn() { @ProcessElement public void processElement(ProcessContext c) { @@ -145,7 +145,7 @@ public abstract class NexmarkQuery }); /** Transform to project the price from each bid. */ - protected static final ParDo.SingleOutput BID_TO_PRICE = + static final ParDo.SingleOutput BID_TO_PRICE = ParDo.of(new DoFn() { @ProcessElement public void processElement(ProcessContext c) { @@ -205,13 +205,13 @@ public abstract class NexmarkQuery } }; - protected final NexmarkConfiguration configuration; + final NexmarkConfiguration configuration; public final Monitor eventMonitor; public final Monitor resultMonitor; - public final Monitor endOfStreamMonitor; - protected final Counter fatalCounter; + private final Monitor endOfStreamMonitor; + private final Counter fatalCounter; - protected NexmarkQuery(NexmarkConfiguration configuration, String name) { + NexmarkQuery(NexmarkConfiguration configuration, String name) { super(name); this.configuration = configuration; if (configuration.debug) { http://git-wip-us.apache.org/repos/asf/beam/blob/1541fad0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/NexmarkQueryModel.java ---------------------------------------------------------------------- diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/NexmarkQueryModel.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/NexmarkQueryModel.java index 1ad9099..bfa668b 100644 --- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/NexmarkQueryModel.java +++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/NexmarkQueryModel.java @@ -43,7 +43,7 @@ import org.junit.Assert; public abstract class NexmarkQueryModel implements Serializable { public final NexmarkConfiguration configuration; - public NexmarkQueryModel(NexmarkConfiguration configuration) { + NexmarkQueryModel(NexmarkConfiguration configuration) { this.configuration = configuration; } @@ -51,7 +51,7 @@ public abstract class NexmarkQueryModel implements Serializable { * Return the start of the most recent window of {@code size} and {@code period} which ends * strictly before {@code timestamp}. */ - public static Instant windowStart(Duration size, Duration period, Instant timestamp) { + static Instant windowStart(Duration size, Duration period, Instant timestamp) { long ts = timestamp.getMillis(); long p = period.getMillis(); long lim = ts - ts % p; @@ -60,7 +60,7 @@ public abstract class NexmarkQueryModel implements Serializable { } /** Convert {@code itr} to strings capturing values, timestamps and order. */ - protected static List toValueTimestampOrder(Iterator> itr) { + static List toValueTimestampOrder(Iterator> itr) { List strings = new ArrayList<>(); while (itr.hasNext()) { strings.add(itr.next().toString()); @@ -69,7 +69,7 @@ public abstract class NexmarkQueryModel implements Serializable { } /** Convert {@code itr} to strings capturing values and order. */ - protected static List toValueOrder(Iterator> itr) { + static List toValueOrder(Iterator> itr) { List strings = new ArrayList<>(); while (itr.hasNext()) { strings.add(itr.next().getValue().toString()); @@ -78,7 +78,7 @@ public abstract class NexmarkQueryModel implements Serializable { } /** Convert {@code itr} to strings capturing values only. */ - protected static Set toValue(Iterator> itr) { + static Set toValue(Iterator> itr) { Set strings = new HashSet<>(); while (itr.hasNext()) { strings.add(itr.next().getValue().toString()); @@ -90,7 +90,7 @@ public abstract class NexmarkQueryModel implements Serializable { public abstract AbstractSimulator simulator(); /** Return sub-sequence of results which are significant for model. */ - protected Iterable> relevantResults( + Iterable> relevantResults( Iterable> results) { return results; } @@ -104,8 +104,6 @@ public abstract class NexmarkQueryModel implements Serializable { /** Return assertion to use on results of pipeline for this query. */ public SerializableFunction>, Void> assertionFor() { final Collection expectedStrings = toCollection(simulator().results()); - final String[] expectedStringsArray = - expectedStrings.toArray(new String[expectedStrings.size()]); return new SerializableFunction>, Void>() { @Override @@ -113,9 +111,6 @@ public abstract class NexmarkQueryModel implements Serializable { Collection actualStrings = toCollection(relevantResults(actual).iterator()); Assert.assertThat("wrong pipeline output", actualStrings, IsEqual.equalTo(expectedStrings)); -//compare without order -// Assert.assertThat("wrong pipeline output", actualStrings, -// IsIterableContainingInAnyOrder.containsInAnyOrder(expectedStringsArray)); return null; } }; http://git-wip-us.apache.org/repos/asf/beam/blob/1541fad0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query0Model.java ---------------------------------------------------------------------- diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query0Model.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query0Model.java index 6fb6613..8e65591 100644 --- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query0Model.java +++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query0Model.java @@ -32,7 +32,7 @@ public class Query0Model extends NexmarkQueryModel { /** * Simulator for query 0. */ - private class Simulator extends AbstractSimulator { + private static class Simulator extends AbstractSimulator { public Simulator(NexmarkConfiguration configuration) { super(NexmarkUtils.standardEventIterator(configuration)); } http://git-wip-us.apache.org/repos/asf/beam/blob/1541fad0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query10.java ---------------------------------------------------------------------- diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query10.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query10.java index c919691..516dab1 100644 --- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query10.java +++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query10.java @@ -35,7 +35,7 @@ import org.apache.beam.integration.nexmark.model.KnownSize; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.metrics.Counter; import org.apache.beam.sdk.metrics.Metrics; -import org.apache.beam.sdk.options.GcsOptions; +import org.apache.beam.sdk.extensions.gcp.options.GcsOptions; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.GroupByKey; import org.apache.beam.sdk.transforms.ParDo; @@ -101,7 +101,7 @@ public class Query10 extends NexmarkQuery { @Override public String toString() { - return String.format("%s %s %d %s %s\n", maxTimestamp, shard, index, timing, filename); + return String.format("%s %s %d %s %s%n", maxTimestamp, shard, index, timing, filename); } } @@ -130,8 +130,6 @@ public class Query10 extends NexmarkQuery { /** * Return channel for writing bytes to GCS. - * - * @throws IOException */ private WritableByteChannel openWritableGcsFile(GcsOptions options, String filename) throws IOException { http://git-wip-us.apache.org/repos/asf/beam/blob/1541fad0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query11.java ---------------------------------------------------------------------- diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query11.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query11.java index fd936a9..6db9bcf 100644 --- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query11.java +++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query11.java @@ -63,14 +63,13 @@ public class Query11 extends NexmarkQuery { Repeatedly.forever(AfterPane.elementCountAtLeast(configuration.maxLogEvents))) .discardingFiredPanes() .withAllowedLateness(Duration.standardSeconds(configuration.occasionalDelaySec / 2))); - PCollection bidsPerSession = biddersWindowed.apply(Count.perElement()) + return biddersWindowed.apply(Count.perElement()) .apply(name + ".ToResult", ParDo.of(new DoFn, BidsPerSession>() { @ProcessElement public void processElement(ProcessContext c) { c.output(new BidsPerSession(c.element().getKey(), c.element().getValue())); } })); - return bidsPerSession; } @Override http://git-wip-us.apache.org/repos/asf/beam/blob/1541fad0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query1Model.java ---------------------------------------------------------------------- diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query1Model.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query1Model.java index 0388687..5d4de45 100644 --- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query1Model.java +++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query1Model.java @@ -34,7 +34,7 @@ public class Query1Model extends NexmarkQueryModel implements Serializable { /** * Simulator for query 1. */ - private class Simulator extends AbstractSimulator { + private static class Simulator extends AbstractSimulator { public Simulator(NexmarkConfiguration configuration) { super(NexmarkUtils.standardEventIterator(configuration)); } http://git-wip-us.apache.org/repos/asf/beam/blob/1541fad0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query3.java ---------------------------------------------------------------------- diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query3.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query3.java index 71364ba..f74b78d 100644 --- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query3.java +++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query3.java @@ -17,7 +17,6 @@ */ package org.apache.beam.integration.nexmark.queries; -import java.io.IOException; import java.util.ArrayList; import java.util.List; import org.apache.beam.integration.nexmark.NexmarkConfiguration; @@ -30,6 +29,13 @@ import org.apache.beam.integration.nexmark.model.Person; import org.apache.beam.sdk.coders.ListCoder; import org.apache.beam.sdk.metrics.Counter; import org.apache.beam.sdk.metrics.Metrics; +import org.apache.beam.sdk.state.TimeDomain; +import org.apache.beam.sdk.state.Timer; +import org.apache.beam.sdk.state.TimerSpec; +import org.apache.beam.sdk.state.TimerSpecs; +import org.apache.beam.sdk.state.ValueState; +import org.apache.beam.sdk.state.StateSpec; +import org.apache.beam.sdk.state.StateSpecs; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.Filter; import org.apache.beam.sdk.transforms.ParDo; @@ -41,13 +47,6 @@ import org.apache.beam.sdk.transforms.windowing.AfterPane; import org.apache.beam.sdk.transforms.windowing.GlobalWindows; import org.apache.beam.sdk.transforms.windowing.Repeatedly; import org.apache.beam.sdk.transforms.windowing.Window; -import org.apache.beam.sdk.util.TimeDomain; -import org.apache.beam.sdk.util.Timer; -import org.apache.beam.sdk.util.TimerSpec; -import org.apache.beam.sdk.util.TimerSpecs; -import org.apache.beam.sdk.util.state.StateSpec; -import org.apache.beam.sdk.util.state.StateSpecs; -import org.apache.beam.sdk.util.state.ValueState; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; import org.joda.time.Duration; @@ -176,18 +175,18 @@ public class Query3 extends NexmarkQuery { */ private static class JoinDoFn extends DoFn, KV> { - private int maxAuctionsWaitingTime; + private final int maxAuctionsWaitingTime; private static final String AUCTIONS = "auctions"; private static final String PERSON = "person"; @StateId(PERSON) - private static final StateSpec> personSpec = + private static final StateSpec> personSpec = StateSpecs.value(Person.CODER); private static final String PERSON_STATE_EXPIRING = "personStateExpiring"; @StateId(AUCTIONS) - private final StateSpec>> auctionsSpec = + private final StateSpec>> auctionsSpec = StateSpecs.value(ListCoder.of(Auction.CODER)); @TimerId(PERSON_STATE_EXPIRING) @@ -219,8 +218,7 @@ public class Query3 extends NexmarkQuery { ProcessContext c, @TimerId(PERSON_STATE_EXPIRING) Timer timer, @StateId(PERSON) ValueState personState, - @StateId(AUCTIONS) ValueState> auctionsState) - throws IOException { + @StateId(AUCTIONS) ValueState> auctionsState) { // We would *almost* implement this by rewindowing into the global window and // running a combiner over the result. The combiner's accumulator would be the // state we use below. However, combiners cannot emit intermediate results, thus http://git-wip-us.apache.org/repos/asf/beam/blob/1541fad0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query3Model.java ---------------------------------------------------------------------- diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query3Model.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query3Model.java index 6b98e2a..f415709 100644 --- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query3Model.java +++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query3Model.java @@ -42,7 +42,7 @@ public class Query3Model extends NexmarkQueryModel implements Serializable { /** * Simulator for query 3. */ - private class Simulator extends AbstractSimulator { + private static class Simulator extends AbstractSimulator { /** Auctions, indexed by seller id. */ private final Multimap newAuctions; http://git-wip-us.apache.org/repos/asf/beam/blob/1541fad0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query4Model.java ---------------------------------------------------------------------- diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query4Model.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query4Model.java index 634a58e..269e47a 100644 --- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query4Model.java +++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query4Model.java @@ -93,8 +93,9 @@ public class Query4Model extends NexmarkQueryModel implements Serializable { } totals.put(category, total); } - for (long category : counts.keySet()) { - long count = counts.get(category); + for (Map.Entry entry : counts.entrySet()) { + long category = entry.getKey(); + long count = entry.getValue(); long total = totals.get(category); TimestampedValue result = TimestampedValue.of( new CategoryPrice(category, Math.round((double) total / count), true), lastTimestamp); http://git-wip-us.apache.org/repos/asf/beam/blob/1541fad0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query5.java ---------------------------------------------------------------------- diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query5.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query5.java index 18ce578..1944330 100644 --- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query5.java +++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query5.java @@ -18,7 +18,7 @@ package org.apache.beam.integration.nexmark.queries; import java.util.ArrayList; -import java.util.Arrays; +import java.util.Collections; import java.util.List; import org.apache.beam.integration.nexmark.NexmarkConfiguration; import org.apache.beam.integration.nexmark.NexmarkUtils; @@ -80,7 +80,7 @@ public class Query5 extends NexmarkQuery { ParDo.of(new DoFn, KV, Long>>() { @ProcessElement public void processElement(ProcessContext c) { - c.output(KV.of(Arrays.asList(c.element().getKey()), c.element().getValue())); + c.output(KV.of(Collections.singletonList(c.element().getKey()), c.element().getValue())); } })) http://git-wip-us.apache.org/repos/asf/beam/blob/1541fad0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query6.java ---------------------------------------------------------------------- diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query6.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query6.java index 65789ab..ea39ede 100644 --- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query6.java +++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query6.java @@ -86,9 +86,7 @@ public class Query6 extends NexmarkQuery { public List mergeAccumulators(Iterable> accumulators) { List result = new ArrayList<>(); for (List accumulator : accumulators) { - for (Bid bid : accumulator) { - result.add(bid); - } + result.addAll(accumulator); } Collections.sort(result, Bid.ASCENDING_TIME_THEN_PRICE); if (result.size() > maxNumBids) { http://git-wip-us.apache.org/repos/asf/beam/blob/1541fad0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query6Model.java ---------------------------------------------------------------------- diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query6Model.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query6Model.java index 0691714..9cb8b3d 100644 --- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query6Model.java +++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query6Model.java @@ -86,8 +86,9 @@ public class Query6Model extends NexmarkQueryModel implements Serializable { protected void run() { TimestampedValue timestampedWinningBid = nextInput(); if (timestampedWinningBid == null) { - for (long seller : numWinningBidsPerSeller.keySet()) { - long count = numWinningBidsPerSeller.get(seller); + for (Map.Entry entry : numWinningBidsPerSeller.entrySet()) { + long seller = entry.getKey(); + long count = entry.getValue(); long total = totalWinningBidPricesPerSeller.get(seller); addResult(TimestampedValue.of( new SellerPrice(seller, Math.round((double) total / count)), lastTimestamp)); http://git-wip-us.apache.org/repos/asf/beam/blob/1541fad0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/WinningBids.java ---------------------------------------------------------------------- diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/WinningBids.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/WinningBids.java index 11a4d38..52891a7 100644 --- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/WinningBids.java +++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/WinningBids.java @@ -25,8 +25,8 @@ import java.io.InputStream; import java.io.OutputStream; import java.io.Serializable; import java.util.ArrayList; -import java.util.Arrays; import java.util.Collection; +import java.util.Collections; import java.util.List; import java.util.Map; import java.util.TreeMap; @@ -77,7 +77,7 @@ import org.joda.time.Instant; */ public class WinningBids extends PTransform, PCollection> { /** Windows for open auctions and bids. */ - private static class AuctionOrBidWindow extends IntervalWindow implements Serializable { + private static class AuctionOrBidWindow extends IntervalWindow { /** Id of auction this window is for. */ public final long auction; @@ -104,9 +104,7 @@ public class WinningBids extends PTransform, PCollection, PCollection, PCollection, PCollection, PCollection entry : idToTrueAuctionWindow.entrySet()) { + long auction = entry.getKey(); + AuctionOrBidWindow auctionWindow = entry.getValue(); List bidWindows = idToBidAuctionWindows.get(auction); if (bidWindows != null) { List toBeMerged = new ArrayList<>(); @@ -296,8 +294,8 @@ public class WinningBids extends PTransform, PCollection>, Serializabl @Override public void verifyDeterministic() throws NonDeterministicException {} }; - private long numEvents; - private long wallclockBaseTime; + private final long numEvents; + private final long wallclockBaseTime; private Checkpoint(long numEvents, long wallclockBaseTime) { this.numEvents = numEvents; @@ -403,8 +403,8 @@ public class Generator implements Iterator>, Serializabl if (n < Integer.MAX_VALUE) { return random.nextInt((int) n); } else { - // TODO: Very skewed distribution! Bad! - return Math.abs(random.nextLong()) % n; + // WARNING: Very skewed distribution! Bad! + return Math.abs(random.nextLong() % n); } } @@ -470,14 +470,13 @@ public class Generator implements Iterator>, Serializabl long category = GeneratorConfig.FIRST_CATEGORY_ID + random.nextInt(NUM_CATEGORIES); long initialBid = nextPrice(random); - long dateTime = timestamp; long expires = timestamp + nextAuctionLengthMs(random, timestamp); String name = nextString(random, 20); String desc = nextString(random, 100); long reserve = initialBid + nextPrice(random); int currentSize = 8 + name.length() + desc.length() + 8 + 8 + 8 + 8 + 8; String extra = nextExtra(random, currentSize, config.configuration.avgAuctionByteSize); - return new Auction(id, name, desc, initialBid, reserve, dateTime, expires, seller, category, + return new Auction(id, name, desc, initialBid, reserve, timestamp, expires, seller, category, extra); } http://git-wip-us.apache.org/repos/asf/beam/blob/1541fad0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/sources/GeneratorConfig.java ---------------------------------------------------------------------- diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/sources/GeneratorConfig.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/sources/GeneratorConfig.java index 3caaf51..5799bb2 100644 --- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/sources/GeneratorConfig.java +++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/sources/GeneratorConfig.java @@ -42,7 +42,7 @@ public class GeneratorConfig implements Serializable { */ public static final int PERSON_PROPORTION = 1; public static final int AUCTION_PROPORTION = 3; - public static final int BID_PROPORTION = 46; + private static final int BID_PROPORTION = 46; public static final int PROPORTION_DENOMINATOR = PERSON_PROPORTION + AUCTION_PROPORTION + BID_PROPORTION; @@ -55,12 +55,12 @@ public class GeneratorConfig implements Serializable { * Delay between events, in microseconds. If the array has more than one entry then * the rate is changed every {@link #stepLengthSec}, and wraps around. */ - public final long[] interEventDelayUs; + private final long[] interEventDelayUs; /** * Delay before changing the current inter-event delay. */ - public final long stepLengthSec; + private final long stepLengthSec; /** * Time for first event (ms since epoch). @@ -88,13 +88,13 @@ public class GeneratorConfig implements Serializable { * True period of epoch in milliseconds. Derived from above. * (Ie time to run through cycle for all interEventDelayUs entries). */ - public final long epochPeriodMs; + private final long epochPeriodMs; /** * Number of events per epoch. Derived from above. * (Ie number of events to run through cycle for all interEventDelayUs entries). */ - public final long eventsPerEpoch; + private final long eventsPerEpoch; public GeneratorConfig( NexmarkConfiguration configuration, long baseTime, long firstEventId, @@ -121,10 +121,10 @@ public class GeneratorConfig implements Serializable { long eventsPerEpoch = 0; long epochPeriodMs = 0; if (interEventDelayUs.length > 1) { - for (int i = 0; i < interEventDelayUs.length; i++) { - long numEventsForThisCycle = (stepLengthSec * 1_000_000L) / interEventDelayUs[i]; + for (long interEventDelayU : interEventDelayUs) { + long numEventsForThisCycle = (stepLengthSec * 1_000_000L) / interEventDelayU; eventsPerEpoch += numEventsForThisCycle; - epochPeriodMs += (numEventsForThisCycle * interEventDelayUs[i]) / 1000L; + epochPeriodMs += (numEventsForThisCycle * interEventDelayU) / 1000L; } } this.eventsPerEpoch = eventsPerEpoch; @@ -248,16 +248,16 @@ public class GeneratorConfig implements Serializable { long epoch = eventNumber / eventsPerEpoch; long n = eventNumber % eventsPerEpoch; long offsetInEpochMs = 0; - for (int i = 0; i < interEventDelayUs.length; i++) { - long numEventsForThisCycle = (stepLengthSec * 1_000_000L) / interEventDelayUs[i]; + for (long interEventDelayU : interEventDelayUs) { + long numEventsForThisCycle = (stepLengthSec * 1_000_000L) / interEventDelayU; if (n < numEventsForThisCycle) { - long offsetInCycleUs = n * interEventDelayUs[i]; + long offsetInCycleUs = n * interEventDelayU; long timestamp = baseTime + epoch * epochPeriodMs + offsetInEpochMs + (offsetInCycleUs / 1000L); - return KV.of(timestamp, interEventDelayUs[i]); + return KV.of(timestamp, interEventDelayU); } n -= numEventsForThisCycle; - offsetInEpochMs += (numEventsForThisCycle * interEventDelayUs[i]) / 1000L; + offsetInEpochMs += (numEventsForThisCycle * interEventDelayU) / 1000L; } throw new RuntimeException("internal eventsPerEpoch incorrect"); // can't reach } http://git-wip-us.apache.org/repos/asf/beam/blob/1541fad0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/sources/UnboundedEventSource.java ---------------------------------------------------------------------- diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/sources/UnboundedEventSource.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/sources/UnboundedEventSource.java index c3c6eb0..09d945d 100644 --- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/sources/UnboundedEventSource.java +++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/sources/UnboundedEventSource.java @@ -116,7 +116,7 @@ public class UnboundedEventSource extends UnboundedSource currentEvent; /** Events which have been held back so as to force them to be late. */ - private Queue heldBackEvents = new PriorityQueue<>(); + private final Queue heldBackEvents = new PriorityQueue<>(); public EventReader(Generator generator) { this.generator = generator; http://git-wip-us.apache.org/repos/asf/beam/blob/1541fad0/integration/java/nexmark/src/test/java/org/apache/beam/integration/nexmark/sources/UnboundedEventSourceTest.java ---------------------------------------------------------------------- diff --git a/integration/java/nexmark/src/test/java/org/apache/beam/integration/nexmark/sources/UnboundedEventSourceTest.java b/integration/java/nexmark/src/test/java/org/apache/beam/integration/nexmark/sources/UnboundedEventSourceTest.java index 15e17a8..1d04e2a 100644 --- a/integration/java/nexmark/src/test/java/org/apache/beam/integration/nexmark/sources/UnboundedEventSourceTest.java +++ b/integration/java/nexmark/src/test/java/org/apache/beam/integration/nexmark/sources/UnboundedEventSourceTest.java @@ -53,8 +53,8 @@ public class UnboundedEventSourceTest { * confirming reading events match the model events. */ private static class EventIdChecker { - private Set seenPersonIds = new HashSet<>(); - private Set seenAuctionIds = new HashSet<>(); + private final Set seenPersonIds = new HashSet<>(); + private final Set seenAuctionIds = new HashSet<>(); public void add(Event event) { if (event.newAuction != null) { @@ -90,7 +90,6 @@ public class UnboundedEventSourceTest { EventIdChecker checker = new EventIdChecker(); PipelineOptions options = TestPipeline.testingPipelineOptions(); - Pipeline p = TestPipeline.create(options); UnboundedEventSource source = new UnboundedEventSource(config, 1, 0, false); UnboundedReader reader = source.createReader(options, null); http://git-wip-us.apache.org/repos/asf/beam/blob/1541fad0/integration/pom.xml ---------------------------------------------------------------------- diff --git a/integration/pom.xml b/integration/pom.xml index 4839da5..31f293e 100644 --- a/integration/pom.xml +++ b/integration/pom.xml @@ -30,6 +30,20 @@ pom Apache Beam :: Integration Tests + + + release + + + + org.codehaus.mojo + findbugs-maven-plugin + + + + + + java