lucene-solr-commits mailing list archives

From Apache Wiki <wikidi...@apache.org>
Subject [Solr Wiki] Update of "DataImportHandler" by MarkoBonaci
Date Mon, 20 Sep 2010 11:12:48 GMT
Dear Wiki user,

You have subscribed to a wiki page or wiki category on "Solr Wiki" for change notification.

The "DataImportHandler" page has been changed by MarkoBonaci.
http://wiki.apache.org/solr/DataImportHandler?action=diff&rev1=260&rev2=261

--------------------------------------------------

  = Scheduling =
  {i}
   * Data``Import``Handler``Scheduler
-  * Version 1.1
+  * Version 1.2
-  * Last revision: 17.09.2010.
+  * Last revision: 20.09.2010.
   * Author: Marko Bonaci
   * Enables scheduling DIH delta and/or full imports
-  * Proofs the concept, but needs more work
+  * Proves the concept, but needs polishing
   * Successfully tested on ''Apache Tomcat v6'' (should work on any servlet container)
   * Hasn't been committed to SVN (published only here)
  
  <<BR>>
  <!> Global TODO:
+ 
+  * add ''cancel'' functionality (to be able to completely disable ''DIHScheduler'' background
thread, without stopping the app/server). Currently, sync can be disabled by setting the ''syncEnabled''
param to anything other than "1" in ''dataimport.properties'', but the background thread still
remains active and reloads the properties file on every run (so that sync can be hot-redeployed)
-  * make it ''core-aware'' (to work regardless of whether a single or a multi-core Solr is
deployed)
-  * add ''cancel'' functionality (to be able to completely disable ''DIHScheduler'' background
thread, without stopping the app/server). Currently, sync can be disabled by emptying ''syncCores''
param in ''dataimport.properties'' file, but the background thread remains active and reloads
the properties file on every run (so that sync can be hot-deployed)
-  * parametrize the schedule interval in minutes (currently set to 10 minutes in ''Application``Listener's
contextInitialized'' method)
   * try to use Solr's classes wherever possible
   * add javadoc style comments
  
@@ -1060, +1059 @@

   {1} working DIH configuration in place <<BR>>
   {2} ''dataimport.properties'' file in folder ''solr.home/conf/'' with mandatory params
inside (see below for an example of ''dataimport.properties'') <<BR>>
  
+ {OK} Revisions:
+   v1.2:
+  * became ''core-aware'' (now works regardless of whether a single- or multi-core Solr is deployed)
+  * parametrized the schedule interval (in minutes)
+   v1.1:
+  * now using ''Solr``Resource``Loader'' to get ''solr.home'' (as opposed to ''System properties''
in v1.0)
+  * forces reloading of the properties file if the response code is not 200
+  * logging done using ''slf4j'' (used ''System.out'' in v1.0)
+ 
  <<BR>>
  == SolrDataImportProperties ==
   * uses [[http://download.oracle.com/javase/6/docs/api/java/util/Properties.html|java.util.Properties]]
to load settings from ''dataimport.properties''
- 
- {OK} Revisions:
-   v1.1:
-  * now using ''Solr``Resource``Loader'' to get ''solr.home'' (as opposed to ''System properties''
in v1.0)
-  * logging done using ''slf4j'' (used ''System.out'' in v1.0)
  
  {{{
  package hr.mbo.solr.dataimport;
@@ -1077, +1080 @@

  import java.io.IOException;
  import java.util.Properties;
  
- import org.apache.commons.io.FilenameUtils;
  import org.apache.solr.core.SolrResourceLoader;
  import org.slf4j.Logger;
  import org.slf4j.LoggerFactory;
  
  public class SolrDataImportProperties {
  	private Properties properties;
+ 	
+ 	public static final String SYNC_ENABLED		= "syncEnabled";
- 	public static final String SYNC_CORES	= "syncCores";
+ 	public static final String SYNC_CORES		= "syncCores";
- 	public static final String SERVER 	= "server";
+ 	public static final String SERVER 		= "server";
- 	public static final String PORT 	= "port";
+ 	public static final String PORT 		= "port";
- 	public static final String WEBAPP 	= "webapp";
+ 	public static final String WEBAPP 		= "webapp";
- 	public static final String PARAMS 	= "params";
+ 	public static final String PARAMS 		= "params";
+ 	public static final String INTERVAL		= "interval";
  	
  	private static final Logger logger = LoggerFactory.getLogger(SolrDataImportProperties.class);
  	
  	public SolrDataImportProperties(){
- 		loadProperties(true);
+ //		loadProperties(true);
  	}
  	
  	public void loadProperties(boolean force){
  		try{
  			SolrResourceLoader loader = new SolrResourceLoader(null);
- 			logger.info("SolrResourceLoader instance dir: " + loader.getInstanceDir());
+ 			logger.info("Instance dir = " + loader.getInstanceDir());
  			
  			String configDir = loader.getConfigDir();
+ 			configDir = SolrResourceLoader.normalizeDir(configDir);
- 			configDir = FilenameUtils.normalizeNoEndSeparator(configDir);
- 			
  			if(force || properties == null){
  				properties = new Properties();
  							
  				String dataImportPropertiesPath = configDir + "dataimport.properties"; // normalizeDir() above already ends the path with a separator
  				
  				FileInputStream fis = new FileInputStream(dataImportPropertiesPath);
- 				properties.load(fis);	
+ 				properties.load(fis);
  			}
  		}catch(FileNotFoundException fnfe){
  			logger.error("Error locating DataImportScheduler dataimport.properties file", fnfe);
@@ -1123, +1127 @@

  	
  	public String getProperty(String key){
  		return properties.getProperty(key);
- 	}
+ 	}	
- 	
  }
  }}}
  
@@ -1134, +1137 @@

   * uses ''HTTPPostScheduler'', [[http://download.oracle.com/javase/6/docs/api/java/util/Timer.html|java.util.Timer]]
and context attribute map to facilitate periodic method invocation (scheduling)
   * Timer is essentially a facility for threads to schedule tasks for future execution in
a background thread.
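
  A minimal, generic sketch of the ''java.util.Timer''/''TimerTask'' API used below (illustration only; the class name is made up and this is not part of the scheduler code):
  {{{
  import java.util.Date;
  import java.util.Timer;
  import java.util.TimerTask;
  
  // standalone illustration of Timer semantics, not part of DIHScheduler
  public class TimerSketch {
  	public static void main(String[] args) {
  		Timer timer = new Timer();
  
  		// the task that the Timer's background thread will execute
  		TimerTask task = new TimerTask() {
  			public void run() {
  				System.out.println("task ran at " + new Date());
  			}
  		};
  
  		// first run one minute from now, then repeat every 10 minutes
  		Date firstRun = new Date(System.currentTimeMillis() + 1000 * 60);
  		timer.scheduleAtFixedRate(task, firstRun, 1000 * 60 * 10);
  	}
  }
  }}}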
  
- {OK} Revisions:
-   v1.1:
-  * logging done using ''slf4j'' (used ''System.out'' in v1.0)
- 
  {{{
  package hr.mbo.solr.scheduler;
  
@@ -1160, +1159 @@

  	public void contextDestroyed(ServletContextEvent servletContextEvent) {
  		ServletContext servletContext = servletContextEvent.getServletContext();
  
- 		// get our timer from the Context
+ 		// get our timer from the context
  		Timer timer = (Timer)servletContext.getAttribute("timer");
  
- 		// cancel all pending tasks in the timers queue
+ 		// cancel all active tasks in the timer's queue
  		if (timer != null)
  			timer.cancel();
  
- 		// remove the timer from the servlet context
+ 		// remove the timer from the context
  		servletContext.removeAttribute("timer");
  
  	}
@@ -1178, +1177 @@

  		try{
  			// create the timer and timer task objects
  			Timer timer = new Timer();
- 			HTTPPostScheduler task = new HTTPPostScheduler(servletContext.getServletContextName());
+ 			HTTPPostScheduler task = new HTTPPostScheduler(servletContext.getServletContextName(),
timer);
- 
+ 			
+ 			// get our interval from HTTPPostScheduler
+ 			int interval = task.getIntervalInt();
+ 			
- 			// get a calendar to initialize the start time
+ 			// get a calendar to set the start time (first run)
  			Calendar calendar = Calendar.getInstance();
+ 			
+ 			// set the first run to now + interval (to avoid firing while the app/server is starting)
+ 			calendar.add(Calendar.MINUTE, interval);
  			Date startTime = calendar.getTime();
- 
+ 			
- 			// schedule the task to run hourly
+ 			// schedule the task
- 			timer.scheduleAtFixedRate(task, startTime, 1000 * 60 * 10);
+ 			timer.scheduleAtFixedRate(task, startTime, 1000 * 60 * interval);
  
- 			// save our timer for later use
+ 			// save the timer in context
  			servletContext.setAttribute("timer", timer);
  			
  		} catch (Exception e) {
+ 			if(e.getMessage() != null && e.getMessage().endsWith("disabled")){
+ 				logger.info("Schedule disabled");
+ 			}else{
- 			logger.error("Problem initializing the scheduled task: ", e);
+ 				logger.error("Problem initializing the scheduled task: ", e);	
+ 			}			
  		}
- 
  	}
  
  }
@@ -1207, +1215 @@

   * uses DIH params to assemble complete URL
   * invokes URL using HTTP POST request
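
  For illustration, with the example ''dataimport.properties'' shown at the bottom of this page (server=localhost, port=8080, webapp=solrTest_WEB), the assembled URLs look like this:
  {{{
  # single-core deployment
  http://localhost:8080/solrTest_WEB/select?clean=false&commit=true&command=delta-import&qt=/dataimport&handler=/dataimport
  
  # multi-core deployment, core "coreHr"
  http://localhost:8080/solrTest_WEB/coreHr/select?clean=false&commit=true&command=delta-import&qt=/dataimport&handler=/dataimport
  }}}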
  
- {OK} Revisions:
-   v1.1:
-  * forces reloading of the properties file if the response code is not 200
-  * logging done using ''slf4j'' (used ''System.out'' in v1.0)
- 
- <!> TODO:
-  * explode ''params'' to specific parameters
- 
  <<BR>>
  {{{
  package hr.mbo.solr.scheduler;
@@ -1228, +1228 @@

  import java.text.DateFormat;
  import java.text.SimpleDateFormat;
  import java.util.Date;
- import java.util.Locale;
+ import java.util.Timer;
  import java.util.TimerTask;
  
  import org.slf4j.Logger;
@@ -1236, +1236 @@

  
  
  public class HTTPPostScheduler extends TimerTask {
+ 	private String syncEnabled;
  	private String[] syncCores;
  	private String server;
  	private String port;
  	private String webapp;
  	private String params;
+ 	private String interval;
+ 	private String cores;
  	private SolrDataImportProperties p;
+ 	private boolean singleCore;
  	
  	private static final Logger logger = LoggerFactory.getLogger(HTTPPostScheduler.class);
  	
- 	public HTTPPostScheduler(String webAppName){
+ 	public HTTPPostScheduler(String webAppName, Timer t) throws Exception{
  		//load properties from global dataimport.properties
  		p = new SolrDataImportProperties();
+ 		reloadParams();
+ 		fixParams(webAppName);
+ 		
+ 		if(!syncEnabled.equals("1")) throw new Exception("Schedule disabled");
+ 		
+ 		if(syncCores == null || (syncCores.length == 1 && syncCores[0].isEmpty())){
+ 			singleCore = true;
+ 			logger.info("<index update process> Single core identified in dataimport.properties");
+ 		}else{
+ 			singleCore = false;
+ 			logger.info("<index update process> Multiple cores identified in dataimport.properties:
" + cores);
+ 		}
+ 	}
+ 	
+ 	private void reloadParams(){
+ 		p.loadProperties(true);
+ 		syncEnabled 	= p.getProperty(SolrDataImportProperties.SYNC_ENABLED);
- 		String cores = p.getProperty(SolrDataImportProperties.SYNC_CORES);
+ 		cores 		= p.getProperty(SolrDataImportProperties.SYNC_CORES);		
- 		syncCores	= cores.split(",");
  		server 		= p.getProperty(SolrDataImportProperties.SERVER);
  		port 		= p.getProperty(SolrDataImportProperties.PORT);
  		webapp 		= p.getProperty(SolrDataImportProperties.WEBAPP);
  		params 		= p.getProperty(SolrDataImportProperties.PARAMS);
- 		
- 		fixParams(webAppName);
+ 		interval	= p.getProperty(SolrDataImportProperties.INTERVAL);
+ 		syncCores = cores != null ? cores.split(",") : null;
  	}
  	
  	private void fixParams(String webAppName){
- 		if(server.isEmpty()) server = "localhost";
+ 		if(server == null || server.isEmpty()) 		server = "localhost";
- 		if(port.isEmpty()) port = "80";
+ 		if(port == null || port.isEmpty()) 		port = "80";
- 		if(webapp.isEmpty()) webapp = webAppName;
+ 		if(webapp == null || webapp.isEmpty()) 	webapp = webAppName;
+ 		if(interval == null || interval.isEmpty() || getIntervalInt() <= 0) interval = "30";
  	}
  	
  	public void run() {
  		try{
+ 			// check mandatory params
+ 			if(server.isEmpty() || webapp.isEmpty() || params.isEmpty()){
+ 				logger.warn("<index update process> Insufficient info provided for data import");
+ 				logger.info("<index update process> Reloading global dataimport.properties");
+ 				reloadParams();
  			
+ 			// single-core
+ 			}else if(singleCore){
+ 				prepUrlSendHttpPost();
+ 
+ 			// multi-core
- 			if(syncCores.length < 1 || (syncCores.length == 1 && syncCores[0].isEmpty())){
+ 			}else if(syncCores.length == 0 || (syncCores.length == 1 && syncCores[0].isEmpty())){
  				logger.warn("<index update process> No cores scheduled for data import");
  				logger.info("<index update process> Reloading global dataimport.properties");
+ 				reloadParams();
- 				p.loadProperties(true);	//listen for change in properties file
- 				
- 			}else if(server.isEmpty() || webapp.isEmpty() || params.isEmpty()){
- 				logger.warn("<index update process> Insuficient info provided for data import");
- 				logger.info("<index update process> Reloading global dataimport.properties");
- 				p.loadProperties(true);	//listen for change in properties file
  				
  			}else{
  				for(String core : syncCores){
- 					sendHttpPost(core);
+ 					prepUrlSendHttpPost(core);
  				}
  			}
  		}catch(Exception e){
@@ -1287, +1313 @@

  		}
  	}
  	
+ 	
+ 	private void prepUrlSendHttpPost(){
+ 		String coreUrl = "http://" + server + ":" + port + "/" + webapp + params;
+ 		sendHttpPost(coreUrl, null);
+ 	}
+ 	
- 	private void sendHttpPost(String coreName){
+ 	private void prepUrlSendHttpPost(String coreName){
  		String coreUrl = "http://" + server + ":" + port + "/" + webapp + "/" + coreName + params;
  		sendHttpPost(coreUrl, coreName);	
  	}
  	
+ 	
- 	private void sendHttpPost(String completeUrl, String core){
+ 	private void sendHttpPost(String completeUrl, String coreName){
- 		DateFormat df = new SimpleDateFormat("dd.MM.yyyy HH:mm:ss SSS", new Locale("hr", "HR"));
+ 		DateFormat df = new SimpleDateFormat("dd.MM.yyyy HH:mm:ss SSS");
  		Date startTime = new Date();
+ 
+ 		// prepare the core var
+ 		String core = coreName == null ? "" : "[" + coreName + "] ";
  		
- 		logger.info("[" + core + "] <index update process> Process started at ..............
" + df.format(startTime));
+ 		logger.info(core + "<index update process> Process started at .............. " +
df.format(startTime));
  		
  		try{
- 			// Send HTTP POST
+ 
- 		    URL url = new URL(completeUrl);
+ 			URL url = new URL(completeUrl);
- 		    HttpURLConnection conn = (HttpURLConnection)url.openConnection();	
+ 			HttpURLConnection conn = (HttpURLConnection)url.openConnection();	
  		    
- 		    conn.setRequestMethod("POST");
+ 			conn.setRequestMethod("POST");
- 		    conn.setRequestProperty("type", "submit");
+ 			conn.setRequestProperty("type", "submit");
- 		    conn.setDoOutput(true);
+ 			conn.setDoOutput(true);
  		    
+ 			// Send HTTP POST		    
- 		    conn.connect();
+ 			conn.connect();
- 		    logger.info("[" + core + "] <index update process> Request method\t\t\t" + conn.getRequestMethod());
- 		    logger.info("[" + core + "] <index update process> Succesfully connected to
server\t" + server);		    
- 		    logger.info("[" + core + "] <index update process> Using port\t\t\t" + port);
- 		    logger.info("[" + core + "] <index update process> Application name\t\t\t" +
webapp);
- 		    logger.info("[" + core + "] <index update process> URL params\t\t\t" + params);
- 		    logger.info("[" + core + "] <index update process> Full URL\t\t\t\t" + conn.getURL());
- 		    logger.info("[" + core + "] <index update process> Response message\t\t\t" +
conn.getResponseMessage());
- 		    logger.info("[" + core + "] <index update process> Response code\t\t\t" + conn.getResponseCode());
  		    
- 		    //force reloading the properties file if an error occurs 
- 		    if(conn.getResponseCode() != 200) p.loadProperties(true);
+ 			logger.info(core + "<index update process> Request method\t\t\t" + conn.getRequestMethod());
+ 			logger.info(core + "<index update process> Successfully connected to server\t" +
server);		    
+ 			logger.info(core + "<index update process> Using port\t\t\t" + port);
+ 			logger.info(core + "<index update process> Application name\t\t\t" + webapp);
+ 			logger.info(core + "<index update process> URL params\t\t\t" + params);
+ 			logger.info(core + "<index update process> Full URL\t\t\t\t" + conn.getURL());
+ 			logger.info(core + "<index update process> Response message\t\t\t" + conn.getResponseMessage());
+ 			logger.info(core + "<index update process> Response code\t\t\t" + conn.getResponseCode());
  		    
+ 			//listen for change in properties file if an error occurs 
+ 			if(conn.getResponseCode() != 200){
+ 				reloadParams();
+ 			}
+ 		    
- 		    conn.disconnect();
+ 			conn.disconnect();
- 		    logger.info("[" + core + "] <index update process> Disconnected from server\t\t"
+ server);
+ 			logger.info(core + "<index update process> Disconnected from server\t\t" + server);
- 		    Date endTime = new Date();
+ 			Date endTime = new Date();
- 		    logger.info("[" + core + "] <index update process> Process ended at ................
" + df.format(endTime));
+ 			logger.info(core + "<index update process> Process ended at ................ "
+ df.format(endTime));
  		}catch(MalformedURLException mue){
  			logger.error("Failed to assemble URL for HTTP POST", mue);
  		}catch(IOException ioe){
@@ -1333, +1373 @@

  		}
  	}
  
+ 	public int getIntervalInt() {
+ 		try{
+ 			return Integer.parseInt(interval);	
+ 		}catch(NumberFormatException e){
+ 			logger.warn("Unable to convert 'interval' to number. Using default value (30) instead",
e);
+ 			return 30; //return default in case of error
+ 		}
+ 	}	
  }
  }}}
  
  <<BR>>
  == dataimport.properties example ==
-  * copy/paste ''dataimport scheduler properties'' part to your ''dataimport.properties''
file (param explanations)
+  * copy everything below ''dataimport scheduler properties'' to your ''dataimport.properties''
file and then change the params
+  * regardless of whether you have a single- or multi-core Solr, use the dataimport.properties
located in your solr.home/conf (NOT solr.home/core/conf)
  
  {{{
- #Tue Jul 20 15:12:52 CEST 2010
+ #Tue Jul 21 12:10:50 CEST 2010
- metadataObject.last_index_time=2010-07-20 15\:12\:47
+ metadataObject.last_index_time=2010-09-20 11\:12\:47
- last_index_time=2010-07-20 15\:12\:47
+ last_index_time=2010-09-20 11\:12\:47
  
  
  #################################################
@@ -1352, +1401 @@

  #						#
  #################################################
  
- #which cores you want to schedule [mandatory]
+ #  to sync or not to sync
+ #  1 - active; anything else - inactive
+ syncEnabled=1
+ 
+ #  which cores to schedule
+ #  in a multi-core environment you can decide which cores you want synchronized
+ #  leave empty or comment it out if using single-core deployment
  syncCores=coreHr,coreEn
  
- #solr server name or IP address [defaults to localhost if empty]
- server=
+ #  solr server name or IP address 
+ #  [defaults to localhost if empty]
+ server=localhost
  
+ #  solr server port
- #solr server port [defaults to 80 if empty]
+ #  [defaults to 80 if empty]
  port=8080
  
- #application name/context [defaults to ServletContextListener's context name (web app name)]
- webapp=solrDIHSchedulerTest
+ #  application name/context
+ #  [defaults to current ServletContextListener's context (app) name]
+ webapp=solrTest_WEB
  
- #URL params [mandatory]
+ #  URL params [mandatory]
+ #  remainder of URL
  params=/select?clean=false&commit=true&command=delta-import&qt=/dataimport&handler=/dataimport
+ 
+ #  schedule interval
+ #  number of minutes between two runs
+ #  [defaults to 30 if empty]
+ interval=10
  }}}
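
  To activate the scheduler, the ''Application``Listener'' has to be registered as a ''ServletContextListener'' in the Solr web app's ''web.xml''. A minimal sketch (the fully qualified class name is assumed from the package shown in the code above; adjust it to your deployment):
  {{{
  <!-- DIHScheduler context listener (class name assumed from the package used above) -->
  <listener>
     <listener-class>hr.mbo.solr.scheduler.ApplicationListener</listener-class>
  </listener>
  }}}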
  
  <<BR>>
