atlas-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mad...@apache.org
Subject [10/25] incubator-atlas git commit: ATLAS-1898: initial commit of ODF
Date Wed, 28 Jun 2017 05:57:23 GMT
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/controlcenter/ODFAPITest.java
----------------------------------------------------------------------
diff --git a/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/controlcenter/ODFAPITest.java b/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/controlcenter/ODFAPITest.java
new file mode 100755
index 0000000..900c214
--- /dev/null
+++ b/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/controlcenter/ODFAPITest.java
@@ -0,0 +1,373 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.odf.core.test.controlcenter;
+
+import java.text.MessageFormat;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.logging.Level;
+
+import org.apache.atlas.odf.api.metadata.MetaDataObjectReference;
+import org.apache.atlas.odf.api.metadata.MetadataStore;
+import org.apache.atlas.odf.api.metadata.models.DataSet;
+import org.apache.atlas.odf.api.metadata.models.UnknownDataSet;
+import org.apache.atlas.odf.core.ODFInternalFactory;
+import org.apache.atlas.odf.core.controlcenter.AnalysisRequestTrackerStore;
+import org.apache.atlas.odf.core.controlcenter.DefaultStatusQueueStore;
+import org.apache.atlas.odf.core.metadata.DefaultMetadataStore;
+import org.apache.atlas.odf.core.test.ODFTestBase;
+import org.apache.wink.json4j.JSONException;
+import org.apache.wink.json4j.JSONObject;
+import org.junit.Assert;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import org.apache.atlas.odf.api.ODFFactory;
+import org.apache.atlas.odf.api.analysis.AnalysisCancelResult;
+import org.apache.atlas.odf.api.analysis.AnalysisManager;
+import org.apache.atlas.odf.api.analysis.AnalysisRequest;
+import org.apache.atlas.odf.api.analysis.AnalysisRequestStatus;
+import org.apache.atlas.odf.api.analysis.AnalysisResponse;
+import org.apache.atlas.odf.api.discoveryservice.AnalysisRequestTracker;
+
+public class ODFAPITest extends ODFTestBase {
+
+	// Delay in milliseconds between two consecutive status polls in the wait loops of this class.
+	public static int WAIT_MS_BETWEEN_POLLING = 500;
+	// Default upper bound on the number of status polls before a wait loop gives up.
+	public static int MAX_NUMBER_OF_POLLS = 500;
+	// Data set ids starting with this prefix get a dummy data set created by createAnalysisRequest().
+	public static String DUMMY_SUCCESS_ID = "success";
+	// Data set ids starting with this prefix also get a dummy data set created by createAnalysisRequest().
+	public static String DUMMY_ERROR_ID = "error";
+
+	/**
+	 * Convenience overload that runs an analysis request on a single data set and checks its outcome.
+	 * Delegates to the list-based variant with a one-element list.
+	 *
+	 * @param dataSetID id of the data set to analyze
+	 * @param expectedState state the request must end up in
+	 * @param expectedProcessedDiscoveryRequests expected number of discovery service responses; -1 means "as many as there are discovery service requests"
+	 * @throws Exception if running or polling the request fails
+	 */
+	public static void runRequestAndCheckResult(String dataSetID, AnalysisRequestStatus.State expectedState, int expectedProcessedDiscoveryRequests) throws Exception {
+		List<String> singleDataSet = Collections.singletonList(dataSetID);
+		runRequestAndCheckResult(singleDataSet, expectedState, expectedProcessedDiscoveryRequests);
+	}
+	
+	/**
+	 * Starts an analysis request for the given data sets, polls its status until it leaves the
+	 * ACTIVE / QUEUED / NOT_FOUND states (or the poll budget is used up) and then verifies the
+	 * final state and the number of discovery service responses stored in the request tracker.
+	 *
+	 * @param dataSetIDs ids of the data sets to analyze
+	 * @param expectedState state the request must end up in
+	 * @param expectedProcessedDiscoveryRequests expected number of discovery service responses; -1 means "as many as there are discovery service requests"
+	 * @throws Exception if running or polling the request fails
+	 */
+	public static void runRequestAndCheckResult(List<String> dataSetIDs, AnalysisRequestStatus.State expectedState, int expectedProcessedDiscoveryRequests) throws Exception {
+		AnalysisManager analysisManager = new ODFFactory().create().getAnalysisManager();
+		String requestId = runRequest(dataSetIDs, analysisManager);
+		log.info("Running request "+requestId+" on data sets: " + dataSetIDs);
+
+		AnalysisRequestStatus status;
+		boolean stillPending;
+		int remainingPolls = MAX_NUMBER_OF_POLLS;
+		do {
+			status = analysisManager.getAnalysisRequestStatus(requestId);
+			int pollNumber = MAX_NUMBER_OF_POLLS - remainingPolls;
+			log.log(Level.INFO, "{4}th poll request for request ID ''{0}'' (expected state: ''{3}''): state: ''{1}'', details: ''{2}''",
+					new Object[] { requestId, status.getState(), status.getDetails(), expectedState, pollNumber });
+			remainingPolls--;
+			Thread.sleep(WAIT_MS_BETWEEN_POLLING);
+			AnalysisRequestStatus.State polledState = status.getState();
+			stillPending = polledState == AnalysisRequestStatus.State.ACTIVE
+					|| polledState == AnalysisRequestStatus.State.QUEUED
+					|| polledState == AnalysisRequestStatus.State.NOT_FOUND;
+		} while (remainingPolls > 0 && stillPending);
+
+		int pollsDone = MAX_NUMBER_OF_POLLS - remainingPolls;
+		log.log(Level.INFO, "Polling result after {0} polls for request id {1}: status: {2}", new Object[] { pollsDone, requestId, status.getState() });
+
+		// The poll budget must not have been used up completely.
+		Assert.assertTrue(remainingPolls > 0);
+		Assert.assertEquals(expectedState, status.getState());
+
+		AnalysisRequestTrackerStore trackerStore = new ODFInternalFactory().create(AnalysisRequestTrackerStore.class);
+		AnalysisRequestTracker tracker = trackerStore.query(requestId);
+		Assert.assertNotNull(tracker);
+		checkTracker(tracker, expectedProcessedDiscoveryRequests);
+		log.info("Status details: " + status.getDetails());
+	}
+
+	/**
+	 * Asserts that the tracker holds the expected number of discovery service responses.
+	 *
+	 * @param tracker tracker of the analysis request to inspect
+	 * @param expectedProcessedDiscoveryRequests expected number of responses; -1 means
+	 *        "one response per discovery service request recorded in the tracker"
+	 */
+	static void checkTracker(AnalysisRequestTracker tracker, int expectedProcessedDiscoveryRequests) {
+		int expectedResponses = (expectedProcessedDiscoveryRequests == -1)
+				? tracker.getDiscoveryServiceRequests().size()
+				: expectedProcessedDiscoveryRequests;
+		int actualResponses = tracker.getDiscoveryServiceResponses().size();
+		Assert.assertEquals(expectedResponses, actualResponses);
+	}
+
+	/**
+	 * Convenience overload that starts an analysis request for a single data set.
+	 *
+	 * @param dataSetID id of the data set to analyze
+	 * @param analysisManager manager used to submit the request
+	 * @return the id of the started request
+	 * @throws Exception if the request cannot be created or started
+	 */
+	static String runRequest(String dataSetID, AnalysisManager analysisManager) throws Exception {
+		List<String> singleDataSet = Collections.singletonList(dataSetID);
+		return runRequest(singleDataSet, analysisManager);
+	}
+
+	/**
+	 * Builds an analysis request for the given data sets, submits it and checks that the
+	 * response is present, is not flagged as invalid and carries a request id.
+	 *
+	 * @param dataSetIDs ids of the data sets to analyze
+	 * @param analysisManager manager used to submit the request
+	 * @return the id of the started request (never null)
+	 * @throws Exception if the request cannot be created or started
+	 */
+	public static String runRequest(List<String> dataSetIDs, AnalysisManager analysisManager) throws Exception {
+		AnalysisRequest analysisRequest = createAnalysisRequest(dataSetIDs);
+		log.info("Starting analyis");
+		AnalysisResponse analysisResponse = analysisManager.runAnalysis(analysisRequest);
+		Assert.assertNotNull(analysisResponse);
+		Assert.assertFalse(analysisResponse.isInvalidRequest());
+		String requestId = analysisResponse.getId();
+		Assert.assertNotNull(requestId);
+		return requestId;
+	}
+
+	
+	@Test
+	public void testSimpleSuccess() throws Exception {
+		runRequestAndCheckResult("successID", AnalysisRequestStatus.State.FINISHED, -1);
+	}
+
+	public static void waitForRequest(String requestId, AnalysisManager analysisManager) {
+		waitForRequest(requestId, analysisManager, MAX_NUMBER_OF_POLLS);
+	}
+	
+	/**
+	 * Polls the status of a request until it leaves the ACTIVE / QUEUED / NOT_FOUND states
+	 * or the poll budget is used up. At least one poll is always performed.
+	 *
+	 * @param requestId id of the request to wait for
+	 * @param analysisManager manager used to query the request status
+	 * @param maxPolls maximum number of status polls
+	 * @throws RuntimeException wrapping the InterruptedException if the thread is interrupted
+	 *         while sleeping between polls; the interrupt flag is restored before throwing
+	 */
+	public static void waitForRequest(String requestId, AnalysisManager analysisManager, int maxPolls) {
+		AnalysisRequestStatus status = null;
+		boolean pending;
+
+		log.log(Level.INFO, "Waiting for request ''{0}'' to finish", requestId);
+		do {
+			status = analysisManager.getAnalysisRequestStatus(requestId);
+
+			log.log(Level.INFO, "Poll request for request ID ''{0}'', state: ''{1}'', details: ''{2}''", new Object[] { requestId, status.getState(), status.getDetails() });
+			maxPolls--;
+			try {
+				Thread.sleep(WAIT_MS_BETWEEN_POLLING);
+			} catch (InterruptedException e) {
+				// Restore the interrupt flag so code further up the stack can still see the interruption.
+				Thread.currentThread().interrupt();
+				throw new RuntimeException(e);
+			}
+			AnalysisRequestStatus.State state = status.getState();
+			pending = state == AnalysisRequestStatus.State.ACTIVE
+					|| state == AnalysisRequestStatus.State.QUEUED
+					|| state == AnalysisRequestStatus.State.NOT_FOUND;
+		} while (maxPolls > 0 && pending);
+
+		// Decide on the last observed state, not on the counter: the request may have completed
+		// exactly on the last poll, or the caller may have passed a non-positive budget.
+		if (pending) {
+			// The apostrophe is doubled because the message is processed by MessageFormat.
+			log.log(Level.INFO, "Request ''{0}'' is not finished yet, don''t wait for it", requestId);
+		} else {
+			log.log(Level.INFO, "Request ''{0}'' is finished with state: ''{1}''", new Object[] { requestId, status.getState() });
+		}
+	}
+
+	/**
+	 * Polls the status of a request until it leaves the ACTIVE / QUEUED / NOT_FOUND states
+	 * or the poll budget is used up, then reports whether the last observed state matches
+	 * the expected one. At least one poll is always performed.
+	 *
+	 * @param requestId id of the request to wait for
+	 * @param analysisManager manager used to query the request status
+	 * @param maxPolls maximum number of status polls
+	 * @param expectedState state the request is expected to reach
+	 * @return true if the last polled state equals {@code expectedState}
+	 * @throws RuntimeException wrapping the InterruptedException if the thread is interrupted
+	 *         while sleeping between polls; the interrupt flag is restored before throwing
+	 */
+	public static boolean waitForRequest(String requestId, AnalysisManager analysisManager, int maxPolls, AnalysisRequestStatus.State expectedState) {
+		AnalysisRequestStatus status = null;
+		boolean pending;
+
+		log.log(Level.INFO, "Waiting for request ''{0}'' to finish", requestId);
+		do {
+			status = analysisManager.getAnalysisRequestStatus(requestId);
+			log.log(Level.INFO, "Poll request for request ID ''{0}'', state: ''{1}'', details: ''{2}''", new Object[] { requestId, status.getState(), status.getDetails() });
+			maxPolls--;
+			try {
+				Thread.sleep(WAIT_MS_BETWEEN_POLLING);
+			} catch (InterruptedException e) {
+				// Restore the interrupt flag so code further up the stack can still see the interruption.
+				Thread.currentThread().interrupt();
+				throw new RuntimeException(e);
+			}
+			AnalysisRequestStatus.State state = status.getState();
+			pending = state == AnalysisRequestStatus.State.ACTIVE
+					|| state == AnalysisRequestStatus.State.QUEUED
+					|| state == AnalysisRequestStatus.State.NOT_FOUND;
+		} while (maxPolls > 0 && pending);
+
+		// Decide on the last observed state, not on the counter: the request may have completed
+		// exactly on the last poll, or the caller may have passed a non-positive budget.
+		if (pending) {
+			// The apostrophe is doubled because the message is processed by MessageFormat.
+			log.log(Level.INFO, "Request ''{0}'' is not finished yet, don''t wait for it", requestId);
+		}
+		return expectedState.equals(status.getState());
+	}
+
+	
+	@Test
+	public void testSimpleSuccessDuplicate() throws Exception {
+		AnalysisManager analysisManager = new ODFFactory().create().getAnalysisManager();
+		String id = runRequest("successID", analysisManager);
+		String secondId = runRequest("successID", analysisManager);
+		Assert.assertNotEquals(id, secondId);
+		//Wait limit and try if new analysis is started
+		Thread.sleep(DefaultStatusQueueStore.IGNORE_SIMILAR_REQUESTS_TIMESPAN_MS*2 + 5000);
+		String thirdId = runRequest("successID", analysisManager);
+		Assert.assertNotEquals(secondId, thirdId);
+		waitForRequest(id, analysisManager);
+		waitForRequest(thirdId, analysisManager);
+	}
+
+	/** Two requests on different data sets must get different request ids. */
+	@Test
+	public void testSimpleSuccessNoDuplicate() throws Exception {
+		AnalysisManager manager = new ODFFactory().create().getAnalysisManager();
+		String firstId = runRequest("successID", manager);
+		String otherId = runRequest("successID2", manager);
+		Assert.assertNotEquals(firstId, otherId);
+		waitForRequest(firstId, manager);
+		waitForRequest(otherId, manager);
+	}
+
+	@Test
+	public void testSimpleSuccessDuplicateSubset() throws Exception {
+		AnalysisManager analysisManager = new ODFFactory().create().getAnalysisManager();
+		String id = runRequest(Arrays.asList("successID", "successID2", "successID3"), analysisManager);
+		String secondId = runRequest("successID2", analysisManager);
+		Assert.assertNotEquals(id, secondId);
+		Thread.sleep(DefaultStatusQueueStore.IGNORE_SIMILAR_REQUESTS_TIMESPAN_MS + 5000);
+		String thirdId = runRequest("successID", analysisManager);
+		Assert.assertNotEquals(secondId, thirdId);
+		waitForRequest(id, analysisManager);
+		waitForRequest(thirdId, analysisManager);
+	}
+	
+	/**
+	 * Starts a request, cancels it right away and checks that the cancellation succeeded and that
+	 * a following request gets a different id.
+	 *
+	 * This test depends on the speed of execution.
+	 * An analysis that is not in state INITIALIZED or IN_SERVICE_QUEUE cannot be cancelled.
+	 * Therefore if the analysis is started too quickly this test will fail!
+	 *
+	 * Ignore for now as this can go wrong in the build.
+	 */
+	@Test
+	@Ignore
+	public void testCancelRequest() throws Exception {
+		AnalysisManager analysisManager = new ODFFactory().create().getAnalysisManager();
+		String id = runRequest(Arrays.asList("successID", "successID2", "successID3"), analysisManager);
+		AnalysisCancelResult cancelAnalysisRequest = analysisManager.cancelAnalysisRequest(id);
+		// JUnit expects (expected, actual); the swapped order produced a misleading failure message.
+		Assert.assertEquals(AnalysisCancelResult.State.SUCCESS, cancelAnalysisRequest.getState());
+		String secondId = runRequest("successID2", analysisManager);
+		Assert.assertNotEquals(id, secondId);
+	}
+
+	
+	@Test
+	public void testRequestsWithDataSetListSuccess() throws Exception {
+		runRequestAndCheckResult(Arrays.asList("success1", "success2", "success3"), AnalysisRequestStatus.State.FINISHED, 6);
+	}
+	
+	@Test
+	public void testRequestsWithDataSetListError() throws Exception {
+		runRequestAndCheckResult(Arrays.asList("success1", "error2", "success3"), AnalysisRequestStatus.State.ERROR, 3);
+	}
+
+		
+
+	@Test
+	public void testSimpleFailure() throws Exception {
+		runRequestAndCheckResult("errorID", AnalysisRequestStatus.State.ERROR, 1);
+	}
+	
+	@Test 
+	public void testManyRequests()  throws Exception {
+		List<String> dataSets = new ArrayList<String>();
+		List<AnalysisRequestStatus.State> expectedStates = new ArrayList<AnalysisRequestStatus.State>();
+		int dataSetNum = 5;
+		for (int i=0; i<dataSetNum; i++) {
+			AnalysisRequestStatus.State expectedState = AnalysisRequestStatus.State.FINISHED;
+			String dataSet = "successdataSet" + i;
+			if (i % 3 == 0) {
+				// every third data set should fail
+				dataSet = "errorDataSet" + i;
+				expectedState = AnalysisRequestStatus.State.ERROR;
+			} 
+			dataSets.add(dataSet);
+			expectedStates.add(expectedState);
+		}
+		
+		runRequests(dataSets, expectedStates);
+	}
+
+	/**
+	 * Starts one analysis request per data set id, then polls all of them in several passes
+	 * and finally asserts that every request reached its expected state.
+	 *
+	 * @param dataSetIDs ids of the data sets; one request is started for each of them
+	 * @param expectedStates expected final state per data set, in the same order as {@code dataSetIDs}
+	 * @throws Exception if creating, starting or polling a request fails
+	 */
+	public void runRequests(List<String> dataSetIDs, List<AnalysisRequestStatus.State> expectedStates) throws Exception {
+		Assert.assertTrue(dataSetIDs.size() == expectedStates.size());
+		AnalysisManager analysisManager = new ODFFactory().create().getAnalysisManager();
+
+		// Maps each started request to the state it is expected to end in.
+		Map<AnalysisRequest, AnalysisRequestStatus.State> request2ExpectedState = new HashMap<AnalysisRequest, AnalysisRequestStatus.State>();
+
+		// Phase 1: start all requests.
+		for (int i = 0; i < dataSetIDs.size(); i++) {
+			String dataSetID = dataSetIDs.get(i);
+			AnalysisRequestStatus.State expectedState = expectedStates.get(i);
+
+			AnalysisRequest request = createAnalysisRequest(Collections.singletonList(dataSetID));
+
+			log.info("Starting analyis");
+			AnalysisResponse response = analysisManager.runAnalysis(request);
+			Assert.assertNotNull(response);
+			String id = response.getId();
+			Assert.assertFalse(response.isInvalidRequest());
+			Assert.assertNotNull(id);
+			// The id is stored on the request before it is used as a map key.
+			request.setId(id);
+			request2ExpectedState.put(request, expectedState);
+		}
+
+		// Phase 2: poll in up to maxPollPasses passes; requests with a recorded final status are skipped.
+		Map<AnalysisRequest, AnalysisRequestStatus> actualFinalStatePerRequest = new HashMap<AnalysisRequest, AnalysisRequestStatus>();
+		int maxPollPasses = 10;
+		for (int i = 0; i < maxPollPasses; i++) {
+			log.info("Polling all requests for the " + i + " th time");
+			boolean allRequestsFinished = true;
+			for (Map.Entry<AnalysisRequest, AnalysisRequestStatus.State> entry : request2ExpectedState.entrySet()) {
+
+				AnalysisRequest request = entry.getKey();
+				String id = request.getId();
+				if (actualFinalStatePerRequest.containsKey(request)) {
+					log.log(Level.INFO, "Request with ID ''{0}'' already finished, skipping it", id);
+				} else {
+					// Set even if the request completes during this pass, so the loop only
+					// stops after a pass in which every request was skipped.
+					allRequestsFinished = false;
+
+					AnalysisRequestStatus.State expectedState = entry.getValue();
+
+					AnalysisRequestStatus status = null;
+
+					// Poll this request at most three times per pass, sleeping one second after each poll.
+					int maxPollsPerRequest = 3;
+					do {
+						status = analysisManager.getAnalysisRequestStatus(id);
+						log.log(Level.INFO, "Poll request for request ID ''{0}'' (expected state: ''{3}''): state: ''{1}'', details: ''{2}''",
+								new Object[] { id, status.getState(), status.getDetails(), expectedState });
+						maxPollsPerRequest--;
+						Thread.sleep(1000);
+					} while (maxPollsPerRequest > 0 && (status.getState() == AnalysisRequestStatus.State.ACTIVE || status.getState() == AnalysisRequestStatus.State.QUEUED || status.getState() == AnalysisRequestStatus.State.NOT_FOUND));
+
+					// A status seen only on the third poll leaves the counter at 0 and is not
+					// recorded here; the request is polled again in the next pass.
+					if (maxPollsPerRequest > 0) {
+						// final state found
+						actualFinalStatePerRequest.put(request, status);
+					}
+				}
+			}
+			if (allRequestsFinished) {
+				log.info("All requests finished");
+				break;
+			}
+		}
+		// Phase 3: every request must have a recorded final status that matches the expectation.
+		Assert.assertTrue(actualFinalStatePerRequest.size() == request2ExpectedState.size());
+		Assert.assertTrue(actualFinalStatePerRequest.keySet().equals(request2ExpectedState.keySet()));
+		for (Map.Entry<AnalysisRequest, AnalysisRequestStatus> actual : actualFinalStatePerRequest.entrySet()) {
+			AnalysisRequest req = actual.getKey();
+			Assert.assertNotNull(req);
+			AnalysisRequestStatus.State expectedState = request2ExpectedState.get(req);
+			Assert.assertNotNull(expectedState);
+			AnalysisRequestStatus.State actualState = actual.getValue().getState();
+			Assert.assertNotNull(actualState);
+
+			log.log(Level.INFO, "Checking request ID ''{0}'', actual state: ''{1}'', expected state: ''{2}''", new Object[] { req.getId(), actualState, expectedState });
+			Assert.assertNotNull(expectedState);
+			Assert.assertEquals(expectedState, actualState);
+		}
+	}
+
+	public static AnalysisRequest createAnalysisRequest(List<String> dataSetIDs) throws JSONException {
+		AnalysisRequest request = new AnalysisRequest();
+		List<MetaDataObjectReference> dataSetRefs = new ArrayList<>();
+		MetadataStore mds = new ODFFactory().create().getMetadataStore();
+		if (!(mds instanceof DefaultMetadataStore)) {
+			throw new RuntimeException(MessageFormat.format("This tests does not work with metadata store implementation \"{0}\" but only with the DefaultMetadataStore.", mds.getClass().getName()));
+		}
+		DefaultMetadataStore defaultMds = (DefaultMetadataStore) mds;
+		defaultMds.resetAllData();
+		for (String id : dataSetIDs) {
+			MetaDataObjectReference mdr = new MetaDataObjectReference();
+			mdr.setId(id);
+			dataSetRefs.add(mdr);
+			if (id.startsWith(DUMMY_SUCCESS_ID) || id.startsWith(DUMMY_ERROR_ID)) {
+				log.info("Creating dummy data set for reference : " + id.toString());
+				DataSet ds = new UnknownDataSet();
+				ds.setReference(mdr);
+				defaultMds.createObject(ds);
+			}
+		}
+		defaultMds.commit();
+		request.setDataSets(dataSetRefs);
+		List<String> serviceIds = Arrays.asList(new String[]{"asynctestservice", "synctestservice"});
+		/* use a fix list of services 
+		List<DiscoveryServiceRegistrationInfo> registeredServices = new ODFFactory().create(ControlCenter.class).getConfig().getRegisteredServices();		
+		for(DiscoveryServiceRegistrationInfo service : registeredServices){
+			serviceIds.add(service.getId());
+		}
+		*/
+		request.setDiscoveryServiceSequence(serviceIds);
+		Map<String, Object> additionalProps = new HashMap<String, Object>();
+		additionalProps.put("aaa", "bbb");
+		JSONObject jo = new JSONObject();
+		jo.put("p1", "v1");
+		jo.put("p2", "v2");
+		additionalProps.put("jo", jo);
+		request.setAdditionalProperties(additionalProps);
+		return request;
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/controlcenter/ParallelODFTest.java
----------------------------------------------------------------------
diff --git a/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/controlcenter/ParallelODFTest.java b/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/controlcenter/ParallelODFTest.java
new file mode 100755
index 0000000..9aa3ba4
--- /dev/null
+++ b/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/controlcenter/ParallelODFTest.java
@@ -0,0 +1,101 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.odf.core.test.controlcenter;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.atlas.odf.api.analysis.AnalysisRequest;
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.atlas.odf.api.ODFFactory;
+import org.apache.atlas.odf.api.analysis.AnalysisManager;
+import org.apache.atlas.odf.api.analysis.AnalysisRequestStatus;
+import org.apache.atlas.odf.api.analysis.AnalysisRequestStatus.State;
+import org.apache.atlas.odf.api.analysis.AnalysisResponse;
+import org.apache.atlas.odf.api.engine.EngineManager;
+import org.apache.atlas.odf.api.engine.SystemHealth;
+import org.apache.atlas.odf.api.engine.SystemHealth.HealthStatus;
+import org.apache.atlas.odf.core.Utils;
+import org.apache.atlas.odf.core.analysis.AnalysisManagerImpl;
+import org.apache.atlas.odf.core.test.ODFTestLogger;
+import org.apache.atlas.odf.core.test.ODFTestcase;
+
+public class ParallelODFTest extends ODFTestcase {
+	Logger log = ODFTestLogger.get();
+	
+	@Test
+	public void runDataSetsInParallelSuccess() throws Exception {
+		runDataSetsInParallelAndCheckResult(Arrays.asList(new String[] { "successID1", "successID2" }), State.FINISHED);
+	}
+
+	@Test 
+	public void runDataSetsInParallelError() throws Exception {
+		runDataSetsInParallelAndCheckResult(Arrays.asList(new String[] { "successID1", "errorID2" }), State.ERROR);
+	}
+
+	private void runDataSetsInParallelAndCheckResult(List<String> dataSetIDs, State expectedState) throws Exception {
+		log.info("Running data sets in parallel: " + dataSetIDs);
+		log.info("Expected state: " + expectedState);
+		AnalysisRequest req = ODFAPITest.createAnalysisRequest(dataSetIDs);
+		// Enable parallel processing because this is a parallel test
+		req.setProcessDataSetsSequentially(false);
+		AnalysisManager analysisManager = new ODFFactory().create().getAnalysisManager();
+		EngineManager engineManager = new ODFFactory().create().getEngineManager();
+
+		SystemHealth healthCheckResult = engineManager.checkHealthStatus();
+		Assert.assertEquals(HealthStatus.OK, healthCheckResult.getStatus());
+		AnalysisResponse resp = analysisManager.runAnalysis(req);
+		log.info("Parallel requests started");
+
+		String id = resp.getId();
+		List<String> singleIds = Utils.splitString(id, AnalysisManagerImpl.COMPOUND_REQUEST_SEPARATOR);
+		List<String> singleDetails = Utils.splitString(resp.getDetails(), AnalysisManagerImpl.COMPOUND_REQUEST_SEPARATOR);
+		Assert.assertEquals(dataSetIDs.size(), singleIds.size());
+		Assert.assertEquals(dataSetIDs.size(), singleDetails.size());
+
+		AnalysisRequestStatus status = null;
+
+		// check that requests are processed in parallel: 
+		//   there must be a point in time where both requests are in status "active"
+		log.info("Polling for status of parallel request...");
+		boolean foundPointInTimeWhereBothRequestsAreActive = false;
+		int maxPolls = ODFAPITest.MAX_NUMBER_OF_POLLS;
+		do {
+			List<State> allSingleStates = new ArrayList<AnalysisRequestStatus.State>();
+			for (String singleId : singleIds) {
+				allSingleStates.add(analysisManager.getAnalysisRequestStatus(singleId).getState());
+			}
+			if (Utils.containsOnly(allSingleStates, new State[] { State.ACTIVE })) {
+				foundPointInTimeWhereBothRequestsAreActive = true;
+			}
+
+			status = analysisManager.getAnalysisRequestStatus(id);
+			log.log(Level.INFO, "Poll request for parallel request ID ''{0}'' (expected state: ''{3}''): state: ''{1}'', details: ''{2}''", new Object[] { id, status.getState(), status.getDetails(),
+					expectedState });
+			log.info("States of single requests: " + singleIds + ": " + allSingleStates);
+			maxPolls--;
+			Thread.sleep(ODFAPITest.WAIT_MS_BETWEEN_POLLING);
+		} while (maxPolls > 0 && (status.getState() == State.ACTIVE || status.getState() == State.QUEUED));
+
+		Assert.assertTrue(maxPolls > 0);
+		Assert.assertEquals(expectedState, status.getState());
+		Assert.assertTrue(foundPointInTimeWhereBothRequestsAreActive);
+		log.info("Parallel request status details: " + status.getDetails());
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/controlcenter/SetTrackerStatusTest.java
----------------------------------------------------------------------
diff --git a/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/controlcenter/SetTrackerStatusTest.java b/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/controlcenter/SetTrackerStatusTest.java
new file mode 100755
index 0000000..9a43b78
--- /dev/null
+++ b/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/controlcenter/SetTrackerStatusTest.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.odf.core.test.controlcenter;
+
+import java.util.logging.Level;
+
+import org.apache.atlas.odf.api.analysis.AnalysisRequestStatus;
+import org.apache.atlas.odf.core.ODFInternalFactory;
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.atlas.odf.api.ODFFactory;
+import org.apache.atlas.odf.api.analysis.AnalysisManager;
+import org.apache.atlas.odf.api.analysis.AnalysisRequestTrackerStatus.STATUS;
+import org.apache.atlas.odf.api.discoveryservice.AnalysisRequestTracker;
+import org.apache.atlas.odf.core.controlcenter.AnalysisRequestTrackerStore;
+import org.apache.atlas.odf.core.test.ODFTestBase;
+
+/**
+ * Tests that the status of an old request can be overwritten through the tracker store and
+ * that the request still ends up in state FINISHED afterwards.
+ */
+public class SetTrackerStatusTest extends ODFTestBase {
+
+	/**
+	 * Starts a request, marks all requests older than "now" as ERROR through the tracker store,
+	 * checks that the tracker reflects this, and then waits for the request to reach FINISHED.
+	 *
+	 * @throws Exception if starting or polling the request fails, or if the thread is interrupted while waiting
+	 */
+	@Test
+	public void testSetTrackerStatus() throws Exception {
+		AnalysisManager am = new ODFFactory().create().getAnalysisManager();
+		AnalysisRequestTrackerStore arts = new ODFInternalFactory().create(AnalysisRequestTrackerStore.class);
+		String requestId = ODFAPITest.runRequest("successId", am);
+		// Make sure the request is older than the cut-off timestamp taken below.
+		Thread.sleep(1000);
+		long cutOffTimestamp = System.currentTimeMillis();
+		String testMessage = "Message was set to error at " + cutOffTimestamp;
+		arts.setStatusOfOldRequest(cutOffTimestamp, STATUS.ERROR, testMessage);
+		AnalysisRequestTracker tracker = arts.query(requestId);
+		Assert.assertEquals(STATUS.ERROR, tracker.getStatus());
+		Assert.assertEquals(testMessage, tracker.getStatusDetails());
+
+		// wait until request is finished and state is set back to finished
+		log.log(Level.INFO, "Waiting for request ''{0}'' to finish", requestId);
+		int maxPolls = ODFAPITest.MAX_NUMBER_OF_POLLS;
+		AnalysisRequestStatus status = null;
+		do {
+			status = am.getAnalysisRequestStatus(requestId);
+			log.log(Level.INFO, "Poll request for request ID ''{0}'', state: ''{1}'', details: ''{2}''", new Object[] { requestId, status.getState(), status.getDetails() });
+			maxPolls--;
+			// An InterruptedException is no longer swallowed: it propagates and fails the test
+			// instead of letting the loop keep polling after an interrupt.
+			Thread.sleep(ODFAPITest.WAIT_MS_BETWEEN_POLLING);
+		} while (maxPolls > 0 && (status.getState() != AnalysisRequestStatus.State.FINISHED) );
+
+		Assert.assertEquals("Request " + requestId + " did not reach state FINISHED within the poll budget",
+				AnalysisRequestStatus.State.FINISHED, am.getAnalysisRequestStatus(requestId).getState());
+		tracker = arts.query(requestId);
+		Assert.assertEquals(STATUS.FINISHED, tracker.getStatus());
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/discoveryservice/DiscoveryServiceManagerTest.java
----------------------------------------------------------------------
diff --git a/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/discoveryservice/DiscoveryServiceManagerTest.java b/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/discoveryservice/DiscoveryServiceManagerTest.java
new file mode 100755
index 0000000..0f1aa8f
--- /dev/null
+++ b/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/discoveryservice/DiscoveryServiceManagerTest.java
@@ -0,0 +1,135 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.odf.core.test.discoveryservice;
+
+import java.io.InputStream;
+
+import org.apache.atlas.odf.api.ODFFactory;
+import org.apache.atlas.odf.api.settings.validation.ValidationException;
+import org.apache.atlas.odf.json.JSONUtils;
+import org.apache.wink.json4j.JSONException;
+import org.junit.Assert;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import org.apache.atlas.odf.api.discoveryservice.DiscoveryServiceManager;
+import org.apache.atlas.odf.api.discoveryservice.DiscoveryServiceEndpoint;
+import org.apache.atlas.odf.api.discoveryservice.DiscoveryServiceJavaEndpoint;
+import org.apache.atlas.odf.api.discoveryservice.DiscoveryServiceProperties;
+import org.apache.atlas.odf.api.discoveryservice.DiscoveryServiceRuntimeStatistics;
+import org.apache.atlas.odf.api.discoveryservice.DiscoveryServiceStatus;
+import org.apache.atlas.odf.api.discoveryservice.ServiceNotFoundException;
+
+public class DiscoveryServiceManagerTest {
+	
+	final private static String ASYNCTESTWA_SERVICE_ID = "asynctestservice-with-annotations";
+
+	final private static String NEW_SERVICE_ID = "New_Service";
+	final private static String NEW_SERVICE_NAME = "Name of New Service";
+	final private static String NEW_SERVICE_DESCRIPTION = "Description of the New Service";
+	final private static String NEW_SERVICE_CLASSNAME = "TestAsyncDiscoveryService1";
+	
+	final private static String UPDATED_SERVICE_DESCRIPTION = "Updated description of the New Service";
+	final private static String UPDATED_SERVICE_CLASSNAME = "TestSyncDiscoveryService1";
+	
+	private void registerDiscoveryService(DiscoveryServiceProperties dsProperties) throws ValidationException {
+		DiscoveryServiceManager discoveryServicesManager = new ODFFactory().create().getDiscoveryServiceManager();
+		discoveryServicesManager.createDiscoveryService(dsProperties);
+	}
+	
+	private void replaceDiscoveryService(DiscoveryServiceProperties dsProperties) throws ValidationException {
+		DiscoveryServiceManager discoveryServicesManager = new ODFFactory().create().getDiscoveryServiceManager();
+		discoveryServicesManager.replaceDiscoveryService(dsProperties);
+	}
+	
+	private void unregisterDiscoveryService(String serviceId) throws ServiceNotFoundException, ValidationException {
+		DiscoveryServiceManager discoveryServicesManager = new ODFFactory().create().getDiscoveryServiceManager();
+		discoveryServicesManager.deleteDiscoveryService(serviceId);
+	}
+		
+	@Test
+	public void testGetDiscoveryServiceProperties() throws ServiceNotFoundException {
+		DiscoveryServiceManager discoveryServicesManager = new ODFFactory().create().getDiscoveryServiceManager();
+		DiscoveryServiceProperties dsProperties = discoveryServicesManager.getDiscoveryServiceProperties(ASYNCTESTWA_SERVICE_ID);
+		Assert.assertNotNull(dsProperties);
+	}
+	
+		
+	@Ignore @Test    // Ignoring testcase due to problem on Mac (issue #56)
+	public void testGetDiscoveryServiceStatus() throws ServiceNotFoundException {
+		DiscoveryServiceManager discoveryServicesManager = new ODFFactory().create().getDiscoveryServiceManager();
+		DiscoveryServiceStatus dsStatus = discoveryServicesManager.getDiscoveryServiceStatus(ASYNCTESTWA_SERVICE_ID);
+		Assert.assertNotNull(dsStatus);
+	}
+	
+	@Test  // TODO: need to adjust as soon as runtime statistics are available
+	public void testGetDiscoveryServiceRuntimeStatistics() throws ServiceNotFoundException {
+		DiscoveryServiceManager discoveryServicesManager = new ODFFactory().create().getDiscoveryServiceManager();
+		DiscoveryServiceRuntimeStatistics dsRuntimeStats = discoveryServicesManager.getDiscoveryServiceRuntimeStatistics(ASYNCTESTWA_SERVICE_ID);
+		Assert.assertNotNull(dsRuntimeStats);
+		long avgProcTime = dsRuntimeStats.getAverageProcessingTimePerItemInMillis();
+		Assert.assertEquals(0, avgProcTime);
+	}
+
+	@Test
+	public void testDeleteDiscoveryServiceRuntimeStatistics() throws ServiceNotFoundException {
+		DiscoveryServiceManager discoveryServicesManager = new ODFFactory().create().getDiscoveryServiceManager();
+		discoveryServicesManager.deleteDiscoveryServiceRuntimeStatistics(ASYNCTESTWA_SERVICE_ID);
+	}
+
+	@Test
+	public void testGetDiscoveryServiceImage() throws ServiceNotFoundException {
+		DiscoveryServiceManager discoveryServicesManager = new ODFFactory().create().getDiscoveryServiceManager();
+		InputStream is = discoveryServicesManager.getDiscoveryServiceImage(ASYNCTESTWA_SERVICE_ID);
+		Assert.assertNull(is);
+	}
+
+	@Test
+	public void testCreateUpdateDelete() throws ServiceNotFoundException, ValidationException, JSONException {
+		DiscoveryServiceJavaEndpoint dse = new DiscoveryServiceJavaEndpoint();
+		dse.setClassName(NEW_SERVICE_CLASSNAME);
+		DiscoveryServiceProperties dsProperties = new DiscoveryServiceProperties();
+		dsProperties.setId(NEW_SERVICE_ID);
+		dsProperties.setName(NEW_SERVICE_NAME);
+		dsProperties.setDescription(NEW_SERVICE_DESCRIPTION);
+		dsProperties.setLink(null);
+		dsProperties.setPrerequisiteAnnotationTypes(null);
+		dsProperties.setResultingAnnotationTypes(null);
+		dsProperties.setSupportedObjectTypes(null);
+		dsProperties.setAssignedObjectTypes(null);
+		dsProperties.setAssignedObjectCandidates(null);
+		dsProperties.setEndpoint(JSONUtils.convert(dse, DiscoveryServiceEndpoint.class));
+		dsProperties.setParallelismCount(2);
+		registerDiscoveryService(dsProperties);
+
+		DiscoveryServiceJavaEndpoint dse2 = new DiscoveryServiceJavaEndpoint();
+		dse2.setClassName(UPDATED_SERVICE_CLASSNAME);
+		DiscoveryServiceProperties dsProperties2 = new DiscoveryServiceProperties();
+		dsProperties2.setId(NEW_SERVICE_ID);
+		dsProperties2.setName(NEW_SERVICE_NAME);
+		dsProperties2.setDescription(UPDATED_SERVICE_DESCRIPTION);
+		dsProperties2.setLink(null);
+		dsProperties.setPrerequisiteAnnotationTypes(null);
+		dsProperties.setResultingAnnotationTypes(null);
+		dsProperties.setSupportedObjectTypes(null);
+		dsProperties.setAssignedObjectTypes(null);
+		dsProperties.setAssignedObjectCandidates(null);
+		dsProperties2.setEndpoint(JSONUtils.convert(dse2, DiscoveryServiceEndpoint.class));
+		dsProperties2.setParallelismCount(2);
+		replaceDiscoveryService(dsProperties2);
+
+		unregisterDiscoveryService(NEW_SERVICE_ID);
+	}
+	
+}

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/discoveryservice/TestAsyncDiscoveryService1.java
----------------------------------------------------------------------
diff --git a/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/discoveryservice/TestAsyncDiscoveryService1.java b/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/discoveryservice/TestAsyncDiscoveryService1.java
new file mode 100755
index 0000000..2ea85b7
--- /dev/null
+++ b/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/discoveryservice/TestAsyncDiscoveryService1.java
@@ -0,0 +1,227 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.odf.core.test.discoveryservice;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.logging.Logger;
+
+import org.apache.atlas.odf.api.discoveryservice.DiscoveryServiceBase;
+import org.apache.atlas.odf.api.discoveryservice.DiscoveryServiceResponse;
+import org.apache.atlas.odf.api.discoveryservice.async.DiscoveryServiceAsyncRunStatus;
+import org.junit.Assert;
+
+import org.apache.atlas.odf.api.discoveryservice.DiscoveryServiceRequest;
+import org.apache.atlas.odf.api.discoveryservice.async.AsyncDiscoveryService;
+import org.apache.atlas.odf.core.Environment;
+import org.apache.atlas.odf.core.ODFInternalFactory;
+import org.apache.atlas.odf.core.Utils;
+import org.apache.atlas.odf.core.test.ODFTestLogger;
+import org.apache.atlas.odf.api.discoveryservice.async.DiscoveryServiceAsyncStartResponse;
+import org.apache.atlas.odf.json.JSONUtils;
+
+public class TestAsyncDiscoveryService1 extends DiscoveryServiceBase implements AsyncDiscoveryService {
+
+	static int unavailableCounter = 0;
+
+	static Logger logger = ODFTestLogger.get();
+
+	public static void checkUserAndAdditionalProperties(DiscoveryServiceRequest request) {
+		String user = request.getUser();
+		
+		String defaultUser = System.getProperty("user.name");
+		Assert.assertEquals(defaultUser, user);
+
+		Map<String, Object> additionalProperties = request.getAdditionalProperties();
+		logger.info("TestAsyncDiscoveryService1.startAnalysis additional properties: " + additionalProperties);
+		Assert.assertNotNull(additionalProperties);
+		
+		// check that environment entries are also available additional properties
+		Environment ev = new ODFInternalFactory().create(Environment.class);
+		String dsId = request.getDiscoveryServiceId();
+		Map<String, String> serviceEnvProps = ev.getPropertiesWithPrefix(dsId);
+		if (!serviceEnvProps.isEmpty()) {
+			Assert.assertTrue(!additionalProperties.isEmpty());
+			for (Map.Entry<String, String> serviceEnvProp : serviceEnvProps.entrySet()) {
+				String key = serviceEnvProp.getKey();
+				String val = serviceEnvProp.getValue();
+				logger.info("Found discoveryservice configuration parameter: " + key + " with value " + val);
+				Assert.assertTrue(key.startsWith(dsId));
+				Assert.assertTrue(additionalProperties.containsKey(key) );
+				Assert.assertEquals(val, additionalProperties.get(key));
+			}
+		}
+		
+		if (!additionalProperties.isEmpty()) {
+			Assert.assertTrue(additionalProperties.containsKey("aaa"));
+			Assert.assertTrue("bbb".equals(additionalProperties.get("aaa")));
+			Assert.assertTrue(additionalProperties.containsKey("jo"));
+			@SuppressWarnings("unchecked")
+			Map<String, Object> m = (Map<String, Object>) additionalProperties.get("jo");
+			Assert.assertTrue("v1".equals(m.get("p1")));
+			Assert.assertTrue("v2".equals(m.get("p2")));
+			/*
+			if (!additionalProperties.containsKey("aaa")) {
+				response.setCode(ResponseCode.UNKNOWN_ERROR);
+				response.setDetails("Additional property value 'aaa' doesn't exist");
+				return;
+			}
+			if (!"bbb".equals(additionalProperties.get("aaa"))) {
+				response.setCode(ResponseCode.UNKNOWN_ERROR);
+				response.setDetails("Additional properties 'aaa' has wrong value");
+				return;
+			}
+			if (!additionalProperties.containsKey("jo")) {
+				response.setCode(ResponseCode.UNKNOWN_ERROR);
+				response.setDetails("Additional property value 'jo' doesn't exist");
+				return;
+			}
+			Map m = (Map) additionalProperties.get("jo");
+			if (!"v1".equals(m.get("p1"))) {
+				response.setCode(ResponseCode.UNKNOWN_ERROR);
+				response.setDetails("Additional property value 'jo.p1' doesn't exist");
+				return;
+
+			}
+			if (!"v2".equals(m.get("p2"))) {
+				response.setCode(ResponseCode.UNKNOWN_ERROR);
+				response.setDetails("Additional property value 'jo.p2' doesn't exist");
+				return;
+			}
+			*/
+		}
+	}
+	
+	@Override
+	public DiscoveryServiceAsyncStartResponse startAnalysis(DiscoveryServiceRequest request) {
+		try {
+			DiscoveryServiceResponse.ResponseCode code = DiscoveryServiceResponse.ResponseCode.TEMPORARILY_UNAVAILABLE;
+			String details = "Cannot answer right now";
+			if (unavailableCounter % 2 == 0) {
+				code = DiscoveryServiceResponse.ResponseCode.OK;
+				details = "Everything's peachy";
+			}
+			unavailableCounter++;
+			/*
+			if (unavailableCounter % 3 == 0) {
+				code = CODE.NOT_AUTHORIZED;
+				details = "You have no power here!";
+			}
+			*/
+			DiscoveryServiceAsyncStartResponse response = new DiscoveryServiceAsyncStartResponse();
+			response.setCode(code);
+			response.setDetails(details);
+			if (code == DiscoveryServiceResponse.ResponseCode.OK) {
+				String runid = "TestAsyncService1" + UUID.randomUUID().toString();
+				synchronized (lock) {
+					runIDsRunning.put(runid, 4); // return status "running" 4 times before finishing
+				}
+				response.setRunId(runid);
+				String dataSetId = request.getDataSetContainer().getDataSet().getReference().getId();
+				if (dataSetId.startsWith("error")) {
+					logger.info("TestAsync Discovery Service run " + runid + " will fail");
+					runIDsWithError.add(runid);
+				} else {
+					logger.info("TestAsync Discovery Service run " + runid + " will succeed");
+				}
+			}
+			logger.info("TestAsyncDiscoveryService1.startAnalysis returns: " + JSONUtils.lazyJSONSerializer(response));
+			checkUserAndAdditionalProperties(request);
+			/*
+			String user = request.getUser();
+			Assert.assertEquals(TestControlCenter.TEST_USER_ID, user);
+
+			Map<String, Object> additionalProperties = request.getAdditionalProperties();
+			logger.info("TestAsyncDiscoveryService1.startAnalysis additional properties: " + additionalProperties);
+			Assert.assertNotNull(additionalProperties);
+			if (!additionalProperties.isEmpty()) {
+				if (!additionalProperties.containsKey("aaa")) {
+					response.setCode(ResponseCode.UNKNOWN_ERROR);
+					response.setDetails("Additional property value 'aaa' doesn't exist");
+					return response;
+				}
+				if (!"bbb".equals(additionalProperties.get("aaa"))) {
+					response.setCode(ResponseCode.UNKNOWN_ERROR);
+					response.setDetails("Additional properties 'aaa' has wrong value");
+					return response;
+				}
+				if (!additionalProperties.containsKey("jo")) {
+					response.setCode(ResponseCode.UNKNOWN_ERROR);
+					response.setDetails("Additional property value 'jo' doesn't exist");
+					return response;
+				}
+				Map m = (Map) additionalProperties.get("jo");
+				if (!"v1".equals(m.get("p1"))) {
+					response.setCode(ResponseCode.UNKNOWN_ERROR);
+					response.setDetails("Additional property value 'jo.p1' doesn't exist");
+					return response;
+
+				}
+				if (!"v2".equals(m.get("p2"))) {
+					response.setCode(ResponseCode.UNKNOWN_ERROR);
+					response.setDetails("Additional property value 'jo.p2' doesn't exist");
+					return response;
+				}
+			}
+			*/
+			return response;
+		} catch (Throwable t) {
+			DiscoveryServiceAsyncStartResponse response = new DiscoveryServiceAsyncStartResponse();
+			response.setCode(DiscoveryServiceResponse.ResponseCode.UNKNOWN_ERROR);
+			response.setDetails(Utils.getExceptionAsString(t));
+			return response;
+		}
+	}
+
+	static Object lock = new Object();
+	static Map<String, Integer> runIDsRunning = new HashMap<String, Integer>();
+	static Set<String> runIDsWithError = Collections.synchronizedSet(new HashSet<String>());
+
+	//	static Map<String, Integer> requestIDUnavailable = new HashMap<>();
+
+	@Override
+	public DiscoveryServiceAsyncRunStatus getStatus(String runId) {
+		String details = "Run like the wind";
+		DiscoveryServiceAsyncRunStatus.State state = DiscoveryServiceAsyncRunStatus.State.RUNNING;
+		synchronized (lock) {
+			Integer i = runIDsRunning.get(runId);
+			Assert.assertNotNull(i);
+			if (i.intValue() == 0) {
+				if (runIDsWithError.contains(runId)) {
+					state = DiscoveryServiceAsyncRunStatus.State.ERROR;
+					details = "This was a mistake";
+				} else {
+					state = DiscoveryServiceAsyncRunStatus.State.FINISHED;
+					details = "Finish him!";
+				}
+			} else {
+				runIDsRunning.put(runId, i - 1);
+			}
+		}
+
+		DiscoveryServiceAsyncRunStatus status = new DiscoveryServiceAsyncRunStatus();
+		status.setRunId(runId);
+		status.setDetails(details);
+		status.setState(state);
+		logger.info("TestAsyncDiscoveryService1.getStatus returns: " + JSONUtils.lazyJSONSerializer(status));
+
+		return status;
+	}
+
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/discoveryservice/TestAsyncDiscoveryServiceWritingAnnotations1.java
----------------------------------------------------------------------
diff --git a/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/discoveryservice/TestAsyncDiscoveryServiceWritingAnnotations1.java b/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/discoveryservice/TestAsyncDiscoveryServiceWritingAnnotations1.java
new file mode 100755
index 0000000..bd2f1a6
--- /dev/null
+++ b/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/discoveryservice/TestAsyncDiscoveryServiceWritingAnnotations1.java
@@ -0,0 +1,99 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.odf.core.test.discoveryservice;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+import java.util.logging.Logger;
+
+import org.apache.atlas.odf.api.metadata.MetaDataObjectReference;
+import org.apache.atlas.odf.api.discoveryservice.DiscoveryServiceBase;
+import org.apache.atlas.odf.api.discoveryservice.DiscoveryServiceRequest;
+import org.apache.atlas.odf.api.discoveryservice.async.AsyncDiscoveryService;
+import org.apache.atlas.odf.api.discoveryservice.async.DiscoveryServiceAsyncRunStatus;
+import org.apache.atlas.odf.api.discoveryservice.async.DiscoveryServiceAsyncStartResponse;
+import org.apache.atlas.odf.core.test.ODFTestLogger;
+import org.apache.atlas.odf.api.discoveryservice.DiscoveryServiceResponse;
+
+public class TestAsyncDiscoveryServiceWritingAnnotations1 extends DiscoveryServiceBase implements AsyncDiscoveryService {
+
+	static Logger logger = ODFTestLogger.get();
+
+	static Map<String, MyThread> id2Thread = Collections.synchronizedMap(new HashMap<String, MyThread>());
+
+	class MyThread extends Thread {
+
+		String errorMessage = null;
+		String correlationId;
+		MetaDataObjectReference dataSetRef;
+
+		public MyThread(MetaDataObjectReference dataSetRef, String correlationId) {
+			super();
+			this.dataSetRef = dataSetRef;
+			this.correlationId = correlationId;
+		}
+
+		@Override
+		public void run() {
+			this.errorMessage = TestSyncDiscoveryServiceWritingAnnotations1.createAnnotations(dataSetRef, correlationId, metadataStore, annotationStore);
+		}
+
+	}
+
+	@Override
+	public DiscoveryServiceAsyncStartResponse startAnalysis(DiscoveryServiceRequest request) {
+		DiscoveryServiceAsyncStartResponse response = new DiscoveryServiceAsyncStartResponse();
+		MetaDataObjectReference dataSetRef = request.getDataSetContainer().getDataSet().getReference();
+
+		String newRunID = "RunId-" + this.getClass().getSimpleName() + "-" + UUID.randomUUID().toString();
+		MyThread t = new MyThread(dataSetRef, (String) request.getAdditionalProperties().get(TestSyncDiscoveryServiceWritingAnnotations1.REQUEST_PROPERTY_CORRELATION_ID));
+		t.start();
+		id2Thread.put(newRunID, t);
+		response.setCode(DiscoveryServiceResponse.ResponseCode.OK);
+		response.setRunId(newRunID);
+		response.setDetails("Thread started");
+		logger.info("Analysis writing annotations has started");
+
+		return response;
+	}
+
+	@Override
+	public DiscoveryServiceAsyncRunStatus getStatus(String runId) {
+		DiscoveryServiceAsyncRunStatus status = new DiscoveryServiceAsyncRunStatus();
+
+		MyThread t = id2Thread.get(runId);
+		status.setRunId(runId);
+		if (t == null) {
+			status.setState(DiscoveryServiceAsyncRunStatus.State.NOT_FOUND);
+		} else {
+			java.lang.Thread.State ts = t.getState();
+			if (!ts.equals(java.lang.Thread.State.TERMINATED)) {
+				status.setState(DiscoveryServiceAsyncRunStatus.State.RUNNING);
+			} else {
+				if (t.errorMessage != null) {
+					status.setState(DiscoveryServiceAsyncRunStatus.State.ERROR);
+					status.setDetails(t.errorMessage);
+				} else {
+					status.setState(DiscoveryServiceAsyncRunStatus.State.FINISHED);
+					status.setDetails("All went fine");
+				}
+			}
+		}
+		logger.info("Status of analysis with annotations: " + status.getState() + ", " + status.getDetails());
+		return status;
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/discoveryservice/TestSyncDiscoveryService1.java
----------------------------------------------------------------------
diff --git a/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/discoveryservice/TestSyncDiscoveryService1.java b/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/discoveryservice/TestSyncDiscoveryService1.java
new file mode 100755
index 0000000..9ea92f3
--- /dev/null
+++ b/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/discoveryservice/TestSyncDiscoveryService1.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.odf.core.test.discoveryservice;
+
+import java.util.logging.Logger;
+
+import org.apache.atlas.odf.api.discoveryservice.DiscoveryServiceBase;
+import org.apache.atlas.odf.api.discoveryservice.DiscoveryServiceRequest;
+import org.apache.atlas.odf.api.discoveryservice.sync.DiscoveryServiceSyncResponse;
+import org.apache.atlas.odf.api.discoveryservice.sync.SyncDiscoveryService;
+import org.apache.atlas.odf.core.test.ODFTestLogger;
+import org.apache.atlas.odf.api.discoveryservice.DiscoveryServiceResponse;
+
+/**
+ * Synchronous discovery service stub used by the ODF core tests.
+ * <p>
+ * Successive calls alternate between being accepted and being answered with
+ * {@code TEMPORARILY_UNAVAILABLE}, driven by the shared static {@link #unavailableCounter}.
+ * An accepted call fails with {@code UNKNOWN_ERROR} when the data set id starts with "error".
+ */
+public class TestSyncDiscoveryService1 extends DiscoveryServiceBase implements SyncDiscoveryService {
+	/** Number of runAnalysis calls so far; even values are accepted. Not synchronized. */
+	static int unavailableCounter = 0;
+
+	Logger logger = ODFTestLogger.get();
+
+	/**
+	 * Runs the simulated analysis.
+	 *
+	 * @param request the analysis request; user and additional properties are verified for accepted calls
+	 * @return the response for this call; never {@code null}. If anything is thrown (including a
+	 *         failed assertion) an UNKNOWN_ERROR response carrying the exception text is returned,
+	 *         matching the behaviour of TestAsyncDiscoveryService1.
+	 */
+	@Override
+	public DiscoveryServiceSyncResponse runAnalysis(DiscoveryServiceRequest request) {
+		try {
+			DiscoveryServiceResponse.ResponseCode code = DiscoveryServiceResponse.ResponseCode.TEMPORARILY_UNAVAILABLE;
+			String details = "Cannot answer right now synchronously";
+			if (unavailableCounter % 2 == 0) {
+				code = DiscoveryServiceResponse.ResponseCode.OK;
+				details = "Everything's peachy and synchronous";
+			}
+			unavailableCounter++;
+			DiscoveryServiceSyncResponse response = new DiscoveryServiceSyncResponse();
+			response.setDetails(details);
+			response.setCode(code);
+			if (code == DiscoveryServiceResponse.ResponseCode.OK) {
+				String dataSetId = request.getDataSetContainer().getDataSet().getReference().getId();
+				if (dataSetId.startsWith("error")) {
+					response.setCode(DiscoveryServiceResponse.ResponseCode.UNKNOWN_ERROR);
+					response.setDetails("Something went synchronously wrong!");
+				} else {
+					response.setDetails("All is synchronously fine!");
+				}
+				TestAsyncDiscoveryService1.checkUserAndAdditionalProperties(request);
+			}
+			logger.info(this.getClass().getSimpleName() + " service returned with code: " + response.getCode());
+			return response;
+		} catch (Throwable t) {
+			// Report the failure to the caller instead of returning null, which hides the cause
+			logger.log(java.util.logging.Level.WARNING, this.getClass().getSimpleName() + " has failed", t);
+			DiscoveryServiceSyncResponse errorResponse = new DiscoveryServiceSyncResponse();
+			errorResponse.setCode(DiscoveryServiceResponse.ResponseCode.UNKNOWN_ERROR);
+			errorResponse.setDetails(t.toString());
+			return errorResponse;
+		}
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/discoveryservice/TestSyncDiscoveryServiceWritingAnnotations1.java
----------------------------------------------------------------------
diff --git a/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/discoveryservice/TestSyncDiscoveryServiceWritingAnnotations1.java b/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/discoveryservice/TestSyncDiscoveryServiceWritingAnnotations1.java
new file mode 100755
index 0000000..62c7bf6
--- /dev/null
+++ b/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/discoveryservice/TestSyncDiscoveryServiceWritingAnnotations1.java
@@ -0,0 +1,156 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.odf.core.test.discoveryservice;
+
+import java.util.HashSet;
+import java.util.Set;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.atlas.odf.api.discoveryservice.DiscoveryServiceBase;
+import org.apache.atlas.odf.api.discoveryservice.DiscoveryServiceRequest;
+import org.apache.atlas.odf.api.discoveryservice.DiscoveryServiceResponse;
+import org.apache.atlas.odf.api.discoveryservice.sync.DiscoveryServiceSyncResponse;
+import org.apache.atlas.odf.api.discoveryservice.sync.SyncDiscoveryService;
+import org.apache.atlas.odf.api.metadata.MetaDataObjectReference;
+import org.apache.atlas.odf.api.metadata.MetadataStore;
+import org.apache.wink.json4j.JSONObject;
+import org.junit.Assert;
+
+import org.apache.atlas.odf.api.metadata.models.CachedMetadataStore;
+import org.apache.atlas.odf.api.metadata.models.DataSet;
+import org.apache.atlas.odf.api.metadata.models.MetaDataCache;
+import org.apache.atlas.odf.api.metadata.models.MetaDataObject;
+import org.apache.atlas.odf.api.metadata.models.RelationalDataSet;
+import org.apache.atlas.odf.api.metadata.models.ProfilingAnnotation;
+import org.apache.atlas.odf.core.Utils;
+import org.apache.atlas.odf.api.ODFFactory;
+import org.apache.atlas.odf.api.annotation.AnnotationStore;
+
+/**
+ * Synchronous test discovery service that stores a fixed number of profiling annotations
+ * on the requested data set and verifies the metadata cache passed with the request.
+ */
+public class TestSyncDiscoveryServiceWritingAnnotations1 extends DiscoveryServiceBase implements SyncDiscoveryService {
+
+	static Logger logger = Logger.getLogger(TestSyncDiscoveryServiceWritingAnnotations1.class.getName());
+
+	/** Key of the additional request property (and annotation JSON property) holding the correlation id. */
+	public static final String REQUEST_PROPERTY_CORRELATION_ID = "REQUEST_PROPERTY_CORRELATION_ID";
+
+	static final String ANNOTATION_TYPE = "AnnotationType-" + TestSyncDiscoveryServiceWritingAnnotations1.class.getSimpleName();
+	static final String JSON_ATTRIBUTE = "Attribute-" + TestSyncDiscoveryServiceWritingAnnotations1.class.getSimpleName();
+	static final String JSON_VALUE = "Value-" + TestSyncDiscoveryServiceWritingAnnotations1.class.getSimpleName();
+
+	/**
+	 * Compares the columns found in the request's metadata cache with the columns reported by
+	 * the metadata store. Nothing is checked if the request has no cache or the data set is not
+	 * a {@link RelationalDataSet}.
+	 *
+	 * @param request the analysis request
+	 * @return always {@code null}; mismatches are reported through failed assertions
+	 * @throws AssertionError if the cached columns and the actual columns differ
+	 */
+	public static String checkMetaDataCache(DiscoveryServiceRequest request) {
+		logger.info("Checking metadata cache");
+		MetaDataObject mdo = request.getDataSetContainer().getDataSet();
+		MetaDataCache cache = request.getDataSetContainer().getMetaDataCache();
+		if (cache == null) {
+			return null;
+		}
+		CachedMetadataStore cacheReader = new CachedMetadataStore(cache);
+
+		if (mdo instanceof RelationalDataSet) {
+			logger.info("Checking metadata cache for columns...");
+			RelationalDataSet rds = (RelationalDataSet) mdo;
+			Set<MetaDataObjectReference> cachedColumns = new HashSet<>();
+			Set<MetaDataObjectReference> actualColumns = new HashSet<>();
+			for (MetaDataObject col : cacheReader.getColumns(rds)) {
+				cachedColumns.add(col.getReference());
+			}
+			MetadataStore mds = new ODFFactory().create().getMetadataStore();
+			for (MetaDataObject col : mds.getColumns(rds)) {
+				actualColumns.add(col.getReference());
+			}
+			Assert.assertTrue("Columns missing from metadata cache.", cachedColumns.containsAll(actualColumns));
+			Assert.assertTrue("Too many columns in metadata cache.", actualColumns.containsAll(cachedColumns));
+		}
+		return null;
+	}
+
+	/**
+	 * Creates the test annotations and then checks the metadata cache.
+	 *
+	 * @param request the analysis request; the correlation id is read from its additional properties
+	 * @return OK if the annotations were created, otherwise UNKNOWN_ERROR with the error message as details
+	 */
+	@Override
+	public DiscoveryServiceSyncResponse runAnalysis(DiscoveryServiceRequest request) {
+		logger.info("Analysis started on sync test service with annotations ");
+		String errorMessage = createAnnotations( //
+				request.getDataSetContainer().getDataSet().getReference(), //
+				(String) request.getAdditionalProperties().get(REQUEST_PROPERTY_CORRELATION_ID), //
+				metadataStore, //
+				annotationStore);
+		if (errorMessage == null) {
+			errorMessage = checkMetaDataCache(request);
+		}
+		DiscoveryServiceSyncResponse resp = new DiscoveryServiceSyncResponse();
+		if (errorMessage == null) {
+			resp.setCode(DiscoveryServiceResponse.ResponseCode.OK);
+			resp.setDetails("Annotations created successfully");
+		} else {
+			resp.setCode(DiscoveryServiceResponse.ResponseCode.UNKNOWN_ERROR);
+			resp.setDetails(errorMessage);
+		}
+		logger.info("Analysis finished on sync test service with annotations ");
+
+		return resp;
+	}
+
+	/** @return the number of annotations written by {@link #createAnnotations} */
+	public static int getNumberOfAnnotations() {
+		return 3;
+	}
+
+	/**
+	 * @param i index of the annotation
+	 * @return annotation type, JSON attribute name and JSON attribute value of the i-th annotation, in that order
+	 */
+	public static String[] getPropsOfNthAnnotation(int i) {
+		return new String[] { ANNOTATION_TYPE + i, JSON_ATTRIBUTE + i, JSON_VALUE + i };
+	}
+
+	/**
+	 * Stores {@link #getNumberOfAnnotations()} profiling annotations on the given data set.
+	 *
+	 * @param dataSetRef reference of the object to annotate; must resolve to a {@link DataSet}
+	 * @param correlationId value written to each annotation under {@link #REQUEST_PROPERTY_CORRELATION_ID}
+	 * @param mds metadata store used to retrieve the data set
+	 * @param as annotation store receiving the annotations
+	 * @return {@code null} on success, otherwise a description of the failure
+	 */
+	public static String createAnnotations(MetaDataObjectReference dataSetRef, String correlationId, MetadataStore mds, AnnotationStore as) {
+		try {
+			TestSyncDiscoveryServiceWritingAnnotations1.logger.info("Analysis will run on data set ref: " + dataSetRef);
+			MetaDataObject dataSet = mds.retrieve(dataSetRef);
+
+			String errorMessage = null;
+			if (dataSet == null) {
+				errorMessage = "Data set with id " + dataSetRef + " could not be retrieved";
+				TestSyncDiscoveryServiceWritingAnnotations1.logger.severe(errorMessage);
+				return errorMessage;
+			}
+
+			if (!(dataSet instanceof DataSet)) {
+				errorMessage = "Object with id " + dataSetRef + " is not a data set";
+				TestSyncDiscoveryServiceWritingAnnotations1.logger.severe(errorMessage);
+				return errorMessage;
+			}
+
+			// add some annotations
+			for (int i = 0; i < getNumberOfAnnotations(); i++) {
+				String[] annotValues = getPropsOfNthAnnotation(i);
+				ProfilingAnnotation annotation1 = new ProfilingAnnotation();
+				annotation1.setProfiledObject(dataSetRef);
+				annotation1.setAnnotationType(annotValues[0]);
+				JSONObject jo1 = new JSONObject();
+				jo1.put(annotValues[1], annotValues[2]);
+				jo1.put(REQUEST_PROPERTY_CORRELATION_ID, correlationId);
+				annotation1.setJsonProperties(jo1.write());
+
+// PG: dynamic type creation disabled (types are already created statically)
+//				mds.createAnnotationTypesFromPrototypes(Collections.singletonList(annotation1));
+				MetaDataObjectReference resultRef1 = as.store(annotation1);
+				if (resultRef1 == null) {
+					throw new RuntimeException("Annotation object " + i + " could not be created");
+				}
+			}
+
+			TestSyncDiscoveryServiceWritingAnnotations1.logger.info("Discovery service " + TestSyncDiscoveryServiceWritingAnnotations1.class.getSimpleName() + " created annotations successfully");
+		} catch (Throwable exc) {
+			// the logger call records the stack trace, so no separate printStackTrace() is needed
+			TestSyncDiscoveryServiceWritingAnnotations1.logger.log(Level.WARNING, TestSyncDiscoveryServiceWritingAnnotations1.class.getSimpleName() + " has failed", exc);
+			return "Failed: " + Utils.getExceptionAsString(exc);
+		}
+		return null;
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/engine/ODFVersionTest.java
----------------------------------------------------------------------
diff --git a/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/engine/ODFVersionTest.java b/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/engine/ODFVersionTest.java
new file mode 100755
index 0000000..2e6d012
--- /dev/null
+++ b/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/engine/ODFVersionTest.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.odf.core.test.engine;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.atlas.odf.api.ODFFactory;
+import org.apache.atlas.odf.api.engine.ODFVersion;
+import org.apache.atlas.odf.core.test.TimerTestBase;
+
+public class ODFVersionTest extends TimerTestBase {
+	@Test
+	public void testVersion() {
+		ODFVersion version = new ODFFactory().create().getEngineManager().getVersion();
+		Assert.assertNotNull(version);
+		Assert.assertTrue(version.getVersion().startsWith("1.2.0-"));
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/engine/ShutdownTest.java
----------------------------------------------------------------------
diff --git a/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/engine/ShutdownTest.java b/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/engine/ShutdownTest.java
new file mode 100755
index 0000000..465eb5c
--- /dev/null
+++ b/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/engine/ShutdownTest.java
@@ -0,0 +1,90 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.odf.core.test.engine;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.atlas.odf.api.ODFFactory;
+import org.apache.atlas.odf.api.analysis.AnalysisRequestStatus.State;
+import org.apache.atlas.odf.api.engine.EngineManager;
+import org.apache.atlas.odf.api.engine.ODFEngineOptions;
+import org.apache.atlas.odf.core.ODFInternalFactory;
+import org.apache.atlas.odf.core.controlcenter.ThreadManager;
+import org.apache.atlas.odf.core.test.ODFTestBase;
+import org.apache.atlas.odf.core.test.controlcenter.ODFAPITest;
+
+/**
+ * Exercises engine shutdown (without and with restart) and checks the number of threads
+ * reported by the {@link ThreadManager} before and after.
+ */
+public class ShutdownTest extends ODFTestBase {
+
+	/** Runs one successful request and asserts that at least three threads are reported as running. */
+	private void runAndTestThreads() throws Exception {
+		ODFAPITest.runRequestAndCheckResult("successID", State.FINISHED, -1);
+		ThreadManager tm = new ODFInternalFactory().create(ThreadManager.class);
+		int numThreads = tm.getNumberOfRunningThreads();
+		log.info("--- Number of running threads: " + numThreads);
+		Assert.assertTrue(numThreads >= 3);
+	}
+
+	/**
+	 * Polls the thread manager once per second until no thread is reported as running or
+	 * {@code maxWait} polls have been made.
+	 *
+	 * @param tm thread manager to poll
+	 * @param maxWait maximum number of one-second waits
+	 * @return the number of one-second waits actually performed
+	 */
+	private int waitForThreadsToStop(ThreadManager tm, int maxWait) throws InterruptedException {
+		int waitCnt = 0;
+		while (tm.getNumberOfRunningThreads() > 0 && waitCnt < maxWait) {
+			waitCnt++;
+			Thread.sleep(1000);
+		}
+		return waitCnt;
+	}
+
+	/**
+	 * Shuts the engine down without restart, reruns a request, then shuts down with restart
+	 * and expects more than two threads to be running again afterwards.
+	 */
+	@Test
+	public void testShutdown() throws Exception {
+
+		log.info("--- Running some request before shutdown...");
+		runAndTestThreads();
+
+		ThreadManager tm = new ODFInternalFactory().create(ThreadManager.class);
+		log.info("--- Number of threads before shutdown: " + tm.getNumberOfRunningThreads());
+
+		EngineManager engineManager = new ODFFactory().create().getEngineManager();
+		ODFEngineOptions options = new ODFEngineOptions();
+		options.setRestart(false);
+		int numThreads = tm.getNumberOfRunningThreads();
+		log.info("--- Number of threads before restart: " + numThreads);
+
+		engineManager.shutdown(options);
+		log.info("--- Shutdown requested...");
+		int maxWait = 60;
+		log.info("--- Shutdown requested, waiting for max " + maxWait + " seconds");
+		int waitCnt = waitForThreadsToStop(tm, maxWait);
+		log.info("--- Shutdown should be done by now, waited for " + waitCnt + " seconds, running threads: " + tm.getNumberOfRunningThreads());
+		// hitting the maximum wait means the threads did not stop in time
+		Assert.assertNotEquals(waitCnt, maxWait);
+
+		log.info("--- Rerunning request after shutdown...");
+		runAndTestThreads();
+
+		int nrOfThreads = tm.getNumberOfRunningThreads();
+		options.setRestart(true);
+		engineManager.shutdown(options);
+		maxWait = nrOfThreads * 2;
+		// the announced maximum is derived from the actual loop bound (maxWait polls of 1000 ms each)
+		log.info("--- Restart requested..., wait for a maximum of " + (maxWait * 1000) + " ms");
+		waitForThreadsToStop(tm, maxWait);
+		log.info("--- Restart should be done by now");
+		Thread.sleep(5000);
+		numThreads = tm.getNumberOfRunningThreads();
+		log.info("--- Number of threads after restart: " + numThreads);
+		Assert.assertTrue(numThreads > 2);
+		log.info("--- testShutdown finished");
+
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/messaging/MockQueueManager.java
----------------------------------------------------------------------
diff --git a/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/messaging/MockQueueManager.java b/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/messaging/MockQueueManager.java
new file mode 100755
index 0000000..c2be180
--- /dev/null
+++ b/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/messaging/MockQueueManager.java
@@ -0,0 +1,249 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.odf.core.test.messaging;
+
+import java.text.MessageFormat;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeoutException;
+import java.util.logging.Logger;
+
+import org.apache.atlas.odf.api.discoveryservice.DiscoveryServiceRequest;
+import org.apache.atlas.odf.core.ODFInternalFactory;
+import org.apache.atlas.odf.core.messaging.DiscoveryServiceQueueManager;
+import org.apache.wink.json4j.JSONException;
+
+import org.apache.atlas.odf.api.discoveryservice.AnalysisRequestTracker;
+import org.apache.atlas.odf.api.engine.MessagingStatus;
+import org.apache.atlas.odf.core.controlcenter.AdminMessage;
+import org.apache.atlas.odf.core.controlcenter.AdminQueueProcessor;
+import org.apache.atlas.odf.core.controlcenter.ConfigChangeQueueProcessor;
+import org.apache.atlas.odf.core.controlcenter.DefaultStatusQueueStore;
+import org.apache.atlas.odf.core.controlcenter.DiscoveryServiceStarter;
+import org.apache.atlas.odf.core.controlcenter.ExecutorServiceFactory;
+import org.apache.atlas.odf.core.controlcenter.ODFRunnable;
+import org.apache.atlas.odf.core.controlcenter.QueueMessageProcessor;
+import org.apache.atlas.odf.core.controlcenter.ServiceRuntime;
+import org.apache.atlas.odf.core.controlcenter.ServiceRuntimes;
+import org.apache.atlas.odf.core.controlcenter.StatusQueueEntry;
+import org.apache.atlas.odf.core.controlcenter.ThreadManager;
+import org.apache.atlas.odf.core.controlcenter.ThreadManager.ThreadStartupResult;
+import org.apache.atlas.odf.core.controlcenter.TrackerUtil;
+import org.apache.atlas.odf.json.JSONUtils;
+
+/**
+ * In-memory {@link DiscoveryServiceQueueManager} used by the tests. The admin queue,
+ * the status queue and the per-runtime request queues are static lists; each of them is
+ * polled by a {@link MockQueueListener} that is started through the {@link ThreadManager}.
+ */
+public class MockQueueManager implements DiscoveryServiceQueueManager {
+
+	static Logger logger = Logger.getLogger(MockQueueManager.class.getName());
+
+	// Guards runtimeQueues, which is a plain HashMap shared by all instances.
+	static Object lock = new Object();
+
+	static List<AdminMessage> adminQueue = Collections.synchronizedList(new ArrayList<AdminMessage>());
+	static List<StatusQueueEntry> statusQueue = Collections.synchronizedList(new ArrayList<StatusQueueEntry>());
+	// Request queues keyed by runtime name; entries are created lazily in enqueue().
+	static Map<String, List<AnalysisRequestTracker>> runtimeQueues = new HashMap<>();
+
+	ThreadManager threadManager;
+
+	/**
+	 * Creates the thread manager through the {@link ODFInternalFactory} and hands it a
+	 * fresh executor service. No listener thread is started here; see {@link #start()}.
+	 */
+	public MockQueueManager() {
+		ODFInternalFactory factory = new ODFInternalFactory();
+		ExecutorServiceFactory esf = factory.create(ExecutorServiceFactory.class);
+		threadManager = factory.create(ThreadManager.class);
+		threadManager.setExecutorService(esf.createExecutorService());
+	}
+
+	/**
+	 * Starts the listeners for the admin queue (admin and config change processors) and
+	 * for the status queue. If at least one thread was newly created, waits up to 5 seconds
+	 * for the threads to become ready; a timeout while waiting is logged as a warning and
+	 * not propagated.
+	 */
+	@Override
+	public void start() throws TimeoutException {
+		logger.info("Initializing MockQueueManager");
+		List<ThreadStartupResult> threads = new ArrayList<ThreadStartupResult>();
+		// The non-short-circuit |= makes sure every listener is started.
+		boolean threadCreated = startListener("MOCKADMIN",
+				createQueueListener("Admin", adminQueue, new AdminQueueProcessor(), false), threads);
+		threadCreated |= startListener("MOCKADMINCONFIGCHANGE",
+				createQueueListener("AdminConfig", adminQueue, new ConfigChangeQueueProcessor(), false), threads);
+		// Only the status store listener consumes its queue from the first entry.
+		threadCreated |= startListener("MOCKSTATUSSTORE",
+				createQueueListener("StatusStore", statusQueue, new DefaultStatusQueueStore.StatusQueueProcessor(), true), threads);
+
+		logger.info("New thread created: " + threadCreated);
+		if (threadCreated) {
+			try {
+				this.threadManager.waitForThreadsToBeReady(5000, threads);
+				logger.info("All threads ready");
+			} catch (TimeoutException e) {
+				final String message = "Not all threads were created on time";
+				logger.warning(message);
+			}
+		}
+	}
+
+	/**
+	 * Starts one unmanaged thread, records its startup result in {@code results} and
+	 * returns whether a new thread was created for it.
+	 */
+	private boolean startListener(String threadId, ODFRunnable listener, List<ThreadStartupResult> results) {
+		ThreadStartupResult result = this.threadManager.startUnmanagedThread(threadId, listener);
+		boolean created = result.isNewThreadCreated();
+		results.add(result);
+		return created;
+	}
+
+	/** Shuts down the three listener threads started by {@link #start()}. */
+	@Override
+	public void stop() {
+		threadManager.shutdownThreads(Arrays.asList("MOCKADMIN", "MOCKADMINCONFIGCHANGE", "MOCKSTATUSSTORE"));
+	}
+
+	/**
+	 * Clones an object via {@link JSONUtils#cloneJSONObject}, converting a
+	 * {@link JSONException} into a {@link RuntimeException}.
+	 */
+	<T> T cloneObject(T obj) {
+		try {
+			return JSONUtils.cloneJSONObject(obj);
+		} catch (JSONException e) {
+			throw new RuntimeException(e);
+		}
+	}
+
+	/**
+	 * Puts a clone of the tracker on the queue of the runtime that hosts the tracker's
+	 * current discovery service, making sure a listener thread for that runtime exists.
+	 *
+	 * @throws RuntimeException if the tracker has no current discovery service request or
+	 *         if no runtime is known for the discovery service
+	 */
+	@Override
+	public void enqueue(AnalysisRequestTracker tracker) {
+		tracker = cloneObject(tracker);
+		DiscoveryServiceRequest dsRequest = TrackerUtil.getCurrentDiscoveryServiceStartRequest(tracker);
+		if (dsRequest == null) {
+			throw new RuntimeException("Tracker is finished, should not be enqueued");
+		}
+		String dsID = dsRequest.getDiscoveryServiceId();
+		dsRequest.setPutOnRequestQueue(System.currentTimeMillis());
+		synchronized (lock) {
+			ServiceRuntime runtime = ServiceRuntimes.getRuntimeForDiscoveryService(dsID);
+			if (runtime == null) {
+				throw new RuntimeException(MessageFormat.format("Runtime of discovery service ''{0}'' does not exist", dsID));
+			}
+			String runtimeName = runtime.getName();
+			List<AnalysisRequestTracker> mq = runtimeQueues.get(runtimeName);
+			if (mq == null) {
+				mq = Collections.synchronizedList(new ArrayList<AnalysisRequestTracker>());
+				runtimeQueues.put(runtimeName, mq);
+			}
+			// The listener is created before the tracker is added, so a listener that
+			// starts at the current end of the queue still sees the new tracker.
+			boolean started = this.threadManager.startUnmanagedThread("MOCK" + runtimeName, createQueueListener("Starter" + runtimeName, mq, new DiscoveryServiceStarter(), false))
+					.isNewThreadCreated();
+			logger.info("New thread created for runtime " + runtimeName + ", started: " + started + ", current queue length: " + mq.size());
+			mq.add(tracker);
+		}
+	}
+
+	/**
+	 * Polls a list that is used as a queue and passes each new element, serialized as
+	 * JSON, to a {@link QueueMessageProcessor}. Elements are never removed from the list;
+	 * the listener only advances its own read index.
+	 */
+	static class MockQueueListener implements ODFRunnable {
+		String name;
+		QueueMessageProcessor processor;
+		List<?> queue;
+		// volatile: set through cancel() and read by the polling loop in run(),
+		// which are expected to run on different threads.
+		volatile boolean cancelled = false;
+		ExecutorService service;
+		int index = 0;
+
+		/**
+		 * @param name          name used in log messages
+		 * @param q             list to poll
+		 * @param qmp           processor that receives the messages
+		 * @param fromBeginning if true start at the first element, otherwise only elements
+		 *                      added after construction are processed
+		 */
+		public MockQueueListener(String name, List<?> q, QueueMessageProcessor qmp, boolean fromBeginning) {
+			this.name = name;
+			this.processor = qmp;
+			this.queue = q;
+			if (fromBeginning) {
+				index = 0;
+			} else {
+				index = q.size();
+			}
+		}
+
+		// Pause between two polls while no unread element is available.
+		long WAITTIMEMS = 100;
+
+		boolean isValidIndex() {
+			return index >= 0 && index < queue.size();
+		}
+
+		/**
+		 * Processes queue elements until the listener is cancelled, the thread is
+		 * interrupted or an element cannot be serialized to JSON.
+		 */
+		@Override
+		public void run() {
+			logger.info("MockQueueManager thread " + name + " started");
+
+			while (!cancelled) {
+				if (!isValidIndex()) {
+					try {
+						Thread.sleep(WAITTIMEMS);
+					} catch (InterruptedException e) {
+						// Keep the interrupt status for the owner of the thread and stop
+						// polling instead of swallowing the interrupt.
+						Thread.currentThread().interrupt();
+						cancelled = true;
+					}
+				} else {
+					Object obj = queue.get(index);
+					String msg;
+					try {
+						msg = JSONUtils.toJSON(obj);
+					} catch (JSONException e) {
+						logger.log(java.util.logging.Level.SEVERE, "MockQConsumer " + name + ": could not serialize message, stopping", e);
+						cancelled = true;
+						return;
+					}
+					this.processor.process(service, msg, 0, index);
+					logger.finest("MockQConsumer " + name + ": Processed message: " + msg);
+					index++;
+				}
+			}
+			logger.info("MockQueueManager thread finished");
+
+		}
+
+
+		@Override
+		public void setExecutorService(ExecutorService service) {
+			this.service = service;
+		}
+
+		@Override
+		public void cancel() {
+			cancelled = true;
+		}
+
+		@Override
+		public boolean isReady() {
+			return true;
+		}
+
+	}
+
+	/** Factory method for the listener that polls the given queue. */
+	ODFRunnable createQueueListener(String name, List<?> queue, QueueMessageProcessor qmp, boolean fromBeginning) {
+		return new MockQueueListener(name, queue, qmp, fromBeginning);
+	}
+
+	/** Appends a clone of the entry to the shared status queue. */
+	@Override
+	public void enqueueInStatusQueue(StatusQueueEntry sqe) {
+		sqe = cloneObject(sqe);
+		statusQueue.add(sqe);
+	}
+
+	/** Appends a clone of the message to the shared admin queue. */
+	@Override
+	public void enqueueInAdminQueue(AdminMessage message) {
+		message = cloneObject(message);
+		adminQueue.add(message);
+	}
+
+	/** Messaging status of the mock; carries a single free text message. */
+	public static class MockMessagingStatus extends MessagingStatus {
+		String message;
+
+		public String getMessage() {
+			return message;
+		}
+
+		public void setMessage(String message) {
+			this.message = message;
+		}
+
+	}
+
+	/** Always reports a {@link MockMessagingStatus} with the message "OK". */
+	@Override
+	public MessagingStatus getMessagingStatus() {
+		MockMessagingStatus mms = new MockMessagingStatus();
+		mms.setMessage("OK");
+		return mms;
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/notification/NotificationManagerTest.java
----------------------------------------------------------------------
diff --git a/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/notification/NotificationManagerTest.java b/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/notification/NotificationManagerTest.java
new file mode 100755
index 0000000..f69513c
--- /dev/null
+++ b/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/notification/NotificationManagerTest.java
@@ -0,0 +1,72 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.odf.core.test.notification;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.atlas.odf.api.OpenDiscoveryFramework;
+import org.apache.atlas.odf.core.ODFInternalFactory;
+import org.apache.atlas.odf.core.notification.NotificationListener;
+import org.apache.atlas.odf.core.test.ODFTestBase;
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.atlas.odf.api.ODFFactory;
+import org.apache.atlas.odf.api.analysis.AnalysisRequestTrackerStatus.STATUS;
+import org.apache.atlas.odf.api.discoveryservice.AnalysisRequestTracker;
+import org.apache.atlas.odf.core.notification.NotificationManager;
+import org.apache.atlas.odf.core.test.controlcenter.ODFAPITest;
+
+/**
+ * Checks that the status updates of an analysis request reach the listeners that are
+ * registered through the {@link TestNotificationManager}.
+ */
+public class NotificationManagerTest extends ODFTestBase {
+
+	/**
+	 * Runs a request and polls {@link TestNotificationManager#receivedTrackers} for up to
+	 * 20 seconds until a tracker of that request with status FINISHED was received.
+	 */
+	@Test
+	public void testNotifications() throws Exception {
+		NotificationManager nm = new ODFInternalFactory().create(NotificationManager.class);
+		Assert.assertNotNull(nm);
+		log.info("Notification manager found " + nm.getClass().getName());
+		Assert.assertTrue(nm instanceof TestNotificationManager);
+		List<NotificationListener> listeners = nm.getListeners();
+		Assert.assertTrue(listeners.size() > 0);
+
+		OpenDiscoveryFramework odf = new ODFFactory().create();
+		List<String> dataSetIDs = Collections.singletonList("successID");
+		String id = ODFAPITest.runRequest(dataSetIDs, odf.getAnalysisManager());
+		ODFAPITest.waitForRequest(id, odf.getAnalysisManager());
+
+		int polls = 20;
+		boolean found = false;
+		boolean foundFinished = false;
+		do {
+			// now check that trackers were found through the notification mechanism
+			log.info("Checking that trackers were consumed, " + polls + " seconds left");
+			// iterate over a snapshot, the listener may add trackers concurrently
+			List<AnalysisRequestTracker> trackers = new ArrayList<>(TestNotificationManager.receivedTrackers);
+			log.info("Received trackers: " + trackers.size());
+			for (AnalysisRequestTracker tracker : trackers) {
+				String foundId = tracker.getRequest().getId();
+				if (id.equals(foundId)) {
+					found = true;
+					if (STATUS.FINISHED.equals(tracker.getStatus())) {
+						foundFinished = true;
+					}
+				}
+			}
+			polls--;
+			// Keep polling until the FINISHED tracker arrived: seeing an earlier status
+			// of the request is not sufficient for the assertions below.
+			if (!foundFinished && polls > 0) {
+				Thread.sleep(1000);
+			}
+		} while (!foundFinished && polls > 0);
+		Assert.assertTrue(found);
+		Assert.assertTrue(foundFinished);
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/notification/TestNotificationManager.java
----------------------------------------------------------------------
diff --git a/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/notification/TestNotificationManager.java b/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/notification/TestNotificationManager.java
new file mode 100755
index 0000000..80252d6
--- /dev/null
+++ b/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/notification/TestNotificationManager.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.odf.core.test.notification;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.atlas.odf.api.OpenDiscoveryFramework;
+import org.apache.atlas.odf.api.discoveryservice.AnalysisRequestTracker;
+import org.apache.atlas.odf.core.controlcenter.StatusQueueEntry;
+import org.apache.atlas.odf.core.notification.NotificationListener;
+import org.apache.atlas.odf.core.notification.NotificationManager;
+import org.apache.atlas.odf.json.JSONUtils;
+import org.apache.wink.json4j.JSONException;
+
+/**
+ * {@link NotificationManager} for tests. It registers a single listener that collects
+ * every analysis request tracker found in the events it receives.
+ */
+public class TestNotificationManager implements NotificationManager {
+
+	/** Trackers collected by {@link TestListener1}, in the order they were received. */
+	public static List<AnalysisRequestTracker> receivedTrackers = Collections.synchronizedList(new ArrayList<AnalysisRequestTracker>());
+
+	/** Listener on the topic "odf-status-topic" that records the trackers it sees. */
+	public static class TestListener1 implements NotificationListener {
+
+		@Override
+		public String getName() {
+			return getClass().getName();
+		}
+
+		@Override
+		public String getTopicName() {
+			return "odf-status-topic";
+		}
+
+		/**
+		 * Parses the event as a {@link StatusQueueEntry} and stores its tracker, if any.
+		 * A parse error is rethrown as {@link RuntimeException}.
+		 */
+		@Override
+		public void onEvent(String event, OpenDiscoveryFramework odf) {
+			StatusQueueEntry entry;
+			try {
+				entry = JSONUtils.fromJSON(event, StatusQueueEntry.class);
+			} catch (JSONException exc) {
+				throw new RuntimeException(exc);
+			}
+			AnalysisRequestTracker received = entry.getAnalysisRequestTracker();
+			if (received == null) {
+				// entry without tracker, nothing to record
+				return;
+			}
+			receivedTrackers.add(received);
+		}
+
+	}
+
+	/** Returns a new list that contains one new {@link TestListener1}. */
+	@Override
+	public List<NotificationListener> getListeners() {
+		List<NotificationListener> listeners = new ArrayList<>();
+		NotificationListener statusListener = new TestListener1();
+		listeners.add(statusListener);
+		return listeners;
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/runtime/RuntimeExtensionTest.java
----------------------------------------------------------------------
diff --git a/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/runtime/RuntimeExtensionTest.java b/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/runtime/RuntimeExtensionTest.java
new file mode 100755
index 0000000..8a8d9a8
--- /dev/null
+++ b/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/runtime/RuntimeExtensionTest.java
@@ -0,0 +1,114 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.odf.core.test.runtime;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.atlas.odf.api.ODFFactory;
+import org.apache.atlas.odf.api.OpenDiscoveryFramework;
+import org.apache.atlas.odf.api.analysis.AnalysisRequest;
+import org.apache.atlas.odf.api.analysis.AnalysisRequestStatus.State;
+import org.apache.atlas.odf.api.analysis.AnalysisResponse;
+import org.apache.atlas.odf.core.controlcenter.ServiceRuntime;
+import org.apache.atlas.odf.core.controlcenter.ServiceRuntimes;
+import org.apache.atlas.odf.core.test.ODFTestBase;
+import org.apache.atlas.odf.core.test.controlcenter.ODFAPITest;
+
+/**
+ * Tests that the {@link TestServiceRuntime} is picked up as a service runtime and that
+ * requests for a service on this runtime are only processed while it is not blocked.
+ */
+public class RuntimeExtensionTest extends ODFTestBase {
+
+	static final String SERVICE_ON_TEST_RUNTIME = "testruntimeservice";
+
+	/** Returns the names of the given runtimes in the same order. */
+	List<String> getNames(List<ServiceRuntime> rts) {
+		List<String> result = new ArrayList<>();
+		for (ServiceRuntime rt : rts) {
+			result.add(rt.getName());
+		}
+		return result;
+	}
+
+	@Test
+	public void testActiveRuntimes() {
+		List<String> allNames = getNames(ServiceRuntimes.getAllRuntimes());
+		Assert.assertTrue(allNames.contains(TestServiceRuntime.TESTSERVICERUNTIME_NAME));
+
+		List<String> activeNames = getNames(ServiceRuntimes.getActiveRuntimes());
+		Assert.assertTrue(activeNames.contains(TestServiceRuntime.TESTSERVICERUNTIME_NAME));
+	}
+
+	@Test
+	public void testRuntimeForNewService() {
+		ServiceRuntime rt = ServiceRuntimes.getRuntimeForDiscoveryService(SERVICE_ON_TEST_RUNTIME);
+		Assert.assertNotNull(rt);
+		Assert.assertEquals(TestServiceRuntime.TESTSERVICERUNTIME_NAME, rt.getName());
+	}
+
+	// Serializes the two tests below, both toggle the static flag TestServiceRuntime.runtimeBlocked.
+	static Object lock = new Object();
+
+	/** Submits a request for the dummy success data set to the service on the test runtime. */
+	private AnalysisResponse runServiceOnTestRuntime(OpenDiscoveryFramework odf) throws Exception {
+		AnalysisRequest request = ODFAPITest.createAnalysisRequest(Collections.singletonList(ODFAPITest.DUMMY_SUCCESS_ID));
+		request.setDiscoveryServiceSequence(Collections.singletonList(SERVICE_ON_TEST_RUNTIME));
+		log.info("Starting service for test runtime");
+		return odf.getAnalysisManager().runAnalysis(request);
+	}
+
+	/** With the runtime unblocked, a request must finish and must have reached the service. */
+	@Test
+	public void testRuntimeExtensionSimple() throws Exception {
+		synchronized (lock) {
+			OpenDiscoveryFramework odf = new ODFFactory().create();
+			try {
+				TestServiceRuntime.runtimeBlocked = false;
+				AnalysisResponse resp = runServiceOnTestRuntime(odf);
+				String requestId = resp.getId();
+				Assert.assertTrue(ODFAPITest.waitForRequest(requestId, odf.getAnalysisManager(), 40, State.FINISHED));
+				Assert.assertTrue(TestServiceRuntime.requests.contains(requestId));
+				log.info("testRuntimeExtensionSimple finished");
+			} finally {
+				// block runtime again to restore state before testcase, also when an
+				// assertion failed, so that later tests do not see an unblocked runtime
+				TestServiceRuntime.runtimeBlocked = true;
+			}
+			Thread.sleep(5000);
+		}
+	}
+
+	/**
+	 * While the runtime is blocked the service must not be called; after unblocking, the
+	 * request must finish and must have reached the service.
+	 */
+	@Test
+	public void testBlockedRuntimeExtension() throws Exception {
+		synchronized (lock) {
+			OpenDiscoveryFramework odf = new ODFFactory().create();
+			try {
+				TestServiceRuntime.runtimeBlocked = true;
+				AnalysisResponse resp = runServiceOnTestRuntime(odf);
+				String requestId = resp.getId();
+				Assert.assertFalse(resp.isInvalidRequest());
+				log.info("Checking that service is not called");
+				for (int i = 0; i < 5; i++) {
+					Assert.assertFalse(TestServiceRuntime.requests.contains(requestId));
+					Thread.sleep(1000);
+				}
+				log.info("Unblocking runtime...");
+				TestServiceRuntime.runtimeBlocked = false;
+				Thread.sleep(5000); // give service time to start consumption
+				log.info("Checking that request has finished");
+				Assert.assertTrue(ODFAPITest.waitForRequest(requestId, odf.getAnalysisManager(), 40, State.FINISHED));
+				log.info("Checking that service was called");
+				Assert.assertTrue(TestServiceRuntime.requests.contains(requestId));
+				log.info("testBlockedRuntimeExtension finished");
+			} finally {
+				// block runtime again to restore state before testcase, also when an
+				// assertion failed after the runtime was unblocked
+				TestServiceRuntime.runtimeBlocked = true;
+			}
+			Thread.sleep(5000);
+		}
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/runtime/TestServiceRuntime.java
----------------------------------------------------------------------
diff --git a/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/runtime/TestServiceRuntime.java b/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/runtime/TestServiceRuntime.java
new file mode 100755
index 0000000..d16e10a
--- /dev/null
+++ b/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/runtime/TestServiceRuntime.java
@@ -0,0 +1,80 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.odf.core.test.runtime;
+
+import java.util.HashSet;
+import java.util.Set;
+import java.util.logging.Logger;
+
+import org.apache.atlas.odf.api.discoveryservice.DiscoveryService;
+import org.apache.atlas.odf.api.discoveryservice.DiscoveryServiceProperties;
+import org.apache.atlas.odf.api.discoveryservice.DiscoveryServiceRequest;
+import org.apache.atlas.odf.api.discoveryservice.SyncDiscoveryServiceBase;
+import org.apache.atlas.odf.api.discoveryservice.sync.DiscoveryServiceSyncResponse;
+import org.apache.atlas.odf.api.settings.validation.ValidationException;
+import org.apache.atlas.odf.core.controlcenter.ServiceRuntime;
+import org.apache.atlas.odf.core.test.ODFTestLogger;
+import org.apache.atlas.odf.api.discoveryservice.DiscoveryServiceResponse;
+
+/**
+ * {@link ServiceRuntime} for tests. It can be blocked through the static flag
+ * {@link #runtimeBlocked} and records the ODF request id of every request that reaches
+ * its discovery service proxy.
+ */
+public class TestServiceRuntime implements ServiceRuntime {
+
+	static Logger logger = ODFTestLogger.get();
+
+	public static final String TESTSERVICERUNTIME_NAME = "TestServiceRuntime";
+
+	// volatile: toggled by the tests and read in getWaitTimeUntilAvailable(), which is
+	// presumably called from a framework thread and not from the test thread.
+	public static volatile boolean runtimeBlocked = true;
+
+	@Override
+	public String getName() {
+		return TESTSERVICERUNTIME_NAME;
+	}
+
+	/** Returns 1000 while the runtime is blocked and 0 otherwise. */
+	@Override
+	public long getWaitTimeUntilAvailable() {
+		if (runtimeBlocked) {
+			return 1000;
+		}
+		return 0;
+	}
+
+	// ODF request ids seen by DSProxy. Synchronized because the service adds ids while
+	// the tests poll the set, presumably from a different thread.
+	public static Set<String> requests = java.util.Collections.synchronizedSet(new HashSet<String>());
+
+	/** Discovery service that records the request id and always answers with OK. */
+	public static class DSProxy extends SyncDiscoveryServiceBase {
+
+		@Override
+		public DiscoveryServiceSyncResponse runAnalysis(DiscoveryServiceRequest request) {
+			logger.info("Running test runtime service");
+			requests.add(request.getOdfRequestId());
+			DiscoveryServiceSyncResponse resp = new DiscoveryServiceSyncResponse();
+			resp.setCode(DiscoveryServiceResponse.ResponseCode.OK);
+			resp.setDetails("Test success");
+			return resp;
+		}
+	}
+
+	/** Returns a new {@link DSProxy}; the service properties are not evaluated. */
+	@Override
+	public DiscoveryService createDiscoveryServiceProxy(DiscoveryServiceProperties props) {
+		return new DSProxy();
+	}
+
+	@Override
+	public String getDescription() {
+		return "TestServiceRuntime description";
+	}
+
+	/** Accepts all service properties. */
+	@Override
+	public void validate(DiscoveryServiceProperties props) throws ValidationException {
+	}
+
+}



Mime
View raw message