atlas-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mad...@apache.org
Subject [11/25] incubator-atlas git commit: ATLAS-1898: initial commit of ODF
Date Wed, 28 Jun 2017 05:57:24 GMT
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/TimerTestBase.java
----------------------------------------------------------------------
diff --git a/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/TimerTestBase.java b/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/TimerTestBase.java
new file mode 100755
index 0000000..68740e4
--- /dev/null
+++ b/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/TimerTestBase.java
@@ -0,0 +1,87 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.odf.core.test;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.logging.Logger;
+
+import org.apache.wink.json4j.JSONException;
+import org.junit.AfterClass;
+import org.junit.Rule;
+import org.junit.rules.Stopwatch;
+import org.junit.runner.Description;
+
+import com.google.common.io.Files;
+
+/** Base class for ODF tests: times every test method and persists the timings to a CSV log file. */
+public class TimerTestBase {
+	static final String logFilePath = "/tmp/odf-test-execution-log.csv";
+	// test class name -> (test method name -> duration in milliseconds); static, hence shared by all subclasses
+	static Map<String, HashMap<String, Long>> testTimeMap = new HashMap<String, HashMap<String, Long>>();
+	final static Logger logger = ODFTestLogger.get();
+
+	/** Records the duration of each finished test in {@link #testTimeMap}. */
+	@Rule
+	public Stopwatch timeWatcher = new Stopwatch() {
+		@Override
+		protected void finished(long nanos, Description description) {
+			HashMap<String, Long> testMap = testTimeMap.get(description.getClassName());
+			if (testMap == null) {
+				testMap = new HashMap<String, Long>();
+				testTimeMap.put(description.getClassName(), testMap);
+			}
+			testMap.put(description.getMethodName(), nanos / 1000 / 1000); // nanoseconds -> milliseconds
+		}
+	};
+
+	/** Merges the recorded timings as {@code method,milliseconds,class,project} rows into the CSV file at {@link #logFilePath}; I/O failures are only logged. */
+	@AfterClass
+	public static void tearDownAndLogTimes() throws JSONException {
+		try {
+			File logFile = new File(logFilePath);
+			// Start from the rows of an existing log file; the set drops rows that are exact duplicates.
+			Set<String> uniqueRows = new HashSet<String>();
+			if (logFile.exists()) {
+				uniqueRows = new HashSet<String>(Files.readLines(logFile, StandardCharsets.UTF_8));
+			}
+			String projectName = System.getProperty("odf.build.project.name", "ProjectNameNotDefined");
+			for (Entry<String, HashMap<String, Long>> entry : testTimeMap.entrySet()) {
+				for (Entry<String, Long> testEntry : entry.getValue().entrySet()) {
+					uniqueRows.add(testEntry.getKey() + "," + testEntry.getValue() + "," + entry.getKey() + "," + projectName);
+				}
+			}
+
+			StringBuilder logContent = new StringBuilder();
+			String separator = ""; // rows are newline-separated, with no newline after the last row
+			for (String row : uniqueRows) {
+				logContent.append(separator).append(row);
+				separator = "\n";
+			}
+			logger.info("Total time consumed by succeeded tests:\n" + logContent.toString());
+			// No createNewFile() beforehand: Files.write creates the file itself when it does not exist.
+			Files.write(logContent.toString().getBytes(StandardCharsets.UTF_8), logFile);
+		} catch (IOException e) {
+			// Hand the exception to the logger so the stack trace is reported through it instead of System.err.
+			logger.log(java.util.logging.Level.WARNING, "Error writing test execution log", e);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/annotation/AnnotationExtensionTest.java
----------------------------------------------------------------------
diff --git a/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/annotation/AnnotationExtensionTest.java b/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/annotation/AnnotationExtensionTest.java
new file mode 100755
index 0000000..7a1f0ed
--- /dev/null
+++ b/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/annotation/AnnotationExtensionTest.java
@@ -0,0 +1,114 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.odf.core.test.annotation;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.logging.Logger;
+
+import org.apache.atlas.odf.api.metadata.models.Annotation;
+import org.apache.atlas.odf.api.metadata.models.ProfilingAnnotation;
+import org.apache.atlas.odf.json.JSONUtils;
+import org.apache.wink.json4j.JSONObject;
+import org.junit.Assert;
+import org.junit.Test;
+
+import com.fasterxml.jackson.core.Version;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.module.SimpleModule;
+import org.apache.atlas.odf.core.test.ODFTestLogger;
+import org.apache.atlas.odf.core.test.TimerTestBase;
+import org.apache.atlas.odf.json.AnnotationDeserializer;
+import org.apache.atlas.odf.json.AnnotationSerializer;
+
+/** Checks that an annotation subclass with extra properties survives JSON (de)serialization as {@link Annotation}. */
+public class AnnotationExtensionTest extends TimerTestBase {
+
+	static Logger logger = ODFTestLogger.get();
+
+	/**
+	 * Deserializes the classpath resource {@code pathToFile} into an instance of {@code cl} using {@code om}.
+	 * A {@code null} classLoader selects the class loader of this test class.
+	 *
+	 * @throws IllegalArgumentException if the resource does not exist
+	 * @throws RuntimeException if the resource cannot be read or parsed
+	 */
+	public static <T> T readJSONObjectFromFileInClasspath(ObjectMapper om, Class<T> cl, String pathToFile, ClassLoader classLoader) {
+		if (classLoader == null) {
+			classLoader = AnnotationExtensionTest.class.getClassLoader();
+		}
+		try (InputStream is = classLoader.getResourceAsStream(pathToFile)) {
+			if (is == null) {
+				// getResourceAsStream reports a missing resource as null; fail with the path, not with a parser error
+				throw new IllegalArgumentException("Resource not found in classpath: " + pathToFile);
+			}
+			return om.readValue(is, cl);
+		} catch (IOException e) {
+			// assume that this is a severe error since the provided JSONs should be correct
+			throw new RuntimeException(e);
+		}
+	}
+
+	@Test
+	public void testWithUtils() throws Exception {
+		testSimple(JSONUtils.getGlobalObjectMapper());
+	}
+
+	/** Runs the checks with a fresh mapper that only has the annotation (de)serializer module registered. */
+	@Test
+	public void testWithSeparateObjectMapper() throws Exception {
+		ObjectMapper om = new ObjectMapper();
+		SimpleModule mod = new SimpleModule("annotation module", Version.unknownVersion());
+		mod.addDeserializer(Annotation.class, new AnnotationDeserializer());
+		mod.addSerializer(Annotation.class, new AnnotationSerializer());
+		om.registerModule(mod);
+		testSimple(om);
+	}
+
+	/** Round-trips an {@link ExtensionTestAnnotation} through {@code om} and reads an annotation JSON resource with it. */
+	private void testSimple(ObjectMapper om) throws Exception {
+		ExtensionTestAnnotation newTestAnnot = new ExtensionTestAnnotation();
+		String strValue = "newstring1";
+		int intValue = 4237;
+		newTestAnnot.setNewStringProp1(strValue);
+		newTestAnnot.setNewIntProp2(intValue);
+		String newTestAnnotJSON = JSONUtils.toJSON(newTestAnnot).toString();
+		logger.info("New test annot JSON: " + newTestAnnotJSON);
+
+		logger.info("Deserializing with " + Annotation.class.getSimpleName() + " class as target class");
+		Annotation annot1 = om.readValue(newTestAnnotJSON, Annotation.class);
+		Assert.assertNotNull(annot1);
+		logger.info("Deserialized annotation JSON (target: " + Annotation.class.getSimpleName() + "): " + om.writeValueAsString(annot1));
+		logger.info("Deserialized annotation class (target: " + Annotation.class.getSimpleName() + "): " + annot1.getClass().getName());
+		Assert.assertEquals(ExtensionTestAnnotation.class, annot1.getClass());
+		ExtensionTestAnnotation extAnnot1 = (ExtensionTestAnnotation) annot1;
+		Assert.assertEquals(strValue, extAnnot1.getNewStringProp1());
+		Assert.assertEquals(intValue, extAnnot1.getNewIntProp2());
+
+		// Deserializing with ExtensionTestAnnotation.class as target is deliberately not tested: one would never
+		// pass that class as deserialization target, and it would enforce usage of the standard bean serializer
+		// (no serializer is registered for this specific class), so jsonProperties could not be mapped.
+
+		logger.info("Processing profiling annotation...");
+		Annotation unknownAnnot = readJSONObjectFromFileInClasspath(om, Annotation.class, "org/apache/atlas/odf/core/test/annotation/annotexttest1.json", null);
+		Assert.assertNotNull(unknownAnnot);
+		logger.info("Read Unknown annotation: " + unknownAnnot.getClass().getName());
+		Assert.assertEquals(ProfilingAnnotation.class, unknownAnnot.getClass());
+
+		logger.info("Read profiling annotation: " + om.writeValueAsString(unknownAnnot));
+		JSONObject jsonPropertiesObj = new JSONObject(unknownAnnot.getJsonProperties());
+		Assert.assertEquals("newProp1Value", jsonPropertiesObj.get("newProp1"));
+		Assert.assertEquals((Integer) 4237, jsonPropertiesObj.get("newProp2"));
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/annotation/AnnotationStoreTest.java
----------------------------------------------------------------------
diff --git a/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/annotation/AnnotationStoreTest.java b/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/annotation/AnnotationStoreTest.java
new file mode 100755
index 0000000..b65ce17
--- /dev/null
+++ b/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/annotation/AnnotationStoreTest.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.odf.core.test.annotation;
+
+import java.util.List;
+import java.util.UUID;
+
+import org.apache.atlas.odf.api.metadata.models.Annotation;
+import org.apache.atlas.odf.api.metadata.models.ProfilingAnnotation;
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.atlas.odf.api.metadata.MetaDataObjectReference;
+import org.apache.atlas.odf.api.annotation.AnnotationStore;
+import org.apache.atlas.odf.core.controlcenter.DefaultStatusQueueStore;
+import org.apache.atlas.odf.core.test.ODFTestcase;
+
+/** Tests storing a profiling annotation in an {@link AnnotationStore} and reading it back. */
+public class AnnotationStoreTest extends ODFTestcase {
+
+	/** Creates the store under test, a {@link DefaultStatusQueueStore}. */
+	private AnnotationStore createAnnotationStore() {
+		return new DefaultStatusQueueStore();
+	}
+
+	/** Stores one annotation for a fresh object reference and verifies the single annotation retrieved for it. */
+	@Test
+	public void testStoreProfilingAnnotation() throws Exception {
+		AnnotationStore store = createAnnotationStore();
+		String profiledObjectId = UUID.randomUUID().toString();
+		MetaDataObjectReference profiledObjectRef = new MetaDataObjectReference();
+		profiledObjectRef.setId(profiledObjectId);
+
+		ProfilingAnnotation original = new ProfilingAnnotation();
+		original.setJsonProperties("{\"a\": \"b\"}");
+		original.setAnnotationType("AnnotType1");
+		original.setProfiledObject(profiledObjectRef);
+
+		MetaDataObjectReference storedRef = store.store(original);
+		Assert.assertNotNull(storedRef.getId());
+		List<Annotation> found = store.getAnnotations(profiledObjectRef, null);
+		Assert.assertEquals(1, found.size());
+
+		// the retrieved annotation must be a distinct object, not the instance that was passed to the store
+		Annotation retrieved = found.get(0);
+		Assert.assertTrue(original != retrieved);
+		Assert.assertTrue(retrieved instanceof ProfilingAnnotation);
+		Assert.assertEquals(profiledObjectId, ((ProfilingAnnotation) retrieved).getProfiledObject().getId());
+		Assert.assertEquals(storedRef, retrieved.getReference());
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/annotation/ExtensionTestAnnotation.java
----------------------------------------------------------------------
diff --git a/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/annotation/ExtensionTestAnnotation.java b/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/annotation/ExtensionTestAnnotation.java
new file mode 100755
index 0000000..cd8f695
--- /dev/null
+++ b/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/annotation/ExtensionTestAnnotation.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.odf.core.test.annotation;
+
+import org.apache.atlas.odf.api.metadata.models.ProfilingAnnotation;
+
+/** Test annotation that extends {@link ProfilingAnnotation} by a string and an int bean property. */
+class ExtensionTestAnnotation extends ProfilingAnnotation {
+
+	private String newStringProp1;
+	private int newIntProp2;
+
+	public String getNewStringProp1() {
+		return this.newStringProp1;
+	}
+
+	public void setNewStringProp1(String value) {
+		this.newStringProp1 = value;
+	}
+
+	public int getNewIntProp2() {
+		return this.newIntProp2;
+	}
+
+	public void setNewIntProp2(int value) {
+		this.newIntProp2 = value;
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/annotation/TestSyncDiscoveryServiceWritingExtendedAnnotations.java
----------------------------------------------------------------------
diff --git a/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/annotation/TestSyncDiscoveryServiceWritingExtendedAnnotations.java b/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/annotation/TestSyncDiscoveryServiceWritingExtendedAnnotations.java
new file mode 100755
index 0000000..f65e3ad
--- /dev/null
+++ b/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/annotation/TestSyncDiscoveryServiceWritingExtendedAnnotations.java
@@ -0,0 +1,147 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.odf.core.test.annotation;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.logging.Logger;
+
+import org.apache.atlas.odf.api.metadata.MetaDataObjectReference;
+import org.apache.atlas.odf.api.metadata.models.Annotation;
+import org.apache.atlas.odf.api.metadata.models.ProfilingAnnotation;
+import org.apache.atlas.odf.api.discoveryservice.DiscoveryServiceBase;
+import org.apache.atlas.odf.api.discoveryservice.DiscoveryServiceRequest;
+import org.apache.atlas.odf.api.discoveryservice.DiscoveryServiceResult;
+import org.apache.atlas.odf.api.discoveryservice.sync.DiscoveryServiceSyncResponse;
+import org.apache.atlas.odf.api.discoveryservice.sync.SyncDiscoveryService;
+import org.apache.atlas.odf.core.test.ODFTestLogger;
+import org.apache.atlas.odf.json.JSONUtils;
+import org.apache.atlas.odf.api.discoveryservice.DiscoveryServiceResponse;
+
+/** Synchronous test discovery service that returns two annotations of a custom {@link ProfilingAnnotation} subclass. */
+public class TestSyncDiscoveryServiceWritingExtendedAnnotations extends DiscoveryServiceBase implements SyncDiscoveryService {
+	Logger logger = ODFTestLogger.get();
+
+	/** Annotation type with a string, an int and a nested object property. */
+	public static class SyncDiscoveryServiceAnnotation extends ProfilingAnnotation {
+		private String prop1 = "";
+		private int prop2 = 4237;
+		private MyObject prop3 = new MyObject();
+
+		public String getProp1() {
+			return this.prop1;
+		}
+
+		public void setProp1(String prop1) {
+			this.prop1 = prop1;
+		}
+
+		public int getProp2() {
+			return this.prop2;
+		}
+
+		public void setProp2(int prop2) {
+			this.prop2 = prop2;
+		}
+
+		public MyObject getProp3() {
+			return this.prop3;
+		}
+
+		public void setProp3(MyObject prop3) {
+			this.prop3 = prop3;
+		}
+	}
+
+	/** First nesting level: a string property plus a further nested object. */
+	public static class MyObject {
+		private String anotherProp = "";
+		private MyOtherObject yetAnotherProp = new MyOtherObject();
+
+		public String getAnotherProp() {
+			return this.anotherProp;
+		}
+
+		public void setAnotherProp(String anotherProp) {
+			this.anotherProp = anotherProp;
+		}
+
+		public MyOtherObject getYetAnotherProp() {
+			return this.yetAnotherProp;
+		}
+
+		public void setYetAnotherProp(MyOtherObject yetAnotherProp) {
+			this.yetAnotherProp = yetAnotherProp;
+		}
+	}
+
+	/** Second nesting level: a single string property. */
+	public static class MyOtherObject {
+		private String myOtherObjectProperty = "";
+
+		public String getMyOtherObjectProperty() {
+			return this.myOtherObjectProperty;
+		}
+
+		public void setMyOtherObjectProperty(String myOtherObjectProperty) {
+			this.myOtherObjectProperty = myOtherObjectProperty;
+		}
+	}
+
+	/**
+	 * Creates two annotations for the data set of the request and returns them in a response with code OK.
+	 * Any exception raised on the way is rethrown wrapped in a {@link RuntimeException}.
+	 */
+	@Override
+	public DiscoveryServiceSyncResponse runAnalysis(DiscoveryServiceRequest request) {
+		try {
+			MetaDataObjectReference dataSetRef = request.getDataSetContainer().getDataSet().getReference();
+			List<Annotation> annotations = new ArrayList<>();
+			annotations.add(createAnnotation("prop1_1_", dataSetRef));
+			annotations.add(createAnnotation("prop1_2_", dataSetRef));
+
+			DiscoveryServiceSyncResponse resp = new DiscoveryServiceSyncResponse();
+			resp.setCode(DiscoveryServiceResponse.ResponseCode.OK);
+			DiscoveryServiceResult dsResult = new DiscoveryServiceResult();
+			dsResult.setAnnotations(annotations);
+			resp.setResult(dsResult);
+			resp.setDetails(this.getClass().getName() + ".runAnalysis finished OK");
+
+			logger.info("Returning from discovery service " + this.getClass().getSimpleName() + " with result: " + JSONUtils.toJSON(resp));
+			return resp;
+		} catch (Exception exc) {
+			throw new RuntimeException(exc);
+		}
+	}
+
+	/**
+	 * Builds an annotation on {@code dataSetRef} whose property values derive from {@code prop1Prefix}
+	 * followed by the URL of the data set reference.
+	 */
+	private static SyncDiscoveryServiceAnnotation createAnnotation(String prop1Prefix, MetaDataObjectReference dataSetRef) {
+		SyncDiscoveryServiceAnnotation annotation = new SyncDiscoveryServiceAnnotation();
+		String prop1 = prop1Prefix + dataSetRef.getUrl();
+		annotation.setProp1(prop1);
+		annotation.setProp2(prop1.hashCode());
+		annotation.setProfiledObject(dataSetRef);
+
+		MyObject nested = new MyObject();
+		MyOtherObject nestedTwoLevels = new MyOtherObject();
+		nestedTwoLevels.setMyOtherObjectProperty("nestedtwolevels" + prop1);
+		nested.setYetAnotherProp(nestedTwoLevels);
+		nested.setAnotherProp("nested" + prop1);
+		annotation.setProp3(nested);
+		return annotation;
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/annotation/TestSyncDiscoveryServiceWritingJsonAnnotations.java
----------------------------------------------------------------------
diff --git a/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/annotation/TestSyncDiscoveryServiceWritingJsonAnnotations.java b/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/annotation/TestSyncDiscoveryServiceWritingJsonAnnotations.java
new file mode 100755
index 0000000..91b544c
--- /dev/null
+++ b/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/annotation/TestSyncDiscoveryServiceWritingJsonAnnotations.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.odf.core.test.annotation;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.logging.Logger;
+
+import org.apache.atlas.odf.api.metadata.MetaDataObjectReference;
+import org.apache.atlas.odf.api.metadata.models.Annotation;
+import org.apache.atlas.odf.api.metadata.models.ProfilingAnnotation;
+import org.apache.atlas.odf.core.Utils;
+import org.apache.atlas.odf.api.discoveryservice.DiscoveryServiceBase;
+import org.apache.atlas.odf.api.discoveryservice.DiscoveryServiceRequest;
+import org.apache.atlas.odf.api.discoveryservice.DiscoveryServiceResult;
+import org.apache.atlas.odf.api.discoveryservice.sync.DiscoveryServiceSyncResponse;
+import org.apache.atlas.odf.api.discoveryservice.sync.SyncDiscoveryService;
+import org.apache.atlas.odf.core.test.ODFTestLogger;
+import org.apache.atlas.odf.json.JSONUtils;
+import org.apache.atlas.odf.api.discoveryservice.DiscoveryServiceResponse;
+
+/** Synchronous test discovery service returning one {@link ProfilingAnnotation} whose JSON properties come from a classpath resource. */
+public class TestSyncDiscoveryServiceWritingJsonAnnotations extends DiscoveryServiceBase implements SyncDiscoveryService {
+	Logger logger = ODFTestLogger.get();
+	private String annotationResult = Utils.getInputStreamAsString(this.getClass().getClassLoader().getResourceAsStream("org/apache/atlas/odf/core/integrationtest/metadata/internal/atlas/nested_annotation_example.json"), "UTF-8");
+
+	/** Wraps the loaded JSON in an annotation on the request's data set and returns it in a response with code OK. */
+	@Override
+	public DiscoveryServiceSyncResponse runAnalysis(DiscoveryServiceRequest request) {
+		try {
+			MetaDataObjectReference profiledRef = request.getDataSetContainer().getDataSet().getReference();
+			ProfilingAnnotation jsonAnnotation = new ProfilingAnnotation();
+			jsonAnnotation.setProfiledObject(profiledRef);
+			jsonAnnotation.setJsonProperties(annotationResult);
+			jsonAnnotation.setAnnotationType("JsonAnnotationWriteTest");
+			jsonAnnotation.setJavaClass("JsonAnnotationWriteTest");
+			List<Annotation> annotations = new ArrayList<>();
+			annotations.add(jsonAnnotation);
+
+			DiscoveryServiceSyncResponse response = new DiscoveryServiceSyncResponse();
+			response.setCode(DiscoveryServiceResponse.ResponseCode.OK);
+			DiscoveryServiceResult analysisResult = new DiscoveryServiceResult();
+			analysisResult.setAnnotations(annotations);
+			response.setResult(analysisResult);
+			response.setDetails(this.getClass().getName() + ".runAnalysis finished OK");
+			logger.info("Returning from discovery service " + this.getClass().getSimpleName() + " with result: " + JSONUtils.toJSON(response));
+			return response;
+		} catch (Exception exc) {
+			throw new RuntimeException(exc);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/configuration/ODFConfigurationTest.java
----------------------------------------------------------------------
diff --git a/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/configuration/ODFConfigurationTest.java b/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/configuration/ODFConfigurationTest.java
new file mode 100755
index 0000000..b1d2518
--- /dev/null
+++ b/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/configuration/ODFConfigurationTest.java
@@ -0,0 +1,165 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.odf.core.test.configuration;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Map;
+import java.util.logging.Logger;
+
+import org.apache.atlas.odf.api.ODFFactory;
+import org.apache.atlas.odf.api.discoveryservice.ServiceNotFoundException;
+import org.apache.atlas.odf.api.settings.KafkaMessagingConfiguration;
+import org.apache.atlas.odf.api.settings.ODFSettings;
+import org.apache.atlas.odf.api.settings.SettingsManager;
+import org.apache.atlas.odf.api.settings.validation.ValidationException;
+import org.apache.atlas.odf.core.ODFInternalFactory;
+import org.apache.atlas.odf.core.Utils;
+import org.apache.atlas.odf.core.configuration.ConfigContainer;
+import org.apache.atlas.odf.core.configuration.ConfigManager;
+import org.apache.atlas.odf.core.test.ODFTestLogger;
+import org.apache.atlas.odf.core.test.ODFTestcase;
+import org.apache.atlas.odf.json.JSONUtils;
+import org.apache.wink.json4j.JSONException;
+import org.apache.wink.json4j.JSONObject;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.fasterxml.jackson.core.JsonParseException;
+import com.fasterxml.jackson.databind.JsonMappingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+/**
+ * Tests reading, merging, validating and updating the ODF configuration; uses a mocked storage, so no ZooKeeper is required.
+ */
+public class ODFConfigurationTest extends ODFTestcase {
+	private static final String DEFAULT_CONFIG_RESOURCE = "org/apache/atlas/odf/core/test/internal/odf-initial-configuration.json";
+	Logger logger = ODFTestLogger.get();
+
+	/** Deserializes the initial configuration that is available as a test resource on the classpath. */
+	private static ConfigContainer readDefaultConfig() throws IOException {
+		InputStream is = ODFConfigurationTest.class.getClassLoader().getResourceAsStream(DEFAULT_CONFIG_RESOURCE);
+		return new ObjectMapper().readValue(is, ConfigContainer.class);
+	}
+
+	/** Resets the configuration held by the {@link ConfigManager} to the initial one before every test. */
+	@Before
+	public void setupDefaultConfig() throws JsonParseException, JsonMappingException, IOException, ValidationException, JSONException {
+		logger.info("reset config to default");
+		ConfigContainer defaultConfig = readDefaultConfig();
+		ConfigManager configManager = new ODFInternalFactory().create(ConfigManager.class);
+		configManager.updateConfigContainer(defaultConfig);
+	}
+
+	/** Merging must overwrite the user-defined property contained in the update (testProp) and keep the other one. */
+	@Test
+	public void testUserDefinedMerge() throws JsonParseException, JsonMappingException, IOException {
+		ConfigContainer defaultConfig = readDefaultConfig();
+		// set testProps to default values that are to be overwritten
+		defaultConfig.getOdf().getUserDefined().put("testProp", "defaultValue");
+		defaultConfig.getOdf().getUserDefined().put("testProp2", "defaultValue");
+		logger.info("Read config: " + defaultConfig);
+
+		String updateJson = "{\r\n\t\"odf\" : {\r\n\t\"userDefined\" : {\r\n\t\t\"testProp\" : 123\r\n\t}\r\n}\r\n}\r\n";
+		ConfigContainer update = new ObjectMapper().readValue(updateJson, ConfigContainer.class);
+		Utils.mergeODFPOJOs(defaultConfig, update);
+		logger.info("Mergded config: " + defaultConfig);
+
+		Assert.assertEquals(123, defaultConfig.getOdf().getUserDefined().get("testProp"));
+		Assert.assertEquals("defaultValue", defaultConfig.getOdf().getUserDefined().get("testProp2"));
+	}
+
+	/** A configuration with discoveryServiceWatcherWaitMs set to -5 must raise a {@link ValidationException}. */
+	@Test
+	public void testValidation() throws JsonParseException, JsonMappingException, IOException {
+		String invalidJson = "{\r\n\t\"odf\" : {\r\n\t\t\"discoveryServiceWatcherWaitMs\" : -5\r\n\t}\r\n}\r\n";
+		try {
+			ConfigContainer invalidConfig = new ObjectMapper().readValue(invalidJson, ConfigContainer.class);
+			invalidConfig.validate();
+		} catch (ValidationException expected) {
+			return;
+		}
+		Assert.fail();
+	}
+
+	/** Merges an update setting discoveryServiceWatcherWaitMs to 777 into the default configuration; has no active assertion yet. */
+	@Test
+	public void testMerge() throws JsonParseException, JsonMappingException, IOException {
+		ConfigContainer defaultConfig = readDefaultConfig();
+		String updateJson = "{\r\n\t\"odf\" : {\r\n\t\t\"discoveryServiceWatcherWaitMs\" : 777\r\n\t}\r\n}\r\n";
+		ConfigContainer update = new ObjectMapper().readValue(updateJson, ConfigContainer.class);
+		Utils.mergeODFPOJOs(defaultConfig, update);
+
+		// TODOCONFIG, move next line to kafka tests
+		// Assert.assertEquals(777, defaultConfig.getOdf().getQueueConsumerWaitMs().intValue());
+	}
+
+	/** Merges an update that only sets kafkaConsumerConfig.offsetsStorage to TEST; all other kafkaConsumer values should stay the same. */
+	@Test
+	public void testDeepMerge() throws JsonParseException, JsonMappingException, IOException {
+		ConfigContainer defaultConfig = readDefaultConfig();
+		String updateJson = "{\r\n\t\"odf\" : {\r\n\"messagingConfiguration\": { \"type\": \"" + KafkaMessagingConfiguration.class.getName()
+				+ "\", \t\t\"kafkaConsumerConfig\" : { \r\n\t\t\t\"offsetsStorage\" : \"TEST\"\r\n\t\t}\r\n\t}\r\n}}\r\n";
+		ConfigContainer update = new ObjectMapper().readValue(updateJson, ConfigContainer.class);
+		Utils.mergeODFPOJOs(defaultConfig, update);
+
+		// TODOCONFIG
+		//		Assert.assertEquals("TEST", defaultConfig.getOdf().getKafkaConsumerConfig().getOffsetsStorage());
+		//make sure the rest is still default
+		//		Assert.assertEquals(400, defaultConfig.getOdf().getKafkaConsumerConfig().getZookeeperSessionTimeoutMs().intValue());
+	}
+
+	/** The ODF settings obtained through the factory must have reuseRequests enabled. */
+	@Test
+	public void testGet() {
+		ODFSettings settings = new ODFFactory().create().getSettingsManager().getODFSettings();
+		Assert.assertTrue(settings.isReuseRequests());
+	}
+
+	/** Stores an int, a string and a JSON object under the same user-defined key and reads each of them back. */
+	@Test
+	public void testPut() throws InterruptedException, IOException, ValidationException, JSONException, ServiceNotFoundException {
+		SettingsManager settingsManager = new ODFFactory().create().getSettingsManager();
+		String propertyId = "my_dummy_test_property";
+		int testNumber = 123;
+		Map<String, Object> userDefined = settingsManager.getUserDefinedConfig();
+		userDefined.put(propertyId, testNumber);
+		settingsManager.updateUserDefined(userDefined);
+		Assert.assertEquals(testNumber, settingsManager.getUserDefinedConfig().get(propertyId));
+
+		String testString = "test";
+		userDefined.put(propertyId, testString);
+		settingsManager.updateUserDefined(userDefined);
+		Assert.assertEquals(testString, settingsManager.getUserDefinedConfig().get(propertyId));
+
+		JSONObject testJson = new JSONObject();
+		testJson.put("testProp", "test");
+		userDefined.put(propertyId, testJson);
+		settingsManager.updateUserDefined(userDefined);
+		Assert.assertEquals(testJson, settingsManager.getUserDefinedConfig().get(propertyId));
+
+		ODFSettings odfSettings = settingsManager.getODFSettings();
+		logger.info("Last update object: " + JSONUtils.toJSON(odfSettings));
+		Assert.assertNotNull(odfSettings);
+		Assert.assertNotNull(odfSettings.getUserDefined());
+		Assert.assertNotNull(odfSettings.getUserDefined().get(propertyId));
+		logger.info("User defined object: " + odfSettings.getUserDefined().get(propertyId).getClass());
+		@SuppressWarnings("unchecked")
+		Map<String, Object> nestedJson = (Map<String, Object>) odfSettings.getUserDefined().get(propertyId);
+		Assert.assertNotNull(nestedJson.get("testProp"));
+		Assert.assertTrue(nestedJson.get("testProp") instanceof String);
+		Assert.assertEquals("test", nestedJson.get("testProp"));
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/configuration/PasswordEncryptionTest.java
----------------------------------------------------------------------
diff --git a/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/configuration/PasswordEncryptionTest.java b/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/configuration/PasswordEncryptionTest.java
new file mode 100755
index 0000000..aea9a30
--- /dev/null
+++ b/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/configuration/PasswordEncryptionTest.java
@@ -0,0 +1,83 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.odf.core.test.configuration;
+
+import java.util.logging.Logger;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.atlas.odf.core.Encryption;
+import org.apache.atlas.odf.api.ODFFactory;
+import org.apache.atlas.odf.api.settings.SettingsManager;
+import org.apache.atlas.odf.api.settings.SparkConfig;
+import org.apache.atlas.odf.api.settings.ODFSettings;
+import org.apache.atlas.odf.core.test.ODFTestLogger;
+import org.apache.atlas.odf.core.test.TimerTestBase;
+import org.apache.atlas.odf.json.JSONUtils;
+
+/**
+ * Verifies how the {@link SettingsManager} treats passwords: values read through
+ * {@code getODFSettingsHidePasswords()} are replaced by a placeholder, while values read
+ * through {@code getODFSettings()} can be decrypted with {@link Encryption#decryptText}.
+ */
+public class PasswordEncryptionTest extends TimerTestBase {
+	private static final String SPARK_PASSWORD_CONFIG = "spark.authenticate.secret";
+	/** Placeholder expected instead of a password when settings are read with hidden passwords. */
+	private static final String HIDDEN_PASSWORD_IDENTIFIER = "***hidden***";
+
+	Logger logger = ODFTestLogger.get();
+
+	/**
+	 * Stores a plain ODF password and checks that it is reported as hidden, that the stored
+	 * value decrypts to the original, and that writing back the hidden settings does not
+	 * overwrite the stored password.
+	 */
+	@Test
+	public void testGeneralPasswordEncryption() throws Exception {
+		SettingsManager settings = new ODFFactory().create().getSettingsManager();
+		ODFSettings settingsWithPlainPasswords = settings.getODFSettingsHidePasswords();
+		settingsWithPlainPasswords.setOdfPassword("newOdfPassword");
+		logger.info("Settings with plain password: " + JSONUtils.toJSON(settingsWithPlainPasswords));
+		settings.updateODFSettings(settingsWithPlainPasswords);
+
+		ODFSettings settingsWithHiddenPasswords = settings.getODFSettingsHidePasswords();
+		Assert.assertEquals(HIDDEN_PASSWORD_IDENTIFIER, settingsWithHiddenPasswords.getOdfPassword());
+		logger.info("Settings with hidden password: " + JSONUtils.toJSON(settingsWithHiddenPasswords));
+
+		ODFSettings settingsWithEncryptedPassword = settings.getODFSettings();
+		Assert.assertEquals("newOdfPassword", Encryption.decryptText(settingsWithEncryptedPassword.getOdfPassword()));
+		logger.info("Settings with encrypted password: " + JSONUtils.toJSON(settingsWithEncryptedPassword));
+
+		// When overwriting settings with hidden passwords, encrypted passwords must be kept internally
+		settings.updateODFSettings(settingsWithHiddenPasswords);
+		settingsWithEncryptedPassword = settings.getODFSettings();
+		Assert.assertEquals("newOdfPassword", Encryption.decryptText(settingsWithEncryptedPassword.getOdfPassword()));
+	}
+
+	/**
+	 * Same scenario as {@link #testGeneralPasswordEncryption()} but for the
+	 * {@code spark.authenticate.secret} entry of the Spark configuration.
+	 */
+	@Test
+	public void testSparkConfigEncryption() throws Exception {
+		SettingsManager settings = new ODFFactory().create().getSettingsManager();
+		SparkConfig plainSparkConfig = new SparkConfig();
+		plainSparkConfig.setConfig(SPARK_PASSWORD_CONFIG, "plainConfigValue");
+		ODFSettings settingsWithPlainPasswords = settings.getODFSettings();
+		settingsWithPlainPasswords.setSparkConfig(plainSparkConfig);
+		logger.info("Settings with plain password: " + JSONUtils.toJSON(settingsWithPlainPasswords));
+		settings.updateODFSettings(settingsWithPlainPasswords);
+
+		ODFSettings settingsWithHiddenPasswords = settings.getODFSettingsHidePasswords();
+		String hiddenConfigValue = (String) settingsWithHiddenPasswords.getSparkConfig().getConfigs().get(SPARK_PASSWORD_CONFIG);
+		Assert.assertEquals(HIDDEN_PASSWORD_IDENTIFIER, hiddenConfigValue);
+		logger.info("Config with hidden password: " + JSONUtils.toJSON(settingsWithHiddenPasswords));
+
+		ODFSettings settingsWithEncryptedPassword = settings.getODFSettings();
+		String encryptedConfigValue = (String) settingsWithEncryptedPassword.getSparkConfig().getConfigs().get(SPARK_PASSWORD_CONFIG);
+		Assert.assertEquals("plainConfigValue", Encryption.decryptText(encryptedConfigValue));
+		logger.info("Config with encrypted password: " + JSONUtils.toJSON(settingsWithEncryptedPassword));
+
+		// When overwriting settings with hidden passwords, encrypted passwords must be kept internally
+		settings.updateODFSettings(settingsWithHiddenPasswords);
+		// Re-read the settings: checking the object fetched before the update would not
+		// detect an overwritten password.
+		settingsWithEncryptedPassword = settings.getODFSettings();
+		encryptedConfigValue = (String) settingsWithEncryptedPassword.getSparkConfig().getConfigs().get(SPARK_PASSWORD_CONFIG);
+		Assert.assertEquals("plainConfigValue", Encryption.decryptText(encryptedConfigValue));
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/configuration/ValidationTests.java
----------------------------------------------------------------------
diff --git a/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/configuration/ValidationTests.java b/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/configuration/ValidationTests.java
new file mode 100755
index 0000000..3db5778
--- /dev/null
+++ b/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/configuration/ValidationTests.java
@@ -0,0 +1,103 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.odf.core.test.configuration;
+
+import java.util.Collections;
+
+import org.apache.atlas.odf.api.settings.validation.EnumValidator;
+import org.apache.atlas.odf.api.settings.validation.ValidationException;
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.atlas.odf.core.ODFInternalFactory;
+import org.apache.atlas.odf.api.ODFFactory;
+import org.apache.atlas.odf.core.configuration.ConfigContainer;
+import org.apache.atlas.odf.core.configuration.ConfigManager;
+import org.apache.atlas.odf.core.configuration.ServiceValidator;
+import org.apache.atlas.odf.api.discoveryservice.DiscoveryServiceManager;
+import org.apache.atlas.odf.api.discoveryservice.DiscoveryServiceProperties;
+import org.apache.atlas.odf.api.settings.validation.ImplementationValidator;
+import org.apache.atlas.odf.api.settings.validation.NumberPositiveValidator;
+import org.apache.atlas.odf.api.settings.validation.PropertyValidator;
+import org.apache.atlas.odf.core.test.TimerTestBase;
+import org.apache.atlas.odf.core.test.discoveryservice.TestAsyncDiscoveryServiceWritingAnnotations1;
+import org.apache.atlas.odf.json.JSONUtils;
+
+/**
+ * Tests for several {@link PropertyValidator} implementations. Each test feeds one value
+ * that is expected to pass and one that is expected to be rejected.
+ */
+public class ValidationTests extends TimerTestBase {
+
+	/** A value from the allowed list passes the enum validator, another value does not. */
+	@Test
+	public void testEnum() {
+		final String[] allowedValues = new String[] { "test", "test2" };
+
+		boolean acceptedKnownValue = validateTest("test", new EnumValidator(allowedValues));
+		Assert.assertTrue(acceptedKnownValue);
+
+		boolean acceptedUnknownValue = validateTest("fail", new EnumValidator(allowedValues));
+		Assert.assertFalse(acceptedUnknownValue);
+	}
+
+	/** The name of an existing test service class passes, the name "dummyClass" does not. */
+	@Test
+	public void testImplementation() {
+		final String existingClassName = TestAsyncDiscoveryServiceWritingAnnotations1.class.getName();
+
+		Assert.assertTrue(validateTest(existingClassName, new ImplementationValidator()));
+		Assert.assertFalse(validateTest("dummyClass", new ImplementationValidator()));
+	}
+
+	/**
+	 * Registers a service through the config manager, reads it back, gives it a new id and
+	 * expects it to pass the service validator; an empty properties object must fail.
+	 */
+	@Test
+	public void testService() throws Exception {
+		final String serviceJson = "{\r\n" +
+				"			\"id\": \"asynctestservice\",\r\n" +
+				"			\"name\": \"Async test\",\r\n" +
+				"			\"description\": \"The async test service\",\r\n" +
+				"			\"endpoint\": {\r\n" +
+				"				\"runtimeName\": \"Java\",\r\n" +
+				"				\"className\": \"TestAsyncDiscoveryService1\"\r\n" +
+				"			}\r\n" +
+				"		}";
+
+		final DiscoveryServiceProperties parsedService = JSONUtils.fromJSON(serviceJson, DiscoveryServiceProperties.class);
+		final DiscoveryServiceManager serviceManager = new ODFFactory().create().getDiscoveryServiceManager();
+
+		// Make the parsed service the only registered service.
+		final ConfigContainer container = new ConfigContainer();
+		container.setRegisteredServices(Collections.singletonList(parsedService));
+		new ODFInternalFactory().create(ConfigManager.class).updateConfigContainer(container);
+
+		final DiscoveryServiceProperties registered = serviceManager.getDiscoveryServicesProperties().get(0);
+		Assert.assertEquals("asynctestservice", registered.getId());
+		registered.setId("newId");
+
+		final DiscoveryServiceProperties empty = new DiscoveryServiceProperties();
+		Assert.assertTrue(validateTest(registered, new ServiceValidator()));
+		Assert.assertFalse(validateTest(empty, new ServiceValidator()));
+	}
+
+	/** 5 passes the positive-number validator, -5 does not. */
+	@Test
+	public void testNumber() {
+		final int positive = 5;
+		final int negative = -5;
+
+		Assert.assertTrue(validateTest(positive, new NumberPositiveValidator()));
+		Assert.assertFalse(validateTest(negative, new NumberPositiveValidator()));
+	}
+
+	/**
+	 * Runs the validator against the value with a {@code null} first argument.
+	 *
+	 * @return {@code true} if no {@link ValidationException} was thrown, {@code false} otherwise
+	 */
+	private boolean validateTest(Object value, PropertyValidator validator) {
+		boolean valid = true;
+		try {
+			validator.validate(null, value);
+		} catch (ValidationException rejected) {
+			valid = false;
+		}
+		return valid;
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/controlcenter/AnalysisProcessingTests.java
----------------------------------------------------------------------
diff --git a/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/controlcenter/AnalysisProcessingTests.java b/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/controlcenter/AnalysisProcessingTests.java
new file mode 100755
index 0000000..4fa2eda
--- /dev/null
+++ b/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/controlcenter/AnalysisProcessingTests.java
@@ -0,0 +1,139 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.odf.core.test.controlcenter;
+
+import java.text.MessageFormat;
+import java.util.Arrays;
+import java.util.List;
+import java.util.logging.Logger;
+
+import org.apache.atlas.odf.api.analysis.AnalysisCancelResult;
+import org.apache.atlas.odf.api.analysis.AnalysisRequest;
+import org.apache.atlas.odf.api.analysis.AnalysisRequestStatus;
+import org.apache.atlas.odf.api.analysis.AnalysisResponse;
+import org.apache.atlas.odf.api.settings.MessagingConfiguration;
+import org.apache.atlas.odf.api.settings.ODFSettings;
+import org.apache.atlas.odf.api.settings.SettingsManager;
+import org.apache.atlas.odf.core.ODFInternalFactory;
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.atlas.odf.api.ODFFactory;
+import org.apache.atlas.odf.api.discoveryservice.AnalysisRequestTracker;
+import org.apache.atlas.odf.core.controlcenter.ControlCenter;
+import org.apache.atlas.odf.core.test.ODFTestLogger;
+import org.apache.atlas.odf.core.test.ODFTestcase;
+import org.apache.atlas.odf.json.JSONUtils;
+
+/**
+ * Tests that start analysis requests through the {@link ControlCenter} and poll their
+ * status. Polling loops sleep 100 ms per iteration and give up after {@code maxWait}
+ * iterations.
+ */
+public class AnalysisProcessingTests extends ODFTestcase {
+	Logger logger = ODFTestLogger.get();
+
+	/**
+	 * Starts a request, checks that an identical second request is reported as a duplicate,
+	 * cancels it, stops the queue manager, starts another request, restarts the queue
+	 * manager and expects the second request to reach state FINISHED.
+	 */
+	@Test
+	public void testAnalysisProcessingAfterShutdown() throws Exception {
+		final SettingsManager config = new ODFFactory().create().getSettingsManager();
+		final ODFSettings odfSettings = config.getODFSettings();
+		final MessagingConfiguration messagingConfiguration = odfSettings.getMessagingConfiguration();
+		final Long origRequestRetentionMs = messagingConfiguration.getAnalysisRequestRetentionMs();
+		messagingConfiguration.setAnalysisRequestRetentionMs(300000L);
+		config.updateODFSettings(odfSettings);
+
+		ControlCenter cc = new ODFInternalFactory().create(ControlCenter.class);
+		AnalysisRequestTracker tracker = JSONUtils.readJSONObjectFromFileInClasspath(AnalysisRequestTracker.class, "org/apache/atlas/odf/core/test/messaging/kafka/tracker1.json", null);
+		AnalysisRequest req = tracker.getRequest();
+		req.setDiscoveryServiceSequence(Arrays.asList("asynctestservice"));
+		req.getDataSets().get(0).setId(ODFAPITest.DUMMY_SUCCESS_ID + "_dataset");
+		final AnalysisResponse startRequest = cc.startRequest(req);
+		logger.info("Analysis :" + startRequest.getId());
+
+		Assert.assertNull(startRequest.getOriginalRequest());
+		Assert.assertFalse(startRequest.isInvalidRequest());
+		// Submitting the same request again must return the id of the first one.
+		final AnalysisResponse duplicate = cc.startRequest(req);
+		Assert.assertNotNull(duplicate.getOriginalRequest());
+		Assert.assertEquals(startRequest.getId(), duplicate.getId());
+		logger.info("Analysis1 duplciate :" + duplicate.getId());
+
+		final AnalysisCancelResult cancelRequest = cc.cancelRequest(startRequest.getId());
+		Assert.assertEquals(AnalysisCancelResult.State.SUCCESS, cancelRequest.getState());
+
+		cc.getQueueManager().stop();
+
+		AnalysisResponse response2 = cc.startRequest(req);
+		logger.info("Analysis2:" + response2.getId());
+		AnalysisRequestStatus requestStatus = cc.getRequestStatus(response2.getId());
+		int maxWait = 20;
+
+		// Give the request a bounded amount of time to become ACTIVE.
+		int currentWait = 0;
+		while (currentWait < maxWait && requestStatus.getState() != AnalysisRequestStatus.State.ACTIVE) {
+			Thread.sleep(100);
+			currentWait++;
+			requestStatus = cc.getRequestStatus(response2.getId());
+		}
+		logger.info("THREAD ACTIVE, KILL IT!");
+
+		cc.getQueueManager().start();
+		logger.info("restarted");
+		Assert.assertNull(response2.getOriginalRequest());
+		Assert.assertFalse(response2.isInvalidRequest());
+
+		// Restore the retention time that was configured before the test.
+		messagingConfiguration.setAnalysisRequestRetentionMs(origRequestRetentionMs);
+		config.updateODFSettings(odfSettings);
+
+		currentWait = 0;
+		while (currentWait < maxWait && requestStatus.getState() != AnalysisRequestStatus.State.FINISHED) {
+			Thread.sleep(100);
+			// Count the iteration so the maxWait bound applies and the loop cannot spin forever.
+			currentWait++;
+			requestStatus = cc.getRequestStatus(response2.getId());
+		}
+		Assert.assertEquals(AnalysisRequestStatus.State.FINISHED, requestStatus.getState());
+	}
+
+	/**
+	 * Starts a request that names an annotation type instead of a service sequence and
+	 * expects the finished request to carry the single service "asynctestservice".
+	 */
+	@Test
+	public void testRequestWithAnnotationTypes() throws Exception {
+		ControlCenter cc = new ODFInternalFactory().create(ControlCenter.class);
+		AnalysisRequestTracker tracker = JSONUtils.readJSONObjectFromFileInClasspath(AnalysisRequestTracker.class, "org/apache/atlas/odf/core/test/messaging/kafka/tracker1.json", null);
+		AnalysisRequest req = tracker.getRequest();
+		req.getDataSets().get(0).setId(ODFAPITest.DUMMY_SUCCESS_ID + "_dataset");
+		List<String> annotationTypes = Arrays.asList("AsyncTestDummyAnnotation");
+		req.setAnnotationTypes(annotationTypes);
+		logger.info(MessageFormat.format("Running discovery request for annotation type {0}.", annotationTypes));
+		AnalysisResponse resp = cc.startRequest(req);
+		logger.info(MessageFormat.format("Started request id {0}.", resp.getId()));
+		Assert.assertNotNull(resp.getId());
+		Assert.assertFalse(resp.isInvalidRequest());
+
+		int currentWait = 0;
+		int maxWait = 20;
+		AnalysisRequestStatus requestStatus = cc.getRequestStatus(resp.getId());
+		while (currentWait < maxWait && requestStatus.getState() != AnalysisRequestStatus.State.FINISHED) {
+			Thread.sleep(100);
+			// Count the iteration so the maxWait bound applies and the loop cannot spin forever.
+			currentWait++;
+			requestStatus = cc.getRequestStatus(resp.getId());
+		}
+		Assert.assertEquals(AnalysisRequestStatus.State.FINISHED, requestStatus.getState());
+		Assert.assertEquals("Generated service has incorrect number of elements.", 1, requestStatus.getRequest().getDiscoveryServiceSequence().size());
+		Assert.assertEquals("Generated service sequence differs from expected value.", "asynctestservice", requestStatus.getRequest().getDiscoveryServiceSequence().get(0));
+	}
+
+	/**
+	 * Starts a request for an annotation type that the test assumes no service produces and
+	 * expects the response to be flagged invalid with the corresponding details message.
+	 */
+	@Test
+	public void testRequestWithMissingAnnotationTypes() throws Exception {
+		ControlCenter cc = new ODFInternalFactory().create(ControlCenter.class);
+		AnalysisRequestTracker tracker = JSONUtils.readJSONObjectFromFileInClasspath(AnalysisRequestTracker.class, "org/apache/atlas/odf/core/test/messaging/kafka/tracker1.json", null);
+		AnalysisRequest req = tracker.getRequest();
+		req.getDataSets().get(0).setId(ODFAPITest.DUMMY_SUCCESS_ID + "_dataset");
+		List<String> annotationTypes = Arrays.asList("noServiceExistsForThisAnnotationType");
+		req.setAnnotationTypes(annotationTypes);
+		logger.info(MessageFormat.format("Running discovery request for non-existing annotation type {0}.", annotationTypes));
+		AnalysisResponse resp = cc.startRequest(req);
+		Assert.assertTrue(resp.isInvalidRequest());
+		Assert.assertEquals("Unexpected error message.", "No suitable discovery services found to create the requested annotation types.", resp.getDetails());
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/controlcenter/AnalysisRequestCancellationTest.java
----------------------------------------------------------------------
diff --git a/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/controlcenter/AnalysisRequestCancellationTest.java b/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/controlcenter/AnalysisRequestCancellationTest.java
new file mode 100755
index 0000000..fd39e15
--- /dev/null
+++ b/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/controlcenter/AnalysisRequestCancellationTest.java
@@ -0,0 +1,104 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.odf.core.test.controlcenter;
+
+import java.util.Collections;
+import java.util.logging.Logger;
+
+import org.apache.atlas.odf.api.analysis.AnalysisCancelResult;
+import org.apache.atlas.odf.api.analysis.AnalysisRequest;
+import org.apache.atlas.odf.api.metadata.MetaDataObjectReference;
+import org.apache.atlas.odf.core.ODFInternalFactory;
+import org.apache.atlas.odf.core.controlcenter.AnalysisRequestTrackerStore;
+import org.apache.atlas.odf.core.test.ODFTestcase;
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.atlas.odf.core.Utils;
+import org.apache.atlas.odf.api.analysis.AnalysisRequestTrackerStatus.STATUS;
+import org.apache.atlas.odf.api.discoveryservice.AnalysisRequestTracker;
+import org.apache.atlas.odf.core.controlcenter.ControlCenter;
+import org.apache.atlas.odf.core.test.ODFTestLogger;
+
+/**
+ * Tests the result state returned by {@link ControlCenter#cancelRequest} for unknown
+ * requests and for stored trackers in different states.
+ */
+public class AnalysisRequestCancellationTest extends ODFTestcase {
+
+	Logger logger = ODFTestLogger.get();
+
+	/**
+	 * Builds a tracker for a new request with the given id and status. The request
+	 * references a single data set whose id is {@code "DataSet" + id}.
+	 */
+	AnalysisRequestTracker generateTracker(String id, STATUS status) {
+		AnalysisRequestTracker tracker = new AnalysisRequestTracker();
+		Utils.setCurrentTimeAsLastModified(tracker);
+		tracker.setNextDiscoveryServiceRequest(0);
+		AnalysisRequest req = new AnalysisRequest();
+		req.setId(id);
+		MetaDataObjectReference ref = new MetaDataObjectReference();
+		ref.setId("DataSet" + id);
+		req.setDataSets(Collections.singletonList(ref));
+		tracker.setRequest(req);
+		tracker.setStatus(status);
+		return tracker;
+	}
+
+	/** Cancelling an id that was never stored must yield NOT_FOUND. */
+	@Test
+	public void testRequestCancellationNotFoundFailure() {
+		ControlCenter cc = new ODFInternalFactory().create(ControlCenter.class);
+		AnalysisCancelResult cancelRequest = cc.cancelRequest("dummy_id");
+		Assert.assertEquals(AnalysisCancelResult.State.NOT_FOUND, cancelRequest.getState());
+	}
+
+	/** Cancelling a tracker stored as FINISHED, ERROR or CANCELLED must yield INVALID_STATE. */
+	@Test
+	public void testRequestCancellationWrongStateFailure() {
+		ControlCenter cc = new ODFInternalFactory().create(ControlCenter.class);
+		AnalysisRequestTrackerStore store = (new ODFInternalFactory()).create(AnalysisRequestTrackerStore.class);
+		String testId = "test_id1";
+		AnalysisRequestTracker tracker = null;
+		AnalysisCancelResult cancelRequest = null;
+
+		tracker = generateTracker(testId, STATUS.FINISHED);
+		store.store(tracker);
+		cancelRequest = cc.cancelRequest(testId);
+		Assert.assertEquals(AnalysisCancelResult.State.INVALID_STATE, cancelRequest.getState());
+
+		tracker = generateTracker(testId, STATUS.ERROR);
+		store.store(tracker);
+		cancelRequest = cc.cancelRequest(testId);
+		Assert.assertEquals(AnalysisCancelResult.State.INVALID_STATE, cancelRequest.getState());
+
+		tracker = generateTracker(testId, STATUS.CANCELLED);
+		store.store(tracker);
+		cancelRequest = cc.cancelRequest(testId);
+		Assert.assertEquals(AnalysisCancelResult.State.INVALID_STATE, cancelRequest.getState());
+	}
+
+	/**
+	 * Cancelling a tracker stored as INITIALIZED, IN_DISCOVERY_SERVICE_QUEUE or
+	 * DISCOVERY_SERVICE_RUNNING must yield SUCCESS.
+	 */
+	@Test
+	public void testRequestCancellationSuccess() {
+		ControlCenter cc = new ODFInternalFactory().create(ControlCenter.class);
+		AnalysisRequestTrackerStore store = (new ODFInternalFactory()).create(AnalysisRequestTrackerStore.class);
+		String testId = "test_id2";
+
+		AnalysisRequestTracker tracker = generateTracker(testId, STATUS.INITIALIZED);
+		store.store(tracker);
+		AnalysisCancelResult cancelRequest = cc.cancelRequest(testId);
+		Assert.assertEquals(AnalysisCancelResult.State.SUCCESS, cancelRequest.getState());
+
+		tracker = generateTracker(testId, STATUS.IN_DISCOVERY_SERVICE_QUEUE);
+		store.store(tracker);
+		cancelRequest = cc.cancelRequest(testId);
+		Assert.assertEquals(AnalysisCancelResult.State.SUCCESS, cancelRequest.getState());
+
+		tracker = generateTracker(testId, STATUS.DISCOVERY_SERVICE_RUNNING);
+		store.store(tracker);
+		cancelRequest = cc.cancelRequest(testId);
+		Assert.assertEquals(AnalysisCancelResult.State.SUCCESS, cancelRequest.getState());
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/controlcenter/AnalysisRequestTrackerStoreTest.java
----------------------------------------------------------------------
diff --git a/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/controlcenter/AnalysisRequestTrackerStoreTest.java b/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/controlcenter/AnalysisRequestTrackerStoreTest.java
new file mode 100755
index 0000000..7eb46d8
--- /dev/null
+++ b/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/controlcenter/AnalysisRequestTrackerStoreTest.java
@@ -0,0 +1,105 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.odf.core.test.controlcenter;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.logging.Logger;
+
+import org.apache.atlas.odf.api.analysis.AnalysisRequest;
+import org.apache.atlas.odf.core.ODFInternalFactory;
+import org.junit.Test;
+
+import org.apache.atlas.odf.api.metadata.MetaDataObjectReference;
+import org.apache.atlas.odf.core.Utils;
+import org.apache.atlas.odf.api.analysis.AnalysisRequestTrackerStatus.STATUS;
+import org.apache.atlas.odf.api.discoveryservice.AnalysisRequestTracker;
+import org.apache.atlas.odf.core.controlcenter.AnalysisRequestTrackerStore;
+import org.apache.atlas.odf.core.test.ODFTestLogger;
+import org.apache.atlas.odf.core.test.ODFTestcase;
+
+/**
+ * Stores a batch of trackers twice in the {@link AnalysisRequestTrackerStore} (the second
+ * time with a different status), then queries each id and checks that the stored data set
+ * reference and the latest status are returned. Elapsed times of the three phases are printed.
+ */
+public class AnalysisRequestTrackerStoreTest extends ODFTestcase {
+
+	Logger logger = ODFTestLogger.get();
+
+	/**
+	 * Builds a tracker for a new request with the given id and status. The request
+	 * references a single data set whose id is {@code "DataSet" + id}.
+	 */
+	AnalysisRequestTracker generateTracker(String id, STATUS status) {
+		AnalysisRequestTracker tracker = new AnalysisRequestTracker();
+		Utils.setCurrentTimeAsLastModified(tracker);
+		tracker.setNextDiscoveryServiceRequest(0);
+		AnalysisRequest req = new AnalysisRequest();
+		req.setId(id);
+		MetaDataObjectReference ref = new MetaDataObjectReference();
+		ref.setId("DataSet" + id);
+		req.setDataSets(Collections.singletonList(ref));
+		tracker.setRequest(req);
+		tracker.setStatus(status);
+		return tracker;
+	}
+
+	/** Stores 50 trackers, overwrites them with status FINISHED and verifies the query results. */
+	@Test
+	public void testStore() throws Exception {
+		AnalysisRequestTrackerStore store = (new ODFInternalFactory()).create(AnalysisRequestTrackerStore.class);
+		assertNotNull(store);
+		final int maxTrackers = 50;
+		List<AnalysisRequestTracker> trackers1 = new ArrayList<AnalysisRequestTracker>();
+		STATUS lastStatus = STATUS.IN_DISCOVERY_SERVICE_QUEUE;
+		for (int i = 0; i < maxTrackers; i++) {
+			trackers1.add(generateTracker("STORETEST_ID" + i, lastStatus));
+		}
+
+		logger.info("Storing " + maxTrackers + " Trackers");
+		long pass1Start = System.currentTimeMillis();
+		for (AnalysisRequestTracker tracker : trackers1) {
+			store.store(tracker);
+		}
+		long pass1End = System.currentTimeMillis();
+
+		logger.info("Storing " + maxTrackers + " Trackers again with new status");
+
+		// Second pass reuses the same ids so the stored trackers are replaced.
+		lastStatus = STATUS.FINISHED;
+		List<AnalysisRequestTracker> trackers2 = new ArrayList<AnalysisRequestTracker>();
+		for (int i = 0; i < maxTrackers; i++) {
+			trackers2.add(generateTracker("STORETEST_ID" + i, lastStatus));
+		}
+		long pass2Start = System.currentTimeMillis();
+		for (AnalysisRequestTracker tracker : trackers2) {
+			store.store(tracker);
+		}
+		long pass2End = System.currentTimeMillis();
+
+		// NOTE(review): presumably gives the store time to persist the trackers - confirm.
+		Thread.sleep(2000);
+		logger.info("Querying and checking " + maxTrackers + " Trackers");
+
+		long queryStart = System.currentTimeMillis();
+
+		for (int i = 0; i < maxTrackers; i++) {
+			final String analysisRequestId = "STORETEST_ID" + i;
+			AnalysisRequestTracker tracker = store.query(analysisRequestId);
+			assertNotNull(tracker);
+			assertEquals(1, tracker.getRequest().getDataSets().size());
+			MetaDataObjectReference ref = new MetaDataObjectReference();
+			ref.setId("DataSet" + analysisRequestId);
+			// Expected value first, so a failure message labels the two values correctly.
+			assertEquals(ref, tracker.getRequest().getDataSets().get(0));
+			assertEquals(lastStatus, tracker.getStatus());
+		}
+		long queryEnd = System.currentTimeMillis();
+
+		System.out.println("First pass: " + (pass1End - pass1Start) + "ms, second pass: " + (pass2End - pass2Start) + "ms, query: " + (queryEnd - queryStart) + "ms");
+
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/controlcenter/DeclarativeRequestMapperTest.java
----------------------------------------------------------------------
diff --git a/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/controlcenter/DeclarativeRequestMapperTest.java b/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/controlcenter/DeclarativeRequestMapperTest.java
new file mode 100755
index 0000000..347fb84
--- /dev/null
+++ b/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/controlcenter/DeclarativeRequestMapperTest.java
@@ -0,0 +1,158 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.odf.core.test.controlcenter;
+
+import org.apache.atlas.odf.api.analysis.AnalysisRequest;
+import org.apache.atlas.odf.api.discoveryservice.DiscoveryServiceEndpoint;
+import org.apache.atlas.odf.api.discoveryservice.DiscoveryServiceJavaEndpoint;
+import org.apache.atlas.odf.api.discoveryservice.DiscoveryServiceManager;
+import org.apache.atlas.odf.api.settings.validation.ValidationException;
+import org.apache.atlas.odf.core.controlcenter.DeclarativeRequestMapper;
+import org.apache.atlas.odf.core.test.ODFTestBase;
+import org.apache.atlas.odf.json.JSONUtils;
+import org.apache.wink.json4j.JSONException;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.logging.Level;
+import java.util.logging.Logger;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.atlas.odf.api.ODFFactory;
+import org.apache.atlas.odf.core.Utils;
+import org.apache.atlas.odf.core.controlcenter.ControlCenter;
+import org.apache.atlas.odf.api.discoveryservice.DiscoveryServiceProperties;
+import org.apache.atlas.odf.api.discoveryservice.ServiceNotFoundException;
+
+public class DeclarativeRequestMapperTest extends ODFTestBase {
+	final private static String SERVICE_CLASSNAME = "TestAsyncDiscoveryService1";
+	final private static String[] EXPECTED_SERVICE_SEQUENCES = new String[] { "pre3,ser1", "alt1,ser1", "pre4,pre1,ser1", 
+			"pre3,ser1,ser3", "pre3,ser1,ser5", "alt1,ser1,ser3", "alt1,ser1,ser5", "pre3,pre2,ser4", "alt1,pre2,ser4", 
+			"pre4,pre1,ser1,ser3", "pre4,pre1,ser1,ser5", "pre3,ser1,alt1,ser3", "pre3,ser1,pre2,ser4", "pre3,ser1,alt1,ser5" };
+	private Logger logger = Logger.getLogger(ControlCenter.class.getName());
+
+	private static void createDiscoveryService(String serviceId, String[] resultingAnnotationTypes, String[] prerequisiteAnnotationTypes, String[] supportedObjectTypes) throws ValidationException, JSONException {
+		DiscoveryServiceManager discoveryServicesManager = new ODFFactory().create().getDiscoveryServiceManager();
+		DiscoveryServiceProperties dsProperties = new DiscoveryServiceProperties();
+		DiscoveryServiceJavaEndpoint dse = new DiscoveryServiceJavaEndpoint();
+		dse.setClassName(SERVICE_CLASSNAME);
+		dsProperties.setEndpoint(JSONUtils.convert(dse, DiscoveryServiceEndpoint.class));
+		dsProperties.setId(serviceId);
+		dsProperties.setName(serviceId + " Discovery Service");
+		dsProperties.setPrerequisiteAnnotationTypes(Arrays.asList(prerequisiteAnnotationTypes));
+		dsProperties.setResultingAnnotationTypes(Arrays.asList(resultingAnnotationTypes));
+		dsProperties.setSupportedObjectTypes(Arrays.asList(supportedObjectTypes));
+		discoveryServicesManager.createDiscoveryService(dsProperties);
+	}
+
+	private void deleteDiscoveryService(String serviceId, boolean failOnError) throws ValidationException {
+		DiscoveryServiceManager discoveryServicesManager = new ODFFactory().create().getDiscoveryServiceManager();
+		try {
+			discoveryServicesManager.deleteDiscoveryService(serviceId);
+		}
+		catch (ServiceNotFoundException e) {
+			if (failOnError) {
+				Assert.fail("Error deleting discovery services.");
+			}
+		}		
+	}
+
+	private void deleteDiscoveryServices(boolean failOnError) throws ValidationException {
+		List<String> serviceIds = Arrays.asList(new String[] { "ser1", "ser2", "ser3", "ser4", "ser5", "pre1", "pre2", "pre3", "pre4", "alt1" });
+		for (String serviceId : serviceIds) {
+			deleteDiscoveryService(serviceId, failOnError);
+		}
+	}
+
+	private void createDiscoveryServices() throws ValidationException, JSONException {
+		createDiscoveryService("ser1", new String[] { "an1", "com1", "com2" }, new String[] { "pre1"         }, new String[] { "Table", "DataFile" });
+		createDiscoveryService("ser2", new String[] { "an2", "com1"         }, new String[] { "pre2"         }, new String[] { "Table", "DataFile" });
+		createDiscoveryService("ser3", new String[] {                "com2" }, new String[] { "pre1"         }, new String[] { "Table", "DataFile" });
+		createDiscoveryService("ser4", new String[] { "an1", "com1", "com2" }, new String[] { "pre1", "pre2" }, new String[] { "Table", "DataFile" });
+		createDiscoveryService("ser5", new String[] {        "com1", "com2" }, new String[] { "pre1"         }, new String[] { "Table", "DataFile" });
+
+		createDiscoveryService("pre1", new String[] { "pre1"                }, new String[] { "pre4"         }, new String[] { "Table", "DataFile" });
+		createDiscoveryService("pre2", new String[] { "pre2"                }, new String[] {                }, new String[] { "Table", "DataFile" });
+		createDiscoveryService("pre3", new String[] { "pre1"                }, new String[] {                }, new String[] { "Table", "DataFile" });
+		createDiscoveryService("pre4", new String[] { "pre4"                }, new String[] {                }, new String[] { "Table", "DataFile" });
+
+		createDiscoveryService("alt1", new String[] { "pre1"                }, new String[] {                }, new String[] { "Table", "DataFile" });
+	}
+
+	@Test
+	public void testDiscoveryServiceSequences() throws Exception {
+		deleteDiscoveryServices(false);
+		createDiscoveryServices();
+
+		AnalysisRequest request = new AnalysisRequest();
+		request.setAnnotationTypes(Arrays.asList( new String[] { "an1", "com2" }));
+		DeclarativeRequestMapper mapper = new DeclarativeRequestMapper(request);
+		logger.log(Level.INFO, "Printing list of mapper result to stdout.");
+		int i = 0;
+		for (DeclarativeRequestMapper.DiscoveryServiceSequence discoveryApproach : mapper.getDiscoveryServiceSequences()) {
+			String sequence = Utils.joinStrings(new ArrayList<String>(discoveryApproach.getServiceSequence()), ',');
+			System.out.println(sequence);
+			if (i < EXPECTED_SERVICE_SEQUENCES.length) {
+				Assert.assertTrue(sequence.equals(EXPECTED_SERVICE_SEQUENCES[i++]));
+			}
+		}
+		Assert.assertEquals("Number of calculated discovery service sequences does not match expected value.", 36, mapper.getDiscoveryServiceSequences().size());
+
+		deleteDiscoveryServices(true);
+	}
+
+	@Test
+	public void testRecommendedDiscoveryServiceSequence() throws Exception {
+		deleteDiscoveryServices(false);
+		createDiscoveryServices();
+
+		AnalysisRequest request = new AnalysisRequest();
+		request.setAnnotationTypes(Arrays.asList( new String[] { "com2", "pre4" }));
+		DeclarativeRequestMapper mapper = new DeclarativeRequestMapper(request);
+		Assert.assertEquals("Recommended sequence does not match expected string.", "pre4,pre1,ser1", Utils.joinStrings(mapper.getRecommendedDiscoveryServiceSequence(), ','));
+
+		deleteDiscoveryServices(true);
+	}
+
+	@Test
+	public void testRemoveFailingService() throws Exception {
+		deleteDiscoveryServices(false);
+		createDiscoveryServices();
+
+		AnalysisRequest request = new AnalysisRequest();
+		request.setAnnotationTypes(Arrays.asList(new String[] { "an1", "com2" }));
+		DeclarativeRequestMapper mapper = new DeclarativeRequestMapper(request);
+		Assert.assertEquals("Original sequence does not match expected string.", EXPECTED_SERVICE_SEQUENCES[0], Utils.joinStrings(mapper.getRecommendedDiscoveryServiceSequence(), ','));
+
+		mapper.removeDiscoveryServiceSequences("ser1");
+		Assert.assertEquals("Updated sequence does not match expected string.", "pre3,pre2,ser4", Utils.joinStrings(mapper.getRecommendedDiscoveryServiceSequence(), ','));
+
+		deleteDiscoveryServices(true);
+	}
+
+	@Test
+	public void testRequestWithManyAnnotationTypes() throws Exception {
+		deleteDiscoveryServices(false);
+		createDiscoveryServices();
+
+		AnalysisRequest request = new AnalysisRequest();
+		request.setAnnotationTypes(Arrays.asList(new String[] {  "an1", "an2", "com1", "com2", "pre1", "pre2", "pre4" }));
+		DeclarativeRequestMapper mapper = new DeclarativeRequestMapper(request);
+		Assert.assertEquals("Number of calculated discovery service sequences does not match expected value.", 75, mapper.getDiscoveryServiceSequences().size());
+
+		deleteDiscoveryServices(true);
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/controlcenter/DefaultThreadManagerTest.java
----------------------------------------------------------------------
diff --git a/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/controlcenter/DefaultThreadManagerTest.java b/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/controlcenter/DefaultThreadManagerTest.java
new file mode 100755
index 0000000..96a4fee
--- /dev/null
+++ b/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/controlcenter/DefaultThreadManagerTest.java
@@ -0,0 +1,172 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.odf.core.test.controlcenter;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.logging.Logger;
+
+import org.apache.atlas.odf.api.engine.ThreadStatus;
+import org.apache.atlas.odf.core.ODFInternalFactory;
+import org.apache.atlas.odf.core.controlcenter.ExecutorServiceFactory;
+import org.apache.atlas.odf.core.controlcenter.ODFRunnable;
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.atlas.odf.core.controlcenter.ThreadManager;
+import org.apache.atlas.odf.core.test.ODFTestLogger;
+import org.apache.atlas.odf.core.test.TimerTestBase;
+
+/**
+ * Tests starting, querying and shutting down unmanaged threads through a {@link ThreadManager}
+ * created by {@link ODFInternalFactory}.
+ */
+public class DefaultThreadManagerTest extends TimerTestBase {
+
+	/** Default time in milliseconds a {@link TestRunnable} sleeps before it finishes. */
+	int threadMS = 100;
+	/** Time in milliseconds the tests sleep before they expect the started runnables to be finished. */
+	int waitMS = 5000;
+
+	Logger logger = ODFTestLogger.get();
+
+	/**
+	 * Runnable that logs its start, sleeps for a configurable time and logs its end.
+	 */
+	class TestRunnable implements ODFRunnable {
+
+		final String id;
+		final long msToWaitBeforeFinish;
+		boolean cancelled = false;
+
+		/**
+		 * @param id id used in the log messages of this runnable
+		 * @param msToWaitBeforeFinish time in milliseconds to sleep in {@link #run()}
+		 */
+		public TestRunnable(String id, long msToWaitBeforeFinish) {
+			this.id = id;
+			this.msToWaitBeforeFinish = msToWaitBeforeFinish;
+		}
+
+		/**
+		 * Creates a runnable that sleeps for {@code threadMS} milliseconds.
+		 *
+		 * @param id id used in the log messages of this runnable
+		 */
+		public TestRunnable(String id) {
+			this(id, threadMS);
+		}
+
+		/**
+		 * Sleeps for the configured time. If the sleep is interrupted the runnable ends early
+		 * and leaves the interrupt status of the current thread set.
+		 */
+		@Override
+		public void run() {
+			logger.info("Starting thread with ID: " + id);
+			try {
+				Thread.sleep(msToWaitBeforeFinish);
+			} catch (InterruptedException e) {
+				logger.warning("Thread interrupted with ID: " + id);
+				// Restore the interrupt status so that whoever runs this runnable can still observe it.
+				Thread.currentThread().interrupt();
+			}
+			logger.info("Thread finished with ID: " + id);
+		}
+
+		@Override
+		public void setExecutorService(ExecutorService service) {
+			// Intentionally empty: this runnable never uses the executor service.
+		}
+
+		@Override
+		public void cancel() {
+			cancelled = true;
+		}
+
+		@Override
+		public boolean isReady() {
+			return true;
+		}
+
+	}
+
+	/**
+	 * Starts two runnables under different ids and checks the reported thread states before
+	 * and after they had time to finish, including that an id can be started again once its
+	 * previous runnable is finished.
+	 */
+	@Test
+	public void testSimple() throws Exception {
+		ODFInternalFactory f = new ODFInternalFactory();
+		ThreadManager tm = f.create(ThreadManager.class);
+		// Check for null before the manager is used for the first time.
+		assertNotNull(tm);
+		tm.setExecutorService(f.create(ExecutorServiceFactory.class).createExecutorService());
+
+		String id1 = "id1";
+		String id2 = "id2";
+
+		// start id1
+		ThreadStatus.ThreadState st = tm.getStateOfUnmanagedThread(id1);
+		Assert.assertEquals(ThreadStatus.ThreadState.NON_EXISTENT, st);
+
+		boolean b = tm.startUnmanagedThread(id1, new TestRunnable(id1)).isNewThreadCreated();
+		assertTrue(b);
+		// a second start under the same id must not create another thread
+		b = tm.startUnmanagedThread(id1, new TestRunnable(id1)).isNewThreadCreated();
+		assertFalse(b);
+
+		st = tm.getStateOfUnmanagedThread(id1);
+		Assert.assertEquals(ThreadStatus.ThreadState.RUNNING, st);
+
+		// start id2
+		st = tm.getStateOfUnmanagedThread(id2);
+		Assert.assertEquals(ThreadStatus.ThreadState.NON_EXISTENT, st);
+
+		b = tm.startUnmanagedThread(id2, new TestRunnable(id2)).isNewThreadCreated();
+		assertTrue(b);
+		b = tm.startUnmanagedThread(id2, new TestRunnable(id2)).isNewThreadCreated();
+		assertFalse(b);
+
+		// give both runnables time to finish
+		Thread.sleep(waitMS);
+		st = tm.getStateOfUnmanagedThread(id1);
+		Assert.assertEquals(ThreadStatus.ThreadState.FINISHED, st);
+		b = tm.startUnmanagedThread(id1, new TestRunnable(id1)).isNewThreadCreated();
+		assertTrue(b);
+
+		st = tm.getStateOfUnmanagedThread(id2);
+		// id2 should be removed from thread list
+		Assert.assertTrue(ThreadStatus.ThreadState.FINISHED.equals(st) || ThreadStatus.ThreadState.NON_EXISTENT.equals(st));
+
+		tm.shutdownThreads(Arrays.asList(id1, id2));
+	}
+
+	/**
+	 * Starts 20 runnables under distinct ids, checks that each one is reported as running
+	 * right after its start and that all of them are reported as finished after the wait time.
+	 */
+	@Test
+	public void testManyThreads() throws Exception {
+		ODFInternalFactory f = new ODFInternalFactory();
+		ThreadManager tm = f.create(ThreadManager.class);
+		// Check for null before the manager is used for the first time.
+		assertNotNull(tm);
+		tm.setExecutorService(f.create(ExecutorServiceFactory.class).createExecutorService());
+
+		final int threadCount = 20;
+		List<String> threadIds = new ArrayList<>();
+		for (int i = 0; i < threadCount; i++) {
+			String id = "ThreadID" + i;
+			threadIds.add(id);
+			ThreadStatus.ThreadState st = tm.getStateOfUnmanagedThread(id);
+			Assert.assertEquals(ThreadStatus.ThreadState.NON_EXISTENT, st);
+
+			boolean b = tm.startUnmanagedThread(id, new TestRunnable(id)).isNewThreadCreated();
+			assertTrue(b);
+			b = tm.startUnmanagedThread(id, new TestRunnable(id)).isNewThreadCreated();
+			assertFalse(b);
+
+			st = tm.getStateOfUnmanagedThread(id);
+			Assert.assertEquals(ThreadStatus.ThreadState.RUNNING, st);
+
+		}
+		logger.info("All threads scheduled");
+
+		Thread.sleep(waitMS);
+
+		// verify exactly the ids that were started above
+		for (String id : threadIds) {
+			ThreadStatus.ThreadState st = tm.getStateOfUnmanagedThread(id);
+			Assert.assertEquals(ThreadStatus.ThreadState.FINISHED, st);
+		}
+		tm.shutdownThreads(threadIds);
+
+	}
+
+}


Mime
View raw message