marmotta-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ja...@apache.org
Subject [1/8] git commit: MARMOTTA-345: Started restructuring of the local file import MARMOTTA-294 #resolve #comment implemented MARMOTTA-295 #resolve #comment implemented
Date Fri, 25 Oct 2013 08:11:44 GMT
Updated Branches:
  refs/heads/develop c8cd2d3e2 -> b3e26f792


MARMOTTA-345: Started restructuring of the local file import
MARMOTTA-294 #resolve #comment implemented
MARMOTTA-295 #resolve #comment implemented


Project: http://git-wip-us.apache.org/repos/asf/incubator-marmotta/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-marmotta/commit/3d388d38
Tree: http://git-wip-us.apache.org/repos/asf/incubator-marmotta/tree/3d388d38
Diff: http://git-wip-us.apache.org/repos/asf/incubator-marmotta/diff/3d388d38

Branch: refs/heads/develop
Commit: 3d388d38da2bccd19673bd9200e449bcaed9c2d6
Parents: 6fd4e02
Author: Jakob Frank <jakob@apache.org>
Authored: Thu Oct 17 17:22:20 2013 +0200
Committer: Jakob Frank <jakob@apache.org>
Committed: Thu Oct 17 17:22:20 2013 +0200

----------------------------------------------------------------------
 .../commons/nio/watch/AbstractTreeWatcher.java  | 250 ++++++++++
 .../commons/nio/watch/SimpleTreeWatcher.java    |  78 +++
 .../core/api/importer/ImportWatchService.java   |  22 +-
 .../importer/ImportWatchServiceImpl.java        | 497 +++++++++++++------
 4 files changed, 689 insertions(+), 158 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-marmotta/blob/3d388d38/commons/marmotta-commons/src/main/java/org/apache/marmotta/commons/nio/watch/AbstractTreeWatcher.java
----------------------------------------------------------------------
diff --git a/commons/marmotta-commons/src/main/java/org/apache/marmotta/commons/nio/watch/AbstractTreeWatcher.java
b/commons/marmotta-commons/src/main/java/org/apache/marmotta/commons/nio/watch/AbstractTreeWatcher.java
new file mode 100644
index 0000000..f92bdf1
--- /dev/null
+++ b/commons/marmotta-commons/src/main/java/org/apache/marmotta/commons/nio/watch/AbstractTreeWatcher.java
@@ -0,0 +1,250 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.marmotta.commons.nio.watch;
+
+import static java.nio.file.StandardWatchEventKinds.ENTRY_CREATE;
+import static java.nio.file.StandardWatchEventKinds.ENTRY_DELETE;
+import static java.nio.file.StandardWatchEventKinds.ENTRY_MODIFY;
+import static java.nio.file.StandardWatchEventKinds.OVERFLOW;
+
+import java.io.IOException;
+import java.nio.file.ClosedWatchServiceException;
+import java.nio.file.FileSystems;
+import java.nio.file.FileVisitResult;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.SimpleFileVisitor;
+import java.nio.file.WatchEvent;
+import java.nio.file.WatchKey;
+import java.nio.file.WatchService;
+import java.nio.file.attribute.BasicFileAttributes;
+import java.util.HashMap;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * AbstractTreeWatcher is a convenience wrapper around the Java7 {@link WatchService}.
+ * In most cases you will use the {@link SimpleTreeWatcher}
+ * 
+ * @see SimpleTreeWatcher
+ * 
+ * @author Jakob Frank <jakob@apache.org>
+ * 
+ */
+public abstract class AbstractTreeWatcher implements Runnable {
+
+    protected final Logger log = LoggerFactory.getLogger(this.getClass());
+    private final HashMap<WatchKey, Path> pathTable;
+    protected final Path root;
+    protected final boolean recursive;
+
+    private WatchService watchService;
+
+    /**
+     * Create a new {@link AbstractTreeWatcher} watching on the target path.
+     * @param target the {@link Path} to watch, must be a directory.
+     * @param recursive will also watch subdirectories if <code>true</code>.
+     */
+    public AbstractTreeWatcher(Path target, boolean recursive) {
+        this.root = target;
+        this.recursive = recursive;
+        this.pathTable = new HashMap<WatchKey, Path>();
+    }
+
+    /*
+     * (non-Javadoc)
+     * 
+     * @see java.lang.Runnable#run()
+     */
+    @Override
+    public void run() {
+        log.debug("file-system watcher on {} ({}) starting up...", root, recursive?"recursive":"non-recursive");
+        try {
+            synchronized (this) {
+                if (watchService == null) {
+                    watchService = FileSystems.getDefault().newWatchService();
+                }
+            }
+            pathTable.clear();
+            if (recursive) {
+                registerAll(watchService, root);
+            } else {
+                register(watchService, root);
+            }
+            log.debug("watching...");
+            while (true) {
+                try {
+                    final WatchKey key = watchService.take();
+
+                    final Path parent = pathTable.get(key);
+                    if (parent == null) {
+                        log.warn("WatchKey not recognized: {}, ignoring event", key);
+                        continue;
+                    }
+                    try {
+                        for (WatchEvent<?> event : key.pollEvents()) {
+                            final WatchEvent.Kind<?> kind = event.kind();
+
+                            if (kind == OVERFLOW) {
+                                log.trace("overflow event for {}", parent);
+                                continue;
+                            }
+
+                            @SuppressWarnings("unchecked")
+                            final WatchEvent<Path> pathEvent = (WatchEvent<Path>)
event;
+
+                            final Path localPath = pathEvent.context();
+                            if (localPath == null) {
+                                log.warn("Could not get context for %s in %s", kind, parent);
+                                continue;
+                            }
+                            final Path target = parent.resolve(localPath);
+                            if (kind == ENTRY_CREATE) {
+                                if (Files.isDirectory(target)) {
+                                    log.trace("created dir: {}", target);
+                                    onDirectoryCreated(target);
+                                    if (recursive) {
+                                        registerAll(watchService, target);
+                                    }
+                                } else {
+                                    log.trace("created file: {}", target);
+                                    onFileCreated(target);
+                                }
+                                log.trace("new child in {}: {}", parent, localPath);
+                                onChildCreated(parent, target);
+                            } else if (kind == ENTRY_MODIFY) {
+                                log.trace("modified file: {}", target);
+                                onFileModified(target);
+                            } else if (kind == ENTRY_DELETE) {
+                                log.trace("deleted child in {}: {}", parent, localPath);
+                                onChildDeleted(parent, target);
+                            } else {
+                                log.error("Unexpected event type: {}", kind);
+                                continue;
+                            }
+                        }
+                    } finally {
+                        if (!key.reset()) {
+                            pathTable.remove(key);
+                        }
+                    }
+                } catch (InterruptedException | ClosedWatchServiceException e) {
+                    log.trace("shutting down...");
+                    break;
+                }
+            }
+            watchService.close();
+            watchService = null;
+            log.info("file-system watcher on {} ({}) stopped.", root, recursive?"recursive":"non-recursive");
+        } catch (IOException e) {
+            log.error("file-system watcher on {} ({}) died: {}", root, recursive?"recursive":"non-recursive",
e.getMessage());
+        } finally {
+            
+        }
+    }
+
+    /**
+     * Register the start dir and all subdirs for changes.
+     * @param watcher the {@link WatchService} to register to
+     * @param start the start directory
+     * @throws IOException
+     */
+    private void registerAll(final WatchService watcher, final Path start) throws IOException
{
+        Files.walkFileTree(start, new SimpleFileVisitor<Path>() {
+            @Override
+            public FileVisitResult preVisitDirectory(Path dir, BasicFileAttributes attrs)
throws IOException {
+                register(watcher, dir);
+                return FileVisitResult.CONTINUE;
+            }
+        });
+    }
+
+    /**
+     * Register the provided dir for changes.
+     * @param watcher the {@link WatchService} to register to
+     * @param dir the dir to register
+     * @return the registered {@link WatchKey}
+     * @throws IOException
+     */
+    private WatchKey register(WatchService watcher, Path dir) throws IOException {
+        WatchKey key = dir.register(watcher, ENTRY_CREATE, ENTRY_DELETE, ENTRY_MODIFY);
+
+        Path prev = pathTable.get(key);
+        if (prev == null) {
+            log.trace("new watch on {}", dir);
+        } else {
+            if (!dir.equals(prev)) {
+                log.trace("updated watch on {} -> {}", prev, dir);
+            }
+        }
+        pathTable.put(key, dir);
+        return key;
+    }
+
+    /**
+     * Shutdown the treewatcher.
+     * @throws IOException
+     */
+    public void shutdown() throws IOException {
+        if (watchService != null) {
+            watchService.close();
+        }
+    }
+
+    /**
+     * Notification hook for created directories
+     * @param createdDir the path of the created directory.
+     * @see #onChildCreated(Path, Path)
+     */
+    public abstract void onDirectoryCreated(Path createdDir);
+
+    /**
+     * Notification hook for created files.
+     * <strong>Note:</strong> after creation, {@link #onFileModified(Path)} is also invoked.
+     * @param createdFile the path of the created file
+     * @see #onFileModified(Path)
+     * @see #onChildCreated(Path, Path)
+     */
+    public abstract void onFileCreated(Path createdFile);
+
+    /**
+     * Notification hook for modified files.
+     * <strong>Note:</strong> this hook is also invoked after file creation.
+     * @param modifiedFile the path of the modified file
+     * @see #onFileCreated(Path)
+     */
+    public abstract void onFileModified(Path modifiedFile);
+
+    /**
+     * Notification hook for a created child in a directory.
+     * @param parent the container of the newly created child
+     * @param child the created child
+     * @see #onFileCreated(Path)
+     * @see #onDirectoryCreated(Path)
+     */
+    public abstract void onChildCreated(Path parent, Path child);
+
+    /**
+     * Notification hook for a deleted child in a directory.
+     * @param parent the container from which the child was deleted
+     * @param child the local path of the deleted child
+     */
+    public abstract void onChildDeleted(Path parent, Path child);
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-marmotta/blob/3d388d38/commons/marmotta-commons/src/main/java/org/apache/marmotta/commons/nio/watch/SimpleTreeWatcher.java
----------------------------------------------------------------------
diff --git a/commons/marmotta-commons/src/main/java/org/apache/marmotta/commons/nio/watch/SimpleTreeWatcher.java
b/commons/marmotta-commons/src/main/java/org/apache/marmotta/commons/nio/watch/SimpleTreeWatcher.java
new file mode 100644
index 0000000..650b3ae
--- /dev/null
+++ b/commons/marmotta-commons/src/main/java/org/apache/marmotta/commons/nio/watch/SimpleTreeWatcher.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.marmotta.commons.nio.watch;
+
+import java.nio.file.Path;
+
+/**
+ * {@link SimpleTreeWatcher} with empty stub implementations of all abstract methods from
{@link AbstractTreeWatcher}.
+ * 
+ * @author Jakob Frank <jakob@apache.org>
+ *
+ */
+public class SimpleTreeWatcher extends AbstractTreeWatcher {
+
+    /**
+     * @param target
+     * @param recursive
+     */
+    public SimpleTreeWatcher(Path target, boolean recursive) {
+        super(target, recursive);
+    }
+
+    /* (non-Javadoc)
+     * @see org.apache.marmotta.commons.nio.watch.AbstractTreeWatcher#onDirectoryCreated(java.nio.file.Path)
+     */
+    @Override
+    public void onDirectoryCreated(Path createdDir) {
+        // Empty placeholder hook, overwrite if required
+    }
+
+    /* (non-Javadoc)
+     * @see org.apache.marmotta.commons.nio.watch.AbstractTreeWatcher#onFileCreated(java.nio.file.Path)
+     */
+    @Override
+    public void onFileCreated(Path createdFile) {
+        // Empty placeholder hook, overwrite if required    
+    }
+
+    /* (non-Javadoc)
+     * @see org.apache.marmotta.commons.nio.watch.AbstractTreeWatcher#onFileModified(java.nio.file.Path)
+     */
+    @Override
+    public void onFileModified(Path modifiedFile) {
+        // Empty placeholder hook, overwrite if required
+    }
+
+    /* (non-Javadoc)
+     * @see org.apache.marmotta.commons.nio.watch.AbstractTreeWatcher#onChildCreated(java.nio.file.Path,
java.nio.file.Path)
+     */
+    @Override
+    public void onChildCreated(Path parent, Path child) {
+        // Empty placeholder hook, overwrite if required
+    }
+
+    /* (non-Javadoc)
+     * @see org.apache.marmotta.commons.nio.watch.AbstractTreeWatcher#onChildDeleted(java.nio.file.Path,
java.nio.file.Path)
+     */
+    @Override
+    public void onChildDeleted(Path parent, Path child) {
+        // Empty placeholder hook, overwrite if required
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-marmotta/blob/3d388d38/platform/marmotta-core/src/main/java/org/apache/marmotta/platform/core/api/importer/ImportWatchService.java
----------------------------------------------------------------------
diff --git a/platform/marmotta-core/src/main/java/org/apache/marmotta/platform/core/api/importer/ImportWatchService.java
b/platform/marmotta-core/src/main/java/org/apache/marmotta/platform/core/api/importer/ImportWatchService.java
index 6472276..75b6089 100644
--- a/platform/marmotta-core/src/main/java/org/apache/marmotta/platform/core/api/importer/ImportWatchService.java
+++ b/platform/marmotta-core/src/main/java/org/apache/marmotta/platform/core/api/importer/ImportWatchService.java
@@ -18,9 +18,9 @@
 package org.apache.marmotta.platform.core.api.importer;
 
 import java.io.File;
+import java.nio.file.Path;
 
-import org.apache.marmotta.platform.core.events.SystemStartupEvent;
-import org.openrdf.model.URI;
+import org.apache.marmotta.platform.core.exception.io.MarmottaImportException;
 
 /**
  * A service for watching import directory
@@ -31,18 +31,20 @@ import org.openrdf.model.URI;
 public interface ImportWatchService {
 
 	/**
-	 * Initialize the directory watching, performing an importation
-	 * of new files copied there
-	 */
-	public void initialize(SystemStartupEvent event);
-
-	/**
 	 * Import an observed item 
 	 * 
 	 * @param file
-	 * @param context
 	 * @return
+	 * @throws MarmottaImportException 
 	 */
-	boolean execImport(File file, URI context);
+    boolean importFile(File file) throws MarmottaImportException;
+
+    boolean importFile(Path file) throws MarmottaImportException;
+
+    void startup();
+
+    void shutdown();
+
+    Path getImportRoot();
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-marmotta/blob/3d388d38/platform/marmotta-core/src/main/java/org/apache/marmotta/platform/core/services/importer/ImportWatchServiceImpl.java
----------------------------------------------------------------------
diff --git a/platform/marmotta-core/src/main/java/org/apache/marmotta/platform/core/services/importer/ImportWatchServiceImpl.java
b/platform/marmotta-core/src/main/java/org/apache/marmotta/platform/core/services/importer/ImportWatchServiceImpl.java
index c7edb6f..1448a38 100644
--- a/platform/marmotta-core/src/main/java/org/apache/marmotta/platform/core/services/importer/ImportWatchServiceImpl.java
+++ b/platform/marmotta-core/src/main/java/org/apache/marmotta/platform/core/services/importer/ImportWatchServiceImpl.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements. See the NOTICE file
  * distributed with this work for additional information
@@ -24,23 +24,27 @@ import java.io.IOException;
 import java.io.UnsupportedEncodingException;
 import java.net.URISyntaxException;
 import java.net.URLDecoder;
+import java.nio.file.FileVisitOption;
+import java.nio.file.FileVisitResult;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.Paths;
-import java.nio.file.StandardWatchEventKinds;
-import java.nio.file.WatchEvent;
-import java.nio.file.WatchEvent.Kind;
-import java.nio.file.WatchKey;
-import java.nio.file.WatchService;
-import java.util.Date;
+import java.nio.file.SimpleFileVisitor;
+import java.nio.file.attribute.BasicFileAttributes;
+import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
 
 import javax.enterprise.context.ApplicationScoped;
 import javax.enterprise.event.Observes;
 import javax.inject.Inject;
 
 import org.apache.commons.lang3.StringUtils;
+import org.apache.marmotta.commons.nio.watch.SimpleTreeWatcher;
 import org.apache.marmotta.platform.core.api.config.ConfigurationService;
 import org.apache.marmotta.platform.core.api.importer.ImportService;
 import org.apache.marmotta.platform.core.api.importer.ImportWatchService;
@@ -48,6 +52,7 @@ import org.apache.marmotta.platform.core.api.task.Task;
 import org.apache.marmotta.platform.core.api.task.TaskManagerService;
 import org.apache.marmotta.platform.core.api.triplestore.ContextService;
 import org.apache.marmotta.platform.core.api.user.UserService;
+import org.apache.marmotta.platform.core.events.ConfigurationChangedEvent;
 import org.apache.marmotta.platform.core.events.SystemStartupEvent;
 import org.apache.marmotta.platform.core.exception.io.MarmottaImportException;
 import org.openrdf.model.URI;
@@ -59,19 +64,27 @@ import com.ibm.icu.text.CharsetDetector;
 import com.ibm.icu.text.CharsetMatch;
 
 /**
- * Implementation for watching import directory
+ * Implementation for watching import directory.
+ * This service watches the import directory (see {@link #getImportRoot()}) for (new) files
and imports them. 
  * 
  * @author Sergio Fernández
+ * @author Jakob Frank <jakob@apache.org>
  * 
  */
 @ApplicationScoped
 public class ImportWatchServiceImpl implements ImportWatchService {
 
-	private static final String TASK_GROUP = "Import Watch";
+	private static final String CONFIG_PREFIX = "file-import.";
+    private static final String CONFIG_KEY_LOCK_FILE = CONFIG_PREFIX + "lockFile";
+    private static final String CONFIG_KEY_CONF_FILE = CONFIG_PREFIX + "dirConfigFile";
+    private static final String CONFIG_KEY_IMPORT_DELAY = CONFIG_PREFIX + "importDelay";
+    private static final String CONFIG_KEY_DELETE_AFTER_IMPORT = CONFIG_PREFIX + "deleteAfterImport";
+    private static final String CONFIG_KEY_SERVICE_ENABLED = CONFIG_PREFIX + "enabled";
+
+    private static final String TASK_GROUP = "Import Watch";
 
 	private static final String TASK_DETAIL_PATH = "path";
-	
-	private static final String TASK_DETAIL_CONTEXT = "context";
+	private static final String TASK_DETAIL_QUEUE = "import queue";
 	
 	@Inject
 	private Logger log;
@@ -91,122 +104,108 @@ public class ImportWatchServiceImpl implements ImportWatchService {
 	@Inject
 	private UserService userService;
 	
-	private Map<WatchKey,Path> keys;
+    private ImportWatcher importWatcher = null;
 
-	private String path;
-
-	private int count;
-
-	public ImportWatchServiceImpl() {
-		this.keys = new HashMap<WatchKey,Path>();
-		count = 0;
+    /**
+     * Initialize and start the watcher service.
+     */
+	@Override
+	public void startup() {
+	    if (importWatcher == null && configurationService.getBooleanConfiguration(CONFIG_KEY_SERVICE_ENABLED,
true)) {
+    	    importWatcher = new ImportWatcher(getImportRoot());
+    	    importWatcher.setDeleteAfterImport(configurationService.getBooleanConfiguration(CONFIG_KEY_DELETE_AFTER_IMPORT,
true));
+            importWatcher.setImportDelay(configurationService.getIntConfiguration(CONFIG_KEY_IMPORT_DELAY,
2500));
+    	    importWatcher.setDirConfigFileName(configurationService.getStringConfiguration(CONFIG_KEY_CONF_FILE,
"config"));
+    	    importWatcher.setLockFile(configurationService.getStringConfiguration(CONFIG_KEY_LOCK_FILE,
"lock"));
+    	    new Thread(importWatcher).start();
+	    }
 	}
 
+	/**
+	 * The import root. All files put into this directory (and any subdirectory) will be imported.
+	 * Directories containing a file called "lock" (configurable, see {@link #CONFIG_KEY_LOCK_FILE}) are ignored.
+	 */
 	@Override
-	public void initialize(@Observes SystemStartupEvent event) {
-		this.path = configurationService.getHome() + File.separator + ConfigurationService.DIR_IMPORT;
-
-		Runnable r = new Runnable() {
-
-			@Override
-			public void run() {
-				final Task task = taskManagerService.createTask("Directory import watch", TASK_GROUP);
-				task.updateMessage("watching...");
-				task.updateDetailMessage(TASK_DETAIL_PATH, path);
-
-				try {
-					Path root = Paths.get(path);
-					WatchService watcher = root.getFileSystem().newWatchService();
-					register(root, watcher, StandardWatchEventKinds.ENTRY_CREATE, StandardWatchEventKinds.ENTRY_DELETE);
-					while (true) {
-						final WatchKey key = watcher.take();
-						for (WatchEvent<?> event : key.pollEvents()) {
-							
-							try {
-								@SuppressWarnings("unchecked")
-								Path item = ((WatchEvent<Path>) event).context();
-								Path dir = keys.get(key);
-								File file = new File(dir.toString(), item.toString()).getAbsoluteFile();
-								
-								if (StandardWatchEventKinds.ENTRY_CREATE.equals(event.kind())) {
-									if (file.isDirectory()) {
-										//recursive registration of sub-directories
-										register(Paths.get(dir.toString(), item.toString()), watcher, StandardWatchEventKinds.ENTRY_CREATE,
StandardWatchEventKinds.ENTRY_DELETE);
-										task.updateProgress(++count);
-									} else {
-										URI context = getTargetContext(file);
-										log.debug("Importing '{}'...", file.getAbsolutePath());
-										task.updateMessage("importing...");
-										task.updateDetailMessage(TASK_DETAIL_PATH, file.getAbsolutePath());
-										task.updateDetailMessage(TASK_DETAIL_CONTEXT, context.stringValue());
-										if (execImport(file, context)) {
-											log.info("Sucessfully imported file '{}' into {}", file.getAbsolutePath(), context.stringValue());
-											try {
-												//delete the imported file
-												log.debug("Deleting {}...", file.getAbsolutePath());
-												file.delete();
-											} catch (Exception ex) {
-												log.error("Error deleing {}: {}", file.getAbsolutePath(), ex.getMessage());
-											}
-										}
-										task.updateProgress(++count);
-										task.updateMessage("watching...");
-										task.updateDetailMessage(TASK_DETAIL_PATH, path);
-										task.removeDetailMessage(TASK_DETAIL_CONTEXT);
-									}
-								} else if (StandardWatchEventKinds.ENTRY_DELETE.equals(event.kind()) && Files.isDirectory(item))
{
-									//TODO: unregister deleted directories?
-									task.updateProgress(++count);
-								}
-							} catch (IOException e) {
-								log.error("Error importing '{}': {}", path, e.getMessage());
-							} catch (URISyntaxException e) {
-								log.error("Error creating context uri for file '{}': {}", path, e.getMessage());
-							}
-							
-						}
-						if (!key.reset()) {
-							// exit loop if the key is not valid
-							// e.g. if the directory was deleted
-							break;
-						}
-					}
-				} catch (IOException e) {
-					log.error("Error registering the import watch service over '{}': {}", path, e.getMessage());
-				} catch (InterruptedException e) {
-					log.error("Import watch service has been interrupted");
-				}
-			}
-
-		};
-
-		Thread t = new Thread(r);
-		t.setName(TASK_GROUP + "(start:" + new Date() + ",path:" + this.path + ")");
-		t.setDaemon(true);
-		t.start();
+    public Path getImportRoot() {
+        return Paths.get(configurationService.getHome(), ConfigurationService.DIR_IMPORT).toAbsolutePath();
+    }
+	
+	/**
+	 * Shut down the import watcher.
+	 */
+    @Override
+    public void shutdown() {
+        if (importWatcher != null) {
+            try {
+                importWatcher.shutdown();
+            } catch (IOException e) {
+                log.error("Exception while shutting down import watcher: {}\n{}", e.getMessage(),
e);
+            }
+            importWatcher = null;
+        }
+    }
+    
+    protected void onConfigurationChangedEvent(@Observes ConfigurationChangedEvent event)
{
+        if (event.containsChangedKeyWithPrefix(CONFIG_PREFIX)) {
+            if (event.containsChangedKey(CONFIG_KEY_SERVICE_ENABLED)) {
+                shutdown();
+                startup();
+            } else if (importWatcher != null) {
+                importWatcher.setDeleteAfterImport(configurationService.getBooleanConfiguration(CONFIG_KEY_DELETE_AFTER_IMPORT,
true));
+                importWatcher.setImportDelay(configurationService.getIntConfiguration(CONFIG_KEY_IMPORT_DELAY,
2500));
+                importWatcher.setDirConfigFileName(configurationService.getStringConfiguration(CONFIG_KEY_CONF_FILE,
"config"));
+                importWatcher.setLockFile(configurationService.getStringConfiguration(CONFIG_KEY_LOCK_FILE,
"lock"));
+            }
+        }
+    }
 
+	protected void onSystemStartupEvent(@Observes SystemStartupEvent event) {
+	    shutdown();
+	    startup();
 	}
 
+	/**
+	 * Import the given file.
+	 * @see #importFile(Path)
+	 */
 	@Override
-	public boolean execImport(File file, URI context) {
-		try {
-			String format = detectFormat(file);
-			FileInputStream is = new FileInputStream(file);
-			URI user = userService.getAdminUser();
-			importService.importData(is, format, user, context);
-			return true;
-		} catch (MarmottaImportException e) {
-			log.error("Error importing file {} from the local directory: {}", file.getAbsolutePath(),
e.getMessage());
-			return false;
-		} catch (IOException e) {
-			log.error("Error retrieving file {} from the local directory: {}", file.getAbsolutePath(),
e.getMessage());
-			return false;
-		}
+	public boolean importFile(File file) throws MarmottaImportException {
+	    return importFile(file.toPath());
+	}
+	
+	/**
+	 * Import the given file.
+	 * TODO
+	 */
+	@Override
+	public boolean importFile(Path file) throws MarmottaImportException {
+	    try {
+	        URI context;
+            try {
+                context = getTargetContext(file);
+            } catch (URISyntaxException e) {
+                log.warn("Could not build context for file {}: {}", file, e.getMessage());
+                context = null;
+            }
+	        String format = detectFormat(file);
+	        FileInputStream is = new FileInputStream(file.toFile());
+	        URI user = userService.getAdminUser();
+	        importService.importData(is, format, user, context);
+	        return true;
+	    } catch (IOException e) {
+	        throw new MarmottaImportException("Could not read input file " + file.toFile().getAbsolutePath(),
e);
+	    }
 	}
 
-	private String detectFormat(File file) throws MarmottaImportException {
+	/**
+	 * Detect the import format of the given file (mime-type)
+	 * @param file the file to check
+	 * @return the mime-type
+	 * @throws MarmottaImportException
+	 */
+	private String detectFormat(Path file) throws MarmottaImportException {
 		String format = null;
-		String fileName = file.getName();
+		final String fileName = file.toFile().getName();
 		
 		//mimetype detection
 		RDFFormat rdfFormat = Rio.getParserFormatForFileName(fileName);
@@ -217,8 +216,7 @@ public class ImportWatchServiceImpl implements ImportWatchService {
 		}
 
 	    //encoding detection
-		try {
-			BufferedInputStream bis = new BufferedInputStream(new FileInputStream(file));
+		try (BufferedInputStream bis = new BufferedInputStream(new FileInputStream(file.toFile())))
{
 			CharsetDetector cd = new CharsetDetector();
 			cd.setText(bis);
 			CharsetMatch cm = cd.detect();
@@ -233,41 +231,244 @@ public class ImportWatchServiceImpl implements ImportWatchService {
 	}
 	
 	/**
-	 * Get the target context, according the path relative to the base import directory
+	 * Get the target context. 
+	 * The algorithm is as follows:
+	 * <ol>
+	 * <li>check for a file "conf" (configurable, see {@link #CONFIG_KEY_CONF_FILE}) which specifies
+	 * the target context using {@link Properties} syntax (key {@code context}), then use it; or
+	 * <li>check if the sub-directory is a url-encoded URI, then use it; or
+	 * <li>construct the context by using {@link ConfigurationService#getBaseContext()} and the relative sub-dirs and use it; or
+	 * <li>use the default context as a general fallback.
+	 * </ol>
 	 * 
-	 * @param file
-	 * @return
+	 * @param file the file
+	 * @return the context URI
 	 * @throws URISyntaxException 
 	 */
-	private URI getTargetContext(File file) throws URISyntaxException {
-		String subdir = StringUtils.removeStart(file.getParentFile().getAbsolutePath(), this.path);
-		if (StringUtils.isBlank(subdir)) {
-			return contextService.getDefaultContext();
-		} else {
-			subdir = subdir.substring(1); //remove initial slash
-			if (StringUtils.startsWith(subdir, "http%3A%2F%2F")) {
-				try {
-					return contextService.createContext(URLDecoder.decode(subdir, "UTF-8"));
-				} catch (UnsupportedEncodingException e) {
-					log.error("Error url-decoding context name '{}', so using the default one: {}", subdir, e.getMessage());
-					return contextService.getDefaultContext();
-				}
-			} else {
-				return contextService.createContext(configurationService.getBaseContext() + subdir);
-			}
-		}
+	private URI getTargetContext(Path file) throws URISyntaxException {
+	    // Check for a configFile
+	    final Path config = file.getParent().resolve(configurationService.getStringConfiguration(CONFIG_KEY_CONF_FILE, "config"));
+	    if (Files.isReadable(config)) {
+	        try {
+	            Properties prop = new Properties();
+	            prop.load(new FileInputStream(config.toFile()));
+	            final String _c = prop.getProperty("context");
+	            if (_c != null) {
+	                try {
+	                    URI context = contextService.createContext(_c);
+	                    log.debug("using context {} from config file {}", context, config);
+	                    return context;
+	                } catch (URISyntaxException e) {
+	                    log.warn("invalid context {} in config file {}, ignoring", _c, config);
+	                }
+	            } else {
+	                log.trace("no context defined in config file {}", config);
+	            }
+	        } catch (IOException e) {
+	            log.warn("could not read dirConfigFile {}: {}", config, e.getMessage());
+	        }
+	    }
+
+	    // Check for url-encoded directory
+	    Path subDir = file.getParent().relativize(getImportRoot());
+	    if (StringUtils.isBlank(subDir.toString())) {
+	        log.trace("using default context for file {}", file);
+	        return contextService.getDefaultContext();
+	    } else if (StringUtils.startsWith(subDir.toString(), "http%3A%2F%2F")){
+	        log.debug("using url-encoded context {} for import of {}", subDir, file);
+	        try {
+	            return contextService.createContext(URLDecoder.decode(subDir.toString(), "UTF-8"));
+	        } catch (UnsupportedEncodingException e) {
+	            log.error("Error url-decoding context name '{}', so using the default one: {}", subDir, e.getMessage());
+	            return contextService.getDefaultContext();
+	        }
+	    } else {
+	        final String _c = String.format("%s/%s", configurationService.getBaseContext().replaceFirst("/$", ""), subDir);
+	        final URI context = contextService.createContext(_c);
+	        log.debug("using context {} based on relative subdir {} for file {}", context, subDir, file);
+            return context;
+	    }
 	}
 	
-	/**
-	 * Registers a new path in the watcher, keeping the path mapping for future uses
-	 * 
-	 * @param path
-	 * @param watcher
-	 * @param events
-	 * @throws IOException
-	 */
-	private void register(Path path, WatchService watcher, Kind<?>... events) throws IOException {
-		keys.put(path.register(watcher, events), path);
+	private class ImportWatcher extends SimpleTreeWatcher {
+
+        private String dirConfigFileName = null;
+        private boolean deleteAfterImport = false;
+        private int importDelay = 2500;
+        private String lockFile = null;
+        
+        private final ScheduledThreadPoolExecutor executor;
+        private final Map<Path, ScheduledFuture<?>> fileSchedules;
+        
+        private final Task task;
+
+        public ImportWatcher(Path target) {
+            super(target, true);
+            
+            executor = new ScheduledThreadPoolExecutor(1);
+            executor.setMaximumPoolSize(1);
+
+            fileSchedules = new HashMap<>();
+            
+            task = taskManagerService.createTask("Import Watch", TASK_GROUP);
+            task.updateMessage("off");
+            task.updateDetailMessage(TASK_DETAIL_PATH, target.toAbsolutePath().toString());
+        }
+        
+        public void setLockFile(String lockFile) {
+            this.lockFile = lockFile;
+        }
+
+        public void setDirConfigFileName(String configFileName) {
+            this.dirConfigFileName = configFileName;
+        }
+
+        public void setDeleteAfterImport(boolean deleteAfterImport) {
+            this.deleteAfterImport = deleteAfterImport;
+        }
+        
+        /**
+         * Wait for some time before actually starting the import.
+         * @param importDelay the delay in milliseconds.
+         */
+        public void setImportDelay(int importDelay) {
+            this.importDelay = importDelay;
+        }
+
+        @Override
+        public void run() {
+            task.updateMessage("waiting for new files");
+            scheduleDirectoryRecursive(root);
+            super.run();
+        }
+        
+        @Override
+        public void shutdown() throws IOException {
+            try {
+                task.updateMessage("shutting down");
+                super.shutdown();
+                executor.shutdownNow();
+            } finally {
+                task.endTask();
+            }
+        }
+        
+        @Override
+        public void onChildDeleted(final Path parent, Path child) {
+            // if the lockfile is deleted, import the full directory
+            if (lockFile != null && child.endsWith(lockFile)) {
+                scheduleDirectory(parent);
+            } else {
+                // otherwise remove a potential scheduled import
+                final ScheduledFuture<?> scheduled = fileSchedules.remove(child);
+                if (scheduled != null) {
+                    scheduled.cancel(true);
+                    updateQueueSizeMonitor();
+                }
+            }
+        }
+        
+        private void scheduleDirectory(Path dir) {
+            if (!isLocked(dir)) {
+                try {
+                    Files.walkFileTree(dir, EnumSet.noneOf(FileVisitOption.class), 1, new SimpleFileVisitor<Path>() {
+                        @Override
+                        public FileVisitResult visitFile(Path file,
+                                BasicFileAttributes attrs) throws IOException {
+                            scheduleFile(file);
+                            return FileVisitResult.CONTINUE;
+                        }
+                    });
+                } catch (IOException e) {
+                    log.warn("Could not schedule directory {} for import: {}", dir, e.getMessage());
+                }
+            }
+        }
+        
+        private boolean isLocked(Path dir) {
+            if (lockFile == null) {
+                return false;
+            } else {
+                return Files.exists(dir.resolve(lockFile));
+            }
+        }
+
+        private void scheduleFile(final Path file) {
+            // if the dir is locked, do not schedule
+            if (isLocked(file.getParent())) return;
+            
+            // do not schedule a config file
+            if (dirConfigFileName != null && file.endsWith(dirConfigFileName)) return;
+
+            // schedule the import
+            final ScheduledFuture<?> prevSchedule = fileSchedules.put(file, executor.schedule(new Runnable() {
+                @Override
+                public void run() {
+                    try {
+                        task.updateMessage("importing " + file);
+                        final File f = file.toFile();
+                        if (importFile(f)) {
+                            fileSchedules.remove(file);
+                            updateQueueSizeMonitor();
+                            if (deleteAfterImport) {
+                                Files.delete(file);
+                            }
+                        }
+                    } catch (IOException e) {
+                        log.warn("Could not delete file {} after successful import: {}", file, e.getMessage());
+                    } catch (MarmottaImportException e) {
+                        log.warn("importing {} failed: {}", file, e.getMessage());
+                    } catch (final Throwable t) {
+                        log.error("{} during file-import: {}", t.getClass().getSimpleName(), t.getMessage());
+                        throw t;
+                    } finally {
+                        task.updateMessage("waiting for new files");
+                    }
+                }
+            }, importDelay, TimeUnit.MILLISECONDS));
+            
+            // cancel any previously scheduled import for this file.
+            if (prevSchedule != null) {
+                prevSchedule.cancel(true);
+            }
+
+            updateQueueSizeMonitor();
+        }
+
+        private void updateQueueSizeMonitor() {
+            task.updateDetailMessage(TASK_DETAIL_QUEUE, executor.getQueue().size() + " files");
+        }
+
+        @Override
+        public void onFileCreated(Path createdFile) {
+            scheduleFile(createdFile);
+        }
+        
+        @Override
+        public void onFileModified(Path modifiedFile) {
+            scheduleFile(modifiedFile);
+        }
+        
+        @Override
+        public void onDirectoryCreated(Path createdDir) {
+            scheduleDirectoryRecursive(createdDir);
+        }
+
+        private void scheduleDirectoryRecursive(Path directory) {
+            try {
+                Files.walkFileTree(directory, new SimpleFileVisitor<Path> () {
+                    @Override
+                    public FileVisitResult preVisitDirectory(Path dir,
+                            BasicFileAttributes attrs) throws IOException {
+                        scheduleDirectory(dir);
+                        return FileVisitResult.CONTINUE;
+                    }
+                });
+            } catch (IOException e) {
+                log.warn("Could not schedule directory {} for import: {}", directory, e.getMessage());
+            }
+        }
+	    
 	}
 
 }


Mime
View raw message