atlas-commits mailing list archives

From mad...@apache.org
Subject [16/25] incubator-atlas git commit: ATLAS-1898: initial commit of ODF
Date Wed, 28 Jun 2017 05:57:29 GMT
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-core/src/main/java/org/apache/atlas/odf/core/controlcenter/ControlCenter.java
----------------------------------------------------------------------
diff --git a/odf/odf-core/src/main/java/org/apache/atlas/odf/core/controlcenter/ControlCenter.java b/odf/odf-core/src/main/java/org/apache/atlas/odf/core/controlcenter/ControlCenter.java
new file mode 100755
index 0000000..4ffa195
--- /dev/null
+++ b/odf/odf-core/src/main/java/org/apache/atlas/odf/core/controlcenter/ControlCenter.java
@@ -0,0 +1,454 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.odf.core.controlcenter;
+
+import java.text.MessageFormat;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.Callable;
+import java.util.concurrent.TimeoutException;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.atlas.odf.api.OpenDiscoveryFramework;
+import org.apache.atlas.odf.api.analysis.*;
+import org.apache.atlas.odf.api.annotation.AnnotationStore;
+import org.apache.atlas.odf.api.discoveryservice.DiscoveryService;
+import org.apache.atlas.odf.api.discoveryservice.DiscoveryServiceManager;
+import org.apache.atlas.odf.api.discoveryservice.DiscoveryServiceRequest;
+import org.apache.atlas.odf.api.discoveryservice.DiscoveryServiceResponse;
+import org.apache.atlas.odf.api.discoveryservice.async.DiscoveryServiceAsyncStartResponse;
+import org.apache.atlas.odf.api.discoveryservice.datasets.DataSetContainer;
+import org.apache.atlas.odf.api.discoveryservice.sync.SyncDiscoveryService;
+import org.apache.atlas.odf.api.metadata.AnnotationPropagator;
+import org.apache.atlas.odf.api.metadata.MetaDataObjectReference;
+import org.apache.atlas.odf.api.metadata.MetadataStore;
+import org.apache.atlas.odf.api.metadata.models.MetaDataObject;
+import org.apache.atlas.odf.api.metadata.models.UnknownDataSet;
+import org.apache.atlas.odf.core.Encryption;
+import org.apache.atlas.odf.core.Environment;
+import org.apache.atlas.odf.core.ODFInternalFactory;
+import org.apache.atlas.odf.core.messaging.DiscoveryServiceQueueManager;
+import org.apache.atlas.odf.json.JSONUtils;
+import org.apache.wink.json4j.JSONException;
+
+import org.apache.atlas.odf.api.ODFFactory;
+import org.apache.atlas.odf.api.discoveryservice.AnalysisRequestTracker;
+import org.apache.atlas.odf.api.discoveryservice.DiscoveryServiceProperties;
+import org.apache.atlas.odf.api.discoveryservice.ServiceNotFoundException;
+import org.apache.atlas.odf.api.discoveryservice.async.AsyncDiscoveryService;
+import org.apache.atlas.odf.core.Utils;
+
+public class ControlCenter {
+
+	private static final String CLASSNAME = ControlCenter.class.getName();
+	private Logger logger = Logger.getLogger(ControlCenter.class.getName());
+
+	public static final String HEALTH_TEST_DISCOVERY_SERVICE_ID = "odf-health-test-discovery-service-id";
+	public static final String HEALTH_TEST_DATA_SET_ID_PREFIX = "odf-health-test-dummy-data-set-id";
+
+	DiscoveryServiceQueueManager queueManager = null;
+	AnalysisRequestTrackerStore store = null;
+	Environment environment = null;
+	OpenDiscoveryFramework odf;
+
+	public ControlCenter() {
+		ODFInternalFactory f = new ODFInternalFactory();
+		queueManager = f.create(DiscoveryServiceQueueManager.class);
+		store = f.create(AnalysisRequestTrackerStore.class);
+		odf = new ODFFactory().create();
+		environment = f.create(Environment.class);
+	}
+
+	private String createNewRequestId() {
+		return "odf-request-" + UUID.randomUUID().toString() + "_" + System.currentTimeMillis();
+	}
+
+	public DiscoveryServiceQueueManager getQueueManager() {
+		return queueManager;
+	}
+
+	public AnalysisResponse startRequest(AnalysisRequest request) {
+		final String METHODNAME = "startRequest()";
+		logger.entering(CLASSNAME, METHODNAME);
+		AnalysisResponse response = new AnalysisResponse();
+		AnalysisRequest requestWithServiceSequence = null;
+		try {
+			requestWithServiceSequence = JSONUtils.fromJSON(JSONUtils.toJSON(request), AnalysisRequest.class);
+		} catch (JSONException e) {
+			throw new RuntimeException("Error cloning analysis request.", e);
+		}
+		if ((request.getDiscoveryServiceSequence() == null) || request.getDiscoveryServiceSequence().isEmpty()) {
+			DeclarativeRequestMapper mapper = new DeclarativeRequestMapper(request);
+			List<String> discoveryServiceSequence = mapper.getRecommendedDiscoveryServiceSequence();
+			logger.log(Level.INFO, "Using discovery service sequence: " + Utils.joinStrings(discoveryServiceSequence, ','));
+			if (discoveryServiceSequence == null) {
+				response.setId(request.getId());
+				response.setInvalidRequest(true);
+				response.setDetails("No suitable discovery services found to create the requested annotation types.");
+				return response;
+			}
+			requestWithServiceSequence.setDiscoveryServiceSequence(discoveryServiceSequence);
+		}
+		try {
+			//Initialize queues to make sure analysis can be started
+			queueManager.start();
+		} catch (TimeoutException e) {
+			logger.warning("queues could not be started in time");
+		}
+		AnalysisRequestTracker similarTracker = store.findSimilarQueuedRequest(requestWithServiceSequence);
+		if (similarTracker != null) {
+			logger.log(Level.WARNING, "A request similar to the issued one is already in the queue.");
+			logger.log(Level.FINE, "A request similar to the issued one is already in the queue. Original request: {0}, found similar request: {1}",
+					new Object[] { JSONUtils.lazyJSONSerializer(requestWithServiceSequence),
+					JSONUtils.lazyJSONSerializer(similarTracker) });
+		}
+		String newRequestId = createNewRequestId();
+		response.setId(newRequestId);
+		requestWithServiceSequence.setId(newRequestId);
+		AnalysisRequestTracker tracker = createTracker(requestWithServiceSequence, response);
+		// if the request is invalid, the response has already been modified and createTracker() returns null
+		if (tracker != null) {
+			tracker.setStatus(AnalysisRequestTrackerStatus.STATUS.IN_DISCOVERY_SERVICE_QUEUE);
+			logger.log(Level.FINE, "Starting new request with ID ''{0}''. Tracker: {1}", new Object[] { newRequestId, JSONUtils.lazyJSONSerializer(tracker) });
+			store.store(tracker);
+			logger.log(Level.FINEST, "Stored tracker for new request with ID ''{0}''. Tracker: {1}", new Object[] { newRequestId, JSONUtils.lazyJSONSerializer(tracker) });
+			queueManager.enqueue(tracker);
+			logger.log(Level.FINEST, "Tracker enqueued for new request with ID ''{0}''. Tracker: {1}", new Object[] { newRequestId, JSONUtils.lazyJSONSerializer(tracker) });
+		}
+		logger.exiting(CLASSNAME, METHODNAME);
+		return response;
+	}
+
+	public AnalysisRequestStatus getRequestStatus(String requestId) {
+		final String METHODNAME = "getRequestStatus(String)";
+		logger.entering(CLASSNAME, METHODNAME);
+		AnalysisRequestStatus result = new AnalysisRequestStatus();
+		AnalysisRequestTracker tracker = store.query(requestId);
+		if (tracker == null) {
+			result.setState(AnalysisRequestStatus.State.NOT_FOUND);
+		} else {
+			AnalysisRequestStatus.State state = null;
+			switch (tracker.getStatus()) {
+			case INITIALIZED:
+			case IN_DISCOVERY_SERVICE_QUEUE:
+				state = AnalysisRequestStatus.State.QUEUED;
+				break;
+			case ERROR:
+				state = AnalysisRequestStatus.State.ERROR;
+				break;
+			case DISCOVERY_SERVICE_RUNNING:
+				state = AnalysisRequestStatus.State.ACTIVE;
+				break;
+			case FINISHED:
+				state = AnalysisRequestStatus.State.FINISHED;
+				break;
+			case CANCELLED:
+				state = AnalysisRequestStatus.State.CANCELLED;
+				break;
+			default:
+				break;
+			}
+			result.setState(state);
+			result.setDetails(tracker.getStatusDetails());
+			result.setRequest(tracker.getRequest());
+
+			long totalProcessingTime = 0;
+			long totalQueuingTime = 0;
+			long totalTimeSpentStoringAnnotations = 0;
+
+			List<DiscoveryServiceRequest> requests = new ArrayList<DiscoveryServiceRequest>();
+			for (DiscoveryServiceRequest req : tracker.getDiscoveryServiceRequests()) {
+				DiscoveryServiceRequest copyReq = new DiscoveryServiceRequest();
+				copyReq.setDiscoveryServiceId(req.getDiscoveryServiceId());
+				long putOnQueue = req.getPutOnRequestQueue();
+				long startedProcessing = req.getTakenFromRequestQueue();
+				long finishedProcessing = req.getFinishedProcessing();
+
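+				// Timestamps default to 0 until set, so the ternaries below add 0
+				// for services that have not yet started or finished processing.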
+				totalProcessingTime += (finishedProcessing > 0 ? finishedProcessing - startedProcessing : finishedProcessing);
+				totalQueuingTime += (startedProcessing > 0 ? startedProcessing - putOnQueue : startedProcessing);
+				totalTimeSpentStoringAnnotations += req.getTimeSpentStoringResults();
+
+				copyReq.setFinishedProcessing(finishedProcessing);
+				copyReq.setPutOnRequestQueue(putOnQueue);
+				copyReq.setTakenFromRequestQueue(startedProcessing);
+				requests.add(copyReq);
+			}
+
+			result.setTotalTimeOnQueues(totalQueuingTime);
+			result.setTotalTimeProcessing(totalProcessingTime);
+			result.setTotalTimeStoringAnnotations(totalTimeSpentStoringAnnotations);
+			result.setServiceRequests(requests);
+		}
+		logger.log(Level.FINE, "Returning request status object {0}", JSONUtils.lazyJSONSerializer(result));
+		logger.exiting(CLASSNAME, METHODNAME);
+		return result;
+	}
+
+	public AnalysisCancelResult cancelRequest(String requestId) {
+		final String METHODNAME = "cancelRequest(String)";
+		logger.entering(CLASSNAME, METHODNAME);
+
+		AnalysisCancelResult result = new AnalysisCancelResult();
+		result.setState(AnalysisCancelResult.State.NOT_FOUND);
+
+		AnalysisRequestTracker request = store.query(requestId);
+		//TODO implement cancellation of running instead of only queued requests.
+		if (request != null) {
+			if (TrackerUtil.isCancellable(request)) {
+				request.setStatus(AnalysisRequestTrackerStatus.STATUS.CANCELLED);
+				store.store(request);
+				logger.info("cancelled request with id " + requestId);
+				result.setState(AnalysisCancelResult.State.SUCCESS);
+			} else {
+				logger.log(Level.FINER, "Request ''{0}'' could not be cancelled. State ''{1}'', next request number: ''{2}''", new Object[]{requestId, request.getStatus(), request.getNextDiscoveryServiceRequest()});
+				result.setState(AnalysisCancelResult.State.INVALID_STATE);
+			}
+		}
+		logger.exiting(CLASSNAME, METHODNAME);
+		return result;
+	}
+
+	private AnalysisRequestTracker createTracker(AnalysisRequest request, AnalysisResponse response) {
+		DiscoveryServiceManager discoveryServiceManager = odf.getDiscoveryServiceManager();
+		List<DiscoveryServiceProperties> registeredServices = new ArrayList<>(discoveryServiceManager.getDiscoveryServicesProperties());
+		registeredServices.add(HealthCheckServiceRuntime.getHealthCheckServiceProperties());
+		String currentUser = this.environment.getCurrentUser();
+
+		/*
+		List<MetaDataObjectReference> datasets = request.getDataSets();
+		
+		if (datasets.size() == 1 && datasets.get(0).getId().startsWith(HEALTH_TEST_DATA_SET_ID_PREFIX)) {
+			// health test mode
+			AnalysisRequestTracker healthTestTracker = new AnalysisRequestTracker();
+			DiscoveryServiceRequest dssr = new DiscoveryServiceRequest();
+			dssr.setOdfRequestId(request.getId());
+			dssr.setDiscoveryServiceId(ControlCenter.HEALTH_TEST_DISCOVERY_SERVICE_ID);
+			String odfUrl = new ODFFactory().create().getSettingsManager().getODFSettings().getOdfUrl();
+			dssr.setOdfUrl(odfUrl);
+			MetaDataObjectReference dsr = datasets.get(0);
+			
+			DataSetContainer dataSetContainer = new DataSetContainer();
+			DataSet oMDataSet = new UnknownDataSet();	
+			oMDataSet.setReference(dsr);
+			dataSetContainer.setDataSet(oMDataSet);
+			
+			dssr.setDataSetContainer(dataSetContainer);
+			dssr.setUser(currentUser);
+			dssr.setAdditionalProperties(request.getAdditionalProperties());
+			healthTestTracker.setDiscoveryServiceRequests(Collections.singletonList(dssr));
+			healthTestTracker.setRequest(request);
+			healthTestTracker.setStatus(STATUS.INITIALIZED);
+			Utils.setCurrentTimeAsLastModified(healthTestTracker);
+			healthTestTracker.setUser(currentUser);
+			response.setDetails("Request is a special health test request.");
+			return healthTestTracker;
+		}
+		*/
+
+		List<DiscoveryServiceRequest> startRequests = new ArrayList<DiscoveryServiceRequest>();
+		List<String> discoveryServiceSequence = request.getDiscoveryServiceSequence();
+		if (discoveryServiceSequence != null && !discoveryServiceSequence.isEmpty()) {
+			logger.log(Level.FINE, "Request issued with fixed discovery service sequence: {0}", discoveryServiceSequence);
+			// first check if discoveryService IDs are valid
+			Set<String> foundDSs = new HashSet<String>(discoveryServiceSequence);
+			for (String ds : discoveryServiceSequence) {
+				for (DiscoveryServiceProperties regInfo : registeredServices) {
+					if (regInfo.getId().equals(ds)) {
+						foundDSs.remove(ds);
+					}
+				}
+			}
+			// if there are some IDs left that were not found 
+			if (!foundDSs.isEmpty()) {
+				String msg = MessageFormat.format("The discovery services {0} could not be found", Utils.collectionToString(foundDSs, ","));
+				logger.log(Level.WARNING, msg);
+				response.setInvalidRequest(true);
+				response.setDetails(msg);
+				return null;
+			}
+
+			// for each data set process all discovery services
+			// (possible alternative, not used here: for all discovery services process each data set)
+			for (MetaDataObjectReference dataSetId : request.getDataSets()) {
+				MetaDataObject mdo = null;
+				if (dataSetId.getId().startsWith(HEALTH_TEST_DATA_SET_ID_PREFIX)) {
+					mdo = new UnknownDataSet();
+					mdo.setReference(dataSetId);
+				} else {
+					mdo = odf.getMetadataStore().retrieve(dataSetId);
+				}
+				if (mdo == null) {
+					String msg = MessageFormat.format("The meta data object id ''{0}'' does not reference an existing metadata object. Request will be set to error.", dataSetId.toString());
+					logger.log(Level.WARNING, msg);
+					response.setInvalidRequest(true);
+					response.setDetails(msg);
+					return null;
+				}
+				if (dataSetId.getUrl() == null) {
+					dataSetId.setUrl(mdo.getReference().getUrl());
+				}
+				for (String ds : discoveryServiceSequence) {
+					DiscoveryServiceRequest req = new DiscoveryServiceRequest();
+					DataSetContainer dataSetContainer = new DataSetContainer();
+					dataSetContainer.setDataSet(mdo);
+					req.setDataSetContainer(dataSetContainer);
+					req.setOdfRequestId(request.getId());
+					req.setDiscoveryServiceId(ds);
+					req.setUser(currentUser);
+					req.setAdditionalProperties(request.getAdditionalProperties());
+					String odfUrl = odf.getSettingsManager().getODFSettings().getOdfUrl();
+					req.setOdfUrl(odfUrl);
+					for (DiscoveryServiceProperties dsri : odf.getDiscoveryServiceManager().getDiscoveryServicesProperties()) {
+						if (dsri.getId().equals(ds)) {
+							if (dsri.getEndpoint().getRuntimeName().equals(SparkServiceRuntime.SPARK_RUNTIME_NAME)) {
+								req.setOdfUser(odf.getSettingsManager().getODFSettings().getOdfUser());
+								//Note that the password has to be provided as plain text here because the remote service cannot decrypt it otherwise.
+								//TODO: Consider providing a temporary secure token instead of the password.
+								req.setOdfPassword(Encryption.decryptText(odf.getSettingsManager().getODFSettings().getOdfPassword()));
+							}
+						}
+					}
+					startRequests.add(req);
+				}
+			}
+		} else {
+			String msg = "The request didn't contain any processing hints. ODF cannot process a request without an analysis sequence.";
+			logger.log(Level.WARNING, msg);
+			response.setInvalidRequest(true);
+			response.setDetails(msg);
+			return null;
+		}
+
+		AnalysisRequestTracker tracker = new AnalysisRequestTracker();
+		tracker.setDiscoveryServiceRequests(startRequests);
+		tracker.setNextDiscoveryServiceRequest(0);
+		tracker.setRequest(request);
+		tracker.setStatus(AnalysisRequestTrackerStatus.STATUS.INITIALIZED);
+		Utils.setCurrentTimeAsLastModified(tracker);
+		tracker.setUser(currentUser);
+		return tracker;
+	}
+	
+	boolean requiresMetaDataCache(DiscoveryService service) {
+		return service instanceof SparkDiscoveryServiceProxy;
+	}
+
+	public static SyncDiscoveryService getDiscoveryServiceProxy(String discoveryServiceId, AnalysisRequest request) {
+		try {
+			ODFInternalFactory factory = new ODFInternalFactory();
+			DiscoveryServiceManager dsm = factory.create(DiscoveryServiceManager.class);
+			DiscoveryServiceProperties serviceProps = null;
+			if (discoveryServiceId.startsWith(HEALTH_TEST_DISCOVERY_SERVICE_ID)) {
+				serviceProps = HealthCheckServiceRuntime.getHealthCheckServiceProperties();
+			} else {
+				serviceProps = dsm.getDiscoveryServiceProperties(discoveryServiceId);
+			}
+			ServiceRuntime runtime = ServiceRuntimes.getRuntimeForDiscoveryService(discoveryServiceId);
+			if (runtime == null) {
+				throw new RuntimeException(MessageFormat.format("Service runtime for service ''{0}'' was not found.", discoveryServiceId));
+			}
+			DiscoveryService runtimeProxy = runtime.createDiscoveryServiceProxy(serviceProps);
+			SyncDiscoveryService proxy = null;
+			if (runtimeProxy instanceof AsyncDiscoveryService) {
+				proxy = new AsyncDiscoveryServiceWrapper( (AsyncDiscoveryService) runtimeProxy);
+			} else {
+				proxy = (SyncDiscoveryService) runtimeProxy;
+			}
+			proxy.setMetadataStore(factory.create(MetadataStore.class));
+			AnnotationStore as = factory.create(AnnotationStore.class);
+			if (request != null) {
+				as.setAnalysisRun(request.getId());
+			}
+			proxy.setAnnotationStore(as);
+			return proxy;
+		} catch (ServiceNotFoundException exc) {
+			throw new RuntimeException(exc);
+		}
+	}
+
+	/**
+	 * Package-private helper method that is called when the current discovery service has finished
+	 * and processing should advance to the next one.
+	 * NOTE: This should only be called once across all nodes, i.e., typically from a Kafka consumer
+	 *       that runs on all nodes with the same consumer group ID.
+	 *
+	 * @param tracker the tracker of the analysis request to advance
+	 */
+	void advanceToNextDiscoveryService(final AnalysisRequestTracker tracker) {
+		DiscoveryServiceRequest req = TrackerUtil.getCurrentDiscoveryServiceStartRequest(tracker);
+		DiscoveryServiceResponse resp = TrackerUtil.getCurrentDiscoveryServiceStartResponse(tracker);
+		String dsRunID = "N/A";
+		if (resp instanceof DiscoveryServiceAsyncStartResponse) {
+			dsRunID = ((DiscoveryServiceAsyncStartResponse) resp).getRunId();
+		}
+		String dsID = req.getDiscoveryServiceId();
+
+		TrackerUtil.moveToNextDiscoveryService(tracker);
+		DiscoveryServiceRequest nextDSReq = TrackerUtil.getCurrentDiscoveryServiceStartRequest(tracker);
+		if (nextDSReq == null) {
+			logger.log(Level.FINER, "DSWatcher: Run ''{0}'' of DS ''{1}'' was last of request ''{2}'', marking overall request as finished",
+					new Object[] { dsRunID, dsID, tracker.getRequest().getId() });
+			// overall request is finished
+			tracker.setStatus(AnalysisRequestTrackerStatus.STATUS.FINISHED);
+			tracker.setStatusDetails("All discovery services ran successfully");
+			
+			// now propagate annotations if configured
+			logger.log(Level.FINE, "Request is finished, checking for annotation propagation");
+			Boolean doPropagation = odf.getSettingsManager().getODFSettings().getEnableAnnotationPropagation();
+			if (Boolean.TRUE.equals(doPropagation)) {
+				TransactionContextExecutor transactionContextExecutor = new ODFInternalFactory().create(TransactionContextExecutor.class);
+				try {
+					transactionContextExecutor.runInTransactionContext(new Callable<Object>() {
+						
+						@Override
+						public Object call() throws Exception {
+							AnnotationPropagator ap = odf.getMetadataStore().getAnnotationPropagator();
+							if (ap != null) {
+								logger.log(Level.FINE, "Annotation Propagator exists, running propagation");
+								try {
+									ap.propagateAnnotations(new ODFFactory().create().getAnnotationStore(), tracker.getRequest().getId());
+								} catch(Exception exc) {
+									logger.log(Level.SEVERE, "An unexcepted exception occurred while propagating annotations", exc);
+									tracker.setStatus(AnalysisRequestTrackerStatus.STATUS.ERROR);
+									String msg = MessageFormat.format("An unexpected exception occured while propagating annotations: ''{0}''", Utils.getExceptionAsString(exc));
+									tracker.setStatusDetails(msg);
+								}
+							}
+							return null;
+						}
+					});
+				} catch (Exception e) {
+					// should never happen as exception is handled inside the callable
+					throw new RuntimeException(e);
+				}
+			}
+		} else {
+			logger.log(Level.FINER, "DSWatcher: Run ''{0}'' of DS ''{1}'' was not the last of request ''{2}'', moving over to next request",
+					new Object[] { dsRunID, dsID, tracker.getRequest().getId() });
+			tracker.setStatus(AnalysisRequestTrackerStatus.STATUS.IN_DISCOVERY_SERVICE_QUEUE);
+			queueManager.enqueue(tracker);
+		}
+		Utils.setCurrentTimeAsLastModified(tracker);
+		store.store(tracker);
+	}
+
+}
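
For illustration, a minimal caller sketch for the ControlCenter above (not part of this commit). It assumes the ODF API classes from this commit are on the classpath, bean-style getters matching the setters used above (isInvalidRequest(), getState(), getDetails()), and hypothetical data set / service ids:

import java.util.Arrays;

import org.apache.atlas.odf.api.analysis.AnalysisRequest;
import org.apache.atlas.odf.api.analysis.AnalysisRequestStatus;
import org.apache.atlas.odf.api.analysis.AnalysisResponse;
import org.apache.atlas.odf.api.metadata.MetaDataObjectReference;
import org.apache.atlas.odf.core.controlcenter.ControlCenter;

public class ControlCenterUsageSketch {
	public static void main(String[] args) throws InterruptedException {
		// Reference the data set to analyze (hypothetical id).
		MetaDataObjectReference dataSet = new MetaDataObjectReference();
		dataSet.setId("my-data-set-id");

		AnalysisRequest request = new AnalysisRequest();
		request.setDataSets(Arrays.asList(dataSet));
		// Either fix the discovery service sequence explicitly (hypothetical id) ...
		request.setDiscoveryServiceSequence(Arrays.asList("my-discovery-service"));
		// ... or leave it empty and set annotation types so that
		// DeclarativeRequestMapper (next file) picks a sequence.

		ControlCenter cc = new ControlCenter();
		AnalysisResponse response = cc.startRequest(request);
		if (response.isInvalidRequest()) {
			System.err.println("Invalid request: " + response.getDetails());
			return;
		}
		// Poll the tracker store until the request leaves the queued/active states.
		AnalysisRequestStatus status;
		do {
			Thread.sleep(1000);
			status = cc.getRequestStatus(response.getId());
		} while (status.getState() == AnalysisRequestStatus.State.QUEUED
				|| status.getState() == AnalysisRequestStatus.State.ACTIVE);
		System.out.println("Final state: " + status.getState());
	}
}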

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-core/src/main/java/org/apache/atlas/odf/core/controlcenter/DeclarativeRequestMapper.java
----------------------------------------------------------------------
diff --git a/odf/odf-core/src/main/java/org/apache/atlas/odf/core/controlcenter/DeclarativeRequestMapper.java b/odf/odf-core/src/main/java/org/apache/atlas/odf/core/controlcenter/DeclarativeRequestMapper.java
new file mode 100755
index 0000000..9b16270
--- /dev/null
+++ b/odf/odf-core/src/main/java/org/apache/atlas/odf/core/controlcenter/DeclarativeRequestMapper.java
@@ -0,0 +1,279 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.odf.core.controlcenter;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import java.text.MessageFormat;
+
+import org.apache.atlas.odf.api.ODFFactory;
+import org.apache.atlas.odf.core.Utils;
+import org.apache.atlas.odf.api.discoveryservice.DiscoveryServiceManager;
+import org.apache.atlas.odf.api.analysis.AnalysisRequest;
+import org.apache.atlas.odf.api.discoveryservice.DiscoveryServiceProperties;
+
+/**
+*
+* Maps a list of {@link AnnotationType} objects to a list of service ids representing concrete discovery
+* services that generate the requested annotation types.
+* 
+* Internally, this class generates a list of all possible combinations of discovery services that may be
+* used to generate the requested annotation types. The combinations are then assessed and ordered by
+* their expected execution effort, and the one with the least effort is recommended.
+*
+*/
+public class DeclarativeRequestMapper {
+
+	private Logger logger = Logger.getLogger(DeclarativeRequestMapper.class.getName());
+
+	DiscoveryServiceManager dsManager = new ODFFactory().create().getDiscoveryServiceManager();
+	List<DiscoveryServiceProperties> dsPropList = dsManager.getDiscoveryServicesProperties();
+
+	private List<DiscoveryServiceSequence> discoveryServiceSequences = new ArrayList<DiscoveryServiceSequence>();
+
+	public DeclarativeRequestMapper(AnalysisRequest request) {
+		String messageText = "Generating possible discovery service sequences for annotation types {0}.";
+		logger.log(Level.INFO, MessageFormat.format(messageText, request.getAnnotationTypes()));
+
+		this.discoveryServiceSequences = calculateDiscoveryServiceSequences(request.getAnnotationTypes());
+		Collections.sort(this.discoveryServiceSequences, new EffortComparator());
+	}
+
+	/**
+	*
+	* Represents a single discovery service sequence.
+	*
+	*/
+	public class DiscoveryServiceSequence {
+		private LinkedHashSet<String> serviceSequence;
+
+		public DiscoveryServiceSequence() {
+			this.serviceSequence = new LinkedHashSet<String>();
+		}
+
+		public DiscoveryServiceSequence(LinkedHashSet<String> serviceIds) {
+			this.serviceSequence = serviceIds;
+		}
+
+		public LinkedHashSet<String> getServiceSequence() {
+			return this.serviceSequence;
+		}
+
+		public List<String> getServiceSequenceAsList() {
+			return new ArrayList<String>(this.serviceSequence);
+		}
+
+		@Override
+		public boolean equals(Object obj) {
+			if ((obj == null) || !(obj instanceof DiscoveryServiceSequence)) {
+				return false;
+			}
+			return this.getServiceSequence().equals(((DiscoveryServiceSequence) obj).getServiceSequence());
+		}
+
+		// Overriding hashCode method to ensure proper results of equals() method
+		// (see http://www.javaranch.com/journal/2002/10/equalhash.html)
+		@Override
+		public int hashCode() {
+			return Utils.joinStrings(new ArrayList<String>(this.serviceSequence), ',').hashCode();
+		}
+	}
+
+	/**
+	*
+	* Internal class that estimates the effort for executing a sequence of discovery services.
+	* Should be extended to take runtime statistics into account. 
+	*
+	*/
+	private class EffortComparator implements Comparator<DiscoveryServiceSequence> {
+		public int compare(DiscoveryServiceSequence da1, DiscoveryServiceSequence da2) {
+			if (da1.getServiceSequence().size() < da2.getServiceSequence().size()) {
+				return -1;
+			} else if (da1.getServiceSequence().size() > da2.getServiceSequence().size()) {
+				return 1;
+			} else {
+				return 0;
+			}
+		}
+	}
+
+	/**
+	 * Returns the calculated list of discovery service sequences ordered by the execution effort,
+	 * starting with the sequence that is supposed to cause the minimum execution effort.
+	 *
+	 * @return List of discovery service sequences
+	 */
+	public List<DiscoveryServiceSequence> getDiscoveryServiceSequences() {
+		return this.discoveryServiceSequences;
+	}
+
+	/**
+	 * Returns recommended discovery service sequence, i.e. the one that is supposed to cause the
+	 * minimum execution effort.
+	 *
+	 * @return Discovery service sequence
+	 */
+	public List<String> getRecommendedDiscoveryServiceSequence() {
+		if (!getDiscoveryServiceSequences().isEmpty()) {
+			return new ArrayList<String>(this.discoveryServiceSequences.get(0).getServiceSequence());
+		} else {
+			return null;
+		}
+	}
+
+	/**
+	 * Remove all discovery service sequences that contain a specific service id. Use this method
+	 * to update the list of discovery service sequences after a specific discovery service has
+	 * failed and should not be used any more.
+	 *
+	 * @param serviceId Id of discovery service to be removed
+	 * @return True if at least one matching sequence was removed
+	 */
+	public boolean removeDiscoveryServiceSequences(String serviceId) {
+		boolean serviceRemoved = false;
+		List<DiscoveryServiceSequence> updatedList = new ArrayList<DiscoveryServiceSequence>();
+		updatedList.addAll(this.discoveryServiceSequences);
+		for (DiscoveryServiceSequence sequence : this.discoveryServiceSequences) {
+			if (sequence.getServiceSequence().contains(serviceId)) {
+				updatedList.remove(sequence);
+				serviceRemoved = true;
+			}
+		}
+		this.discoveryServiceSequences = updatedList;
+		return serviceRemoved;
+	}
+
+	/**
+	 * Internal method that determines all possible sequences of discovery services which could be used
+	 * to generate the requested annotation type. Using recursion, all levels of prerequisites are taken
+	 * into account.
+	 *
+	 * @param annotationType Annotation type to be generated
+	 * @return List of discovery service sequences that generate the requested annotation type
+	 */
+	private List<DiscoveryServiceSequence> getDiscoveryServiceSequencesForAnnotationType(String annotationType) {
+		List<DiscoveryServiceSequence> result = new ArrayList<DiscoveryServiceSequence>();
+		for (DiscoveryServiceProperties dsProps : this.dsPropList) {
+			if ((dsProps.getResultingAnnotationTypes() != null) && dsProps.getResultingAnnotationTypes().contains(annotationType)) {
+				DiscoveryServiceSequence da = new DiscoveryServiceSequence();
+				da.getServiceSequence().add(dsProps.getId());
+				List<DiscoveryServiceSequence> discoveryApproachesForService = new ArrayList<DiscoveryServiceSequence>();
+				discoveryApproachesForService.add(da);
+
+				// If there are prerequisite annotation types, also merge their services into the result
+				if ((dsProps.getPrerequisiteAnnotationTypes() != null)
+						&& !dsProps.getPrerequisiteAnnotationTypes().isEmpty()) {
+					discoveryApproachesForService = combineDiscoveryServiceSequences(
+							calculateDiscoveryServiceSequences(dsProps.getPrerequisiteAnnotationTypes()),
+							discoveryApproachesForService);
+				}
+				logger.log(Level.INFO, "Discovery appoaches for annotationType " + annotationType + ":");
+				for (DeclarativeRequestMapper.DiscoveryServiceSequence discoveryApproach : discoveryApproachesForService) {
+					logger.log(Level.INFO,
+							Utils.joinStrings(new ArrayList<String>(discoveryApproach.getServiceSequence()), ','));
+				}
+
+				result.addAll(discoveryApproachesForService);
+			}
+		}
+		return result;
+	}
+
+	/**
+	 * Internal method that combines two lists of discovery service sequences by generating all possible
+	 * combinations of the entries of both lists. The method avoids duplicate services in each sequence
+	 * and duplicate sequences in the resulting list.
+	 *
+	 * @param originalSequences Original list of discovery service sequences
+	 * @param additionalSequences Second list of discovery service sequences
+	 * @return Combined list of discovery service sequences
+	 */
+	private List<DiscoveryServiceSequence> combineDiscoveryServiceSequences(List<DiscoveryServiceSequence> originalSequences, List<DiscoveryServiceSequence> additionalSequences) {
+		// Example scenario for combining service sequences:
+		//
+		// Let's assume a service S that generates two annotation types AT1 and AT2, and S has prerequisite
+		// annotation type AT_P. There are two services P1 and P2 creating annotation type AT_P.
+		// The possible service sequences for generating annotation type AT1 are "P1, S" and "P2, S", same for AT2.
+		//
+		// When requesting a set of annotation types AT1 and AT2, this will result in the following four combinations
+		// which contain several redundancies:
+		// "P1, S, P1, S", "P1, S, P2, S", "P2, S, P1, S", "P2, S, P2, S"
+		// 
+		// This method uses three ways of removing redundancies:
+		//
+		// 1. Given that class DiscoveryServiceSequence internally uses LinkedHashSet, duplicate services are removed from the
+		// service sequences, resulting in: "P1, S", "P1, S, P2", "P2, S, P1", "P2, S"
+		//
+		// 2. Service sequences are only merged if the last service of the additional sequence is not already part of the original
+		// one which results in: "P1, S", "P1, S", "P2, S", "P2, S"
+		// 
+		// 3. Duplicate sequences are ignored, resulting in: "P1, S", "P2, S" which is the final result.  
+
+		List<DiscoveryServiceSequence> discoveryApproaches = new ArrayList<DiscoveryServiceSequence>();
+		for (DiscoveryServiceSequence da1 : originalSequences) {
+			for (DiscoveryServiceSequence da2 : additionalSequences) {
+				DiscoveryServiceSequence da = new DiscoveryServiceSequence();
+				da.getServiceSequence().addAll(da1.getServiceSequence());
+
+				// Add the second list only if its last serviceId is not already part of the first list
+				// (Otherwise unnecessary prerequisite services might be added, because the 2nd list may use different ones)
+				if (!da1.getServiceSequence().contains(da2.getServiceSequenceAsList().get(da2.getServiceSequenceAsList().size() - 1))) {
+					da.getServiceSequence().addAll(da2.getServiceSequence());
+				}
+
+				// Avoid duplicate entries (uses DiscoveryServiceSequence.equals() method)
+				if (!discoveryApproaches.contains(da)) {
+					discoveryApproaches.add(da);
+				}
+			}
+		}
+		return discoveryApproaches;
+	}
+
+	/**
+	 * Internal method that determines all possible sequences of discovery services which could be used
+	 * to generate a set of requested annotation types.
+	 *
+	 * Each discovery service creates one or multiple annotation types and may have prerequisite annotation types.
+	 * As there may be multiple services creating the same annotation type (maybe by using different prerequisite
+	 * annotation types), this may result in complex dependencies. Using recursion, this method iterates through
+	 * all the dependencies in order to calculate a list of all possible sequences of discovery services that could
+	 * be used to calculate the requested annotation types.
+	 * 
+	 * @param annotationTypes List of annotation types to be generated
+	 * @return List of discovery service sequences that generate the requested annotation types
+	 */
+	private List<DiscoveryServiceSequence> calculateDiscoveryServiceSequences(List<String> annotationTypes) {
+		List<DiscoveryServiceSequence> result = null;
+
+		for (String currentType : annotationTypes) {
+			// Calculate discovery sequences for current annotation type
+			List<DiscoveryServiceSequence> additionalDiscoveryApproaches = getDiscoveryServiceSequencesForAnnotationType(currentType);
+			if (result == null) {
+				result = additionalDiscoveryApproaches;
+			} else {
+				// Merge with discovery sequences determined for the previous annotation types in the list 
+				result = combineDiscoveryServiceSequences(result, additionalDiscoveryApproaches);
+			}
+		}
+		return result;
+	}
+}
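
A short usage sketch for the mapper above (not part of this commit). It assumes a configured ODF instance so the constructor can query the registered services, a setAnnotationTypes() setter matching the getter used above, and hypothetical annotation type names AT1/AT2:

import java.util.Arrays;
import java.util.List;

import org.apache.atlas.odf.api.analysis.AnalysisRequest;
import org.apache.atlas.odf.core.controlcenter.DeclarativeRequestMapper;

public class RequestMapperSketch {
	public static void main(String[] args) {
		// Request annotation types instead of naming services directly.
		AnalysisRequest request = new AnalysisRequest();
		request.setAnnotationTypes(Arrays.asList("AT1", "AT2"));

		DeclarativeRequestMapper mapper = new DeclarativeRequestMapper(request);

		// Candidate sequences are ordered by estimated effort
		// (currently simply the number of services in a sequence).
		List<String> best = mapper.getRecommendedDiscoveryServiceSequence();
		System.out.println("Recommended sequence: " + best);

		// If a service fails permanently, drop all sequences containing it
		// and fall back to the next-best remaining candidate.
		if (best != null && !best.isEmpty()) {
			mapper.removeDiscoveryServiceSequences(best.get(0));
			System.out.println("Fallback: " + mapper.getRecommendedDiscoveryServiceSequence());
		}
	}
}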

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-core/src/main/java/org/apache/atlas/odf/core/controlcenter/DefaultStatusQueueStore.java
----------------------------------------------------------------------
diff --git a/odf/odf-core/src/main/java/org/apache/atlas/odf/core/controlcenter/DefaultStatusQueueStore.java b/odf/odf-core/src/main/java/org/apache/atlas/odf/core/controlcenter/DefaultStatusQueueStore.java
new file mode 100755
index 0000000..20b7661
--- /dev/null
+++ b/odf/odf-core/src/main/java/org/apache/atlas/odf/core/controlcenter/DefaultStatusQueueStore.java
@@ -0,0 +1,478 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.odf.core.controlcenter;
+
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.ListIterator;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Properties;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.atlas.odf.api.analysis.AnalysisRequest;
+import org.apache.atlas.odf.api.metadata.models.Annotation;
+import org.apache.atlas.odf.core.ODFInternalFactory;
+import org.apache.atlas.odf.core.messaging.DiscoveryServiceQueueManager;
+import org.apache.atlas.odf.json.JSONUtils;
+import org.apache.wink.json4j.JSONException;
+
+import org.apache.atlas.odf.api.ODFFactory;
+import org.apache.atlas.odf.api.analysis.AnalysisRequestSummary;
+import org.apache.atlas.odf.api.analysis.AnalysisRequestTrackerStatus.STATUS;
+import org.apache.atlas.odf.api.annotation.AnnotationStore;
+import org.apache.atlas.odf.api.annotation.AnnotationStoreUtils;
+import org.apache.atlas.odf.api.discoveryservice.AnalysisRequestTracker;
+import org.apache.atlas.odf.api.metadata.MetaDataObjectReference;
+
+/**
+ * This class is an in-memory store for both request trackers (showing the status of analysis requests)
+ * and for annotations. Both trackers and annotations are put on the ODF status queue which
+ * (a) acts as a "semi"-persistent store ("semi" because Kafka's retention mechanism will eventually delete them), and
+ * (b) propagates those changes to other ODF nodes.
+ * The annotations and trackers themselves are stored in memory in static variables.
+ *
+ * This is how it works:
+ * 1. A single consumer thread listens on the status topic.
+ * 2. If an incoming status queue entry is a tracker, it is stored in the in-memory tracker store;
+ *    if it is an annotation, it is stored in the in-memory annotation store.
+ * 3. Queries for trackers and annotations only go against the in-memory stores.
+ * 4. When a check for overaged entries occurs (a check that removes trackers from the store which are
+ *    older than the queue retention time), the annotations for overaged and finished requests are also
+ *    deleted (see removeOveragedEntries()).
+ *
+ */
+public class DefaultStatusQueueStore implements AnalysisRequestTrackerStore, AnnotationStore {
+
+	static Logger logger = Logger.getLogger(DefaultStatusQueueStore.class.getName());
+	
+	public static final long IGNORE_SIMILAR_REQUESTS_TIMESPAN_MS = 5000;
+	
+	static Object globalRequestStoreMapLock = new Object();
+	
+	/*
+	 * http://docs.oracle.com/javase/7/docs/api/java/util/LinkedHashMap.html
+	 * 
+	 * A structural modification is any operation that adds or deletes one or more mappings or, in the case of access-ordered linked hash maps, affects iteration order. 
+	 * In insertion-ordered linked hash maps, merely changing the value associated with a key that is already contained in the map is not a structural modification. 
+	 * (In access-ordered linked hash maps, merely querying the map with get is a structural modification.)
+	 */
+	static LinkedHashMap<String, AnalysisRequestTracker> globalRequestStoreMap = new LinkedHashMap<String, AnalysisRequestTracker>();
+	
+	/*
+	 * This map is only used to track if storing an object was successful
+	 *  
+	 */
+	static ConcurrentHashMap<String, Boolean> globalStoreSuccessMap = new ConcurrentHashMap<String, Boolean>();
+		
+	private String analysisRun;
+	
+	// simplest implementation for now: just keep a simple list
+	private static List<Annotation> storedAnnotations = new LinkedList<>();
+	private static Object storedAnnotationsLock = new Object();
+
+	/**
+	 * This processor reads trackers from the queue and stores them in the globalRequestStoreMap.
+	 * The thread for this processor is created in the QueueManager implementation.
+	 *
+	 */
+	public static class StatusQueueProcessor implements QueueMessageProcessor {
+		Logger logger = Logger.getLogger(StatusQueueProcessor.class.getName());
+
+		@Override
+		public void process(ExecutorService executorService, String message, int partition, long offset) {
+			StatusQueueEntry sqe = new StatusQueueEntry();
+			try {
+				sqe = JSONUtils.fromJSON(message, StatusQueueEntry.class);
+			} catch (Exception e) {
+				logger.log(Level.WARNING, "Entry in status queue could not be processed", e);
+			}
+			
+			// first handle trackers and / or initial cleanup
+			synchronized (globalRequestStoreMapLock) {
+				if (sqe.getAnalysisRequestTracker() != null) {
+					try {
+						AnalysisRequestTracker tracker = sqe.getAnalysisRequestTracker();
+						String requestID = tracker.getRequest().getId();
+						logger.log(Level.FINEST, "Store status queue: found tracker with id ''{0}'', tracker: {1}", new Object[] { requestID, message });
+						if (tracker.getStatus() == STATUS.FINISHED) {
+							logger.log(Level.INFO, "Request with id ''{0}'' is finished, result: {1}", new Object[] { requestID, message });
+						}
+						// remove the entry first so that re-putting it moves it to the end of the insertion-ordered map
+						globalRequestStoreMap.remove(requestID);
+
+						globalRequestStoreMap.put(requestID, tracker);
+						if (tracker != null && tracker.getRevisionId() != null) {
+							globalStoreSuccessMap.put(tracker.getRevisionId(), true);
+						}
+
+					} catch (Exception e) {
+						logger.log(Level.WARNING, "Tracker entry in status queue could not be processed", e);
+					}
+				} 				
+			}
+			
+			if (sqe.getAnnotation() != null) {
+				Annotation annot = sqe.getAnnotation();
+				logger.log(Level.FINEST, "Received annotationk over status queue: ''{0}''", annot.getReference().getId());
+				synchronized (storedAnnotationsLock) {
+					storedAnnotations.add(annot);
+					globalStoreSuccessMap.put(annot.getReference().getId(), true);
+				}
+			}
+
+			removeOveragedEntries();
+		}
+
+	}
+
+	/////////////////////////////////////////////
+	// AnalysisRequestTrackerStore interface implementation
+
+	
+	/*
+	 * This store uses the lastModified timestamp to remove overaged trackers. 
+	 * Therefore, the lastModified timestamp MUST be set before storing anything to prevent unwanted removal.
+	 */
+	@Override
+	public void store(AnalysisRequestTracker tracker) {
+		String id = tracker.getRequest().getId();
+		logger.fine("Store " + id + " in trackerStore");
+
+		String revId = UUID.randomUUID() + "_" + System.currentTimeMillis();
+		tracker.setRevisionId(revId);
+		globalStoreSuccessMap.put(revId, false);
+		
+		ODFInternalFactory factory = new ODFInternalFactory();
+		DiscoveryServiceQueueManager qm = factory.create(DiscoveryServiceQueueManager.class);
+		// put the tracker onto the status queue, the actual map that is used in query() is filled by the ARTProcessor listening on the status queue
+		StatusQueueEntry sqe = new StatusQueueEntry();
+		sqe.setAnalysisRequestTracker(tracker);
+		qm.enqueueInStatusQueue(sqe);
+		waitUntilEntryArrives(revId);
+	}
+
+	private void waitUntilEntryArrives(String entryId) {
+		boolean found = false;
+		int maxNumWaits = 1500;
+		int sleepMS = 20;
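+		// 1500 polls * 20 ms = 30 seconds maximum wait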
+		while (maxNumWaits > 0) {
+			final Boolean storageSuccess = globalStoreSuccessMap.get(entryId);
+			if (Boolean.TRUE.equals(storageSuccess)) {
+				found = true;
+				globalStoreSuccessMap.remove(entryId);
+				break;
+			}
+			try {
+				Thread.sleep(sleepMS);
+			} catch (InterruptedException e) {
+				e.printStackTrace();
+			}
+			maxNumWaits--;
+		}
+		if (!found) {
+			final String message = "The tracker could not be stored in 30 sec!";
+			logger.warning(message);
+			throw new RuntimeException(message);
+		} else {
+			logger.fine("Tracker stored after " + ((1500 - maxNumWaits) * sleepMS) + " ms");
+		}
+	}
+
+	@Override
+	public AnalysisRequestTracker query(String analysisRequestId) {
+		logger.fine("Querying store for " + analysisRequestId);
+		synchronized (globalRequestStoreMapLock) {
+			AnalysisRequestTracker tracker = globalRequestStoreMap.get(analysisRequestId);
+			return tracker;
+		}
+	}
+	
+	@Override
+	public void clearCache() {
+		logger.fine("Clearing store cache");
+		synchronized (globalRequestStoreMapLock) {
+			globalRequestStoreMap.clear();
+		}
+	}
+	
+	private static void removeOveragedEntries() {
+		Set<String> finishedRequests = new HashSet<>();
+		logger.fine("Removing overaged entries from store");
+		synchronized (globalRequestStoreMapLock) {
+			Iterator<Entry<String, AnalysisRequestTracker>> entryIterator = globalRequestStoreMap.entrySet().iterator();
+			long maxRetentionMS = new ODFFactory().create().getSettingsManager().getODFSettings().getMessagingConfiguration().getAnalysisRequestRetentionMs();
+			long currentTimeMS = System.currentTimeMillis();
+			while (entryIterator.hasNext()) {
+				Entry<String, AnalysisRequestTracker> entry = entryIterator.next();
+				AnalysisRequestTracker tracker = entry.getValue();
+				if (currentTimeMS - tracker.getLastModified() >= maxRetentionMS) {
+					if (tracker.getStatus() == STATUS.FINISHED || tracker.getStatus() == STATUS.ERROR) {
+						finishedRequests.add(tracker.getRequest().getId());
+					}
+					entryIterator.remove();
+					logger.log(Level.INFO, "Removed overaged status tracker with id ''{0}''", new Object[] { entry.getKey() });
+				} else {
+					/*
+					 * Items in a LinkedHashMap are ordered in the way they were put into the map.
+					 * Because of this, if one item is not overaged, all following ones won't be either.
+					 */
+					break;
+				}
+			}
+		}
+		synchronized (storedAnnotationsLock) {
+			ListIterator<Annotation> it = storedAnnotations.listIterator();
+			while (it.hasNext()) {
+				Annotation annot = it.next();
+				if (finishedRequests.contains(annot.getAnalysisRun())) {
+					it.remove();
+				}
+			}
+		}
+	}
+
+	@Override
+	public int getSize() {
+		synchronized (globalRequestStoreMapLock) {
+			return globalRequestStoreMap.size();
+		}
+	}
+	
+	@Override
+	public AnalysisRequestTracker findSimilarQueuedRequest(AnalysisRequest request) {
+		synchronized (globalRequestStoreMapLock) {
+			for (AnalysisRequestTracker tracker : globalRequestStoreMap.values()) {
+				long startedAfterLimit = System.currentTimeMillis() - IGNORE_SIMILAR_REQUESTS_TIMESPAN_MS;
+				if (TrackerUtil.isAnalysisWaiting(tracker) || 
+						(tracker.getNextDiscoveryServiceRequest() == 0 && tracker.getStatus() == STATUS.DISCOVERY_SERVICE_RUNNING && tracker.getLastModified() >= startedAfterLimit)) {
+					AnalysisRequest otherRequest = tracker.getRequest();
+					List<MetaDataObjectReference> dataSets = request.getDataSets();
+					List<MetaDataObjectReference> otherDataSets = otherRequest.getDataSets();
+					
+					if (otherDataSets.containsAll(dataSets) && tracker.getDiscoveryServiceRequests().get(0).getDiscoveryServiceId().equals(
+							request.getDiscoveryServiceSequence().get(0))) {
+						logger.log(Level.FINEST, "Found similar request for request {0}", new Object[] { request.getId()});
+						return tracker;
+					}
+				}
+			}
+			return null;
+		}
+	}
+
+	
+	@Override
+	public List<AnalysisRequestTracker> getRecentTrackers(int offset, int limit) {
+		if (offset < 0) {
+			throw new RuntimeException("Offset parameter cannot be negative.");
+		}
+		if (limit < -1) {
+			throw new RuntimeException("Limit parameter cannot be smaller than -1.");
+		}
+		synchronized (globalRequestStoreMapLock) {
+			List<AnalysisRequestTracker> arsList = new ArrayList<>();
+			Iterator<Map.Entry<String, AnalysisRequestTracker>> it = globalRequestStoreMap.entrySet().iterator();
+			// filter out health check requests
+			while (it.hasNext()) {
+				AnalysisRequestTracker t = it.next().getValue();
+				if (!t.getRequest().getDataSets().get(0).getId().startsWith(ControlCenter.HEALTH_TEST_DATA_SET_ID_PREFIX)) {
+					arsList.add(t);
+				}
+			}
+			// now pick the requested number of requests from the end
+			List<AnalysisRequestTracker> result = new ArrayList<>();
+			if (arsList.size() > offset) {
+				int startIndex = arsList.size() - offset - limit;
+				if (limit == -1 || startIndex < 0) {
+					startIndex = 0;
+				}
+				int endIndex = arsList.size() - offset - 1;
+				if (endIndex < 0) {
+					endIndex = 0;
+				}
+				for (int i=endIndex ; i>=startIndex; i--) {
+					result.add(arsList.get(i));
+				}
+			}
+			return result;
+		}
+	}
+	
+	@Override
+	public AnalysisRequestSummary getRequestSummary() {
+		synchronized (globalRequestStoreMapLock) {
+			try {
+				List<AnalysisRequestTracker> recentTrackers = this.getRecentTrackers(0, -1);
+				int totalSuccess = 0;
+				int totalFailure = 0;
+	
+				for (AnalysisRequestTracker tracker : recentTrackers) {
+					if (STATUS.FINISHED.equals(tracker.getStatus())) {
+						totalSuccess++;
+					} else if (STATUS.ERROR.equals(tracker.getStatus())) {
+						totalFailure++;
+					}
+				}
+				return new AnalysisRequestSummary(totalSuccess, totalFailure);
+			} catch (Exception exc) {
+				throw new RuntimeException(exc);
+			}
+		}	
+	}
+
+	/////////////////////////////////////////////
+	// AnnotationStore interface implementation
+	
+	@Override
+	public Properties getProperties() {
+		Properties props = new Properties();
+		props.put(STORE_PROPERTY_TYPE, "DefaultAnnotationStore");
+		props.put(STORE_PROPERTY_ID, getRepositoryId());
+		props.put(STORE_PROPERTY_DESCRIPTION, "A default in-memory implementation of the annotation store storing its results via Kafka");
+		return props;
+	}
+
+	@Override
+	public String getRepositoryId() {
+		return "ODFDefaultAnnotationStore";
+	}
+
+	@Override
+	public ConnectionStatus testConnection() {
+		return ConnectionStatus.OK;
+	}
+
+	@Override
+	public MetaDataObjectReference store(Annotation annotation) {
+		// clone object
+		try {
+			annotation = JSONUtils.cloneJSONObject(annotation);
+		} catch (JSONException e) {
+			logger.log(Level.SEVERE, "Annotation could not be stored because JSON conversion failed.", e);
+			throw new RuntimeException(e);
+		}
+		
+		// create a new reference
+		String annotId = "Annot" + UUID.randomUUID() + "_" + System.currentTimeMillis();
+		logger.log(Level.FINEST, "Storing annotation with ID ''{0}''", annotId);
+		MetaDataObjectReference ref = new MetaDataObjectReference();
+		ref.setId(annotId);
+		ref.setRepositoryId(getRepositoryId());
+		annotation.setReference(ref);
+		if (analysisRun != null) {
+			annotation.setAnalysisRun(analysisRun);
+		}
+		
+		// re-use mechanism from status queue to wait until message has arrived via Kafka
+		globalStoreSuccessMap.put(annotId, false);
+		DiscoveryServiceQueueManager qm = new ODFInternalFactory().create(DiscoveryServiceQueueManager.class);
+		StatusQueueEntry sqe = new StatusQueueEntry();
+		sqe.setAnnotation(annotation);
+		qm.enqueueInStatusQueue(sqe);
+		waitUntilEntryArrives(annotId);
+		return ref;
+	}
+
+	@Override
+	public List<Annotation> getAnnotations(MetaDataObjectReference object, String analysisRequestId) {
+		List<Annotation> results = new ArrayList<>();
+		synchronized (storedAnnotationsLock) {
+			logger.log(Level.FINEST, "Number of annotations stored: ''{0}''", storedAnnotations.size());
+			ListIterator<Annotation> it = storedAnnotations.listIterator();
+			while (it.hasNext()) {
+				Annotation annot = it.next();
+				boolean match = true;
+				if (object != null) {
+					match = match && object.equals(AnnotationStoreUtils.getAnnotatedObject(annot));
+				}
+				if (annot.getAnalysisRun() != null) {
+					// analysisRun is not set for health check and for some of the tests
+					if (analysisRequestId != null) {
+						match &= annot.getAnalysisRun().equals(analysisRequestId);
+					}
+				}
+				if (match) {
+					results.add(annot);
+				}
+			}
+		}
+		logger.log(Level.FINEST, "Number of annotations found for request Id ''{0}'': ''{1}''", new Object[]{analysisRequestId, results.size()});
+		return results;
+	}
+
+	@Override
+	public void setAnalysisRun(String analysisRun) {
+		this.analysisRun = analysisRun;
+	}
+
+	@Override
+	public String getAnalysisRun() {
+		return this.analysisRun;
+	}
+
+	@Override
+	public Annotation retrieveAnnotation(MetaDataObjectReference ref) {
+		synchronized (storedAnnotationsLock) {
+			logger.log(Level.FINEST, "Number of annotations stored: ''{0}''", storedAnnotations.size());
+			ListIterator<Annotation> it = storedAnnotations.listIterator();
+			while (it.hasNext()) {
+				Annotation annot = it.next();
+				if (annot.getReference().equals(ref)) {
+					return annot;
+				}
+			}
+		}
+		return null;
+	}
+
+	@Override
+	public void setStatusOfOldRequest(long cutOffTimestamp, STATUS status, String detailsMessage) {
+		synchronized (globalRequestStoreMapLock) {
+			DiscoveryServiceQueueManager qm = new ODFInternalFactory().create(DiscoveryServiceQueueManager.class);
+			for (AnalysisRequestTracker tracker : globalRequestStoreMap.values()) {
+				if (tracker.getLastModified() < cutOffTimestamp //
+						&& (STATUS.DISCOVERY_SERVICE_RUNNING.equals(tracker.getStatus()) //
+								|| STATUS.IN_DISCOVERY_SERVICE_QUEUE.equals(tracker.getStatus()) //
+								|| STATUS.INITIALIZED.equals(tracker.getStatus()) //
+						)) {
+					// set the tracker in-memory to have the result available immediately
+					tracker.setStatus(status);
+					if (detailsMessage == null) {
+						detailsMessage = "Setting request to " + status + " because it was last modified before " + new Date(cutOffTimestamp);
+					}
+					tracker.setStatusDetails(detailsMessage);
+					// put tracker onto queue
+					StatusQueueEntry sqe = new StatusQueueEntry();
+					sqe.setAnalysisRequestTracker(tracker);
+					qm.enqueueInStatusQueue(sqe);
+				}
+			}
+		}
+		
+	}
+}
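
The store() methods above follow a write-through-the-queue pattern: the writer publishes an entry tagged with a revision id to the Kafka status topic and blocks until the consumer thread has applied it to the in-memory map, confirmed via globalStoreSuccessMap. Below is a self-contained sketch of that pattern (not part of this commit; it swaps the 20 ms polling loop for a CountDownLatch and uses a direct callback in place of the Kafka hop):

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class WriteThroughQueueSketch {
	private final Map<String, CountDownLatch> pending = new ConcurrentHashMap<>();
	private final Map<String, String> inMemoryStore = new ConcurrentHashMap<>();

	// Writer side: publish, then wait until the consumer confirms arrival.
	public void store(String key, String value, String revisionId) throws InterruptedException {
		CountDownLatch latch = new CountDownLatch(1);
		pending.put(revisionId, latch);
		publish(key, value, revisionId); // stand-in for enqueueInStatusQueue()
		if (!latch.await(30, TimeUnit.SECONDS)) {
			pending.remove(revisionId);
			throw new RuntimeException("The entry could not be stored in 30 sec!");
		}
	}

	// Consumer side: apply the entry to the in-memory map, then signal the writer.
	private void onMessage(String key, String value, String revisionId) {
		inMemoryStore.put(key, value);
		CountDownLatch latch = pending.remove(revisionId);
		if (latch != null) {
			latch.countDown();
		}
	}

	// In ODF this hop goes through the Kafka status topic and may land on a different node.
	private void publish(String key, String value, String revisionId) {
		onMessage(key, value, revisionId);
	}
}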

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-core/src/main/java/org/apache/atlas/odf/core/controlcenter/DefaultThreadManager.java
----------------------------------------------------------------------
diff --git a/odf/odf-core/src/main/java/org/apache/atlas/odf/core/controlcenter/DefaultThreadManager.java b/odf/odf-core/src/main/java/org/apache/atlas/odf/core/controlcenter/DefaultThreadManager.java
new file mode 100755
index 0000000..0ea909f
--- /dev/null
+++ b/odf/odf-core/src/main/java/org/apache/atlas/odf/core/controlcenter/DefaultThreadManager.java
@@ -0,0 +1,276 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.odf.core.controlcenter;
+
+import java.lang.Thread.State;
+import java.lang.Thread.UncaughtExceptionHandler;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeoutException;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.atlas.odf.api.engine.ThreadStatus;
+
+public class DefaultThreadManager implements ThreadManager {
+
+	private Logger logger = Logger.getLogger(DefaultThreadManager.class.getName());
+
+	static Object unmanagedThreadLock = new Object();
+	static Map<String, Thread> unmanagedThreadMap = new HashMap<String, Thread>();
+	static Map<String, ODFRunnable> unmanagedThreadRunnableMap = new HashMap<String, ODFRunnable>();
+	
+	ExecutorService executorService;
+
+	public DefaultThreadManager() {
+	}
+	
+	private boolean isThreadRunning(Thread thread) {
+		return thread.getState() != State.TERMINATED;
+	}
+	
+	private void purgeTerminatedThreads() {
+		List<String> entriesToBeRemoved = new ArrayList<String>();
+		List<String> entriesToBeKept = new ArrayList<String>();
+		for (Map.Entry<String, Thread> entry : unmanagedThreadMap.entrySet()) {
+			if (!isThreadRunning(entry.getValue())) {
+				entriesToBeRemoved.add(entry.getKey());
+			} else {
+				entriesToBeKept.add(entry.getKey());
+			}
+		}
+		for (String id : entriesToBeRemoved) {
+			unmanagedThreadMap.remove(id);
+			unmanagedThreadRunnableMap.remove(id);
+		}
+		logger.finer("Removed finished threads: " + entriesToBeRemoved.toString());
+		logger.finer("Kept unfinished threads: " + entriesToBeKept.toString());
+	}
+	
+	@Override
+	public ThreadStartupResult startUnmanagedThread(final String id, final ODFRunnable runnable) {
+		ThreadStartupResult result = new ThreadStartupResult(id) {
+			@Override
+			public boolean isReady() {
+				synchronized (unmanagedThreadLock) {
+					if (unmanagedThreadRunnableMap.containsKey(id)) {
+						return unmanagedThreadRunnableMap.get(id).isReady();
+					}
+				}
+				return false;
+			}
+		};
+		synchronized (unmanagedThreadLock) {
+			purgeTerminatedThreads();
+			Thread t = unmanagedThreadMap.get(id);
+			if (t != null) {
+				if (isThreadRunning(t)) {
+					return result;
+				}
+			} 
+			runnable.setExecutorService(executorService);
+
+			Thread newThread = new Thread(runnable);
+			result.setNewThreadCreated(true);
+			newThread.setUncaughtExceptionHandler(new UncaughtExceptionHandler() {
+
+				@Override
+				public void uncaughtException(Thread thread, Throwable throwable) {
+					logger.log(Level.WARNING, "Uncaught exception in thread " + id + " - thread will shut down!", throwable);
+					synchronized (unmanagedThreadLock) {
+						purgeTerminatedThreads();
+					}
+				}
+			});
+
+			newThread.setDaemon(true); // TODO is it a daemon?
+			newThread.start();
+			unmanagedThreadMap.put(id, newThread);
+			unmanagedThreadRunnableMap.put(id,  runnable);
+		}
+		return result;
+	}
+
+	@Override
+	public ThreadStatus.ThreadState getStateOfUnmanagedThread(String id) {
+		synchronized (unmanagedThreadLock) {
+			Thread t = unmanagedThreadMap.get(id);
+			if (t == null) {
+				return ThreadStatus.ThreadState.NON_EXISTENT;
+			}
+			Thread.State ts = t.getState();
+			switch (ts) {
+			case TERMINATED:
+				return ThreadStatus.ThreadState.FINISHED;
+			default:
+				return ThreadStatus.ThreadState.RUNNING;
+			}
+		}
+	}
+
+
+
+	@Override
+	public void setExecutorService(ExecutorService executorService) {
+		this.executorService = executorService;
+	}
+
+	@Override
+	public void shutdownAllUnmanagedThreads() {
+		synchronized (unmanagedThreadLock) {
+			logger.log(Level.INFO, "Shutting down all ODF threads...");
+			for (String id : unmanagedThreadMap.keySet()) {
+				shutdownThreadImpl(id, false);
+			}
+			unmanagedThreadMap.clear();
+			unmanagedThreadRunnableMap.clear();
+			logger.log(Level.INFO, "All ODF threads shut down");
+			purgeTerminatedThreads();
+		}		
+	}
+	
+	public void shutdownThreads(List<String> names) {
+		synchronized (unmanagedThreadLock) {
+			for (String name : names) {
+				shutdownThreadImpl(name, true);
+			}
+		}		
+	}
+
+	private void shutdownThreadImpl(String id, boolean purge) {
+		Thread t = unmanagedThreadMap.get(id);
+		if (t == null) {
+			return;
+		}
+		ODFRunnable r = unmanagedThreadRunnableMap.get(id);
+		r.cancel();
+		try {
+			Thread.sleep(500);
+		} catch (InterruptedException e1) {
+			logger.log(Level.FINE, "Interrupted while waiting after cancelling runnable " + id, e1);
+		}
+		int max = 60;
+		while (t.getState() != Thread.State.TERMINATED) {
+			if (max == 0) {
+				break;
+			}
+			max--;
+			try {
+				Thread.sleep(1000);
+			} catch (InterruptedException e) {
+				// keep polling for thread termination
+				logger.log(Level.FINE, "Interrupted while polling for thread termination", e);
+			}
+		}
+		if (max == 0) {
+			logger.log(Level.WARNING, "Thread {0} did not stop on its own and must be interrupted.", id);
+			t.interrupt();
+		}
+		if (purge) {
+			purgeTerminatedThreads();
+		}
+	}
+
+	@Override
+	public int getNumberOfRunningThreads() {
+		synchronized (unmanagedThreadLock) {
+			int result = 0;
+			for (Thread t : unmanagedThreadMap.values()) {
+				if (isThreadRunning(t)) {
+					result++;
+				}
+			}
+			return result;
+		}
+	}
+
+	@Override
+	public List<ThreadStatus> getThreadManagerStatus() {
+		synchronized (unmanagedThreadLock) {
+			List<ThreadStatus> result = new ArrayList<ThreadStatus>();
+			for (Entry<String, Thread> entry : unmanagedThreadMap.entrySet()) {
+				ThreadStatus status = new ThreadStatus();
+				status.setId(entry.getKey());
+				status.setState(getStateOfUnmanagedThread(entry.getKey()));
+				ODFRunnable odfRunnable = unmanagedThreadRunnableMap.get(entry.getKey());
+				if (odfRunnable != null) {
+					status.setType(odfRunnable.getClass().getName());
+				}
+				result.add(status);
+			}
+
+			return result;
+		}
+	}
+
+	@Override
+	public void waitForThreadsToBeReady(long waitingLimitMs, List<ThreadStartupResult> startedThreads) throws TimeoutException {
+		Set<String> threadsToWaitFor = new HashSet<String>();
+		for (ThreadStartupResult res : startedThreads) {
+			// Wait only for threads that were actually newly created.
+			if (res.isNewThreadCreated()) {
+				threadsToWaitFor.add(res.getThreadId());
+			}
+		}
+		if (threadsToWaitFor.isEmpty()) {
+			return;
+		}
+
+		final int msToWait = 200;
+		final long maxPolls = waitingLimitMs / msToWait;
+		int count = 0;
+		while (threadsToWaitFor.size() > 0 && count < maxPolls) {
+			List<String> ready = new ArrayList<String>();
+			List<String> notReady = new ArrayList<String>();
+			for (ThreadStartupResult thr : startedThreads) {
+				if (thr.isReady()) {
+					ready.add(thr.getThreadId());
+					threadsToWaitFor.remove(thr.getThreadId());
+				} else {
+					notReady.add(thr.getThreadId());
+				}
+			}
+
+			logger.fine("Ready: " + ready);
+			logger.fine("NotReady: " + notReady);
+
+			try {
+				Thread.sleep(msToWait);
+			} catch (InterruptedException e) {
+				logger.log(Level.FINE, "Interrupted while waiting for threads to become ready", e);
+			}
+			count++;
+		}
+		if (count >= maxPolls) {
+			String msg = "Threads " + threadsToWaitFor + " are not ready after " + waitingLimitMs + " ms; giving up waiting for them";
+			logger.log(Level.WARNING, msg);
+			throw new TimeoutException(msg);
+		}
+		
+		logger.fine("All threads ready after " + (count * msToWait) + "ms");
+	}
+
+	@Override
+	public ODFRunnable getRunnable(String name) {
+		synchronized (unmanagedThreadLock) {
+			return unmanagedThreadRunnableMap.get(name);
+		}
+	}
+}
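
A minimal usage sketch of the ThreadManager API introduced above. HeartbeatRunnable is a hypothetical ODFRunnable implementation; the ThreadManager and ThreadStartupResult methods are the ones from this commit, and imports from java.util and java.util.concurrent are assumed.

    class HeartbeatRunnable implements ODFRunnable {
        private volatile boolean ready = false;
        private volatile boolean cancelled = false;

        @Override
        public void run() {
            ready = true; // signal readiness so waitForThreadsToBeReady() can return
            while (!cancelled) {
                // periodic work would go here
                try { Thread.sleep(1000); } catch (InterruptedException e) { return; }
            }
        }

        @Override
        public boolean isReady() { return ready; }

        @Override
        public void cancel() { cancelled = true; }

        @Override
        public void setExecutorService(ExecutorService service) { /* not needed here */ }
    }

    ThreadManager tm = new DefaultThreadManager();
    tm.setExecutorService(Executors.newCachedThreadPool());
    ThreadStartupResult res = tm.startUnmanagedThread("heartbeat", new HeartbeatRunnable());
    try {
        tm.waitForThreadsToBeReady(5000, Collections.singletonList(res));
    } catch (TimeoutException e) {
        // the runnable never reported isReady() == true within 5 seconds
    }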

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-core/src/main/java/org/apache/atlas/odf/core/controlcenter/DefaultTransactionContextExecutor.java
----------------------------------------------------------------------
diff --git a/odf/odf-core/src/main/java/org/apache/atlas/odf/core/controlcenter/DefaultTransactionContextExecutor.java b/odf/odf-core/src/main/java/org/apache/atlas/odf/core/controlcenter/DefaultTransactionContextExecutor.java
new file mode 100755
index 0000000..0f79e0c
--- /dev/null
+++ b/odf/odf-core/src/main/java/org/apache/atlas/odf/core/controlcenter/DefaultTransactionContextExecutor.java
@@ -0,0 +1,29 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.odf.core.controlcenter;
+
+import java.util.concurrent.Callable;
+
+/**
+ * The default TransactionContextExecutor runs code in the same thread as the caller.
+ * 
+ */
+public class DefaultTransactionContextExecutor implements TransactionContextExecutor {
+	
+	@Override
+	public Object runInTransactionContext(Callable<Object> callable) throws Exception {
+		return callable.call();
+	}
+
+}
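
A usage sketch: with this default implementation, runInTransactionContext is a plain same-thread call, so callers can wrap store access uniformly today and swap in a container-aware executor later without changing call sites.

    TransactionContextExecutor executor = new DefaultTransactionContextExecutor();
    Object result = executor.runInTransactionContext(new Callable<Object>() {
        @Override
        public Object call() throws Exception {
            // code that must run in the metadata store's transaction context
            return "done";
        }
    });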

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-core/src/main/java/org/apache/atlas/odf/core/controlcenter/DiscoveryServiceStarter.java
----------------------------------------------------------------------
diff --git a/odf/odf-core/src/main/java/org/apache/atlas/odf/core/controlcenter/DiscoveryServiceStarter.java b/odf/odf-core/src/main/java/org/apache/atlas/odf/core/controlcenter/DiscoveryServiceStarter.java
new file mode 100755
index 0000000..dbfb597
--- /dev/null
+++ b/odf/odf-core/src/main/java/org/apache/atlas/odf/core/controlcenter/DiscoveryServiceStarter.java
@@ -0,0 +1,303 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.odf.core.controlcenter;
+
+import java.text.MessageFormat;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ExecutorService;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.atlas.odf.api.discoveryservice.DataSetCheckResult;
+import org.apache.atlas.odf.api.discoveryservice.DiscoveryServiceRequest;
+import org.apache.atlas.odf.api.discoveryservice.DiscoveryServiceResponse;
+import org.apache.atlas.odf.api.discoveryservice.sync.SyncDiscoveryService;
+import org.apache.atlas.odf.api.metadata.MetadataStore;
+import org.apache.atlas.odf.api.metadata.models.CachedMetadataStore;
+import org.apache.atlas.odf.api.metadata.models.MetaDataObject;
+import org.apache.atlas.odf.core.ODFInternalFactory;
+import org.apache.wink.json4j.JSONException;
+
+import org.apache.atlas.odf.api.analysis.AnalysisRequestTrackerStatus.STATUS;
+import org.apache.atlas.odf.api.discoveryservice.AnalysisRequestTracker;
+import org.apache.atlas.odf.api.discoveryservice.datasets.DataSetContainer;
+import org.apache.atlas.odf.api.discoveryservice.sync.DiscoveryServiceSyncResponse;
+import org.apache.atlas.odf.core.Environment;
+import org.apache.atlas.odf.core.Utils;
+import org.apache.atlas.odf.core.annotation.InternalAnnotationStoreUtils;
+import org.apache.atlas.odf.json.JSONUtils;
+
+/**
+ * This class processes the entries of a discovery service queue and runs the respective discovery services in separate threads.
+ * 
+ */
+public class DiscoveryServiceStarter implements QueueMessageProcessor {
+
+	private Logger logger = Logger.getLogger(DiscoveryServiceStarter.class.getName());
+
+	AnalysisRequestTrackerStore trackerStore = null;
+	ControlCenter controlCenter = null;
+	Environment environment = null;
+	
+	/**
+	 * Parameters must be a three-element String[] containing the DiscoveryServiceRequest, the partition number (int), and the offset (long).
+	 */
+	public DiscoveryServiceStarter() {
+		ODFInternalFactory factory = new ODFInternalFactory();
+		trackerStore = factory.create(AnalysisRequestTrackerStore.class);
+		controlCenter = factory.create(ControlCenter.class);
+		environment = factory.create(Environment.class);
+	}
+	
+	private DiscoveryServiceRequest cloneDSRequestAndAddServiceProps(DiscoveryServiceRequest request, boolean requiresMetaDataCache) throws JSONException {
+		DiscoveryServiceRequest clonedRequest = JSONUtils.cloneJSONObject(request);
+		Map<String, Object> additionalProps = clonedRequest.getAdditionalProperties();
+		if (additionalProps == null) {
+			additionalProps = new HashMap<>();
+			clonedRequest.setAdditionalProperties(additionalProps);
+		}
+		// add service specific properties
+		String id = request.getDiscoveryServiceId();
+		Map<String, String> serviceProps = environment.getPropertiesWithPrefix(id);
+		additionalProps.putAll(serviceProps);
+		
+		// add cached metadata objects to request if required
+		if (requiresMetaDataCache) {
+			MetaDataObject mdo = request.getDataSetContainer().getDataSet();
+			MetadataStore mds = new ODFInternalFactory().create(MetadataStore.class);
+			clonedRequest.getDataSetContainer().setMetaDataCache(CachedMetadataStore.retrieveMetaDataCache(mds, mdo));
+		}
+
+		return clonedRequest;
+	}
+
+	
+	/**
+	 * Starts the service taken from the service runtime topic.
+	 */
+	public void process(ExecutorService executorService, String message, int partition, long offset) {
+		AnalysisRequestTracker tracker = null;
+		try {
+			tracker = JSONUtils.fromJSON(message, AnalysisRequestTracker.class);
+			logger.log(Level.FINEST, "DSStarter: received tracker {0}", JSONUtils.lazyJSONSerializer(tracker));
+			// load tracker from store and check if it was cancelled in the meantime
+			AnalysisRequestTracker storedRequest = trackerStore.query(tracker.getRequest().getId());
+
+			if (storedRequest == null || storedRequest.getStatus() != STATUS.CANCELLED) {
+				// set tracker to running
+				tracker.setStatus(STATUS.DISCOVERY_SERVICE_RUNNING);
+				trackerStore.store(tracker);
+				
+				DiscoveryServiceRequest nextRequest = TrackerUtil.getCurrentDiscoveryServiceStartRequest(tracker);
+				if (nextRequest == null) {
+					logger.log(Level.WARNING, "Request in queue has wrong format");
+					tracker.setStatus(STATUS.ERROR);
+				} else {
+					nextRequest.setTakenFromRequestQueue(System.currentTimeMillis());
+					trackerStore.store(tracker);
+					String dsID = nextRequest.getDiscoveryServiceId();
+					SyncDiscoveryService nextService = ControlCenter.getDiscoveryServiceProxy(dsID, tracker.getRequest());
+					if (nextService == null) {
+						logger.log(Level.WARNING, "Discovery Service ''{0}'' could not be created", dsID);
+						throw new DiscoveryServiceUnreachableException("Java proxy for service with id " + dsID + " could not be created");
+					} else {
+						DataSetContainer ds = nextRequest.getDataSetContainer();
+						DataSetCheckResult checkResult = nextService.checkDataSet(ds);
+						if (checkResult.getDataAccess() == DataSetCheckResult.DataAccess.NotPossible) {
+							String responseDetails = "";
+							if (checkResult.getDetails() != null) {
+								responseDetails = " Reason: " + checkResult.getDetails();
+							}
+							if (tracker.getRequest().isIgnoreDataSetCheck()) {
+								String msg = MessageFormat.format("Discovery service ''{0}'' cannot process data set ''{1}''.{2} - Ignoring and advancing to next service",
+										new Object[]{dsID, ds.getDataSet().getReference(), responseDetails});
+								logger.log(Level.INFO, msg);
+								// check for next queue
+								DiscoveryServiceSyncResponse dummyResponse = new DiscoveryServiceSyncResponse();
+								dummyResponse.setCode(DiscoveryServiceResponse.ResponseCode.OK);
+								dummyResponse.setDetails(msg);
+								TrackerUtil.addDiscoveryServiceStartResponse(tracker, dummyResponse);
+								controlCenter.advanceToNextDiscoveryService(tracker);
+							} else {
+								tracker.setStatus(STATUS.ERROR);
+								String msg = MessageFormat.format("Discovery service ''{0}'' cannot process data set ''{1}''.{2}",
+										new Object[]{dsID, ds.getDataSet().getReference(), responseDetails});
+								tracker.setStatusDetails(msg);
+								logger.log(Level.WARNING, msg);
+							}
+						} else {
+							nextService.setExecutorService(executorService);
+							runServiceInBackground(executorService, tracker, nextRequest, nextService);
+						}
+					}
+				}
+			}
+		} catch (DiscoveryServiceUnreachableException exc) {
+			logger.log(Level.WARNING, "Discovery service could not be started because it is unreachable", exc);
+			if (tracker != null) {
+				tracker.setStatus(STATUS.ERROR);
+				tracker.setStatusDetails(exc.getReason());
+			}
+		} catch (Throwable exc) {
+			logger.log(Level.WARNING, "An error occurred when starting the discovery service", exc);
+			if (tracker != null) {
+				tracker.setStatus(STATUS.ERROR);
+				tracker.setStatusDetails(Utils.getExceptionAsString(exc));
+			}
+		}
+		updateTracker(tracker);
+	}
+
+	
+	class ServiceRunner implements ODFRunnable {
+		AnalysisRequestTracker tracker;
+		DiscoveryServiceRequest nextRequest;
+		SyncDiscoveryService nextService;
+		
+		public ServiceRunner(AnalysisRequestTracker tracker, DiscoveryServiceRequest nextRequest, SyncDiscoveryService nextService) {
+			super();
+			this.tracker = tracker;
+			this.nextRequest = nextRequest;
+			this.nextService = nextService;
+		}
+
+		@Override
+		public void run() {
+			try {
+				runService(tracker, nextRequest, nextService);
+			} catch (Throwable exc) {
+				logger.log(Level.WARNING, "An error occurred when running the discovery service", exc);
+				if (tracker != null) {
+					tracker.setStatus(STATUS.ERROR);
+					tracker.setStatusDetails(Utils.getExceptionAsString(exc));
+				}
+			}
+			updateTracker(tracker);
+		}
+		
+		@Override
+		public void setExecutorService(ExecutorService service) {
+			
+		}
+		
+		@Override
+		public boolean isReady() {
+			return true;
+		}
+		
+		@Override
+		public void cancel() {
+		}
+
+	}
+	
+	
+	private void runServiceInBackground(ExecutorService executorService, final AnalysisRequestTracker tracker, final DiscoveryServiceRequest nextRequest, final SyncDiscoveryService nextService) throws JSONException {
+		String suffix = nextRequest.getDiscoveryServiceId() + "_" + nextRequest.getOdfRequestId() + UUID.randomUUID().toString();
+		String runnerId = "DSRunner_" + suffix;
+		ThreadManager tm = new ODFInternalFactory().create(ThreadManager.class);
+		ServiceRunner serviceRunner = new ServiceRunner(tracker, nextRequest, nextService);
+		tm.setExecutorService(executorService);
+		tm.startUnmanagedThread(runnerId, serviceRunner);
+	}
+	
+	private void runService(AnalysisRequestTracker tracker, DiscoveryServiceRequest nextRequest, SyncDiscoveryService nextService) throws JSONException {
+		DiscoveryServiceResponse response = null;
+		String dsID = nextRequest.getDiscoveryServiceId();
+		boolean requiresAuxObjects = controlCenter.requiresMetaDataCache(nextService);
+		if (nextService instanceof SyncDiscoveryService) {
+			SyncDiscoveryService nextServiceSync = (SyncDiscoveryService) nextService;
+			logger.log(Level.FINER, "Starting synchronous analysis on service {0}", dsID);
+			DiscoveryServiceSyncResponse syncResponse = nextServiceSync.runAnalysis(cloneDSRequestAndAddServiceProps(nextRequest, requiresAuxObjects));
+			nextRequest.setFinishedProcessing(System.currentTimeMillis());
+			// Even if the analysis was cancelled concurrently, we store the results, since the service implementation could have stored them itself anyway.
+			long before = System.currentTimeMillis();
+			InternalAnnotationStoreUtils.storeDiscoveryServiceResult(syncResponse.getResult(), tracker.getRequest());
+			nextRequest.setTimeSpentStoringResults(System.currentTimeMillis() - before);
+			// remove result to reduce size of response
+			syncResponse.setResult(null);
+			response = syncResponse;
+		} else {
+			throw new RuntimeException("Unknown Java proxy created for service with id " + dsID);
+		}
+
+		// process response
+		if (response.getCode() == null) {
+			response.setCode(DiscoveryServiceResponse.ResponseCode.UNKNOWN_ERROR);
+			String origDetails = response.getDetails();
+			response.setDetails(MessageFormat.format("Discovery service did not return a response code. Assuming error. Original message: {0}", origDetails));
+		}
+		switch (response.getCode()) {
+		case UNKNOWN_ERROR:
+			TrackerUtil.addDiscoveryServiceStartResponse(tracker, response);
+			tracker.setStatus(STATUS.ERROR);
+			tracker.setStatusDetails(response.getDetails());
+			logger.log(Level.WARNING, "Discovery Service ''{2}'' responded with an unknown error ''{0}'', ''{1}''", new Object[] { response.getCode().name(),
+					response.getDetails(), dsID });
+			break;
+		case NOT_AUTHORIZED:
+			TrackerUtil.addDiscoveryServiceStartResponse(tracker, response);
+			tracker.setStatus(STATUS.ERROR);
+			tracker.setStatusDetails(response.getDetails());
+			logger.log(Level.WARNING, "Discovery Service ''{2}'' responded with a 'not authorized' error ''{0}'', ''{1}''", new Object[] { response.getCode().name(),
+					response.getDetails(), dsID });
+			break;
+		case TEMPORARILY_UNAVAILABLE:
+			tracker.setStatus(STATUS.IN_DISCOVERY_SERVICE_QUEUE);
+			logger.log(Level.INFO, "Discovery Service ''{2}'' responded that it is unavailable right now ''{0}'', ''{1}''", new Object[] {
+					response.getCode().name(), response.getDetails(), dsID });
+			// requeue and finish immediately
+			controlCenter.getQueueManager().enqueue(tracker);
+			return;
+		case OK:
+			TrackerUtil.addDiscoveryServiceStartResponse(tracker, response);
+			logger.log(Level.FINER, "Synchronous Discovery Service processed request ''{0}'', ''{1}''", new Object[] { response.getCode().name(), response.getDetails() });
+			AnalysisRequestTracker storedTracker = trackerStore.query(tracker.getRequest().getId());
+			// A user could have cancelled the analysis concurrently; in this case, ignore the response and do not overwrite the tracker.
+			if (storedTracker != null && storedTracker.getStatus() != STATUS.CANCELLED) {
+				// check for next queue
+				controlCenter.advanceToNextDiscoveryService(tracker);
+			} else {
+				logger.log(Level.FINER, "Not advancing analysis request because it was cancelled!");
+			}
+			break;
+		default:
+			tracker.setStatus(STATUS.ERROR);
+			tracker.setStatusDetails(response.getDetails());
+			logger.log(Level.WARNING, "Discovery Service ''{2}'' responded with an unknown response ''{0}'', ''{1}''", new Object[] {
+					response.getCode().name(), response.getDetails(), dsID });
+			break;
+		}
+	}
+
+	private boolean updateTracker(AnalysisRequestTracker tracker) {
+		boolean cancelled = false;
+		if (tracker != null) {
+			AnalysisRequestTracker storedTracker = trackerStore.query(tracker.getRequest().getId());
+			// A user could have cancelled the analysis concurrently; in this case, ignore the response and do not overwrite the tracker.
+			if (storedTracker == null || (! STATUS.CANCELLED.equals(storedTracker.getStatus())) ) {
+				Utils.setCurrentTimeAsLastModified(tracker);
+				trackerStore.store(tracker);
+			} else {
+				cancelled = true;
+				logger.log(Level.FINER, "Not storing analysis tracker changes because it was cancelled!");
+			}
+		}
+		return cancelled;
+	}
+	
+
+}
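
The checkDataSet gate exercised above lets a service reject data sets it cannot read before runAnalysis is ever attempted. Below is a hedged sketch of a service-side override; the RelationalDataSet type check is hypothetical, the setDataAccess/setDetails setters are assumptions mirroring the getters used in DiscoveryServiceStarter, and DataAccess.Possible is the assumed counterpart of the NotPossible value seen above.

    @Override
    public DataSetCheckResult checkDataSet(DataSetContainer container) {
        DataSetCheckResult result = new DataSetCheckResult();
        if (container.getDataSet() instanceof RelationalDataSet) { // hypothetical type check
            result.setDataAccess(DataSetCheckResult.DataAccess.Possible); // enum value assumed
        } else {
            // the starter will either fail the request or, with ignoreDataSetCheck, skip this service
            result.setDataAccess(DataSetCheckResult.DataAccess.NotPossible);
            result.setDetails("Only relational data sets are supported");
        }
        return result;
    }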

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-core/src/main/java/org/apache/atlas/odf/core/controlcenter/DiscoveryServiceUnreachableException.java
----------------------------------------------------------------------
diff --git a/odf/odf-core/src/main/java/org/apache/atlas/odf/core/controlcenter/DiscoveryServiceUnreachableException.java b/odf/odf-core/src/main/java/org/apache/atlas/odf/core/controlcenter/DiscoveryServiceUnreachableException.java
new file mode 100755
index 0000000..38e0747
--- /dev/null
+++ b/odf/odf-core/src/main/java/org/apache/atlas/odf/core/controlcenter/DiscoveryServiceUnreachableException.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.odf.core.controlcenter;
+
+public class DiscoveryServiceUnreachableException extends RuntimeException {
+
+	private static final long serialVersionUID = 3581149213306073675L;
+	
+	private String reason;
+
+	public DiscoveryServiceUnreachableException(String reason) {
+		super(reason);
+		this.reason = reason;
+	}
+
+	public String getReason() {
+		return reason;
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-core/src/main/java/org/apache/atlas/odf/core/controlcenter/ExecutorServiceFactory.java
----------------------------------------------------------------------
diff --git a/odf/odf-core/src/main/java/org/apache/atlas/odf/core/controlcenter/ExecutorServiceFactory.java b/odf/odf-core/src/main/java/org/apache/atlas/odf/core/controlcenter/ExecutorServiceFactory.java
new file mode 100755
index 0000000..4cba0f6
--- /dev/null
+++ b/odf/odf-core/src/main/java/org/apache/atlas/odf/core/controlcenter/ExecutorServiceFactory.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.odf.core.controlcenter;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+public class ExecutorServiceFactory {
+
+	static Object execServiceLock = new Object();
+	static ExecutorService executorService = null;
+	
+	public ExecutorService createExecutorService() {
+		synchronized (execServiceLock) {
+			if (executorService == null) {
+				executorService = Executors.newCachedThreadPool();
+			}
+		}
+		return executorService;
+	}
+	
+}
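
The factory lazily creates one process-wide cached thread pool behind a static lock, so every component calling createExecutorService() shares the same pool. If the per-call synchronization ever mattered, the standard initialization-on-demand holder idiom gives the same lazy, thread-safe creation without a lock; the sketch below is an alternative, not what the commit does.

    public class LockFreeExecutorServiceFactory {
        private static class Holder {
            // the JVM guarantees this initializer runs at most once, on first access
            static final ExecutorService INSTANCE = Executors.newCachedThreadPool();
        }

        public ExecutorService createExecutorService() {
            return Holder.INSTANCE;
        }
    }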

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-core/src/main/java/org/apache/atlas/odf/core/controlcenter/HealthCheckServiceRuntime.java
----------------------------------------------------------------------
diff --git a/odf/odf-core/src/main/java/org/apache/atlas/odf/core/controlcenter/HealthCheckServiceRuntime.java b/odf/odf-core/src/main/java/org/apache/atlas/odf/core/controlcenter/HealthCheckServiceRuntime.java
new file mode 100755
index 0000000..848a673
--- /dev/null
+++ b/odf/odf-core/src/main/java/org/apache/atlas/odf/core/controlcenter/HealthCheckServiceRuntime.java
@@ -0,0 +1,73 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.odf.core.controlcenter;
+
+import org.apache.atlas.odf.api.discoveryservice.DiscoveryService;
+import org.apache.atlas.odf.api.discoveryservice.DiscoveryServiceEndpoint;
+import org.apache.atlas.odf.api.discoveryservice.DiscoveryServiceProperties;
+import org.apache.atlas.odf.api.discoveryservice.DiscoveryServiceRequest;
+import org.apache.atlas.odf.api.discoveryservice.SyncDiscoveryServiceBase;
+import org.apache.atlas.odf.api.discoveryservice.sync.DiscoveryServiceSyncResponse;
+import org.apache.atlas.odf.api.settings.validation.ValidationException;
+import org.apache.atlas.odf.api.discoveryservice.*;
+
+public class HealthCheckServiceRuntime implements ServiceRuntime {
+	public static final String HEALTH_CHECK_RUNTIME_NAME = "HealthCheck";
+
+	@Override
+	public String getName() {
+		return HEALTH_CHECK_RUNTIME_NAME;
+	}
+
+	@Override
+	public long getWaitTimeUntilAvailable() {
+		return 0;
+	}
+
+	@Override
+	public DiscoveryService createDiscoveryServiceProxy(DiscoveryServiceProperties props) {
+		return new SyncDiscoveryServiceBase() {
+			
+			@Override
+			public DiscoveryServiceSyncResponse runAnalysis(DiscoveryServiceRequest request) {
+				DiscoveryServiceSyncResponse response = new DiscoveryServiceSyncResponse();
+				response.setCode(DiscoveryServiceResponse.ResponseCode.OK);
+				response.setDetails("Health check service finished successfully");
+				return response;
+			}
+		};
+	}
+	
+	public static DiscoveryServiceProperties getHealthCheckServiceProperties() {		
+		DiscoveryServiceProperties props = new DiscoveryServiceProperties();
+		props.setId(ControlCenter.HEALTH_TEST_DISCOVERY_SERVICE_ID);
+		props.setDescription("Health check service");
+		
+		DiscoveryServiceEndpoint ep = new DiscoveryServiceEndpoint();
+		ep.setRuntimeName(HEALTH_CHECK_RUNTIME_NAME);
+		
+		props.setEndpoint(ep);
+		return props;
+	}
+
+	@Override
+	public String getDescription() {
+		return "Internal runtime dedicated to health checks";
+	}
+
+	@Override
+	public void validate(DiscoveryServiceProperties props) throws ValidationException {
+	}
+
+}
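
Putting the pieces together, here is a sketch of how a caller could exercise the health-check runtime directly. It assumes DiscoveryServiceRequest has a usable no-arg constructor (its JSON round-tripping elsewhere in this commit suggests so) and that the proxy returned for this runtime is safely castable to SyncDiscoveryService, since it extends SyncDiscoveryServiceBase.

    ServiceRuntime runtime = new HealthCheckServiceRuntime();
    DiscoveryServiceProperties props = HealthCheckServiceRuntime.getHealthCheckServiceProperties();
    SyncDiscoveryService service = (SyncDiscoveryService) runtime.createDiscoveryServiceProxy(props);
    DiscoveryServiceSyncResponse response = service.runAnalysis(new DiscoveryServiceRequest());
    // expected: ResponseCode.OK and "Health check service finished successfully"
    System.out.println(response.getCode() + ": " + response.getDetails());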

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-core/src/main/java/org/apache/atlas/odf/core/controlcenter/JavaServiceRuntime.java
----------------------------------------------------------------------
diff --git a/odf/odf-core/src/main/java/org/apache/atlas/odf/core/controlcenter/JavaServiceRuntime.java b/odf/odf-core/src/main/java/org/apache/atlas/odf/core/controlcenter/JavaServiceRuntime.java
new file mode 100755
index 0000000..61a29b1
--- /dev/null
+++ b/odf/odf-core/src/main/java/org/apache/atlas/odf/core/controlcenter/JavaServiceRuntime.java
@@ -0,0 +1,87 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.odf.core.controlcenter;
+
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.atlas.odf.api.discoveryservice.sync.SyncDiscoveryService;
+import org.apache.atlas.odf.api.settings.validation.ImplementationValidator;
+import org.apache.atlas.odf.api.settings.validation.ValidationException;
+import org.apache.atlas.odf.json.JSONUtils;
+import org.apache.wink.json4j.JSONException;
+
+import org.apache.atlas.odf.api.discoveryservice.DiscoveryService;
+import org.apache.atlas.odf.api.discoveryservice.DiscoveryServiceJavaEndpoint;
+import org.apache.atlas.odf.api.discoveryservice.DiscoveryServiceProperties;
+import org.apache.atlas.odf.api.discoveryservice.async.AsyncDiscoveryService;
+import org.apache.atlas.odf.core.Utils;
+
+public class JavaServiceRuntime implements ServiceRuntime {
+
+	Logger logger = Logger.getLogger(JavaServiceRuntime.class.getName());
+
+	public static final String NAME = "Java";
+	
+	@Override
+	public String getName() {
+		return NAME;
+	}
+
+	@Override
+	public long getWaitTimeUntilAvailable() {
+		// for now, always run
+		return 0;
+	}
+
+	@Override
+	public DiscoveryService createDiscoveryServiceProxy(DiscoveryServiceProperties props) {
+		DiscoveryService service = null;
+		String className = null;
+		try {
+			className = JSONUtils.convert(props.getEndpoint(), DiscoveryServiceJavaEndpoint.class).getClassName();
+			Class<?> clazz = Class.forName(className);
+			Object o = clazz.newInstance();
+			service = (DiscoveryService) o;
+		} catch (Exception e) {
+			logger.log(Level.FINE, "An error occurred while instantiating the Java implementation", e);
+			logger.log(Level.WARNING, "Java implementation ''{0}'' for discovery service ''{1}'' could not be instantiated (internal error: ''{2}'')",
+					new Object[] { className, props.getId(), e.getMessage() });
+			return null;
+		}
+		if (service instanceof SyncDiscoveryService) {
+			return new TransactionSyncDiscoveryServiceProxy((SyncDiscoveryService) service);
+		} else if (service instanceof AsyncDiscoveryService) {
+			return new TransactionAsyncDiscoveryServiceProxy((AsyncDiscoveryService) service);
+		}
+		return service;
+	}
+
+	@Override
+	public String getDescription() {
+		return "The default Java runtime";
+	}
+
+	@Override
+	public void validate(DiscoveryServiceProperties props) throws ValidationException {
+		DiscoveryServiceJavaEndpoint javaEP;
+		try {
+			javaEP = JSONUtils.convert(props.getEndpoint(), DiscoveryServiceJavaEndpoint.class);
+		} catch (JSONException e) {
+			throw new ValidationException("Endpoint definition for Java service is not correct: " + Utils.getExceptionAsString(e));
+		}
+		new ImplementationValidator().validate("Service.endpoint", javaEP.getClassName());
+	}
+
+}
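
Because the Java runtime loads the endpoint class via Class.forName and newInstance, an implementation must be on the classpath and have a public no-arg constructor. A minimal sketch of such a service, modeled on the health-check service earlier in this commit; the package, class name, and analysis logic are hypothetical.

    package com.example.odf; // hypothetical package

    import org.apache.atlas.odf.api.discoveryservice.DiscoveryServiceRequest;
    import org.apache.atlas.odf.api.discoveryservice.DiscoveryServiceResponse;
    import org.apache.atlas.odf.api.discoveryservice.SyncDiscoveryServiceBase;
    import org.apache.atlas.odf.api.discoveryservice.sync.DiscoveryServiceSyncResponse;

    public class MyColumnAnalyzer extends SyncDiscoveryServiceBase {

        @Override
        public DiscoveryServiceSyncResponse runAnalysis(DiscoveryServiceRequest request) {
            DiscoveryServiceSyncResponse response = new DiscoveryServiceSyncResponse();
            // inspect request.getDataSetContainer() and create annotations here
            response.setCode(DiscoveryServiceResponse.ResponseCode.OK);
            response.setDetails("Column analysis finished");
            return response;
        }
    }

Registering this class name in a DiscoveryServiceJavaEndpoint would then let validate() above confirm it via ImplementationValidator.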


