Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id CB1C8200B9A for ; Fri, 7 Oct 2016 18:53:50 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id CA075160AE8; Fri, 7 Oct 2016 16:53:50 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 1DF54160AC6 for ; Fri, 7 Oct 2016 18:53:48 +0200 (CEST) Received: (qmail 30932 invoked by uid 500); 7 Oct 2016 16:53:48 -0000 Mailing-List: contact commits-help@beam.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@beam.incubator.apache.org Delivered-To: mailing list commits@beam.incubator.apache.org Received: (qmail 30919 invoked by uid 99); 7 Oct 2016 16:53:48 -0000 Received: from pnap-us-west-generic-nat.apache.org (HELO spamd3-us-west.apache.org) (209.188.14.142) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 07 Oct 2016 16:53:48 +0000 Received: from localhost (localhost [127.0.0.1]) by spamd3-us-west.apache.org (ASF Mail Server at spamd3-us-west.apache.org) with ESMTP id C3E18180298 for ; Fri, 7 Oct 2016 16:53:47 +0000 (UTC) X-Virus-Scanned: Debian amavisd-new at spamd3-us-west.apache.org X-Spam-Flag: NO X-Spam-Score: -6.219 X-Spam-Level: X-Spam-Status: No, score=-6.219 tagged_above=-999 required=6.31 tests=[KAM_ASCII_DIVIDERS=0.8, KAM_LAZY_DOMAIN_SECURITY=1, RCVD_IN_DNSWL_HI=-5, RCVD_IN_MSPIKE_H3=-0.01, RCVD_IN_MSPIKE_WL=-0.01, RP_MATCHES_RCVD=-2.999] autolearn=disabled Received: from mx2-lw-us.apache.org ([10.40.0.8]) by localhost (spamd3-us-west.apache.org [10.40.0.10]) (amavisd-new, port 10024) with ESMTP id QBlZSYMm-UWR for ; Fri, 7 Oct 2016 16:53:40 +0000 (UTC) Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx2-lw-us.apache.org (ASF Mail Server at mx2-lw-us.apache.org) with SMTP id A06E25FAF6 for ; Fri, 7 Oct 2016 16:53:39 +0000 (UTC) Received: (qmail 30738 invoked by uid 99); 7 Oct 2016 16:53:39 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 07 Oct 2016 16:53:39 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id C3392DFF03; Fri, 7 Oct 2016 16:53:38 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: dhalperi@apache.org To: commits@beam.incubator.apache.org Date: Fri, 07 Oct 2016 16:53:38 -0000 Message-Id: X-Mailer: ASF-Git Admin Mailer Subject: [1/2] incubator-beam git commit: Improve compile-time type checking archived-at: Fri, 07 Oct 2016 16:53:51 -0000 Repository: incubator-beam Updated Branches: refs/heads/master 03b89c065 -> 2492604e4 Improve compile-time type checking * Enables -Xlint:rawtypes except for Spark and Flink runners and the microbenchmarks module * Fixed some warnings in tests * Fixed checkstyle warnings Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/e3e6fe3f Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/e3e6fe3f Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/e3e6fe3f Branch: refs/heads/master Commit: e3e6fe3fc4cf462f84271acc8447798760ae770f Parents: 03b89c0 Author: Eugene Kirpichov Authored: Thu Sep 29 14:00:41 2016 -0700 Committer: Dan Halperin Committed: Fri Oct 7 09:49:35 2016 -0700 ---------------------------------------------------------------------- .../beam/runners/direct/DirectRunner.java | 1 + .../direct/ImmutableListBundleFactory.java | 2 +- .../beam/runners/direct/ParDoEvaluator.java | 7 +- .../beam/runners/direct/StructuralKey.java | 2 +- .../direct/TestStreamEvaluatorFactory.java | 4 +- .../beam/runners/direct/WatermarkManager.java | 2 +- .../beam/runners/dataflow/DataflowRunner.java | 26 ++----- .../runners/dataflow/internal/IsmFormat.java | 5 +- .../beam/sdk/io/PubsubUnboundedSource.java | 2 +- .../beam/sdk/options/PipelineOptions.java | 9 ++- .../org/apache/beam/sdk/testing/TestStream.java | 2 +- .../beam/sdk/transforms/DoFnAdapters.java | 3 +- .../sdk/transforms/reflect/DoFnInvokers.java | 8 +- .../sdk/transforms/reflect/DoFnSignature.java | 4 +- .../sdk/transforms/reflect/DoFnSignatures.java | 11 ++- .../beam/sdk/coders/CoderRegistryTest.java | 1 + .../sdk/options/PipelineOptionsFactoryTest.java | 6 +- .../beam/sdk/testing/TestPipelineTest.java | 4 +- .../apache/beam/sdk/transforms/CombineTest.java | 77 ++++++++------------ .../apache/beam/sdk/transforms/ParDoTest.java | 4 +- .../display/DisplayDataEvaluator.java | 6 +- .../transforms/reflect/DoFnInvokersTest.java | 2 +- .../beam/sdk/io/hdfs/AvroHDFSFileSource.java | 2 +- .../beam/sdk/io/hdfs/AvroWrapperCoder.java | 2 +- .../apache/beam/sdk/io/hdfs/HDFSFileSource.java | 7 +- .../simpleauth/SimpleAuthHDFSFileSource.java | 5 +- .../java/org/apache/beam/sdk/io/jms/JmsIO.java | 3 +- .../beam/sdk/io/kinesis/CustomOptional.java | 5 +- 28 files changed, 93 insertions(+), 119 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e3e6fe3f/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java index 67ec3e6..abcc57b 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java @@ -280,6 +280,7 @@ public class DirectRunner return result; } + @SuppressWarnings("rawtypes") private Map, Collection> defaultModelEnforcements(DirectOptions options) { ImmutableMap.Builder, Collection> http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e3e6fe3f/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ImmutableListBundleFactory.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ImmutableListBundleFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ImmutableListBundleFactory.java index 53b7e54..4972340 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ImmutableListBundleFactory.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ImmutableListBundleFactory.java @@ -111,7 +111,7 @@ class ImmutableListBundleFactory implements BundleFactory { StructuralKey key, Iterable> committedElements, Instant synchronizedCompletionTime) { - return new AutoValue_ImmutableListBundleFactory_CommittedImmutableListBundle( + return new AutoValue_ImmutableListBundleFactory_CommittedImmutableListBundle<>( pcollection, key, committedElements, synchronizedCompletionTime); } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e3e6fe3f/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java index a761289..bbe7827 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java @@ -160,15 +160,14 @@ class ParDoEvaluator implements TransformEvaluator { undeclaredOutputs = new HashMap<>(); } - @SuppressWarnings("unchecked") + @SuppressWarnings({"unchecked", "rawtypes"}) @Override public void output(TupleTag tag, WindowedValue output) { - @SuppressWarnings("rawtypes") UncommittedBundle bundle = bundles.get(tag); if (bundle == null) { - List undeclaredContents = undeclaredOutputs.get(tag); + List> undeclaredContents = (List) undeclaredOutputs.get(tag); if (undeclaredContents == null) { - undeclaredContents = new ArrayList(); + undeclaredContents = new ArrayList<>(); undeclaredOutputs.put(tag, undeclaredContents); } undeclaredContents.add(output); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e3e6fe3f/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StructuralKey.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StructuralKey.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StructuralKey.java index 61332f9..0d0f8e7 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StructuralKey.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StructuralKey.java @@ -89,7 +89,7 @@ abstract class StructuralKey { return true; } if (other instanceof CoderStructuralKey) { - CoderStructuralKey that = (CoderStructuralKey) other; + CoderStructuralKey that = (CoderStructuralKey) other; return structuralValue.equals(that.structuralValue); } return false; http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e3e6fe3f/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactory.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactory.java index ffb4fb5..4a48a58 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactory.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactory.java @@ -171,13 +171,13 @@ class TestStreamEvaluatorFactory implements TransformEvaluatorFactory { private static class DirectTestStream extends PTransform> { private final TestStream original; - private DirectTestStream(TestStream transform) { + private DirectTestStream(TestStream transform) { this.original = transform; } @Override public PCollection apply(PBegin input) { - PipelineRunner runner = input.getPipeline().getRunner(); + PipelineRunner runner = input.getPipeline().getRunner(); checkState( runner instanceof DirectRunner, "%s can only be used when running with the %s", http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e3e6fe3f/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java index b3d1fc5..21cb427 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java @@ -859,7 +859,7 @@ public class WatermarkManager { private void applyPendingUpdate(PendingWatermarkUpdate pending) { CommittedResult result = pending.getResult(); - AppliedPTransform transform = result.getTransform(); + AppliedPTransform transform = result.getTransform(); CommittedBundle inputBundle = pending.getInputBundle(); updatePending(inputBundle, pending.getTimerUpdate(), result); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e3e6fe3f/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java index ceaf6a0..64ac3ad 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java @@ -2081,24 +2081,17 @@ public class DataflowRunner extends PipelineRunner { static { DataflowPipelineTranslator.registerTransformTranslator( - StreamingPubsubIORead.class, new StreamingPubsubIOReadTranslator()); + StreamingPubsubIORead.class, new StreamingPubsubIOReadTranslator<>()); } } /** * Rewrite {@link StreamingPubsubIORead} to the appropriate internal node. */ - private static class StreamingPubsubIOReadTranslator implements - TransformTranslator { + private static class StreamingPubsubIOReadTranslator implements + TransformTranslator> { @Override - @SuppressWarnings({"rawtypes", "unchecked"}) public void translate( - StreamingPubsubIORead transform, - TranslationContext context) { - translateTyped(transform, context); - } - - private void translateTyped( StreamingPubsubIORead transform, TranslationContext context) { checkArgument(context.getPipelineOptions().isStreaming(), @@ -2157,25 +2150,18 @@ public class DataflowRunner extends PipelineRunner { static { DataflowPipelineTranslator.registerTransformTranslator( - StreamingPubsubIOWrite.class, new StreamingPubsubIOWriteTranslator()); + StreamingPubsubIOWrite.class, new StreamingPubsubIOWriteTranslator<>()); } } /** * Rewrite {@link StreamingPubsubIOWrite} to the appropriate internal node. */ - private static class StreamingPubsubIOWriteTranslator implements - TransformTranslator { + private static class StreamingPubsubIOWriteTranslator implements + TransformTranslator> { @Override - @SuppressWarnings({"rawtypes", "unchecked"}) public void translate( - StreamingPubsubIOWrite transform, - TranslationContext context) { - translateTyped(transform, context); - } - - private void translateTyped( StreamingPubsubIOWrite transform, TranslationContext context) { checkArgument(context.getPipelineOptions().isStreaming(), http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e3e6fe3f/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/IsmFormat.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/IsmFormat.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/IsmFormat.java index 6f4a18b..bb8daf3 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/IsmFormat.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/IsmFormat.java @@ -240,8 +240,9 @@ public class IsmFormat { } /** Returns the key coder at the specified index. */ - public Coder getKeyComponentCoder(int index) { - return keyComponentCoders.get(index); + @SuppressWarnings("unchecked") + public Coder getKeyComponentCoder(int index) { + return (Coder) keyComponentCoders.get(index); } /** Returns the value coder. */ http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e3e6fe3f/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSource.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSource.java index 36f154f..7617689 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSource.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSource.java @@ -303,7 +303,7 @@ public class PubsubUnboundedSource extends PTransform> /** * Return current time according to {@code reader}. */ - private static long now(PubsubReader reader) { + private static long now(PubsubReader reader) { if (reader.outer.outer.clock == null) { return System.currentTimeMillis(); } else { http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e3e6fe3f/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptions.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptions.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptions.java index 701ae70..3e810e9 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptions.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptions.java @@ -279,13 +279,14 @@ public interface PipelineOptions extends HasDisplayData { * as the {@link Default}. However, it should still be used if available, and a user is required * to explicitly set the {@code --runner} property if they wish to use an alternative runner. */ - class DirectRunner implements DefaultValueFactory> { + class DirectRunner implements DefaultValueFactory>> { @Override - public Class create(PipelineOptions options) { + public Class> create(PipelineOptions options) { try { @SuppressWarnings({"unchecked", "rawtypes"}) - Class direct = (Class) Class.forName( - "org.apache.beam.runners.direct.DirectRunner"); + Class> direct = + (Class>) + Class.forName("org.apache.beam.runners.direct.DirectRunner"); return direct; } catch (ClassNotFoundException e) { throw new IllegalArgumentException(String.format( http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e3e6fe3f/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestStream.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestStream.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestStream.java index e2730ed..509bb24 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestStream.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestStream.java @@ -105,7 +105,7 @@ public final class TestStream extends PTransform> { @SafeVarargs public final Builder addElements(T element, T... elements) { TimestampedValue firstElement = TimestampedValue.of(element, currentWatermark); - @SuppressWarnings("unchecked") + @SuppressWarnings({"unchecked", "rawtypes"}) TimestampedValue[] remainingElements = new TimestampedValue[elements.length]; for (int i = 0; i < elements.length; i++) { remainingElements[i] = TimestampedValue.of(elements[i], currentWatermark); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e3e6fe3f/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnAdapters.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnAdapters.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnAdapters.java index 7b259aa..3eee74a 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnAdapters.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnAdapters.java @@ -58,8 +58,9 @@ public class DoFnAdapters { } /** Creates an {@link OldDoFn} that delegates to the {@link DoFn}. */ + @SuppressWarnings({"unchecked", "rawtypes"}) public static OldDoFn toOldDoFn(DoFn fn) { - DoFnSignature signature = DoFnSignatures.INSTANCE.getOrParseSignature(fn.getClass()); + DoFnSignature signature = DoFnSignatures.INSTANCE.getOrParseSignature((Class) fn.getClass()); if (signature.processElement().usesSingleWindow()) { return new WindowDoFnAdapter<>(fn); } else { http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e3e6fe3f/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokers.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokers.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokers.java index 041eb60..da88587 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokers.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokers.java @@ -164,9 +164,11 @@ public class DoFnInvokers { } /** @return the {@link DoFnInvoker} for the given {@link DoFn}. */ + @SuppressWarnings({"unchecked", "rawtypes"}) public DoFnInvoker newByteBuddyInvoker( DoFn fn) { - return newByteBuddyInvoker(DoFnSignatures.INSTANCE.getOrParseSignature(fn.getClass()), fn); + return newByteBuddyInvoker(DoFnSignatures.INSTANCE.getOrParseSignature( + (Class) fn.getClass()), fn); } /** @return the {@link DoFnInvoker} for the given {@link DoFn}. */ @@ -198,7 +200,7 @@ public class DoFnInvokers { */ private synchronized Constructor getOrGenerateByteBuddyInvokerConstructor( DoFnSignature signature) { - Class fnClass = signature.fnClass(); + Class> fnClass = signature.fnClass(); Constructor constructor = byteBuddyInvokerConstructorCache.get(fnClass); if (constructor == null) { Class> invokerClass = generateInvokerClass(signature); @@ -214,7 +216,7 @@ public class DoFnInvokers { /** Generates a {@link DoFnInvoker} class for the given {@link DoFnSignature}. */ private static Class> generateInvokerClass(DoFnSignature signature) { - Class fnClass = signature.fnClass(); + Class> fnClass = signature.fnClass(); final TypeDescription clazzDescription = new TypeDescription.ForLoadedType(fnClass); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e3e6fe3f/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignature.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignature.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignature.java index b6864da..756df07 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignature.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignature.java @@ -33,7 +33,7 @@ import org.apache.beam.sdk.transforms.DoFn; @AutoValue public abstract class DoFnSignature { /** Class of the original {@link DoFn} from which this signature was produced. */ - public abstract Class fnClass(); + public abstract Class> fnClass(); /** Details about this {@link DoFn}'s {@link DoFn.ProcessElement} method. */ public abstract ProcessElementMethod processElement(); @@ -60,7 +60,7 @@ public abstract class DoFnSignature { @AutoValue.Builder abstract static class Builder { - abstract Builder setFnClass(Class fnClass); + abstract Builder setFnClass(Class> fnClass); abstract Builder setProcessElement(ProcessElementMethod processElement); abstract Builder setStartBundle(BundleMethod startBundle); abstract Builder setFinishBundle(BundleMethod finishBundle); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e3e6fe3f/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java index 8283788..ad15127 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java @@ -49,8 +49,7 @@ public class DoFnSignatures { private final Map, DoFnSignature> signatureCache = new LinkedHashMap<>(); /** @return the {@link DoFnSignature} for the given {@link DoFn}. */ - public synchronized DoFnSignature getOrParseSignature( - @SuppressWarnings("rawtypes") Class fn) { + public synchronized > DoFnSignature getOrParseSignature(Class fn) { DoFnSignature signature = signatureCache.get(fn); if (signature == null) { signatureCache.put(fn, signature = parseSignature(fn)); @@ -59,14 +58,14 @@ public class DoFnSignatures { } /** Analyzes a given {@link DoFn} class and extracts its {@link DoFnSignature}. */ - private static DoFnSignature parseSignature(Class fnClass) { + private static DoFnSignature parseSignature(Class> fnClass) { DoFnSignature.Builder builder = DoFnSignature.builder(); ErrorReporter errors = new ErrorReporter(null, fnClass.getName()); errors.checkArgument(DoFn.class.isAssignableFrom(fnClass), "Must be subtype of DoFn"); builder.setFnClass(fnClass); - TypeToken fnToken = TypeToken.of(fnClass); + TypeToken> fnToken = TypeToken.of(fnClass); // Extract the input and output type, and whether the fn is bounded. TypeToken inputT = null; @@ -163,7 +162,7 @@ public class DoFnSignatures { @VisibleForTesting static DoFnSignature.ProcessElementMethod analyzeProcessElementMethod( ErrorReporter errors, - TypeToken fnClass, + TypeToken> fnClass, Method m, TypeToken inputT, TypeToken outputT) { @@ -228,7 +227,7 @@ public class DoFnSignatures { @VisibleForTesting static DoFnSignature.BundleMethod analyzeBundleMethod( ErrorReporter errors, - TypeToken fnToken, + TypeToken> fnToken, Method m, TypeToken inputT, TypeToken outputT) { http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e3e6fe3f/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/CoderRegistryTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/CoderRegistryTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/CoderRegistryTest.java index d690a47..530d755 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/CoderRegistryTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/CoderRegistryTest.java @@ -395,6 +395,7 @@ public class CoderRegistryTest { private static class TestGenericClass { } @Test + @SuppressWarnings("rawtypes") public void testSerializableTypeVariableDefaultCoder() throws Exception { CoderRegistry registry = new CoderRegistry(); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e3e6fe3f/sdks/java/core/src/test/java/org/apache/beam/sdk/options/PipelineOptionsFactoryTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/options/PipelineOptionsFactoryTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/options/PipelineOptionsFactoryTest.java index f26667f..a9ec7e4 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/options/PipelineOptionsFactoryTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/options/PipelineOptionsFactoryTest.java @@ -58,7 +58,7 @@ import org.junit.runners.JUnit4; @RunWith(JUnit4.class) public class PipelineOptionsFactoryTest { private static final String DEFAULT_RUNNER_NAME = "DirectRunner"; - private static final Class REGISTERED_RUNNER = + private static final Class> REGISTERED_RUNNER = RegisteredTestRunner.class; @Rule public ExpectedException expectedException = ExpectedException.none(); @@ -753,7 +753,9 @@ public class PipelineOptionsFactoryTest { void setString(List value); List getInteger(); void setInteger(List value); + @SuppressWarnings("rawtypes") List getList(); + @SuppressWarnings("rawtypes") void setList(List value); } @@ -1242,7 +1244,7 @@ public class PipelineOptionsFactoryTest { } private static class RegisteredTestRunner extends PipelineRunner { - public static PipelineRunner fromOptions(PipelineOptions options) { + public static PipelineRunner fromOptions(PipelineOptions options) { return new RegisteredTestRunner(); } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e3e6fe3f/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/TestPipelineTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/TestPipelineTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/TestPipelineTest.java index ed65f15..03563f3 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/TestPipelineTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/TestPipelineTest.java @@ -124,8 +124,8 @@ public class TestPipelineTest { @Test public void testMatcherSerializationDeserialization() { TestPipelineOptions opts = PipelineOptionsFactory.as(TestPipelineOptions.class); - SerializableMatcher m1 = new TestMatcher(); - SerializableMatcher m2 = new TestMatcher(); + SerializableMatcher m1 = new TestMatcher(); + SerializableMatcher m2 = new TestMatcher(); opts.setOnCreateMatcher(m1); opts.setOnSuccessMatcher(m2); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e3e6fe3f/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java index be061af..5ce8055 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java @@ -38,6 +38,7 @@ import java.io.InputStream; import java.io.OutputStream; import java.io.Serializable; import java.util.Arrays; +import java.util.Collections; import java.util.HashSet; import java.util.List; import java.util.Objects; @@ -95,34 +96,27 @@ public class CombineTest implements Serializable { // This test is Serializable, just so that it's easy to have // anonymous inner classes inside the non-static test methods. - @SuppressWarnings({"rawtypes", "unchecked"}) - static final KV[] TABLE = new KV[] { + static final List> TABLE = Arrays.asList( KV.of("a", 1), KV.of("a", 1), KV.of("a", 4), KV.of("b", 1), - KV.of("b", 13), - }; - - @SuppressWarnings({"rawtypes", "unchecked"}) - static final KV[] EMPTY_TABLE = new KV[] { - }; + KV.of("b", 13) + ); - static final Integer[] NUMBERS = new Integer[] { - 1, 1, 2, 3, 5, 8, 13, 21, 34, 55 - }; + static final List> EMPTY_TABLE = Collections.emptyList(); @Mock private DoFn.ProcessContext processContext; PCollection> createInput(Pipeline p, - KV[] table) { - return p.apply(Create.of(Arrays.asList(table)).withCoder( + List> table) { + return p.apply(Create.of(table).withCoder( KvCoder.of(StringUtf8Coder.of(), BigEndianIntegerCoder.of()))); } - private void runTestSimpleCombine(KV[] table, + private void runTestSimpleCombine(List> table, int globalSum, - KV[] perKeyCombines) { + List> perKeyCombines) { Pipeline pipeline = TestPipeline.create(); PCollection> input = createInput(pipeline, table); @@ -140,9 +134,9 @@ public class CombineTest implements Serializable { pipeline.run(); } - private void runTestSimpleCombineWithContext(KV[] table, + private void runTestSimpleCombineWithContext(List> table, int globalSum, - KV[] perKeyCombines, + List> perKeyCombines, String[] globallyCombines) { Pipeline pipeline = TestPipeline.create(); PCollection> perKeyInput = createInput(pipeline, table); @@ -174,8 +168,7 @@ public class CombineTest implements Serializable { @Category(RunnableOnService.class) @SuppressWarnings({"rawtypes", "unchecked"}) public void testSimpleCombine() { - runTestSimpleCombine(TABLE, 20, new KV[] { - KV.of("a", "114a"), KV.of("b", "113b") }); + runTestSimpleCombine(TABLE, 20, Arrays.asList(KV.of("a", "114a"), KV.of("b", "113b"))); } @Test @@ -183,28 +176,27 @@ public class CombineTest implements Serializable { @SuppressWarnings({"rawtypes", "unchecked"}) public void testSimpleCombineWithContext() { runTestSimpleCombineWithContext(TABLE, 20, - new KV[] {KV.of("a", "01124a"), KV.of("b", "01123b") }, + Arrays.asList(KV.of("a", "01124a"), KV.of("b", "01123b")), new String[] {"01111234G"}); } @Test @Category(RunnableOnService.class) - @SuppressWarnings({"rawtypes", "unchecked"}) public void testSimpleCombineWithContextEmpty() { - runTestSimpleCombineWithContext(EMPTY_TABLE, 0, new KV[] {}, new String[] {}); + runTestSimpleCombineWithContext( + EMPTY_TABLE, 0, Collections.>emptyList(), new String[] {}); } @Test @Category(RunnableOnService.class) - @SuppressWarnings({"rawtypes", "unchecked"}) public void testSimpleCombineEmpty() { - runTestSimpleCombine(EMPTY_TABLE, 0, new KV[] { }); + runTestSimpleCombine(EMPTY_TABLE, 0, Collections.>emptyList()); } @SuppressWarnings("unchecked") - private void runTestBasicCombine(KV[] table, + private void runTestBasicCombine(List> table, Set globalUnique, - KV>[] perKeyUnique) { + List>> perKeyUnique) { Pipeline pipeline = TestPipeline.create(); pipeline.getCoderRegistry().registerCoder(Set.class, SetCoder.class); PCollection> input = createInput(pipeline, table); @@ -225,23 +217,22 @@ public class CombineTest implements Serializable { @Test @Category(RunnableOnService.class) - @SuppressWarnings({"rawtypes", "unchecked"}) public void testBasicCombine() { - runTestBasicCombine(TABLE, ImmutableSet.of(1, 13, 4), new KV[] { + runTestBasicCombine(TABLE, ImmutableSet.of(1, 13, 4), Arrays.asList( KV.of("a", (Set) ImmutableSet.of(1, 4)), - KV.of("b", (Set) ImmutableSet.of(1, 13)) }); + KV.of("b", (Set) ImmutableSet.of(1, 13)))); } @Test @Category(RunnableOnService.class) - @SuppressWarnings("rawtypes") public void testBasicCombineEmpty() { - runTestBasicCombine(EMPTY_TABLE, ImmutableSet.of(), new KV[] { }); + runTestBasicCombine( + EMPTY_TABLE, ImmutableSet.of(), Collections.>>emptyList()); } - private void runTestAccumulatingCombine(KV[] table, + private void runTestAccumulatingCombine(List> table, Double globalMean, - KV[] perKeyMeans) { + List> perKeyMeans) { Pipeline pipeline = TestPipeline.create(); PCollection> input = createInput(pipeline, table); @@ -265,8 +256,7 @@ public class CombineTest implements Serializable { Pipeline pipeline = TestPipeline.create(); PCollection> input = - pipeline.apply(Create.timestamped(Arrays.asList(TABLE), - Arrays.asList(0L, 1L, 6L, 7L, 8L)) + pipeline.apply(Create.timestamped(TABLE, Arrays.asList(0L, 1L, 6L, 7L, 8L)) .withCoder(KvCoder.of(StringUtf8Coder.of(), BigEndianIntegerCoder.of()))) .apply(Window.>into(FixedWindows.of(Duration.millis(2)))); @@ -292,8 +282,7 @@ public class CombineTest implements Serializable { Pipeline pipeline = TestPipeline.create(); PCollection> perKeyInput = - pipeline.apply(Create.timestamped(Arrays.asList(TABLE), - Arrays.asList(0L, 1L, 6L, 7L, 8L)) + pipeline.apply(Create.timestamped(TABLE, Arrays.asList(0L, 1L, 6L, 7L, 8L)) .withCoder(KvCoder.of(StringUtf8Coder.of(), BigEndianIntegerCoder.of()))) .apply(Window.>into(FixedWindows.of(Duration.millis(2)))); @@ -330,8 +319,7 @@ public class CombineTest implements Serializable { Pipeline pipeline = TestPipeline.create(); PCollection> perKeyInput = - pipeline.apply(Create.timestamped(Arrays.asList(TABLE), - Arrays.asList(2L, 3L, 8L, 9L, 10L)) + pipeline.apply(Create.timestamped(TABLE, Arrays.asList(2L, 3L, 8L, 9L, 10L)) .withCoder(KvCoder.of(StringUtf8Coder.of(), BigEndianIntegerCoder.of()))) .apply(Window.>into(SlidingWindows.of(Duration.millis(2)))); @@ -407,8 +395,7 @@ public class CombineTest implements Serializable { Pipeline pipeline = TestPipeline.create(); PCollection> input = - pipeline.apply(Create.timestamped(Arrays.asList(TABLE), - Arrays.asList(0L, 4L, 7L, 10L, 16L)) + pipeline.apply(Create.timestamped(TABLE, Arrays.asList(0L, 4L, 7L, 10L, 16L)) .withCoder(KvCoder.of(StringUtf8Coder.of(), BigEndianIntegerCoder.of()))) .apply(Window.>into(Sessions.withGapDuration(Duration.millis(5)))); @@ -433,8 +420,7 @@ public class CombineTest implements Serializable { Pipeline pipeline = TestPipeline.create(); PCollection> perKeyInput = - pipeline.apply(Create.timestamped(Arrays.asList(TABLE), - Arrays.asList(0L, 4L, 7L, 10L, 16L)) + pipeline.apply(Create.timestamped(TABLE, Arrays.asList(0L, 4L, 7L, 10L, 16L)) .withCoder(KvCoder.of(StringUtf8Coder.of(), BigEndianIntegerCoder.of()))); PCollection globallyInput = perKeyInput.apply(Values.create()); @@ -488,14 +474,13 @@ public class CombineTest implements Serializable { @Test @Category(RunnableOnService.class) public void testAccumulatingCombine() { - runTestAccumulatingCombine(TABLE, 4.0, new KV[] { - KV.of("a", 2.0), KV.of("b", 7.0) }); + runTestAccumulatingCombine(TABLE, 4.0, Arrays.asList(KV.of("a", 2.0), KV.of("b", 7.0))); } @Test @Category(RunnableOnService.class) public void testAccumulatingCombineEmpty() { - runTestAccumulatingCombine(EMPTY_TABLE, 0.0, new KV[] { }); + runTestAccumulatingCombine(EMPTY_TABLE, 0.0, Collections.>emptyList()); } // Checks that Min, Max, Mean, Sum (operations that pass-through to Combine), http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e3e6fe3f/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java index 0a4b3cd..7ce98bc 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java @@ -1461,8 +1461,8 @@ public class ParDoTest implements Serializable { } }; - ParDo.BoundMulti parDo = ParDo - .withOutputTags(new TupleTag(), TupleTagList.empty()) + ParDo.BoundMulti parDo = ParDo + .withOutputTags(new TupleTag(), TupleTagList.empty()) .of(fn); DisplayData displayData = DisplayData.from(parDo); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e3e6fe3f/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataEvaluator.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataEvaluator.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataEvaluator.java index db9aea3..b758ed6 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataEvaluator.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataEvaluator.java @@ -117,7 +117,7 @@ public class DisplayDataEvaluator { return displayDataForPipeline(pipeline, root); } - private static Set displayDataForPipeline(Pipeline pipeline, PTransform root) { + private static Set displayDataForPipeline(Pipeline pipeline, PTransform root) { PrimitiveDisplayDataPTransformVisitor visitor = new PrimitiveDisplayDataPTransformVisitor(root); pipeline.traverseTopologically(visitor); return visitor.getPrimitivesDisplayData(); @@ -129,11 +129,11 @@ public class DisplayDataEvaluator { */ private static class PrimitiveDisplayDataPTransformVisitor extends Pipeline.PipelineVisitor.Defaults { - private final PTransform root; + private final PTransform root; private final Set displayData; private boolean inCompositeRoot = false; - PrimitiveDisplayDataPTransformVisitor(PTransform root) { + PrimitiveDisplayDataPTransformVisitor(PTransform root) { this.root = root; this.displayData = Sets.newHashSet(); } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e3e6fe3f/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java index 97d810c..d057765 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java @@ -42,7 +42,7 @@ import org.mockito.MockitoAnnotations; public class DoFnInvokersTest { @Rule public ExpectedException thrown = ExpectedException.none(); - @Mock private DoFn.ProcessContext mockContext; + @Mock private DoFn.ProcessContext mockContext; @Mock private BoundedWindow mockWindow; @Mock private DoFn.InputProvider mockInputProvider; @Mock private DoFn.OutputReceiver mockOutputReceiver; http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e3e6fe3f/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/AvroHDFSFileSource.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/AvroHDFSFileSource.java b/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/AvroHDFSFileSource.java index 2629995..92fe5a6 100644 --- a/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/AvroHDFSFileSource.java +++ b/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/AvroHDFSFileSource.java @@ -125,7 +125,7 @@ public class AvroHDFSFileSource extends HDFSFileSource, NullWritab // clone the record to work around identical element issue due to object reuse Coder avroCoder = ((AvroHDFSFileSource) this.getCurrentSource()).avroCoder; - key = new AvroKey(CoderUtils.clone(avroCoder, key.datum())); + key = new AvroKey<>(CoderUtils.clone(avroCoder, key.datum())); return KV.of(key, value); } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e3e6fe3f/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/AvroWrapperCoder.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/AvroWrapperCoder.java b/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/AvroWrapperCoder.java index c1340c0..45a8037 100644 --- a/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/AvroWrapperCoder.java +++ b/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/AvroWrapperCoder.java @@ -64,7 +64,7 @@ public class AvroWrapperCoder, DatumT> } @JsonCreator - @SuppressWarnings("unchecked") + @SuppressWarnings({"unchecked", "rawtypes"}) public static AvroWrapperCoder of( @JsonProperty("wrapperType") String wrapperType, @JsonProperty(PropertyNames.COMPONENT_ENCODINGS) List> components) http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e3e6fe3f/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HDFSFileSource.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HDFSFileSource.java b/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HDFSFileSource.java index 3a4d01f..c71a58c 100644 --- a/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HDFSFileSource.java +++ b/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HDFSFileSource.java @@ -120,10 +120,7 @@ public class HDFSFileSource extends BoundedSource> { */ public static > HDFSFileSource from( String filepattern, Class formatClass, Class keyClass, Class valueClass) { - @SuppressWarnings("unchecked") - HDFSFileSource source = (HDFSFileSource) - new HDFSFileSource(filepattern, formatClass, keyClass, valueClass); - return source; + return new HDFSFileSource<>(filepattern, formatClass, keyClass, valueClass); } /** @@ -271,7 +268,7 @@ public class HDFSFileSource extends BoundedSource> { private final BoundedSource> source; private final String filepattern; - private final Class formatClass; + private final Class> formatClass; protected Job job; private FileInputFormat format; http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e3e6fe3f/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/simpleauth/SimpleAuthHDFSFileSource.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/simpleauth/SimpleAuthHDFSFileSource.java b/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/simpleauth/SimpleAuthHDFSFileSource.java index 6fb340e..d2cab57 100644 --- a/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/simpleauth/SimpleAuthHDFSFileSource.java +++ b/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/simpleauth/SimpleAuthHDFSFileSource.java @@ -93,10 +93,7 @@ public class SimpleAuthHDFSFileSource extends HDFSFileSource { Class keyClass, Class valueClass, String username) { - @SuppressWarnings("unchecked") - HDFSFileSource source = (HDFSFileSource) - new SimpleAuthHDFSFileSource(filepattern, formatClass, keyClass, valueClass, username); - return source; + return new SimpleAuthHDFSFileSource<>(filepattern, formatClass, keyClass, valueClass, username); } @Override http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e3e6fe3f/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java b/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java index 3107aab..1c35f6e 100644 --- a/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java +++ b/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java @@ -255,7 +255,7 @@ public class JmsIO { } @Override - public Coder getCheckpointMarkCoder() { + public Coder getCheckpointMarkCoder() { return AvroCoder.of(JmsCheckpointMark.class); } @@ -319,6 +319,7 @@ public class JmsIO { } Map properties = new HashMap<>(); + @SuppressWarnings("rawtypes") Enumeration propertyNames = message.getPropertyNames(); while (propertyNames.hasMoreElements()) { String propertyName = (String) propertyNames.nextElement(); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e3e6fe3f/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/CustomOptional.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/CustomOptional.java b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/CustomOptional.java index 804d6cc..4515f38 100644 --- a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/CustomOptional.java +++ b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/CustomOptional.java @@ -23,8 +23,9 @@ import java.util.NoSuchElementException; * Similar to Guava {@code Optional}, but throws {@link NoSuchElementException} for missing element. */ abstract class CustomOptional { + @SuppressWarnings("unchecked") public static CustomOptional absent() { - return Absent.INSTANCE; + return (Absent) Absent.INSTANCE; } public static CustomOptional of(T v) { @@ -67,7 +68,7 @@ abstract class CustomOptional { } private static class Absent extends CustomOptional { - public static final Absent INSTANCE = new Absent(); + private static final Absent INSTANCE = new Absent<>(); private Absent() { }