beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ieme...@apache.org
Subject [31/55] [abbrv] beam git commit: Fix static analysis issues
Date Wed, 23 Aug 2017 17:09:39 GMT
Fix static analysis issues

Restrict access level on classes + other static analysis fixes

Fix findbugs issues (issue #33)
Fix compile after AvroIO, TextIO, PubsubIO and State refactor


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/1541fad0
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/1541fad0
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/1541fad0

Branch: refs/heads/master
Commit: 1541fad077e47df1d47636fd186a72aa827bbc42
Parents: a39cb80
Author: Ismaël Mejía <iemejia@apache.org>
Authored: Mon May 1 00:54:08 2017 +0200
Committer: Ismaël Mejía <iemejia@gmail.com>
Committed: Wed Aug 23 19:07:28 2017 +0200

----------------------------------------------------------------------
 integration/java/nexmark/pom.xml                |   2 +-
 .../beam/integration/nexmark/Monitor.java       |   4 +-
 .../beam/integration/nexmark/NexmarkDriver.java |  12 +-
 .../beam/integration/nexmark/NexmarkRunner.java | 124 +++++++++++--------
 .../beam/integration/nexmark/NexmarkUtils.java  |  34 +++--
 .../beam/integration/nexmark/model/Auction.java |   8 +-
 .../integration/nexmark/model/AuctionCount.java |   6 +-
 .../integration/nexmark/model/AuctionPrice.java |   4 +-
 .../nexmark/model/BidsPerSession.java           |   4 +-
 .../beam/integration/nexmark/model/Done.java    |   2 +-
 .../beam/integration/nexmark/model/Event.java   |  13 --
 .../nexmark/model/IdNameReserve.java            |   6 +-
 .../nexmark/model/NameCityStateId.java          |   8 +-
 .../beam/integration/nexmark/model/Person.java  |   6 +-
 .../integration/nexmark/model/SellerPrice.java  |   2 +-
 .../nexmark/queries/AbstractSimulator.java      |  10 +-
 .../nexmark/queries/NexmarkQuery.java           |  34 ++---
 .../nexmark/queries/NexmarkQueryModel.java      |  17 +--
 .../nexmark/queries/Query0Model.java            |   2 +-
 .../integration/nexmark/queries/Query10.java    |   6 +-
 .../integration/nexmark/queries/Query11.java    |   3 +-
 .../nexmark/queries/Query1Model.java            |   2 +-
 .../integration/nexmark/queries/Query3.java     |  24 ++--
 .../nexmark/queries/Query3Model.java            |   2 +-
 .../nexmark/queries/Query4Model.java            |   5 +-
 .../integration/nexmark/queries/Query5.java     |   4 +-
 .../integration/nexmark/queries/Query6.java     |   4 +-
 .../nexmark/queries/Query6Model.java            |   5 +-
 .../nexmark/queries/WinningBids.java            |  30 +++--
 .../integration/nexmark/sources/Generator.java  |  11 +-
 .../nexmark/sources/GeneratorConfig.java        |  26 ++--
 .../nexmark/sources/UnboundedEventSource.java   |   2 +-
 .../sources/UnboundedEventSourceTest.java       |   5 +-
 integration/pom.xml                             |  14 +++
 34 files changed, 221 insertions(+), 220 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/1541fad0/integration/java/nexmark/pom.xml
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/pom.xml b/integration/java/nexmark/pom.xml
index fb213e9..8a65c0f 100644
--- a/integration/java/nexmark/pom.xml
+++ b/integration/java/nexmark/pom.xml
@@ -210,7 +210,7 @@
 
     <dependency>
       <groupId>org.apache.beam</groupId>
-      <artifactId>beam-sdks-java-extensions-gcp-core</artifactId>
+      <artifactId>beam-sdks-java-extensions-google-cloud-platform-core</artifactId>
     </dependency>
 
     <dependency>

http://git-wip-us.apache.org/repos/asf/beam/blob/1541fad0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Monitor.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Monitor.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Monitor.java
index cb4d71c..2f0c56a 100644
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Monitor.java
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Monitor.java
@@ -63,8 +63,8 @@ public class Monitor<T extends KnownSize> implements Serializable {
 
   public final String name;
   public final String prefix;
-  final MonitorDoFn doFn;
-  final PTransform<PCollection<? extends T>, PCollection<T>> transform;
+  private final MonitorDoFn doFn;
+  private final PTransform<PCollection<? extends T>, PCollection<T>> transform;
 
   public Monitor(String name, String prefix) {
     this.name = name;

http://git-wip-us.apache.org/repos/asf/beam/blob/1541fad0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkDriver.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkDriver.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkDriver.java
index 7d532cc..a982a8d 100644
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkDriver.java
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkDriver.java
@@ -57,7 +57,7 @@ public class NexmarkDriver<OptionT extends NexmarkOptions> {
   /**
    * Entry point.
    */
-  public void runAll(OptionT options, NexmarkRunner runner) {
+  void runAll(OptionT options, NexmarkRunner runner) {
     Instant start = Instant.now();
     Map<NexmarkConfiguration, NexmarkPerf> baseline = loadBaseline(options.getBaselineFilename());
     Map<NexmarkConfiguration, NexmarkPerf> actual = new LinkedHashMap<>();
@@ -87,7 +87,7 @@ public class NexmarkDriver<OptionT extends NexmarkOptions> {
     }
 
     if (!successful) {
-      System.exit(1);
+      throw new RuntimeException("Execution was not successful");
     }
   }
 
@@ -149,8 +149,6 @@ public class NexmarkDriver<OptionT extends NexmarkOptions> {
 
   /**
    * Print summary  of {@code actual} vs (if non-null) {@code baseline}.
-   *
-   * @throws IOException
    */
   private static void saveSummary(
       @Nullable String summaryFilename,
@@ -227,7 +225,7 @@ public class NexmarkDriver<OptionT extends NexmarkOptions> {
       if (actualPerf != null) {
         List<String> errors = actualPerf.errors;
         if (errors == null) {
-          errors = new ArrayList<String>();
+          errors = new ArrayList<>();
           errors.add("NexmarkGoogleRunner returned null errors list");
         }
         for (String error : errors) {
@@ -300,7 +298,7 @@ public class NexmarkDriver<OptionT extends NexmarkOptions> {
     NexmarkOptions options = PipelineOptionsFactory.fromArgs(args)
       .withValidation()
       .as(NexmarkOptions.class);
-    NexmarkRunner runner = new NexmarkRunner(options);
-    new NexmarkDriver().runAll(options, runner);
+    NexmarkRunner<NexmarkOptions> runner = new NexmarkRunner<>(options);
+    new NexmarkDriver<>().runAll(options, runner);
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/1541fad0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkRunner.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkRunner.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkRunner.java
index a3c4d33..6df76f0 100644
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkRunner.java
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkRunner.java
@@ -29,6 +29,7 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
 import java.util.NoSuchElementException;
 import java.util.concurrent.ThreadLocalRandom;
@@ -65,10 +66,12 @@ import org.apache.beam.integration.nexmark.queries.Query9;
 import org.apache.beam.integration.nexmark.queries.Query9Model;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.PipelineResult;
+import org.apache.beam.sdk.coders.CoderException;
 import org.apache.beam.sdk.io.AvroIO;
 import org.apache.beam.sdk.io.TextIO;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
 import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
+import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
 import org.apache.beam.sdk.metrics.DistributionResult;
 import org.apache.beam.sdk.metrics.MetricNameFilter;
 import org.apache.beam.sdk.metrics.MetricQueryResults;
@@ -77,6 +80,7 @@ import org.apache.beam.sdk.metrics.MetricsFilter;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.util.CoderUtils;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionTuple;
 import org.apache.beam.sdk.values.TimestampedValue;
@@ -91,15 +95,15 @@ public class NexmarkRunner<OptionT extends NexmarkOptions> {
   /**
    * Minimum number of samples needed for 'stead-state' rate calculation.
    */
-  protected static final int MIN_SAMPLES = 9;
+  private static final int MIN_SAMPLES = 9;
   /**
    * Minimum length of time over which to consider samples for 'steady-state' rate calculation.
    */
-  protected static final Duration MIN_WINDOW = Duration.standardMinutes(2);
+  private static final Duration MIN_WINDOW = Duration.standardMinutes(2);
   /**
    * Delay between perf samples.
    */
-  protected static final Duration PERF_DELAY = Duration.standardSeconds(15);
+  private static final Duration PERF_DELAY = Duration.standardSeconds(15);
   /**
    * How long to let streaming pipeline run after all events have been generated and we've
    * seen no activity.
@@ -117,37 +121,37 @@ public class NexmarkRunner<OptionT extends NexmarkOptions> {
   /**
    * NexmarkOptions shared by all runs.
    */
-  protected final OptionT options;
+  private final OptionT options;
 
   /**
    * Which configuration we are running.
    */
   @Nullable
-  protected NexmarkConfiguration configuration;
+  private NexmarkConfiguration configuration;
 
   /**
    * If in --pubsubMode=COMBINED, the event monitor for the publisher pipeline. Otherwise null.
    */
   @Nullable
-  protected Monitor<Event> publisherMonitor;
+  private Monitor<Event> publisherMonitor;
 
   /**
    * If in --pubsubMode=COMBINED, the pipeline result for the publisher pipeline. Otherwise null.
    */
   @Nullable
-  protected PipelineResult publisherResult;
+  private PipelineResult publisherResult;
 
   /**
    * Result for the main pipeline.
    */
   @Nullable
-  protected PipelineResult mainResult;
+  private PipelineResult mainResult;
 
   /**
    * Query name we are running.
    */
   @Nullable
-  protected String queryName;
+  private String queryName;
 
   public NexmarkRunner(OptionT options) {
     this.options = options;
@@ -160,7 +164,7 @@ public class NexmarkRunner<OptionT extends NexmarkOptions> {
   /**
    * Is this query running in streaming mode?
    */
-  protected boolean isStreaming() {
+  private boolean isStreaming() {
     return options.isStreaming();
   }
 
@@ -174,7 +178,7 @@ public class NexmarkRunner<OptionT extends NexmarkOptions> {
   /**
    * Return maximum number of workers.
    */
-  protected int maxNumWorkers() {
+  private int maxNumWorkers() {
     return 5;
   }
 
@@ -182,7 +186,7 @@ public class NexmarkRunner<OptionT extends NexmarkOptions> {
    * Return the current value for a long counter, or a default value if can't be retrieved.
    * Note this uses only attempted metrics because some runners don't support committed metrics.
    */
-  protected long getCounterMetric(PipelineResult result, String namespace, String name,
+  private long getCounterMetric(PipelineResult result, String namespace, String name,
     long defaultValue) {
     //TODO Ismael calc this only once
     MetricQueryResults metrics = result.metrics().queryMetrics(
@@ -201,7 +205,7 @@ public class NexmarkRunner<OptionT extends NexmarkOptions> {
    * Return the current value for a long counter, or a default value if can't be retrieved.
    * Note this uses only attempted metrics because some runners don't support committed metrics.
    */
-  protected long getDistributionMetric(PipelineResult result, String namespace, String name,
+  private long getDistributionMetric(PipelineResult result, String namespace, String name,
       DistributionType distType, long defaultValue) {
     MetricQueryResults metrics = result.metrics().queryMetrics(
         MetricsFilter.builder().addNameFilter(MetricNameFilter.named(namespace, name)).build());
@@ -226,7 +230,7 @@ public class NexmarkRunner<OptionT extends NexmarkOptions> {
   /**
    * Return the current value for a time counter, or -1 if can't be retrieved.
    */
-  protected long getTimestampMetric(long now, long value) {
+  private long getTimestampMetric(long now, long value) {
     //TODO Ismael improve doc
     if (Math.abs(value - now) > Duration.standardDays(10000).getMillis()) {
       return -1;
@@ -238,8 +242,7 @@ public class NexmarkRunner<OptionT extends NexmarkOptions> {
    * Find a 'steady state' events/sec from {@code snapshots} and
    * store it in {@code perf} if found.
    */
-  protected void captureSteadyState(NexmarkPerf perf,
-                                    List<NexmarkPerf.ProgressSnapshot> snapshots) {
+  private void captureSteadyState(NexmarkPerf perf, List<NexmarkPerf.ProgressSnapshot> snapshots) {
     if (!options.isStreaming()) {
       return;
     }
@@ -426,7 +429,7 @@ public class NexmarkRunner<OptionT extends NexmarkOptions> {
     return perf;
   }
 
-  protected String getJobId(PipelineResult job) {
+  private String getJobId(PipelineResult job) {
     return "";
   }
 
@@ -528,15 +531,14 @@ public class NexmarkRunner<OptionT extends NexmarkOptions> {
   /**
    * Build and run a pipeline using specified options.
    */
-  protected interface PipelineBuilder<OptionT extends NexmarkOptions> {
+  interface PipelineBuilder<OptionT extends NexmarkOptions> {
     void build(OptionT publishOnlyOptions);
   }
 
   /**
    * Invoke the builder with options suitable for running a publish-only child pipeline.
    */
-  protected void invokeBuilderForPublishOnlyPipeline(
-      PipelineBuilder builder) {
+  private void invokeBuilderForPublishOnlyPipeline(PipelineBuilder<NexmarkOptions> builder) {
     builder.build(options);
 //    throw new UnsupportedOperationException(
 //        "Cannot use --pubSubMode=COMBINED with DirectRunner");
@@ -546,7 +548,7 @@ public class NexmarkRunner<OptionT extends NexmarkOptions> {
    * If monitoring, wait until the publisher pipeline has run long enough to establish
    * a backlog on the Pubsub topic. Otherwise, return immediately.
    */
-  protected void waitForPublisherPreload() {
+  private void waitForPublisherPreload() {
     throw new UnsupportedOperationException();
   }
 
@@ -555,7 +557,7 @@ public class NexmarkRunner<OptionT extends NexmarkOptions> {
    * it was measured.
    */
   @Nullable
-  protected NexmarkPerf monitor(NexmarkQuery query) {
+  private NexmarkPerf monitor(NexmarkQuery query) {
     if (!options.getMonitorJobs()) {
       return null;
     }
@@ -841,14 +843,28 @@ public class NexmarkRunner<OptionT extends NexmarkOptions> {
   private PCollection<Event> sourceEventsFromPubsub(Pipeline p, long now) {
     String shortSubscription = shortSubscription(now);
     NexmarkUtils.console("Reading events from Pubsub %s", shortSubscription);
-    PubsubIO.Read<Event> io =
-        PubsubIO.<Event>read().fromSubscription(shortSubscription)
-            .withIdAttribute(NexmarkUtils.PUBSUB_ID)
-            .withCoder(Event.CODER);
+
+    PubsubIO.Read<PubsubMessage> io =
+        PubsubIO.readPubsubMessagesWithAttributes().fromSubscription(shortSubscription)
+            .withIdAttribute(NexmarkUtils.PUBSUB_ID);
     if (!configuration.usePubsubPublishTime) {
       io = io.withTimestampAttribute(NexmarkUtils.PUBSUB_TIMESTAMP);
     }
-    return p.apply(queryName + ".ReadPubsubEvents", io);
+
+    return p
+      .apply(queryName + ".ReadPubsubEvents", io)
+      .apply(queryName + ".PubsubMessageToEvent", ParDo.of(new DoFn<PubsubMessage, Event>() {
+        @ProcessElement
+        public void processElement(ProcessContext c) {
+          byte[] payload = c.element().getPayload();
+          try {
+            Event event = CoderUtils.decodeFromByteArray(Event.CODER, payload);
+            c.output(event);
+          } catch (CoderException e) {
+            // TODO Log decoding Event error
+          }
+        }
+      }));
   }
 
   /**
@@ -861,9 +877,8 @@ public class NexmarkRunner<OptionT extends NexmarkOptions> {
     }
     NexmarkUtils.console("Reading events from Avro files at %s", filename);
     return p
-        .apply(queryName + ".ReadAvroEvents", AvroIO.Read
-                          .from(filename + "*.avro")
-                          .withSchema(Event.class))
+        .apply(queryName + ".ReadAvroEvents", AvroIO.read(Event.class)
+                          .from(filename + "*.avro"))
         .apply("OutputWithTimestamp", NexmarkQuery.EVENT_TIMESTAMP_FROM_DATA);
   }
 
@@ -873,14 +888,28 @@ public class NexmarkRunner<OptionT extends NexmarkOptions> {
   private void sinkEventsToPubsub(PCollection<Event> events, long now) {
     String shortTopic = shortTopic(now);
     NexmarkUtils.console("Writing events to Pubsub %s", shortTopic);
-    PubsubIO.Write<Event> io =
-        PubsubIO.<Event>write().to(shortTopic)
-                      .withIdAttribute(NexmarkUtils.PUBSUB_ID)
-                      .withCoder(Event.CODER);
+
+    PubsubIO.Write<PubsubMessage> io =
+        PubsubIO.writePubsubMessages().to(shortTopic)
+            .withIdAttribute(NexmarkUtils.PUBSUB_ID);
     if (!configuration.usePubsubPublishTime) {
       io = io.withTimestampAttribute(NexmarkUtils.PUBSUB_TIMESTAMP);
     }
-    events.apply(queryName + ".WritePubsubEvents", io);
+
+    events.apply(queryName + ".EventToPubsubMessage",
+            ParDo.of(new DoFn<Event, PubsubMessage>() {
+              @ProcessElement
+              public void processElement(ProcessContext c) {
+                try {
+                  byte[] payload = CoderUtils.encodeToByteArray(Event.CODER, c.element());
+                  c.output(new PubsubMessage(payload, new HashMap<String, String>()));
+                } catch (CoderException e1) {
+                  // TODO Log encoding Event error
+                }
+              }
+            })
+        )
+        .apply(queryName + ".WritePubsubEvents", io);
   }
 
   /**
@@ -890,7 +919,7 @@ public class NexmarkRunner<OptionT extends NexmarkOptions> {
     String shortTopic = shortTopic(now);
     NexmarkUtils.console("Writing results to Pubsub %s", shortTopic);
     PubsubIO.Write<String> io =
-        PubsubIO.<String>write().to(shortTopic)
+        PubsubIO.writeStrings().to(shortTopic)
             .withIdAttribute(NexmarkUtils.PUBSUB_ID);
     if (!configuration.usePubsubPublishTime) {
       io = io.withTimestampAttribute(NexmarkUtils.PUBSUB_TIMESTAMP);
@@ -917,18 +946,16 @@ public class NexmarkRunner<OptionT extends NexmarkOptions> {
     }
     NexmarkUtils.console("Writing events to Avro files at %s", filename);
     source.apply(queryName + ".WriteAvroEvents",
-            AvroIO.Write.to(filename + "/event").withSuffix(".avro").withSchema(Event.class));
+            AvroIO.write(Event.class).to(filename + "/event").withSuffix(".avro"));
     source.apply(NexmarkQuery.JUST_BIDS)
           .apply(queryName + ".WriteAvroBids",
-            AvroIO.Write.to(filename + "/bid").withSuffix(".avro").withSchema(Bid.class));
+            AvroIO.write(Bid.class).to(filename + "/bid").withSuffix(".avro"));
     source.apply(NexmarkQuery.JUST_NEW_AUCTIONS)
           .apply(queryName + ".WriteAvroAuctions",
-                  AvroIO.Write.to(filename + "/auction").withSuffix(".avro")
-                          .withSchema(Auction.class));
+            AvroIO.write(Auction.class).to(filename + "/auction").withSuffix(".avro"));
     source.apply(NexmarkQuery.JUST_NEW_PERSONS)
           .apply(queryName + ".WriteAvroPeople",
-                  AvroIO.Write.to(filename + "/person").withSuffix(".avro")
-                             .withSchema(Person.class));
+            AvroIO.write(Person.class).to(filename + "/person").withSuffix(".avro"));
   }
 
   /**
@@ -938,7 +965,7 @@ public class NexmarkRunner<OptionT extends NexmarkOptions> {
     String filename = textFilename(now);
     NexmarkUtils.console("Writing results to text files at %s", filename);
     formattedResults.apply(queryName + ".WriteTextResults",
-        TextIO.Write.to(filename));
+        TextIO.write().to(filename));
   }
 
   private static class StringToTableRow extends DoFn<String, TableRow> {
@@ -1010,12 +1037,12 @@ public class NexmarkRunner<OptionT extends NexmarkOptions> {
             // Send synthesized events to Pubsub in separate publisher job.
             // We won't start the main pipeline until the publisher has sent the pre-load events.
             // We'll shutdown the publisher job when we notice the main job has finished.
-            invokeBuilderForPublishOnlyPipeline(new PipelineBuilder() {
+            invokeBuilderForPublishOnlyPipeline(new PipelineBuilder<NexmarkOptions>() {
               @Override
               public void build(NexmarkOptions publishOnlyOptions) {
                 Pipeline sp = Pipeline.create(options);
                 NexmarkUtils.setupPipeline(configuration.coderStrategy, sp);
-                publisherMonitor = new Monitor<Event>(queryName, "publisher");
+                publisherMonitor = new Monitor<>(queryName, "publisher");
                 sinkEventsToPubsub(
                     sourceEventsFromSynthetic(sp)
                             .apply(queryName + ".Monitor", publisherMonitor.getTransform()),
@@ -1140,9 +1167,6 @@ public class NexmarkRunner<OptionT extends NexmarkOptions> {
     checkState(queryName == null);
     configuration = runConfiguration;
 
-    // GCS URI patterns to delete on exit.
-    List<String> pathsToDelete = new ArrayList<>();
-
     try {
       NexmarkUtils.console("Running %s", configuration.toShortString());
 
@@ -1220,9 +1244,6 @@ public class NexmarkRunner<OptionT extends NexmarkOptions> {
           }
           ((Query10) query).setOutputPath(path);
           ((Query10) query).setMaxNumWorkers(maxNumWorkers());
-          if (path != null && options.getManageResources()) {
-            pathsToDelete.add(path + "/**");
-          }
         }
 
         // Apply query.
@@ -1252,7 +1273,6 @@ public class NexmarkRunner<OptionT extends NexmarkOptions> {
     } finally {
       configuration = null;
       queryName = null;
-      // TODO: Cleanup pathsToDelete
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/1541fad0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkUtils.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkUtils.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkUtils.java
index 18589c4..f6215e9 100644
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkUtils.java
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkUtils.java
@@ -55,6 +55,9 @@ import org.apache.beam.sdk.coders.SerializableCoder;
 import org.apache.beam.sdk.io.Read;
 import org.apache.beam.sdk.metrics.Counter;
 import org.apache.beam.sdk.metrics.Metrics;
+import org.apache.beam.sdk.state.StateSpec;
+import org.apache.beam.sdk.state.StateSpecs;
+import org.apache.beam.sdk.state.ValueState;
 import org.apache.beam.sdk.transforms.Combine;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.PTransform;
@@ -63,9 +66,6 @@ import org.apache.beam.sdk.transforms.windowing.AfterPane;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
 import org.apache.beam.sdk.transforms.windowing.Window;
-import org.apache.beam.sdk.util.state.StateSpec;
-import org.apache.beam.sdk.util.state.StateSpecs;
-import org.apache.beam.sdk.util.state.ValueState;
 import org.apache.beam.sdk.values.PBegin;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.TimestampedValue;
@@ -178,7 +178,7 @@ public class NexmarkUtils {
     /** Names are suffixed with the query being run. */
     QUERY,
     /** Names are suffixed with the query being run and a random number. */
-    QUERY_AND_SALT;
+    QUERY_AND_SALT
   }
 
   /**
@@ -310,7 +310,7 @@ public class NexmarkUtils {
    * Log message to console. For client side only.
    */
   public static void console(String format, Object... args) {
-    System.out.printf("%s %s\n", Instant.now(), String.format(format, args));
+    System.out.printf("%s %s%n", Instant.now(), String.format(format, args));
   }
 
   /**
@@ -326,7 +326,7 @@ public class NexmarkUtils {
   /**
    * All events will be given a timestamp relative to this time (ms since epoch).
    */
-  public static final long BASE_TIME = Instant.parse("2015-07-15T00:00:00.000Z").getMillis();
+  private static final long BASE_TIME = Instant.parse("2015-07-15T00:00:00.000Z").getMillis();
 
   /**
    * Instants guaranteed to be strictly before and after all event timestamps, and which won't
@@ -377,7 +377,7 @@ public class NexmarkUtils {
   /**
    * Return a generator config to match the given {@code options}.
    */
-  public static GeneratorConfig standardGeneratorConfig(NexmarkConfiguration configuration) {
+  private static GeneratorConfig standardGeneratorConfig(NexmarkConfiguration configuration) {
     return new GeneratorConfig(configuration,
                                configuration.useWallclockEventTime ? System.currentTimeMillis()
                                                                    : BASE_TIME, 0,
@@ -558,15 +558,14 @@ public class NexmarkUtils {
                         }
                         p++;
                       }
-                      long next = System.currentTimeMillis();
-                      now = next;
+                      now = System.currentTimeMillis();
                     }
                     c.output(c.element());
                   }
                 });
   }
 
-  private static final StateSpec<Object, ValueState<byte[]>> DUMMY_TAG =
+  private static final StateSpec<ValueState<byte[]>> DUMMY_TAG =
           StateSpecs.value(ByteArrayCoder.of());
   private static final int MAX_BUFFER_SIZE = 1 << 24;
 
@@ -578,20 +577,19 @@ public class NexmarkUtils {
                   @ProcessElement
                   public void processElement(ProcessContext c) {
                     long remain = bytes;
-                    long start = System.currentTimeMillis();
-                    long now = start;
+//                    long now = System.currentTimeMillis();
                     while (remain > 0) {
+                      //TODO Ismael google on state
                       long thisBytes = Math.min(remain, MAX_BUFFER_SIZE);
                       remain -= thisBytes;
-                      byte[] arr = new byte[(int) thisBytes];
-                      for (int i = 0; i < thisBytes; i++) {
-                        arr[i] = (byte) now;
-                      }
-                      //TODO Ismael google on state
+//                      byte[] arr = new byte[(int) thisBytes];
+//                      for (int i = 0; i < thisBytes; i++) {
+//                        arr[i] = (byte) now;
+//                      }
 //                      ValueState<byte[]> state = c.windowingInternals().stateInternals().state(
 //                          StateNamespaces.global(), DUMMY_TAG);
 //                      state.write(arr);
-                      now = System.currentTimeMillis();
+//                      now = System.currentTimeMillis();
                     }
                     c.output(c.element());
                   }

http://git-wip-us.apache.org/repos/asf/beam/blob/1541fad0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/Auction.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/Auction.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/Auction.java
index 4b1a848..5c018dc 100644
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/Auction.java
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/Auction.java
@@ -81,14 +81,14 @@ public class Auction implements KnownSize, Serializable {
 
   /** Extra auction properties. */
   @JsonProperty
-  public final String itemName;
+  private final String itemName;
 
   @JsonProperty
-  public final String description;
+  private final String description;
 
   /** Initial bid price, in cents. */
   @JsonProperty
-  public final long initialBid;
+  private final long initialBid;
 
   /** Reserve price, in cents. */
   @JsonProperty
@@ -111,7 +111,7 @@ public class Auction implements KnownSize, Serializable {
 
   /** Additional arbitrary payload for performance testing. */
   @JsonProperty
-  public final String extra;
+  private final String extra;
 
 
   // For Avro only.

http://git-wip-us.apache.org/repos/asf/beam/blob/1541fad0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/AuctionCount.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/AuctionCount.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/AuctionCount.java
index e6d3450..c83a455 100644
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/AuctionCount.java
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/AuctionCount.java
@@ -54,11 +54,9 @@ public class AuctionCount implements KnownSize, Serializable {
     }
   };
 
-  @JsonProperty
-  public final long auction;
+  @JsonProperty private final long auction;
 
-  @JsonProperty
-  public final long count;
+  @JsonProperty private final long count;
 
   // For Avro only.
   @SuppressWarnings("unused")

http://git-wip-us.apache.org/repos/asf/beam/blob/1541fad0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/AuctionPrice.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/AuctionPrice.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/AuctionPrice.java
index cb971e2..43d0b27 100644
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/AuctionPrice.java
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/AuctionPrice.java
@@ -55,11 +55,11 @@ public class AuctionPrice implements KnownSize, Serializable {
   };
 
   @JsonProperty
-  public final long auction;
+  private final long auction;
 
   /** Price in cents. */
   @JsonProperty
-  public final long price;
+  private final long price;
 
   // For Avro only.
   @SuppressWarnings("unused")

http://git-wip-us.apache.org/repos/asf/beam/blob/1541fad0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/BidsPerSession.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/BidsPerSession.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/BidsPerSession.java
index 26b6a41..6dddf34 100644
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/BidsPerSession.java
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/BidsPerSession.java
@@ -56,10 +56,10 @@ public class BidsPerSession implements KnownSize, Serializable {
   };
 
   @JsonProperty
-  public final long personId;
+  private final long personId;
 
   @JsonProperty
-  public final long bidsPerSession;
+  private final long bidsPerSession;
 
   public BidsPerSession() {
     personId = 0;

http://git-wip-us.apache.org/repos/asf/beam/blob/1541fad0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/Done.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/Done.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/Done.java
index 42999cd..0c14e8f 100644
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/Done.java
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/Done.java
@@ -54,7 +54,7 @@ public class Done implements KnownSize, Serializable {
   };
 
   @JsonProperty
-  public final String message;
+  private final String message;
 
   // For Avro only.
   @SuppressWarnings("unused")

http://git-wip-us.apache.org/repos/asf/beam/blob/1541fad0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/Event.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/Event.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/Event.java
index e2130c9..1f1f096 100644
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/Event.java
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/Event.java
@@ -138,19 +138,6 @@ public class Event implements KnownSize, Serializable {
     }
   }
 
-  /**
-   * Remove {@code annotation} from event. (Used for debugging.)
-   */
-  public Event withoutAnnotation(String annotation) {
-    if (newPerson != null) {
-      return new Event(newPerson.withoutAnnotation(annotation));
-    } else if (newAuction != null) {
-      return new Event(newAuction.withoutAnnotation(annotation));
-    } else {
-      return new Event(bid.withoutAnnotation(annotation));
-    }
-  }
-
   @Override
   public long sizeInBytes() {
     if (newPerson != null) {

http://git-wip-us.apache.org/repos/asf/beam/blob/1541fad0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/IdNameReserve.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/IdNameReserve.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/IdNameReserve.java
index cf1e571..17b8c4a 100644
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/IdNameReserve.java
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/IdNameReserve.java
@@ -60,14 +60,14 @@ public class IdNameReserve implements KnownSize, Serializable {
   };
 
   @JsonProperty
-  public final long id;
+  private final long id;
 
   @JsonProperty
-  public final String name;
+  private final String name;
 
   /** Reserve price in cents. */
   @JsonProperty
-  public final long reserve;
+  private final long reserve;
 
   // For Avro only.
   @SuppressWarnings("unused")

http://git-wip-us.apache.org/repos/asf/beam/blob/1541fad0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/NameCityStateId.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/NameCityStateId.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/NameCityStateId.java
index 86d1738..28f25cd 100644
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/NameCityStateId.java
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/NameCityStateId.java
@@ -62,16 +62,16 @@ public class NameCityStateId implements KnownSize, Serializable {
   };
 
   @JsonProperty
-  public final String name;
+  private final String name;
 
   @JsonProperty
-  public final String city;
+  private final String city;
 
   @JsonProperty
-  public final String state;
+  private final String state;
 
   @JsonProperty
-  public final long id;
+  private final long id;
 
   // For Avro only.
   @SuppressWarnings("unused")

http://git-wip-us.apache.org/repos/asf/beam/blob/1541fad0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/Person.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/Person.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/Person.java
index 906df94..c690fd4 100644
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/Person.java
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/Person.java
@@ -77,10 +77,10 @@ public class Person implements KnownSize, Serializable {
   public final String name;
 
   @JsonProperty
-  public final String emailAddress;
+  private final String emailAddress;
 
   @JsonProperty
-  public final String creditCard;
+  private final String creditCard;
 
   @JsonProperty
   public final String city;
@@ -93,7 +93,7 @@ public class Person implements KnownSize, Serializable {
 
   /** Additional arbitrary payload for performance testing. */
   @JsonProperty
-  public final String extra;
+  private final String extra;
 
   // For Avro only.
   @SuppressWarnings("unused")

http://git-wip-us.apache.org/repos/asf/beam/blob/1541fad0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/SellerPrice.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/SellerPrice.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/SellerPrice.java
index 68f2697..52ff540 100644
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/SellerPrice.java
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/SellerPrice.java
@@ -60,7 +60,7 @@ public class SellerPrice implements KnownSize, Serializable {
 
   /** Price in cents. */
   @JsonProperty
-  public final long price;
+  private final long price;
 
   // For Avro only.
   @SuppressWarnings("unused")

http://git-wip-us.apache.org/repos/asf/beam/blob/1541fad0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/AbstractSimulator.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/AbstractSimulator.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/AbstractSimulator.java
index 270b5c3..1395182 100644
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/AbstractSimulator.java
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/AbstractSimulator.java
@@ -37,7 +37,7 @@ import org.joda.time.Instant;
  */
 public abstract class AbstractSimulator<InputT, OutputT> {
   /** Window size for action bucket sampling. */
-  public static final Duration WINDOW_SIZE = Duration.standardMinutes(1);
+  private static final Duration WINDOW_SIZE = Duration.standardMinutes(1);
 
   /** Input event stream we should draw from. */
   private final Iterator<TimestampedValue<InputT>> input;
@@ -77,7 +77,7 @@ public abstract class AbstractSimulator<InputT, OutputT> {
 
   /** Called by implementors of {@link #run}: Fetch the next input element. */
   @Nullable
-  protected TimestampedValue<InputT> nextInput() {
+  TimestampedValue<InputT> nextInput() {
     if (!input.hasNext()) {
       return null;
     }
@@ -90,7 +90,7 @@ public abstract class AbstractSimulator<InputT, OutputT> {
    * Called by implementors of {@link #run}:  Capture an intermediate result, for the purpose of
    * recording the expected activity of the query over time.
    */
-  protected void addIntermediateResult(TimestampedValue<OutputT> result) {
+  void addIntermediateResult(TimestampedValue<OutputT> result) {
     NexmarkUtils.info("intermediate result: %s", result);
     updateCounts(result.getTimestamp());
   }
@@ -99,7 +99,7 @@ public abstract class AbstractSimulator<InputT, OutputT> {
    * Called by implementors of {@link #run}: Capture a final result, for the purpose of checking
    * semantic correctness.
    */
-  protected void addResult(TimestampedValue<OutputT> result) {
+  void addResult(TimestampedValue<OutputT> result) {
     NexmarkUtils.info("result: %s", result);
     pendingResults.add(result);
     updateCounts(result.getTimestamp());
@@ -121,7 +121,7 @@ public abstract class AbstractSimulator<InputT, OutputT> {
   }
 
   /** Called by implementors of {@link #run}: Record that no more results will be emitted. */
-  protected void allDone() {
+  void allDone() {
     isDone = true;
   }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/1541fad0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/NexmarkQuery.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/NexmarkQuery.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/NexmarkQuery.java
index 0796ce5..09415c0 100644
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/NexmarkQuery.java
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/NexmarkQuery.java
@@ -46,10 +46,10 @@ public abstract class NexmarkQuery
     extends PTransform<PCollection<Event>, PCollection<TimestampedValue<KnownSize>>> {
   public static final TupleTag<Auction> AUCTION_TAG = new TupleTag<>("auctions");
   public static final TupleTag<Bid> BID_TAG = new TupleTag<>("bids");
-  protected static final TupleTag<Person> PERSON_TAG = new TupleTag<>("person");
+  static final TupleTag<Person> PERSON_TAG = new TupleTag<>("person");
 
   /** Predicate to detect a new person event. */
-  protected static final SerializableFunction<Event, Boolean> IS_NEW_PERSON =
+  private static final SerializableFunction<Event, Boolean> IS_NEW_PERSON =
       new SerializableFunction<Event, Boolean>() {
         @Override
         public Boolean apply(Event event) {
@@ -58,7 +58,7 @@ public abstract class NexmarkQuery
       };
 
   /** DoFn to convert a new person event to a person. */
-  protected static final DoFn<Event, Person> AS_PERSON = new DoFn<Event, Person>() {
+  private static final DoFn<Event, Person> AS_PERSON = new DoFn<Event, Person>() {
     @ProcessElement
     public void processElement(ProcessContext c) {
       c.output(c.element().newPerson);
@@ -66,7 +66,7 @@ public abstract class NexmarkQuery
   };
 
   /** Predicate to detect a new auction event. */
-  protected static final SerializableFunction<Event, Boolean> IS_NEW_AUCTION =
+  private static final SerializableFunction<Event, Boolean> IS_NEW_AUCTION =
       new SerializableFunction<Event, Boolean>() {
         @Override
         public Boolean apply(Event event) {
@@ -75,7 +75,7 @@ public abstract class NexmarkQuery
       };
 
   /** DoFn to convert a new auction event to an auction. */
-  protected static final DoFn<Event, Auction> AS_AUCTION = new DoFn<Event, Auction>() {
+  private static final DoFn<Event, Auction> AS_AUCTION = new DoFn<Event, Auction>() {
     @ProcessElement
     public void processElement(ProcessContext c) {
       c.output(c.element().newAuction);
@@ -83,7 +83,7 @@ public abstract class NexmarkQuery
   };
 
   /** Predicate to detect a new bid event. */
-  protected static final SerializableFunction<Event, Boolean> IS_BID =
+  private static final SerializableFunction<Event, Boolean> IS_BID =
       new SerializableFunction<Event, Boolean>() {
         @Override
         public Boolean apply(Event event) {
@@ -92,7 +92,7 @@ public abstract class NexmarkQuery
       };
 
   /** DoFn to convert a bid event to a bid. */
-  protected static final DoFn<Event, Bid> AS_BID = new DoFn<Event, Bid>() {
+  private static final DoFn<Event, Bid> AS_BID = new DoFn<Event, Bid>() {
     @ProcessElement
     public void processElement(ProcessContext c) {
       c.output(c.element().bid);
@@ -100,7 +100,7 @@ public abstract class NexmarkQuery
   };
 
   /** Transform to key each person by their id. */
-  protected static final ParDo.SingleOutput<Person, KV<Long, Person>> PERSON_BY_ID =
+  static final ParDo.SingleOutput<Person, KV<Long, Person>> PERSON_BY_ID =
       ParDo.of(new DoFn<Person, KV<Long, Person>>() {
              @ProcessElement
              public void processElement(ProcessContext c) {
@@ -109,7 +109,7 @@ public abstract class NexmarkQuery
            });
 
   /** Transform to key each auction by its id. */
-  protected static final ParDo.SingleOutput<Auction, KV<Long, Auction>> AUCTION_BY_ID =
+  static final ParDo.SingleOutput<Auction, KV<Long, Auction>> AUCTION_BY_ID =
       ParDo.of(new DoFn<Auction, KV<Long, Auction>>() {
              @ProcessElement
              public void processElement(ProcessContext c) {
@@ -118,7 +118,7 @@ public abstract class NexmarkQuery
            });
 
   /** Transform to key each auction by its seller id. */
-  protected static final ParDo.SingleOutput<Auction, KV<Long, Auction>> AUCTION_BY_SELLER =
+  static final ParDo.SingleOutput<Auction, KV<Long, Auction>> AUCTION_BY_SELLER =
       ParDo.of(new DoFn<Auction, KV<Long, Auction>>() {
              @ProcessElement
              public void processElement(ProcessContext c) {
@@ -127,7 +127,7 @@ public abstract class NexmarkQuery
            });
 
   /** Transform to key each bid by it's auction id. */
-  protected static final ParDo.SingleOutput<Bid, KV<Long, Bid>> BID_BY_AUCTION =
+  static final ParDo.SingleOutput<Bid, KV<Long, Bid>> BID_BY_AUCTION =
       ParDo.of(new DoFn<Bid, KV<Long, Bid>>() {
              @ProcessElement
              public void processElement(ProcessContext c) {
@@ -136,7 +136,7 @@ public abstract class NexmarkQuery
            });
 
   /** Transform to project the auction id from each bid. */
-  protected static final ParDo.SingleOutput<Bid, Long> BID_TO_AUCTION =
+  static final ParDo.SingleOutput<Bid, Long> BID_TO_AUCTION =
       ParDo.of(new DoFn<Bid, Long>() {
              @ProcessElement
              public void processElement(ProcessContext c) {
@@ -145,7 +145,7 @@ public abstract class NexmarkQuery
            });
 
   /** Transform to project the price from each bid. */
-  protected static final ParDo.SingleOutput<Bid, Long> BID_TO_PRICE =
+  static final ParDo.SingleOutput<Bid, Long> BID_TO_PRICE =
       ParDo.of(new DoFn<Bid, Long>() {
              @ProcessElement
              public void processElement(ProcessContext c) {
@@ -205,13 +205,13 @@ public abstract class NexmarkQuery
         }
       };
 
-  protected final NexmarkConfiguration configuration;
+  final NexmarkConfiguration configuration;
   public final Monitor<Event> eventMonitor;
   public final Monitor<KnownSize> resultMonitor;
-  public final Monitor<Event> endOfStreamMonitor;
-  protected final Counter fatalCounter;
+  private final Monitor<Event> endOfStreamMonitor;
+  private final Counter fatalCounter;
 
-  protected NexmarkQuery(NexmarkConfiguration configuration, String name) {
+  NexmarkQuery(NexmarkConfiguration configuration, String name) {
     super(name);
     this.configuration = configuration;
     if (configuration.debug) {

http://git-wip-us.apache.org/repos/asf/beam/blob/1541fad0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/NexmarkQueryModel.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/NexmarkQueryModel.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/NexmarkQueryModel.java
index 1ad9099..bfa668b 100644
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/NexmarkQueryModel.java
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/NexmarkQueryModel.java
@@ -43,7 +43,7 @@ import org.junit.Assert;
 public abstract class NexmarkQueryModel implements Serializable {
   public final NexmarkConfiguration configuration;
 
-  public NexmarkQueryModel(NexmarkConfiguration configuration) {
+  NexmarkQueryModel(NexmarkConfiguration configuration) {
     this.configuration = configuration;
   }
 
@@ -51,7 +51,7 @@ public abstract class NexmarkQueryModel implements Serializable {
    * Return the start of the most recent window of {@code size} and {@code period} which ends
    * strictly before {@code timestamp}.
    */
-  public static Instant windowStart(Duration size, Duration period, Instant timestamp) {
+  static Instant windowStart(Duration size, Duration period, Instant timestamp) {
     long ts = timestamp.getMillis();
     long p = period.getMillis();
     long lim = ts - ts % p;
@@ -60,7 +60,7 @@ public abstract class NexmarkQueryModel implements Serializable {
   }
 
   /** Convert {@code itr} to strings capturing values, timestamps and order. */
-  protected static <T> List<String> toValueTimestampOrder(Iterator<TimestampedValue<T>> itr) {
+  static <T> List<String> toValueTimestampOrder(Iterator<TimestampedValue<T>> itr) {
     List<String> strings = new ArrayList<>();
     while (itr.hasNext()) {
       strings.add(itr.next().toString());
@@ -69,7 +69,7 @@ public abstract class NexmarkQueryModel implements Serializable {
   }
 
   /** Convert {@code itr} to strings capturing values and order. */
-  protected static <T> List<String> toValueOrder(Iterator<TimestampedValue<T>> itr) {
+  static <T> List<String> toValueOrder(Iterator<TimestampedValue<T>> itr) {
     List<String> strings = new ArrayList<>();
     while (itr.hasNext()) {
       strings.add(itr.next().getValue().toString());
@@ -78,7 +78,7 @@ public abstract class NexmarkQueryModel implements Serializable {
   }
 
   /** Convert {@code itr} to strings capturing values only. */
-  protected static <T> Set<String> toValue(Iterator<TimestampedValue<T>> itr) {
+  static <T> Set<String> toValue(Iterator<TimestampedValue<T>> itr) {
     Set<String> strings = new HashSet<>();
     while (itr.hasNext()) {
       strings.add(itr.next().getValue().toString());
@@ -90,7 +90,7 @@ public abstract class NexmarkQueryModel implements Serializable {
   public abstract AbstractSimulator<?, ?> simulator();
 
   /** Return sub-sequence of results which are significant for model. */
-  protected Iterable<TimestampedValue<KnownSize>> relevantResults(
+  Iterable<TimestampedValue<KnownSize>> relevantResults(
       Iterable<TimestampedValue<KnownSize>> results) {
     return results;
   }
@@ -104,8 +104,6 @@ public abstract class NexmarkQueryModel implements Serializable {
   /** Return assertion to use on results of pipeline for this query. */
   public SerializableFunction<Iterable<TimestampedValue<KnownSize>>, Void> assertionFor() {
     final Collection<String> expectedStrings = toCollection(simulator().results());
-    final String[] expectedStringsArray =
-      expectedStrings.toArray(new String[expectedStrings.size()]);
 
     return new SerializableFunction<Iterable<TimestampedValue<KnownSize>>, Void>() {
       @Override
@@ -113,9 +111,6 @@ public abstract class NexmarkQueryModel implements Serializable {
       Collection<String> actualStrings = toCollection(relevantResults(actual).iterator());
         Assert.assertThat("wrong pipeline output", actualStrings,
           IsEqual.equalTo(expectedStrings));
-//compare without order
-//      Assert.assertThat("wrong pipeline output", actualStrings,
-//        IsIterableContainingInAnyOrder.containsInAnyOrder(expectedStringsArray));
         return null;
       }
     };

http://git-wip-us.apache.org/repos/asf/beam/blob/1541fad0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query0Model.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query0Model.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query0Model.java
index 6fb6613..8e65591 100644
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query0Model.java
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query0Model.java
@@ -32,7 +32,7 @@ public class Query0Model extends NexmarkQueryModel {
   /**
    * Simulator for query 0.
    */
-  private class Simulator extends AbstractSimulator<Event, Event> {
+  private static class Simulator extends AbstractSimulator<Event, Event> {
     public Simulator(NexmarkConfiguration configuration) {
       super(NexmarkUtils.standardEventIterator(configuration));
     }

http://git-wip-us.apache.org/repos/asf/beam/blob/1541fad0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query10.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query10.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query10.java
index c919691..516dab1 100644
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query10.java
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query10.java
@@ -35,7 +35,7 @@ import org.apache.beam.integration.nexmark.model.KnownSize;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.metrics.Counter;
 import org.apache.beam.sdk.metrics.Metrics;
-import org.apache.beam.sdk.options.GcsOptions;
+import org.apache.beam.sdk.extensions.gcp.options.GcsOptions;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.GroupByKey;
 import org.apache.beam.sdk.transforms.ParDo;
@@ -101,7 +101,7 @@ public class Query10 extends NexmarkQuery {
 
     @Override
     public String toString() {
-      return String.format("%s %s %d %s %s\n", maxTimestamp, shard, index, timing, filename);
+      return String.format("%s %s %d %s %s%n", maxTimestamp, shard, index, timing, filename);
     }
   }
 
@@ -130,8 +130,6 @@ public class Query10 extends NexmarkQuery {
 
   /**
    * Return channel for writing bytes to GCS.
-   *
-   * @throws IOException
    */
   private WritableByteChannel openWritableGcsFile(GcsOptions options, String filename)
       throws IOException {

http://git-wip-us.apache.org/repos/asf/beam/blob/1541fad0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query11.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query11.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query11.java
index fd936a9..6db9bcf 100644
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query11.java
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query11.java
@@ -63,14 +63,13 @@ public class Query11 extends NexmarkQuery {
                 Repeatedly.forever(AfterPane.elementCountAtLeast(configuration.maxLogEvents)))
             .discardingFiredPanes()
             .withAllowedLateness(Duration.standardSeconds(configuration.occasionalDelaySec / 2)));
-    PCollection<BidsPerSession> bidsPerSession = biddersWindowed.apply(Count.<Long>perElement())
+    return biddersWindowed.apply(Count.<Long>perElement())
         .apply(name + ".ToResult", ParDo.of(new DoFn<KV<Long, Long>, BidsPerSession>() {
 
           @ProcessElement public void processElement(ProcessContext c) {
             c.output(new BidsPerSession(c.element().getKey(), c.element().getValue()));
           }
         }));
-    return bidsPerSession;
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/beam/blob/1541fad0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query1Model.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query1Model.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query1Model.java
index 0388687..5d4de45 100644
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query1Model.java
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query1Model.java
@@ -34,7 +34,7 @@ public class Query1Model extends NexmarkQueryModel implements Serializable {
   /**
    * Simulator for query 1.
    */
-  private class Simulator extends AbstractSimulator<Event, Bid> {
+  private static class Simulator extends AbstractSimulator<Event, Bid> {
     public Simulator(NexmarkConfiguration configuration) {
       super(NexmarkUtils.standardEventIterator(configuration));
     }

http://git-wip-us.apache.org/repos/asf/beam/blob/1541fad0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query3.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query3.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query3.java
index 71364ba..f74b78d 100644
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query3.java
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query3.java
@@ -17,7 +17,6 @@
  */
 package org.apache.beam.integration.nexmark.queries;
 
-import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 import org.apache.beam.integration.nexmark.NexmarkConfiguration;
@@ -30,6 +29,13 @@ import org.apache.beam.integration.nexmark.model.Person;
 import org.apache.beam.sdk.coders.ListCoder;
 import org.apache.beam.sdk.metrics.Counter;
 import org.apache.beam.sdk.metrics.Metrics;
+import org.apache.beam.sdk.state.TimeDomain;
+import org.apache.beam.sdk.state.Timer;
+import org.apache.beam.sdk.state.TimerSpec;
+import org.apache.beam.sdk.state.TimerSpecs;
+import org.apache.beam.sdk.state.ValueState;
+import org.apache.beam.sdk.state.StateSpec;
+import org.apache.beam.sdk.state.StateSpecs;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.Filter;
 import org.apache.beam.sdk.transforms.ParDo;
@@ -41,13 +47,6 @@ import org.apache.beam.sdk.transforms.windowing.AfterPane;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
 import org.apache.beam.sdk.transforms.windowing.Repeatedly;
 import org.apache.beam.sdk.transforms.windowing.Window;
-import org.apache.beam.sdk.util.TimeDomain;
-import org.apache.beam.sdk.util.Timer;
-import org.apache.beam.sdk.util.TimerSpec;
-import org.apache.beam.sdk.util.TimerSpecs;
-import org.apache.beam.sdk.util.state.StateSpec;
-import org.apache.beam.sdk.util.state.StateSpecs;
-import org.apache.beam.sdk.util.state.ValueState;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
 import org.joda.time.Duration;
@@ -176,18 +175,18 @@ public class Query3 extends NexmarkQuery {
    */
   private static class JoinDoFn extends DoFn<KV<Long, CoGbkResult>, KV<Auction, Person>> {
 
-    private int maxAuctionsWaitingTime;
+    private final int maxAuctionsWaitingTime;
     private static final String AUCTIONS = "auctions";
     private static final String PERSON = "person";
 
     @StateId(PERSON)
-    private static final StateSpec<Object, ValueState<Person>> personSpec =
+    private static final StateSpec<ValueState<Person>> personSpec =
         StateSpecs.value(Person.CODER);
 
     private static final String PERSON_STATE_EXPIRING = "personStateExpiring";
 
     @StateId(AUCTIONS)
-    private final StateSpec<Object, ValueState<List<Auction>>> auctionsSpec =
+    private final StateSpec<ValueState<List<Auction>>> auctionsSpec =
         StateSpecs.value(ListCoder.of(Auction.CODER));
 
     @TimerId(PERSON_STATE_EXPIRING)
@@ -219,8 +218,7 @@ public class Query3 extends NexmarkQuery {
         ProcessContext c,
         @TimerId(PERSON_STATE_EXPIRING) Timer timer,
         @StateId(PERSON) ValueState<Person> personState,
-        @StateId(AUCTIONS) ValueState<List<Auction>> auctionsState)
-        throws IOException {
+        @StateId(AUCTIONS) ValueState<List<Auction>> auctionsState) {
       // We would *almost* implement this by  rewindowing into the global window and
       // running a combiner over the result. The combiner's accumulator would be the
       // state we use below. However, combiners cannot emit intermediate results, thus

http://git-wip-us.apache.org/repos/asf/beam/blob/1541fad0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query3Model.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query3Model.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query3Model.java
index 6b98e2a..f415709 100644
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query3Model.java
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query3Model.java
@@ -42,7 +42,7 @@ public class Query3Model extends NexmarkQueryModel implements Serializable {
   /**
    * Simulator for query 3.
    */
-  private class Simulator extends AbstractSimulator<Event, NameCityStateId> {
+  private static class Simulator extends AbstractSimulator<Event, NameCityStateId> {
     /** Auctions, indexed by seller id. */
     private final Multimap<Long, Auction> newAuctions;
 

http://git-wip-us.apache.org/repos/asf/beam/blob/1541fad0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query4Model.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query4Model.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query4Model.java
index 634a58e..269e47a 100644
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query4Model.java
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query4Model.java
@@ -93,8 +93,9 @@ public class Query4Model extends NexmarkQueryModel implements Serializable {
         }
         totals.put(category, total);
       }
-      for (long category : counts.keySet()) {
-        long count = counts.get(category);
+      for (Map.Entry<Long, Long> entry : counts.entrySet()) {
+        long category = entry.getKey();
+        long count = entry.getValue();
         long total = totals.get(category);
         TimestampedValue<CategoryPrice> result = TimestampedValue.of(
             new CategoryPrice(category, Math.round((double) total / count), true), lastTimestamp);

http://git-wip-us.apache.org/repos/asf/beam/blob/1541fad0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query5.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query5.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query5.java
index 18ce578..1944330 100644
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query5.java
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query5.java
@@ -18,7 +18,7 @@
 package org.apache.beam.integration.nexmark.queries;
 
 import java.util.ArrayList;
-import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
 import org.apache.beam.integration.nexmark.NexmarkConfiguration;
 import org.apache.beam.integration.nexmark.NexmarkUtils;
@@ -80,7 +80,7 @@ public class Query5 extends NexmarkQuery {
             ParDo.of(new DoFn<KV<Long, Long>, KV<List<Long>, Long>>() {
                   @ProcessElement
                   public void processElement(ProcessContext c) {
-                    c.output(KV.of(Arrays.asList(c.element().getKey()), c.element().getValue()));
+                    c.output(KV.of(Collections.singletonList(c.element().getKey()), c.element().getValue()));
                   }
                 }))
 

http://git-wip-us.apache.org/repos/asf/beam/blob/1541fad0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query6.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query6.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query6.java
index 65789ab..ea39ede 100644
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query6.java
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query6.java
@@ -86,9 +86,7 @@ public class Query6 extends NexmarkQuery {
     public List<Bid> mergeAccumulators(Iterable<List<Bid>> accumulators) {
       List<Bid> result = new ArrayList<>();
       for (List<Bid> accumulator : accumulators) {
-        for (Bid bid : accumulator) {
-          result.add(bid);
-        }
+        result.addAll(accumulator);
       }
       Collections.sort(result, Bid.ASCENDING_TIME_THEN_PRICE);
       if (result.size() > maxNumBids) {

http://git-wip-us.apache.org/repos/asf/beam/blob/1541fad0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query6Model.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query6Model.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query6Model.java
index 0691714..9cb8b3d 100644
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query6Model.java
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query6Model.java
@@ -86,8 +86,9 @@ public class Query6Model extends NexmarkQueryModel implements Serializable {
     protected void run() {
       TimestampedValue<AuctionBid> timestampedWinningBid = nextInput();
       if (timestampedWinningBid == null) {
-        for (long seller : numWinningBidsPerSeller.keySet()) {
-          long count = numWinningBidsPerSeller.get(seller);
+        for (Map.Entry<Long, Long> entry : numWinningBidsPerSeller.entrySet()) {
+          long seller = entry.getKey();
+          long count = entry.getValue();
           long total = totalWinningBidPricesPerSeller.get(seller);
           addResult(TimestampedValue.of(
               new SellerPrice(seller, Math.round((double) total / count)), lastTimestamp));

http://git-wip-us.apache.org/repos/asf/beam/blob/1541fad0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/WinningBids.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/WinningBids.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/WinningBids.java
index 11a4d38..52891a7 100644
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/WinningBids.java
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/WinningBids.java
@@ -25,8 +25,8 @@ import java.io.InputStream;
 import java.io.OutputStream;
 import java.io.Serializable;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.TreeMap;
@@ -77,7 +77,7 @@ import org.joda.time.Instant;
  */
 public class WinningBids extends PTransform<PCollection<Event>, PCollection<AuctionBid>> {
   /** Windows for open auctions and bids. */
-  private static class AuctionOrBidWindow extends IntervalWindow implements Serializable {
+  private static class AuctionOrBidWindow extends IntervalWindow {
     /** Id of auction this window is for. */
     public final long auction;
 
@@ -104,9 +104,7 @@ public class WinningBids extends PTransform<PCollection<Event>, PCollection<Auct
 
     /** Return an auction window for {@code auction}. */
     public static AuctionOrBidWindow forAuction(Instant timestamp, Auction auction) {
-      AuctionOrBidWindow result =
-          new AuctionOrBidWindow(timestamp, new Instant(auction.expires), auction.id, true);
-      return result;
+      return new AuctionOrBidWindow(timestamp, new Instant(auction.expires), auction.id, true);
     }
 
     /**
@@ -127,9 +125,8 @@ public class WinningBids extends PTransform<PCollection<Event>, PCollection<Auct
       // Instead, we will just give the bid a finite window which expires at
       // the upper bound of auctions assuming the auction starts at the same time as the bid,
       // and assuming the system is running at its lowest event rate (as per interEventDelayUs).
-      AuctionOrBidWindow result = new AuctionOrBidWindow(
+      return new AuctionOrBidWindow(
           timestamp, timestamp.plus(expectedAuctionDurationMs * 2), bid.auction, false);
-      return result;
     }
 
     /** Is this an auction window? */
@@ -171,8 +168,7 @@ public class WinningBids extends PTransform<PCollection<Event>, PCollection<Auct
         throws IOException, CoderException {
       IntervalWindow superWindow = SUPER_CODER.decode(inStream, Coder.Context.NESTED);
       long auction = ID_CODER.decode(inStream, Coder.Context.NESTED);
-      boolean isAuctionWindow =
-          INT_CODER.decode(inStream, Coder.Context.NESTED) == 0 ? false : true;
+      boolean isAuctionWindow = INT_CODER.decode(inStream, Context.NESTED) != 0;
       return new AuctionOrBidWindow(
           superWindow.start(), superWindow.end(), auction, isAuctionWindow);
     }
@@ -194,15 +190,16 @@ public class WinningBids extends PTransform<PCollection<Event>, PCollection<Auct
       Event event = c.element();
       if (event.newAuction != null) {
         // Assign auctions to an auction window which expires at the auction's close.
-        return Arrays.asList(AuctionOrBidWindow.forAuction(c.timestamp(), event.newAuction));
+        return Collections
+            .singletonList(AuctionOrBidWindow.forAuction(c.timestamp(), event.newAuction));
       } else if (event.bid != null) {
         // Assign bids to a temporary bid window which will later be merged into the appropriate
         // auction window.
-        return Arrays.asList(
+        return Collections.singletonList(
             AuctionOrBidWindow.forBid(expectedAuctionDurationMs, c.timestamp(), event.bid));
       } else {
         // Don't assign people to any window. They will thus be dropped.
-        return Arrays.asList();
+        return Collections.emptyList();
       }
     }
 
@@ -226,8 +223,9 @@ public class WinningBids extends PTransform<PCollection<Event>, PCollection<Auct
 
       // Merge all 'bid' windows into their corresponding 'auction' window, provided the
       // auction has not expired.
-      for (long auction : idToTrueAuctionWindow.keySet()) {
-        AuctionOrBidWindow auctionWindow = idToTrueAuctionWindow.get(auction);
+      for (Map.Entry<Long, AuctionOrBidWindow> entry : idToTrueAuctionWindow.entrySet()) {
+        long auction = entry.getKey();
+        AuctionOrBidWindow auctionWindow = entry.getValue();
         List<AuctionOrBidWindow> bidWindows = idToBidAuctionWindows.get(auction);
         if (bidWindows != null) {
           List<AuctionOrBidWindow> toBeMerged = new ArrayList<>();
@@ -296,8 +294,8 @@ public class WinningBids extends PTransform<PCollection<Event>, PCollection<Auct
         configuration.firstEventRate, configuration.nextEventRate,
         configuration.rateUnit, configuration.numEventGenerators);
     long longestDelayUs = 0;
-    for (int i = 0; i < interEventDelayUs.length; i++) {
-      longestDelayUs = Math.max(longestDelayUs, interEventDelayUs[i]);
+    for (long interEventDelayU : interEventDelayUs) {
+      longestDelayUs = Math.max(longestDelayUs, interEventDelayU);
     }
     // Adjust for proportion of auction events amongst all events.
     longestDelayUs =

http://git-wip-us.apache.org/repos/asf/beam/blob/1541fad0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/sources/Generator.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/sources/Generator.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/sources/Generator.java
index 012d4e6..2a2732b 100644
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/sources/Generator.java
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/sources/Generator.java
@@ -123,8 +123,8 @@ public class Generator implements Iterator<TimestampedValue<Event>>, Serializabl
           @Override public void verifyDeterministic() throws NonDeterministicException {}
         };
 
-    private long numEvents;
-    private long wallclockBaseTime;
+    private final long numEvents;
+    private final long wallclockBaseTime;
 
     private Checkpoint(long numEvents, long wallclockBaseTime) {
       this.numEvents = numEvents;
@@ -403,8 +403,8 @@ public class Generator implements Iterator<TimestampedValue<Event>>, Serializabl
     if (n < Integer.MAX_VALUE) {
       return random.nextInt((int) n);
     } else {
-      // TODO: Very skewed distribution! Bad!
-      return Math.abs(random.nextLong()) % n;
+      // WARNING: Very skewed distribution! Bad!
+      return Math.abs(random.nextLong() % n);
     }
   }
 
@@ -470,14 +470,13 @@ public class Generator implements Iterator<TimestampedValue<Event>>, Serializabl
 
     long category = GeneratorConfig.FIRST_CATEGORY_ID + random.nextInt(NUM_CATEGORIES);
     long initialBid = nextPrice(random);
-    long dateTime = timestamp;
     long expires = timestamp + nextAuctionLengthMs(random, timestamp);
     String name = nextString(random, 20);
     String desc = nextString(random, 100);
     long reserve = initialBid + nextPrice(random);
     int currentSize = 8 + name.length() + desc.length() + 8 + 8 + 8 + 8 + 8;
     String extra = nextExtra(random, currentSize, config.configuration.avgAuctionByteSize);
-    return new Auction(id, name, desc, initialBid, reserve, dateTime, expires, seller, category,
+    return new Auction(id, name, desc, initialBid, reserve, timestamp, expires, seller, category,
         extra);
   }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/1541fad0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/sources/GeneratorConfig.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/sources/GeneratorConfig.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/sources/GeneratorConfig.java
index 3caaf51..5799bb2 100644
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/sources/GeneratorConfig.java
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/sources/GeneratorConfig.java
@@ -42,7 +42,7 @@ public class GeneratorConfig implements Serializable {
    */
   public static final int PERSON_PROPORTION = 1;
   public static final int AUCTION_PROPORTION = 3;
-  public static final int BID_PROPORTION = 46;
+  private static final int BID_PROPORTION = 46;
   public static final int PROPORTION_DENOMINATOR =
       PERSON_PROPORTION + AUCTION_PROPORTION + BID_PROPORTION;
 
@@ -55,12 +55,12 @@ public class GeneratorConfig implements Serializable {
    * Delay between events, in microseconds. If the array has more than one entry then
    * the rate is changed every {@link #stepLengthSec}, and wraps around.
    */
-  public final long[] interEventDelayUs;
+  private final long[] interEventDelayUs;
 
   /**
    * Delay before changing the current inter-event delay.
    */
-  public final long stepLengthSec;
+  private final long stepLengthSec;
 
   /**
    * Time for first event (ms since epoch).
@@ -88,13 +88,13 @@ public class GeneratorConfig implements Serializable {
    * True period of epoch in milliseconds. Derived from above.
    * (Ie time to run through cycle for all interEventDelayUs entries).
    */
-  public final long epochPeriodMs;
+  private final long epochPeriodMs;
 
   /**
    * Number of events per epoch. Derived from above.
    * (Ie number of events to run through cycle for all interEventDelayUs entries).
    */
-  public final long eventsPerEpoch;
+  private final long eventsPerEpoch;
 
   public GeneratorConfig(
       NexmarkConfiguration configuration, long baseTime, long firstEventId,
@@ -121,10 +121,10 @@ public class GeneratorConfig implements Serializable {
     long eventsPerEpoch = 0;
     long epochPeriodMs = 0;
     if (interEventDelayUs.length > 1) {
-      for (int i = 0; i < interEventDelayUs.length; i++) {
-        long numEventsForThisCycle = (stepLengthSec * 1_000_000L) / interEventDelayUs[i];
+      for (long interEventDelayU : interEventDelayUs) {
+        long numEventsForThisCycle = (stepLengthSec * 1_000_000L) / interEventDelayU;
         eventsPerEpoch += numEventsForThisCycle;
-        epochPeriodMs += (numEventsForThisCycle * interEventDelayUs[i]) / 1000L;
+        epochPeriodMs += (numEventsForThisCycle * interEventDelayU) / 1000L;
       }
     }
     this.eventsPerEpoch = eventsPerEpoch;
@@ -248,16 +248,16 @@ public class GeneratorConfig implements Serializable {
     long epoch = eventNumber / eventsPerEpoch;
     long n = eventNumber % eventsPerEpoch;
     long offsetInEpochMs = 0;
-    for (int i = 0; i < interEventDelayUs.length; i++) {
-      long numEventsForThisCycle = (stepLengthSec * 1_000_000L) / interEventDelayUs[i];
+    for (long interEventDelayU : interEventDelayUs) {
+      long numEventsForThisCycle = (stepLengthSec * 1_000_000L) / interEventDelayU;
       if (n < numEventsForThisCycle) {
-        long offsetInCycleUs = n * interEventDelayUs[i];
+        long offsetInCycleUs = n * interEventDelayU;
         long timestamp =
             baseTime + epoch * epochPeriodMs + offsetInEpochMs + (offsetInCycleUs / 1000L);
-        return KV.of(timestamp, interEventDelayUs[i]);
+        return KV.of(timestamp, interEventDelayU);
       }
       n -= numEventsForThisCycle;
-      offsetInEpochMs += (numEventsForThisCycle * interEventDelayUs[i]) / 1000L;
+      offsetInEpochMs += (numEventsForThisCycle * interEventDelayU) / 1000L;
     }
     throw new RuntimeException("internal eventsPerEpoch incorrect"); // can't reach
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/1541fad0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/sources/UnboundedEventSource.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/sources/UnboundedEventSource.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/sources/UnboundedEventSource.java
index c3c6eb0..09d945d 100644
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/sources/UnboundedEventSource.java
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/sources/UnboundedEventSource.java
@@ -116,7 +116,7 @@ public class UnboundedEventSource extends UnboundedSource<Event, Generator.Check
     private TimestampedValue<Event> currentEvent;
 
     /** Events which have been held back so as to force them to be late. */
-    private Queue<Generator.NextEvent> heldBackEvents = new PriorityQueue<>();
+    private final Queue<Generator.NextEvent> heldBackEvents = new PriorityQueue<>();
 
     public EventReader(Generator generator) {
       this.generator = generator;

http://git-wip-us.apache.org/repos/asf/beam/blob/1541fad0/integration/java/nexmark/src/test/java/org/apache/beam/integration/nexmark/sources/UnboundedEventSourceTest.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/test/java/org/apache/beam/integration/nexmark/sources/UnboundedEventSourceTest.java b/integration/java/nexmark/src/test/java/org/apache/beam/integration/nexmark/sources/UnboundedEventSourceTest.java
index 15e17a8..1d04e2a 100644
--- a/integration/java/nexmark/src/test/java/org/apache/beam/integration/nexmark/sources/UnboundedEventSourceTest.java
+++ b/integration/java/nexmark/src/test/java/org/apache/beam/integration/nexmark/sources/UnboundedEventSourceTest.java
@@ -53,8 +53,8 @@ public class UnboundedEventSourceTest {
    * confirming reading events match the model events.
    */
   private static class EventIdChecker {
-    private Set<Long> seenPersonIds = new HashSet<>();
-    private Set<Long> seenAuctionIds = new HashSet<>();
+    private final Set<Long> seenPersonIds = new HashSet<>();
+    private final Set<Long> seenAuctionIds = new HashSet<>();
 
     public void add(Event event) {
       if (event.newAuction != null) {
@@ -90,7 +90,6 @@ public class UnboundedEventSourceTest {
 
     EventIdChecker checker = new EventIdChecker();
     PipelineOptions options = TestPipeline.testingPipelineOptions();
-    Pipeline p = TestPipeline.create(options);
     UnboundedEventSource source = new UnboundedEventSource(config, 1, 0, false);
     UnboundedReader<Event> reader = source.createReader(options, null);
 

http://git-wip-us.apache.org/repos/asf/beam/blob/1541fad0/integration/pom.xml
----------------------------------------------------------------------
diff --git a/integration/pom.xml b/integration/pom.xml
index 4839da5..31f293e 100644
--- a/integration/pom.xml
+++ b/integration/pom.xml
@@ -30,6 +30,20 @@
   <packaging>pom</packaging>
   <name>Apache Beam :: Integration Tests</name>
 
+  <profiles>
+    <profile>
+      <id>release</id>
+      <build>
+        <plugins>
+          <plugin>
+            <groupId>org.codehaus.mojo</groupId>
+            <artifactId>findbugs-maven-plugin</artifactId>
+          </plugin>
+        </plugins>
+      </build>
+    </profile>
+  </profiles>
+
   <modules>
     <module>java</module>
   </modules>


Mime
View raw message