Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id EE413200CB5 for ; Wed, 28 Jun 2017 07:57:40 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id EC9D7160BF6; Wed, 28 Jun 2017 05:57:40 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 7C0E5160BFC for ; Wed, 28 Jun 2017 07:57:38 +0200 (CEST) Received: (qmail 58620 invoked by uid 500); 28 Jun 2017 05:57:37 -0000 Mailing-List: contact commits-help@atlas.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@atlas.apache.org Delivered-To: mailing list commits@atlas.apache.org Received: (qmail 58611 invoked by uid 99); 28 Jun 2017 05:57:37 -0000 Received: from pnap-us-west-generic-nat.apache.org (HELO spamd1-us-west.apache.org) (209.188.14.142) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 28 Jun 2017 05:57:37 +0000 Received: from localhost (localhost [127.0.0.1]) by spamd1-us-west.apache.org (ASF Mail Server at spamd1-us-west.apache.org) with ESMTP id 1F2C6C6B32 for ; Wed, 28 Jun 2017 05:57:37 +0000 (UTC) X-Virus-Scanned: Debian amavisd-new at spamd1-us-west.apache.org X-Spam-Flag: NO X-Spam-Score: -4.231 X-Spam-Level: X-Spam-Status: No, score=-4.231 tagged_above=-999 required=6.31 tests=[KAM_ASCII_DIVIDERS=0.8, RCVD_IN_DNSWL_HI=-5, RCVD_IN_MSPIKE_H3=-0.01, RCVD_IN_MSPIKE_WL=-0.01, SPF_PASS=-0.001, T_RP_MATCHES_RCVD=-0.01] autolearn=disabled Received: from mx1-lw-eu.apache.org ([10.40.0.8]) by localhost (spamd1-us-west.apache.org [10.40.0.7]) (amavisd-new, port 10024) with ESMTP id 0RGHa2qmkxir for ; Wed, 28 Jun 2017 05:57:31 +0000 (UTC) Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx1-lw-eu.apache.org (ASF Mail Server at 
mx1-lw-eu.apache.org) with SMTP id 5641B60D33 for ; Wed, 28 Jun 2017 05:57:16 +0000 (UTC) Received: (qmail 56921 invoked by uid 99); 28 Jun 2017 05:57:16 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 28 Jun 2017 05:57:16 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 4AFACF3241; Wed, 28 Jun 2017 05:57:15 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: madhan@apache.org To: commits@atlas.incubator.apache.org Date: Wed, 28 Jun 2017 05:57:28 -0000 Message-Id: <1e0156cb0f65432d9aba0dc0685c6cdc@git.apache.org> In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [15/25] incubator-atlas git commit: ATLAS-1898: initial commit of ODF archived-at: Wed, 28 Jun 2017 05:57:41 -0000 http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-core/src/main/java/org/apache/atlas/odf/core/controlcenter/ODFRunnable.java ---------------------------------------------------------------------- diff --git a/odf/odf-core/src/main/java/org/apache/atlas/odf/core/controlcenter/ODFRunnable.java b/odf/odf-core/src/main/java/org/apache/atlas/odf/core/controlcenter/ODFRunnable.java new file mode 100755 index 0000000..f999ecf --- /dev/null +++ b/odf/odf-core/src/main/java/org/apache/atlas/odf/core/controlcenter/ODFRunnable.java @@ -0,0 +1,27 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.atlas.odf.core.controlcenter; + +import java.util.concurrent.ExecutorService; + +public interface ODFRunnable extends Runnable { + + void setExecutorService(ExecutorService service); + + void cancel(); + + // return true if the runnable is likely to be ready to receive data + boolean isReady(); + +} http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-core/src/main/java/org/apache/atlas/odf/core/controlcenter/QueueMessageProcessor.java ---------------------------------------------------------------------- diff --git a/odf/odf-core/src/main/java/org/apache/atlas/odf/core/controlcenter/QueueMessageProcessor.java b/odf/odf-core/src/main/java/org/apache/atlas/odf/core/controlcenter/QueueMessageProcessor.java new file mode 100755 index 0000000..e6642c5 --- /dev/null +++ b/odf/odf-core/src/main/java/org/apache/atlas/odf/core/controlcenter/QueueMessageProcessor.java @@ -0,0 +1,32 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.atlas.odf.core.controlcenter; + +import java.util.concurrent.ExecutorService; + + +public interface QueueMessageProcessor { + + /** + * callback to process the message taken from the queue. 
+ * + * @param executorService + * @param msg The message to be processed + * @param partition The kafka topic partition this message was read from + * @param msgOffset The offset of this particular message on this kafka partition + * @return + */ + void process(ExecutorService executorService, String msg, int partition, long msgOffset); + +} http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-core/src/main/java/org/apache/atlas/odf/core/controlcenter/ServiceRuntime.java ---------------------------------------------------------------------- diff --git a/odf/odf-core/src/main/java/org/apache/atlas/odf/core/controlcenter/ServiceRuntime.java b/odf/odf-core/src/main/java/org/apache/atlas/odf/core/controlcenter/ServiceRuntime.java new file mode 100755 index 0000000..da06dd2 --- /dev/null +++ b/odf/odf-core/src/main/java/org/apache/atlas/odf/core/controlcenter/ServiceRuntime.java @@ -0,0 +1,42 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.atlas.odf.core.controlcenter; + +import org.apache.atlas.odf.api.discoveryservice.DiscoveryService; +import org.apache.atlas.odf.api.discoveryservice.DiscoveryServiceProperties; +import org.apache.atlas.odf.api.settings.validation.ValidationException; + +public interface ServiceRuntime { + + String getName(); + + /** + * Check if the runtime is currently available for processing. + * Returns <= 0 if the runtime is available immediately. 
A number > 0 + * indicates how many seconds to wait until retrying. + * + * Note: If this method returns > 0 the Kafka consumer will be shut down and only be + * started again when it returns <= 0. Shutting down and restarting the consumer is + * rather costly so this should only be done if the runtime won't be accepting requests + * for a foreseeable period of time. + */ + long getWaitTimeUntilAvailable(); + + DiscoveryService createDiscoveryServiceProxy(DiscoveryServiceProperties props); + + String getDescription(); + + void validate(DiscoveryServiceProperties props) throws ValidationException; + +} http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-core/src/main/java/org/apache/atlas/odf/core/controlcenter/ServiceRuntimes.java ---------------------------------------------------------------------- diff --git a/odf/odf-core/src/main/java/org/apache/atlas/odf/core/controlcenter/ServiceRuntimes.java b/odf/odf-core/src/main/java/org/apache/atlas/odf/core/controlcenter/ServiceRuntimes.java new file mode 100755 index 0000000..a867580 --- /dev/null +++ b/odf/odf-core/src/main/java/org/apache/atlas/odf/core/controlcenter/ServiceRuntimes.java @@ -0,0 +1,147 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.atlas.odf.core.controlcenter; + +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.io.LineNumberReader; +import java.net.URL; +import java.text.MessageFormat; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.Enumeration; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.logging.Level; +import java.util.logging.Logger; + +import org.apache.atlas.odf.api.discoveryservice.DiscoveryServiceEndpoint; +import org.apache.atlas.odf.api.discoveryservice.DiscoveryServiceManager; +import org.apache.atlas.odf.api.discoveryservice.DiscoveryServiceProperties; +import org.apache.atlas.odf.api.discoveryservice.ServiceNotFoundException; +import org.apache.atlas.odf.api.engine.ServiceRuntimeInfo; +import org.apache.atlas.odf.api.engine.ServiceRuntimesInfo; +import org.apache.atlas.odf.core.Environment; +import org.apache.atlas.odf.core.ODFInternalFactory; + +public class ServiceRuntimes { + + static Logger logger = Logger.getLogger(ServiceRuntimes.class.getName()); + + static List getRuntimeExtensions() throws IOException { + ClassLoader cl = ServiceRuntimes.class.getClassLoader(); + Enumeration services = cl.getResources("META-INF/odf/odf-runtimes.txt"); + List result = new ArrayList<>(); + while (services.hasMoreElements()) { + URL url = services.nextElement(); + InputStream is = url.openStream(); + InputStreamReader isr = new InputStreamReader(is, "UTF-8"); + LineNumberReader lnr = new LineNumberReader(isr); + String line = null; + while ((line = lnr.readLine()) != null) { + line = line.trim(); + logger.log(Level.INFO, "Loading runtime extension ''{0}''", line); + try { + @SuppressWarnings("unchecked") + Class clazz = (Class) cl.loadClass(line); + ServiceRuntime sr = clazz.newInstance(); + result.add(sr); + } catch (InstantiationException | IllegalAccessException | ClassNotFoundException e) { + 
logger.log(Level.WARNING, MessageFormat.format("Runtime extension of class ''{0}'' could not be instantiated", line), e); + } + } + } + logger.log(Level.INFO, "Number of classpath services found: {0}", result.size()); + return result; + } + + static { + List allRuntimes = new ArrayList<>(Arrays.asList( // + new HealthCheckServiceRuntime(), // + new JavaServiceRuntime(), // + new SparkServiceRuntime() // + )); + try { + List runtimeExtensions = getRuntimeExtensions(); + allRuntimes.addAll(runtimeExtensions); + } catch (IOException e) { + logger.log(Level.WARNING, "An exception occurred when loading runtime extensions, ignoring them", e); + } + runtimes = Collections.unmodifiableList(allRuntimes); + } + + private static List runtimes; + + public static List getActiveRuntimes() { + Environment env = new ODFInternalFactory().create(Environment.class); + List activeRuntimeNames = env.getActiveRuntimeNames(); + if (activeRuntimeNames == null) { + return getAllRuntimes(); + } + // always add health check runtime + Set activeRuntimeNamesSet = new HashSet<>(activeRuntimeNames); + activeRuntimeNamesSet.add(HealthCheckServiceRuntime.HEALTH_CHECK_RUNTIME_NAME); + List activeRuntimes = new ArrayList<>(); + for (ServiceRuntime rt : runtimes) { + if (activeRuntimeNamesSet.contains(rt.getName())) { + activeRuntimes.add(rt); + } + } + return activeRuntimes; + } + + public static List getAllRuntimes() { + return runtimes; + } + + public static ServiceRuntime getRuntimeForDiscoveryService(DiscoveryServiceProperties discoveryServiceProps) { + DiscoveryServiceEndpoint ep = discoveryServiceProps.getEndpoint(); + for (ServiceRuntime runtime : getAllRuntimes()) { + if (runtime.getName().equals(ep.getRuntimeName())) { + return runtime; + } + } + return null; + } + + public static ServiceRuntime getRuntimeForDiscoveryService(String discoveryServiceId) { + // special check because the healch check runtime is not part of the configuration + if 
(discoveryServiceId.startsWith(ControlCenter.HEALTH_TEST_DISCOVERY_SERVICE_ID)) { + return new HealthCheckServiceRuntime(); + } + DiscoveryServiceManager dsm = new ODFInternalFactory().create(DiscoveryServiceManager.class); + try { + DiscoveryServiceProperties props = dsm.getDiscoveryServiceProperties(discoveryServiceId); + return getRuntimeForDiscoveryService(props); + } catch (ServiceNotFoundException e) { + return null; + } + } + + public static ServiceRuntimesInfo getRuntimesInfo(List runtimes) { + List rts = new ArrayList<>(); + for (ServiceRuntime rt : runtimes) { + ServiceRuntimeInfo sri = new ServiceRuntimeInfo(); + sri.setName(rt.getName()); + sri.setDescription(rt.getDescription()); + rts.add(sri); + } + ServiceRuntimesInfo result = new ServiceRuntimesInfo(); + result.setRuntimes(rts); + return result; + } +} http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-core/src/main/java/org/apache/atlas/odf/core/controlcenter/SparkDiscoveryServiceProxy.java ---------------------------------------------------------------------- diff --git a/odf/odf-core/src/main/java/org/apache/atlas/odf/core/controlcenter/SparkDiscoveryServiceProxy.java b/odf/odf-core/src/main/java/org/apache/atlas/odf/core/controlcenter/SparkDiscoveryServiceProxy.java new file mode 100755 index 0000000..6dc1fd0 --- /dev/null +++ b/odf/odf-core/src/main/java/org/apache/atlas/odf/core/controlcenter/SparkDiscoveryServiceProxy.java @@ -0,0 +1,110 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.atlas.odf.core.controlcenter; + +import java.text.MessageFormat; +import java.util.concurrent.ExecutorService; +import java.util.logging.Level; +import java.util.logging.Logger; + +import org.apache.atlas.odf.api.discoveryservice.DataSetCheckResult; +import org.apache.atlas.odf.api.discoveryservice.DiscoveryServiceRequest; +import org.apache.atlas.odf.api.discoveryservice.DiscoveryServiceResponse; +import org.apache.atlas.odf.api.discoveryservice.datasets.DataSetContainer; +import org.apache.atlas.odf.api.discoveryservice.sync.DiscoveryServiceSyncResponse; +import org.apache.atlas.odf.api.discoveryservice.sync.SyncDiscoveryService; +import org.apache.atlas.odf.api.metadata.MetadataStore; +import org.apache.atlas.odf.api.spark.SparkServiceExecutor; +import org.apache.atlas.odf.core.ODFInternalFactory; +import org.apache.atlas.odf.json.JSONUtils; +import org.apache.wink.json4j.JSONException; + +import org.apache.atlas.odf.api.annotation.AnnotationStore; +import org.apache.atlas.odf.api.discoveryservice.DiscoveryServiceProperties; +import org.apache.atlas.odf.api.discoveryservice.DiscoveryServiceSparkEndpoint; +import org.apache.atlas.odf.core.Utils; + +/** + * Proxy for calling any type of Spark discovery services. 
+ * + * + */ + +public class SparkDiscoveryServiceProxy implements SyncDiscoveryService { + Logger logger = Logger.getLogger(SparkDiscoveryServiceProxy.class.getName()); + + protected MetadataStore metadataStore; + protected AnnotationStore annotationStore; + protected ExecutorService executorService; + private DiscoveryServiceProperties dsri; + + public SparkDiscoveryServiceProxy(DiscoveryServiceProperties dsri) { + this.dsri = dsri; + } + + @Override + public void setExecutorService(ExecutorService executorService) { + this.executorService = executorService; + } + + @Override + public void setMetadataStore(MetadataStore metadataStore) { + this.metadataStore = metadataStore; + } + + @Override + public DataSetCheckResult checkDataSet(DataSetContainer dataSetContainer) { + DataSetCheckResult checkResult = new DataSetCheckResult(); + checkResult.setDataAccess(DataSetCheckResult.DataAccess.NotPossible); + try { + SparkServiceExecutor executor = new ODFInternalFactory().create(SparkServiceExecutor.class); + checkResult = executor.checkDataSet(this.dsri, dataSetContainer); + } catch (Exception e) { + logger.log(Level.WARNING,"Error running discovery service.", e); + checkResult.setDetails(Utils.getExceptionAsString(e)); + } + return checkResult; + } + + @Override + public DiscoveryServiceSyncResponse runAnalysis(DiscoveryServiceRequest request) { + logger.log(Level.INFO,MessageFormat.format("Starting Spark discovery service ''{0}'', id {1}.", new Object[]{ dsri.getName(), dsri.getId() })); + DiscoveryServiceSyncResponse response = new DiscoveryServiceSyncResponse(); + DiscoveryServiceSparkEndpoint endpoint; + try { + endpoint = JSONUtils.convert(dsri.getEndpoint(), DiscoveryServiceSparkEndpoint.class); + } catch (JSONException e1) { + throw new RuntimeException(e1); + } + if ((endpoint.getJar() == null) || (endpoint.getJar().isEmpty())) { + response.setDetails("No jar file was provided that implements the Spark application."); + } else try { + SparkServiceExecutor 
executor = new ODFInternalFactory().create(SparkServiceExecutor.class); + response = executor.runAnalysis(this.dsri, request); + logger.log(Level.FINER, "Spark discovery service response: " + response.toString()); + logger.log(Level.INFO,"Spark discover service finished."); + return response; + } catch (Exception e) { + logger.log(Level.WARNING,"Error running Spark application: ", e); + response.setDetails(Utils.getExceptionAsString(e)); + } + response.setCode(DiscoveryServiceResponse.ResponseCode.UNKNOWN_ERROR); + return response; + } + + @Override + public void setAnnotationStore(AnnotationStore annotationStore) { + this.annotationStore = annotationStore; + } +} http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-core/src/main/java/org/apache/atlas/odf/core/controlcenter/SparkServiceRuntime.java ---------------------------------------------------------------------- diff --git a/odf/odf-core/src/main/java/org/apache/atlas/odf/core/controlcenter/SparkServiceRuntime.java b/odf/odf-core/src/main/java/org/apache/atlas/odf/core/controlcenter/SparkServiceRuntime.java new file mode 100755 index 0000000..91056b3 --- /dev/null +++ b/odf/odf-core/src/main/java/org/apache/atlas/odf/core/controlcenter/SparkServiceRuntime.java @@ -0,0 +1,58 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.atlas.odf.core.controlcenter; + +import org.apache.atlas.odf.api.settings.validation.ValidationException; +import org.apache.wink.json4j.JSONException; + +import org.apache.atlas.odf.api.discoveryservice.DiscoveryService; +import org.apache.atlas.odf.api.discoveryservice.DiscoveryServiceProperties; +import org.apache.atlas.odf.api.discoveryservice.DiscoveryServiceSparkEndpoint; +import org.apache.atlas.odf.core.Utils; +import org.apache.atlas.odf.json.JSONUtils; + +public class SparkServiceRuntime implements ServiceRuntime { + + public static final String SPARK_RUNTIME_NAME = "Spark"; + + @Override + public String getName() { + return SPARK_RUNTIME_NAME; + } + + @Override + public long getWaitTimeUntilAvailable() { + return 0; + } + + @Override + public DiscoveryService createDiscoveryServiceProxy(DiscoveryServiceProperties props) { + return new SparkDiscoveryServiceProxy(props); + } + + @Override + public String getDescription() { + return "The default Spark runtime"; + } + + @Override + public void validate(DiscoveryServiceProperties props) throws ValidationException { + try { + JSONUtils.convert(props.getEndpoint(), DiscoveryServiceSparkEndpoint.class); + } catch (JSONException e1) { + throw new ValidationException("Endpoint definition for Spark service is not correct: " + Utils.getExceptionAsString(e1)); + } + } + +} http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-core/src/main/java/org/apache/atlas/odf/core/controlcenter/StatusQueueEntry.java ---------------------------------------------------------------------- diff --git a/odf/odf-core/src/main/java/org/apache/atlas/odf/core/controlcenter/StatusQueueEntry.java b/odf/odf-core/src/main/java/org/apache/atlas/odf/core/controlcenter/StatusQueueEntry.java new file mode 100755 index 0000000..206a6d0 --- /dev/null +++ b/odf/odf-core/src/main/java/org/apache/atlas/odf/core/controlcenter/StatusQueueEntry.java @@ -0,0 +1,52 @@ +/** + * Licensed under the Apache 
License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.atlas.odf.core.controlcenter; + +import org.apache.atlas.odf.api.discoveryservice.AnalysisRequestTracker; +import org.apache.atlas.odf.api.metadata.models.Annotation; + +// JSON +public class StatusQueueEntry { + + private Annotation annotation; + private AnalysisRequestTracker analysisRequestTracker; + + public Annotation getAnnotation() { + return annotation; + } + + public void setAnnotation(Annotation annotation) { + this.annotation = annotation; + } + + public AnalysisRequestTracker getAnalysisRequestTracker() { + return analysisRequestTracker; + } + + public void setAnalysisRequestTracker(AnalysisRequestTracker analysisRequestTracker) { + this.analysisRequestTracker = analysisRequestTracker; + } + + + public static String getRequestId(StatusQueueEntry seq) { + if (seq.getAnnotation() != null) { + return seq.getAnnotation().getAnalysisRun(); + } else if (seq.getAnalysisRequestTracker() != null) { + return seq.getAnalysisRequestTracker().getRequest().getId(); + } + return null; + } + + +} http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-core/src/main/java/org/apache/atlas/odf/core/controlcenter/ThreadManager.java ---------------------------------------------------------------------- diff --git a/odf/odf-core/src/main/java/org/apache/atlas/odf/core/controlcenter/ThreadManager.java b/odf/odf-core/src/main/java/org/apache/atlas/odf/core/controlcenter/ThreadManager.java new file mode 100755 
index 0000000..33dba10 --- /dev/null +++ b/odf/odf-core/src/main/java/org/apache/atlas/odf/core/controlcenter/ThreadManager.java @@ -0,0 +1,68 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.atlas.odf.core.controlcenter; + +import java.util.List; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeoutException; + +import org.apache.atlas.odf.api.engine.ThreadStatus; + +public interface ThreadManager { + + void waitForThreadsToBeReady(long waitingLimitMs, List startedThreads) throws TimeoutException; + + ThreadStartupResult startUnmanagedThread(String name, ODFRunnable runnable); + + ThreadStatus.ThreadState getStateOfUnmanagedThread(String name); + + ODFRunnable getRunnable(String name); + + void setExecutorService(ExecutorService executorService); + + void shutdownAllUnmanagedThreads(); + + void shutdownThreads(List names); + + int getNumberOfRunningThreads(); + + List getThreadManagerStatus(); + + public abstract class ThreadStartupResult { + + private String threadId; + private boolean newThreadCreated; + + public ThreadStartupResult(String id) { + this.threadId = id; + } + + public String getThreadId() { + return threadId; + } + + public boolean isNewThreadCreated() { + return newThreadCreated; + } + + public void setNewThreadCreated(boolean newThreadCreated) { + this.newThreadCreated = newThreadCreated; + } + + public abstract boolean isReady(); + + } + + +} 
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-core/src/main/java/org/apache/atlas/odf/core/controlcenter/TrackerUtil.java ---------------------------------------------------------------------- diff --git a/odf/odf-core/src/main/java/org/apache/atlas/odf/core/controlcenter/TrackerUtil.java b/odf/odf-core/src/main/java/org/apache/atlas/odf/core/controlcenter/TrackerUtil.java new file mode 100755 index 0000000..f1c7704 --- /dev/null +++ b/odf/odf-core/src/main/java/org/apache/atlas/odf/core/controlcenter/TrackerUtil.java @@ -0,0 +1,76 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.atlas.odf.core.controlcenter; + +import java.util.ArrayList; +import java.util.List; + +import org.apache.atlas.odf.api.discoveryservice.AnalysisRequestTracker; +import org.apache.atlas.odf.api.discoveryservice.DiscoveryServiceRequest; +import org.apache.atlas.odf.api.discoveryservice.DiscoveryServiceResponse; +import org.apache.atlas.odf.api.analysis.AnalysisRequestTrackerStatus.STATUS; + +public class TrackerUtil { + + /** + * @param tracker + * @return true if the first analysis of the tracker has not yet been started + */ + public static boolean isAnalysisWaiting(AnalysisRequestTracker tracker) { + return tracker.getNextDiscoveryServiceRequest() == 0 && (tracker.getStatus() == STATUS.IN_DISCOVERY_SERVICE_QUEUE || tracker.getStatus() == STATUS.INITIALIZED); // || tracker.getStatus() == STATUS.DISCOVERY_SERVICE_RUNNING); + } + + public static boolean isCancellable(AnalysisRequestTracker tracker) { + return (tracker.getStatus() == STATUS.IN_DISCOVERY_SERVICE_QUEUE || tracker.getStatus() == STATUS.INITIALIZED || tracker.getStatus() == STATUS.DISCOVERY_SERVICE_RUNNING); + } + + public static DiscoveryServiceRequest getCurrentDiscoveryServiceStartRequest(AnalysisRequestTracker tracker) { + int i = tracker.getNextDiscoveryServiceRequest(); + List requests = tracker.getDiscoveryServiceRequests(); + if (i >= 0 && i < requests.size()) { + return requests.get(i); + } + return null; + } + + public static DiscoveryServiceResponse getCurrentDiscoveryServiceStartResponse(AnalysisRequestTracker tracker) { + int i = tracker.getNextDiscoveryServiceRequest(); + List responses = tracker.getDiscoveryServiceResponses(); + if (responses == null || responses.isEmpty()) { + return null; + } + if (i >= 0 && i < responses.size()) { + return responses.get(i); + } + return null; + } + + public static void moveToNextDiscoveryService(AnalysisRequestTracker tracker) { + int i = tracker.getNextDiscoveryServiceRequest(); + List requests = 
tracker.getDiscoveryServiceRequests(); + if (i >= 0 && i < requests.size()) { + tracker.setNextDiscoveryServiceRequest(i+1); + } + } + + public static void addDiscoveryServiceStartResponse(AnalysisRequestTracker tracker, DiscoveryServiceResponse response) { + List l = tracker.getDiscoveryServiceResponses(); + if (l == null) { + l = new ArrayList(); + tracker.setDiscoveryServiceResponses(l); + } + l.add(response); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-core/src/main/java/org/apache/atlas/odf/core/controlcenter/TransactionAsyncDiscoveryServiceProxy.java ---------------------------------------------------------------------- diff --git a/odf/odf-core/src/main/java/org/apache/atlas/odf/core/controlcenter/TransactionAsyncDiscoveryServiceProxy.java b/odf/odf-core/src/main/java/org/apache/atlas/odf/core/controlcenter/TransactionAsyncDiscoveryServiceProxy.java new file mode 100755 index 0000000..1a3de04 --- /dev/null +++ b/odf/odf-core/src/main/java/org/apache/atlas/odf/core/controlcenter/TransactionAsyncDiscoveryServiceProxy.java @@ -0,0 +1,97 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.atlas.odf.core.controlcenter; + +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; + +import org.apache.atlas.odf.api.annotation.AnnotationStore; +import org.apache.atlas.odf.api.discoveryservice.DataSetCheckResult; +import org.apache.atlas.odf.api.discoveryservice.DiscoveryServiceRequest; +import org.apache.atlas.odf.api.discoveryservice.async.AsyncDiscoveryService; +import org.apache.atlas.odf.api.discoveryservice.async.DiscoveryServiceAsyncRunStatus; +import org.apache.atlas.odf.api.discoveryservice.async.DiscoveryServiceAsyncStartResponse; +import org.apache.atlas.odf.api.discoveryservice.datasets.DataSetContainer; +import org.apache.atlas.odf.api.metadata.MetadataStore; +import org.apache.atlas.odf.core.ODFInternalFactory; + +public class TransactionAsyncDiscoveryServiceProxy implements AsyncDiscoveryService { + + private AsyncDiscoveryService wrappedService; + + public TransactionAsyncDiscoveryServiceProxy(AsyncDiscoveryService wrappedService) { + this.wrappedService = wrappedService; + } + + public DiscoveryServiceAsyncStartResponse startAnalysis(final DiscoveryServiceRequest request) { + TransactionContextExecutor transactionContextExecutor = new ODFInternalFactory().create(TransactionContextExecutor.class); + try { + return (DiscoveryServiceAsyncStartResponse) transactionContextExecutor.runInTransactionContext(new Callable() { + + @Override + public Object call() throws Exception { + return wrappedService.startAnalysis(request); + } + }); + } catch (Exception e) { + throw new RuntimeException(e); + } + + } + + public DiscoveryServiceAsyncRunStatus getStatus(final String runId) { + TransactionContextExecutor transactionContextExecutor = new ODFInternalFactory().create(TransactionContextExecutor.class); + try { + return (DiscoveryServiceAsyncRunStatus) transactionContextExecutor.runInTransactionContext(new Callable() { + + @Override + public Object call() throws Exception { + return 
wrappedService.getStatus(runId); + } + }); + } catch (Exception e) { + throw new RuntimeException(e); + } + + } + + public void setExecutorService(ExecutorService executorService) { + wrappedService.setExecutorService(executorService); + } + + public void setMetadataStore(MetadataStore metadataStore) { + wrappedService.setMetadataStore(metadataStore); + } + + public void setAnnotationStore(AnnotationStore annotationStore) { + wrappedService.setAnnotationStore(annotationStore); + } + + public DataSetCheckResult checkDataSet(final DataSetContainer dataSetContainer) { + TransactionContextExecutor transactionContextExecutor = new ODFInternalFactory().create(TransactionContextExecutor.class); + try { + return (DataSetCheckResult) transactionContextExecutor.runInTransactionContext(new Callable() { + + @Override + public Object call() throws Exception { + return wrappedService.checkDataSet(dataSetContainer); + } + }); + } catch (Exception e) { + throw new RuntimeException(e); + } + + } + +} http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-core/src/main/java/org/apache/atlas/odf/core/controlcenter/TransactionContextExecutor.java ---------------------------------------------------------------------- diff --git a/odf/odf-core/src/main/java/org/apache/atlas/odf/core/controlcenter/TransactionContextExecutor.java b/odf/odf-core/src/main/java/org/apache/atlas/odf/core/controlcenter/TransactionContextExecutor.java new file mode 100755 index 0000000..6c17686 --- /dev/null +++ b/odf/odf-core/src/main/java/org/apache/atlas/odf/core/controlcenter/TransactionContextExecutor.java @@ -0,0 +1,33 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.atlas.odf.core.controlcenter; + +import java.util.concurrent.Callable; + +/** + * Use this interface in the core framework whenever you want to run code that is run from an unmanaged thread (typically in the Kafka consumers) + * and that accesses the metadata repository. The implementation of this class will ensure that the code will be run in the + * correct context (regarding transactions etc.) + * + * + */ +public interface TransactionContextExecutor { + + /** + * Run a generic callable in a transaction context. This is not a template function as some of the underlying infrastructures + * might not be able to support it. + */ + Object runInTransactionContext(Callable callable) throws Exception; + +} http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-core/src/main/java/org/apache/atlas/odf/core/controlcenter/TransactionSyncDiscoveryServiceProxy.java ---------------------------------------------------------------------- diff --git a/odf/odf-core/src/main/java/org/apache/atlas/odf/core/controlcenter/TransactionSyncDiscoveryServiceProxy.java b/odf/odf-core/src/main/java/org/apache/atlas/odf/core/controlcenter/TransactionSyncDiscoveryServiceProxy.java new file mode 100755 index 0000000..ec96e96 --- /dev/null +++ b/odf/odf-core/src/main/java/org/apache/atlas/odf/core/controlcenter/TransactionSyncDiscoveryServiceProxy.java @@ -0,0 +1,79 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.atlas.odf.core.controlcenter; + +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; + +import org.apache.atlas.odf.api.annotation.AnnotationStore; +import org.apache.atlas.odf.api.discoveryservice.DataSetCheckResult; +import org.apache.atlas.odf.api.discoveryservice.DiscoveryServiceRequest; +import org.apache.atlas.odf.api.discoveryservice.datasets.DataSetContainer; +import org.apache.atlas.odf.api.discoveryservice.sync.DiscoveryServiceSyncResponse; +import org.apache.atlas.odf.api.discoveryservice.sync.SyncDiscoveryService; +import org.apache.atlas.odf.api.metadata.MetadataStore; +import org.apache.atlas.odf.core.ODFInternalFactory; + +public class TransactionSyncDiscoveryServiceProxy implements SyncDiscoveryService { + + private SyncDiscoveryService wrappedService; + + public TransactionSyncDiscoveryServiceProxy(SyncDiscoveryService wrappedService) { + this.wrappedService = wrappedService; + } + + public DiscoveryServiceSyncResponse runAnalysis(final DiscoveryServiceRequest request) { + TransactionContextExecutor transactionContextExecutor = new ODFInternalFactory().create(TransactionContextExecutor.class); + try { + return (DiscoveryServiceSyncResponse) transactionContextExecutor.runInTransactionContext(new Callable() { + + @Override + public Object call() throws Exception { + return wrappedService.runAnalysis(request); + } + }); + } catch (Exception e) { + throw new RuntimeException(e); + } + + } + + public void setExecutorService(ExecutorService executorService) { + 
wrappedService.setExecutorService(executorService); + } + + public void setMetadataStore(MetadataStore metadataStore) { + wrappedService.setMetadataStore(metadataStore); + } + + public void setAnnotationStore(AnnotationStore annotationStore) { + wrappedService.setAnnotationStore(annotationStore); + } + + public DataSetCheckResult checkDataSet(final DataSetContainer dataSetContainer) { + TransactionContextExecutor transactionContextExecutor = new ODFInternalFactory().create(TransactionContextExecutor.class); + try { + return (DataSetCheckResult) transactionContextExecutor.runInTransactionContext(new Callable() { + + @Override + public Object call() throws Exception { + return wrappedService.checkDataSet(dataSetContainer); + } + }); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + +} http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-core/src/main/java/org/apache/atlas/odf/core/discoveryservice/DiscoveryServiceManagerImpl.java ---------------------------------------------------------------------- diff --git a/odf/odf-core/src/main/java/org/apache/atlas/odf/core/discoveryservice/DiscoveryServiceManagerImpl.java b/odf/odf-core/src/main/java/org/apache/atlas/odf/core/discoveryservice/DiscoveryServiceManagerImpl.java new file mode 100755 index 0000000..e7cbc44 --- /dev/null +++ b/odf/odf-core/src/main/java/org/apache/atlas/odf/core/discoveryservice/DiscoveryServiceManagerImpl.java @@ -0,0 +1,258 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.atlas.odf.core.discoveryservice; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.net.MalformedURLException; +import java.net.URL; +import java.text.MessageFormat; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.logging.Level; +import java.util.logging.Logger; + +import org.apache.atlas.odf.api.discoveryservice.DiscoveryService; +import org.apache.atlas.odf.api.discoveryservice.DiscoveryServiceManager; +import org.apache.atlas.odf.api.discoveryservice.DiscoveryServiceProperties; +import org.apache.atlas.odf.api.discoveryservice.DiscoveryServiceRuntimeStatistics; +import org.apache.atlas.odf.api.discoveryservice.DiscoveryServiceStatus; +import org.apache.atlas.odf.api.discoveryservice.ServiceNotFoundException; +import org.apache.atlas.odf.api.discoveryservice.ServiceStatusCount; +import org.apache.atlas.odf.api.settings.validation.ValidationException; +import org.apache.atlas.odf.core.ODFInternalFactory; +import org.apache.atlas.odf.core.configuration.ConfigContainer; +import org.apache.atlas.odf.core.configuration.ConfigManager; +import org.apache.atlas.odf.core.controlcenter.AnalysisRequestTrackerStore; +import org.apache.atlas.odf.core.controlcenter.ControlCenter; + +/** + * + * External Java API for creating and managing discovery services + * + */ +public class DiscoveryServiceManagerImpl implements DiscoveryServiceManager { + private Logger logger = Logger.getLogger(DiscoveryServiceManagerImpl.class.getName()); + public ConfigManager configManager; + + public DiscoveryServiceManagerImpl() { + configManager = new ODFInternalFactory().create(ConfigManager.class); + } + + /** + * Retrieve list of discovery services registered in ODF + * @return List of registered 
ODF discovery services + */ + public List getDiscoveryServicesProperties() { + logger.entering(DiscoveryServiceManager.class.getName(), "getDiscoveryServicesProperties"); + List dsProperties = configManager.getConfigContainer().getRegisteredServices(); + return dsProperties; + }; + + /** + * Register a new service in ODF + * @param dsProperties Properties of the discovery service to register + * @throws ValidationException Validation of a property failed + */ + public void createDiscoveryService(DiscoveryServiceProperties dsProperties) throws ValidationException { + logger.entering(DiscoveryServiceManager.class.getName(), "createDiscoveryService"); + ConfigContainer update = new ConfigContainer(); + List registeredServices = configManager.getConfigContainer().getRegisteredServices(); + registeredServices.addAll(Collections.singletonList(dsProperties)); + update.setRegisteredServices(registeredServices); + configManager.updateConfigContainer(update); + + + }; + + /** + * Update configuration of an ODF discovery service + * @param dsProperties Properties of the discovery service to update + */ + public void replaceDiscoveryService(DiscoveryServiceProperties dsProperties) throws ServiceNotFoundException, ValidationException { + logger.entering(DiscoveryServiceManager.class.getName(), "updateDiscoveryService"); + String serviceId = dsProperties.getId(); + deleteDiscoveryService(serviceId); + createDiscoveryService(dsProperties); + }; + + /** + * Remove a registered service from ODF + * @param serviceId Discovery service ID + */ + public void deleteDiscoveryService(String serviceId) throws ServiceNotFoundException, ValidationException { + logger.entering(DiscoveryServiceManager.class.getName(), "deleteDiscoveryService"); + ConfigContainer cc = configManager.getConfigContainer(); + Iterator iterator = cc.getRegisteredServices().iterator(); + boolean serviceFound = false; + while (iterator.hasNext()) { + if (iterator.next().getId().equals(serviceId)) { + 
iterator.remove(); + serviceFound = true; + } + } + if (!serviceFound) { + throw new ServiceNotFoundException(serviceId); + } else { + configManager.updateConfigContainer(cc); + } + }; + + /** + * Retrieve current configuration of a discovery services registered in ODF + * @param serviceId Discovery Service ID + * @return Properties of the service with this ID + * @throws ServiceNotFoundException A service with this ID is not registered + */ + public DiscoveryServiceProperties getDiscoveryServiceProperties(String serviceId) throws ServiceNotFoundException { + logger.entering(DiscoveryServiceManager.class.getName(), "getDiscoveryServiceProperties"); + DiscoveryServiceProperties serviceFound = null; + List registeredServices; + registeredServices = configManager.getConfigContainer().getRegisteredServices(); + for (DiscoveryServiceProperties service : registeredServices) { + if (service.getId().equals(serviceId)) { + serviceFound = service; + break; + } + } + if (serviceFound == null) { + throw new ServiceNotFoundException(serviceId); + } + return serviceFound; + }; + + /** + * Retrieve status overview of all discovery services registered in ODF + * @return List of status count maps for all discovery services + */ + public List getDiscoveryServiceStatusOverview() { + DiscoveryServiceStatistics stats = new DiscoveryServiceStatistics(new ODFInternalFactory().create(AnalysisRequestTrackerStore.class).getRecentTrackers(0,-1)); + return stats.getStatusCountPerService(); + } + + /** + * Retrieve status of a specific discovery service. 
Returns null if no service info can be obtained + * @param serviceId Discovery Service ID + * @return Status of the service with this ID + */ + public DiscoveryServiceStatus getDiscoveryServiceStatus(String serviceId) throws ServiceNotFoundException { + logger.entering(DiscoveryServiceManager.class.getName(), "getDiscoveryServiceStatus"); + + DiscoveryServiceStatus dsStatus = null; + ControlCenter cc = new ODFInternalFactory().create(ControlCenter.class); + DiscoveryService ds = cc.getDiscoveryServiceProxy(serviceId, null); + if (ds == null) { + throw new ServiceNotFoundException(serviceId); + } + dsStatus = new DiscoveryServiceStatus(); + dsStatus.setStatus(DiscoveryServiceStatus.Status.OK); + dsStatus.setMessage(MessageFormat.format("Discovery service ''{0}'' status is OK", serviceId)); + ServiceStatusCount serviceStatus = null; + List statusCounts = getDiscoveryServiceStatusOverview(); + for (ServiceStatusCount cnt : statusCounts) { + if (cnt.getId().equals(serviceId)) { + serviceStatus = cnt; + break; + } + } + if (serviceStatus != null) { + dsStatus.setStatusCount(serviceStatus); + } + return dsStatus; + }; + + /** + * Retrieve runtime statistics of a specific discovery service + * @param serviceId Discovery Service ID + * @return Runtime statistics of the service with this ID + */ + public DiscoveryServiceRuntimeStatistics getDiscoveryServiceRuntimeStatistics(String serviceId) throws ServiceNotFoundException { + logger.entering(DiscoveryServiceManager.class.getName(), "getDiscoveryServiceRuntimeStatistics"); + DiscoveryServiceRuntimeStatistics dsrs = new DiscoveryServiceRuntimeStatistics(); + dsrs.setAverageProcessingTimePerItemInMillis(0); // TODO: implement + return dsrs; + }; + + /** + * Delete runtime statistics of a specific discovery service + * @param serviceId Discovery Service ID + */ + public void deleteDiscoveryServiceRuntimeStatistics(String serviceId) throws ServiceNotFoundException { + logger.entering(DiscoveryServiceManager.class.getName(), 
"deleteDiscoveryServiceRuntimeStatistics"); + // TODO: implement + }; + + /** + * Retrieve picture representing a discovery service + * @param serviceId Discovery Service ID + * @return Input stream for image + */ + public InputStream getDiscoveryServiceImage(String serviceId) throws ServiceNotFoundException { + logger.entering(DiscoveryServiceManager.class.getName(), "getDiscoveryServiceImage"); + final String defaultImageDir = "org/apache/atlas/odf/images"; + + String imgUrl = null; + for (DiscoveryServiceProperties info : configManager.getConfigContainer().getRegisteredServices()) { + if (info.getId().equals(serviceId)) { + imgUrl = info.getIconUrl(); + break; + } + } + + ClassLoader cl = this.getClass().getClassLoader(); + InputStream is = null; + if (imgUrl != null) { + is = cl.getResourceAsStream("META-INF/odf/" + imgUrl); + if (is == null) { + is = cl.getResourceAsStream(defaultImageDir + "/" + imgUrl); + if (is == null) { + try { + is = new URL(imgUrl).openStream(); + } catch (MalformedURLException e) { + logger.log(Level.WARNING, "The specified image url {0} for service {1} is invalid!", new String[] { imgUrl, serviceId }); + } catch (IOException e) { + logger.log(Level.WARNING, "The specified image url {0} for service {1} could not be accessed!", new String[] { imgUrl, serviceId }); + } + } + } + } + if (imgUrl == null || is == null) { + //TODO is this correct? 
maybe we should use a single default image instead of a random one + try { + is = cl.getResourceAsStream(defaultImageDir); + if (is != null) { + InputStreamReader r = new InputStreamReader(is); + BufferedReader br = new BufferedReader(r); + List images = new ArrayList<>(); + String line = null; + while ((line = br.readLine()) != null) { + images.add(line); + } + // return random image + int ix = Math.abs(serviceId.hashCode()) % images.size(); + is = cl.getResourceAsStream(defaultImageDir + "/" + images.get(ix)); + } + } catch (IOException exc) { + logger.log(Level.WARNING, "Exception occurred while retrieving random image, ignoring it", exc); + is = null; + } + } + return is; + }; + +} http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-core/src/main/java/org/apache/atlas/odf/core/discoveryservice/DiscoveryServiceStatistics.java ---------------------------------------------------------------------- diff --git a/odf/odf-core/src/main/java/org/apache/atlas/odf/core/discoveryservice/DiscoveryServiceStatistics.java b/odf/odf-core/src/main/java/org/apache/atlas/odf/core/discoveryservice/DiscoveryServiceStatistics.java new file mode 100755 index 0000000..6be0e5a --- /dev/null +++ b/odf/odf-core/src/main/java/org/apache/atlas/odf/core/discoveryservice/DiscoveryServiceStatistics.java @@ -0,0 +1,83 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.atlas.odf.core.discoveryservice; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; + +import org.apache.atlas.odf.api.ODFFactory; +import org.apache.atlas.odf.api.analysis.AnalysisRequestTrackerStatus.STATUS; +import org.apache.atlas.odf.api.discoveryservice.AnalysisRequestTracker; +import org.apache.atlas.odf.api.discoveryservice.DiscoveryServiceProperties; +import org.apache.atlas.odf.api.discoveryservice.DiscoveryServiceRequest; +import org.apache.atlas.odf.api.discoveryservice.ServiceStatusCount; + +public class DiscoveryServiceStatistics { + + private List requests = new ArrayList(); + + public DiscoveryServiceStatistics(List requests) { + this.requests = requests; + } + + public List getStatusCountPerService() { + List result = new ArrayList(); + + Map> statusMap = new HashMap>(); + + for (AnalysisRequestTracker tracker : requests) { + int maxDiscoveryServiceRequest = (tracker.getNextDiscoveryServiceRequest() == 0 ? 
1 : tracker.getNextDiscoveryServiceRequest()); + for (int no = 0; no < maxDiscoveryServiceRequest; no++) { + STATUS cntStatus = tracker.getStatus(); + + //No parallel requests are possible atm -> all requests leading to current one must be finished + if (no < maxDiscoveryServiceRequest - 1) { + cntStatus = STATUS.FINISHED; + } + + DiscoveryServiceRequest req = tracker.getDiscoveryServiceRequests().get(no); + LinkedHashMap cntMap = statusMap.get(req.getDiscoveryServiceId()); + if (cntMap == null) { + cntMap = new LinkedHashMap(); + //add 0 default values + for (STATUS status : STATUS.values()) { + cntMap.put(status, 0); + } + } + Integer val = cntMap.get(cntStatus); + val++; + cntMap.put(cntStatus, val); + statusMap.put(req.getDiscoveryServiceId(), cntMap); + } + } + + for (String key : statusMap.keySet()) { + ServiceStatusCount cnt = new ServiceStatusCount(); + cnt.setId(key); + for (DiscoveryServiceProperties info : new ODFFactory().create().getDiscoveryServiceManager().getDiscoveryServicesProperties()) { + if (info.getId().equals(key)) { + cnt.setName(info.getName()); + break; + } + } + cnt.setStatusCountMap(statusMap.get(key)); + result.add(cnt); + } + + return result; + } +} http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-core/src/main/java/org/apache/atlas/odf/core/engine/EngineManagerImpl.java ---------------------------------------------------------------------- diff --git a/odf/odf-core/src/main/java/org/apache/atlas/odf/core/engine/EngineManagerImpl.java b/odf/odf-core/src/main/java/org/apache/atlas/odf/core/engine/EngineManagerImpl.java new file mode 100755 index 0000000..d09297a --- /dev/null +++ b/odf/odf-core/src/main/java/org/apache/atlas/odf/core/engine/EngineManagerImpl.java @@ -0,0 +1,221 @@ +/** + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.atlas.odf.core.engine; + +import java.io.InputStream; +import java.text.MessageFormat; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Date; +import java.util.List; +import java.util.UUID; +import java.util.logging.Level; +import java.util.logging.Logger; + +import org.apache.atlas.odf.api.ODFFactory; +import org.apache.atlas.odf.api.analysis.AnalysisManager; +import org.apache.atlas.odf.api.analysis.AnalysisRequest; +import org.apache.atlas.odf.api.analysis.AnalysisRequestStatus; +import org.apache.atlas.odf.api.analysis.AnalysisResponse; +import org.apache.atlas.odf.api.engine.EngineManager; +import org.apache.atlas.odf.api.engine.MessagingStatus; +import org.apache.atlas.odf.api.engine.ODFEngineOptions; +import org.apache.atlas.odf.api.engine.ODFStatus; +import org.apache.atlas.odf.api.engine.ODFVersion; +import org.apache.atlas.odf.api.engine.ServiceRuntimesInfo; +import org.apache.atlas.odf.api.engine.SystemHealth; +import org.apache.atlas.odf.api.engine.ThreadStatus; +import org.apache.atlas.odf.api.metadata.MetaDataObjectReference; +import org.apache.atlas.odf.core.ODFInitializer; +import org.apache.atlas.odf.core.ODFInternalFactory; +import org.apache.atlas.odf.core.ODFUtils; +import org.apache.atlas.odf.core.Utils; +import org.apache.atlas.odf.core.controlcenter.AdminMessage; +import org.apache.atlas.odf.core.controlcenter.AdminMessage.Type; +import org.apache.atlas.odf.core.controlcenter.ControlCenter; +import org.apache.atlas.odf.core.controlcenter.ServiceRuntimes; +import 
org.apache.atlas.odf.core.controlcenter.ThreadManager; +import org.apache.atlas.odf.core.messaging.DiscoveryServiceQueueManager; + +/** +* +* External Java API for managing and controlling the ODF engine +* +*/ +public class EngineManagerImpl implements EngineManager { + + private Logger logger = Logger.getLogger(EngineManagerImpl.class.getName()); + + public EngineManagerImpl() { + } + + /** + * Checks the health status of ODF + * + * @return Health status of the ODF engine + */ + public SystemHealth checkHealthStatus() { + SystemHealth health = new SystemHealth(); + try { + AnalysisRequest dummyRequest = new AnalysisRequest(); + String dataSetID = ControlCenter.HEALTH_TEST_DATA_SET_ID_PREFIX + UUID.randomUUID().toString(); + MetaDataObjectReference dataSetRef = new MetaDataObjectReference(); + dataSetRef.setId(dataSetID); + dummyRequest.setDataSets(Collections.singletonList(dataSetRef)); + List discoveryServiceSequence = new ArrayList(); + discoveryServiceSequence.add(ControlCenter.HEALTH_TEST_DISCOVERY_SERVICE_ID); + dummyRequest.setDiscoveryServiceSequence(discoveryServiceSequence); + + AnalysisManager analysisManager = new ODFFactory().create().getAnalysisManager(); + AnalysisResponse resp = analysisManager.runAnalysis(dummyRequest); + String reqId = resp.getId(); + AnalysisRequestStatus status = null; + final int maxNumberOfTimesToPoll = 500; + int count = 0; + int msToSleepBetweenPolls = 20; + boolean continuePolling = false; + do { + status = analysisManager.getAnalysisRequestStatus(reqId); + continuePolling = (status.getState() == AnalysisRequestStatus.State.QUEUED || status.getState() == AnalysisRequestStatus.State.ACTIVE || status.getState() == AnalysisRequestStatus.State.NOT_FOUND) && count < maxNumberOfTimesToPoll; + if (continuePolling) { + count++; + Thread.sleep(msToSleepBetweenPolls); + } + } while (continuePolling); + logger.log(Level.INFO, "Health check request ''{3}'' has status ''{0}'', time spent: {2}ms details ''{1}''", new Object[] { 
status.getState(), status.getDetails(), + count * msToSleepBetweenPolls, reqId }); + health.getMessages().add(MessageFormat.format("Details message: {0}", status.getDetails())); + if (count >= maxNumberOfTimesToPoll) { + health.setStatus( SystemHealth.HealthStatus.WARNING); + String msg = MessageFormat.format("Health test request could not be processed in time ({0}ms)", (maxNumberOfTimesToPoll * msToSleepBetweenPolls)); + logger.log(Level.INFO, msg); + health.getMessages().add(msg); + } else { + switch (status.getState()) { + case NOT_FOUND: + health.setStatus(SystemHealth.HealthStatus.ERROR); + health.getMessages().add(MessageFormat.format("Request ID ''{0}'' got lost", reqId)); + break; + case ERROR: + health.setStatus(SystemHealth.HealthStatus.ERROR); + break; + case FINISHED: + health.setStatus(SystemHealth.HealthStatus.OK); + break; + default: + health.setStatus(SystemHealth.HealthStatus.ERROR); + } + } + } catch (Exception exc) { + logger.log(Level.WARNING, "An unknown error occurred", exc); + health.setStatus(SystemHealth.HealthStatus.ERROR); + health.getMessages().add(Utils.getExceptionAsString(exc)); + } + return health; + } + + /** + * Returns the status of the ODF thread manager + * + * @return Status of all threads making up the ODF thread manager + */ + public List getThreadManagerStatus() { + ThreadManager tm = new ODFInternalFactory().create(ThreadManager.class); + return tm.getThreadManagerStatus(); + } + + /** + * Returns the status of the ODF messaging subsystem + * + * @return Status of the ODF messaging subsystem + */ + public MessagingStatus getMessagingStatus() { + return new ODFInternalFactory().create(DiscoveryServiceQueueManager.class).getMessagingStatus(); + } + + /** + * Returns the status of the messaging subsystem and the internal thread manager + * + * @return Combined status of the messaging subsystem and the internal thread manager + */ + public ODFStatus getStatus() { + ODFStatus status = new ODFStatus(); + 
status.setMessagingStatus(this.getMessagingStatus()); + status.setThreadManagerStatus(this.getThreadManagerStatus()); + return status; + } + + /** + * Returns the current ODF version + * + * @return ODF version identifier + */ + public ODFVersion getVersion() { + InputStream is = ODFUtils.class.getClassLoader().getResourceAsStream("org/apache/atlas/odf/core/odfversion.txt"); + ODFVersion version = new ODFVersion(); + if (is == null) { + version.setVersion("NOTFOUND"); + } else { + version.setVersion(Utils.getInputStreamAsString(is, "UTF-8").trim()); + } + return version; + } + + /** + * Shuts down the ODF engine, purges all scheduled analysis requests from the queues, and cancels all running analysis requests. + * This means that all running jobs will be cancelled or their results will not be reported back. + * (for debugging purposes only) + * + * @param options Option for immediately restarting the engine after shutdown (default is not to restart immediately but only when needed) + */ + public void shutdown(ODFEngineOptions options) { + long currentTime = System.currentTimeMillis(); + + ControlCenter controlCenter = new ODFInternalFactory().create(ControlCenter.class); + AdminMessage shutDownMessage = new AdminMessage(); + Type t = Type.SHUTDOWN; + if (options.isRestart()) { + t = Type.RESTART; + } + shutDownMessage.setAdminMessageType(t); + String detailMsg = MessageFormat.format("Shutdown was requested on {0} via ODF API", new Object[] { new Date() }); + shutDownMessage.setDetails(detailMsg); + logger.log(Level.INFO, detailMsg); + controlCenter.getQueueManager().enqueueInAdminQueue(shutDownMessage); + int maxPolls = 60; + int counter = 0; + int timeBetweenPollsMs = 1000; + while (counter < maxPolls && ODFInitializer.getLastStopTimestamp() <= currentTime) { + try { + Thread.sleep(timeBetweenPollsMs); + } catch (InterruptedException e) { + e.printStackTrace(); + } + counter++; + } + long timeWaited = ((counter * timeBetweenPollsMs) / 1000); + 
logger.log(Level.INFO, "Waited for {0} seconds for shutdown", timeWaited); + if (counter >= maxPolls) { + logger.log(Level.WARNING, "Waited for shutdown too long. Continuing." ); + } else { + logger.log(Level.INFO, "Shutdown issued successfully"); + } + } + + @Override + public ServiceRuntimesInfo getRuntimesInfo() { + return ServiceRuntimes.getRuntimesInfo(ServiceRuntimes.getAllRuntimes()); + } +} http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-core/src/main/java/org/apache/atlas/odf/core/messaging/DefaultMessageEncryption.java ---------------------------------------------------------------------- diff --git a/odf/odf-core/src/main/java/org/apache/atlas/odf/core/messaging/DefaultMessageEncryption.java b/odf/odf-core/src/main/java/org/apache/atlas/odf/core/messaging/DefaultMessageEncryption.java new file mode 100755 index 0000000..9177556 --- /dev/null +++ b/odf/odf-core/src/main/java/org/apache/atlas/odf/core/messaging/DefaultMessageEncryption.java @@ -0,0 +1,53 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.atlas.odf.core.messaging; + +/** + * Default encryption: no encryption + * + */ +public class DefaultMessageEncryption implements MessageEncryption { + + @Override + public String encrypt(String message) { + return message; + } + + @Override + public String decrypt(String message) { + return message; + } + + + /* + // this used to be our default encryption. Leaving it in here for reference. 
+ @Override + public String encrypt(String message) { + try { + return DatatypeConverter.printBase64Binary(message.getBytes("UTF-8")); + } catch (UnsupportedEncodingException e) { + throw new RuntimeException(e); + } + } + + @Override + public String decrypt(String message) { + try { + return new String(DatatypeConverter.parseBase64Binary(message), "UTF-8"); + } catch (UnsupportedEncodingException e) { + throw new RuntimeException(e); + } + } + */ +} http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-core/src/main/java/org/apache/atlas/odf/core/messaging/DiscoveryServiceQueueManager.java ---------------------------------------------------------------------- diff --git a/odf/odf-core/src/main/java/org/apache/atlas/odf/core/messaging/DiscoveryServiceQueueManager.java b/odf/odf-core/src/main/java/org/apache/atlas/odf/core/messaging/DiscoveryServiceQueueManager.java new file mode 100755 index 0000000..d2d84dd --- /dev/null +++ b/odf/odf-core/src/main/java/org/apache/atlas/odf/core/messaging/DiscoveryServiceQueueManager.java @@ -0,0 +1,39 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.atlas.odf.core.messaging; + +import java.util.concurrent.TimeoutException; + +import org.apache.atlas.odf.api.discoveryservice.AnalysisRequestTracker; +import org.apache.atlas.odf.api.engine.MessagingStatus; +import org.apache.atlas.odf.core.controlcenter.AdminMessage; +import org.apache.atlas.odf.core.controlcenter.StatusQueueEntry; + + + +public interface DiscoveryServiceQueueManager { + + void start() throws TimeoutException; + + void stop() throws TimeoutException; + + // find the next queue where this tracker should go and put it there + void enqueue(AnalysisRequestTracker tracker); + + void enqueueInStatusQueue(StatusQueueEntry sqe); + + void enqueueInAdminQueue(AdminMessage message); + + MessagingStatus getMessagingStatus(); +} http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-core/src/main/java/org/apache/atlas/odf/core/messaging/MessageEncryption.java ---------------------------------------------------------------------- diff --git a/odf/odf-core/src/main/java/org/apache/atlas/odf/core/messaging/MessageEncryption.java b/odf/odf-core/src/main/java/org/apache/atlas/odf/core/messaging/MessageEncryption.java new file mode 100755 index 0000000..ad1bf28 --- /dev/null +++ b/odf/odf-core/src/main/java/org/apache/atlas/odf/core/messaging/MessageEncryption.java @@ -0,0 +1,20 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.atlas.odf.core.messaging; + +public interface MessageEncryption { + String encrypt(String message); + + String decrypt(String message); +} http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-core/src/main/java/org/apache/atlas/odf/core/metadata/DefaultMetadataStore.java ---------------------------------------------------------------------- diff --git a/odf/odf-core/src/main/java/org/apache/atlas/odf/core/metadata/DefaultMetadataStore.java b/odf/odf-core/src/main/java/org/apache/atlas/odf/core/metadata/DefaultMetadataStore.java new file mode 100755 index 0000000..c71ba3c --- /dev/null +++ b/odf/odf-core/src/main/java/org/apache/atlas/odf/core/metadata/DefaultMetadataStore.java @@ -0,0 +1,381 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
package org.apache.atlas.odf.core.metadata;

import java.text.MessageFormat;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.logging.Level;
import java.util.logging.Logger;

import org.apache.atlas.odf.api.metadata.AnnotationPropagator;
import org.apache.atlas.odf.api.metadata.MetadataStore;
import org.apache.atlas.odf.api.metadata.StoredMetaDataObject;
import org.apache.atlas.odf.api.metadata.models.Annotation;
import org.apache.atlas.odf.api.metadata.models.RelationshipAnnotation;
import org.apache.wink.json4j.JSONException;
import org.apache.wink.json4j.JSONObject;

import org.apache.atlas.odf.api.annotation.AnnotationStore;
import org.apache.atlas.odf.api.metadata.DefaultMetadataQueryBuilder;
import org.apache.atlas.odf.api.metadata.InternalMetaDataUtils;
import org.apache.atlas.odf.api.metadata.MetaDataObjectReference;
import org.apache.atlas.odf.api.metadata.MetadataStoreException;
import org.apache.atlas.odf.api.metadata.models.ClassificationAnnotation;
import org.apache.atlas.odf.api.metadata.models.ConnectionInfo;
import org.apache.atlas.odf.api.metadata.models.MetaDataObject;
import org.apache.atlas.odf.api.metadata.models.ProfilingAnnotation;
import org.apache.atlas.odf.json.JSONUtils;

/**
 * In-memory implementation of the {@link MetadataStore} interface to be used for
 * testing as well as for single-node ODF deployments.
 *
 * <p>The type registry and the committed objects live in <em>static</em> maps, so
 * they are shared by every instance of this class; access to them is guarded by
 * {@link #accessLock}. The staging area ({@link #stagedObjects}) is per instance
 * and is moved into the shared object map by {@link #commit()}.
 *
 * <p>NOTE(review): the generic type arguments in this class were restored from
 * how the values are used in the code below (the archived text had lost them);
 * confirm them against {@code WritableMetadataStoreBase} and
 * {@code StoredMetaDataObject}.
 */
public class DefaultMetadataStore extends WritableMetadataStoreBase implements WritableMetadataStore {
    private final Logger logger = Logger.getLogger(DefaultMetadataStore.class.getName());

    private static final String METADATA_STORE_ID = "ODF_LOCAL_METADATA_STORE";
    private static final String STORE_PROPERTY_TYPE = "default";
    private static final String STORE_PROPERTY_DESCRIPTION = "ODF local metadata store";

    /** Maps a type name to the name of its parent type. Shared by all instances. */
    private static HashMap<String, String> typeStore;
    /** Maps an object id to the committed object. Shared by all instances. */
    private static HashMap<String, StoredMetaDataObject> objectStore;
    /** Objects staged on this instance, in insertion order, until {@link #commit()} is called. */
    protected LinkedHashMap<String, StoredMetaDataObject> stagedObjects =
            new LinkedHashMap<String, StoredMetaDataObject>();
    private static boolean isInitialized = false;
    /** Guards {@link #typeStore} and {@link #objectStore}. */
    protected static Object accessLock = new Object();
    /** Guards the one-time initialization of the static stores. */
    static Object initializationLock = new Object();

    /**
     * Creates a store instance. The first instance created in the JVM also
     * initializes the shared static stores via {@link #resetAllData()}; later
     * instances reuse whatever data is already there.
     */
    public DefaultMetadataStore() {
        synchronized (initializationLock) {
            if (!isInitialized) {
                isInitialized = true;
                // NOTE(review): resetAllData() is overridable and is invoked from the
                // constructor; a subclass override would run before the subclass is
                // fully constructed.
                this.resetAllData();
            }
        }
    }

    /** Returns this instance as the store to operate on. */
    protected WritableMetadataStore getMetadataStore() {
        return this;
    }

    /** Returns the lock guarding the shared type and object maps. */
    protected Object getAccessLock() {
        return accessLock;
    }

    /** Returns the shared map of committed objects, keyed by object id. */
    protected HashMap<String, StoredMetaDataObject> getObjects() {
        return objectStore;
    }

    /** Returns this instance's staging area. */
    protected LinkedHashMap<String, StoredMetaDataObject> getStagedObjects() {
        return stagedObjects;
    }

    /**
     * Delegates to {@code WritableMetadataStoreUtils.getConnectionInfo} while
     * holding the access lock.
     */
    @Override
    public ConnectionInfo getConnectionInfo(MetaDataObject informationAsset) {
        synchronized (accessLock) {
            return WritableMetadataStoreUtils.getConnectionInfo(this, informationAsset);
        }
    }

    /**
     * Replaces the shared type and object maps with empty ones and re-registers
     * the base types returned by {@code WritableMetadataStoreUtils.getBaseTypes()}.
     * Staged objects of this instance are not touched.
     */
    @Override
    public void resetAllData() {
        logger.log(Level.INFO, "Resetting all data in metadata store.");
        synchronized (accessLock) {
            typeStore = new HashMap<String, String>();
            objectStore = new HashMap<String, StoredMetaDataObject>();
            createTypes(WritableMetadataStoreUtils.getBaseTypes());
        }
    }

    /** Returns the description, type and id properties of this store. */
    @Override
    public Properties getProperties() {
        Properties props = new Properties();
        props.put(MetadataStore.STORE_PROPERTY_DESCRIPTION, STORE_PROPERTY_DESCRIPTION);
        props.put(MetadataStore.STORE_PROPERTY_TYPE, STORE_PROPERTY_TYPE);
        props.put(STORE_PROPERTY_ID, METADATA_STORE_ID);
        return props;
    }

    /** Returns the fixed repository id of this store. */
    @Override
    public String getRepositoryId() {
        return METADATA_STORE_ID;
    }

    /**
     * Evaluates a query of the form
     * {@code <dataset identifier> <type> [<condition clauses>]}, with tokens
     * separated by {@code DefaultMetadataQueryBuilder.SEPARATOR_STRING}, against
     * all committed objects.
     *
     * @param query Query string; must not be null or empty
     * @return References of all objects whose type is the requested type (or a
     *         sub type of it) and that satisfy the condition clauses
     * @throws MetadataStoreException if the query is null, empty, does not start
     *         with the dataset identifier followed by a type name, or contains a
     *         malformed condition
     */
    @Override
    public List<MetaDataObjectReference> search(String query) {
        if ((query == null) || query.isEmpty()) {
            throw new MetadataStoreException("The search term cannot be null or empty.");
        }
        logger.log(Level.INFO, MessageFormat.format("Processing query \"{0}\".", query));
        synchronized (accessLock) {
            LinkedList<String> queryElements = new LinkedList<String>();
            for (String el : query.split(DefaultMetadataQueryBuilder.SEPARATOR_STRING)) {
                queryElements.add(el);
            }
            // A valid query needs at least the dataset identifier and a type name.
            // Checking the size up front turns a bare NoSuchElementException from
            // removeFirst() into the store's own "not valid" error.
            if (queryElements.size() < 2
                    || !queryElements.removeFirst().equals(DefaultMetadataQueryBuilder.DATASET_IDENTIFIER)) {
                throw new MetadataStoreException(MessageFormat.format("Query ''{0}'' is not valid.", query));
            }
            String requestedObjectType = queryElements.removeFirst();

            List<MetaDataObjectReference> result = new ArrayList<MetaDataObjectReference>();
            for (StoredMetaDataObject currentInternalObject : getObjects().values()) {
                MetaDataObject currentObject = currentInternalObject.getMetaDataObject();
                String currentObjectType = getObjectType(currentObject);
                try {
                    if (isSubTypeOf(requestedObjectType, currentObjectType)
                            && isConditionMet(currentObject, queryElements)) {
                        result.add(currentObject.getReference());
                    }
                } catch (IllegalArgumentException | IllegalAccessException e) {
                    // Keep the original exception as the cause so the failure can be diagnosed.
                    throw new MetadataStoreException(
                            MessageFormat.format("Error processing \"{0}\" clause of query.",
                                    DefaultMetadataQueryBuilder.DATASET_IDENTIFIER),
                            e);
                }
            }
            return result;
        }
    }

    /** Copies the sample files and creates the sample data objects in this store. */
    @Override
    public void createSampleData() {
        logger.log(Level.INFO, "Creating sample data in metadata store.");
        SampleDataHelper.copySampleFiles();
        WritableMetadataStoreUtils.createSampleDataObjects(this);
    }

    /**
     * Returns a propagator that copies every annotation of a request from the
     * given annotation store into this metadata store, committing after each one.
     */
    @Override
    public AnnotationPropagator getAnnotationPropagator() {
        return new AnnotationPropagator() {

            @Override
            public void propagateAnnotations(AnnotationStore as, String requestId) {
                List<Annotation> annotations = as.getAnnotations(null, requestId);
                for (Annotation annot : annotations) {
                    ensureAnnotationTypeExists(annot);
                    // Set reference to null because a new reference will be
                    // generated by the metadata store
                    annot.setReference(null);
                    getMetadataStore().createObject(annot);
                    commit();
                }
            }
        };
    }

    /**
     * Internal helper that creates a list of types in the metadata store. Each
     * type is registered under its simple class name with the simple name of its
     * superclass as parent.
     *
     * @param typeList List of types to be created
     * @throws MetadataStoreException if a type with the same simple name is
     *         already registered
     */
    private void createTypes(List<Class<?>> typeList) {
        synchronized (accessLock) {
            for (Class<?> type : typeList) {
                String typeName = type.getSimpleName();
                if (typeStore.containsKey(typeName)) {
                    throw new MetadataStoreException(MessageFormat.format(
                            "A type with the name \"{0}\" already exists in this metadata store.", type.getName()));
                }
                logger.log(Level.INFO,
                        MessageFormat.format("Creating new type \"{0}\" in metadata store.", typeName));
                typeStore.put(typeName, type.getSuperclass().getSimpleName());
            }
        }
    }

    /**
     * Internal helper that returns the type name of a given metadata object.
     *
     * @param mdo Metadata object
     * @return The annotation type for annotations, otherwise the simple class name
     */
    protected String getObjectType(MetaDataObject mdo) {
        if (mdo instanceof Annotation) {
            // Important when using the MetadataStore as an AnnotationStore
            return ((Annotation) mdo).getAnnotationType();
        }
        return mdo.getClass().getSimpleName();
    }

    /**
     * Internal helper that checks whether {@code typeName} is the same type as, or
     * a (transitive) sub type of, {@code ancestorTypeName} according to the type
     * store. The check walks up the parent chain of {@code typeName}.
     *
     * <p>Must be called while holding {@link #accessLock}.
     *
     * @param ancestorTypeName Name of the type that is supposed to be the parent type
     * @param typeName Name of the type that is supposed to be the sub type
     * @return {@code true} if {@code ancestorTypeName} is found on the parent chain
     *         of {@code typeName} (including {@code typeName} itself)
     */
    private boolean isSubTypeOf(String ancestorTypeName, String typeName) {
        String current = typeName;
        while (current != null) {
            if (ancestorTypeName.equals(current)) {
                return true;
            }
            String parent = typeStore.get(current);
            if (parent == null || parent.equals(current)) {
                // Reached an unregistered type or a self-referencing root.
                return false;
            }
            current = parent;
        }
        return false;
    }

    /**
     * Internal helper that checks if the attributes of a given metadata object
     * meet a given condition.
     *
     * <p>The object is converted to JSON and each clause
     * ({@code <where|and> <attribute> <comparator> <quoted value>}) is compared
     * against the string form of the corresponding JSON attribute. The passed
     * list is not modified.
     *
     * @param mdo Metadata object
     * @param condition List of tokens that make up the condition phrase
     * @return {@code true} if the condition is empty or every clause is met;
     *         {@code false} if a clause is not met or names an attribute the
     *         object does not have
     * @throws MetadataStoreException on an unknown operator or comparator, on
     *         trailing tokens that do not form a clause, or if the object cannot
     *         be converted to JSON
     */
    private boolean isConditionMet(MetaDataObject mdo, LinkedList<String> condition)
            throws IllegalArgumentException, IllegalAccessException {
        if (condition.isEmpty()) {
            return true;
        }
        // Work on a copy: the same token list is reused for every object in search().
        LinkedList<String> clonedCondition = new LinkedList<String>(condition);
        try {
            JSONObject mdoJson = JSONUtils.toJSONObject(mdo);
            if (logger.isLoggable(Level.FINER)) {
                logger.log(Level.FINER, MessageFormat.format("Evaluating object \"{0}\".", mdoJson));
            }
            while (clonedCondition.size() >= 4) {
                // Each condition clause consists of four elements, e.g. "where
                // name = 'BankClientsShort'" or "and name = 'BankClientsShort'"
                String operator = clonedCondition.removeFirst();
                String attribute = clonedCondition.removeFirst();
                String comparator = clonedCondition.removeFirst();
                String expectedValueWithQuotes = clonedCondition.removeFirst();
                // A quoted value that contains the separator was split into several
                // tokens; glue them back together until the closing quote is found.
                while ((!expectedValueWithQuotes.endsWith(DefaultMetadataQueryBuilder.QUOTE_IDENTIFIER))
                        && (clonedCondition.size() != 0)) {
                    expectedValueWithQuotes = expectedValueWithQuotes
                            + DefaultMetadataQueryBuilder.SEPARATOR_STRING + clonedCondition.removeFirst();
                }

                boolean knownOperator = operator.equals(DefaultMetadataQueryBuilder.CONDITION_PREFIX)
                        || operator.equals(DefaultMetadataQueryBuilder.AND_IDENTIFIER);
                if (!knownOperator) {
                    throw new MetadataStoreException(
                            MessageFormat.format("Syntax error in query condition \"{0}\".", condition.toString()));
                }
                if (!mdoJson.containsKey(attribute)) {
                    logger.log(Level.INFO,
                            MessageFormat.format("The object does not contain attribute \"{0}\".", attribute));
                    // Condition is not met
                    return false;
                }

                // Read the value once and convert it with toString(): attribute values
                // are not necessarily Strings, so casting to String could fail.
                Object rawValue = mdoJson.get(attribute);
                String actualValue = rawValue != null ? rawValue.toString() : null;
                boolean valuesMatch = expectedValueWithQuotes.equals(DefaultMetadataQueryBuilder.QUOTE_IDENTIFIER
                        + actualValue + DefaultMetadataQueryBuilder.QUOTE_IDENTIFIER);

                if (comparator.equals(DefaultMetadataQueryBuilder.EQUALS_IDENTIFIER)) {
                    if (!valuesMatch) {
                        // Condition is not met
                        return false;
                    }
                } else if (comparator.equals(DefaultMetadataQueryBuilder.NOT_EQUALS_IDENTIFIER)) {
                    if (valuesMatch) {
                        // Condition is not met
                        return false;
                    }
                } else {
                    throw new MetadataStoreException(
                            MessageFormat.format("Unknown comparator \"{0}\" in query condition \"{1}\".",
                                    comparator, condition.toString()));
                }
            }
            if (clonedCondition.size() != 0) {
                throw new MetadataStoreException(
                        MessageFormat.format("Error parsing trailing query elements \"{0}\".", clonedCondition));
            }
            // All conditions are met
            return true;
        } catch (JSONException e) {
            throw new MetadataStoreException(MessageFormat.format("Error parsing JSON object {0} in query.", mdo), e);
        }
    }

    /**
     * Internal helper that merges the references of a staged metadata object with
     * the references of the object with the same id that is already stored in the
     * metadata store. The merged map is set on the provided object in place. If no
     * object with that id is stored yet, the provided object is left unchanged.
     *
     * <p>NOTE(review): only the reference keys present on the staged object end up
     * in the merged map; keys that exist only on the stored object are not carried
     * over. Confirm that this is intended.
     *
     * @param object Internal representation of a staged metadata object
     */
    private void mergeReferenceMap(StoredMetaDataObject object) {
        String objectId = object.getMetaDataObject().getReference().getId();
        StoredMetaDataObject existingObject = getObjects().get(objectId);
        if (existingObject == null) {
            // Only merge if the object already exists in the metadata store
            return;
        }
        HashMap<String, List<MetaDataObjectReference>> originalRefMap = existingObject.getReferenceMap();
        HashMap<String, List<MetaDataObjectReference>> updatedObjectRefMap = object.getReferenceMap();
        HashMap<String, List<MetaDataObjectReference>> mergedObjectRefMap =
                new HashMap<String, List<MetaDataObjectReference>>();
        for (Map.Entry<String, List<MetaDataObjectReference>> updatedEntry : updatedObjectRefMap.entrySet()) {
            String referenceId = updatedEntry.getKey();
            mergedObjectRefMap.put(referenceId,
                    InternalMetaDataUtils.mergeReferenceLists(originalRefMap.get(referenceId), updatedEntry.getValue()));
        }
        object.setReferencesMap(mergedObjectRefMap);
    }

    /**
     * Moves all staged objects of this instance into the shared object map and
     * empties the staging area.
     *
     * @throws MetadataStoreException if the type of any staged object is unknown;
     *         in that case nothing is moved and the staging area is left as it was
     */
    @Override
    public void commit() {
        synchronized (accessLock) {
            // Check if all required types exist BEFORE starting to create the
            // objects in order to avoid partial creation of objects
            for (Map.Entry<String, StoredMetaDataObject> mapEntry : this.stagedObjects.entrySet()) {
                String typeName = getObjectType(mapEntry.getValue().getMetaDataObject());
                if ((typeName == null) || !typeStore.containsKey(typeName)) {
                    throw new MetadataStoreException(MessageFormat.format(
                            "The type \"{0}\" of the object you are trying to create does not exist in this metadata store.",
                            typeName));
                }
            }

            // Move objects from staging area into metadata store
            for (Map.Entry<String, StoredMetaDataObject> mapEntry : this.stagedObjects.entrySet()) {
                StoredMetaDataObject object = mapEntry.getValue();
                String typeName = getObjectType(object.getMetaDataObject());
                logger.log(Level.INFO,
                        MessageFormat.format(
                                "Creating or updating object with id ''{0}'' and type ''{1}'' in metadata store.",
                                object.getMetaDataObject().getReference(), typeName));
                String objectId = object.getMetaDataObject().getReference().getId();
                // Merge new object references with existing object references in
                // metadata store
                mergeReferenceMap(object);
                getObjects().put(objectId, object);
            }

            // Clear staging area (a fresh map, so a previously handed-out
            // reference to the old staging map is not emptied)
            stagedObjects = new LinkedHashMap<String, StoredMetaDataObject>();
        }
    }

    /**
     * Internal helper that creates a new annotation type in the internal type
     * store if it does not yet exist. The new type is registered as a child of
     * {@code ProfilingAnnotation}, {@code ClassificationAnnotation} or
     * {@code RelationshipAnnotation}, depending on the class of the annotation;
     * annotations of any other class are not registered.
     *
     * @param annotation Annotation whose annotation type should be registered
     */
    private void ensureAnnotationTypeExists(Annotation annotation) {
        String annotationType = annotation.getAnnotationType();
        // typeStore is a plain static HashMap shared by all instances; take the same
        // lock that createTypes() and commit() use when touching it.
        synchronized (accessLock) {
            if (typeStore.get(annotationType) != null) {
                return;
            }
            if (annotation instanceof ProfilingAnnotation) {
                typeStore.put(annotationType, "ProfilingAnnotation");
            } else if (annotation instanceof ClassificationAnnotation) {
                typeStore.put(annotationType, "ClassificationAnnotation");
            } else if (annotation instanceof RelationshipAnnotation) {
                typeStore.put(annotationType, "RelationshipAnnotation");
            }
        }
    }
}