ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From a.@apache.org
Subject [18/50] [abbrv] ignite git commit: IGNITE-9034: [ML] Add Estimator API support to TensorFlow cluster on top of Apache Ignite.
Date Fri, 03 Aug 2018 09:58:31 GMT
http://git-wip-us.apache.org/repos/asf/ignite/blob/9e884e5a/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/cluster/util/TensorFlowUserScriptRunner.java
----------------------------------------------------------------------
diff --git a/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/cluster/util/TensorFlowUserScriptRunner.java b/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/cluster/util/TensorFlowUserScriptRunner.java
new file mode 100644
index 0000000..3dcd5f8
--- /dev/null
+++ b/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/cluster/util/TensorFlowUserScriptRunner.java
@@ -0,0 +1,236 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.tensorflow.cluster.util;
+
+import java.io.BufferedOutputStream;
+import java.io.ByteArrayInputStream;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.util.List;
+import java.util.Map;
+import java.util.StringJoiner;
+import java.util.concurrent.ExecutorService;
+import java.util.function.Consumer;
+import java.util.zip.ZipEntry;
+import java.util.zip.ZipInputStream;
+import org.apache.commons.io.IOUtils;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.Ignition;
+import org.apache.ignite.tensorflow.cluster.TensorFlowJobArchive;
+import org.apache.ignite.tensorflow.cluster.spec.TensorFlowClusterSpec;
+import org.apache.ignite.tensorflow.cluster.spec.TensorFlowServerAddressSpec;
+import org.apache.ignite.tensorflow.core.util.AsyncNativeProcessRunner;
+import org.apache.ignite.tensorflow.core.util.NativeProcessRunner;
+
+/**
+ * Utility class that helps to start and stop the user script process.
+ */
+public class TensorFlowUserScriptRunner extends AsyncNativeProcessRunner {
+    /** Ignite logger. */
+    private final IgniteLogger log;
+
+    /** Job archive that will be extracted and used as working directory for the native process. */
+    private final TensorFlowJobArchive jobArchive;
+
+    /** TensorFlow cluster specification. */
+    private final TensorFlowClusterSpec clusterSpec;
+
+    /** Output stream data consumer. */
+    private final Consumer<String> out;
+
+    /** Error stream data consumer. */
+    private final Consumer<String> err;
+
+    /** Working directory of the user script process. */
+    private File workingDir;
+
+    /**
+     * Constructs a new instance of TensorFlow user script runner.
+     *
+     * @param ignite Ignite instance.
+     * @param executor Executor to be used in {@link AsyncNativeProcessRunner}.
+     * @param jobArchive Job archive that will be extracted and used as working directory for the native process.
+     * @param clusterSpec TensorFlow cluster specification.
+     * @param out Output stream data consumer.
+     * @param err Error stream data consumer.
+     */
+    public TensorFlowUserScriptRunner(Ignite ignite, ExecutorService executor, TensorFlowJobArchive jobArchive,
+        TensorFlowClusterSpec clusterSpec, Consumer<String> out, Consumer<String> err) {
+        super(ignite, executor);
+
+        this.log = ignite.log().getLogger(TensorFlowUserScriptRunner.class);
+
+        this.jobArchive = jobArchive;
+        this.clusterSpec = clusterSpec;
+        this.out = out;
+        this.err = err;
+    }
+
+    /** {@inheritDoc} */
+    @Override public NativeProcessRunner doBefore() {
+        try {
+            workingDir = Files.createTempDirectory("tf_us_").toFile();
+            log.debug("Directory has been created [path=" + workingDir.getAbsolutePath() + "]");
+
+            unzip(jobArchive.getData(), workingDir);
+            log.debug("Job archive has been extracted [path=" + workingDir.getAbsolutePath() + "]");
+
+            return prepareNativeProcessRunner();
+        }
+        catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void doAfter() {
+        if (workingDir != null) {
+            delete(workingDir);
+            log.debug("Directory has been deleted [path=" + workingDir.getAbsolutePath() + "]");
+        }
+    }
+
+    /**
+     * Prepares native process runner and specifies working directory, command and environment variables to be used.
+     *
+     * @return Prepared native process runner.
+     */
+    private NativeProcessRunner prepareNativeProcessRunner() {
+        if (workingDir == null)
+            throw new IllegalStateException("Working directory is not created");
+
+        ProcessBuilder procBuilder = new ProcessBuilder();
+
+        procBuilder.directory(workingDir);
+        procBuilder.command(jobArchive.getCommands());
+
+        Map<String, String> env = procBuilder.environment();
+        env.put("PYTHONPATH", workingDir.getAbsolutePath());
+        env.put("TF_CONFIG", formatTfConfigVar());
+        env.put("TF_WORKERS", formatTfWorkersVar());
+        env.put("TF_CHIEF_SERVER", formatTfChiefServerVar());
+
+        return new NativeProcessRunner(procBuilder, null, out, err);
+    }
+
+    /**
+     * Formats "TF_CONFIG" variable to be passed into user script.
+     *
+     * @return Formatted "TF_CONFIG" variable to be passed into user script.
+     */
+    private String formatTfConfigVar() {
+        return new StringBuilder()
+            .append("{\"cluster\" : ")
+            .append(clusterSpec.format(Ignition.ignite()))
+            .append(", ")
+            .append("\"task\": {\"type\" : \"" + TensorFlowClusterResolver.CHIEF_JOB_NAME + "\", \"index\": 0}}")
+            .toString();
+    }
+
+    /**
+     * Formats "TF_WORKERS" variable to be passed into user script.
+     *
+     * @return Formatted "TF_WORKERS" variable to be passed into user script.
+     */
+    private String formatTfWorkersVar() {
+        StringJoiner joiner = new StringJoiner(", ");
+
+        int cnt = clusterSpec.getJobs().get(TensorFlowClusterResolver.WORKER_JOB_NAME).size();
+        for (int i = 0; i < cnt; i++)
+            joiner.add("\"/job:" + TensorFlowClusterResolver.WORKER_JOB_NAME + "/task:" + i + "\"");
+
+        return "[" + joiner + "]";
+    }
+
+    /**
+     * Formats "TF_CHIEF_SERVER" variable to be passed into user script.
+     *
+     * @return Formatted "TF_CHIEF_SERVER" variable to be passed into user script.
+     */
+    private String formatTfChiefServerVar() {
+        List<TensorFlowServerAddressSpec> tasks = clusterSpec.getJobs().get(TensorFlowClusterResolver.CHIEF_JOB_NAME);
+
+        if (tasks == null || tasks.size() != 1)
+            throw new IllegalStateException("TensorFlow cluster specification should contain exactly one chief task");
+
+        TensorFlowServerAddressSpec addrSpec = tasks.iterator().next();
+
+        return "grpc://" + addrSpec.format(Ignition.ignite());
+    }
+
+    /**
+     * Deletes the given file or directory recursively.
+     *
+     * @param file File or directory to be deleted.
+     */
+    private void delete(File file) {
+        if (file.isDirectory()) {
+            String[] files = file.list();
+
+            if (files != null && files.length != 0)
+                for (String fileToBeDeleted : files)
+                    delete(new File(file, fileToBeDeleted));
+
+            if (!file.delete())
+                throw new IllegalStateException("Can't delete directory [path=" + file.getAbsolutePath() + "]");
+        }
+        else {
+            if (!file.delete())
+                throw new IllegalStateException("Can't delete file [path=" + file.getAbsolutePath() + "]");
+        }
+    }
+
+    /**
+     * Extracts specified zip archive into specified directory.
+     *
+     * @param data Zip archive to be extracted.
+     * @param extractTo Target directory.
+     */
+    private void unzip(byte[] data, File extractTo) {
+        try (ZipInputStream zipStream = new ZipInputStream(new ByteArrayInputStream(data))) {
+            ZipEntry entry;
+            while ((entry = zipStream.getNextEntry()) != null) {
+                File file = new File(extractTo, entry.getName());
+
+                if (entry.isDirectory() && !file.exists()) {
+                    boolean created = file.mkdirs();
+                    if (!created)
+                        throw new IllegalStateException("Can't create directory [path=" + file.getAbsolutePath() + "]");
+                }
+                else {
+                    if (!file.getParentFile().exists()) {
+                        boolean created = file.getParentFile().mkdirs();
+                        if (!created)
+                            throw new IllegalStateException("Can't create directory [path=" +
+                                file.getParentFile().getAbsolutePath() + "]");
+                    }
+
+                    try (BufferedOutputStream out = new BufferedOutputStream(new FileOutputStream(file))) {
+                        IOUtils.copy(zipStream, out);
+                    }
+                }
+            }
+        }
+        catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/9e884e5a/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/core/ProcessManager.java
----------------------------------------------------------------------
diff --git a/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/core/ProcessManager.java b/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/core/ProcessManager.java
index 0ef81bc..c825448 100644
--- a/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/core/ProcessManager.java
+++ b/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/core/ProcessManager.java
@@ -17,18 +17,17 @@
 
 package org.apache.ignite.tensorflow.core;
 
-import org.apache.ignite.tensorflow.core.longrunning.task.util.LongRunningProcessStatus;
-import java.io.Serializable;
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
+import org.apache.ignite.tensorflow.core.longrunning.task.util.LongRunningProcessStatus;
 
 /**
  * Process manager that allows to run and maintain processes in the cluster.
  *
  * @param <R> Type of task to be run.
  */
-public interface ProcessManager<R> extends Serializable {
+public interface ProcessManager<R> {
     /**
      * Starts the processes by the given specifications.
      *

http://git-wip-us.apache.org/repos/asf/ignite/blob/9e884e5a/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/core/ProcessManagerWrapper.java
----------------------------------------------------------------------
diff --git a/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/core/ProcessManagerWrapper.java b/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/core/ProcessManagerWrapper.java
index b66b54f..4f10e83 100644
--- a/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/core/ProcessManagerWrapper.java
+++ b/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/core/ProcessManagerWrapper.java
@@ -17,11 +17,11 @@
 
 package org.apache.ignite.tensorflow.core;
 
-import org.apache.ignite.tensorflow.core.longrunning.task.util.LongRunningProcessStatus;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
+import org.apache.ignite.tensorflow.core.longrunning.task.util.LongRunningProcessStatus;
 
 /**
  * Process manager wrapper that allows to define how one type of process specification should be transformed into
@@ -31,9 +31,6 @@ import java.util.UUID;
  * @param <R> Type of accepted process specifications.
  */
 public abstract class ProcessManagerWrapper<T, R> implements ProcessManager<R> {
-    /** */
-    private static final long serialVersionUID = -6397225095261457524L;
-
     /** Delegate. */
     private final ProcessManager<T> delegate;
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/9e884e5a/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/core/longrunning/LongRunningProcessManager.java
----------------------------------------------------------------------
diff --git a/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/core/longrunning/LongRunningProcessManager.java b/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/core/longrunning/LongRunningProcessManager.java
index 027ece3..a25ff97 100644
--- a/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/core/longrunning/LongRunningProcessManager.java
+++ b/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/core/longrunning/LongRunningProcessManager.java
@@ -17,7 +17,6 @@
 
 package org.apache.ignite.tensorflow.core.longrunning;
 
-import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
@@ -25,7 +24,6 @@ import java.util.Map;
 import java.util.UUID;
 import java.util.function.Consumer;
 import java.util.function.Function;
-import java.util.function.Supplier;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.cluster.ClusterGroup;
 import org.apache.ignite.cluster.ClusterGroupEmptyException;
@@ -41,22 +39,18 @@ import org.apache.ignite.tensorflow.core.longrunning.task.util.LongRunningProces
  * Long running process manager that allows to start, stop and make other actions with long running processes.
  */
 public class LongRunningProcessManager implements ProcessManager<LongRunningProcess> {
-    /** */
-    private static final long serialVersionUID = 1151455641358063287L;
-
-    /** Ignite instance supplier. */
-    private final Supplier<Ignite> igniteSupplier;
+    /** Ignite instance. */
+    private final Ignite ignite;
 
     /**
      * Constructs a new instance of long running process manager.
      *
-     * @param igniteSupplier Ignite instance supplier.
-     * @param <T> Type of serializable supplier.
+     * @param ignite Ignite instance.
      */
-    public <T extends Supplier<Ignite> & Serializable> LongRunningProcessManager(T igniteSupplier) {
-        assert igniteSupplier != null : "Ignite supplier should not be null";
+    public LongRunningProcessManager(Ignite ignite) {
+        assert ignite != null : "Ignite instance should not be null";
 
-        this.igniteSupplier = igniteSupplier;
+        this.ignite = ignite;
     }
 
     /** {@inheritDoc} */
@@ -100,7 +94,6 @@ public class LongRunningProcessManager implements ProcessManager<LongRunningProc
                 List<T> nodeProcesses = params.get(nodeId);
                 LongRunningProcessTask<List<E>> task = taskSupplier.apply(nodeProcesses);
 
-                Ignite ignite = igniteSupplier.get();
                 ClusterGroup clusterGrp = ignite.cluster().forNodeId(nodeId);
 
                 try {

http://git-wip-us.apache.org/repos/asf/ignite/blob/9e884e5a/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/core/longrunning/task/LongRunningProcessStartTask.java
----------------------------------------------------------------------
diff --git a/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/core/longrunning/task/LongRunningProcessStartTask.java b/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/core/longrunning/task/LongRunningProcessStartTask.java
index 1d08519..04f90d3 100644
--- a/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/core/longrunning/task/LongRunningProcessStartTask.java
+++ b/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/core/longrunning/task/LongRunningProcessStartTask.java
@@ -17,13 +17,13 @@
 
 package org.apache.ignite.tensorflow.core.longrunning.task;
 
-import org.apache.ignite.tensorflow.core.longrunning.LongRunningProcess;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
+import org.apache.ignite.tensorflow.core.longrunning.LongRunningProcess;
 import org.apache.ignite.tensorflow.core.util.CustomizableThreadFactory;
 
 /**
@@ -78,7 +78,7 @@ public class LongRunningProcessStartTask extends LongRunningProcessTask<List<UUI
      */
     private Future<?> runTask(Runnable task) {
         return Executors
-            .newSingleThreadExecutor(new CustomizableThreadFactory("LONG_RUNNING_PROCESS_TASK", true))
+            .newSingleThreadExecutor(new CustomizableThreadFactory("tf-long-running", true))
             .submit(task);
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/9e884e5a/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/core/nativerunning/NativeProcess.java
----------------------------------------------------------------------
diff --git a/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/core/nativerunning/NativeProcess.java b/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/core/nativerunning/NativeProcess.java
index df36ba9..2ad3c9d 100644
--- a/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/core/nativerunning/NativeProcess.java
+++ b/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/core/nativerunning/NativeProcess.java
@@ -20,6 +20,7 @@ package org.apache.ignite.tensorflow.core.nativerunning;
 import java.io.Serializable;
 import java.util.UUID;
 import java.util.function.Supplier;
+import org.apache.ignite.tensorflow.util.SerializableSupplier;
 
 /**
  * Native process specification.
@@ -29,7 +30,7 @@ public class NativeProcess implements Serializable {
     private static final long serialVersionUID = -7056800139746134956L;
 
     /** Process builder supplier. */
-    private final Supplier<ProcessBuilder> procBuilderSupplier;
+    private final SerializableSupplier<ProcessBuilder> procBuilderSupplier;
 
     /** Stdin of the process. */
     private final String stdin;
@@ -44,8 +45,7 @@ public class NativeProcess implements Serializable {
      * @param stdin Stdin of the process.
      * @param nodeId Node identifier.
      */
-    public <T extends Supplier<ProcessBuilder> & Serializable> NativeProcess(T procBuilderSupplier, String stdin,
-        UUID nodeId) {
+    public NativeProcess(SerializableSupplier<ProcessBuilder> procBuilderSupplier, String stdin, UUID nodeId) {
         assert procBuilderSupplier != null : "Process builder supplier should not be null";
         assert nodeId != null : "Node identifier should not be null";
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/9e884e5a/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/core/nativerunning/NativeProcessManager.java
----------------------------------------------------------------------
diff --git a/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/core/nativerunning/NativeProcessManager.java b/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/core/nativerunning/NativeProcessManager.java
index 60cd89b..5accf3d 100644
--- a/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/core/nativerunning/NativeProcessManager.java
+++ b/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/core/nativerunning/NativeProcessManager.java
@@ -17,8 +17,6 @@
 
 package org.apache.ignite.tensorflow.core.nativerunning;
 
-import java.io.Serializable;
-import java.util.function.Supplier;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.tensorflow.core.ProcessManager;
 import org.apache.ignite.tensorflow.core.ProcessManagerWrapper;
@@ -30,17 +28,13 @@ import org.apache.ignite.tensorflow.core.nativerunning.task.NativeProcessStartTa
  * Native process manager that allows to start, stop and make other actions with native processes.
  */
 public class NativeProcessManager extends ProcessManagerWrapper<LongRunningProcess, NativeProcess> {
-    /** */
-    private static final long serialVersionUID = 718119807915504045L;
-
     /**
      * Constructs a new native process manager.
      *
-     * @param igniteSupplier Ignite instance supplier.
-     * @param <T> Type of serializable supplier.
+     * @param ignite Ignite instance.
      */
-    public <T extends Supplier<Ignite> & Serializable> NativeProcessManager(T igniteSupplier) {
-        super(new LongRunningProcessManager(igniteSupplier));
+    public NativeProcessManager(Ignite ignite) {
+        super(new LongRunningProcessManager(ignite));
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/9e884e5a/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/core/nativerunning/task/NativeProcessStartTask.java
----------------------------------------------------------------------
diff --git a/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/core/nativerunning/task/NativeProcessStartTask.java b/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/core/nativerunning/task/NativeProcessStartTask.java
index 8fc28a5..ae9e2b9 100644
--- a/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/core/nativerunning/task/NativeProcessStartTask.java
+++ b/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/core/nativerunning/task/NativeProcessStartTask.java
@@ -18,16 +18,11 @@
 package org.apache.ignite.tensorflow.core.nativerunning.task;
 
 import java.util.function.Supplier;
-import org.apache.ignite.tensorflow.core.nativerunning.NativeProcess;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.PrintStream;
-import java.io.PrintWriter;
-import java.util.Scanner;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.Ignition;
 import org.apache.ignite.lang.IgniteRunnable;
-import org.apache.ignite.tensorflow.core.util.CustomizableThreadFactory;
+import org.apache.ignite.tensorflow.core.nativerunning.NativeProcess;
+import org.apache.ignite.tensorflow.core.util.NativeProcessRunner;
 
 /**
  * Task that starts native process by its specification.
@@ -55,62 +50,26 @@ public class NativeProcessStartTask implements IgniteRunnable {
         Supplier<ProcessBuilder> procBuilderSupplier = procSpec.getProcBuilderSupplier();
         ProcessBuilder procBuilder = procBuilderSupplier.get();
 
-        Process proc;
-        try {
-            proc = procBuilder.start();
-        }
-        catch (IOException e) {
-            throw new RuntimeException(e);
-        }
-
-        Thread shutdownHook = new Thread(proc::destroy);
-        Runtime.getRuntime().addShutdownHook(shutdownHook);
+        NativeProcessRunner procRunner = new NativeProcessRunner(
+            procBuilder,
+            procSpec.getStdin(),
+            System.out::println,
+            System.err::println
+        );
 
-        Future<?> outForward = forwardStream(proc.getInputStream(), System.out);
-        Future<?> errForward = forwardStream(proc.getErrorStream(), System.err);
+        IgniteLogger log = Ignition.ignite().log().getLogger(NativeProcessStartTask.class);
 
         try {
-            if (procSpec.getStdin() != null) {
-                PrintWriter writer = new PrintWriter(proc.getOutputStream());
-                writer.println(procSpec.getStdin());
-                writer.flush();
-            }
-
-            int status;
-            try {
-                status = proc.waitFor();
-            }
-            catch (InterruptedException e) {
-                proc.destroy();
-                status = proc.exitValue();
-            }
-
-            Runtime.getRuntime().removeShutdownHook(shutdownHook);
-
-            if (status != 0)
-                throw new IllegalStateException("Native process exit status is " + status);
+            log.debug("Starting native process");
+            procRunner.startAndWait();
+            log.debug("Native process completed");
         }
-        finally {
-            outForward.cancel(true);
-            errForward.cancel(true);
+        catch (InterruptedException e) {
+            log.debug("Native process interrupted");
+        }
+        catch (Exception e) {
+            log.error("Native process failed", e);
+            throw e;
         }
-    }
-
-    /**
-     * Forwards stream.
-     *
-     * @param src Source stream.
-     * @param dst Destination stream.
-     * @return Future that allows to interrupt forwarding.
-     */
-    private Future<?> forwardStream(InputStream src, PrintStream dst) {
-        return Executors
-            .newSingleThreadExecutor(new CustomizableThreadFactory("NATIVE_PROCESS_FORWARD_STREAM", true))
-            .submit(() -> {
-                Scanner scanner = new Scanner(src);
-
-                while (!Thread.currentThread().isInterrupted() && scanner.hasNextLine())
-                    dst.println(scanner.nextLine());
-            });
     }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/9e884e5a/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/core/pythonrunning/PythonProcessBuilderSupplier.java
----------------------------------------------------------------------
diff --git a/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/core/pythonrunning/PythonProcessBuilderSupplier.java b/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/core/pythonrunning/PythonProcessBuilderSupplier.java
new file mode 100644
index 0000000..e59ab00
--- /dev/null
+++ b/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/core/pythonrunning/PythonProcessBuilderSupplier.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.tensorflow.core.pythonrunning;
+
+import org.apache.ignite.tensorflow.util.SerializableSupplier;
+
+/**
+ * Python process builder supplier that is used to create Python process builder.
+ */
+public class PythonProcessBuilderSupplier implements SerializableSupplier<ProcessBuilder> {
+    /** */
+    private static final long serialVersionUID = 7181937306294456125L;
+
+    /** Python environment variable name. */
+    private static final String PYTHON_ENV_NAME = "PYTHON";
+
+    /** Interactive flag (allows standard input to be used to pass the Python script). */
+    private final boolean interactive;
+
+    /**
+     * Constructs a new instance of Python process builder supplier.
+     *
+     * @param interactive Interactive flag (allows standard input to be used to pass the Python script).
+     */
+    public PythonProcessBuilderSupplier(boolean interactive) {
+        this.interactive = interactive;
+    }
+
+    /**
+     * Returns process builder to be used to start Python process.
+     *
+     * @return Process builder to be used to start Python process.
+     */
+    public ProcessBuilder get() {
+        String python = System.getenv(PYTHON_ENV_NAME);
+
+        if (python == null)
+            python = "python3";
+
+        return interactive ? new ProcessBuilder(python, "-i") : new ProcessBuilder(python);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/9e884e5a/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/core/pythonrunning/PythonProcessManager.java
----------------------------------------------------------------------
diff --git a/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/core/pythonrunning/PythonProcessManager.java b/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/core/pythonrunning/PythonProcessManager.java
index de35ff9..1f6c11e 100644
--- a/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/core/pythonrunning/PythonProcessManager.java
+++ b/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/core/pythonrunning/PythonProcessManager.java
@@ -17,8 +17,6 @@
 
 package org.apache.ignite.tensorflow.core.pythonrunning;
 
-import java.io.Serializable;
-import java.util.function.Supplier;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.tensorflow.core.ProcessManager;
 import org.apache.ignite.tensorflow.core.ProcessManagerWrapper;
@@ -29,17 +27,13 @@ import org.apache.ignite.tensorflow.core.nativerunning.NativeProcessManager;
  * Python process manager that allows to  start, stop and make other actions with python processes.
  */
 public class PythonProcessManager extends ProcessManagerWrapper<NativeProcess, PythonProcess> {
-    /** */
-    private static final long serialVersionUID = -7095409565854538504L;
-
     /**
      * Constructs a new instance of python process manager.
      *
-     * @param igniteSupplier Ignite instance supplier.
-     * @param <T> Type of serializable supplier.
+     * @param ignite Ignite instance.
      */
-    public <T extends Supplier<Ignite> & Serializable> PythonProcessManager(T igniteSupplier) {
-        this(new NativeProcessManager(igniteSupplier));
+    public PythonProcessManager(Ignite ignite) {
+        this(new NativeProcessManager(ignite));
     }
 
     /**
@@ -54,30 +48,9 @@ public class PythonProcessManager extends ProcessManagerWrapper<NativeProcess, P
     /** {@inheritDoc} */
     @Override protected NativeProcess transformSpecification(PythonProcess spec) {
         return new NativeProcess(
-            new PythonProcessBuilderSupplier(),
+            new PythonProcessBuilderSupplier(true),
             spec.getStdin(),
             spec.getNodeId()
         );
     }
-
-    /**
-     * Python process builder supplier that is used to create Python process builder.
-     */
-    private static class PythonProcessBuilderSupplier implements Supplier<ProcessBuilder>, Serializable {
-        /** */
-        private static final long serialVersionUID = 8497087649461965914L;
-
-        /** Python environment variable name. */
-        private static final String PYTHON_ENV_NAME = "PYTHON";
-
-        /** {@inheritDoc} */
-        @Override public ProcessBuilder get() {
-            String python = System.getenv(PYTHON_ENV_NAME);
-
-            if (python == null)
-                python = "python3";
-
-            return new ProcessBuilder(python, "-i");
-        }
-    }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/9e884e5a/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/core/util/AsyncNativeProcessRunner.java
----------------------------------------------------------------------
diff --git a/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/core/util/AsyncNativeProcessRunner.java b/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/core/util/AsyncNativeProcessRunner.java
new file mode 100644
index 0000000..b336b97
--- /dev/null
+++ b/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/core/util/AsyncNativeProcessRunner.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.tensorflow.core.util;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteLogger;
+
+/**
+ * Asynchronous native process runner.
+ */
+public abstract class AsyncNativeProcessRunner {
+    /** Ignite logger. */
+    private final IgniteLogger log;
+
+    /** Executor that is used to start the async native process. */
+    private final ExecutorService executor;
+
+    /** Future of the async native process. */
+    private Future<?> fut;
+
+    /**
+     * Constructs a new asynchronous native process runner.
+     *
+     * @param ignite Ignite instance.
+     * @param executor Executor.
+     */
+    public AsyncNativeProcessRunner(Ignite ignite, ExecutorService executor) {
+        this.log = ignite.log().getLogger(AsyncNativeProcessRunner.class);
+        this.executor = executor;
+    }
+
+    /**
+     * Method that should be called before starting the process.
+     *
+     * @return Prepared native process runner.
+     */
+    public abstract NativeProcessRunner doBefore();
+
+    /**
+     * Method that is called once the process run loop has finished (the process completed or was interrupted).
+     */
+    public abstract void doAfter();
+
+    /**
+     * Starts the process in separate thread.
+     */
+    public synchronized void start() {
+        if (fut != null)
+            throw new IllegalStateException("Async native process has already been started");
+
+        NativeProcessRunner procRunner = doBefore();
+
+        fut = executor.submit(() -> {
+            while (!Thread.currentThread().isInterrupted()) {
+                try {
+                    log.debug("Starting native process");
+                    procRunner.startAndWait();
+                    log.debug("Native process completed");
+                    break;
+                }
+                catch (InterruptedException e) {
+                    log.debug("Native process interrupted");
+                    break;
+                }
+                catch (Exception e) {
+                    log.error("Native process failed", e);
+                }
+            }
+
+            doAfter();
+        });
+    }
+
+    /**
+     * Stops the process.
+     */
+    public synchronized void stop() {
+        if (fut != null && !fut.isDone())
+            fut.cancel(true);
+    }
+
+    /**
+     * Checks if process is already completed.
+     *
+     * @return {@code true} if process completed, otherwise {@code false}.
+     */
+    public boolean isCompleted() {
+        return fut != null && fut.isDone();
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/9e884e5a/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/core/util/NativeProcessRunner.java
----------------------------------------------------------------------
diff --git a/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/core/util/NativeProcessRunner.java b/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/core/util/NativeProcessRunner.java
new file mode 100644
index 0000000..38af26d
--- /dev/null
+++ b/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/core/util/NativeProcessRunner.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.tensorflow.core.util;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.PrintWriter;
+import java.util.Scanner;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Consumer;
+
+/**
+ * Utils class that helps to start native processes.
+ */
+public class NativeProcessRunner {
+    /** Thread name to be used by threads that forward streams. */
+    private static final String NATIVE_PROCESS_FORWARD_STREAM_THREAD_NAME = "tf-forward-native-output";
+
+    /** Process builder. */
+    private final ProcessBuilder procBuilder;
+
+    /** Standard input of the process. */
+    private final String stdin;
+
+    /** Output stream data consumer. */
+    private final Consumer<String> out;
+
+    /** Error stream data consumer. */
+    private final Consumer<String> err;
+
+    /**
+     * Constructs a new instance of native process runner.
+     *
+     * @param procBuilder Process builder.
+     * @param stdin Standard input of the process.
+     * @param out Output stream data consumer.
+     * @param err Error stream data consumer.
+     */
+    public NativeProcessRunner(ProcessBuilder procBuilder, String stdin, Consumer<String> out, Consumer<String> err) {
+        this.procBuilder = procBuilder;
+        this.stdin = stdin;
+        this.out = out;
+        this.err = err;
+    }
+
+    /**
+     * Starts the native process and waits for it to complete, either successfully or with an exception.
+     */
+    public void startAndWait() throws InterruptedException {
+        Process proc;
+        try {
+            proc = procBuilder.start();
+        }
+        catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+
+        AtomicBoolean shutdown = new AtomicBoolean();
+
+        Thread shutdownHook = new Thread(() -> {
+            shutdown.set(true);
+            proc.destroy();
+        });
+
+        Runtime.getRuntime().addShutdownHook(shutdownHook);
+
+        Future<?> outForward = forwardStream(proc.getInputStream(), out);
+        Future<?> errForward = forwardStream(proc.getErrorStream(), err);
+
+        try {
+            if (stdin != null) {
+                PrintWriter writer = new PrintWriter(proc.getOutputStream());
+                writer.println(stdin);
+                writer.flush();
+            }
+
+            int status;
+            try {
+                status = proc.waitFor();
+            }
+            catch (InterruptedException e) {
+                proc.destroy();
+                throw e;
+            }
+
+            if (!shutdown.get()) {
+                Runtime.getRuntime().removeShutdownHook(shutdownHook);
+
+                if (status != 0)
+                    throw new IllegalStateException("Native process exit [status=" + status + "]");
+            }
+        }
+        finally {
+            outForward.cancel(true);
+            errForward.cancel(true);
+        }
+    }
+
+    /**
+     * Forwards stream.
+     *
+     * @param src Source stream.
+     * @param dst Consumer that receives every line read from the source stream.
+     * @return Future that allows interrupting the forwarding.
+     */
+    private Future<?> forwardStream(InputStream src, Consumer<String> dst) {
+        return Executors
+            .newSingleThreadExecutor(new CustomizableThreadFactory(NATIVE_PROCESS_FORWARD_STREAM_THREAD_NAME, true))
+            .submit(() -> {
+                Scanner scanner = new Scanner(src);
+
+                while (!Thread.currentThread().isInterrupted() && scanner.hasNextLine())
+                    dst.accept(scanner.nextLine());
+            });
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/9e884e5a/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/submitter/JobSubmitter.java
----------------------------------------------------------------------
diff --git a/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/submitter/JobSubmitter.java b/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/submitter/JobSubmitter.java
new file mode 100644
index 0000000..0a7cae6
--- /dev/null
+++ b/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/submitter/JobSubmitter.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.tensorflow.submitter;
+
+import org.apache.ignite.tensorflow.submitter.command.RootCommand;
+import picocli.CommandLine;
+
+/**
+ * Main class of the job submitter application that allows submitting TensorFlow jobs to run within an Ignite cluster.
+ */
+public class JobSubmitter {
+    /**
+     * Main method.
+     *
+     * @param args Arguments.
+     */
+    public static void main(String... args) {
+        CommandLine.run(new RootCommand(), System.out, args);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/9e884e5a/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/submitter/command/AbstractCommand.java
----------------------------------------------------------------------
diff --git a/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/submitter/command/AbstractCommand.java b/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/submitter/command/AbstractCommand.java
new file mode 100644
index 0000000..4d2fc18
--- /dev/null
+++ b/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/submitter/command/AbstractCommand.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.tensorflow.submitter.command;
+
+import org.apache.ignite.Ignite;
+import org.apache.ignite.Ignition;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.logger.slf4j.Slf4jLogger;
+import picocli.CommandLine;
+
+/**
+ * Abstract command that contains options common for all commands.
+ */
+public abstract class AbstractCommand implements Runnable {
+    /** Ignite node configuration path. */
+    @CommandLine.Option(names = { "-c", "--config" }, description = "Apache Ignite client configuration.")
+    protected String cfg;
+
+    /**
+     * Returns Ignite instance based on configuration specified in {@link #cfg} field.
+     *
+     * @return Ignite instance.
+     */
+    protected Ignite getIgnite() {
+        if (cfg != null)
+            return Ignition.start(cfg);
+        else {
+            IgniteConfiguration igniteCfg = new IgniteConfiguration();
+            igniteCfg.setGridLogger(new Slf4jLogger());
+            igniteCfg.setClientMode(true);
+
+            return Ignition.start(igniteCfg);
+        }
+    }
+
+    /** */
+    public void setCfg(String cfg) {
+        this.cfg = cfg;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/9e884e5a/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/submitter/command/AttachCommand.java
----------------------------------------------------------------------
diff --git a/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/submitter/command/AttachCommand.java b/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/submitter/command/AttachCommand.java
new file mode 100644
index 0000000..946aa08
--- /dev/null
+++ b/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/submitter/command/AttachCommand.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.tensorflow.submitter.command;
+
+import java.util.UUID;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.tensorflow.cluster.TensorFlowClusterGatewayManager;
+import picocli.CommandLine;
+
+/**
+ * Command "attach" that is used to attach to a running TensorFlow cluster and receive the output of the user script.
+ */
+@CommandLine.Command(
+    name = "attach",
+    description = "Attaches to running TensorFlow cluster (user script process).",
+    mixinStandardHelpOptions = true
+)
+public class AttachCommand extends AbstractCommand {
+    /** TensorFlow cluster identifier. */
+    @CommandLine.Parameters(paramLabel = "CLUSTER_ID", description = "Cluster identifier.")
+    private UUID clusterId;
+
+    /** {@inheritDoc} */
+    @Override public void run() {
+        try (Ignite ignite = getIgnite()) {
+            TensorFlowClusterGatewayManager mgr = new TensorFlowClusterGatewayManager(ignite);
+
+            mgr.listenToClusterUserScript(clusterId, System.out::println, System.err::println);
+        }
+    }
+
+    /** */
+    public void setClusterId(UUID clusterId) {
+        this.clusterId = clusterId;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/9e884e5a/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/submitter/command/PsCommand.java
----------------------------------------------------------------------
diff --git a/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/submitter/command/PsCommand.java b/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/submitter/command/PsCommand.java
new file mode 100644
index 0000000..0538496
--- /dev/null
+++ b/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/submitter/command/PsCommand.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.tensorflow.submitter.command;
+
+import java.util.Map;
+import java.util.UUID;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.tensorflow.cluster.TensorFlowCluster;
+import org.apache.ignite.tensorflow.cluster.TensorFlowClusterManager;
+import picocli.CommandLine;
+
+/**
+ * Command "ps" that is used to print identifiers of all running TensorFlow clusters.
+ */
+@CommandLine.Command(
+    name = "ps",
+    description = "Prints identifiers of all running TensorFlow clusters.",
+    mixinStandardHelpOptions = true
+)
+public class PsCommand extends AbstractCommand {
+    /** {@inheritDoc} */
+    @Override public void run() {
+        try (Ignite ignite = getIgnite()) {
+            TensorFlowClusterManager mgr = new TensorFlowClusterManager(ignite);
+
+            Map<UUID, TensorFlowCluster> clusters = mgr.getAllClusters();
+
+            for (UUID clusterId : clusters.keySet())
+                System.out.println(clusterId);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/9e884e5a/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/submitter/command/RootCommand.java
----------------------------------------------------------------------
diff --git a/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/submitter/command/RootCommand.java b/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/submitter/command/RootCommand.java
new file mode 100644
index 0000000..508ea7b
--- /dev/null
+++ b/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/submitter/command/RootCommand.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.tensorflow.submitter.command;
+
+import picocli.CommandLine;
+
+/**
+ * Root command that aggregates all sub commands.
+ */
+@CommandLine.Command(
+    name = "ignite-tf",
+    description = "Apache Ignite and TensorFlow integration command line tool that allows to start, maintain and" +
+        " stop distributed deep learning utilizing Apache Ignite infrastructure and data.",
+    subcommands = {
+        StartCommand.class,
+        StopCommand.class,
+        AttachCommand.class,
+        PsCommand.class
+    },
+    mixinStandardHelpOptions = true
+)
+public class RootCommand extends AbstractCommand {
+    /** {@inheritDoc} */
+    @Override public void run() {
+        CommandLine.usage(this, System.out);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/9e884e5a/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/submitter/command/StartCommand.java
----------------------------------------------------------------------
diff --git a/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/submitter/command/StartCommand.java b/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/submitter/command/StartCommand.java
new file mode 100644
index 0000000..082b363
--- /dev/null
+++ b/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/submitter/command/StartCommand.java
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.tensorflow.submitter.command;
+
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.UUID;
+import java.util.zip.ZipEntry;
+import java.util.zip.ZipOutputStream;
+import org.apache.commons.io.IOUtils;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.tensorflow.cluster.TensorFlowClusterGatewayManager;
+import org.apache.ignite.tensorflow.cluster.TensorFlowJobArchive;
+import picocli.CommandLine;
+
+/**
+ * Command "start" that is used to start a new TensorFlow cluster on top of Apache Ignite.
+ */
+@CommandLine.Command(
+    name = "start",
+    description = "Starts a new TensorFlow cluster and attaches to user script process.",
+    mixinStandardHelpOptions = true
+)
+public class StartCommand extends AbstractCommand {
+    /** Upstream cache name. */
+    @CommandLine.Parameters(index = "0", paramLabel = "CACHE_NAME", description = "Upstream cache name.")
+    private String cacheName;
+
+    /** Job folder or archive. */
+    @CommandLine.Parameters(index = "1", paramLabel = "JOB_DIR", description = "Job folder (or zip archive).")
+    private String jobFolder;
+
+    /** Job command to be executed in cluster. */
+    @CommandLine.Parameters(index = "2", paramLabel = "JOB_CMD", description = "Job command.")
+    private String jobCmd;
+
+    /** Arguments of a job command to be executed in cluster. */
+    @CommandLine.Parameters(index = "3..*", paramLabel = "JOB_ARGS", description = "Job arguments.")
+    private String[] jobArguments;
+
+    /** {@inheritDoc} */
+    @Override public void run() {
+        try (Ignite ignite = getIgnite()) {
+            UUID clusterId = UUID.randomUUID();
+            String[] commands = new String[jobArguments.length + 1];
+            commands[0] = jobCmd;
+            System.arraycopy(jobArguments, 0, commands, 1, commands.length - 1);
+
+            TensorFlowJobArchive jobArchive = new TensorFlowJobArchive(
+                cacheName,
+                zip(jobFolder),
+                commands
+            );
+
+            TensorFlowClusterGatewayManager mgr = new TensorFlowClusterGatewayManager(ignite);
+            mgr.createCluster(clusterId, jobArchive);
+
+            mgr.listenToClusterUserScript(clusterId, System.out::println, System.err::println);
+        }
+        catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    /**
+     * Archives specified folder or file into zip archive.
+     *
+     * @param jobArchivePath Path to the folder or file to be archived.
+     * @return Byte array representing zip archive.
+     * @throws IOException In case of input/output exception.
+     */
+    private byte[] zip(String jobArchivePath) throws IOException {
+        Path path = Paths.get(jobArchivePath);
+        File file = path.toFile();
+
+        if (!file.exists())
+            throw new IllegalArgumentException("File doesn't exist [name=" + jobArchivePath + "]");
+
+        if (file.isDirectory())
+            return zipDirectory(file);
+        else if (jobArchivePath.endsWith(".zip"))
+            return zipArchive(file);
+        else
+            return zipFile(file);
+    }
+
+    /**
+     * Archives specified folder into zip archive.
+     *
+     * @param dir Directory to be archived.
+     * @return Byte array representing zip archive.
+     * @throws IOException In case of input/output exception.
+     */
+    private byte[] zipDirectory(File dir) throws IOException {
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+
+        try (ZipOutputStream zipFile = new ZipOutputStream(baos)) {
+            compressDirectoryToZip(dir.getAbsolutePath(), dir.getAbsolutePath(), zipFile);
+        }
+
+        return baos.toByteArray();
+    }
+
+    /**
+     * Archives specified file into zip archive.
+     *
+     * @param file File to be archived.
+     * @return Byte array representing zip archive.
+     * @throws IOException In case of input/output exception.
+     */
+    private byte[] zipFile(File file) throws IOException {
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+
+        try (ZipOutputStream zos = new ZipOutputStream(baos)) {
+            ZipEntry entry = new ZipEntry(file.getName());
+            zos.putNextEntry(entry);
+
+            try (FileInputStream in = new FileInputStream(file.getAbsolutePath())) {
+                IOUtils.copy(in, zos);
+            }
+        }
+
+        return baos.toByteArray();
+    }
+
+    /**
+     * Reads zip archive into byte array and returns this array.
+     *
+     * @param file Archive to be read.
+     * @return Byte array representing zip archive.
+     * @throws IOException In case of input/output exception.
+     */
+    private byte[] zipArchive(File file) throws IOException {
+        try (FileInputStream fis = new FileInputStream(file)) {
+            return IOUtils.toByteArray(fis);
+        }
+    }
+
+    /**
+     * Archives specified folder into zip output stream.
+     *
+     * @param rootDir Root directory.
+     * @param srcDir Source directory.
+     * @param out Zip output stream.
+     * @throws IOException In case of input/output exception.
+     */
+    private void compressDirectoryToZip(String rootDir, String srcDir, ZipOutputStream out) throws IOException {
+        File[] files = new File(srcDir).listFiles();
+
+        if (files != null) {
+            for (File file : files) {
+                if (file.isDirectory())
+                    compressDirectoryToZip(rootDir, srcDir + File.separator + file.getName(), out);
+                else {
+                    ZipEntry entry = new ZipEntry(srcDir.replace(rootDir, "")
+                        + File.separator + file.getName());
+                    out.putNextEntry(entry);
+
+                    try (FileInputStream in = new FileInputStream(srcDir + File.separator + file.getName())) {
+                        IOUtils.copy(in, out);
+                    }
+                }
+            }
+        }
+    }
+
+    /** */
+    public void setCacheName(String cacheName) {
+        this.cacheName = cacheName;
+    }
+
+    /** */
+    public void setJobFolder(String jobFolder) {
+        this.jobFolder = jobFolder;
+    }
+
+    /** */
+    public void setJobCmd(String jobCmd) {
+        this.jobCmd = jobCmd;
+    }
+
+    /** */
+    public void setJobArguments(String[] jobArguments) {
+        this.jobArguments = jobArguments;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/9e884e5a/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/submitter/command/StopCommand.java
----------------------------------------------------------------------
diff --git a/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/submitter/command/StopCommand.java b/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/submitter/command/StopCommand.java
new file mode 100644
index 0000000..8890370
--- /dev/null
+++ b/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/submitter/command/StopCommand.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.tensorflow.submitter.command;
+
+import java.util.UUID;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.tensorflow.cluster.TensorFlowClusterGatewayManager;
+import picocli.CommandLine;
+
+/**
+ * Command "stop" that is used to stop TensorFlow cluster.
+ */
+@CommandLine.Command(
+    name = "stop",
+    description = "Stops a running TensorFlow cluster.",
+    mixinStandardHelpOptions = true
+)
+public class StopCommand extends AbstractCommand {
+    /** Cluster identifier. */
+    @CommandLine.Parameters(paramLabel = "CLUSTER_ID", description = "Cluster identifier.")
+    private UUID clusterId;
+
+    /** {@inheritDoc} */
+    @Override public void run() {
+        try (Ignite ignite = getIgnite()) {
+            TensorFlowClusterGatewayManager mgr = new TensorFlowClusterGatewayManager(ignite);
+            mgr.stopClusterIfExists(clusterId);
+        }
+    }
+
+    /** */
+    public void setClusterId(UUID clusterId) {
+        this.clusterId = clusterId;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/9e884e5a/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/submitter/command/package-info.java
----------------------------------------------------------------------
diff --git a/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/submitter/command/package-info.java b/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/submitter/command/package-info.java
new file mode 100644
index 0000000..7949feb
--- /dev/null
+++ b/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/submitter/command/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * <!-- Package description. -->
+ * This package contains the commands provided by the command line tool. Picocli is used to keep them maintainable.
+ */
+package org.apache.ignite.tensorflow.submitter.command;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/9e884e5a/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/submitter/package-info.java
----------------------------------------------------------------------
diff --git a/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/submitter/package-info.java b/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/submitter/package-info.java
new file mode 100644
index 0000000..8288b16
--- /dev/null
+++ b/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/submitter/package-info.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * <!-- Package description. -->
+ * This package contains classes that allow using the command line interface to submit jobs to the TensorFlow on Apache
+ * Ignite infrastructure.
+ */
+package org.apache.ignite.tensorflow.submitter;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/9e884e5a/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/util/SerializableConsumer.java
----------------------------------------------------------------------
diff --git a/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/util/SerializableConsumer.java b/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/util/SerializableConsumer.java
new file mode 100644
index 0000000..ece58aa
--- /dev/null
+++ b/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/util/SerializableConsumer.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.tensorflow.util;
+
+import java.io.Serializable;
+import java.util.function.Consumer;
+
+/**
+ * Serializable consumer: a {@link Consumer} that also implements {@link Serializable}.
+ *
+ * @param <T> The type of the input to the operation.
+ */
+public interface SerializableConsumer<T> extends Consumer<T>, Serializable {
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/9e884e5a/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/util/SerializableSupplier.java
----------------------------------------------------------------------
diff --git a/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/util/SerializableSupplier.java b/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/util/SerializableSupplier.java
new file mode 100644
index 0000000..768dbe1
--- /dev/null
+++ b/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/util/SerializableSupplier.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.tensorflow.util;
+
+import java.io.Serializable;
+import java.util.function.Supplier;
+
+/**
+ * Serializable supplier: a {@link Supplier} that also implements {@link Serializable}.
+ *
+ * @param <T> The type of results supplied by this supplier.
+ */
+public interface SerializableSupplier<T> extends Supplier<T>, Serializable {
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/9e884e5a/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/util/package-info.java
----------------------------------------------------------------------
diff --git a/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/util/package-info.java b/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/util/package-info.java
new file mode 100644
index 0000000..8ed43c3
--- /dev/null
+++ b/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/util/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * <!-- Package description. -->
+ * Utility classes used in the {@link org.apache.ignite.tensorflow} package.
+ */
+package org.apache.ignite.tensorflow.util;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/9e884e5a/modules/tensorflow/src/main/sh/ignite-tf.sh
----------------------------------------------------------------------
diff --git a/modules/tensorflow/src/main/sh/ignite-tf.sh b/modules/tensorflow/src/main/sh/ignite-tf.sh
new file mode 100755
index 0000000..fd3e02c
--- /dev/null
+++ b/modules/tensorflow/src/main/sh/ignite-tf.sh
@@ -0,0 +1,19 @@
+#!/usr/bin/env bash
+
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+SCRIPT_PATH="$( cd "$(dirname "$0")" ; pwd -P )" # Physical directory of this script. NOTE(review): the semicolon means pwd still runs if cd fails - consider using a conditional AND instead.
+java -Xmx4G -DIGNITE_QUIET=false -cp "$SCRIPT_PATH:$SCRIPT_PATH/lib/*" org.apache.ignite.tensorflow.submitter.JobSubmitter "$@" # Runs JobSubmitter with the script directory and its lib jars on the classpath, forwarding all arguments.
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/9e884e5a/modules/tensorflow/src/main/sh/logback.xml
----------------------------------------------------------------------
diff --git a/modules/tensorflow/src/main/sh/logback.xml b/modules/tensorflow/src/main/sh/logback.xml
new file mode 100644
index 0000000..816b5e6
--- /dev/null
+++ b/modules/tensorflow/src/main/sh/logback.xml
@@ -0,0 +1,36 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<!--
+    Logback configuration file.
+-->
+<configuration>
+
+    <appender name="FILE" class="ch.qos.logback.core.FileAppender">
+        <file>ignite-tf.log</file>
+        <append>false</append>
+        <encoder>
+            <pattern>%d{yyyy-MM-dd HH:mm:ss} [%thread] %-5level %logger{36} - %msg%n</pattern>
+        </encoder>
+    </appender>
+
+    <root level="warn">
+        <appender-ref ref="FILE" />
+    </root>
+
+</configuration>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/9e884e5a/modules/tensorflow/src/test/java/org/apache/ignite/tensorflow/core/longrunning/LongRunningProcessManagerTest.java
----------------------------------------------------------------------
diff --git a/modules/tensorflow/src/test/java/org/apache/ignite/tensorflow/core/longrunning/LongRunningProcessManagerTest.java b/modules/tensorflow/src/test/java/org/apache/ignite/tensorflow/core/longrunning/LongRunningProcessManagerTest.java
index 7d917e7..faa2b6b 100644
--- a/modules/tensorflow/src/test/java/org/apache/ignite/tensorflow/core/longrunning/LongRunningProcessManagerTest.java
+++ b/modules/tensorflow/src/test/java/org/apache/ignite/tensorflow/core/longrunning/LongRunningProcessManagerTest.java
@@ -17,13 +17,11 @@
 
 package org.apache.ignite.tensorflow.core.longrunning;
 
-import java.io.Serializable;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
-import java.util.function.Supplier;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteCluster;
 import org.apache.ignite.IgniteCompute;
@@ -67,7 +65,7 @@ public class LongRunningProcessManagerTest {
 
         List<LongRunningProcess> list = Collections.singletonList(new LongRunningProcess(nodeId, () -> {}));
 
-        LongRunningProcessManager mgr = new LongRunningProcessManager((Supplier<Ignite> & Serializable)() -> ignite);
+        LongRunningProcessManager mgr = new LongRunningProcessManager(ignite);
         Map<UUID, List<UUID>> res = mgr.start(list);
 
         assertEquals(1, res.size());
@@ -97,7 +95,7 @@ public class LongRunningProcessManagerTest {
         Map<UUID, List<UUID>> procIds = new HashMap<>();
         procIds.put(nodeId, Collections.singletonList(procId));
 
-        LongRunningProcessManager mgr = new LongRunningProcessManager((Supplier<Ignite> & Serializable)() -> ignite);
+        LongRunningProcessManager mgr = new LongRunningProcessManager(ignite);
         Map<UUID, List<LongRunningProcessStatus>> res = mgr.ping(procIds);
 
         assertEquals(1, res.size());
@@ -127,7 +125,7 @@ public class LongRunningProcessManagerTest {
         Map<UUID, List<UUID>> procIds = new HashMap<>();
         procIds.put(nodeId, Collections.singletonList(procId));
 
-        LongRunningProcessManager mgr = new LongRunningProcessManager((Supplier<Ignite> & Serializable)() -> ignite);
+        LongRunningProcessManager mgr = new LongRunningProcessManager(ignite);
         Map<UUID, List<LongRunningProcessStatus>> res = mgr.stop(procIds, true);
 
         assertEquals(1, res.size());
@@ -157,7 +155,7 @@ public class LongRunningProcessManagerTest {
         Map<UUID, List<UUID>> procIds = new HashMap<>();
         procIds.put(nodeId, Collections.singletonList(procId));
 
-        LongRunningProcessManager mgr = new LongRunningProcessManager((Supplier<Ignite> & Serializable)() -> ignite);
+        LongRunningProcessManager mgr = new LongRunningProcessManager(ignite);
         Map<UUID, List<LongRunningProcessStatus>> res = mgr.clear(procIds);
 
         assertEquals(1, res.size());


Mime
View raw message