ignite-user mailing list archives

From Jennifer Coston <Jennifer.Cos...@raytheon.com>
Subject Ignite and Spark Streaming Integration Using Java
Date Wed, 16 Dec 2015 14:27:32 GMT

I am trying to understand the integration of Apache Spark Streaming and
Apache Ignite but I’m running into some errors. I’m not sure where I went
wrong, so I am going to walk through all of my steps. I have worked through
two word-counting tutorials provided by Apache. In the first tutorial I
created a Java project that uses Ignite to count the words in a document
over a sliding window of 5 seconds and then queries the cache to determine
the most common words. I believe it created a structure that looks like this:

[Diagram not preserved in the archive]

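
The sliding-window counting described for the first tutorial can be
illustrated in plain Java. This is only a sketch of the idea, not the
tutorial's code, and it uses no Ignite API: the class SlidingWordCounter and
its methods are hypothetical names, the window length is passed in
milliseconds, and timestamps are assumed to arrive in non-decreasing order.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical helper (not an Ignite class): counts words seen within a time window. */
final class SlidingWordCounter {
    /** One observed word together with the time it was seen. */
    private static final class Seen {
        final long timestamp;
        final String word;

        Seen(long timestamp, String word) {
            this.timestamp = timestamp;
            this.word = word;
        }
    }

    private final long windowMillis;
    private final Deque<Seen> seen = new ArrayDeque<>();
    private final Map<String, Integer> counts = new HashMap<>();

    SlidingWordCounter(long windowMillis) {
        this.windowMillis = windowMillis;
    }

    /** Records one word observed at nowMillis (assumed non-decreasing across calls). */
    public void add(String word, long nowMillis) {
        evict(nowMillis);
        seen.addLast(new Seen(nowMillis, word));
        Integer old = counts.get(word);
        counts.put(word, old == null ? 1 : old + 1);
    }

    /** Returns how often the word occurred in the window (nowMillis - windowMillis, nowMillis]. */
    public int count(String word, long nowMillis) {
        evict(nowMillis);
        Integer c = counts.get(word);
        return c == null ? 0 : c;
    }

    /** Returns the most frequent word in the current window, or null if the window is empty. */
    public String mostCommon(long nowMillis) {
        evict(nowMillis);
        String best = null;
        int bestCount = 0;
        for (Map.Entry<String, Integer> e : counts.entrySet()) {
            if (e.getValue() > bestCount) {
                best = e.getKey();
                bestCount = e.getValue();
            }
        }
        return best;
    }

    /** Drops every observation that has fallen out of the window. */
    private void evict(long nowMillis) {
        while (!seen.isEmpty() && seen.peekFirst().timestamp <= nowMillis - windowMillis) {
            Seen old = seen.removeFirst();
            int c = counts.get(old.word) - 1;
            if (c == 0) {
                counts.remove(old.word);
            } else {
                counts.put(old.word, c);
            }
        }
    }
}
```

With a window of 5000 ms, a word added at time 0 is still counted at time
4999 and is dropped from the counts at time 5000. In the tutorial this
eviction is presumably handled by the cache configuration rather than by
application code.
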
In the second tutorial, I created a Java project that counts the words in a
document on a rolling window of 5 seconds and saves the output to a text
document. I believe this tutorial created a structure that looks like this:

[Diagram not preserved in the archive]

Now comes the hard part: integrating the two. Based on the documentation
online, I believe the resulting combination will create a structure that
looks like this:

[Diagram not preserved in the archive]

I started by updating the POM file, adding the dependencies for both Spark
Streaming and Ignite. However, the code doesn’t compile when the POM
contains both of these dependencies:

    <dependency>
        <groupId>org.apache.ignite</groupId>
        <artifactId>ignite-spark</artifactId>
        <version>1.4.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming_2.10</artifactId>
        <version>1.5.2</version>
    </dependency>

Here is the full POM file:

    <project xmlns="http://maven.apache.org/POM/4.0.0"
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
        <groupId>wordCount_Spark</groupId>
        <artifactId>wordCount_Spark</artifactId>
        <version>0.0.1-SNAPSHOT</version>
        <name>wordCount_Spark</name>
        <description>Count words using spark</description>

        <dependencies>
            <!-- Apache Ignite dependencies -->
            <dependency>
                <groupId>org.apache.ignite</groupId>
                <artifactId>ignite-core</artifactId>
                <version>1.4.0</version>
            </dependency>
            <dependency>
                <groupId>org.apache.ignite</groupId>
                <artifactId>ignite-spring</artifactId>
                <version>1.4.0</version>
            </dependency>
            <dependency>
                <groupId>org.apache.ignite</groupId>
                <artifactId>ignite-indexing</artifactId>
                <version>1.4.0</version>
            </dependency>
            <dependency>
                <groupId>org.apache.ignite</groupId>
                <artifactId>ignite-log4j</artifactId>
                <version>1.4.0</version>
            </dependency>
            <dependency>
                <groupId>org.apache.ignite</groupId>
                <artifactId>ignite-spark</artifactId>
                <version>1.4.0</version>
            </dependency>
            <!-- Spark Streaming dependencies -->
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-streaming_2.10</artifactId>
                <version>1.5.2</version>
            </dependency>
        </dependencies>

        <build>
            <plugins>
                <plugin>
                    <artifactId>maven-compiler-plugin</artifactId>
                    <version>2.5.1</version>
                    <configuration>
                        <source>1.7</source>
                        <target>1.7</target>
                        <useIncrementalCompilation>false</useIncrementalCompilation>
                    </configuration>
                </plugin>
            </plugins>
        </build>
    </project>

Since I will most likely want to integrate Ignite into an existing Spark
Streaming application eventually, I decided to keep the spark-streaming
dependency and perform some tests. My next step was to combine the two
projects into one project and see if I could get my JUnit tests to run.
They did, so it was time to begin the actual integration.

Since I want to add Ignite to the Spark Streaming project, I believe I need
to pass the Spark Streaming application the file to store in the cache
through the Spark worker. To do this, I added the JavaIgniteContext to the
same file where I placed my JavaSparkContext. Here is the file before the
change:
                                                                                         
              
    package wordCount_test;

    import static org.junit.Assert.*;

    import java.io.File;

    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.junit.After;
    import org.junit.Before;
    import org.junit.Test;

    import wordCount.SparkWordCount;

    public class TestSparkWordCount {

        JavaSparkContext jsc;
        File txtFile;

        @Before
        public void setUp() throws Exception {
            jsc = new JavaSparkContext("local[2]", "testSparkWordCount");
            txtFile = new File("AIW_WordCount");
            if (txtFile.exists()) {
                txtFile.delete();
            }
        }

        @After
        public void tearDown() throws Exception {
            jsc.stop();
            jsc = null;
        }

        //@SuppressWarnings("static-access")
        @Test
        public void testInit() {
            assertNotNull(jsc.sc());
        }

        @Test
        public void test() {
            SparkWordCount streamWords = new SparkWordCount();

            try {
                JavaRDD<String> textFile = jsc.textFile(
                        "C:/Users/1116962/intersect_workspace/wordCount_SI/src/main/java/wordCount/alice-in-wonderland.txt");
                JavaPairRDD<String, Integer> wordCounts = streamWords
                        .countWords(textFile);

                wordCounts.saveAsTextFile("AIW_WordCount");

                Thread.sleep(6000);

            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            assertTrue(true);
        }
    }
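
A side note on the setUp method above, separate from the dependency
question: saveAsTextFile writes a directory of part files rather than a
single file, and File.delete() returns false for a non-empty directory, so
the cleanup may silently leave the old AIW_WordCount output in place on a
second run. A minimal sketch of a recursive cleanup follows. The class
OutputCleaner and the method deleteRecursively are hypothetical names, not
part of Spark or Ignite, and the sketch assumes the output directory contains
no symbolic links to directories.

```java
import java.io.File;

/** Hypothetical test helper: removes a file or a whole directory tree. */
final class OutputCleaner {
    private OutputCleaner() {
    }

    /**
     * Deletes the given path, descending into directories first.
     * Returns true if the path is gone afterwards (including when it never existed).
     * Assumes no symbolic links to directories, since those would be followed.
     */
    static boolean deleteRecursively(File path) {
        if (!path.exists()) {
            return true;
        }
        // listFiles() returns null for regular files and on I/O errors.
        File[] children = path.listFiles();
        if (children != null) {
            for (File child : children) {
                if (!deleteRecursively(child)) {
                    return false;
                }
            }
        }
        return path.delete();
    }
}
```

In setUp this would replace the exists/delete pair with a single call such
as OutputCleaner.deleteRecursively(txtFile), ideally with the boolean result
checked so that a failed cleanup is visible in the test output.
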
              
                                                                                         
              




The problem appears when I try to add the JavaIgniteContext, whose required
classes are all provided by the ignite-spark dependency. Since it appears
that I can’t have both dependencies, and I know my existing application will
need the spark-streaming dependency, I’m not sure how to proceed. Do you
have any suggestions?