ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ch...@apache.org
Subject ignite git commit: IGNITE-10133: ML: Switch to per-node TensorFlow worker strategy
Date Wed, 07 Nov 2018 14:41:42 GMT
Repository: ignite
Updated Branches:
  refs/heads/master abfc403df -> ea33ec7f0


IGNITE-10133: ML: Switch to per-node TensorFlow worker strategy


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/ea33ec7f
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/ea33ec7f
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/ea33ec7f

Branch: refs/heads/master
Commit: ea33ec7f0af8fcad113cd92953fba0e8e5502dfa
Parents: abfc403
Author: Anton Dmitriev <dmitrievanthony@gmail.com>
Authored: Wed Nov 7 17:41:35 2018 +0300
Committer: Yury Babak <ybabak@gridgain.com>
Committed: Wed Nov 7 17:41:35 2018 +0300

----------------------------------------------------------------------
 .../cluster/tfrunning/TensorFlowServerManager.java    |  2 +-
 .../tfrunning/TensorFlowServerScriptFormatter.java    |  2 +-
 .../cluster/util/TensorFlowClusterResolver.java       | 12 +++++++++++-
 .../util/TensorFlowProcessBuilderSupplier.java        | 14 +++++++-------
 4 files changed, 20 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/ea33ec7f/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/cluster/tfrunning/TensorFlowServerManager.java
----------------------------------------------------------------------
diff --git a/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/cluster/tfrunning/TensorFlowServerManager.java b/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/cluster/tfrunning/TensorFlowServerManager.java
index ed6c801..589d1d7 100644
--- a/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/cluster/tfrunning/TensorFlowServerManager.java
+++ b/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/cluster/tfrunning/TensorFlowServerManager.java
@@ -60,7 +60,7 @@ public class TensorFlowServerManager extends ProcessManagerWrapper<NativeProcess
         return new NativeProcess(
             new TensorFlowProcessBuilderSupplier(
                 true,
-                spec.getTaskIdx(),
+                true,
                 "job:" + spec.getJobName(),
                 "task:" + spec.getTaskIdx()
             ),

http://git-wip-us.apache.org/repos/asf/ignite/blob/ea33ec7f/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/cluster/tfrunning/TensorFlowServerScriptFormatter.java
----------------------------------------------------------------------
diff --git a/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/cluster/tfrunning/TensorFlowServerScriptFormatter.java b/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/cluster/tfrunning/TensorFlowServerScriptFormatter.java
index ad77d16..fc90ef0 100644
--- a/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/cluster/tfrunning/TensorFlowServerScriptFormatter.java
+++ b/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/cluster/tfrunning/TensorFlowServerScriptFormatter.java
@@ -78,7 +78,7 @@ public class TensorFlowServerScriptFormatter {
             .append("\n");
         builder.append("print('IGNITE_DATASET_HOST = ', os.environ.get('IGNITE_DATASET_HOST'))").append("\n");
         builder.append("print('IGNITE_DATASET_PORT = ', os.environ.get('IGNITE_DATASET_PORT'))").append("\n");
-        builder.append("print('IGNITE_DATASET_PART = ', os.environ.get('IGNITE_DATASET_PART'))").append("\n");
+        builder.append("print('IGNITE_DATASET_LOCAL = ', os.environ.get('IGNITE_DATASET_LOCAL'))").append("\n");
 
         builder.append("os.environ['TF_CONFIG'] = '").append(formatTfConfigVar(srv, ignite)).append("'\n");
         builder.append("server = tf.contrib.distribute.run_standard_tensorflow_server()").append("\n");

http://git-wip-us.apache.org/repos/asf/ignite/blob/ea33ec7f/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/cluster/util/TensorFlowClusterResolver.java
----------------------------------------------------------------------
diff --git a/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/cluster/util/TensorFlowClusterResolver.java b/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/cluster/util/TensorFlowClusterResolver.java
index 846af71..87aafaf 100644
--- a/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/cluster/util/TensorFlowClusterResolver.java
+++ b/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/cluster/util/TensorFlowClusterResolver.java
@@ -17,6 +17,11 @@
 
 package org.apache.ignite.tensorflow.cluster.util;
 
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
 import java.util.UUID;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.cache.affinity.Affinity;
@@ -96,12 +101,17 @@ public class TensorFlowClusterResolver {
         Affinity<?> affinity = ignite.affinity(upstreamCacheName);
         int parts = affinity.partitions();
 
+        Set<UUID> distinctNodeIds = new HashSet<>();
         for (int part = 0; part < parts; part++) {
             ClusterNode node = affinity.mapPartitionToNode(part);
             UUID nodeId = node.id();
+            distinctNodeIds.add(nodeId);
+        }
+        List<UUID> nodeIds = new ArrayList<>(distinctNodeIds);
+        Collections.sort(nodeIds);
 
+        for (UUID nodeId : nodeIds) {
             int port = portMgr.acquirePort(nodeId);
-
             spec.addTask(WORKER_JOB_NAME, nodeId, port);
         }
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/ea33ec7f/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/cluster/util/TensorFlowProcessBuilderSupplier.java
----------------------------------------------------------------------
diff --git a/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/cluster/util/TensorFlowProcessBuilderSupplier.java b/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/cluster/util/TensorFlowProcessBuilderSupplier.java
index 8b95526..73f4b4b 100644
--- a/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/cluster/util/TensorFlowProcessBuilderSupplier.java
+++ b/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/cluster/util/TensorFlowProcessBuilderSupplier.java
@@ -34,19 +34,19 @@ public class TensorFlowProcessBuilderSupplier extends PythonProcessBuilderSuppli
     /** Prefix for worker environment variables. */
     private static final String ENV_PREFIX = "IGNITE_DATASET_";
 
-    /** Partition of the upstream cache. */
-    private final Integer part;
+    /** Upstream cache query local mode. */
+    private final Boolean loc;
 
     /**
      * Constructs a new instance of Python process builder supplier.
      *
     * @param interactive Interactive flag (allows to used standard input to pass Python script).
-     * @param part Partition index.
+     * @param loc Upstream cache query local mode.
      * @param meta Meta information that adds to script as arguments.
      */
-    public TensorFlowProcessBuilderSupplier(boolean interactive, Integer part, String... meta) {
+    public TensorFlowProcessBuilderSupplier(boolean interactive, Boolean loc, String... meta) {
         super(interactive, meta);
-        this.part = part;
+        this.loc = loc;
     }
 
     /** {@inheritDoc} */
@@ -64,8 +64,8 @@ public class TensorFlowProcessBuilderSupplier extends PythonProcessBuilderSuppli
         if (port != null)
             env.put(ENV_PREFIX + "PORT", String.valueOf(port));
 
-        if (part != null)
-            env.put(ENV_PREFIX + "PART", String.valueOf(part));
+        if (loc != null)
+            env.put(ENV_PREFIX + "LOCAL", String.valueOf(loc));
 
         return pythonProcBuilder;
     }


Mime
View raw message