atlas-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mad...@apache.org
Subject [15/25] incubator-atlas git commit: ATLAS-1898: initial commit of ODF
Date Wed, 28 Jun 2017 05:57:28 GMT
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-core/src/main/java/org/apache/atlas/odf/core/controlcenter/ODFRunnable.java
----------------------------------------------------------------------
diff --git a/odf/odf-core/src/main/java/org/apache/atlas/odf/core/controlcenter/ODFRunnable.java b/odf/odf-core/src/main/java/org/apache/atlas/odf/core/controlcenter/ODFRunnable.java
new file mode 100755
index 0000000..f999ecf
--- /dev/null
+++ b/odf/odf-core/src/main/java/org/apache/atlas/odf/core/controlcenter/ODFRunnable.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.odf.core.controlcenter;
+
+import java.util.concurrent.ExecutorService;
+
+public interface ODFRunnable extends Runnable {
+
+	void setExecutorService(ExecutorService service);
+	
+	void cancel();
+	
+	// return true if the runnable is likely to be ready to receive data
+	boolean isReady();
+	
+}

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-core/src/main/java/org/apache/atlas/odf/core/controlcenter/QueueMessageProcessor.java
----------------------------------------------------------------------
diff --git a/odf/odf-core/src/main/java/org/apache/atlas/odf/core/controlcenter/QueueMessageProcessor.java b/odf/odf-core/src/main/java/org/apache/atlas/odf/core/controlcenter/QueueMessageProcessor.java
new file mode 100755
index 0000000..e6642c5
--- /dev/null
+++ b/odf/odf-core/src/main/java/org/apache/atlas/odf/core/controlcenter/QueueMessageProcessor.java
@@ -0,0 +1,32 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.odf.core.controlcenter;
+
+import java.util.concurrent.ExecutorService;
+
+
+public interface QueueMessageProcessor {
+
+	/**
+	 * callback to process the message taken from the queue.
+	 * 
+	 * @param executorService
+	 * @param msg The message to be processed
+	 * @param partition The kafka topic partition this message was read from
+	 * @param msgOffset The offset of this particular message on this kafka partition
+	 * @return
+	 */
+	void process(ExecutorService executorService, String msg, int partition, long msgOffset);
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-core/src/main/java/org/apache/atlas/odf/core/controlcenter/ServiceRuntime.java
----------------------------------------------------------------------
diff --git a/odf/odf-core/src/main/java/org/apache/atlas/odf/core/controlcenter/ServiceRuntime.java b/odf/odf-core/src/main/java/org/apache/atlas/odf/core/controlcenter/ServiceRuntime.java
new file mode 100755
index 0000000..da06dd2
--- /dev/null
+++ b/odf/odf-core/src/main/java/org/apache/atlas/odf/core/controlcenter/ServiceRuntime.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.odf.core.controlcenter;
+
+import org.apache.atlas.odf.api.discoveryservice.DiscoveryService;
+import org.apache.atlas.odf.api.discoveryservice.DiscoveryServiceProperties;
+import org.apache.atlas.odf.api.settings.validation.ValidationException;
+
+public interface ServiceRuntime {
+	
+	String getName();
+	
+	/**
+	 * Check if the runtime is currently available for processing.
+	 * Returns <= 0 if the runtime is available immediately. A number > 0
+	 * indicates how many seconds to wait until retrying.
+	 * 
+	 * Note: If this method returns > 0 the Kafka consumer will be shut down and only be 
+	 * started again when it returns <= 0. Shutting down and restarting the consumer is
+	 * rather costly so this should only be done if the runtime won't be accepting requests
+	 * for a foreseeable period of time.
+	 */
+	long getWaitTimeUntilAvailable();
+
+	DiscoveryService createDiscoveryServiceProxy(DiscoveryServiceProperties props);
+
+	String getDescription();
+	
+	void validate(DiscoveryServiceProperties props) throws ValidationException;
+	
+}

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-core/src/main/java/org/apache/atlas/odf/core/controlcenter/ServiceRuntimes.java
----------------------------------------------------------------------
diff --git a/odf/odf-core/src/main/java/org/apache/atlas/odf/core/controlcenter/ServiceRuntimes.java b/odf/odf-core/src/main/java/org/apache/atlas/odf/core/controlcenter/ServiceRuntimes.java
new file mode 100755
index 0000000..a867580
--- /dev/null
+++ b/odf/odf-core/src/main/java/org/apache/atlas/odf/core/controlcenter/ServiceRuntimes.java
@@ -0,0 +1,147 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.odf.core.controlcenter;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.LineNumberReader;
+import java.net.URL;
+import java.text.MessageFormat;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Enumeration;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.atlas.odf.api.discoveryservice.DiscoveryServiceEndpoint;
+import org.apache.atlas.odf.api.discoveryservice.DiscoveryServiceManager;
+import org.apache.atlas.odf.api.discoveryservice.DiscoveryServiceProperties;
+import org.apache.atlas.odf.api.discoveryservice.ServiceNotFoundException;
+import org.apache.atlas.odf.api.engine.ServiceRuntimeInfo;
+import org.apache.atlas.odf.api.engine.ServiceRuntimesInfo;
+import org.apache.atlas.odf.core.Environment;
+import org.apache.atlas.odf.core.ODFInternalFactory;
+
+public class ServiceRuntimes {
+
+	static Logger logger = Logger.getLogger(ServiceRuntimes.class.getName());
+
+	static List<ServiceRuntime> getRuntimeExtensions() throws IOException {
+		ClassLoader cl = ServiceRuntimes.class.getClassLoader();
+		Enumeration<URL> services = cl.getResources("META-INF/odf/odf-runtimes.txt");
+		List<ServiceRuntime> result = new ArrayList<>();
+		while (services.hasMoreElements()) {
+			URL url = services.nextElement();
+			InputStream is = url.openStream();
+			InputStreamReader isr = new InputStreamReader(is, "UTF-8");
+			LineNumberReader lnr = new LineNumberReader(isr);
+			String line = null;
+			while ((line = lnr.readLine()) != null) {
+				line = line.trim();
+				logger.log(Level.INFO,  "Loading runtime extension ''{0}''", line);
+				try {
+					@SuppressWarnings("unchecked")
+					Class<ServiceRuntime> clazz = (Class<ServiceRuntime>) cl.loadClass(line);
+					ServiceRuntime sr = clazz.newInstance();
+					result.add(sr);
+				} catch (InstantiationException | IllegalAccessException | ClassNotFoundException e) {
+					logger.log(Level.WARNING, MessageFormat.format("Runtime extension of class ''{0}'' could not be instantiated", line), e);
+				} 
+			}
+		}
+		logger.log(Level.INFO, "Number of classpath services found: {0}", result.size());
+		return result;
+	}
+	
+	static {
+		List<ServiceRuntime> allRuntimes = new ArrayList<>(Arrays.asList( //
+				new HealthCheckServiceRuntime(), //
+				new JavaServiceRuntime(), //
+				new SparkServiceRuntime() //
+		));
+		try {
+			List<ServiceRuntime> runtimeExtensions = getRuntimeExtensions();
+			allRuntimes.addAll(runtimeExtensions);
+		} catch (IOException e) {
+			logger.log(Level.WARNING, "An exception occurred when loading runtime extensions, ignoring them", e);
+		}
+		runtimes = Collections.unmodifiableList(allRuntimes);
+	}
+
+	private static List<ServiceRuntime> runtimes;
+
+	public static List<ServiceRuntime> getActiveRuntimes() {
+		Environment env = new ODFInternalFactory().create(Environment.class);
+		List<String> activeRuntimeNames = env.getActiveRuntimeNames();
+		if (activeRuntimeNames == null) {
+			return getAllRuntimes();
+		}
+		// always add health check runtime
+		Set<String> activeRuntimeNamesSet = new HashSet<>(activeRuntimeNames);
+		activeRuntimeNamesSet.add(HealthCheckServiceRuntime.HEALTH_CHECK_RUNTIME_NAME);
+		List<ServiceRuntime> activeRuntimes = new ArrayList<>();
+		for (ServiceRuntime rt : runtimes) {
+			if (activeRuntimeNamesSet.contains(rt.getName())) {
+				activeRuntimes.add(rt);
+			}
+		}
+		return activeRuntimes;
+	}
+
+	public static List<ServiceRuntime> getAllRuntimes() {
+		return runtimes;
+	}
+
+	public static ServiceRuntime getRuntimeForDiscoveryService(DiscoveryServiceProperties discoveryServiceProps) {
+		DiscoveryServiceEndpoint ep = discoveryServiceProps.getEndpoint();
+		for (ServiceRuntime runtime : getAllRuntimes()) {
+			if (runtime.getName().equals(ep.getRuntimeName())) {
+				return runtime;
+			}
+		}
+		return null;
+	}
+
+	public static ServiceRuntime getRuntimeForDiscoveryService(String discoveryServiceId) {
+		// special check because the healch check runtime is not part of the configuration
+		if (discoveryServiceId.startsWith(ControlCenter.HEALTH_TEST_DISCOVERY_SERVICE_ID)) {
+			return new HealthCheckServiceRuntime();
+		}
+		DiscoveryServiceManager dsm = new ODFInternalFactory().create(DiscoveryServiceManager.class);
+		try {
+			DiscoveryServiceProperties props = dsm.getDiscoveryServiceProperties(discoveryServiceId);
+			return getRuntimeForDiscoveryService(props);
+		} catch (ServiceNotFoundException e) {
+			return null;
+		}
+	}
+
+	public static ServiceRuntimesInfo getRuntimesInfo(List<ServiceRuntime> runtimes) {
+		List<ServiceRuntimeInfo> rts = new ArrayList<>();
+		for (ServiceRuntime rt : runtimes) {
+			ServiceRuntimeInfo sri = new ServiceRuntimeInfo();
+			sri.setName(rt.getName());
+			sri.setDescription(rt.getDescription());
+			rts.add(sri);
+		}
+		ServiceRuntimesInfo result = new ServiceRuntimesInfo();
+		result.setRuntimes(rts);
+		return result;
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-core/src/main/java/org/apache/atlas/odf/core/controlcenter/SparkDiscoveryServiceProxy.java
----------------------------------------------------------------------
diff --git a/odf/odf-core/src/main/java/org/apache/atlas/odf/core/controlcenter/SparkDiscoveryServiceProxy.java b/odf/odf-core/src/main/java/org/apache/atlas/odf/core/controlcenter/SparkDiscoveryServiceProxy.java
new file mode 100755
index 0000000..6dc1fd0
--- /dev/null
+++ b/odf/odf-core/src/main/java/org/apache/atlas/odf/core/controlcenter/SparkDiscoveryServiceProxy.java
@@ -0,0 +1,110 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.odf.core.controlcenter;
+
+import java.text.MessageFormat;
+import java.util.concurrent.ExecutorService;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.atlas.odf.api.discoveryservice.DataSetCheckResult;
+import org.apache.atlas.odf.api.discoveryservice.DiscoveryServiceRequest;
+import org.apache.atlas.odf.api.discoveryservice.DiscoveryServiceResponse;
+import org.apache.atlas.odf.api.discoveryservice.datasets.DataSetContainer;
+import org.apache.atlas.odf.api.discoveryservice.sync.DiscoveryServiceSyncResponse;
+import org.apache.atlas.odf.api.discoveryservice.sync.SyncDiscoveryService;
+import org.apache.atlas.odf.api.metadata.MetadataStore;
+import org.apache.atlas.odf.api.spark.SparkServiceExecutor;
+import org.apache.atlas.odf.core.ODFInternalFactory;
+import org.apache.atlas.odf.json.JSONUtils;
+import org.apache.wink.json4j.JSONException;
+
+import org.apache.atlas.odf.api.annotation.AnnotationStore;
+import org.apache.atlas.odf.api.discoveryservice.DiscoveryServiceProperties;
+import org.apache.atlas.odf.api.discoveryservice.DiscoveryServiceSparkEndpoint;
+import org.apache.atlas.odf.core.Utils;
+
+/**
+ * Proxy for calling any type of Spark discovery services.
+ * 
+ *
+ */
+
+public class SparkDiscoveryServiceProxy implements SyncDiscoveryService {
+	Logger logger = Logger.getLogger(SparkDiscoveryServiceProxy.class.getName());
+
+	protected MetadataStore metadataStore;
+	protected AnnotationStore annotationStore;
+	protected ExecutorService executorService;
+	private DiscoveryServiceProperties dsri;
+
+	public SparkDiscoveryServiceProxy(DiscoveryServiceProperties dsri) {
+		this.dsri = dsri;
+	}
+
+	@Override
+	public void setExecutorService(ExecutorService executorService) {
+		this.executorService = executorService;
+	}
+
+	@Override
+	public void setMetadataStore(MetadataStore metadataStore) {
+		this.metadataStore = metadataStore;
+	}
+
+	@Override
+	public DataSetCheckResult checkDataSet(DataSetContainer dataSetContainer) {
+		DataSetCheckResult checkResult = new DataSetCheckResult();
+		checkResult.setDataAccess(DataSetCheckResult.DataAccess.NotPossible);
+		try {
+			SparkServiceExecutor executor = new ODFInternalFactory().create(SparkServiceExecutor.class);
+			checkResult = executor.checkDataSet(this.dsri, dataSetContainer);
+		} catch (Exception e) {
+			logger.log(Level.WARNING,"Error running discovery service.", e);
+			checkResult.setDetails(Utils.getExceptionAsString(e));
+		}
+		return checkResult;
+	}
+
+	@Override
+	public DiscoveryServiceSyncResponse runAnalysis(DiscoveryServiceRequest request) {
+		logger.log(Level.INFO,MessageFormat.format("Starting Spark discovery service ''{0}'', id {1}.", new Object[]{ dsri.getName(), dsri.getId() }));
+		DiscoveryServiceSyncResponse response = new DiscoveryServiceSyncResponse();
+		DiscoveryServiceSparkEndpoint endpoint;
+		try {
+			endpoint = JSONUtils.convert(dsri.getEndpoint(),  DiscoveryServiceSparkEndpoint.class);
+		} catch (JSONException e1) {
+			throw new RuntimeException(e1);
+		}
+		if ((endpoint.getJar() == null) || (endpoint.getJar().isEmpty())) {
+			response.setDetails("No jar file  was provided that implements the Spark application.");
+		} else try {
+			SparkServiceExecutor executor = new ODFInternalFactory().create(SparkServiceExecutor.class);
+			response = executor.runAnalysis(this.dsri, request);
+			logger.log(Level.FINER, "Spark discovery service response: " + response.toString());
+			logger.log(Level.INFO,"Spark discover service finished.");
+			return response;
+		} catch (Exception e) {
+			logger.log(Level.WARNING,"Error running Spark application: ", e);
+			response.setDetails(Utils.getExceptionAsString(e));
+		}
+		response.setCode(DiscoveryServiceResponse.ResponseCode.UNKNOWN_ERROR);
+		return response;
+	}
+
+	@Override
+	public void setAnnotationStore(AnnotationStore annotationStore) {
+		this.annotationStore = annotationStore;
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-core/src/main/java/org/apache/atlas/odf/core/controlcenter/SparkServiceRuntime.java
----------------------------------------------------------------------
diff --git a/odf/odf-core/src/main/java/org/apache/atlas/odf/core/controlcenter/SparkServiceRuntime.java b/odf/odf-core/src/main/java/org/apache/atlas/odf/core/controlcenter/SparkServiceRuntime.java
new file mode 100755
index 0000000..91056b3
--- /dev/null
+++ b/odf/odf-core/src/main/java/org/apache/atlas/odf/core/controlcenter/SparkServiceRuntime.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.odf.core.controlcenter;
+
+import org.apache.atlas.odf.api.settings.validation.ValidationException;
+import org.apache.wink.json4j.JSONException;
+
+import org.apache.atlas.odf.api.discoveryservice.DiscoveryService;
+import org.apache.atlas.odf.api.discoveryservice.DiscoveryServiceProperties;
+import org.apache.atlas.odf.api.discoveryservice.DiscoveryServiceSparkEndpoint;
+import org.apache.atlas.odf.core.Utils;
+import org.apache.atlas.odf.json.JSONUtils;
+
+public class SparkServiceRuntime implements ServiceRuntime {
+
+	public static final String SPARK_RUNTIME_NAME = "Spark";
+	
+	@Override
+	public String getName() {
+		return SPARK_RUNTIME_NAME;
+	}
+
+	@Override
+	public long getWaitTimeUntilAvailable() {
+		return 0;
+	}
+
+	@Override
+	public DiscoveryService createDiscoveryServiceProxy(DiscoveryServiceProperties props) {
+		return new SparkDiscoveryServiceProxy(props);
+	}
+
+	@Override
+	public String getDescription() {
+		return "The default Spark runtime";
+	}
+
+	@Override
+	public void validate(DiscoveryServiceProperties props) throws ValidationException {
+		try {
+			JSONUtils.convert(props.getEndpoint(),  DiscoveryServiceSparkEndpoint.class);
+		} catch (JSONException e1) {
+			throw new ValidationException("Endpoint definition for Spark service is not correct: " + Utils.getExceptionAsString(e1));
+		}
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-core/src/main/java/org/apache/atlas/odf/core/controlcenter/StatusQueueEntry.java
----------------------------------------------------------------------
diff --git a/odf/odf-core/src/main/java/org/apache/atlas/odf/core/controlcenter/StatusQueueEntry.java b/odf/odf-core/src/main/java/org/apache/atlas/odf/core/controlcenter/StatusQueueEntry.java
new file mode 100755
index 0000000..206a6d0
--- /dev/null
+++ b/odf/odf-core/src/main/java/org/apache/atlas/odf/core/controlcenter/StatusQueueEntry.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.odf.core.controlcenter;
+
+import org.apache.atlas.odf.api.discoveryservice.AnalysisRequestTracker;
+import org.apache.atlas.odf.api.metadata.models.Annotation;
+
+// JSON
+public class StatusQueueEntry {
+
+	private Annotation annotation;
+	private AnalysisRequestTracker analysisRequestTracker;
+
+	public Annotation getAnnotation() {
+		return annotation;
+	}
+
+	public void setAnnotation(Annotation annotation) {
+		this.annotation = annotation;
+	}
+
+	public AnalysisRequestTracker getAnalysisRequestTracker() {
+		return analysisRequestTracker;
+	}
+
+	public void setAnalysisRequestTracker(AnalysisRequestTracker analysisRequestTracker) {
+		this.analysisRequestTracker = analysisRequestTracker;
+	}
+
+	
+	public static String getRequestId(StatusQueueEntry seq) {
+		if (seq.getAnnotation() != null) {
+			return seq.getAnnotation().getAnalysisRun();
+		} else if (seq.getAnalysisRequestTracker() != null) {
+			return seq.getAnalysisRequestTracker().getRequest().getId();
+		}
+		return null;
+	}
+
+	
+}

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-core/src/main/java/org/apache/atlas/odf/core/controlcenter/ThreadManager.java
----------------------------------------------------------------------
diff --git a/odf/odf-core/src/main/java/org/apache/atlas/odf/core/controlcenter/ThreadManager.java b/odf/odf-core/src/main/java/org/apache/atlas/odf/core/controlcenter/ThreadManager.java
new file mode 100755
index 0000000..33dba10
--- /dev/null
+++ b/odf/odf-core/src/main/java/org/apache/atlas/odf/core/controlcenter/ThreadManager.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.odf.core.controlcenter;
+
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeoutException;
+
+import org.apache.atlas.odf.api.engine.ThreadStatus;
+
+public interface ThreadManager {
+
+	void waitForThreadsToBeReady(long waitingLimitMs, List<ThreadStartupResult> startedThreads) throws TimeoutException;
+
+	ThreadStartupResult startUnmanagedThread(String name, ODFRunnable runnable);
+	
+	ThreadStatus.ThreadState getStateOfUnmanagedThread(String name);
+	
+	ODFRunnable getRunnable(String name);
+	
+	void setExecutorService(ExecutorService executorService);
+	
+	void shutdownAllUnmanagedThreads();
+	
+	void shutdownThreads(List<String> names);
+	
+	int getNumberOfRunningThreads();
+
+	List<ThreadStatus> getThreadManagerStatus();
+
+	public abstract class ThreadStartupResult {
+
+		private String threadId;
+		private boolean newThreadCreated;
+
+		public ThreadStartupResult(String id) {
+			this.threadId = id;
+		}
+
+		public String getThreadId() {
+			return threadId;
+		}
+
+		public boolean isNewThreadCreated() {
+			return newThreadCreated;
+		}
+
+		public void setNewThreadCreated(boolean newThreadCreated) {
+			this.newThreadCreated = newThreadCreated;
+		}
+
+		public abstract boolean isReady();
+
+	}
+
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-core/src/main/java/org/apache/atlas/odf/core/controlcenter/TrackerUtil.java
----------------------------------------------------------------------
diff --git a/odf/odf-core/src/main/java/org/apache/atlas/odf/core/controlcenter/TrackerUtil.java b/odf/odf-core/src/main/java/org/apache/atlas/odf/core/controlcenter/TrackerUtil.java
new file mode 100755
index 0000000..f1c7704
--- /dev/null
+++ b/odf/odf-core/src/main/java/org/apache/atlas/odf/core/controlcenter/TrackerUtil.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.odf.core.controlcenter;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.atlas.odf.api.discoveryservice.AnalysisRequestTracker;
+import org.apache.atlas.odf.api.discoveryservice.DiscoveryServiceRequest;
+import org.apache.atlas.odf.api.discoveryservice.DiscoveryServiceResponse;
+import org.apache.atlas.odf.api.analysis.AnalysisRequestTrackerStatus.STATUS;
+
+public class TrackerUtil {
+	
+	/**
+	 * @param tracker
+	 * @return true if the first analysis of the tracker has not yet been started
+	 */
+	public static boolean isAnalysisWaiting(AnalysisRequestTracker tracker) {
+		return tracker.getNextDiscoveryServiceRequest() == 0 && (tracker.getStatus() == STATUS.IN_DISCOVERY_SERVICE_QUEUE || tracker.getStatus() == STATUS.INITIALIZED); // || tracker.getStatus() == STATUS.DISCOVERY_SERVICE_RUNNING);
+	}
+	
+	public static boolean isCancellable(AnalysisRequestTracker tracker)  {
+		return (tracker.getStatus() == STATUS.IN_DISCOVERY_SERVICE_QUEUE || tracker.getStatus() == STATUS.INITIALIZED || tracker.getStatus() == STATUS.DISCOVERY_SERVICE_RUNNING);
+	}
+
+	public static DiscoveryServiceRequest getCurrentDiscoveryServiceStartRequest(AnalysisRequestTracker tracker) {
+		int i = tracker.getNextDiscoveryServiceRequest();
+		List<DiscoveryServiceRequest> requests = tracker.getDiscoveryServiceRequests();
+		if (i >= 0 && i < requests.size()) {
+			return requests.get(i);
+		}
+		return null;
+	}
+
+	public static DiscoveryServiceResponse getCurrentDiscoveryServiceStartResponse(AnalysisRequestTracker tracker) {
+		int i = tracker.getNextDiscoveryServiceRequest();
+		List<DiscoveryServiceResponse> responses = tracker.getDiscoveryServiceResponses();
+		if (responses == null || responses.isEmpty()) {
+			return null;
+		}
+		if (i >= 0 && i < responses.size()) {
+			return responses.get(i);
+		}
+		return null;
+	}
+
+	public static void moveToNextDiscoveryService(AnalysisRequestTracker tracker) {
+		int i = tracker.getNextDiscoveryServiceRequest();
+		List<DiscoveryServiceRequest> requests = tracker.getDiscoveryServiceRequests();
+		if (i >= 0 && i < requests.size()) {
+			tracker.setNextDiscoveryServiceRequest(i+1);
+		}
+	}
+
+	public static void addDiscoveryServiceStartResponse(AnalysisRequestTracker tracker, DiscoveryServiceResponse response) {
+		List<DiscoveryServiceResponse> l = tracker.getDiscoveryServiceResponses();
+		if (l == null) {
+			l = new ArrayList<DiscoveryServiceResponse>();
+			tracker.setDiscoveryServiceResponses(l);
+		}
+		l.add(response);
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-core/src/main/java/org/apache/atlas/odf/core/controlcenter/TransactionAsyncDiscoveryServiceProxy.java
----------------------------------------------------------------------
diff --git a/odf/odf-core/src/main/java/org/apache/atlas/odf/core/controlcenter/TransactionAsyncDiscoveryServiceProxy.java b/odf/odf-core/src/main/java/org/apache/atlas/odf/core/controlcenter/TransactionAsyncDiscoveryServiceProxy.java
new file mode 100755
index 0000000..1a3de04
--- /dev/null
+++ b/odf/odf-core/src/main/java/org/apache/atlas/odf/core/controlcenter/TransactionAsyncDiscoveryServiceProxy.java
@@ -0,0 +1,97 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.odf.core.controlcenter;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+
+import org.apache.atlas.odf.api.annotation.AnnotationStore;
+import org.apache.atlas.odf.api.discoveryservice.DataSetCheckResult;
+import org.apache.atlas.odf.api.discoveryservice.DiscoveryServiceRequest;
+import org.apache.atlas.odf.api.discoveryservice.async.AsyncDiscoveryService;
+import org.apache.atlas.odf.api.discoveryservice.async.DiscoveryServiceAsyncRunStatus;
+import org.apache.atlas.odf.api.discoveryservice.async.DiscoveryServiceAsyncStartResponse;
+import org.apache.atlas.odf.api.discoveryservice.datasets.DataSetContainer;
+import org.apache.atlas.odf.api.metadata.MetadataStore;
+import org.apache.atlas.odf.core.ODFInternalFactory;
+
+public class TransactionAsyncDiscoveryServiceProxy implements AsyncDiscoveryService {
+
+	private AsyncDiscoveryService wrappedService;
+
+	public TransactionAsyncDiscoveryServiceProxy(AsyncDiscoveryService wrappedService) {
+		this.wrappedService = wrappedService;
+	}
+
+	public DiscoveryServiceAsyncStartResponse startAnalysis(final DiscoveryServiceRequest request) {
+		TransactionContextExecutor transactionContextExecutor = new ODFInternalFactory().create(TransactionContextExecutor.class);
+		try {
+			return (DiscoveryServiceAsyncStartResponse) transactionContextExecutor.runInTransactionContext(new Callable<Object>() {
+
+				@Override
+				public Object call() throws Exception {
+					return wrappedService.startAnalysis(request);
+				}
+			});
+		} catch (Exception e) {
+			throw new RuntimeException(e);
+		}
+
+	}
+
+	public DiscoveryServiceAsyncRunStatus getStatus(final String runId) {
+		TransactionContextExecutor transactionContextExecutor = new ODFInternalFactory().create(TransactionContextExecutor.class);
+		try {
+			return (DiscoveryServiceAsyncRunStatus) transactionContextExecutor.runInTransactionContext(new Callable<Object>() {
+
+				@Override
+				public Object call() throws Exception {
+					return wrappedService.getStatus(runId);
+				}
+			});
+		} catch (Exception e) {
+			throw new RuntimeException(e);
+		}
+
+	}
+
+	public void setExecutorService(ExecutorService executorService) {
+		wrappedService.setExecutorService(executorService);
+	}
+
+	public void setMetadataStore(MetadataStore metadataStore) {
+		wrappedService.setMetadataStore(metadataStore);
+	}
+
+	public void setAnnotationStore(AnnotationStore annotationStore) {
+		wrappedService.setAnnotationStore(annotationStore);
+	}
+
+	public DataSetCheckResult checkDataSet(final DataSetContainer dataSetContainer) {
+		TransactionContextExecutor transactionContextExecutor = new ODFInternalFactory().create(TransactionContextExecutor.class);
+		try {
+			return (DataSetCheckResult) transactionContextExecutor.runInTransactionContext(new Callable<Object>() {
+
+				@Override
+				public Object call() throws Exception {
+					return wrappedService.checkDataSet(dataSetContainer);
+				}
+			});
+		} catch (Exception e) {
+			throw new RuntimeException(e);
+		}
+
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-core/src/main/java/org/apache/atlas/odf/core/controlcenter/TransactionContextExecutor.java
----------------------------------------------------------------------
diff --git a/odf/odf-core/src/main/java/org/apache/atlas/odf/core/controlcenter/TransactionContextExecutor.java b/odf/odf-core/src/main/java/org/apache/atlas/odf/core/controlcenter/TransactionContextExecutor.java
new file mode 100755
index 0000000..6c17686
--- /dev/null
+++ b/odf/odf-core/src/main/java/org/apache/atlas/odf/core/controlcenter/TransactionContextExecutor.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.odf.core.controlcenter;
+
+import java.util.concurrent.Callable;
+
+/**
+ * Use this interface in the core framework whenever you want to run code that is run from an unmanaged thread (typically in the Kafka consumers)
+ * and that accesses the metadata repository. The implementation of this class will ensure that the code will be run in the
+ * correct context (regarding transactions etc.)
+ * 
+ *
+ */
+public interface TransactionContextExecutor {
+	
+	/**
+	 * Run a generic callable in a transaction context. This is not a template function as some of the underlying infrastructures
+	 * might not be able to support it.
+	 */
+	Object runInTransactionContext(Callable<Object> callable) throws Exception;
+	
+}

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-core/src/main/java/org/apache/atlas/odf/core/controlcenter/TransactionSyncDiscoveryServiceProxy.java
----------------------------------------------------------------------
diff --git a/odf/odf-core/src/main/java/org/apache/atlas/odf/core/controlcenter/TransactionSyncDiscoveryServiceProxy.java b/odf/odf-core/src/main/java/org/apache/atlas/odf/core/controlcenter/TransactionSyncDiscoveryServiceProxy.java
new file mode 100755
index 0000000..ec96e96
--- /dev/null
+++ b/odf/odf-core/src/main/java/org/apache/atlas/odf/core/controlcenter/TransactionSyncDiscoveryServiceProxy.java
@@ -0,0 +1,79 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.odf.core.controlcenter;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+
+import org.apache.atlas.odf.api.annotation.AnnotationStore;
+import org.apache.atlas.odf.api.discoveryservice.DataSetCheckResult;
+import org.apache.atlas.odf.api.discoveryservice.DiscoveryServiceRequest;
+import org.apache.atlas.odf.api.discoveryservice.datasets.DataSetContainer;
+import org.apache.atlas.odf.api.discoveryservice.sync.DiscoveryServiceSyncResponse;
+import org.apache.atlas.odf.api.discoveryservice.sync.SyncDiscoveryService;
+import org.apache.atlas.odf.api.metadata.MetadataStore;
+import org.apache.atlas.odf.core.ODFInternalFactory;
+
+public class TransactionSyncDiscoveryServiceProxy implements SyncDiscoveryService {
+
+	private SyncDiscoveryService wrappedService;
+
+	public TransactionSyncDiscoveryServiceProxy(SyncDiscoveryService wrappedService) {
+		this.wrappedService = wrappedService;
+	}
+
+	public DiscoveryServiceSyncResponse runAnalysis(final DiscoveryServiceRequest request) {
+		TransactionContextExecutor transactionContextExecutor = new ODFInternalFactory().create(TransactionContextExecutor.class);
+		try {
+			return (DiscoveryServiceSyncResponse) transactionContextExecutor.runInTransactionContext(new Callable<Object>() {
+
+				@Override
+				public Object call() throws Exception {
+					return wrappedService.runAnalysis(request);
+				}
+			});
+		} catch (Exception e) {
+			throw new RuntimeException(e);
+		}
+
+	}
+
+	public void setExecutorService(ExecutorService executorService) {
+		wrappedService.setExecutorService(executorService);
+	}
+
+	public void setMetadataStore(MetadataStore metadataStore) {
+		wrappedService.setMetadataStore(metadataStore);
+	}
+
+	public void setAnnotationStore(AnnotationStore annotationStore) {
+		wrappedService.setAnnotationStore(annotationStore);
+	}
+
+	public DataSetCheckResult checkDataSet(final DataSetContainer dataSetContainer) {
+		TransactionContextExecutor transactionContextExecutor = new ODFInternalFactory().create(TransactionContextExecutor.class);
+		try {
+			return (DataSetCheckResult) transactionContextExecutor.runInTransactionContext(new Callable<Object>() {
+
+				@Override
+				public Object call() throws Exception {
+					return wrappedService.checkDataSet(dataSetContainer);
+				}
+			});
+		} catch (Exception e) {
+			throw new RuntimeException(e);
+		}
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-core/src/main/java/org/apache/atlas/odf/core/discoveryservice/DiscoveryServiceManagerImpl.java
----------------------------------------------------------------------
diff --git a/odf/odf-core/src/main/java/org/apache/atlas/odf/core/discoveryservice/DiscoveryServiceManagerImpl.java b/odf/odf-core/src/main/java/org/apache/atlas/odf/core/discoveryservice/DiscoveryServiceManagerImpl.java
new file mode 100755
index 0000000..e7cbc44
--- /dev/null
+++ b/odf/odf-core/src/main/java/org/apache/atlas/odf/core/discoveryservice/DiscoveryServiceManagerImpl.java
@@ -0,0 +1,258 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.odf.core.discoveryservice;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.text.MessageFormat;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.atlas.odf.api.discoveryservice.DiscoveryService;
+import org.apache.atlas.odf.api.discoveryservice.DiscoveryServiceManager;
+import org.apache.atlas.odf.api.discoveryservice.DiscoveryServiceProperties;
+import org.apache.atlas.odf.api.discoveryservice.DiscoveryServiceRuntimeStatistics;
+import org.apache.atlas.odf.api.discoveryservice.DiscoveryServiceStatus;
+import org.apache.atlas.odf.api.discoveryservice.ServiceNotFoundException;
+import org.apache.atlas.odf.api.discoveryservice.ServiceStatusCount;
+import org.apache.atlas.odf.api.settings.validation.ValidationException;
+import org.apache.atlas.odf.core.ODFInternalFactory;
+import org.apache.atlas.odf.core.configuration.ConfigContainer;
+import org.apache.atlas.odf.core.configuration.ConfigManager;
+import org.apache.atlas.odf.core.controlcenter.AnalysisRequestTrackerStore;
+import org.apache.atlas.odf.core.controlcenter.ControlCenter;
+
+/**
+ *
+ * External Java API for creating and managing discovery services
+ *
+ */
+public class DiscoveryServiceManagerImpl implements DiscoveryServiceManager {
+	private Logger logger = Logger.getLogger(DiscoveryServiceManagerImpl.class.getName());
+	public ConfigManager configManager;
+
+	public DiscoveryServiceManagerImpl() {
+		configManager = new ODFInternalFactory().create(ConfigManager.class);
+	}
+
+	/**
+	 * Retrieve list of discovery services registered in ODF
+	 * @return List of registered ODF discovery services
+	 */
+	public List<DiscoveryServiceProperties> getDiscoveryServicesProperties() {
+		logger.entering(DiscoveryServiceManager.class.getName(), "getDiscoveryServicesProperties");
+		List<DiscoveryServiceProperties> dsProperties = configManager.getConfigContainer().getRegisteredServices();
+		return dsProperties;
+	};
+
+	/**
+	 * Register a new service in ODF
+	 * @param dsProperties Properties of the discovery service to register
+	 * @throws ValidationException Validation of a property failed
+	 */
+	public void createDiscoveryService(DiscoveryServiceProperties dsProperties) throws ValidationException {
+		logger.entering(DiscoveryServiceManager.class.getName(), "createDiscoveryService");
+		ConfigContainer update = new ConfigContainer();
+		List<DiscoveryServiceProperties> registeredServices = configManager.getConfigContainer().getRegisteredServices();
+		registeredServices.addAll(Collections.singletonList(dsProperties));
+		update.setRegisteredServices(registeredServices);
+		configManager.updateConfigContainer(update);
+
+
+	};
+
+	/**
+	 * Update configuration of an ODF discovery service
+	 * @param dsProperties Properties of the discovery service to update
+	 */
+	public void replaceDiscoveryService(DiscoveryServiceProperties dsProperties) throws ServiceNotFoundException, ValidationException {
+		logger.entering(DiscoveryServiceManager.class.getName(), "updateDiscoveryService");
+		String serviceId = dsProperties.getId();
+		deleteDiscoveryService(serviceId);
+		createDiscoveryService(dsProperties);
+	};
+
+	/**
+	 * Remove a registered service from ODF
+	 * @param serviceId Discovery service ID
+	 */
+	public void deleteDiscoveryService(String serviceId) throws ServiceNotFoundException, ValidationException {
+		logger.entering(DiscoveryServiceManager.class.getName(), "deleteDiscoveryService");
+		ConfigContainer cc = configManager.getConfigContainer();
+		Iterator<DiscoveryServiceProperties> iterator = cc.getRegisteredServices().iterator();
+		boolean serviceFound = false;
+		while (iterator.hasNext()) {
+			if (iterator.next().getId().equals(serviceId)) {
+				iterator.remove();
+				serviceFound = true;
+			}
+		}
+		if (!serviceFound) {
+			throw new ServiceNotFoundException(serviceId);
+		} else {
+			configManager.updateConfigContainer(cc);
+		}
+	};
+
+	/**
+	 * Retrieve current configuration of a discovery services registered in ODF
+	 * @param serviceId Discovery Service ID
+	 * @return Properties of the service with this ID
+	 * @throws ServiceNotFoundException A service with this ID is not registered
+	 */
+	public DiscoveryServiceProperties getDiscoveryServiceProperties(String serviceId) throws ServiceNotFoundException {
+		logger.entering(DiscoveryServiceManager.class.getName(), "getDiscoveryServiceProperties");
+		DiscoveryServiceProperties serviceFound = null;
+		List<DiscoveryServiceProperties> registeredServices;
+		registeredServices = configManager.getConfigContainer().getRegisteredServices();
+		for (DiscoveryServiceProperties service : registeredServices) {
+			if (service.getId().equals(serviceId)) {
+				serviceFound = service;
+				break;
+			}
+		}
+		if (serviceFound == null) {
+			throw new ServiceNotFoundException(serviceId);
+		}
+		return serviceFound;
+	};
+
+	/**
+	 * Retrieve status overview of all discovery services registered in ODF
+	 * @return List of status count maps for all discovery services
+	 */
+	public List<ServiceStatusCount> getDiscoveryServiceStatusOverview() {
+		DiscoveryServiceStatistics stats = new DiscoveryServiceStatistics(new ODFInternalFactory().create(AnalysisRequestTrackerStore.class).getRecentTrackers(0,-1));
+		return stats.getStatusCountPerService();
+	}
+
+	/**
+	 * Retrieve status of a specific discovery service. Returns null if no service info can be obtained
+	 * @param serviceId Discovery Service ID
+	 * @return Status of the service with this ID
+	 */
+	public DiscoveryServiceStatus getDiscoveryServiceStatus(String serviceId) throws ServiceNotFoundException {
+		logger.entering(DiscoveryServiceManager.class.getName(), "getDiscoveryServiceStatus");
+
+		DiscoveryServiceStatus dsStatus = null;
+		ControlCenter cc = new ODFInternalFactory().create(ControlCenter.class);
+		DiscoveryService ds = cc.getDiscoveryServiceProxy(serviceId, null);
+		if (ds == null) {
+			throw new ServiceNotFoundException(serviceId);
+		}
+		dsStatus = new DiscoveryServiceStatus();
+		dsStatus.setStatus(DiscoveryServiceStatus.Status.OK);
+		dsStatus.setMessage(MessageFormat.format("Discovery service ''{0}'' status is OK", serviceId));
+		ServiceStatusCount serviceStatus = null;
+		List<ServiceStatusCount> statusCounts = getDiscoveryServiceStatusOverview();
+		for (ServiceStatusCount cnt : statusCounts) {
+			if (cnt.getId().equals(serviceId)) {
+				serviceStatus = cnt;
+				break;
+			}
+		}
+		if (serviceStatus != null) {
+			dsStatus.setStatusCount(serviceStatus);
+		}
+		return dsStatus;
+	};
+
+	/**
+	 * Retrieve runtime statistics of a specific discovery service
+	 * @param serviceId Discovery Service ID
+	 * @return Runtime statistics of the service with this ID
+	 */
+	public DiscoveryServiceRuntimeStatistics getDiscoveryServiceRuntimeStatistics(String serviceId) throws ServiceNotFoundException {
+		logger.entering(DiscoveryServiceManager.class.getName(), "getDiscoveryServiceRuntimeStatistics");
+		DiscoveryServiceRuntimeStatistics dsrs = new DiscoveryServiceRuntimeStatistics();
+		dsrs.setAverageProcessingTimePerItemInMillis(0);   // TODO: implement
+		return dsrs;
+	};
+
+	/**
+	 * Delete runtime statistics of a specific discovery service
+	 * @param serviceId Discovery Service ID
+	 */
+	public void deleteDiscoveryServiceRuntimeStatistics(String serviceId) throws ServiceNotFoundException {
+		logger.entering(DiscoveryServiceManager.class.getName(), "deleteDiscoveryServiceRuntimeStatistics");
+		// TODO: implement
+	};
+
+	/**
+	 * Retrieve picture representing a discovery service
+	 * @param serviceId Discovery Service ID
+	 * @return Input stream for image
+	 */
+	public InputStream getDiscoveryServiceImage(String serviceId) throws ServiceNotFoundException {
+		logger.entering(DiscoveryServiceManager.class.getName(), "getDiscoveryServiceImage");
+		final String defaultImageDir = "org/apache/atlas/odf/images";
+
+		String imgUrl = null;
+		for (DiscoveryServiceProperties info : configManager.getConfigContainer().getRegisteredServices()) {
+			if (info.getId().equals(serviceId)) {
+				imgUrl = info.getIconUrl();
+				break;
+			}
+		}
+
+		ClassLoader cl = this.getClass().getClassLoader();
+		InputStream is = null;
+		if (imgUrl != null) {
+			is = cl.getResourceAsStream("META-INF/odf/" + imgUrl);
+			if (is == null) {
+				is = cl.getResourceAsStream(defaultImageDir + "/" + imgUrl);
+				if (is == null) {
+					try {
+						is = new URL(imgUrl).openStream();
+					} catch (MalformedURLException e) {
+						logger.log(Level.WARNING, "The specified image url {0} for service {1} is invalid!", new String[] { imgUrl, serviceId });
+					} catch (IOException e) {
+						logger.log(Level.WARNING, "The specified image url {0} for service {1} could not be accessed!", new String[] { imgUrl, serviceId });
+					}
+				}
+			}
+		}
+		if (imgUrl == null || is == null) {
+			//TODO is this correct? maybe we should use a single default image instead of a random one
+			try {
+				is = cl.getResourceAsStream(defaultImageDir);
+				if (is != null) {
+					InputStreamReader r = new InputStreamReader(is);
+					BufferedReader br = new BufferedReader(r);
+					List<String> images = new ArrayList<>();
+					String line = null;
+					while ((line = br.readLine()) != null) {
+						images.add(line);
+					}
+					// return random image
+					int ix = Math.abs(serviceId.hashCode()) % images.size();
+					is = cl.getResourceAsStream(defaultImageDir + "/" + images.get(ix));
+				}
+			} catch (IOException exc) {
+				logger.log(Level.WARNING, "Exception occurred while retrieving random image, ignoring it", exc);
+				is = null;
+			}
+		}
+		return is;
+	};
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-core/src/main/java/org/apache/atlas/odf/core/discoveryservice/DiscoveryServiceStatistics.java
----------------------------------------------------------------------
diff --git a/odf/odf-core/src/main/java/org/apache/atlas/odf/core/discoveryservice/DiscoveryServiceStatistics.java b/odf/odf-core/src/main/java/org/apache/atlas/odf/core/discoveryservice/DiscoveryServiceStatistics.java
new file mode 100755
index 0000000..6be0e5a
--- /dev/null
+++ b/odf/odf-core/src/main/java/org/apache/atlas/odf/core/discoveryservice/DiscoveryServiceStatistics.java
@@ -0,0 +1,83 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.odf.core.discoveryservice;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.atlas.odf.api.ODFFactory;
+import org.apache.atlas.odf.api.analysis.AnalysisRequestTrackerStatus.STATUS;
+import org.apache.atlas.odf.api.discoveryservice.AnalysisRequestTracker;
+import org.apache.atlas.odf.api.discoveryservice.DiscoveryServiceProperties;
+import org.apache.atlas.odf.api.discoveryservice.DiscoveryServiceRequest;
+import org.apache.atlas.odf.api.discoveryservice.ServiceStatusCount;
+
+public class DiscoveryServiceStatistics {
+
+	private List<AnalysisRequestTracker> requests = new ArrayList<AnalysisRequestTracker>();
+
+	public DiscoveryServiceStatistics(List<AnalysisRequestTracker> requests) {
+		this.requests = requests;
+	}
+
+	public List<ServiceStatusCount> getStatusCountPerService() {
+		List<ServiceStatusCount> result = new ArrayList<ServiceStatusCount>();
+
+		Map<String, LinkedHashMap<STATUS, Integer>> statusMap = new HashMap<String, LinkedHashMap<STATUS, Integer>>();
+
+		for (AnalysisRequestTracker tracker : requests) {
+			int maxDiscoveryServiceRequest = (tracker.getNextDiscoveryServiceRequest() == 0 ? 1 : tracker.getNextDiscoveryServiceRequest());
+			for (int no = 0; no < maxDiscoveryServiceRequest; no++) {
+				STATUS cntStatus = tracker.getStatus();
+
+				//No parallel requests are possible atm -> all requests leading to current one must be finished
+				if (no < maxDiscoveryServiceRequest - 1) {
+					cntStatus = STATUS.FINISHED;
+				}
+
+				DiscoveryServiceRequest req = tracker.getDiscoveryServiceRequests().get(no);
+				LinkedHashMap<STATUS, Integer> cntMap = statusMap.get(req.getDiscoveryServiceId());
+				if (cntMap == null) {
+					cntMap = new LinkedHashMap<STATUS, Integer>();
+					//add 0 default values
+					for (STATUS status : STATUS.values()) {
+						cntMap.put(status, 0);
+					}
+				}
+				Integer val = cntMap.get(cntStatus);
+				val++;
+				cntMap.put(cntStatus, val);
+				statusMap.put(req.getDiscoveryServiceId(), cntMap);
+			}
+		}
+
+		for (String key : statusMap.keySet()) {
+			ServiceStatusCount cnt = new ServiceStatusCount();
+			cnt.setId(key);
+			for (DiscoveryServiceProperties info : new ODFFactory().create().getDiscoveryServiceManager().getDiscoveryServicesProperties()) {
+				if (info.getId().equals(key)) {
+					cnt.setName(info.getName());
+					break;
+				}
+			}
+			cnt.setStatusCountMap(statusMap.get(key));
+			result.add(cnt);
+		}
+
+		return result;
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-core/src/main/java/org/apache/atlas/odf/core/engine/EngineManagerImpl.java
----------------------------------------------------------------------
diff --git a/odf/odf-core/src/main/java/org/apache/atlas/odf/core/engine/EngineManagerImpl.java b/odf/odf-core/src/main/java/org/apache/atlas/odf/core/engine/EngineManagerImpl.java
new file mode 100755
index 0000000..d09297a
--- /dev/null
+++ b/odf/odf-core/src/main/java/org/apache/atlas/odf/core/engine/EngineManagerImpl.java
@@ -0,0 +1,221 @@
+/**
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.odf.core.engine;
+
+import java.io.InputStream;
+import java.text.MessageFormat;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Date;
+import java.util.List;
+import java.util.UUID;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.atlas.odf.api.ODFFactory;
+import org.apache.atlas.odf.api.analysis.AnalysisManager;
+import org.apache.atlas.odf.api.analysis.AnalysisRequest;
+import org.apache.atlas.odf.api.analysis.AnalysisRequestStatus;
+import org.apache.atlas.odf.api.analysis.AnalysisResponse;
+import org.apache.atlas.odf.api.engine.EngineManager;
+import org.apache.atlas.odf.api.engine.MessagingStatus;
+import org.apache.atlas.odf.api.engine.ODFEngineOptions;
+import org.apache.atlas.odf.api.engine.ODFStatus;
+import org.apache.atlas.odf.api.engine.ODFVersion;
+import org.apache.atlas.odf.api.engine.ServiceRuntimesInfo;
+import org.apache.atlas.odf.api.engine.SystemHealth;
+import org.apache.atlas.odf.api.engine.ThreadStatus;
+import org.apache.atlas.odf.api.metadata.MetaDataObjectReference;
+import org.apache.atlas.odf.core.ODFInitializer;
+import org.apache.atlas.odf.core.ODFInternalFactory;
+import org.apache.atlas.odf.core.ODFUtils;
+import org.apache.atlas.odf.core.Utils;
+import org.apache.atlas.odf.core.controlcenter.AdminMessage;
+import org.apache.atlas.odf.core.controlcenter.AdminMessage.Type;
+import org.apache.atlas.odf.core.controlcenter.ControlCenter;
+import org.apache.atlas.odf.core.controlcenter.ServiceRuntimes;
+import org.apache.atlas.odf.core.controlcenter.ThreadManager;
+import org.apache.atlas.odf.core.messaging.DiscoveryServiceQueueManager;
+
+/**
+*
+* External Java API for managing and controlling the ODF engine
+*
+*/
+public class EngineManagerImpl implements EngineManager {
+
+	private Logger logger = Logger.getLogger(EngineManagerImpl.class.getName());
+
+	public EngineManagerImpl() {
+	}
+
+	/**
+	 * Checks the health status of ODF
+	 *
+	 * @return Health status of the ODF engine
+	 */
+	public SystemHealth checkHealthStatus() {
+		SystemHealth health = new SystemHealth();
+		try {
+			AnalysisRequest dummyRequest = new AnalysisRequest();
+			String dataSetID = ControlCenter.HEALTH_TEST_DATA_SET_ID_PREFIX + UUID.randomUUID().toString();
+			MetaDataObjectReference dataSetRef = new MetaDataObjectReference();
+			dataSetRef.setId(dataSetID);
+			dummyRequest.setDataSets(Collections.singletonList(dataSetRef));
+			List<String> discoveryServiceSequence = new ArrayList<String>();
+			discoveryServiceSequence.add(ControlCenter.HEALTH_TEST_DISCOVERY_SERVICE_ID);
+			dummyRequest.setDiscoveryServiceSequence(discoveryServiceSequence);
+
+			AnalysisManager analysisManager = new ODFFactory().create().getAnalysisManager();
+			AnalysisResponse resp = analysisManager.runAnalysis(dummyRequest);
+			String reqId = resp.getId();
+			AnalysisRequestStatus status = null;
+			final int maxNumberOfTimesToPoll = 500;
+			int count = 0;
+			int msToSleepBetweenPolls = 20;
+			boolean continuePolling = false;
+			do {
+				status = analysisManager.getAnalysisRequestStatus(reqId);
+				continuePolling = (status.getState() == AnalysisRequestStatus.State.QUEUED || status.getState() == AnalysisRequestStatus.State.ACTIVE || status.getState() == AnalysisRequestStatus.State.NOT_FOUND) && count < maxNumberOfTimesToPoll;
+				if (continuePolling) {
+					count++;
+					Thread.sleep(msToSleepBetweenPolls);
+				}
+			} while (continuePolling);
+			logger.log(Level.INFO, "Health check request ''{3}'' has status ''{0}'', time spent: {2}ms details ''{1}''", new Object[] { status.getState(), status.getDetails(),
+					count * msToSleepBetweenPolls, reqId });
+			health.getMessages().add(MessageFormat.format("Details message: {0}", status.getDetails()));
+			if (count >= maxNumberOfTimesToPoll) {
+				health.setStatus( SystemHealth.HealthStatus.WARNING);
+				String msg = MessageFormat.format("Health test request could not be processed in time ({0}ms)", (maxNumberOfTimesToPoll * msToSleepBetweenPolls));
+				logger.log(Level.INFO, msg);
+				health.getMessages().add(msg);
+			} else {
+				switch (status.getState()) {
+				case NOT_FOUND:
+					health.setStatus(SystemHealth.HealthStatus.ERROR);
+					health.getMessages().add(MessageFormat.format("Request ID ''{0}'' got lost", reqId));
+					break;
+				case ERROR:
+					health.setStatus(SystemHealth.HealthStatus.ERROR);
+					break;
+				case FINISHED:
+					health.setStatus(SystemHealth.HealthStatus.OK);
+					break;
+				default:
+					health.setStatus(SystemHealth.HealthStatus.ERROR);
+				}
+			}
+		} catch (Exception exc) {
+			logger.log(Level.WARNING, "An unknown error occurred", exc);
+			health.setStatus(SystemHealth.HealthStatus.ERROR);
+			health.getMessages().add(Utils.getExceptionAsString(exc));
+		}
+		return health;
+	}
+
+	/**
+	 * Returns the status of the ODF thread manager
+	 *
+	 * @return Status of all threads making up the ODF thread manager
+	 */
+	public List<ThreadStatus> getThreadManagerStatus() {
+		ThreadManager tm = new ODFInternalFactory().create(ThreadManager.class);
+		return tm.getThreadManagerStatus();
+	}
+
+	/**
+	 * Returns the status of the ODF messaging subsystem
+	 *
+	 * @return Status of the ODF messaging subsystem
+	 */
+	public MessagingStatus getMessagingStatus() {
+		return new ODFInternalFactory().create(DiscoveryServiceQueueManager.class).getMessagingStatus();
+	}
+
+	/**
+	 * Returns the status of the messaging subsystem and the internal thread manager
+	 *
+	 * @return Combined status of the messaging subsystem and the internal thread manager
+	 */
+	public ODFStatus getStatus() {
+		ODFStatus status = new ODFStatus();
+		status.setMessagingStatus(this.getMessagingStatus());
+		status.setThreadManagerStatus(this.getThreadManagerStatus());
+		return status;
+	}
+
+	/**
+	 * Returns the current ODF version
+	 *
+	 * @return ODF version identifier
+	 */
+	public ODFVersion getVersion() {
+		InputStream is = ODFUtils.class.getClassLoader().getResourceAsStream("org/apache/atlas/odf/core/odfversion.txt");
+		ODFVersion version = new ODFVersion();
+		if (is == null) {
+			version.setVersion("NOTFOUND");
+		} else {
+			version.setVersion(Utils.getInputStreamAsString(is, "UTF-8").trim());
+		}
+		return version;
+	}
+
+	/**
+	 * Shuts down the ODF engine, purges all scheduled analysis requests from the queues, and cancels all running analysis requests.
+	 * This means that all running jobs will be cancelled or their results will not be reported back.
+	 * (for debugging purposes only)
+	 *
+	 * @param options Option for immediately restarting the engine after shutdown (default is not to restart immediately but only when needed)
+	 */
+	public void shutdown(ODFEngineOptions options) {
+		long currentTime = System.currentTimeMillis();
+
+		ControlCenter controlCenter = new ODFInternalFactory().create(ControlCenter.class);
+		AdminMessage shutDownMessage = new AdminMessage();
+		Type t = Type.SHUTDOWN;
+		if (options.isRestart()) {
+			t = Type.RESTART;
+		}
+		shutDownMessage.setAdminMessageType(t);
+		String detailMsg = MessageFormat.format("Shutdown was requested on {0} via ODF API", new Object[] { new Date() });
+		shutDownMessage.setDetails(detailMsg);
+		logger.log(Level.INFO, detailMsg);
+		controlCenter.getQueueManager().enqueueInAdminQueue(shutDownMessage);
+		int maxPolls = 60;
+		int counter = 0;
+		int timeBetweenPollsMs = 1000;
+		while (counter < maxPolls && ODFInitializer.getLastStopTimestamp() <= currentTime) {
+			try {
+				Thread.sleep(timeBetweenPollsMs);
+			} catch (InterruptedException e) {
+				e.printStackTrace();
+			}
+			counter++;
+		}
+		long timeWaited = ((counter * timeBetweenPollsMs) / 1000);
+		logger.log(Level.INFO, "Waited for {0} seconds for shutdown", timeWaited);
+		if (counter >= maxPolls) {
+			logger.log(Level.WARNING, "Waited for shutdown too long. Continuing." );
+		} else {
+			logger.log(Level.INFO, "Shutdown issued successfully");
+		}
+	}
+
+	@Override
+	public ServiceRuntimesInfo getRuntimesInfo() {
+		return ServiceRuntimes.getRuntimesInfo(ServiceRuntimes.getAllRuntimes());
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-core/src/main/java/org/apache/atlas/odf/core/messaging/DefaultMessageEncryption.java
----------------------------------------------------------------------
diff --git a/odf/odf-core/src/main/java/org/apache/atlas/odf/core/messaging/DefaultMessageEncryption.java b/odf/odf-core/src/main/java/org/apache/atlas/odf/core/messaging/DefaultMessageEncryption.java
new file mode 100755
index 0000000..9177556
--- /dev/null
+++ b/odf/odf-core/src/main/java/org/apache/atlas/odf/core/messaging/DefaultMessageEncryption.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.odf.core.messaging;
+
+/**
+ * Default encryption: no encryption
+ * 
+ */
+public class DefaultMessageEncryption implements MessageEncryption {
+	
+	@Override
+	public String encrypt(String message) {
+		return message;
+	}
+
+	@Override
+	public String decrypt(String message) {
+		return message;
+	}
+
+
+	/*
+	// this used to be our default encryption. Leaving it in here for reference.
+	@Override
+	public String encrypt(String message) {
+		try {
+			return DatatypeConverter.printBase64Binary(message.getBytes("UTF-8"));
+		} catch (UnsupportedEncodingException e) {
+			throw new RuntimeException(e);
+		}
+	}
+
+	@Override
+	public String decrypt(String message)  {
+		try {
+			return new String(DatatypeConverter.parseBase64Binary(message), "UTF-8");
+		} catch (UnsupportedEncodingException e) {
+			throw new RuntimeException(e);
+		}
+	}
+	*/
+}

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-core/src/main/java/org/apache/atlas/odf/core/messaging/DiscoveryServiceQueueManager.java
----------------------------------------------------------------------
diff --git a/odf/odf-core/src/main/java/org/apache/atlas/odf/core/messaging/DiscoveryServiceQueueManager.java b/odf/odf-core/src/main/java/org/apache/atlas/odf/core/messaging/DiscoveryServiceQueueManager.java
new file mode 100755
index 0000000..d2d84dd
--- /dev/null
+++ b/odf/odf-core/src/main/java/org/apache/atlas/odf/core/messaging/DiscoveryServiceQueueManager.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.odf.core.messaging;
+
+import java.util.concurrent.TimeoutException;
+
+import org.apache.atlas.odf.api.discoveryservice.AnalysisRequestTracker;
+import org.apache.atlas.odf.api.engine.MessagingStatus;
+import org.apache.atlas.odf.core.controlcenter.AdminMessage;
+import org.apache.atlas.odf.core.controlcenter.StatusQueueEntry;
+
+
+
+public interface DiscoveryServiceQueueManager {
+	
+	void start() throws TimeoutException;
+	
+	void stop() throws TimeoutException;
+		
+	// find the next queue where this tracker should go and put it there
+	void enqueue(AnalysisRequestTracker tracker);
+	
+	void enqueueInStatusQueue(StatusQueueEntry sqe);
+	
+	void enqueueInAdminQueue(AdminMessage message);
+	
+	MessagingStatus getMessagingStatus();
+}

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-core/src/main/java/org/apache/atlas/odf/core/messaging/MessageEncryption.java
----------------------------------------------------------------------
diff --git a/odf/odf-core/src/main/java/org/apache/atlas/odf/core/messaging/MessageEncryption.java b/odf/odf-core/src/main/java/org/apache/atlas/odf/core/messaging/MessageEncryption.java
new file mode 100755
index 0000000..ad1bf28
--- /dev/null
+++ b/odf/odf-core/src/main/java/org/apache/atlas/odf/core/messaging/MessageEncryption.java
@@ -0,0 +1,20 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.odf.core.messaging;
+
+public interface MessageEncryption {
+	String encrypt(String message);
+
+	String decrypt(String message);
+}

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-core/src/main/java/org/apache/atlas/odf/core/metadata/DefaultMetadataStore.java
----------------------------------------------------------------------
diff --git a/odf/odf-core/src/main/java/org/apache/atlas/odf/core/metadata/DefaultMetadataStore.java b/odf/odf-core/src/main/java/org/apache/atlas/odf/core/metadata/DefaultMetadataStore.java
new file mode 100755
index 0000000..c71ba3c
--- /dev/null
+++ b/odf/odf-core/src/main/java/org/apache/atlas/odf/core/metadata/DefaultMetadataStore.java
@@ -0,0 +1,381 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.odf.core.metadata;
+
+import java.text.MessageFormat;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.atlas.odf.api.metadata.AnnotationPropagator;
+import org.apache.atlas.odf.api.metadata.MetadataStore;
+import org.apache.atlas.odf.api.metadata.StoredMetaDataObject;
+import org.apache.atlas.odf.api.metadata.models.Annotation;
+import org.apache.atlas.odf.api.metadata.models.RelationshipAnnotation;
+import org.apache.wink.json4j.JSONException;
+import org.apache.wink.json4j.JSONObject;
+
+import org.apache.atlas.odf.api.annotation.AnnotationStore;
+import org.apache.atlas.odf.api.metadata.DefaultMetadataQueryBuilder;
+import org.apache.atlas.odf.api.metadata.InternalMetaDataUtils;
+import org.apache.atlas.odf.api.metadata.MetaDataObjectReference;
+import org.apache.atlas.odf.api.metadata.MetadataStoreException;
+import org.apache.atlas.odf.api.metadata.models.ClassificationAnnotation;
+import org.apache.atlas.odf.api.metadata.models.ConnectionInfo;
+import org.apache.atlas.odf.api.metadata.models.MetaDataObject;
+import org.apache.atlas.odf.api.metadata.models.ProfilingAnnotation;
+import org.apache.atlas.odf.json.JSONUtils;
+
+/**
+ * In-memory implementation of MetadataStore interface to be used for testing as
+ * well as for single-node ODF deployments. Uses static HashMaps for storing the
+ * metadata types and objects.
+ * 
+ * 
+ */
+public class DefaultMetadataStore extends WritableMetadataStoreBase implements WritableMetadataStore {
+	private Logger logger = Logger.getLogger(DefaultMetadataStore.class.getName());
+
+	private static final String METADATA_STORE_ID = "ODF_LOCAL_METADATA_STORE";
+	private static final String STORE_PROPERTY_TYPE = "default";
+	private static final String STORE_PROPERTY_DESCRIPTION = "ODF local metadata store";
+
+	private static HashMap<String, String> typeStore;
+	private static HashMap<String, StoredMetaDataObject> objectStore;
+	protected LinkedHashMap<String, StoredMetaDataObject> stagedObjects = new LinkedHashMap<String, StoredMetaDataObject>();
+	private static boolean isInitialized = false;
+	protected static Object accessLock = new Object();
+	static Object initializationLock = new Object();
+
+	public DefaultMetadataStore() {
+		synchronized (initializationLock) {
+			if (!isInitialized) {
+				isInitialized = true;
+				this.resetAllData();
+			}
+		}
+	}
+
+	protected WritableMetadataStore getMetadataStore() {
+		return this;
+	}
+
+	protected Object getAccessLock() {
+		return accessLock;
+	}
+
+	protected HashMap<String, StoredMetaDataObject> getObjects() {
+		return objectStore;
+	}
+
+	protected LinkedHashMap<String, StoredMetaDataObject> getStagedObjects() {
+		return stagedObjects;
+	}
+
+	@Override
+    public ConnectionInfo getConnectionInfo(MetaDataObject informationAsset) {
+    	synchronized(accessLock) {
+    		return WritableMetadataStoreUtils.getConnectionInfo(this, informationAsset);
+    	}
+    };
+
+	@Override
+	public void resetAllData() {
+		logger.log(Level.INFO, "Resetting all data in metadata store.");
+		synchronized (accessLock) {
+			typeStore = new HashMap<String, String>();
+			objectStore = new HashMap<String, StoredMetaDataObject>();
+			createTypes(WritableMetadataStoreUtils.getBaseTypes());
+		}
+	}
+
+	@Override
+	public Properties getProperties() {
+		Properties props = new Properties();
+		props.put(MetadataStore.STORE_PROPERTY_DESCRIPTION, STORE_PROPERTY_DESCRIPTION);
+		props.put(MetadataStore.STORE_PROPERTY_TYPE, STORE_PROPERTY_TYPE);
+		props.put(STORE_PROPERTY_ID, METADATA_STORE_ID);
+		return props;
+	}
+
+	@Override
+	public String getRepositoryId() {
+		return METADATA_STORE_ID;
+	}
+
+	@Override
+	public List<MetaDataObjectReference> search(String query) {
+		if ((query == null) || query.isEmpty()) {
+			throw new MetadataStoreException("The search term cannot be null or empty.");
+		}
+		logger.log(Level.INFO, MessageFormat.format("Processing query \"{0}\".", query));
+		synchronized (accessLock) {
+			LinkedList<String> queryElements = new LinkedList<String>();
+			for (String el : query.split(DefaultMetadataQueryBuilder.SEPARATOR_STRING)) {
+				queryElements.add(el);
+			}
+			List<MetaDataObjectReference> result = new ArrayList<MetaDataObjectReference>();
+			String firstOperator = queryElements.removeFirst();
+
+			if (firstOperator.equals(DefaultMetadataQueryBuilder.DATASET_IDENTIFIER)) {
+				String requestedObjectType = queryElements.removeFirst();
+				for (StoredMetaDataObject currentInternalObject : getObjects().values()) {
+					MetaDataObject currentObject = currentInternalObject.getMetaDataObject();
+					String currentObjectType = getObjectType(currentObject);
+					try {
+						if (isSubTypeOf(requestedObjectType, currentObjectType)
+								&& isConditionMet(currentObject, queryElements)) {
+							result.add(currentObject.getReference());
+						}
+					} catch (IllegalArgumentException | IllegalAccessException e) {
+						throw new MetadataStoreException(
+								MessageFormat.format("Error processing \"{0}\" clause of query.",
+										DefaultMetadataQueryBuilder.DATASET_IDENTIFIER));
+					}
+				}
+				return result;
+			} else {
+				throw new MetadataStoreException(MessageFormat.format("Query ''{0}'' is not valid.", query));
+			}
+		}
+	}
+
+	@Override
+	public void createSampleData() {
+		logger.log(Level.INFO, "Creating sample data in metadata store.");
+		SampleDataHelper.copySampleFiles();
+		WritableMetadataStoreUtils.createSampleDataObjects(this);
+	}
+
+	@Override
+	public AnnotationPropagator getAnnotationPropagator() {
+		return new AnnotationPropagator() {
+
+			@Override
+			public void propagateAnnotations(AnnotationStore as, String requestId) {
+				List<Annotation> annotations = as.getAnnotations(null, requestId);
+				for (Annotation annot : annotations) {
+					ensureAnnotationTypeExists(annot);
+					annot.setReference(null); // Set reference to null because a new reference will be generated by the metadata store
+					getMetadataStore().createObject(annot);
+					commit();
+				}
+			}
+		};
+	}
+
+	/**
+	 * Internal helper that creates a list of types in the metadata store.
+	 *
+	 * @param typeList List of types to be created
+	 */
+	private void createTypes(List<Class<?>> typeList) {
+		synchronized (accessLock) {
+			for (Class<?> type : typeList) {
+				if (!typeStore.containsKey(type.getSimpleName())) {
+					logger.log(Level.INFO,
+							MessageFormat.format("Creating new type \"{0}\" in metadata store.", type.getSimpleName()));
+					typeStore.put(type.getSimpleName(), type.getSuperclass().getSimpleName());
+				} else {
+					throw new MetadataStoreException(MessageFormat.format(
+							"A type with the name \"{0}\" already exists in this metadata store.", type.getName()));
+				}
+			}
+		}
+	};
+
+	/**
+	 * Internal helper that returns the type name of a given metadata object.
+	 *
+	 * @param mdo Metadata object
+	 * @return Type name 
+	 */
+	protected String getObjectType(MetaDataObject mdo) {
+		if (mdo instanceof Annotation) {
+			// Important when using the MetadataStore as an AnnotationStore
+			return ((Annotation) mdo).getAnnotationType();
+		} else {
+			return mdo.getClass().getSimpleName();
+		}
+	}
+
+	/**
+	 * Internal helper that checks if a type is a sub type of another type 
+	 *
+	 * @param subTypeName Name of the type that is supposed to be the sub type
+	 * @param parentTypeName Name of the type that is supposed to be the parent type
+	 */
+	private boolean isSubTypeOf(String subTypeName, String parentTypeName) {
+		if (subTypeName.equals(parentTypeName)) {
+			return true;
+		}
+		if (typeStore.get(parentTypeName) != null) {
+			String parent = typeStore.get(parentTypeName);
+			if ((parent != null) && (!parent.equals(parentTypeName))) {
+				if (isSubTypeOf(subTypeName, parent)) {
+					return true;
+				}
+			}
+		}
+		return false;
+	}
+
+	/**
+	 * Internal helper that checks if the attributes of a given metadata object meet a given condition. 
+	 *
+	 * @param mdo Metadata object
+	 * @param condition List of tokens that make up the condition phrase
+	 */
+	private boolean isConditionMet(MetaDataObject mdo, LinkedList<String> condition)
+			throws IllegalArgumentException, IllegalAccessException {
+		if (condition.isEmpty()) {
+			return true;
+		}
+		LinkedList<String> clonedCondition = new LinkedList<String>();
+		clonedCondition.addAll(condition);
+		try {
+			JSONObject mdoJson = JSONUtils.toJSONObject(mdo);
+			logger.log(Level.FINER, MessageFormat.format("Evaluating object \"{0}\".", mdoJson));
+			while (clonedCondition.size() >= 4) {
+				// Each condition clause consists of four elements, e.g. "where
+				// name = 'BankClientsShort'" or "and name = 'BankClientsShort'"
+				String operator = clonedCondition.removeFirst();
+				String attribute = clonedCondition.removeFirst();
+				String comparator = clonedCondition.removeFirst();
+				String expectedValueWithQuotes = clonedCondition.removeFirst();
+				while ((!expectedValueWithQuotes.endsWith(DefaultMetadataQueryBuilder.QUOTE_IDENTIFIER)) && (clonedCondition.size() != 0)) {
+					expectedValueWithQuotes = expectedValueWithQuotes + DefaultMetadataQueryBuilder.SEPARATOR_STRING + clonedCondition.removeFirst();
+				}
+				if (operator.equals(DefaultMetadataQueryBuilder.CONDITION_PREFIX)
+						|| operator.equals(DefaultMetadataQueryBuilder.AND_IDENTIFIER)) {
+					if (mdoJson.containsKey(attribute)) {
+						String actualValue = (String) mdoJson.get(attribute) != null ? mdoJson.get(attribute).toString() : null;
+						if (comparator.equals(DefaultMetadataQueryBuilder.EQUALS_IDENTIFIER)) {
+							if (!expectedValueWithQuotes.equals(DefaultMetadataQueryBuilder.QUOTE_IDENTIFIER + actualValue + DefaultMetadataQueryBuilder.QUOTE_IDENTIFIER)) {
+								// Condition is not met
+								return false;
+							}
+						} else if (comparator.equals(DefaultMetadataQueryBuilder.NOT_EQUALS_IDENTIFIER)) {
+							if (expectedValueWithQuotes.equals(DefaultMetadataQueryBuilder.QUOTE_IDENTIFIER + actualValue + DefaultMetadataQueryBuilder.QUOTE_IDENTIFIER)) {
+								// Condition is not met
+								return false;
+							}
+						} else {
+							throw new MetadataStoreException(
+									MessageFormat.format("Unknown comparator \"{0}\" in query condition \"{1}\".",
+											new Object[] { comparator, condition.toString() }));
+						}
+					} else {
+						logger.log(Level.INFO,
+								MessageFormat.format("The object does not contain attribute \"{0}\".", attribute));
+						// Condition is not met
+						return false;
+					}
+				} else {
+					throw new MetadataStoreException(
+							MessageFormat.format("Syntax error in query condition \"{0}\".", condition.toString()));
+				}
+			}
+			if (clonedCondition.size() != 0) {
+				throw new MetadataStoreException(
+						MessageFormat.format("Error parsing trailing query elements \"{0}\".", clonedCondition));
+			}
+			// All conditions are met
+			return true;
+		} catch (JSONException e) {
+			throw new MetadataStoreException(MessageFormat.format("Error parsing JSON object {0} in query.", mdo), e);
+		}
+	}
+
+	/**
+	 * Internal helper that merges the references of a staged metadata object with the references of the current metadata object
+	 * stored in the metadata store. The missing references are added to the provided object in place.
+	 *
+	 * @param object Internal representation of a staged metadata object
+	 */
+	private void mergeReferenceMap(StoredMetaDataObject object) {
+		HashMap<String, List<MetaDataObjectReference>> mergedObjectRefMap = new HashMap<String, List<MetaDataObjectReference>>();
+		String objectId = object.getMetaDataObject().getReference().getId();
+		if (getObjects().get(objectId) != null) {
+			// Only merge if the object already exists in the metadata store
+			HashMap<String, List<MetaDataObjectReference>> originalRefMap = getObjects().get(objectId)
+					.getReferenceMap(); // Get reference map of exiting object
+			HashMap<String, List<MetaDataObjectReference>> updatedObjectRefMap = object.getReferenceMap();
+			for (String referenceId : updatedObjectRefMap.keySet()) {
+				// Update original reference map in place
+				mergedObjectRefMap.put(referenceId,
+						InternalMetaDataUtils.mergeReferenceLists(originalRefMap.get(referenceId), updatedObjectRefMap.get(referenceId)));
+			}
+			object.setReferencesMap(mergedObjectRefMap);
+		}
+	}
+
+	@Override
+	public void commit() {
+		synchronized (accessLock) {
+			// Check if all required types exist BEFORE starting to create the
+			// objects in order to avoid partial creation of objects
+			for (Map.Entry<String, StoredMetaDataObject> mapEntry : this.stagedObjects.entrySet()) {
+				String typeName = getObjectType(mapEntry.getValue().getMetaDataObject());
+				if ((typeName == null) || !typeStore.containsKey(typeName)) {
+					throw new MetadataStoreException(MessageFormat.format(
+							"The type \"{0}\" of the object you are trying to create does not exist in this metadata store.",
+							typeName));
+				}
+			}
+
+			// Move objects from staging area into metadata store
+			for (Map.Entry<String, StoredMetaDataObject> mapEntry : this.stagedObjects.entrySet()) {
+				StoredMetaDataObject object = mapEntry.getValue();
+				String typeName = getObjectType(mapEntry.getValue().getMetaDataObject());
+				logger.log(Level.INFO,
+						MessageFormat.format(
+								"Creating or updating object with id ''{0}'' and type ''{1}'' in metadata store.",
+								new Object[] { object.getMetaDataObject().getReference(), typeName }));
+				String objectId = object.getMetaDataObject().getReference().getId();
+				mergeReferenceMap(object); // Merge new object references with
+											// existing object references in
+											// metadata store
+				getObjects().put(objectId, object);
+			}
+
+			// Clear staging area
+			stagedObjects = new LinkedHashMap<String, StoredMetaDataObject>();
+		}
+	}
+
+	/**
+	 * Internal helper that creates a new annotation type in the internal type store if it does not yet exist.
+	 *
+	 * @param mds Metadata store to operate on
+	 */
+	private void ensureAnnotationTypeExists(Annotation annotation) {
+		String annotationType = annotation.getAnnotationType();
+		if (typeStore.get(annotationType) == null) {
+			if (annotation instanceof ProfilingAnnotation) {
+				typeStore.put(annotationType, "ProfilingAnnotation");
+			} else if (annotation instanceof ClassificationAnnotation) {
+				typeStore.put(annotationType, "ClassificationAnnotation");
+			} else if (annotation instanceof RelationshipAnnotation) {
+				typeStore.put(annotationType, "RelationshipAnnotation");
+			}
+		}
+	}
+}


Mime
View raw message