ignite-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Alexey Kukushkin <kukushkinale...@gmail.com>
Subject Re: Job Listeners
Date Wed, 11 Oct 2017 13:37:54 GMT
Hi, your jobs should not cause any deadlocks, since there is no
synchronisation inside execute(). I ran your job on 3 nodes on the same
machine without problems - the job completed in about 9 seconds, which
matches the random delay inside execute(). I only had to replace
executeAsync() with execute(). The problem is that you call executeAsync()
and then leave the cluster. The default deployment mode is SHARED, which
causes your jobs to be undeployed without waiting for completion. This is
the output from your task on a 3-node cluster on the same machine:

Server 1:
[16:27:28] Ignite node started OK (id=1c45f850)
[16:27:28] Topology snapshot [ver=1, servers=1, clients=0, CPUs=8,
heap=3.6GB]
[16:27:30] Topology snapshot [ver=2, servers=2, clients=0, CPUs=8,
heap=7.1GB]
[16:27:32] Topology snapshot [ver=3, servers=3, clients=0, CPUs=8,
heap=11.0GB]
[16:27:41] Topology snapshot [ver=4, servers=3, clients=1, CPUs=8,
heap=14.0GB]
**************************** executed the job **********1 *
*****16:27:50:919
[16:27:50] Topology snapshot [ver=5, servers=3, clients=0, CPUs=8,
heap=11.0GB]

Server 2:
[16:27:31] Ignite node started OK (id=fdecc05f)
[16:27:31] Topology snapshot [ver=2, servers=2, clients=0, CPUs=8,
heap=7.1GB]
[16:27:32] Topology snapshot [ver=3, servers=3, clients=0, CPUs=8,
heap=11.0GB]
[16:27:41] Topology snapshot [ver=4, servers=3, clients=1, CPUs=8,
heap=14.0GB]
**************************** executed the job **********12 *
*****16:27:50:877
[16:27:50] Topology snapshot [ver=5, servers=3, clients=0, CPUs=8,
heap=11.0GB]

Server 3:
[16:27:32] Ignite node started OK (id=34848c7b)
[16:27:32] Topology snapshot [ver=3, servers=3, clients=0, CPUs=8,
heap=11.0GB]
[16:27:41] Topology snapshot [ver=4, servers=3, clients=1, CPUs=8,
heap=14.0GB]
**************************** executed the job **********11 *
*****16:27:50:948
[16:27:50] Topology snapshot [ver=5, servers=3, clients=0, CPUs=8,
heap=11.0GB]

Client:
[16:27:41] Ignite node started OK (id=efb93a25)
[16:27:41] Topology snapshot [ver=4, servers=3, clients=1, CPUs=8,
heap=14.0GB]
[16:27:51] Ignite node stopped OK [uptime=00:00:09.344]


On Tue, Oct 10, 2017 at 3:33 PM, chandrika <chandrika.apm@gmail.com> wrote:

> Hello Alexey,
>
>
> the sample code is as given below:
>
> @ComputeTaskSessionFullSupport
> public class SplitExampleJgraphWithComplexDAGIgniteCachesample extends
> ComputeTaskSplitAdapter<CustomDirectedAcyclicGraph<String,
> DefaultEdge> ,
> Integer> {
>
>
>         // Auto-injected task session.
>     @TaskSessionResource
>     private ComputeTaskSession ses;
>
>
>     private static final Random random = new Random();
>     static int noOftasksExecutedSuccess = 0;
>
>     SimpleDateFormat dateFormat = new SimpleDateFormat("HH:mm:ss:SSS");
>
>
>
>
>     @Override protected Collection<? extends ComputeJob> split(int
> clusterSize, CustomDirectedAcyclicGraph<String, DefaultEdge> graph) {
>         Collection<ComputeJob> jobs = new LinkedList<>();
>
>         IgniteCache<String, Object> cacheUp =
> Ignition.ignite().getOrCreateCache("cacheNameNew");
>         ses.addAttributeListener((key, val) -> {
>             if ("COMPLETE".compareTo(key.toString()) == 0) {
>                 nextTaskToExecute(graph, cacheUp);
>                 }
>         }, false);
>
>         String task = null;
>         if (cacheUp.get("CurrentVertex") != null)
>                 task = (String) cacheUp.get("CurrentVertex");
>         for (DefaultEdge outgoingEdge : graph.outgoingEdgesOf(task)) {
>                 String sourceVertex = graph.getEdgeSource(outgoingEdge);
>             String targetVertex = graph.getEdgeTarget(outgoingEdge);
>             graph.setTargetVertex(targetVertex);
>             executingJobsBuilt(graph, jobs);
>         } if (task != null && graph.outgoingEdgesOf(task).size() == 0) {
>                 if (cacheUp.get(task) != null &&
> (Boolean)cacheUp.get(task)) {
>                         String targetVertex = setNextVertexInCache(graph,
> cacheUp);
>                         graph.setTargetVertex(targetVertex);
>                         nextTaskToExecute(graph, cacheUp);
>                 } else {
>                         System.out.println("else parttttt");
>                 }
>         }
>         return jobs;
>     }
>
>
>
>         private void nextTaskToExecute(CustomDirectedAcyclicGraph<String,
> DefaultEdge> graph,
>                         IgniteCache<String, Object> cacheUp) {
>                 Ignite ignite = Ignition.ignite();
>                 if (cacheUp.get("NextVertex") != null) {
>                         String processingVertex = (String)
> cacheUp.get("NextVertex");
>                         if (processingVertex != null &&
> areParentVerticesProcessed(graph,
> processingVertex, cacheUp)) {
>                                 cacheUp.put("CurrentVertex",
> processingVertex);
>                                 // Execute task on the cluster and wait
> for its completion.
>
> ignite.compute().execute(SplitExampleJgraphWithComplexDAGIgniteCachesample.class,
> graph);
>                         }
>                 }
>         }
>
>     private void executingJobsBuilt(CustomDirectedAcyclicGraph<String,
> DefaultEdge> graph, Collection<ComputeJob> jobs) {
>         String targetVertex = graph.getTargetVertex();
>         IgniteCache<String, Object> cacheNew =
> Ignition.ignite().getOrCreateCache("cacheNameNew");
>        if (targetVertex != null && !cacheNew.containsKey(targetVertex)) {
>            jobs.add(new ComputeJobAdapter() {
>                    // Auto-injected job context.
>                     @JobContextResource
>                     private ComputeJobContext jobCtx;
>
>                @Nullable @Override public Object execute() {
>                    int duration1 = 8000 + random.nextInt(100);
>                    SimpleDateFormat dateFormatNew = new
> SimpleDateFormat("HH:mm:ss:SSS");
>                    String task = (String) targetVertex;
>                    try {
>                            Thread.sleep(duration1);
>
> System.out.println("****************************executed the job
> **********
> " + task +  "******" + dateFormatNew.format(new Date()));
>                         cacheNew.put(task, true);
>                        } catch (Exception e1) {
>                                                 e1.printStackTrace();
>                                         }
>                                         ses.setAttribute("NEXTVERTEX",
> setNextVertexInCache(graph, cacheNew));
>                                         ses.setAttribute("COMPLETE",
> duration1);
>                    return duration1;
>                }
>
>            });
>
>            }
>
>    }
>
>
>         private String setNextVertexInCache(CustomDirectedAcyclicGraph<
> String,
> DefaultEdge> graph, IgniteCache<String, Object> cache) {
>                 String task = null;
>                 Set<String> dagSourceVertex = graph.vertexSet();
>             Iterator itr = dagSourceVertex.iterator();
>               while (itr.hasNext()) {
>                 task = (String)itr.next();
>                         if(cache.get("CurrentVertex") != null &&
> !task.equalsIgnoreCase((String)cache.get("CurrentVertex")))
>                                         continue;
>                         else {
>                                 task = (String)itr.next();
>                                 cache.put("NextVertex", task);
>                                 break;
>                         }
>                 }
>                 return task;
>         }
>
>
>
>         private Boolean
> areParentVerticesProcessed(CustomDirectedAcyclicGraph<String, DefaultEdge>
> graph, String task, IgniteCache<String, Object> cache) {
>                 Boolean processed = false;
>                 for (DefaultEdge incomingEdge :
> graph.incomingEdgesOf(task)) {
> //graph.outgoingEdgesOf(dagSourceVertex)
>                 String sourceVertex = graph.getEdgeSource(incomingEdge);
>             String targetVertex = graph.getEdgeTarget(incomingEdge);
>             if (cache!= null && cache.get(sourceVertex) != null) {
>                 processed = true;
>             }
>                 }
>                 return processed;
>         }
>
>     /** {@inheritDoc} */
>     @Nullable @Override public Integer reduce(List<ComputeJobResult>
> results) {
>         int sum = 0;
>
>         for (ComputeJobResult res : results) {
>                 sum += res.<Integer>getData();
>         }
>
>
>         return sum;
>     }
> }
>
>
> ******************************************************************
> the call for the same is as given below:
> ignite.compute().executeAsync(SplitExampleJgraphWithComplexDAGIgniteCache.class,
> buildGraph());
>
> buildGraph is as given below:
>
> private CustomDirectedAcyclicGraph<String, DefaultEdge> buildGraph() {
>
>                 CustomDirectedAcyclicGraph<String, DefaultEdge> graph =
>                             new CustomDirectedAcyclicGraph<String,
> DefaultEdge>(DefaultEdge.class);
>
>                                 String root = "root";
>                         String a = "1";
>                         String b = "2";
>                         String c = "3";
>                         String d = "4";
>                         String e = "5";
>                         String f = "6";
>                         String g = "7";
>                         String h = "8";
>                         String i = "9";
>                         String j = "10";
>                         String k = "11";
>                         String l = "12";
>                         String m = "13";
>                         String n = "14";
>
>                         graph.addVertex(root);
>                         graph.addVertex(a);
>                         graph.addVertex(l);
>                         graph.addVertex(k);
>
>                         graph.addVertex(b);
>                         graph.addVertex(c);
>                         graph.addVertex(m);
>
>                         graph.addVertex(i);
>                         graph.addVertex(h);
>                         graph.addVertex(g);
>
>                         graph.addVertex(f);
>                         graph.addVertex(e);
>                         graph.addVertex(d);
>                         graph.addVertex(n);
>                         graph.addVertex(j);
>
>                         DefaultEdge edg = graph.addEdge(root, a);
>                         graph.addEdge(root, l);
>                         graph.addEdge(root, k);
>
>                         graph.addEdge(a, b);
>                         graph.addEdge(a, c);
>                         graph.addEdge(l, m);
>
>                         graph.addEdge(b, i);
>                         graph.addEdge(b, h);
>                         graph.addEdge(b, g);
>
>                         graph.addEdge(c, f);
>                         graph.addEdge(c, e);
>                         graph.addEdge(c, d);
>
>                         graph.addEdge(m, d);
>                         graph.addEdge(m, n);
>
>                         graph.addEdge(i, j);
>
>                         graph.setCurrentVertex(root);
>
>
>
>                         return graph;
>         }
>
>
> I hope that is sufficient for you to test the same at your end on a single
> node, as it is working fine on three nodes.
>
>
>
> --
> Sent from: http://apache-ignite-users.70518.x6.nabble.com/
>



-- 
Best regards,
Alexey

Mime
View raw message