flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From se...@apache.org
Subject [7/8] flink git commit: [FLINK-4298] [storm compatibility] Clean up unnecessary dependencies in 'flink-storm'
Date Mon, 01 Aug 2016 17:54:02 GMT
[FLINK-4298] [storm compatibility] Clean up unnecessary dependencies in 'flink-storm'


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0ea2596e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0ea2596e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0ea2596e

Branch: refs/heads/master
Commit: 0ea2596e1b605c1eedb843273660ef1366463313
Parents: 4456453
Author: Stephan Ewen <sewen@apache.org>
Authored: Mon Aug 1 15:58:12 2016 +0200
Committer: Stephan Ewen <sewen@apache.org>
Committed: Mon Aug 1 19:52:13 2016 +0200

----------------------------------------------------------------------
 flink-contrib/flink-storm/pom.xml               | 83 +++++++-------------
 .../org/apache/flink/storm/api/FlinkClient.java |  5 +-
 .../flink/storm/wrappers/BoltWrapper.java       | 19 +++--
 .../storm/wrappers/FlinkTopologyContext.java    |  3 +-
 .../storm/wrappers/MergedInputsBoltWrapper.java |  6 +-
 .../flink/storm/wrappers/SpoutWrapper.java      |  8 +-
 .../storm/wrappers/WrapperSetupHelperTest.java  | 12 ++-
 7 files changed, 60 insertions(+), 76 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/0ea2596e/flink-contrib/flink-storm/pom.xml
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm/pom.xml b/flink-contrib/flink-storm/pom.xml
index 590f33d..0ac49db 100644
--- a/flink-contrib/flink-storm/pom.xml
+++ b/flink-contrib/flink-storm/pom.xml
@@ -61,18 +61,40 @@ under the License.
 					<artifactId>log4j-over-slf4j</artifactId>
 				</exclusion>
 				<exclusion>
-					<artifactId>logback-classic</artifactId>
 					<groupId>ch.qos.logback</groupId>
+					<artifactId>logback-classic</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>ring</groupId>
+					<artifactId>ring-core</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>ring</groupId>
+					<artifactId>ring-devel</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>ring</groupId>
+					<artifactId>ring-servlet</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>ring</groupId>
+					<artifactId>ring-jetty-adapter</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>org.mortbay.jetty</groupId>
+					<artifactId>jetty</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>org.mortbay.jetty</groupId>
+					<artifactId>jetty-util</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>org.jgrapht</groupId>
+					<artifactId>jgrapht-core</artifactId>
 				</exclusion>
 			</exclusions>
 		</dependency>
 
-		<dependency>
-			<groupId>com.google.guava</groupId>
-			<artifactId>guava</artifactId>
-			<version>${guava.version}</version>
-		</dependency>
-
 		<!-- test dependencies -->
 
 		<dependency>
@@ -85,51 +107,4 @@ under the License.
 		
 	</dependencies>
 
-	<build>
-		<plugins>
-			<plugin>
-				<groupId>org.apache.maven.plugins</groupId>
-				<artifactId>maven-jar-plugin</artifactId>
-				<executions>
-					<execution>
-						<goals>
-							<goal>test-jar</goal>
-						</goals>
-					</execution>
-				</executions>
-			</plugin>
-		</plugins>
-
-		<pluginManagement>
-			<plugins>
-				<!--This plugin's configuration is used to store Eclipse m2e settings only. It has no influence on the Maven build itself.-->
-				<plugin>
-					<groupId>org.eclipse.m2e</groupId>
-					<artifactId>lifecycle-mapping</artifactId>
-					<version>1.0.0</version>
-					<configuration>
-						<lifecycleMappingMetadata>
-							<pluginExecutions>
-								<pluginExecution>
-									<pluginExecutionFilter>
-										<groupId>org.apache.maven.plugins</groupId>
-										<artifactId>maven-dependency-plugin</artifactId>
-										<versionRange>[2.9,)</versionRange>
-										<goals>
-											<goal>unpack</goal>
-										</goals>
-									</pluginExecutionFilter>
-									<action>
-										<ignore/>
-									</action>
-								</pluginExecution>
-							</pluginExecutions>
-						</lifecycleMappingMetadata>
-					</configuration>
-				</plugin>
-			</plugins>
-		</pluginManagement>
-
-	</build>
-
 </project>

http://git-wip-us.apache.org/repos/asf/flink/blob/0ea2596e/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkClient.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkClient.java b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkClient.java
index 9628bb7..f4bcfb7 100644
--- a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkClient.java
+++ b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkClient.java
@@ -22,6 +22,7 @@ import akka.actor.ActorRef;
 import akka.actor.ActorSystem;
 import akka.pattern.Patterns;
 import akka.util.Timeout;
+
 import backtype.storm.Config;
 import backtype.storm.generated.AlreadyAliveException;
 import backtype.storm.generated.InvalidTopologyException;
@@ -32,7 +33,6 @@ import backtype.storm.utils.NimbusClient;
 import backtype.storm.utils.Utils;
 
 import com.esotericsoftware.kryo.Serializer;
-import com.google.common.collect.Lists;
 
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.JobID;
@@ -52,6 +52,7 @@ import org.apache.flink.runtime.messages.JobManagerMessages;
 import org.apache.flink.runtime.messages.JobManagerMessages.RunningJobsStatus;
 import org.apache.flink.storm.util.StormConfig;
 import org.apache.flink.streaming.api.graph.StreamGraph;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -215,7 +216,7 @@ public class FlinkClient {
 
 		try {
 			ClassLoader classLoader = JobWithJars.buildUserCodeClassLoader(
-					Lists.newArrayList(uploadedJarUrl),
+					Collections.<URL>singletonList(uploadedJarUrl),
 					Collections.<URL>emptyList(),
 					this.getClass().getClassLoader());
 			client.runDetached(jobGraph, classLoader);

http://git-wip-us.apache.org/repos/asf/flink/blob/0ea2596e/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/BoltWrapper.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/BoltWrapper.java b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/BoltWrapper.java
index 5311cb3..6e316e7 100644
--- a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/BoltWrapper.java
+++ b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/BoltWrapper.java
@@ -14,6 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.storm.wrappers;
 
 import backtype.storm.generated.GlobalStreamId;
@@ -26,8 +27,6 @@ import backtype.storm.tuple.Fields;
 import backtype.storm.tuple.MessageId;
 import backtype.storm.utils.Utils;
 
-import com.google.common.collect.Sets;
-
 import org.apache.flink.api.common.ExecutionConfig.GlobalJobParameters;
 import org.apache.flink.api.java.tuple.Tuple;
 import org.apache.flink.api.java.tuple.Tuple0;
@@ -44,6 +43,8 @@ import java.util.Collection;
 import java.util.HashMap;
 import java.util.Map;
 
+import static java.util.Arrays.asList;
+
 /**
  * A {@link BoltWrapper} wraps an {@link IRichBolt} in order to execute the Storm bolt within a Flink Streaming program.
  * It takes the Flink input tuples of type {@code IN} and transforms them into {@link StormTuple}s that the bolt can
@@ -135,9 +136,9 @@ public class BoltWrapper<IN, OUT> extends AbstractStreamOperator<OUT> implements
 	 *             {@code rawOuput} is {@code false} and the number of declared output attributes is not within range
 	 *             [1;25].
 	 */
-	public BoltWrapper(final IRichBolt bolt, final String[] rawOutputs)
+	public BoltWrapper(final IRichBolt bolt, final String[] rawOutputs) 
 			throws IllegalArgumentException {
-		this(bolt, null, Sets.newHashSet(rawOutputs));
+		this(bolt, null, asList(rawOutputs));
 	}
 
 	/**
@@ -157,8 +158,7 @@ public class BoltWrapper<IN, OUT> extends AbstractStreamOperator<OUT> implements
 	 *             {@code rawOuput} is {@code false} and the number of declared output attributes is not with range
 	 *             [1;25].
 	 */
-	public BoltWrapper(final IRichBolt bolt, final Collection<String> rawOutputs)
-			throws IllegalArgumentException {
+	public BoltWrapper(final IRichBolt bolt, final Collection<String> rawOutputs) throws IllegalArgumentException {
 		this(bolt, null, rawOutputs);
 	}
 
@@ -181,9 +181,12 @@ public class BoltWrapper<IN, OUT> extends AbstractStreamOperator<OUT> implements
 	 *             {@code rawOuput} is {@code false} and the number of declared output attributes is not with range
 	 *             [0;25].
 	 */
-	public BoltWrapper(final IRichBolt bolt, final Fields inputSchema, final String[] rawOutputs)
+	public BoltWrapper(
+			final IRichBolt bolt,
+			final Fields inputSchema,
+			final String[] rawOutputs) 
 			throws IllegalArgumentException {
-		this(bolt, inputSchema, Sets.newHashSet(rawOutputs));
+		this(bolt, inputSchema, asList(rawOutputs));
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/0ea2596e/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/FlinkTopologyContext.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/FlinkTopologyContext.java b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/FlinkTopologyContext.java
index db1d147..52d39a7 100644
--- a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/FlinkTopologyContext.java
+++ b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/FlinkTopologyContext.java
@@ -120,9 +120,8 @@ final class FlinkTopologyContext extends TopologyContext {
 	 * @throws UnsupportedOperationException
 	 * 		at every invocation
 	 */
-	@SuppressWarnings("unchecked")
 	@Override
-	public IMetric registerMetric(final String name, final IMetric metric, final int timeBucketSizeInSecs) {
+	public <T extends IMetric> T registerMetric(final String name, final T metric, final int timeBucketSizeInSecs) {
 		throw new UnsupportedOperationException("Metrics are not supported by Flink");
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/0ea2596e/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/MergedInputsBoltWrapper.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/MergedInputsBoltWrapper.java b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/MergedInputsBoltWrapper.java
index 89defde..7a3b6d5 100644
--- a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/MergedInputsBoltWrapper.java
+++ b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/MergedInputsBoltWrapper.java
@@ -14,10 +14,10 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.storm.wrappers;
 
 import backtype.storm.topology.IRichBolt;
-import com.google.common.collect.Sets;
 
 import org.apache.flink.api.java.tuple.Tuple0;
 import org.apache.flink.api.java.tuple.Tuple1;
@@ -26,6 +26,8 @@ import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 
 import java.util.Collection;
 
+import static java.util.Arrays.asList;
+
 /**
  * A {@link MergedInputsBoltWrapper} is a {@link BoltWrapper} that expects input tuples of type {@link StormTuple}. It
  * can be used to wrap a multi-input bolt and assumes that all input stream got merged into a {@link StormTuple} stream
@@ -67,7 +69,7 @@ public final class MergedInputsBoltWrapper<IN, OUT> extends BoltWrapper<StormTup
 	 */
 	public MergedInputsBoltWrapper(final IRichBolt bolt, final String[] rawOutputs)
 			throws IllegalArgumentException {
-		super(bolt, Sets.newHashSet(rawOutputs));
+		super(bolt, asList(rawOutputs));
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/0ea2596e/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/SpoutWrapper.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/SpoutWrapper.java b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/SpoutWrapper.java
index 66b05c6..c171ccc 100644
--- a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/SpoutWrapper.java
+++ b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/SpoutWrapper.java
@@ -22,8 +22,6 @@ import backtype.storm.spout.SpoutOutputCollector;
 import backtype.storm.task.TopologyContext;
 import backtype.storm.topology.IRichSpout;
 
-import com.google.common.collect.Sets;
-
 import org.apache.flink.api.common.ExecutionConfig.GlobalJobParameters;
 import org.apache.flink.api.common.functions.StoppableFunction;
 import org.apache.flink.api.java.tuple.Tuple0;
@@ -37,6 +35,8 @@ import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
 import java.util.Collection;
 import java.util.HashMap;
 
+import static java.util.Arrays.asList;
+
 /**
  * A {@link SpoutWrapper} wraps an {@link IRichSpout} in order to execute it within a Flink Streaming program. It
  * takes the spout's output tuples and transforms them into Flink tuples of type {@code OUT} (see
@@ -121,7 +121,7 @@ public final class SpoutWrapper<OUT> extends RichParallelSourceFunction<OUT> imp
 	 */
 	public SpoutWrapper(final IRichSpout spout, final String[] rawOutputs)
 			throws IllegalArgumentException {
-		this(spout, Sets.newHashSet(rawOutputs), null);
+		this(spout, asList(rawOutputs), null);
 	}
 
 	/**
@@ -147,7 +147,7 @@ public final class SpoutWrapper<OUT> extends RichParallelSourceFunction<OUT> imp
 	 */
 	public SpoutWrapper(final IRichSpout spout, final String[] rawOutputs,
 			final Integer numberOfInvocations) throws IllegalArgumentException {
-		this(spout, Sets.newHashSet(rawOutputs), numberOfInvocations);
+		this(spout, asList(rawOutputs), numberOfInvocations);
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/0ea2596e/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/WrapperSetupHelperTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/WrapperSetupHelperTest.java b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/WrapperSetupHelperTest.java
index 82b12d6..000fe84 100644
--- a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/WrapperSetupHelperTest.java
+++ b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/WrapperSetupHelperTest.java
@@ -14,6 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.storm.wrappers;
 
 import backtype.storm.Config;
@@ -27,16 +28,18 @@ import backtype.storm.topology.IRichSpout;
 import backtype.storm.topology.TopologyBuilder;
 import backtype.storm.tuple.Fields;
 import backtype.storm.utils.Utils;
-import com.google.common.collect.Sets;
+
 import org.apache.flink.storm.api.FlinkTopology;
 import org.apache.flink.storm.util.AbstractTest;
 import org.apache.flink.storm.util.TestDummyBolt;
 import org.apache.flink.storm.util.TestDummySpout;
 import org.apache.flink.storm.util.TestSink;
 import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
+
 import org.junit.Assert;
 import org.junit.Test;
 import org.junit.runner.RunWith;
+
 import org.powermock.api.mockito.PowerMockito;
 import org.powermock.core.classloader.annotations.PowerMockIgnore;
 import org.powermock.core.classloader.annotations.PrepareForTest;
@@ -48,6 +51,8 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Set;
 
+import static java.util.Collections.singleton;
+
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
@@ -85,7 +90,7 @@ public class WrapperSetupHelperTest extends AbstractTest {
 		PowerMockito.whenNew(SetupOutputFieldsDeclarer.class).withNoArguments().thenReturn(declarer);
 
 		WrapperSetupHelper.getNumberOfAttributes(boltOrSpout,
-				Sets.newHashSet(new String[] { Utils.DEFAULT_STREAM_ID }));
+				new HashSet<String>(singleton(Utils.DEFAULT_STREAM_ID)));
 	}
 
 	@Test(expected = IllegalArgumentException.class)
@@ -143,8 +148,7 @@ public class WrapperSetupHelperTest extends AbstractTest {
 
 		Assert.assertEquals(attributes, WrapperSetupHelper.getNumberOfAttributes(
 				boltOrSpout,
-				numberOfAttributes == -1 ? Sets
-						.newHashSet(new String[] { Utils.DEFAULT_STREAM_ID }) : null));
+				numberOfAttributes == -1 ? new HashSet<String>(singleton(Utils.DEFAULT_STREAM_ID)) : null));
 	}
 
 	@Test


Mime
View raw message