ignite-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jcoston <jennifer.cos...@raytheon.com>
Subject Re: Ignite and Spark Streaming Integration Using Java
Date Mon, 21 Dec 2015 19:26:07 GMT
I discovered the source of the issue. For some reason it was necessary to add
another Ignition.start() to my countWords method. If anyone can explain why
I would greatly appreciate it. 

Here is the final method:


	public JavaPairRDD<String, Integer> countWords(JavaRDD<String> textFile)
throws Exception {
		
		System.out.println("Made it intoSparkWordCount. textFile = " +
textFile.toString());
		
        //Split the lines into words
		JavaRDD<String> words = textFile
				.flatMap(new FlatMapFunction<String, String>() {
					public Iterable<String> call(String s) {
						return Arrays.asList(s.split(" "));
					}
				});
		
		System.out.println("Split the lines into words");
		//Count the words
		JavaPairRDD<String, Integer> pairs = words
				.mapToPair(new PairFunction<String, String, Integer>() {
					public Tuple2<String, Integer> call(String s) {
						return new Tuple2<String, Integer>(s, 1);
					}
				});
		System.out.println("Counted the words Part 1");
		JavaPairRDD<String, Integer> counts = pairs
				.reduceByKey(new Function2<Integer, Integer, Integer>() {
					public Integer call(Integer a, Integer b) {
						return a + b;
					}
				});
		System.out.println("Counted the words Part 2");
		
		// Mark this cluster member as client.
		*Ignition.start();
*        Ignition.setClientMode(true);

        try (Ignite ignite =
Ignition.start("src/main/resources/config/example-ignite.xml")) {
            if (!ExamplesUtils.hasServerNodes(ignite))
                return counts;

            // The cache is configured with sliding window holding 5 seconds
of the streaming data.
            IgniteCache<AffinityUuid, String> stmCache =
ignite.getOrCreateCache(CacheConfig.wordCache());
            System.out.println("Configured Cache " + stmCache.getName());

            try (IgniteDataStreamer<AffinityUuid, String> stmr =
ignite.dataStreamer(stmCache.getName())) {
                // Stream words from "alice-in-wonderland" book.
                while (true) {
                    InputStream in =
SparkIgniteWordCount.class.getResourceAsStream("alice-in-wonderland.txt");
                     
                    try (LineNumberReader rdr = new LineNumberReader(new
InputStreamReader(in))) {
                        for (String line = rdr.readLine(); line != null;
line = rdr.readLine()) {
                            for (String word : line.split(" "))
                                if (!word.isEmpty())
                                    // Stream words into Ignite.
                                    // By using AffinityUuid we ensure that
identical
                                    // words are processed on the same
cluster node.
                                    stmr.addData(new AffinityUuid(word),
word);
                        }
                    }
                }
            } 
        }
        catch(Exception e){
        	e.printStackTrace();
        	throw e;
        }finally{
        	return counts;
        }
	}
}




--
View this message in context: http://apache-ignite-users.70518.x6.nabble.com/Re-Ignite-and-Spark-Streaming-Integration-Using-Java-tp2255p2268.html
Sent from the Apache Ignite Users mailing list archive at Nabble.com.

Mime
View raw message