flink-commits mailing list archives

From m..@apache.org
Subject [1/2] flink git commit: [FLINK-1789][core][runtime] Allow adding of URLs to the usercode class loader
Date Thu, 08 Oct 2015 13:58:33 GMT
Repository: flink
Updated Branches:
  refs/heads/master fa2bb8f11 -> 0ee0c1f55


http://git-wip-us.apache.org/repos/asf/flink/blob/0ee0c1f5/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
index 461138c..131a2e7 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
@@ -74,6 +74,7 @@ import scala.concurrent.Future;
 import scala.concurrent.duration.FiniteDuration;
 
 import java.net.InetSocketAddress;
+import java.net.URL;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashSet;
@@ -162,7 +163,7 @@ public class TaskManagerTest {
 						new Configuration(), new Configuration(), TestInvokableCorrect.class.getName(),
 						Collections.<ResultPartitionDeploymentDescriptor>emptyList(),
 						Collections.<InputGateDeploymentDescriptor>emptyList(),
-					new ArrayList<BlobKey>(), 0);
+						new ArrayList<BlobKey>(), Collections.<URL>emptyList(), 0);
 
 				
 				new Within(d) {
@@ -267,13 +268,13 @@ public class TaskManagerTest {
 						new Configuration(), new Configuration(), TestInvokableBlockingCancelable.class.getName(),
 						Collections.<ResultPartitionDeploymentDescriptor>emptyList(),
 						Collections.<InputGateDeploymentDescriptor>emptyList(),
-					new ArrayList<BlobKey>(), 0);
+						new ArrayList<BlobKey>(), Collections.<URL>emptyList(), 0);
 
 				final TaskDeploymentDescriptor tdd2 = new TaskDeploymentDescriptor(jid2, vid2, eid2, "TestTask2", 2, 7,
 						new Configuration(), new Configuration(), TestInvokableBlockingCancelable.class.getName(),
 						Collections.<ResultPartitionDeploymentDescriptor>emptyList(),
 						Collections.<InputGateDeploymentDescriptor>emptyList(),
-					new ArrayList<BlobKey>(), 0);
+						new ArrayList<BlobKey>(), Collections.<URL>emptyList(), 0);
 
 				final ActorGateway tm = taskManager;
 
@@ -405,13 +406,13 @@ public class TaskManagerTest {
 						new Configuration(), new Configuration(), Tasks.Sender.class.getName(),
 						Collections.<ResultPartitionDeploymentDescriptor>emptyList(),
 						Collections.<InputGateDeploymentDescriptor>emptyList(),
-					new ArrayList<BlobKey>(), 0);
+						new ArrayList<BlobKey>(), Collections.<URL>emptyList(), 0);
 
 				final TaskDeploymentDescriptor tdd2 = new TaskDeploymentDescriptor(jid, vid2, eid2, "Receiver", 2, 7,
 						new Configuration(), new Configuration(), Tasks.Receiver.class.getName(),
 						Collections.<ResultPartitionDeploymentDescriptor>emptyList(),
 						Collections.<InputGateDeploymentDescriptor>emptyList(),
-					new ArrayList<BlobKey>(), 0);
+						new ArrayList<BlobKey>(), Collections.<URL>emptyList(), 0);
 
 				new Within(d){
 
@@ -507,13 +508,14 @@ public class TaskManagerTest {
 
 				final TaskDeploymentDescriptor tdd1 = new TaskDeploymentDescriptor(jid, vid1, eid1, "Sender", 0, 1,
 						new Configuration(), new Configuration(), Tasks.Sender.class.getName(),
-						irpdd, Collections.<InputGateDeploymentDescriptor>emptyList(), new ArrayList<BlobKey>(), 0);
+						irpdd, Collections.<InputGateDeploymentDescriptor>emptyList(), new ArrayList<BlobKey>(),
+						Collections.<URL>emptyList(), 0);
 
 				final TaskDeploymentDescriptor tdd2 = new TaskDeploymentDescriptor(jid, vid2, eid2, "Receiver", 2, 7,
 						new Configuration(), new Configuration(), Tasks.Receiver.class.getName(),
 						Collections.<ResultPartitionDeploymentDescriptor>emptyList(),
 						Collections.singletonList(ircdd),
-						new ArrayList<BlobKey>(), 0);
+						new ArrayList<BlobKey>(), Collections.<URL>emptyList(), 0);
 
 				new Within(d) {
 
@@ -651,13 +653,13 @@ public class TaskManagerTest {
 				final TaskDeploymentDescriptor tdd1 = new TaskDeploymentDescriptor(jid, vid1, eid1, "Sender", 0, 1,
 						new Configuration(), new Configuration(), Tasks.Sender.class.getName(),
 						irpdd, Collections.<InputGateDeploymentDescriptor>emptyList(),
-						new ArrayList<BlobKey>(), 0);
+						new ArrayList<BlobKey>(), Collections.<URL>emptyList(), 0);
 
 				final TaskDeploymentDescriptor tdd2 = new TaskDeploymentDescriptor(jid, vid2, eid2, "Receiver", 2, 7,
 						new Configuration(), new Configuration(), Tasks.BlockingReceiver.class.getName(),
 						Collections.<ResultPartitionDeploymentDescriptor>emptyList(),
 						Collections.singletonList(ircdd),
-						new ArrayList<BlobKey>(), 0);
+						new ArrayList<BlobKey>(), Collections.<URL>emptyList(), 0);
 
 				new Within(d){
 
@@ -796,7 +798,8 @@ public class TaskManagerTest {
 						Tasks.AgnosticReceiver.class.getName(),
 						Collections.<ResultPartitionDeploymentDescriptor>emptyList(),
 						Collections.singletonList(igdd),
-						Collections.<BlobKey>emptyList(), 0);
+						Collections.<BlobKey>emptyList(),
+						Collections.<URL>emptyList(), 0);
 
 				new Within(d) {
 					@Override
@@ -888,7 +891,8 @@ public class TaskManagerTest {
 						Tasks.AgnosticReceiver.class.getName(),
 						Collections.<ResultPartitionDeploymentDescriptor>emptyList(),
 						Collections.singletonList(igdd),
-						Collections.<BlobKey>emptyList(), 0);
+						Collections.<BlobKey>emptyList(),
+						Collections.<URL>emptyList(), 0);
 
 				new Within(new FiniteDuration(120, TimeUnit.SECONDS)) {
 					@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/0ee0c1f5/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
index cf1bfc5..fb933f8 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
@@ -52,6 +52,7 @@ import org.junit.Test;
 import scala.concurrent.duration.FiniteDuration;
 
 import java.lang.reflect.Field;
+import java.net.URL;
 import java.util.Collections;
 import java.util.Map;
 import java.util.concurrent.BlockingQueue;
@@ -739,6 +740,7 @@ public class TaskTest {
 				Collections.<ResultPartitionDeploymentDescriptor>emptyList(),
 				Collections.<InputGateDeploymentDescriptor>emptyList(),
 				Collections.<BlobKey>emptyList(),
+				Collections.<URL>emptyList(),
 				0);
 	}
 

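Note on the test changes above: TaskDeploymentDescriptor gained an extra parameter, a list of classpath URLs, placed between the blob keys and the trailing int argument. For descriptors that ship no extra classpath entries, the call now simply passes Collections.<URL>emptyList(). A minimal sketch, assuming the fixtures (jid, vid, eid, etc.) used in the tests above:

		TaskDeploymentDescriptor tdd = new TaskDeploymentDescriptor(jid, vid, eid, "Sender", 0, 1,
				new Configuration(), new Configuration(), Tasks.Sender.class.getName(),
				Collections.<ResultPartitionDeploymentDescriptor>emptyList(),
				Collections.<InputGateDeploymentDescriptor>emptyList(),
				Collections.<BlobKey>emptyList(),
				Collections.<URL>emptyList(), // new: URLs to add to the usercode class loader
				0);
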
http://git-wip-us.apache.org/repos/asf/flink/blob/0ee0c1f5/flink-staging/flink-scala-shell/src/main/java/org/apache/flink/api/java/ScalaShellRemoteEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-scala-shell/src/main/java/org/apache/flink/api/java/ScalaShellRemoteEnvironment.java
b/flink-staging/flink-scala-shell/src/main/java/org/apache/flink/api/java/ScalaShellRemoteEnvironment.java
index cb37fd4..a336957 100644
--- a/flink-staging/flink-scala-shell/src/main/java/org/apache/flink/api/java/ScalaShellRemoteEnvironment.java
+++ b/flink-staging/flink-scala-shell/src/main/java/org/apache/flink/api/java/ScalaShellRemoteEnvironment.java
@@ -19,7 +19,6 @@ package org.apache.flink.api.java;
  * limitations under the License.
  */
 
-import org.apache.commons.lang.ArrayUtils;
 import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.api.common.Plan;
 import org.apache.flink.api.common.PlanExecutor;
@@ -27,8 +26,9 @@ import org.apache.flink.api.common.PlanExecutor;
 import org.apache.flink.api.scala.FlinkILoop;
 import org.apache.flink.configuration.Configuration;
 
+import java.io.File;
+import java.net.URL;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.List;
 
 /**
@@ -50,7 +50,7 @@ public class ScalaShellRemoteEnvironment extends RemoteEnvironment {
 	 * @param flinkILoop The flink Iloop instance from which the ScalaShellRemoteEnvironment is called.
 	 */
 	public ScalaShellRemoteEnvironment(String host, int port, FlinkILoop flinkILoop, String... jarFiles) {
-		super(host, port, jarFiles);
+		super(host, port, null, jarFiles, null);
 		this.flinkILoop = flinkILoop;
 	}
 
@@ -65,21 +65,22 @@ public class ScalaShellRemoteEnvironment extends RemoteEnvironment {
 	public JobExecutionResult execute(String jobName) throws Exception {
 		Plan p = createProgramPlan(jobName);
 
-		String jarFile = flinkILoop.writeFilesToDisk().getAbsolutePath();
+		URL jarUrl = flinkILoop.writeFilesToDisk().getAbsoluteFile().toURI().toURL();
 
 		// get "external jars, and add the shell command jar, pass to executor
-		List<String> alljars = new ArrayList<String>();
+		List<URL> alljars = new ArrayList<>();
 		// get external (library) jars
 		String[] extJars = this.flinkILoop.getExternalJars();
-		
-		if(!ArrayUtils.isEmpty(extJars)) {
-			alljars.addAll(Arrays.asList(extJars));
+
+		for (String extJar : extJars) {
+			URL extJarUrl = new File(extJar).getAbsoluteFile().toURI().toURL();
+			alljars.add(extJarUrl);
 		}
+
 		// add shell commands
-		alljars.add(jarFile);
-		String[] alljarsArr = new String[alljars.size()];
-		alljarsArr = alljars.toArray(alljarsArr);
-		PlanExecutor executor = PlanExecutor.createRemoteExecutor(host, port, new Configuration(), alljarsArr);
+		alljars.add(jarUrl);
+		PlanExecutor executor = PlanExecutor.createRemoteExecutor(host, port, new Configuration(),
+				alljars.toArray(new URL[alljars.size()]), null);
 
 		executor.setPrintStatusDuringExecution(p.getExecutionConfig().isSysoutLoggingEnabled());
 		return executor.executePlan(p);

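The Scala shell now hands URLs rather than path strings to the remote executor. The conversion used throughout this commit is the File-to-URL idiom visible above; a minimal standalone sketch (the jar path is illustrative):

	import java.io.File;
	import java.net.URL;

	public class JarUrlExample {
		public static void main(String[] args) throws Exception {
			// Illustrative path; the conversion itself does not require the file to exist.
			String jarPath = "build/libs/user-code.jar";
			// Same idiom as in ScalaShellRemoteEnvironment#execute(): absolute File -> URI -> URL
			URL jarUrl = new File(jarPath).getAbsoluteFile().toURI().toURL();
			System.out.println(jarUrl); // e.g. file:/home/user/project/build/libs/user-code.jar
		}
	}
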
http://git-wip-us.apache.org/repos/asf/flink/blob/0ee0c1f5/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java
index ccf51d2..02c938e 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java
@@ -19,7 +19,12 @@ package org.apache.flink.streaming.api.environment;
 
 import java.io.File;
 import java.io.IOException;
+import java.net.MalformedURLException;
+import java.net.URISyntaxException;
+import java.net.URL;
 import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
 
 import org.apache.flink.api.common.InvalidProgramException;
@@ -40,13 +45,21 @@ public class RemoteStreamEnvironment extends StreamExecutionEnvironment {
 	
 	private static final Logger LOG = LoggerFactory.getLogger(RemoteStreamEnvironment.class);
 
+	/** The hostname of the JobManager */
 	private final String host;
+
+	/** The port of the JobManager main actor system */
 	private final int port;
-	private final List<File> jarFiles;
-	
+
 	/** The configuration used to parametrize the client that connects to the remote cluster */
 	private final Configuration config;
 
+	/** The jar files that need to be attached to each job */
+	private final List<URL> jarFiles;
+	
+	/** The classpaths that need to be attached to each job */
+	private final List<URL> globalClasspaths;
+
 	/**
 	 * Creates a new RemoteStreamEnvironment that points to the master
 	 * (JobManager) described by the given host name and port.
@@ -87,6 +100,34 @@ public class RemoteStreamEnvironment extends StreamExecutionEnvironment {
 	 *            provided in the JAR files.
 	 */
 	public RemoteStreamEnvironment(String host, int port, Configuration config, String... jarFiles) {
+		this(host, port, config, jarFiles, null);
+	}
+
+	/**
+	 * Creates a new RemoteStreamEnvironment that points to the master
+	 * (JobManager) described by the given host name and port.
+	 *
+	 * @param host
+	 *            The host name or address of the master (JobManager), where the
+	 *            program should be executed.
+	 * @param port
+	 *            The port of the master (JobManager), where the program should
+	 *            be executed.
+	 * @param config
+	 *            The configuration used to parametrize the client that connects to the
+	 *            remote cluster.
+	 * @param jarFiles
+	 *            The JAR files with code that needs to be shipped to the
+	 *            cluster. If the program uses user-defined functions,
+	 *            user-defined input formats, or any libraries, those must be
+	 *            provided in the JAR files.
+	 * @param globalClasspaths 
+	 *            The paths of directories and JAR files that are added to each user code 
+	 *            classloader on all nodes in the cluster. Note that the paths must specify a
+	 *            protocol (e.g. file://) and be accessible on all nodes (e.g. by means of a NFS share).
+	 *            The protocol must be supported by the {@link java.net.URLClassLoader}.
+	 */
+	public RemoteStreamEnvironment(String host, int port, Configuration config, String[] jarFiles, URL[] globalClasspaths) {
 		if (!ExecutionEnvironment.areExplicitEnvironmentsAllowed()) {
 			throw new InvalidProgramException(
 					"The RemoteEnvironment cannot be used when submitting a program through a client, " +
@@ -103,16 +144,23 @@ public class RemoteStreamEnvironment extends StreamExecutionEnvironment {
 		this.host = host;
 		this.port = port;
 		this.config = config == null ? new Configuration() : config;
-		this.jarFiles = new ArrayList<File>(jarFiles.length);
+		this.jarFiles = new ArrayList<>(jarFiles.length);
 		for (String jarFile : jarFiles) {
-			File file = new File(jarFile);
 			try {
-				JobWithJars.checkJarFile(file);
+				URL jarFileUrl = new File(jarFile).getAbsoluteFile().toURI().toURL();
+				this.jarFiles.add(jarFileUrl);
+				JobWithJars.checkJarFile(jarFileUrl);
+			} catch (MalformedURLException e) {
+				throw new IllegalArgumentException("JAR file path is invalid '" + jarFile + "'", e);
+			} catch (IOException e) {
+				throw new RuntimeException("Problem with jar file " + jarFile, e);
 			}
-			catch (IOException e) {
-				throw new RuntimeException("Problem with jar file " + file.getAbsolutePath(), e);
-			}
-			this.jarFiles.add(file);
+		}
+		if (globalClasspaths == null) {
+			this.globalClasspaths = Collections.emptyList();
+		}
+		else {
+			this.globalClasspaths = Arrays.asList(globalClasspaths);
 		}
 	}
 
@@ -135,11 +183,16 @@ public class RemoteStreamEnvironment extends StreamExecutionEnvironment {
 			LOG.info("Running remotely at {}:{}", host, port);
 		}
 
-		for (File file : jarFiles) {
-			jobGraph.addJar(new Path(file.getAbsolutePath()));
+		for (URL jarFile : jarFiles) {
+			try {
+				jobGraph.addJar(new Path(jarFile.toURI()));
+			} catch (URISyntaxException e) {
+				throw new ProgramInvocationException("URL is invalid", e);
+			}
 		}
 
-		ClassLoader usercodeClassLoader = JobWithJars.buildUserCodeClassLoader(jarFiles, getClass().getClassLoader());
+		ClassLoader usercodeClassLoader = JobWithJars.buildUserCodeClassLoader(jarFiles, globalClasspaths,
+			getClass().getClassLoader());
 		
 		Configuration configuration = new Configuration();
 		configuration.addAll(jobGraph.getJobConfiguration());

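The new five-argument constructor lets a program attach "global classpath" entries that are not shipped with the job but are expected to be reachable from every node. A usage sketch based on the signature and Javadoc introduced above; the host, port, and paths are placeholders, and the classpath URL must use a protocol understood by java.net.URLClassLoader (e.g. file://) and be accessible on all nodes:

	import java.net.URL;

	import org.apache.flink.configuration.Configuration;
	import org.apache.flink.streaming.api.environment.RemoteStreamEnvironment;
	import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

	public class RemoteClasspathExample {
		public static void main(String[] args) throws Exception {
			// Hypothetical shared location, e.g. an NFS mount readable by every TaskManager.
			URL sharedLib = new URL("file:///mnt/nfs/flink-libs/connectors.jar");

			StreamExecutionEnvironment env = new RemoteStreamEnvironment(
					"jobmanager-host", 6123, new Configuration(),   // placeholder host/port
					new String[] { "target/my-streaming-job.jar" }, // JARs shipped with the job
					new URL[] { sharedLib });                       // added to the usercode class loader

			// ... define the streaming program on 'env' and call env.execute() ...
		}
	}
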
http://git-wip-us.apache.org/repos/asf/flink/blob/0ee0c1f5/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamContextEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamContextEnvironment.java
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamContextEnvironment.java
index 2b2a426..1392efb 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamContextEnvironment.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamContextEnvironment.java
@@ -17,7 +17,7 @@
 
 package org.apache.flink.streaming.api.environment;
 
-import java.io.File;
+import java.net.URL;
 import java.util.List;
 
 import org.apache.flink.api.common.JobExecutionResult;
@@ -36,7 +36,9 @@ public class StreamContextEnvironment extends StreamExecutionEnvironment {
 
 	private static final Logger LOG = LoggerFactory.getLogger(StreamContextEnvironment.class);
 
-	private final List<File> jars;
+	private final List<URL> jars;
+
+	private final List<URL> classpaths;
 	
 	private final Client client;
 
@@ -44,12 +46,15 @@ public class StreamContextEnvironment extends StreamExecutionEnvironment {
 	
 	private final boolean wait;
 
-	protected StreamContextEnvironment(Client client, List<File> jars, int parallelism, boolean wait) {
+	protected StreamContextEnvironment(Client client, List<URL> jars, List<URL> classpaths, int parallelism,
+			boolean wait) {
 		this.client = client;
 		this.jars = jars;
+		this.classpaths = classpaths;
 		this.wait = wait;
 		
-		this.userCodeClassLoader = JobWithJars.buildUserCodeClassLoader(jars, getClass().getClassLoader());
+		this.userCodeClassLoader = JobWithJars.buildUserCodeClassLoader(jars, classpaths,
+				getClass().getClassLoader());
 		
 		if (parallelism > 0) {
 			setParallelism(parallelism);
@@ -84,10 +89,12 @@ public class StreamContextEnvironment extends StreamExecutionEnvironment {
 		transformations.clear();
 
 		// attach all necessary jar files to the JobGraph
-		for (File file : jars) {
-			jobGraph.addJar(new Path(file.getAbsolutePath()));
+		for (URL file : jars) {
+			jobGraph.addJar(new Path(file.toURI()));
 		}
 
+		jobGraph.setClasspaths(classpaths);
+
 		// execute the programs
 		if (wait) {
 			return client.runBlocking(jobGraph, userCodeClassLoader);

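StreamContextEnvironment now records the classpaths on the JobGraph (jobGraph.setClasspaths) and folds them into the user-code class loader together with the job JARs. Conceptually the resulting loader is a URLClassLoader over the union of both URL lists; the following simplified stand-in for JobWithJars.buildUserCodeClassLoader is an assumption about its shape, not the actual Flink implementation:

	import java.net.URL;
	import java.net.URLClassLoader;
	import java.util.ArrayList;
	import java.util.List;

	public class UserCodeClassLoaderSketch {

		// Simplified stand-in: the real JobWithJars.buildUserCodeClassLoader may perform
		// additional validation, but the essential idea is a URLClassLoader over jars + classpaths.
		static ClassLoader buildUserCodeClassLoader(List<URL> jars, List<URL> classpaths, ClassLoader parent) {
			List<URL> all = new ArrayList<>(jars.size() + classpaths.size());
			all.addAll(jars);       // JARs uploaded with the job
			all.addAll(classpaths); // entries expected to be resolvable on every node
			return new URLClassLoader(all.toArray(new URL[all.size()]), parent);
		}
	}
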
http://git-wip-us.apache.org/repos/asf/flink/blob/0ee0c1f5/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
index cc96217..28410fd 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
@@ -69,9 +69,9 @@ import org.apache.flink.streaming.api.transformations.StreamTransformation;
 import org.apache.flink.types.StringValue;
 import org.apache.flink.util.SplittableIterator;
 
-import java.io.File;
 import java.io.IOException;
 import java.io.Serializable;
+import java.net.URL;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
@@ -1285,7 +1285,7 @@ public abstract class StreamExecutionEnvironment {
 		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 		if (env instanceof ContextEnvironment) {
 			ContextEnvironment ctx = (ContextEnvironment) env;
-			return createContextEnvironment(ctx.getClient(), ctx.getJars(),
+			return createContextEnvironment(ctx.getClient(), ctx.getJars(), ctx.getClasspaths(),
 					ctx.getParallelism(), ctx.isWait());
 		} else if (env instanceof OptimizerPlanEnvironment | env instanceof PreviewPlanEnvironment) {
 			return new StreamPlanEnvironment(env);
@@ -1295,9 +1295,9 @@ public abstract class StreamExecutionEnvironment {
 	}
 
 	private static StreamExecutionEnvironment createContextEnvironment(
-			Client client, List<File> jars, int parallelism, boolean wait)
+			Client client, List<URL> jars, List<URL> classpaths, int parallelism, boolean wait)
 	{
-		return new StreamContextEnvironment(client, jars, parallelism, wait);
+		return new StreamContextEnvironment(client, jars, classpaths, parallelism, wait);
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/0ee0c1f5/flink-tests/src/test/java/org/apache/flink/test/classloading/ClassLoaderITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/classloading/ClassLoaderITCase.java
b/flink-tests/src/test/java/org/apache/flink/test/classloading/ClassLoaderITCase.java
index 4480d95..dab6a6d 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/classloading/ClassLoaderITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/classloading/ClassLoaderITCase.java
@@ -25,7 +25,6 @@ import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.test.testdata.KMeansData;
 import org.apache.flink.test.util.ForkableFlinkMiniCluster;
-
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
@@ -68,12 +67,23 @@ public class ClassLoaderITCase {
 				PackagedProgram inputSplitTestProg = new PackagedProgram(
 						new File(INPUT_SPLITS_PROG_JAR_FILE),
 						new String[] { INPUT_SPLITS_PROG_JAR_FILE,
+										"", // classpath
 										"localhost",
 										String.valueOf(port),
 										"4" // parallelism
 									});
 				inputSplitTestProg.invokeInteractiveModeForExecution();
 
+				String classpath = new File(INPUT_SPLITS_PROG_JAR_FILE).toURI().toURL().toString();
+				PackagedProgram inputSplitTestProg2 = new PackagedProgram(new File(INPUT_SPLITS_PROG_JAR_FILE),
+						new String[] { "",
+										classpath, // classpath
+										"localhost",
+										String.valueOf(port),
+										"4" // parallelism
+									} );
+				inputSplitTestProg2.invokeInteractiveModeForExecution();
+
 				// regular streaming job
 				PackagedProgram streamingProg = new PackagedProgram(
 						new File(STREAMING_PROG_JAR_FILE),

http://git-wip-us.apache.org/repos/asf/flink/blob/0ee0c1f5/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/CustomInputSplitProgram.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/CustomInputSplitProgram.java
b/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/CustomInputSplitProgram.java
index 36b56e0..b18e8e8 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/CustomInputSplitProgram.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/CustomInputSplitProgram.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.test.classloading.jar;
 
+import java.net.URL;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
@@ -29,6 +30,7 @@ import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.RemoteEnvironment;
 import org.apache.flink.api.java.io.DiscardingOutputFormat;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
@@ -40,13 +42,13 @@ import org.apache.flink.core.io.InputSplitAssigner;
 public class CustomInputSplitProgram {
 	
 	public static void main(String[] args) throws Exception {
-		
-		final String jarFile = args[0];
-		final String host = args[1];
-		final int port = Integer.parseInt(args[2]);
-		final int parallelism = Integer.parseInt(args[3]);
-		
-		ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment(host, port, jarFile);
+		final String[] jarFile = (args[0].equals(""))? null : new String[] { args[0] };
+		final URL[] classpath = (args[1].equals(""))? null : new URL[] { new URL(args[1]) };
+		final String host = args[2];
+		final int port = Integer.parseInt(args[3]);
+		final int parallelism = Integer.parseInt(args[4]);
+
+		RemoteEnvironment env = new RemoteEnvironment(host, port, null, jarFile, classpath);
 		env.setParallelism(parallelism);
 		env.getConfig().disableSysoutLogging();
 
@@ -59,7 +61,7 @@ public class CustomInputSplitProgram {
 					return new Tuple2<Integer, Double>(value, value * 0.5);
 				}
 			})
-			.output(new DiscardingOutputFormat<Tuple2<Integer,Double>>());
+			.output(new DiscardingOutputFormat<Tuple2<Integer, Double>>());
 
 		env.execute();
 	}

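CustomInputSplitProgram now expects five arguments, (jarFile, classpath, host, port, parallelism), with an empty string meaning "not set", which is why ClassLoaderITCase above passes an extra "" or a file:// URL in the second position. A hedged invocation sketch (jar path and port are placeholders):

	String jar = "target/customsplit-test-jar.jar";               // hypothetical jar location
	String classpath = new File(jar).toURI().toURL().toString();  // same conversion as in ClassLoaderITCase
	CustomInputSplitProgram.main(new String[] { "", classpath, "localhost", "6123", "4" });
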
