ignite-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chandrika <chandrika....@gmail.com>
Subject Re: Job Listeners
Date Tue, 10 Oct 2017 12:33:31 GMT
Hello Alexey,


the sample code is as given below:

@ComputeTaskSessionFullSupport
public class SplitExampleJgraphWithComplexDAGIgniteCachesample extends
ComputeTaskSplitAdapter<CustomDirectedAcyclicGraph&lt;String, DefaultEdge> ,
Integer> {
	
   
	// Auto-injected task session.
    @TaskSessionResource 
    private ComputeTaskSession ses;
    
    
    private static final Random random = new Random();
    static int noOftasksExecutedSuccess = 0;
    
    SimpleDateFormat dateFormat = new SimpleDateFormat("HH:mm:ss:SSS");

    

    
    @Override protected Collection<? extends ComputeJob> split(int
clusterSize, CustomDirectedAcyclicGraph<String, DefaultEdge> graph) {
        Collection<ComputeJob> jobs = new LinkedList<>();
        
        IgniteCache<String, Object> cacheUp = 
Ignition.ignite().getOrCreateCache("cacheNameNew");
        ses.addAttributeListener((key, val) -> {
            if ("COMPLETE".compareTo(key.toString()) == 0) {
            	nextTaskToExecute(graph, cacheUp);
                }
        }, false);
        
        String task = null;
        if (cacheUp.get("CurrentVertex") != null)
        	task = (String) cacheUp.get("CurrentVertex");
        for (DefaultEdge outgoingEdge : graph.outgoingEdgesOf(task)) {
        	String sourceVertex = graph.getEdgeSource(outgoingEdge);
            String targetVertex = graph.getEdgeTarget(outgoingEdge);
            graph.setTargetVertex(targetVertex);
            executingJobsBuilt(graph, jobs);
        } if (task != null && graph.outgoingEdgesOf(task).size() == 0) {
        	if (cacheUp.get(task) != null && (Boolean)cacheUp.get(task)) {
        		String targetVertex = setNextVertexInCache(graph, cacheUp);
        		graph.setTargetVertex(targetVertex);
        		nextTaskToExecute(graph, cacheUp);
        	} else {
        		System.out.println("else parttttt");
        	}
        }
        return jobs;
    }

    
    
	private void nextTaskToExecute(CustomDirectedAcyclicGraph<String,
DefaultEdge> graph,
			IgniteCache<String, Object> cacheUp) {
		Ignite ignite = Ignition.ignite();
		if (cacheUp.get("NextVertex") != null) {
			String processingVertex = (String) cacheUp.get("NextVertex");
			if (processingVertex != null && areParentVerticesProcessed(graph,
processingVertex, cacheUp)) {
				cacheUp.put("CurrentVertex", processingVertex);
				// Execute task on the cluster and wait for its completion.
			
ignite.compute().execute(SplitExampleJgraphWithComplexDAGIgniteCachesample.class,
graph);
			}
		}
	}
    
    private void executingJobsBuilt(CustomDirectedAcyclicGraph<String,
DefaultEdge> graph, Collection<ComputeJob> jobs) {
   	String targetVertex = graph.getTargetVertex();
   	IgniteCache<String, Object> cacheNew = 
Ignition.ignite().getOrCreateCache("cacheNameNew");
       if (targetVertex != null && !cacheNew.containsKey(targetVertex)) {
           jobs.add(new ComputeJobAdapter() {
        	   // Auto-injected job context.
        	    @JobContextResource
        	    private ComputeJobContext jobCtx;
        	    
               @Nullable @Override public Object execute() {
            	   int duration1 = 8000 + random.nextInt(100);
            	   SimpleDateFormat dateFormatNew = new
SimpleDateFormat("HH:mm:ss:SSS");
                   String task = (String) targetVertex;
                   try {
                	   Thread.sleep(duration1);
                       
System.out.println("****************************executed the job **********  
" + task +  "******" + dateFormatNew.format(new Date()));
                       	cacheNew.put(task, true);
	               } catch (Exception e1) {
						e1.printStackTrace();
					} 
					ses.setAttribute("NEXTVERTEX", setNextVertexInCache(graph, cacheNew));
					ses.setAttribute("COMPLETE", duration1);
            	   return duration1;
               }
               
           });
           
           }
       
   }
    

	private String setNextVertexInCache(CustomDirectedAcyclicGraph<String,
DefaultEdge> graph, IgniteCache<String, Object> cache) {
		String task = null; 
		Set<String> dagSourceVertex = graph.vertexSet();
	    Iterator itr = dagSourceVertex.iterator();
	      while (itr.hasNext()) {
	      	task = (String)itr.next();
		      	if(cache.get("CurrentVertex") != null &&
!task.equalsIgnoreCase((String)cache.get("CurrentVertex")))
		      			continue;
		      	else {
		      		task = (String)itr.next();
		      		cache.put("NextVertex", task);
		      		break;
		      	}
	        }
		return task;
	}
	
	
	
	private Boolean
areParentVerticesProcessed(CustomDirectedAcyclicGraph<String, DefaultEdge>
graph, String task, IgniteCache<String, Object> cache) {
		Boolean processed = false;
		for (DefaultEdge incomingEdge : graph.incomingEdgesOf(task)) {
//graph.outgoingEdgesOf(dagSourceVertex)
        	String sourceVertex = graph.getEdgeSource(incomingEdge);
            String targetVertex = graph.getEdgeTarget(incomingEdge);
            if (cache!= null && cache.get(sourceVertex) != null) {
            	processed = true;
            }
		}
		return processed;
	}
	
    /** {@inheritDoc} */
    @Nullable @Override public Integer reduce(List<ComputeJobResult>
results) {
        int sum = 0;

        for (ComputeJobResult res : results) {
        	sum += res.<Integer>getData();
        }
            

        return sum;
    }
}


******************************************************************
the call for the same is as given below:
ignite.compute().executeAsync(SplitExampleJgraphWithComplexDAGIgniteCache.class,
buildGraph());

buildGraph is as given below:

private CustomDirectedAcyclicGraph<String, DefaultEdge> buildGraph() {
		
		CustomDirectedAcyclicGraph<String, DefaultEdge> graph =
		            new CustomDirectedAcyclicGraph<String,
DefaultEdge>(DefaultEdge.class);

				String root = "root";
		        String a = "1";
		        String b = "2";
		        String c = "3";
		        String d = "4";
		        String e = "5";
		        String f = "6";
		        String g = "7";
		        String h = "8";
		        String i = "9";
		        String j = "10";
		        String k = "11";
		        String l = "12";
		        String m = "13";
		        String n = "14";

		        graph.addVertex(root);
		        graph.addVertex(a);
		        graph.addVertex(l);
		        graph.addVertex(k);
	        
		        graph.addVertex(b);
		        graph.addVertex(c);
		        graph.addVertex(m);
		        
		        graph.addVertex(i);
		        graph.addVertex(h);
		        graph.addVertex(g);
		        		        
		        graph.addVertex(f);
		        graph.addVertex(e);
		        graph.addVertex(d);
		        graph.addVertex(n);
		        graph.addVertex(j);
		        
		        DefaultEdge edg = graph.addEdge(root, a);
		        graph.addEdge(root, l);
		        graph.addEdge(root, k);
		        
		        graph.addEdge(a, b);
		        graph.addEdge(a, c);
		        graph.addEdge(l, m);
		       
		        graph.addEdge(b, i);
		        graph.addEdge(b, h);
		        graph.addEdge(b, g);
		        
		        graph.addEdge(c, f);
		        graph.addEdge(c, e);
		        graph.addEdge(c, d);
		        
		        graph.addEdge(m, d);
		        graph.addEdge(m, n);
		        
		        graph.addEdge(i, j);
		        
		        graph.setCurrentVertex(root);
		        
		        
		        
		        return graph;
	}


hope that is sufficient for u to test the same at ur end on a single node,
as it is working fine on three nodes.



--
Sent from: http://apache-ignite-users.70518.x6.nabble.com/

Mime
View raw message