Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id D8324200CB5 for ; Wed, 28 Jun 2017 07:57:50 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id D67D0160BE9; Wed, 28 Jun 2017 05:57:50 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 8A1EB160BF6 for ; Wed, 28 Jun 2017 07:57:48 +0200 (CEST) Received: (qmail 59348 invoked by uid 500); 28 Jun 2017 05:57:47 -0000 Mailing-List: contact commits-help@atlas.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@atlas.apache.org Delivered-To: mailing list commits@atlas.apache.org Received: (qmail 59339 invoked by uid 99); 28 Jun 2017 05:57:47 -0000 Received: from pnap-us-west-generic-nat.apache.org (HELO spamd2-us-west.apache.org) (209.188.14.142) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 28 Jun 2017 05:57:47 +0000 Received: from localhost (localhost [127.0.0.1]) by spamd2-us-west.apache.org (ASF Mail Server at spamd2-us-west.apache.org) with ESMTP id 275EC1AA96F for ; Wed, 28 Jun 2017 05:57:47 +0000 (UTC) X-Virus-Scanned: Debian amavisd-new at spamd2-us-west.apache.org X-Spam-Flag: NO X-Spam-Score: -4.222 X-Spam-Level: X-Spam-Status: No, score=-4.222 tagged_above=-999 required=6.31 tests=[KAM_ASCII_DIVIDERS=0.8, RCVD_IN_DNSWL_HI=-5, RCVD_IN_MSPIKE_H3=-0.01, RCVD_IN_MSPIKE_WL=-0.01, RP_MATCHES_RCVD=-0.001, SPF_PASS=-0.001] autolearn=disabled Received: from mx1-lw-eu.apache.org ([10.40.0.8]) by localhost (spamd2-us-west.apache.org [10.40.0.9]) (amavisd-new, port 10024) with ESMTP id RGZl8dTVB6_O for ; Wed, 28 Jun 2017 05:57:31 +0000 (UTC) Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx1-lw-eu.apache.org (ASF Mail Server at 
mx1-lw-eu.apache.org) with SMTP id 6CBB560D65 for ; Wed, 28 Jun 2017 05:57:17 +0000 (UTC) Received: (qmail 56922 invoked by uid 99); 28 Jun 2017 05:57:16 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 28 Jun 2017 05:57:16 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 4F4C0F3242; Wed, 28 Jun 2017 05:57:15 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: madhan@apache.org To: commits@atlas.incubator.apache.org Date: Wed, 28 Jun 2017 05:57:29 -0000 Message-Id: <375421df7dd84a4291d41f342b39601c@git.apache.org> In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [16/25] incubator-atlas git commit: ATLAS-1898: initial commit of ODF archived-at: Wed, 28 Jun 2017 05:57:51 -0000 http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-core/src/main/java/org/apache/atlas/odf/core/controlcenter/ControlCenter.java ---------------------------------------------------------------------- diff --git a/odf/odf-core/src/main/java/org/apache/atlas/odf/core/controlcenter/ControlCenter.java b/odf/odf-core/src/main/java/org/apache/atlas/odf/core/controlcenter/ControlCenter.java new file mode 100755 index 0000000..4ffa195 --- /dev/null +++ b/odf/odf-core/src/main/java/org/apache/atlas/odf/core/controlcenter/ControlCenter.java @@ -0,0 +1,454 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.atlas.odf.core.controlcenter; + +import java.text.MessageFormat; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.Callable; +import java.util.concurrent.TimeoutException; +import java.util.logging.Level; +import java.util.logging.Logger; + +import org.apache.atlas.odf.api.OpenDiscoveryFramework; +import org.apache.atlas.odf.api.analysis.*; +import org.apache.atlas.odf.api.annotation.AnnotationStore; +import org.apache.atlas.odf.api.discoveryservice.DiscoveryService; +import org.apache.atlas.odf.api.discoveryservice.DiscoveryServiceManager; +import org.apache.atlas.odf.api.discoveryservice.DiscoveryServiceRequest; +import org.apache.atlas.odf.api.discoveryservice.DiscoveryServiceResponse; +import org.apache.atlas.odf.api.discoveryservice.async.DiscoveryServiceAsyncStartResponse; +import org.apache.atlas.odf.api.discoveryservice.datasets.DataSetContainer; +import org.apache.atlas.odf.api.discoveryservice.sync.SyncDiscoveryService; +import org.apache.atlas.odf.api.metadata.AnnotationPropagator; +import org.apache.atlas.odf.api.metadata.MetaDataObjectReference; +import org.apache.atlas.odf.api.metadata.MetadataStore; +import org.apache.atlas.odf.api.metadata.models.MetaDataObject; +import org.apache.atlas.odf.api.metadata.models.UnknownDataSet; +import org.apache.atlas.odf.core.Encryption; +import org.apache.atlas.odf.core.Environment; +import org.apache.atlas.odf.core.ODFInternalFactory; +import org.apache.atlas.odf.core.messaging.DiscoveryServiceQueueManager; +import org.apache.atlas.odf.json.JSONUtils; +import org.apache.wink.json4j.JSONException; + +import org.apache.atlas.odf.api.ODFFactory; +import org.apache.atlas.odf.api.analysis.AnalysisCancelResult; +import org.apache.atlas.odf.api.analysis.AnalysisRequest; +import 
org.apache.atlas.odf.api.analysis.AnalysisRequestStatus; +import org.apache.atlas.odf.api.analysis.AnalysisResponse; +import org.apache.atlas.odf.api.discoveryservice.AnalysisRequestTracker; +import org.apache.atlas.odf.api.discoveryservice.DiscoveryServiceProperties; +import org.apache.atlas.odf.api.discoveryservice.ServiceNotFoundException; +import org.apache.atlas.odf.api.discoveryservice.async.AsyncDiscoveryService; +import org.apache.atlas.odf.core.Utils; + +public class ControlCenter { + + private static final String CLASSNAME = ControlCenter.class.getName(); + private Logger logger = Logger.getLogger(ControlCenter.class.getName()); + + public static final String HEALTH_TEST_DISCOVERY_SERVICE_ID = "odf-health-test-discovery-service-id"; + public static final String HEALTH_TEST_DATA_SET_ID_PREFIX = "odf-health-test-dummy-data-set-id"; + + DiscoveryServiceQueueManager queueManager = null; + AnalysisRequestTrackerStore store = null; + Environment environment = null; + OpenDiscoveryFramework odf; + + public ControlCenter() { + ODFInternalFactory f = new ODFInternalFactory(); + queueManager = f.create(DiscoveryServiceQueueManager.class); + store = f.create(AnalysisRequestTrackerStore.class); + odf = new ODFFactory().create(); + environment = f.create(Environment.class); + } + + private String createNewRequestId() { + return "odf-request-" + UUID.randomUUID().toString() + "_" + System.currentTimeMillis(); + } + + public DiscoveryServiceQueueManager getQueueManager() { + return queueManager; + } + + public AnalysisResponse startRequest(AnalysisRequest request) { + final String METHODNAME = "startRequest()"; + logger.entering(CLASSNAME, METHODNAME); + AnalysisResponse response = new AnalysisResponse(); + AnalysisRequest requestWithServiceSequence = null; + try { + requestWithServiceSequence = JSONUtils.fromJSON(JSONUtils.toJSON(request), AnalysisRequest.class); + } catch (JSONException e) { + throw new RuntimeException("Error cloning analysis request."); + } + if 
((request.getDiscoveryServiceSequence() == null) || request.getDiscoveryServiceSequence().isEmpty()) { + DeclarativeRequestMapper mapper = new DeclarativeRequestMapper(request); + List discoveryServiceSequence = mapper.getRecommendedDiscoveryServiceSequence(); + logger.log(Level.INFO, "Using discovery service sequence: " + Utils.joinStrings(discoveryServiceSequence, ',')); + if (discoveryServiceSequence == null) { + response.setId(request.getId()); + response.setInvalidRequest(true); + response.setDetails("No suitable discovery services found to create the requested annotation types."); + return response; + } + requestWithServiceSequence.setDiscoveryServiceSequence(discoveryServiceSequence); + } + try { + //Initialize queues to make sure analysis can be started + queueManager.start(); + } catch (TimeoutException e) { + logger.warning("queues could not be started in time"); + } + AnalysisRequestTracker similarTracker = store.findSimilarQueuedRequest(requestWithServiceSequence); + if (similarTracker != null) { + logger.log(Level.WARNING, "A similar request for the issued one is already in the queue."); + logger.log(Level.FINE, "A similar request for the issued one is already in the queue. Original request: {0}, found similar request: {1}", + new Object[] { JSONUtils.lazyJSONSerializer(requestWithServiceSequence), + JSONUtils.lazyJSONSerializer(similarTracker) }); + } + String newRequestId = createNewRequestId(); + response.setId(newRequestId); + requestWithServiceSequence.setId(newRequestId); + AnalysisRequestTracker tracker = createTracker(requestWithServiceSequence, response); + // if request is invalid, response was already modified and null is returned + if (tracker != null) { + tracker.setStatus(AnalysisRequestTrackerStatus.STATUS.IN_DISCOVERY_SERVICE_QUEUE); + logger.log(Level.FINE, "Starting new request with ID ''{0}''. 
Tracker: {1}", new Object[] { newRequestId, JSONUtils.lazyJSONSerializer(tracker) }); + store.store(tracker); + logger.log(Level.FINEST, "Stored tracker for new request with ID ''{0}''. Tracker: {1}", new Object[] { newRequestId, JSONUtils.lazyJSONSerializer(tracker) }); + queueManager.enqueue(tracker); + logger.log(Level.FINEST, "Tracker enqueued for new request with ID ''{0}''. Tracker: {1}", new Object[] { newRequestId, JSONUtils.lazyJSONSerializer(tracker) }); + } + logger.exiting(CLASSNAME, METHODNAME); + return response; + } + + public AnalysisRequestStatus getRequestStatus(String requestId) { + final String METHODNAME = "getRequestStatus(String)"; + logger.entering(CLASSNAME, METHODNAME); + AnalysisRequestStatus result = new AnalysisRequestStatus(); + AnalysisRequestTracker tracker = store.query(requestId); + if (tracker == null) { + result.setState(AnalysisRequestStatus.State.NOT_FOUND); + } else { + AnalysisRequestStatus.State state = null; + switch (tracker.getStatus()) { + case INITIALIZED: + case IN_DISCOVERY_SERVICE_QUEUE: + state = AnalysisRequestStatus.State.QUEUED; + break; + case ERROR: + state = AnalysisRequestStatus.State.ERROR; + break; + case DISCOVERY_SERVICE_RUNNING: + state = AnalysisRequestStatus.State.ACTIVE; + break; + case FINISHED: + state = AnalysisRequestStatus.State.FINISHED; + break; + case CANCELLED: + state = AnalysisRequestStatus.State.CANCELLED; + default: + ; + } + result.setState(state); + result.setDetails(tracker.getStatusDetails()); + result.setRequest(tracker.getRequest()); + + long totalProcessingTime = 0; + long totalQueuingTime = 0; + long totalTimeSpentStoringAnnotations = 0; + + List requests = new ArrayList(); + for (DiscoveryServiceRequest req : tracker.getDiscoveryServiceRequests()) { + DiscoveryServiceRequest copyReq = new DiscoveryServiceRequest(); + copyReq.setDiscoveryServiceId(req.getDiscoveryServiceId()); + long putOnQueue = req.getPutOnRequestQueue(); + long startedProcessing = 
req.getTakenFromRequestQueue(); + long finishedProcessing = req.getFinishedProcessing(); + + totalProcessingTime += (finishedProcessing > 0 ? finishedProcessing - startedProcessing : finishedProcessing); + totalQueuingTime += (startedProcessing > 0 ? startedProcessing - putOnQueue : startedProcessing); + totalTimeSpentStoringAnnotations += req.getTimeSpentStoringResults(); + + copyReq.setFinishedProcessing(finishedProcessing); + copyReq.setPutOnRequestQueue(putOnQueue); + copyReq.setTakenFromRequestQueue(startedProcessing); + requests.add(copyReq); + } + + result.setTotalTimeOnQueues(totalQueuingTime); + result.setTotalTimeProcessing(totalProcessingTime); + result.setTotalTimeStoringAnnotations(totalTimeSpentStoringAnnotations); + result.setServiceRequests(requests); + } + logger.log(Level.FINE, "Returning request status object {0}", JSONUtils.lazyJSONSerializer(result)); + logger.exiting(CLASSNAME, METHODNAME); + return result; + } + + public AnalysisCancelResult cancelRequest(String requestId) { + final String METHODNAME = "cancelRequest(String)"; + logger.entering(CLASSNAME, METHODNAME); + + AnalysisCancelResult result = new AnalysisCancelResult(); + result.setState(AnalysisCancelResult.State.NOT_FOUND); + + AnalysisRequestTracker request = store.query(requestId); + //TODO implement cancellation of running instead of only queued requests. + if (request != null) { + if (TrackerUtil.isCancellable(request)) { + request.setStatus(AnalysisRequestTrackerStatus.STATUS.CANCELLED); + store.store(request); + logger.info("cancelled request with id " + requestId); + result.setState(AnalysisCancelResult.State.SUCCESS); + } else { + logger.log(Level.FINER, "Request ''{0}'' could not be cancelled. State ''{1}'', next request number:. 
''{2}''", new Object[]{requestId, request.getStatus(), request.getNextDiscoveryServiceRequest()}); + result.setState(AnalysisCancelResult.State.INVALID_STATE); + } + } + logger.exiting(CLASSNAME, METHODNAME); + return result; + } + + private AnalysisRequestTracker createTracker(AnalysisRequest request, AnalysisResponse response) { + DiscoveryServiceManager discoveryServiceManager = odf.getDiscoveryServiceManager(); + List registeredServices = new ArrayList<>(discoveryServiceManager.getDiscoveryServicesProperties()); + registeredServices.add(HealthCheckServiceRuntime.getHealthCheckServiceProperties()); + String currentUser = this.environment.getCurrentUser(); + + /* + List datasets = request.getDataSets(); + + if (datasets.size() == 1 && datasets.get(0).getId().startsWith(HEALTH_TEST_DATA_SET_ID_PREFIX)) { + // health test mode + AnalysisRequestTracker healthTestTracker = new AnalysisRequestTracker(); + DiscoveryServiceRequest dssr = new DiscoveryServiceRequest(); + dssr.setOdfRequestId(request.getId()); + dssr.setDiscoveryServiceId(ControlCenter.HEALTH_TEST_DISCOVERY_SERVICE_ID); + String odfUrl = new ODFFactory().create().getSettingsManager().getODFSettings().getOdfUrl(); + dssr.setOdfUrl(odfUrl); + MetaDataObjectReference dsr = datasets.get(0); + + DataSetContainer dataSetContainer = new DataSetContainer(); + DataSet oMDataSet = new UnknownDataSet(); + oMDataSet.setReference(dsr); + dataSetContainer.setDataSet(oMDataSet); + + dssr.setDataSetContainer(dataSetContainer); + dssr.setUser(currentUser); + dssr.setAdditionalProperties(request.getAdditionalProperties()); + healthTestTracker.setDiscoveryServiceRequests(Collections.singletonList(dssr)); + healthTestTracker.setRequest(request); + healthTestTracker.setStatus(STATUS.INITIALIZED); + Utils.setCurrentTimeAsLastModified(healthTestTracker); + healthTestTracker.setUser(currentUser); + response.setDetails("Request is a special health test request."); + return healthTestTracker; + } + */ + + List startRequests = new 
ArrayList(); + List discoveryServiceSequence = request.getDiscoveryServiceSequence(); + if (discoveryServiceSequence != null && !discoveryServiceSequence.isEmpty()) { + logger.log(Level.FINE, "Request issued with fixed discovery service sequence: {0}", discoveryServiceSequence); + // first check if discoveryService IDs are valid + Set foundDSs = new HashSet(discoveryServiceSequence); + for (String ds : discoveryServiceSequence) { + for (DiscoveryServiceProperties regInfo : registeredServices) { + if (regInfo.getId().equals(ds)) { + foundDSs.remove(ds); + } + } + } + // if there are some IDs left that were not found + if (!foundDSs.isEmpty()) { + String msg = MessageFormat.format("The discovery services {0} could not be found", Utils.collectionToString(foundDSs, ",")); + logger.log(Level.WARNING, msg); + response.setInvalidRequest(true); + response.setDetails(msg); + return null; + } + + // for each data set process all discovery services + // (possible alternative, not used here: for all discovery services process each data set) + for (MetaDataObjectReference dataSetId : request.getDataSets()) { + MetaDataObject mdo = null; + if (dataSetId.getId().startsWith(HEALTH_TEST_DATA_SET_ID_PREFIX)) { + mdo = new UnknownDataSet(); + mdo.setReference(dataSetId); + } else { + mdo = odf.getMetadataStore().retrieve(dataSetId); + } + if (mdo == null) { + String msg = MessageFormat.format("The meta data object id ''{0}'' does not reference an existing metadata object. 
Request will be set to error.", dataSetId.toString()); + logger.log(Level.WARNING, msg); + response.setInvalidRequest(true); + response.setDetails(msg); + return null; + } + if (dataSetId.getUrl() == null) { + dataSetId.setUrl(mdo.getReference().getUrl()); + } + for (String ds : discoveryServiceSequence) { + DiscoveryServiceRequest req = new DiscoveryServiceRequest(); + DataSetContainer dataSetContainer = new DataSetContainer(); + dataSetContainer.setDataSet(mdo); + req.setDataSetContainer(dataSetContainer); + req.setOdfRequestId(request.getId()); + req.setDiscoveryServiceId(ds); + req.setUser(currentUser); + req.setAdditionalProperties(request.getAdditionalProperties()); + String odfUrl = odf.getSettingsManager().getODFSettings().getOdfUrl(); + req.setOdfUrl(odfUrl); + for (DiscoveryServiceProperties dsri : odf.getDiscoveryServiceManager().getDiscoveryServicesProperties()) { + if (dsri.getId().equals(ds)) { + if (dsri.getEndpoint().getRuntimeName().equals(SparkServiceRuntime.SPARK_RUNTIME_NAME)) { + req.setOdfUser(odf.getSettingsManager().getODFSettings().getOdfUser()); + //Note that the password has to be provided as plain text here because the remote service cannot decrypt it otherwise. + //TODO: Consider to provide a temporary secure token instead of the password. + req.setOdfPassword(Encryption.decryptText(odf.getSettingsManager().getODFSettings().getOdfPassword())); + } + } + } + startRequests.add(req); + } + } + } else { + String msg = "The request didn't contain any processing hints. 
ODF cannot process a request without an analysis sequence."; + logger.log(Level.WARNING, msg); + response.setInvalidRequest(true); + response.setDetails(msg); + return null; + } + + AnalysisRequestTracker tracker = new AnalysisRequestTracker(); + tracker.setDiscoveryServiceRequests(startRequests); + tracker.setNextDiscoveryServiceRequest(0); + tracker.setRequest(request); + tracker.setStatus(AnalysisRequestTrackerStatus.STATUS.INITIALIZED); + Utils.setCurrentTimeAsLastModified(tracker); + tracker.setUser(currentUser); + return tracker; + } + + boolean requiresMetaDataCache(DiscoveryService service) { + return service instanceof SparkDiscoveryServiceProxy; + } + + public static SyncDiscoveryService getDiscoveryServiceProxy(String discoveryServiceId, AnalysisRequest request) { + try { + ODFInternalFactory factory = new ODFInternalFactory(); + DiscoveryServiceManager dsm = factory.create(DiscoveryServiceManager.class); + DiscoveryServiceProperties serviceProps = null; + if (discoveryServiceId.startsWith(HEALTH_TEST_DISCOVERY_SERVICE_ID)) { + serviceProps = HealthCheckServiceRuntime.getHealthCheckServiceProperties(); + } else { + serviceProps = dsm.getDiscoveryServiceProperties(discoveryServiceId); + } + ServiceRuntime runtime = ServiceRuntimes.getRuntimeForDiscoveryService(discoveryServiceId); + if (runtime == null) { + throw new RuntimeException(MessageFormat.format("Service runtime for service ''{0}'' was not found.", discoveryServiceId)); + } + DiscoveryService runtimeProxy = runtime.createDiscoveryServiceProxy(serviceProps); + SyncDiscoveryService proxy = null; + if (runtimeProxy instanceof AsyncDiscoveryService) { + proxy = new AsyncDiscoveryServiceWrapper( (AsyncDiscoveryService) runtimeProxy); + } else { + proxy = (SyncDiscoveryService) runtimeProxy; + } + proxy.setMetadataStore(factory.create(MetadataStore.class)); + AnnotationStore as = factory.create(AnnotationStore.class); + if (request != null) { + as.setAnalysisRun(request.getId()); + } + 
proxy.setAnnotationStore(as); + return proxy; + } catch (ServiceNotFoundException exc) { + throw new RuntimeException(exc); + } + } + + /** + * package private helper method that can be called when the current discovery service was finished + * and you want to advance to the next. + * NOTE: This should only be called once for all nodes, i.e., typically from a Kafka consumer + * that runs on all nodes with the same consumer group ID. + * + * @param tracker tracker of the analysis request whose current discovery service has finished; the run ID and + * discovery service ID derived from it are used for logging only + */ + void advanceToNextDiscoveryService(final AnalysisRequestTracker tracker) { + DiscoveryServiceRequest req = TrackerUtil.getCurrentDiscoveryServiceStartRequest(tracker); + DiscoveryServiceResponse resp = TrackerUtil.getCurrentDiscoveryServiceStartResponse(tracker); + String dsRunID = "N/A"; + if (resp instanceof DiscoveryServiceAsyncStartResponse) { + dsRunID = ((DiscoveryServiceAsyncStartResponse) resp).getRunId(); + } + String dsID = req.getDiscoveryServiceId(); + + TrackerUtil.moveToNextDiscoveryService(tracker); + DiscoveryServiceRequest nextDSReq = TrackerUtil.getCurrentDiscoveryServiceStartRequest(tracker); + if (nextDSReq == null) { + logger.log(Level.FINER, "DSWatcher: Run ''{0}'' of DS ''{1}'' was last of request ''{2}'', marking overall request as finished", + new Object[] { dsRunID, dsID, tracker.getRequest().getId() }); + // overall request is finished + tracker.setStatus(AnalysisRequestTrackerStatus.STATUS.FINISHED); + tracker.setStatusDetails("All discovery services ran successfully"); + + // now propagate annotations if configured + logger.log(Level.FINE, "Request is finished, checking for annotation propagation"); + Boolean doPropagation = odf.getSettingsManager().getODFSettings().getEnableAnnotationPropagation(); + if (Boolean.TRUE.equals(doPropagation)) { + TransactionContextExecutor transactionContextExecutor = new ODFInternalFactory().create(TransactionContextExecutor.class); + try { + 
transactionContextExecutor.runInTransactionContext(new Callable() { + + @Override + public Object call() throws Exception { + AnnotationPropagator ap = odf.getMetadataStore().getAnnotationPropagator(); + if (ap != null) { + logger.log(Level.FINE, "Annotation Propagator exists, running propagation"); + try { + ap.propagateAnnotations(new ODFFactory().create().getAnnotationStore(), tracker.getRequest().getId()); + } catch(Exception exc) { + logger.log(Level.SEVERE, "An unexcepted exception occurred while propagating annotations", exc); + tracker.setStatus(AnalysisRequestTrackerStatus.STATUS.ERROR); + String msg = MessageFormat.format("An unexpected exception occured while propagating annotations: ''{0}''", Utils.getExceptionAsString(exc)); + tracker.setStatusDetails(msg); + } + } + return null; + } + }); + } catch (Exception e) { + // should never happen as exception is handled inside the callable + throw new RuntimeException(e); + } + } + } else { + logger.log(Level.FINER, "DSWatcher: Run ''{0}'' of DS ''{1}'' was not the last of request ''{2}'', moving over to next request", + new Object[] { dsRunID, dsID, tracker.getRequest().getId() }); + tracker.setStatus(AnalysisRequestTrackerStatus.STATUS.IN_DISCOVERY_SERVICE_QUEUE); + queueManager.enqueue(tracker); + } + Utils.setCurrentTimeAsLastModified(tracker); + store.store(tracker); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-core/src/main/java/org/apache/atlas/odf/core/controlcenter/DeclarativeRequestMapper.java ---------------------------------------------------------------------- diff --git a/odf/odf-core/src/main/java/org/apache/atlas/odf/core/controlcenter/DeclarativeRequestMapper.java b/odf/odf-core/src/main/java/org/apache/atlas/odf/core/controlcenter/DeclarativeRequestMapper.java new file mode 100755 index 0000000..9b16270 --- /dev/null +++ b/odf/odf-core/src/main/java/org/apache/atlas/odf/core/controlcenter/DeclarativeRequestMapper.java @@ -0,0 +1,279 @@ +/** + * 
Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.atlas.odf.core.controlcenter; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.logging.Level; +import java.util.logging.Logger; + +import java.text.MessageFormat; + +import org.apache.atlas.odf.api.ODFFactory; +import org.apache.atlas.odf.core.Utils; +import org.apache.atlas.odf.api.discoveryservice.DiscoveryServiceManager; +import org.apache.atlas.odf.api.analysis.AnalysisRequest; +import org.apache.atlas.odf.api.discoveryservice.DiscoveryServiceProperties; + +/** +* +* Maps a list of {@link AnnotationType} objects to a list of service ids representing concrete discovery +* services that generate the requested annotation types. +* +* Internally, this class generates a list of all possible combinations of discovery services which may be +* used to generate the requested annotation types. The combinations are then assessed and ordered by the +* expected execution effort and the one with the least execution effort is provided. 
+* +*/ +public class DeclarativeRequestMapper { + + private Logger logger = Logger.getLogger(DeclarativeRequestMapper.class.getName()); + + DiscoveryServiceManager dsManager = new ODFFactory().create().getDiscoveryServiceManager(); + List dsPropList = dsManager.getDiscoveryServicesProperties(); + + private List discoveryServiceSequences = new ArrayList(); + + public DeclarativeRequestMapper(AnalysisRequest request) { + String messageText = "Generating possible discovery service sequences for annotation types {0}."; + logger.log(Level.INFO, MessageFormat.format(messageText, request.getAnnotationTypes())); + + this.discoveryServiceSequences = calculateDiscoveryServiceSequences(request.getAnnotationTypes()); + Collections.sort(this.discoveryServiceSequences, new EffortComparator()); + } + + /** + * + * Represents a single discovery service sequence. + * + */ + public class DiscoveryServiceSequence { + private LinkedHashSet serviceSequence; + + public DiscoveryServiceSequence() { + this.serviceSequence = new LinkedHashSet(); + } + + public DiscoveryServiceSequence(LinkedHashSet serviceIds) { + this.serviceSequence = serviceIds; + } + + public LinkedHashSet getServiceSequence() { + return this.serviceSequence; + } + + public List getServiceSequenceAsList() { + return new ArrayList(this.serviceSequence); + } + + @Override + public boolean equals(Object obj) { + if ((obj == null) || !(obj instanceof DiscoveryServiceSequence)) { + return false; + } + return this.getServiceSequence().equals(((DiscoveryServiceSequence) obj).getServiceSequence()); + } + + // Overriding hashCode method to ensure proper results of equals() method + // (See http://www.javaranch.com/journal/2002/10/equalhash.html) + @Override + public int hashCode() { + return Utils.joinStrings(new ArrayList(this.serviceSequence), ',').hashCode(); + } + } + + /** + * + * Internal class that estimates the effort for executing a sequence of discovery services. 
+ * Should be extended to take runtime statistics into account. + * + */ + private class EffortComparator implements Comparator { + public int compare(DiscoveryServiceSequence da1, DiscoveryServiceSequence da2) { + if (da1.getServiceSequence().size() < da2.getServiceSequence().size()) { + return -1; + } else if (da1.getServiceSequence().size() > da2.getServiceSequence().size()) { + return 1; + } else { + return 0; + } + } + } + + /** + * Returns the calculated list of discovery service sequences ordered by the execution effort, + * starting with the sequence that is supposed to cause the minimum execution effort. + * + * @return List of discovery service sequences + */ + public List getDiscoveryServiceSequences() { + return this.discoveryServiceSequences; + } + + /** + * Returns recommended discovery service sequence, i.e. the one that is supposed to cause the + * minimum execution effort. + * + * @return Discovery service sequence + */ + public List getRecommendedDiscoveryServiceSequence() { + if (!getDiscoveryServiceSequences().isEmpty()) { + return new ArrayList(this.discoveryServiceSequences.get(0).getServiceSequence()); + } else { + return null; + } + } + + /** + * Remove all discovery service sequences that contain a specific service id. Use this method + * to update the list of discovery service sequences after a specific discovery service has + * failed and should not be used any more. + * + * @param serviceId Id of discovery service to be removed + * @return Discovery service sequence + */ + public boolean removeDiscoveryServiceSequences(String serviceId) { + boolean serviceRemoved = false; + List updatedList = new ArrayList(); + updatedList.addAll(this.discoveryServiceSequences); + for (DiscoveryServiceSequence sequence : this.discoveryServiceSequences) { + if (sequence.getServiceSequence().contains(serviceId)) { + updatedList.remove(sequence); + serviceRemoved = true; + } + } + this.discoveryServiceSequences = updatedList; + return serviceRemoved ? 
true : false; + } + + /** + * Internal method that determines all possible sequences of discovery services which could be used + * to generate the requested annotation type. Using recursion, all levels of prerequisites are taken + * into account. + * + * @param annotationType Annotation type to be generated + * @return List of discovery service sequences that generate the requested annotation type + */ + private List getDiscoveryServiceSequencesForAnnotationType(String annotationType) { + List result = new ArrayList(); + for (DiscoveryServiceProperties dsProps : this.dsPropList) { + if ((dsProps.getResultingAnnotationTypes() != null) && dsProps.getResultingAnnotationTypes().contains(annotationType)) { + DiscoveryServiceSequence da = new DiscoveryServiceSequence(); + da.getServiceSequence().add(dsProps.getId()); + List discoveryApproachesForService = new ArrayList(); + discoveryApproachesForService.add(da); + + // If there are prerequisite annotation types, also merge their services into the result + if ((dsProps.getPrerequisiteAnnotationTypes() != null) + && !dsProps.getPrerequisiteAnnotationTypes().isEmpty()) { + discoveryApproachesForService = combineDiscoveryServiceSequences( + calculateDiscoveryServiceSequences(dsProps.getPrerequisiteAnnotationTypes()), + discoveryApproachesForService); + ; + } + logger.log(Level.INFO, "Discovery appoaches for annotationType " + annotationType + ":"); + for (DeclarativeRequestMapper.DiscoveryServiceSequence discoveryApproach : discoveryApproachesForService) { + logger.log(Level.INFO, + Utils.joinStrings(new ArrayList(discoveryApproach.getServiceSequence()), ',')); + } + + result.addAll(discoveryApproachesForService); + } + } + return result; + } + + /** + * Internal method that combines two lists of discovery service sequences by generating all possible + * combinations of the entries of both lists. The method avoids duplicate services in each sequence + * and duplicate sequences in the resulting list. 
+ * + * @param originalSequences Original list of discovery service sequences + * @param additionalSequences Second list discovery service sequences + * @return Combined list of discovery service sequences + */ + private List combineDiscoveryServiceSequences(List originalSequences, List additionalSequences) { + // Example scenario for combining service sequences: + // + // Lets assume a service S that generates two annotation types AT1 and AT2 and S has prerequisite + // annotation type AT_P. There are two services P1 and P2 creating annotation type AT_P. + // The possible service sequences for generating annotation type AT1 are "P1, S" and "P2, S", same for AT2. + // + // When requesting a set of annotation types AT1 and AT2, this will result in the following four combinations + // which contain several redundancies: + // "P1, S, P1, S", "P1, S, P2, S", "P2, S, P1, S", "P2, S, P2, S" + // + // This method uses three ways of removing redundancies: + // + // 1. Given that class DiscoveryServiceSequence internally uses LinkedHashSet, duplicate services are removed from the + // service sequences, resulting in: "P1, S", "P1, S, P2", "P2, S, P1", "P2, S" + // + // 2. Service sequences are only merged if the last service of the additional sequence is not already part of the original + // one which results in: "P1, S", "P1, S", "P2, S", "P2, S" + // + // 3. Duplicate sequences are ignored, resulting in: "P1, S", "P2, S" which is the final result. 
+ + List discoveryApproaches = new ArrayList(); + for (DiscoveryServiceSequence da1 : originalSequences) { + for (DiscoveryServiceSequence da2 : additionalSequences) { + DiscoveryServiceSequence da = new DiscoveryServiceSequence(); + da.getServiceSequence().addAll(da1.getServiceSequence()); + + // Add the second list only if its last serviceId is not already part of the first list + // (Otherwise unnecessary prerequisite services might be added, because the 2nd list may use different ones) + if (!da1.getServiceSequence().contains(da2.getServiceSequenceAsList().get(da2.getServiceSequenceAsList().size() - 1))) { + da.getServiceSequence().addAll(da2.getServiceSequence()); + } + + // Avoid duplicate entries (uses DiscoveryServiceSequence.equals() method) + if (!discoveryApproaches.contains(da)) { + discoveryApproaches.add(da); + } + } + } + return discoveryApproaches; + } + + /** + * Internal method that determines all possible sequences of discovery services which could be used + * to generate a set of requested annotation types. + * + * Each discovery service creates one or multiple annotation types and may have prerequisite annotation types. + * As there may be multiple services creating the same annotation type (maybe by using different prerequisite + * annotation types), this may result in a complex dependencies. Using recursion, this method iterates through + * all the dependencies in order to calculate a list of all possible sequences of discovery services that could + * be used to calculate the requested annotation types. 
+ * + * @param annotationTypes List of annotation types to be generated + * @return List of discovery service sequences that generate the requested annotation types + */ + private List calculateDiscoveryServiceSequences(List annotationTypes) { + List result = null; + + for (String currentType : annotationTypes) { + // Calculate discovery sequences for current annotation type + List additionalDiscoveryApproaches = getDiscoveryServiceSequencesForAnnotationType(currentType); + if (result == null) { + result = additionalDiscoveryApproaches; + } else { + // Merge with discovery sequences determined for the previous annotation types in the list + result = combineDiscoveryServiceSequences(result, additionalDiscoveryApproaches); + } + } + return result; + } +} http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-core/src/main/java/org/apache/atlas/odf/core/controlcenter/DefaultStatusQueueStore.java ---------------------------------------------------------------------- diff --git a/odf/odf-core/src/main/java/org/apache/atlas/odf/core/controlcenter/DefaultStatusQueueStore.java b/odf/odf-core/src/main/java/org/apache/atlas/odf/core/controlcenter/DefaultStatusQueueStore.java new file mode 100755 index 0000000..20b7661 --- /dev/null +++ b/odf/odf-core/src/main/java/org/apache/atlas/odf/core/controlcenter/DefaultStatusQueueStore.java @@ -0,0 +1,478 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.atlas.odf.core.controlcenter; + +import java.util.ArrayList; +import java.util.Date; +import java.util.HashSet; +import java.util.Iterator; +import java.util.LinkedHashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.ListIterator; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Properties; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutorService; +import java.util.logging.Level; +import java.util.logging.Logger; + +import org.apache.atlas.odf.api.analysis.AnalysisRequest; +import org.apache.atlas.odf.api.metadata.models.Annotation; +import org.apache.atlas.odf.core.ODFInternalFactory; +import org.apache.atlas.odf.core.messaging.DiscoveryServiceQueueManager; +import org.apache.atlas.odf.json.JSONUtils; +import org.apache.wink.json4j.JSONException; + +import org.apache.atlas.odf.api.ODFFactory; +import org.apache.atlas.odf.api.analysis.AnalysisRequestSummary; +import org.apache.atlas.odf.api.analysis.AnalysisRequestTrackerStatus.STATUS; +import org.apache.atlas.odf.api.annotation.AnnotationStore; +import org.apache.atlas.odf.api.annotation.AnnotationStoreUtils; +import org.apache.atlas.odf.api.discoveryservice.AnalysisRequestTracker; +import org.apache.atlas.odf.api.metadata.MetaDataObjectReference; + +/** + * This class is an in-memory store for both request trackers (showing the status of analysis requests) as well as + * a for annotations. Both trackers and annotations are put on the ODF status queue which + * (a) stores as "semi"-persistent store ("semi" because Kafka's retention mechanism will evantuall delete them), and + * (b) a way to propagate those changes to other ODF nodes. + * The annotations and trackers themselves are stored in memory in static variables. + * + * This is how it works: + * 1. A single consumer thread listens on the status topic + * 2. 
If an incoming status queue entry is a tracker, it stores it in the in-memory tracker store + * If it is an annotation, it stores it in the in-memory annotation store + * 3. Queries for trackers and annotations only go against the in-memory stores + * 4. When a check for overaged entries occurs (a check that removes trackers form the store which are older than the queue retention time) + * the annotations for overaged and finished requests are also deleted (see removeOveragedEntries()) + * + * + * + */ +public class DefaultStatusQueueStore implements AnalysisRequestTrackerStore, AnnotationStore { + + static Logger logger = Logger.getLogger(DefaultStatusQueueStore.class.getName()); + + public static final long IGNORE_SIMILAR_REQUESTS_TIMESPAN_MS = 5000; + + static Object globalRequestStoreMapLock = new Object(); + + /* + * http://docs.oracle.com/javase/7/docs/api/java/util/LinkedHashMap.html + * + * A structural modification is any operation that adds or deletes one or more mappings or, in the case of access-ordered linked hash maps, affects iteration order. + * In insertion-ordered linked hash maps, merely changing the value associated with a key that is already contained in the map is not a structural modification. + * In access-ordered linked hash maps, merely querying the map with get is a structural modification.) + */ + static LinkedHashMap globalRequestStoreMap = new LinkedHashMap(); + + /* + * This map is only used to track if storing an object was successful + * + */ + static ConcurrentHashMap globalStoreSuccessMap = new ConcurrentHashMap(); + + private String analysisRun; + + // simplest implementation for now: just keep a simple list + private static List storedAnnotations = new LinkedList<>(); + private static Object storedAnnotationsLock = new Object(); + + /** + * This processor reads trackers from the queue and stores it in the globalRequestStoreMap. + * The thread for this processor is created in the QueueManager implementation. 
+ * + */ + public static class StatusQueueProcessor implements QueueMessageProcessor { + Logger logger = Logger.getLogger(StatusQueueProcessor.class.getName()); + + @Override + public void process(ExecutorService executorService, String message, int partition, long offset) { + StatusQueueEntry sqe = new StatusQueueEntry(); + try { + sqe = JSONUtils.fromJSON(message, StatusQueueEntry.class); + } catch (Exception e) { + logger.log(Level.WARNING, "Entry in status queue could not be processed", e); + } + + // first handle trackers and / or initial cleanup + synchronized (globalRequestStoreMapLock) { + if (sqe.getAnalysisRequestTracker() != null) { + try { + AnalysisRequestTracker tracker = sqe.getAnalysisRequestTracker(); + String requestID = tracker.getRequest().getId(); + logger.log(Level.FINEST, "Store status queue: found tracker with id ''{0}'', tracker: {1}", new Object[] { requestID, message }); + if (tracker.getStatus() == STATUS.FINISHED) { + logger.log(Level.INFO, "Request with id ''{0}'' is finished, result: {1}", new Object[] { requestID, message }); + } + //remove item so that it is added to the end of the list. 
+ if (globalRequestStoreMap.containsKey(requestID)) { + globalRequestStoreMap.remove(requestID); + } + + globalRequestStoreMap.put(requestID, tracker); + if (tracker != null && tracker.getRevisionId() != null) { + globalStoreSuccessMap.put(tracker.getRevisionId(), true); + } + + } catch (Exception e) { + logger.log(Level.WARNING, "Tracker entry in status queue could not be processed", e); + } + } + } + + if (sqe.getAnnotation() != null) { + Annotation annot = sqe.getAnnotation(); + logger.log(Level.FINEST, "Received annotationk over status queue: ''{0}''", annot.getReference().getId()); + synchronized (storedAnnotationsLock) { + storedAnnotations.add(annot); + globalStoreSuccessMap.put(annot.getReference().getId(), true); + } + } + + removeOveragedEntries(); + } + + } + + ///////////////////////////////////////////// + // AnalysisRequestTrackerStore interface implementation + + + /* + * This store uses the lastModified timestamp to remove overaged trackers. + * Therefore, the lastModified timestamp MUST be set before storing anything and prevent unwanted removal + */ + @Override + public void store(AnalysisRequestTracker tracker) { + String id = tracker.getRequest().getId(); + logger.fine("Store " + id + " in trackerStore"); + + String revId = UUID.randomUUID() + "_" + System.currentTimeMillis(); + tracker.setRevisionId(revId); + globalStoreSuccessMap.put(revId, false); + + ODFInternalFactory factory = new ODFInternalFactory(); + DiscoveryServiceQueueManager qm = factory.create(DiscoveryServiceQueueManager.class); + // put the tracker onto the status queue, the actual map that is used in query() is filled by the ARTProcessor listening on the status queue + StatusQueueEntry sqe = new StatusQueueEntry(); + sqe.setAnalysisRequestTracker(tracker); + qm.enqueueInStatusQueue(sqe); + waitUntilEntryArrives(revId); + } + + private void waitUntilEntryArrives(String entryId) { + boolean found = false; + int maxNumWaits = 1500; + int sleepMS = 20; + while (maxNumWaits > 0) { + 
final Boolean storageSuccess = globalStoreSuccessMap.get(entryId); + if (storageSuccess != null && storageSuccess == true) { + found = true; + globalStoreSuccessMap.remove(entryId); + break; + } + try { + Thread.sleep(sleepMS); + } catch (InterruptedException e) { + e.printStackTrace(); + } + maxNumWaits--; + } + if(!found){ + final String message = "The tracker could not be stored in 30 sec!"; + logger.warning(message); + throw new RuntimeException(message); + }else{ + logger.fine("Tracker stored after " + ((1500 - maxNumWaits) * sleepMS) + " ms"); + } + } + + @Override + public AnalysisRequestTracker query(String analysisRequestId) { + logger.fine("Querying store for " + analysisRequestId); + synchronized (globalRequestStoreMapLock) { + AnalysisRequestTracker tracker = globalRequestStoreMap.get(analysisRequestId); + return tracker; + } + } + + @Override + public void clearCache() { + logger.fine("Clearing store cache"); + synchronized (globalRequestStoreMapLock) { + globalRequestStoreMap.clear(); + } + } + + private static void removeOveragedEntries(){ + Set finishedRequests = new HashSet<>(); + logger.fine("Removing overaged entries from store"); + synchronized (globalRequestStoreMapLock) { + Iterator> entryIterator = globalRequestStoreMap.entrySet().iterator(); + long maxRetentionMS = new ODFFactory().create().getSettingsManager().getODFSettings().getMessagingConfiguration().getAnalysisRequestRetentionMs(); + long currentTimeMS = System.currentTimeMillis(); + while(entryIterator.hasNext()){ + Entry entry = entryIterator.next(); + AnalysisRequestTracker tracker = entry.getValue(); + if(currentTimeMS - tracker.getLastModified() >= maxRetentionMS){ + if (tracker.getStatus() == STATUS.FINISHED || tracker.getStatus() == STATUS.ERROR) { + finishedRequests.add(tracker.getRequest().getId()); + } + entryIterator.remove(); + logger.log(Level.INFO, "Removed overaged status tracker with id ''{0}''", new Object[] { entry.getKey() }); + }else{ + /* + * items in a 
linkedHashMap are ordered in the way they were put into the map. + * Because of this, if one item is not overaged, all following won't be either + */ + break; + } + } + } + synchronized (storedAnnotationsLock) { + ListIterator it = storedAnnotations.listIterator(); + while (it.hasNext()) { + Annotation annot = it.next(); + if (finishedRequests.contains(annot.getAnalysisRun())) { + it.remove(); + } + } + } + } + + @Override + public int getSize() { + synchronized (globalRequestStoreMapLock) { + return globalRequestStoreMap.keySet().size(); + } + } + + @Override + public AnalysisRequestTracker findSimilarQueuedRequest(AnalysisRequest request) { + synchronized (globalRequestStoreMapLock) { + for (AnalysisRequestTracker tracker : globalRequestStoreMap.values()) { + long startedAfterLimit = System.currentTimeMillis() - IGNORE_SIMILAR_REQUESTS_TIMESPAN_MS; + if (TrackerUtil.isAnalysisWaiting(tracker) || + (tracker.getNextDiscoveryServiceRequest() == 0 && tracker.getStatus() == STATUS.DISCOVERY_SERVICE_RUNNING && tracker.getLastModified() >= startedAfterLimit)) { + AnalysisRequest otherRequest = tracker.getRequest(); + List dataSets = request.getDataSets(); + List otherDataSets = otherRequest.getDataSets(); + + if (otherDataSets.containsAll(dataSets) && tracker.getDiscoveryServiceRequests().get(0).getDiscoveryServiceId().equals( + request.getDiscoveryServiceSequence().get(0))) { + logger.log(Level.FINEST, "Found similar request for request {0}", new Object[] { request.getId()}); + return tracker; + } + } + } + return null; + } + } + + + @Override + public List getRecentTrackers(int offset, int limit) { + if (offset < 0) { + throw new RuntimeException("Offset parameter cannot be negative."); + } + if (limit < -1) { + throw new RuntimeException("Limit parameter cannot be smaller than -1."); + } + synchronized (globalRequestStoreMapLock) { + List arsList = new ArrayList<>(); + Iterator> it = globalRequestStoreMap.entrySet().iterator(); + // filter out health check requests + 
while (it.hasNext()) { + AnalysisRequestTracker t = it.next().getValue(); + if (!t.getRequest().getDataSets().get(0).getId().startsWith(ControlCenter.HEALTH_TEST_DATA_SET_ID_PREFIX)) { + arsList.add(t); + } + } + // now pick number many requests from the end + List result = new ArrayList<>(); + if (arsList.size() > offset) { + int startIndex = arsList.size() - offset - limit; + if (limit == -1 || startIndex < 0) { + startIndex = 0; + } + int endIndex = arsList.size() - offset - 1; + if (endIndex < 0) { + endIndex = 0; + } + for (int i=endIndex ; i>=startIndex; i--) { + result.add(arsList.get(i)); + } + } + return result; + } + } + + @Override + public AnalysisRequestSummary getRequestSummary() { + synchronized (globalRequestStoreMapLock) { + try { + List recentTrackers = this.getRecentTrackers(0, -1); + int totalSuccess = 0; + int totalFailure = 0; + + for (AnalysisRequestTracker tracker : recentTrackers) { + if (STATUS.FINISHED.equals(tracker.getStatus())) { + totalSuccess++; + } else if (STATUS.ERROR.equals(tracker.getStatus())) { + totalFailure++; + } + } + return new AnalysisRequestSummary(totalSuccess, totalFailure); + } catch (Exception exc) { + throw new RuntimeException(exc); + } + } + } + + ///////////////////////////////////////////// + // AnnotationStore interface implementation + + @Override + public Properties getProperties() { + Properties props = new Properties(); + props.put(STORE_PROPERTY_TYPE, "DefaultAnnotationStore"); + props.put(STORE_PROPERTY_ID, getRepositoryId()); + props.put(STORE_PROPERTY_DESCRIPTION, "A default in-memory implementation of the annotation store storing its results via Kafka"); + return props; + } + + @Override + public String getRepositoryId() { + return "ODFDefaultAnnotationStore"; + } + + @Override + public ConnectionStatus testConnection() { + return ConnectionStatus.OK; + } + + @Override + public MetaDataObjectReference store(Annotation annotation) { + // clone object + try { + annotation = 
JSONUtils.cloneJSONObject(annotation); + } catch (JSONException e) { + logger.log(Level.SEVERE, "Annotation could not be stored because JSON conversion failed.", e); + throw new RuntimeException(e); + } + + // create a new reference + String annotId = "Annot" + UUID.randomUUID() + "_" + System.currentTimeMillis(); + logger.log(Level.FINEST, "Storing annotation with ID ''{0}''", annotId); + MetaDataObjectReference ref = new MetaDataObjectReference(); + ref.setId(annotId); + ref.setRepositoryId(getRepositoryId()); + annotation.setReference(ref); + if (analysisRun != null) { + annotation.setAnalysisRun(analysisRun); + } + + // re-use mechanism from status queue to wait until message has arrived via Kafka + globalStoreSuccessMap.put(annotId, false); + DiscoveryServiceQueueManager qm = new ODFInternalFactory().create(DiscoveryServiceQueueManager.class); + StatusQueueEntry sqe = new StatusQueueEntry(); + sqe.setAnnotation(annotation); + qm.enqueueInStatusQueue(sqe); + waitUntilEntryArrives(annotId); + return ref; + } + + @Override + public List getAnnotations(MetaDataObjectReference object, String analysisRequestId) { + List results = new ArrayList<>(); + synchronized (storedAnnotationsLock) { + logger.log(Level.FINEST, "Number of annotations stored: ''{0}''", storedAnnotations.size()); + ListIterator it = storedAnnotations.listIterator(); + while (it.hasNext()) { + Annotation annot = it.next(); + boolean match = true; + if (object != null) { + match = match && object.equals(AnnotationStoreUtils.getAnnotatedObject(annot)); + } + if (annot.getAnalysisRun() != null) { + // analysisRun is not set for health check and for some of the tests + if (analysisRequestId != null) { + match &= annot.getAnalysisRun().equals(analysisRequestId); + } + } + if (match) { + results.add(annot); + } + } + } + logger.log(Level.FINEST, "Number of annotations found for request Id ''{0}'': ''{1}''", new Object[]{analysisRequestId, results.size()}); + return results; + } + + @Override + public 
void setAnalysisRun(String analysisRun) { + this.analysisRun = analysisRun; + } + + @Override + public String getAnalysisRun() { + return this.analysisRun; + } + + @Override + public Annotation retrieveAnnotation(MetaDataObjectReference ref) { + synchronized (storedAnnotationsLock) { + logger.log(Level.FINEST, "Number of annotations stored: ''{0}''", storedAnnotations.size()); + ListIterator it = storedAnnotations.listIterator(); + while (it.hasNext()) { + Annotation annot = it.next(); + if (annot.getReference().equals(ref)) { + return annot; + } + } + } + return null; + } + + @Override + public void setStatusOfOldRequest(long cutOffTimestamp, STATUS status, String detailsMessage) { + synchronized (globalRequestStoreMapLock) { + DiscoveryServiceQueueManager qm = new ODFInternalFactory().create(DiscoveryServiceQueueManager.class); + for (AnalysisRequestTracker tracker : globalRequestStoreMap.values()) { + if (tracker.getLastModified() < cutOffTimestamp // + && (STATUS.DISCOVERY_SERVICE_RUNNING.equals(tracker.getStatus()) // + || STATUS.IN_DISCOVERY_SERVICE_QUEUE.equals(tracker.getStatus()) // + || STATUS.INITIALIZED.equals(tracker.getStatus()) // + )) { + // set the tracker in-memory to have the result available immediately + tracker.setStatus(status); + if (detailsMessage == null) { + detailsMessage = "Setting request to " + status + " because it was last modified before " + new Date(cutOffTimestamp); + } + tracker.setStatusDetails(detailsMessage); + // put tracker onto queue + StatusQueueEntry sqe = new StatusQueueEntry(); + sqe.setAnalysisRequestTracker(tracker); + qm.enqueueInStatusQueue(sqe); + } + } + } + + } +} http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-core/src/main/java/org/apache/atlas/odf/core/controlcenter/DefaultThreadManager.java ---------------------------------------------------------------------- diff --git a/odf/odf-core/src/main/java/org/apache/atlas/odf/core/controlcenter/DefaultThreadManager.java 
b/odf/odf-core/src/main/java/org/apache/atlas/odf/core/controlcenter/DefaultThreadManager.java new file mode 100755 index 0000000..0ea909f --- /dev/null +++ b/odf/odf-core/src/main/java/org/apache/atlas/odf/core/controlcenter/DefaultThreadManager.java @@ -0,0 +1,276 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.atlas.odf.core.controlcenter; + +import java.lang.Thread.State; +import java.lang.Thread.UncaughtExceptionHandler; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeoutException; +import java.util.logging.Level; +import java.util.logging.Logger; + +import org.apache.atlas.odf.api.engine.ThreadStatus; + +public class DefaultThreadManager implements ThreadManager { + + private Logger logger = Logger.getLogger(DefaultThreadManager.class.getName()); + + static Object unmanagedThreadLock = new Object(); + static Map unmanagedThreadMap = new HashMap(); + static Map unmanagedThreadRunnableMap = new HashMap(); + + ExecutorService executorService; + + public DefaultThreadManager() { + } + + private boolean isThreadRunning(Thread thread) { + return thread.getState() != State.TERMINATED; + } + + private void purgeTerminatedThreads() { + List entriesToBeRemoved = new ArrayList(); + List entriesToBeKept = new ArrayList(); + 
for (Map.Entry entry : unmanagedThreadMap.entrySet()) { + if (!isThreadRunning(entry.getValue())) { + entriesToBeRemoved.add(entry.getKey()); + } else { + entriesToBeKept.add(entry.getKey()); + } + } + for (String id : entriesToBeRemoved) { + unmanagedThreadMap.remove(id); + unmanagedThreadRunnableMap.remove(id); + } + logger.finer("Removed finished threads: " + entriesToBeRemoved.toString()); + logger.finer("Kept unfinished threads: " + entriesToBeKept.toString()); + } + + @Override + public ThreadStartupResult startUnmanagedThread(final String id, final ODFRunnable runnable) { + ThreadStartupResult result = new ThreadStartupResult(id) { + @Override + public boolean isReady() { + synchronized (unmanagedThreadLock) { + if (unmanagedThreadRunnableMap.containsKey(id)) { + return unmanagedThreadRunnableMap.get(id).isReady(); + } + } + return false; + } + }; + synchronized (unmanagedThreadLock) { + purgeTerminatedThreads(); + Thread t = unmanagedThreadMap.get(id); + if (t != null) { + if (isThreadRunning(t)) { + return result; + } + } + runnable.setExecutorService(executorService); + + Thread newThread = new Thread(runnable); + result.setNewThreadCreated(true); + newThread.setUncaughtExceptionHandler(new UncaughtExceptionHandler() { + + @Override + public void uncaughtException(Thread thread, Throwable throwable) { + logger.log(Level.WARNING, "Uncaught exception in thread " + id + " - Thread will shutdown!", throwable); + synchronized (unmanagedThreadLock) { + purgeTerminatedThreads(); + } + } + }); + + newThread.setDaemon(true); // TODO is it a daemon? 
+ newThread.start(); + unmanagedThreadMap.put(id, newThread); + unmanagedThreadRunnableMap.put(id, runnable); + } + return result; + } + + @Override + public ThreadStatus.ThreadState getStateOfUnmanagedThread(String id) { + synchronized (unmanagedThreadLock) { + Thread t = unmanagedThreadMap.get(id); + if (t == null) { + return ThreadStatus.ThreadState.NON_EXISTENT; + } + Thread.State ts = t.getState(); + switch (ts) { + case TERMINATED: + return ThreadStatus.ThreadState.FINISHED; + default: + return ThreadStatus.ThreadState.RUNNING; + } + } + } + + + + @Override + public void setExecutorService(ExecutorService executorService) { + this.executorService = executorService; + } + + @Override + public void shutdownAllUnmanagedThreads() { + synchronized (unmanagedThreadLock) { + logger.log(Level.INFO, "Shutting down all ODF threads..."); + for (String id : unmanagedThreadMap.keySet()) { + shutdownThreadImpl(id, false); + } + unmanagedThreadMap.clear(); + unmanagedThreadRunnableMap.clear(); + logger.log(Level.INFO, "All ODF threads shutdown"); + purgeTerminatedThreads(); + } + } + + public void shutdownThreads(List names) { + synchronized (unmanagedThreadLock) { + for (String name : names) { + shutdownThreadImpl(name, true); + } + } + } + + private void shutdownThreadImpl(String id, boolean purge) { + Thread t = unmanagedThreadMap.get(id); + if (t == null) { + return; + } + ODFRunnable r = unmanagedThreadRunnableMap.get(id); + r.cancel(); + try { + Thread.sleep(500); + } catch (InterruptedException e1) { + e1.printStackTrace(); + } + int max = 60; + while (t.getState() != Thread.State.TERMINATED) { + if (max == 0) { + break; + } + max--; + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + // do nothing + e.printStackTrace(); + } + } + if (max == 0) { + logger.log(Level.WARNING, "Thread {0} did not stop on its own, must be interrupted.", id); + t.interrupt(); + } + if (purge) { + purgeTerminatedThreads(); + } + } + + @Override + public int 
getNumberOfRunningThreads() { + synchronized (unmanagedThreadLock) { + int result = 0; + for (Thread t : unmanagedThreadMap.values()) { + if (isThreadRunning(t)) { + result++; + } + } + return result; + } + } + + @Override + public List getThreadManagerStatus() { + synchronized (unmanagedThreadLock) { + List result = new ArrayList(); + for (Entry entry : unmanagedThreadMap.entrySet()) { + ThreadStatus status = new ThreadStatus(); + status.setId(entry.getKey()); + status.setState(getStateOfUnmanagedThread(entry.getKey())); + ODFRunnable odfRunnable = unmanagedThreadRunnableMap.get(entry.getKey()); + if (odfRunnable != null) { + status.setType(odfRunnable.getClass().getName()); + } + result.add(status); + } + + return result; + } + } + + @Override + public void waitForThreadsToBeReady(long waitingLimitMs, List startedThreads) throws TimeoutException { + Set threadsToWaitFor = new HashSet(); + for (ThreadStartupResult res : startedThreads) { + //Only if a new thread was created we wait for it to be ready. 
+ if (res.isNewThreadCreated()) { + threadsToWaitFor.add(res.getThreadId()); + } + } + if (threadsToWaitFor.isEmpty()) { + return; + } + + final int msToWait = 200; + final long maxPolls = waitingLimitMs / msToWait; + int count = 0; + while (threadsToWaitFor.size() > 0 && count < maxPolls) { + List ready = new ArrayList(); + List notReady = new ArrayList(); + for (ThreadStartupResult thr : startedThreads) { + if (thr.isReady()) { + ready.add(thr.getThreadId()); + threadsToWaitFor.remove(thr.getThreadId()); + } else { + notReady.add(thr.getThreadId()); + } + } + + logger.fine("Ready: " + ready); + logger.fine("NotReady: " + notReady); + + try { + Thread.sleep(msToWait); + } catch (InterruptedException e) { + e.printStackTrace(); + } + count++; + } + if (count >= maxPolls) { + String msg = "Threads: " + threadsToWaitFor + "' are not ready yet after " + waitingLimitMs + " ms, give up to wait for it"; + logger.log(Level.WARNING, msg); + throw new TimeoutException(msg); + } + + logger.fine("All threads ready after " + (count * msToWait) + "ms"); + } + + @Override + public ODFRunnable getRunnable(String name) { + synchronized (unmanagedThreadLock) { + return unmanagedThreadRunnableMap.get(name); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-core/src/main/java/org/apache/atlas/odf/core/controlcenter/DefaultTransactionContextExecutor.java ---------------------------------------------------------------------- diff --git a/odf/odf-core/src/main/java/org/apache/atlas/odf/core/controlcenter/DefaultTransactionContextExecutor.java b/odf/odf-core/src/main/java/org/apache/atlas/odf/core/controlcenter/DefaultTransactionContextExecutor.java new file mode 100755 index 0000000..0f79e0c --- /dev/null +++ b/odf/odf-core/src/main/java/org/apache/atlas/odf/core/controlcenter/DefaultTransactionContextExecutor.java @@ -0,0 +1,29 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in 
compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.atlas.odf.core.controlcenter; + +import java.util.concurrent.Callable; + +/** + * The default TransactionContextExecutor runs code in the same thread as the caller. + * + */ +public class DefaultTransactionContextExecutor implements TransactionContextExecutor { + + @Override + public Object runInTransactionContext(Callable callable) throws Exception { + return callable.call(); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-core/src/main/java/org/apache/atlas/odf/core/controlcenter/DiscoveryServiceStarter.java ---------------------------------------------------------------------- diff --git a/odf/odf-core/src/main/java/org/apache/atlas/odf/core/controlcenter/DiscoveryServiceStarter.java b/odf/odf-core/src/main/java/org/apache/atlas/odf/core/controlcenter/DiscoveryServiceStarter.java new file mode 100755 index 0000000..dbfb597 --- /dev/null +++ b/odf/odf-core/src/main/java/org/apache/atlas/odf/core/controlcenter/DiscoveryServiceStarter.java @@ -0,0 +1,303 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.atlas.odf.core.controlcenter; + +import java.text.MessageFormat; +import java.util.HashMap; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.ExecutorService; +import java.util.logging.Level; +import java.util.logging.Logger; + +import org.apache.atlas.odf.api.discoveryservice.DataSetCheckResult; +import org.apache.atlas.odf.api.discoveryservice.DiscoveryServiceRequest; +import org.apache.atlas.odf.api.discoveryservice.DiscoveryServiceResponse; +import org.apache.atlas.odf.api.discoveryservice.sync.SyncDiscoveryService; +import org.apache.atlas.odf.api.metadata.MetadataStore; +import org.apache.atlas.odf.api.metadata.models.CachedMetadataStore; +import org.apache.atlas.odf.api.metadata.models.MetaDataObject; +import org.apache.atlas.odf.core.ODFInternalFactory; +import org.apache.wink.json4j.JSONException; + +import org.apache.atlas.odf.api.analysis.AnalysisRequestTrackerStatus.STATUS; +import org.apache.atlas.odf.api.discoveryservice.AnalysisRequestTracker; +import org.apache.atlas.odf.api.discoveryservice.datasets.DataSetContainer; +import org.apache.atlas.odf.api.discoveryservice.sync.DiscoveryServiceSyncResponse; +import org.apache.atlas.odf.core.Environment; +import org.apache.atlas.odf.core.Utils; +import org.apache.atlas.odf.core.annotation.InternalAnnotationStoreUtils; +import org.apache.atlas.odf.json.JSONUtils; + +/** + * This class processes the entries of a discovery service queue and runs its respective discovery services in a separate thread. 
/**
 * This class processes the entries of a discovery service queue and runs its respective discovery services in a separate thread.
 *
 * Flow as implemented below: {@link #process} parses a queued tracker, checks the data set against the
 * target service and, if the check passes, hands the actual analysis to a {@link ServiceRunner} that is
 * started through the ThreadManager. Before tracker changes are persisted, the stored tracker is re-read
 * and changes are dropped when the stored status is CANCELLED (see {@link #updateTracker}).
 */
public class DiscoveryServiceStarter implements QueueMessageProcessor {

    private Logger logger = Logger.getLogger(DiscoveryServiceStarter.class.getName());

    // Collaborators, all obtained from ODFInternalFactory in the constructor.
    AnalysisRequestTrackerStore trackerStore = null;
    ControlCenter controlCenter = null;
    Environment environment = null;

    /**
     * Creates a starter and obtains the tracker store, the control center and the environment
     * from a new {@link ODFInternalFactory}.
     */
    public DiscoveryServiceStarter() {
        ODFInternalFactory factory = new ODFInternalFactory();
        trackerStore = factory.create(AnalysisRequestTrackerStore.class);
        controlCenter = factory.create(ControlCenter.class);
        environment = factory.create(Environment.class);
    }

    /**
     * Produces a copy of the given request that is enriched for the service call; the passed request
     * itself is only read.
     *
     * The copy receives every property returned by {@code environment.getPropertiesWithPrefix} for the
     * request's discovery service id and, when requested, a metadata cache for the request's data set.
     *
     * @param request the request to clone
     * @param requiresMetaDataCache if true, a metadata cache retrieved for the data set is attached to the clone
     * @return the enriched clone
     * @throws JSONException declared by this method; presumably raised when cloning fails (verify against JSONUtils)
     */
    private DiscoveryServiceRequest cloneDSRequestAndAddServiceProps(DiscoveryServiceRequest request, boolean requiresMetaDataCache) throws JSONException {
        DiscoveryServiceRequest clonedRequest = JSONUtils.cloneJSONObject(request);
        // NOTE(review): Map is a raw type in this excerpt; key/value types are not visible here.
        Map additionalProps = clonedRequest.getAdditionalProperties();
        if (additionalProps == null) {
            additionalProps = new HashMap<>();
            clonedRequest.setAdditionalProperties(additionalProps);
        }
        // add service specific properties
        String id = request.getDiscoveryServiceId();
        Map serviceProps = environment.getPropertiesWithPrefix(id);
        additionalProps.putAll(serviceProps);

        // add cached metadata objects to request if required
        if (requiresMetaDataCache) {
            MetaDataObject mdo = request.getDataSetContainer().getDataSet();
            MetadataStore mds = new ODFInternalFactory().create(MetadataStore.class);
            clonedRequest.getDataSetContainer().setMetaDataCache(CachedMetadataStore.retrieveMetaDataCache(mds, mdo));
        }

        return clonedRequest;
    }


    /**
     * starts the service taken from the service runtime topic.
     *
     * The message is parsed into an AnalysisRequestTracker. Unless the stored tracker is CANCELLED, the
     * tracker is marked DISCOVERY_SERVICE_RUNNING, the data set is checked by the target service and the
     * service is then started in the background. Every failure is caught, logged and recorded on the
     * tracker as status ERROR; nothing is rethrown to the caller.
     *
     * @param executorService executor handed to the service proxy and to the thread manager
     * @param message JSON serialization of an AnalysisRequestTracker
     * @param partition not read by this implementation
     * @param offset not read by this implementation
     */
    public void process(ExecutorService executorService, String message, int partition, long offset) {
        AnalysisRequestTracker tracker = null;
        try {
            tracker = JSONUtils.fromJSON(message, AnalysisRequestTracker.class);
            logger.log(Level.FINEST, "DSStarter: received tracker {0}", JSONUtils.lazyJSONSerializer(tracker));
            // load tracker from store and check if it was cancelled in the meantime
            AnalysisRequestTracker storedRequest = trackerStore.query(tracker.getRequest().getId());

            if (storedRequest == null || storedRequest.getStatus() != STATUS.CANCELLED) {
                // set tracker to running
                tracker.setStatus(STATUS.DISCOVERY_SERVICE_RUNNING);
                trackerStore.store(tracker);

                DiscoveryServiceRequest nextRequest = TrackerUtil.getCurrentDiscoveryServiceStartRequest(tracker);
                if (nextRequest == null) {
                    logger.log(Level.WARNING, "Request in queue has wrong format");
                    tracker.setStatus(STATUS.ERROR);
                } else {
                    // record when the request left the queue and persist that before contacting the service
                    nextRequest.setTakenFromRequestQueue(System.currentTimeMillis());
                    trackerStore.store(tracker);
                    String dsID = nextRequest.getDiscoveryServiceId();
                    SyncDiscoveryService nextService = ControlCenter.getDiscoveryServiceProxy(dsID, tracker.getRequest());
                    if (nextService == null) {
                        logger.log(Level.WARNING, "Discovery Service ''{0}'' could not be created", dsID);
                        // handled by the dedicated catch clause below, which stores the reason on the tracker
                        throw new DiscoveryServiceUnreachableException("Java proxy for service with id " + dsID + " could not be created");
                    } else {
                        DataSetContainer ds = nextRequest.getDataSetContainer();
                        DataSetCheckResult checkResult = nextService.checkDataSet(ds);
                        if (checkResult.getDataAccess() == DataSetCheckResult.DataAccess.NotPossible) {
                            String responseDetails = "";
                            if (checkResult.getDetails() != null) {
                                responseDetails = " Reason: " + checkResult.getDetails();
                            }
                            if (tracker.getRequest().isIgnoreDataSetCheck()) {
                                // the request asks to ignore failed data set checks: record an OK response
                                // carrying the explanation and move on without running this service
                                String msg = MessageFormat.format("Discovery service ''{0}'' cannot process data set ''{1}''.{2} - Ignoring and advancing to next service",
                                        new Object[]{dsID, ds.getDataSet().getReference(), responseDetails});
                                logger.log(Level.INFO, msg);
                                // check for next queue
                                DiscoveryServiceSyncResponse dummyResponse = new DiscoveryServiceSyncResponse();
                                dummyResponse.setCode(DiscoveryServiceResponse.ResponseCode.OK);
                                dummyResponse.setDetails(msg);
                                TrackerUtil.addDiscoveryServiceStartResponse(tracker, dummyResponse);
                                controlCenter.advanceToNextDiscoveryService(tracker);
                            } else {
                                tracker.setStatus(STATUS.ERROR);
                                String msg = MessageFormat.format("Discovery service ''{0}'' cannot process data set ''{1}''.{2}",
                                        new Object[]{dsID, ds.getDataSet().getReference(), responseDetails});
                                tracker.setStatusDetails(msg);
                                logger.log(Level.WARNING, msg);
                            }
                        } else {
                            nextService.setExecutorService(executorService);
                            runServiceInBackground(executorService, tracker, nextRequest, nextService);
                        }
                    }
                }
            }
        } catch (DiscoveryServiceUnreachableException exc) {
            logger.log(Level.WARNING, "Discovery service could not be started because it is unreachable", exc);
            if (tracker != null) {
                tracker.setStatus(STATUS.ERROR);
                tracker.setStatusDetails(exc.getReason());
            }
        } catch (Throwable exc) {
            // tracker is still null here when parsing the message itself failed
            logger.log(Level.WARNING, "An error occurred when starting the discovery service", exc);
            if (tracker != null) {
                tracker.setStatus(STATUS.ERROR);
                tracker.setStatusDetails(Utils.getExceptionAsString(exc));
            }
        }
        // runs on every path, including the cancelled one; updateTracker itself skips null and cancelled trackers
        updateTracker(tracker);
    }


    /**
     * Runnable that executes one discovery service call via {@link #runService} and afterwards
     * persists the tracker via {@link #updateTracker}.
     */
    class ServiceRunner implements ODFRunnable {
        AnalysisRequestTracker tracker;
        DiscoveryServiceRequest nextRequest;
        SyncDiscoveryService nextService;

        /**
         * @param tracker tracker of the analysis request being processed
         * @param nextRequest the discovery service request to run
         * @param nextService proxy of the service that runs the request
         */
        public ServiceRunner(AnalysisRequestTracker tracker, DiscoveryServiceRequest nextRequest, SyncDiscoveryService nextService) {
            super();
            this.tracker = tracker;
            this.nextRequest = nextRequest;
            this.nextService = nextService;
        }

        /**
         * Runs the service; any Throwable is logged and turned into status ERROR on the tracker.
         * The tracker is handed to updateTracker afterwards in every case.
         */
        @Override
        public void run() {
            try {
                runService(tracker, nextRequest, nextService);
            } catch (Throwable exc) {
                logger.log(Level.WARNING, "An error occurred when running the discovery service", exc);
                if (tracker != null) {
                    tracker.setStatus(STATUS.ERROR);
                    tracker.setStatusDetails(Utils.getExceptionAsString(exc));
                }
            }
            // NOTE(review): also reached after runService re-enqueued the tracker (TEMPORARILY_UNAVAILABLE) - confirm this is intended
            updateTracker(tracker);
        }

        /** Intentionally empty: the passed executor service is not used by this runner. */
        @Override
        public void setExecutorService(ExecutorService service) {

        }

        /** @return always true */
        @Override
        public boolean isReady() {
            return true;
        }

        /** Intentionally empty: this runner has no cancellation logic. */
        @Override
        public void cancel() {
        }

    }


    /**
     * Wraps the service call in a {@link ServiceRunner} and starts it through a ThreadManager
     * obtained from {@link ODFInternalFactory}.
     *
     * The runner id is "DSRunner_" followed by the discovery service id, "_", the ODF request id
     * and a random UUID (the last two are concatenated without a separator).
     *
     * @param executorService executor set on the thread manager before the runner is started
     * @param tracker tracker of the analysis request
     * @param nextRequest the discovery service request to run
     * @param nextService proxy of the service that runs the request
     * @throws JSONException declared by this method; no statement in the body visibly raises it
     */
    private void runServiceInBackground(ExecutorService executorService, final AnalysisRequestTracker tracker, final DiscoveryServiceRequest nextRequest, final SyncDiscoveryService nextService) throws JSONException {
        String suffix = nextRequest.getDiscoveryServiceId() + "_" + nextRequest.getOdfRequestId() + UUID.randomUUID().toString();
        String runnerId = "DSRunner_" + suffix;
        ThreadManager tm = new ODFInternalFactory().create(ThreadManager.class);
        ServiceRunner serviceRunner = new ServiceRunner(tracker, nextRequest, nextService);
        tm.setExecutorService(executorService);
        tm.startUnmanagedThread(runnerId, serviceRunner);
    }

    /**
     * Runs the analysis synchronously on the given service, stores its result and updates the
     * tracker according to the response code.
     *
     * Response handling: UNKNOWN_ERROR (also assumed when no code was returned) and NOT_AUTHORIZED set
     * status ERROR; TEMPORARILY_UNAVAILABLE puts the tracker back into the queue and returns; OK advances
     * to the next discovery service unless the stored tracker is missing or CANCELLED; any other code
     * sets status ERROR.
     *
     * @param tracker tracker of the analysis request
     * @param nextRequest the discovery service request to run
     * @param nextService proxy of the service that runs the request
     * @throws JSONException declared by this method; can come from cloning the request
     */
    private void runService(AnalysisRequestTracker tracker, DiscoveryServiceRequest nextRequest, SyncDiscoveryService nextService) throws JSONException {
        DiscoveryServiceResponse response = null;
        String dsID = nextRequest.getDiscoveryServiceId();
        boolean requiresAuxObjects = controlCenter.requiresMetaDataCache(nextService);
        // given the declared parameter type, this check only fails for a null nextService
        if (nextService instanceof SyncDiscoveryService) {
            SyncDiscoveryService nextServiceSync = (SyncDiscoveryService) nextService;
            logger.log(Level.FINER, "Starting synchronous analysis on service {0}", dsID);
            DiscoveryServiceSyncResponse syncResponse = nextServiceSync.runAnalysis(cloneDSRequestAndAddServiceProps(nextRequest, requiresAuxObjects));
            nextRequest.setFinishedProcessing(System.currentTimeMillis());
            //Even if the analysis was concurrently cancelled we store the results since the service implementation could do this by itself either way.
            long before = System.currentTimeMillis();
            InternalAnnotationStoreUtils.storeDiscoveryServiceResult(syncResponse.getResult(), tracker.getRequest());
            nextRequest.setTimeSpentStoringResults(System.currentTimeMillis() - before);
            // remove result to reduce size of response
            syncResponse.setResult(null);
            response = syncResponse;
        } else {
            throw new RuntimeException("Unknown Java proxy created for service with id " + dsID);
        }

        // process response
        if (response.getCode() == null) {
            response.setCode(DiscoveryServiceResponse.ResponseCode.UNKNOWN_ERROR);
            String origDetails = response.getDetails();
            response.setDetails(MessageFormat.format("Discovery service did not return a response code. Assuming error. Original message: {0}", origDetails));
        }
        switch (response.getCode()) {
        case UNKNOWN_ERROR:
            TrackerUtil.addDiscoveryServiceStartResponse(tracker, response);
            tracker.setStatus(STATUS.ERROR);
            tracker.setStatusDetails(response.getDetails());
            logger.log(Level.WARNING, "Discovery Service ''{2}'' responded with an unknown error ''{0}'', ''{1}''", new Object[] { response.getCode().name(),
                    response.getDetails(), dsID });
            break;
        case NOT_AUTHORIZED:
            TrackerUtil.addDiscoveryServiceStartResponse(tracker, response);
            tracker.setStatus(STATUS.ERROR);
            tracker.setStatusDetails(response.getDetails());
            logger.log(Level.WARNING, "Discovery Service ''{2}'' responded with an unauthorized ''{0}'', ''{1}''", new Object[] { response.getCode().name(),
                    response.getDetails(), dsID });
            break;
        case TEMPORARILY_UNAVAILABLE:
            tracker.setStatus(STATUS.IN_DISCOVERY_SERVICE_QUEUE);
            logger.log(Level.INFO, "Discovery Service ''{2}'' responded that it is unavailable right now ''{0}'', ''{1}''", new Object[] {
                    response.getCode().name(), response.getDetails(), dsID });
            // requeue and finish immediately
            controlCenter.getQueueManager().enqueue(tracker);
            return;
        case OK:
            TrackerUtil.addDiscoveryServiceStartResponse(tracker, response);
            logger.log(Level.FINER, "Synchronous Discovery Service processed request ''{0}'', ''{1}''", new Object[] { response.getCode().name(), response.getDetails() });
            AnalysisRequestTracker storedTracker = trackerStore.query(tracker.getRequest().getId());
            //A user could've cancelled the analysis concurrently. In this case, ignore the response and don't overwrite the tracker
            if (storedTracker != null && storedTracker.getStatus() != STATUS.CANCELLED) {
                // check for next queue
                controlCenter.advanceToNextDiscoveryService(tracker);
            } else {
                logger.log(Level.FINER, "Not advancing analysis request because it was cancelled!");
            }
            break;
        default:
            tracker.setStatus(STATUS.ERROR);
            tracker.setStatusDetails(response.getDetails());
            logger.log(Level.WARNING, "Discovery Service ''{2}'' responded with an unknown response ''{0}'', ''{1}''", new Object[] {
                    response.getCode().name(), response.getDetails(), dsID });
            break;
        }
    }

    /**
     * Persists the given tracker unless the stored tracker has been cancelled.
     *
     * When the tracker is stored, its last-modified time is set to the current time first.
     *
     * @param tracker the tracker to persist; null is accepted and results in no action
     * @return true if a stored tracker with status CANCELLED was found (nothing is stored then), false otherwise
     */
    private boolean updateTracker(AnalysisRequestTracker tracker) {
        boolean cancelled = false;
        if (tracker != null) {
            AnalysisRequestTracker storedTracker = trackerStore.query(tracker.getRequest().getId());
            //A user could've cancelled the analysis concurrently. In this case, ignore the response and don't overwrite the tracker
            if (storedTracker == null || (! STATUS.CANCELLED.equals(storedTracker.getStatus())) ) {
                Utils.setCurrentTimeAsLastModified(tracker);
                trackerStore.store(tracker);
            } else {
                cancelled = true;
                logger.log(Level.FINER, "Not storing analysis tracker changes because it was cancelled!");
            }
        }
        return cancelled;
    }


}
/**
 * Unchecked exception signalling that a discovery service could not be reached or that
 * no proxy for it could be created.
 *
 * The reason text is used both as the exception message and as the value returned by
 * {@link #getReason()}. Instances are immutable.
 */
public class DiscoveryServiceUnreachableException extends RuntimeException {

    private static final long serialVersionUID = 3581149213306073675L;

    // Assigned exactly once in the constructor, hence final.
    private final String reason;

    /**
     * @param reason description of why the service is unreachable; may be null
     */
    public DiscoveryServiceUnreachableException(String reason) {
        super(reason);
        this.reason = reason;
    }

    /**
     * @return the reason passed to the constructor (identical to {@link #getMessage()})
     */
    public String getReason() {
        return reason;
    }
}
/**
 * Hands out one JVM-wide {@link ExecutorService}.
 *
 * The executor is a cached thread pool that is created on the first call to
 * {@link #createExecutorService()} and shared by all factory instances afterwards.
 */
public class ExecutorServiceFactory {

    // Guards creation of and access to the shared executor. Final so the monitor
    // can never be swapped while another thread is synchronizing on it.
    static final Object execServiceLock = new Object();
    static ExecutorService executorService = null;

    /**
     * Returns the shared executor service, creating it on first use.
     *
     * @return the shared executor service, never null
     */
    public ExecutorService createExecutorService() {
        synchronized (execServiceLock) {
            if (executorService == null) {
                executorService = Executors.newCachedThreadPool();
            }
            // Return from inside the critical section so the value handed out is
            // exactly the one checked/created above.
            return executorService;
        }
    }

}
+ */ +package org.apache.atlas.odf.core.controlcenter; + +import org.apache.atlas.odf.api.discoveryservice.DiscoveryService; +import org.apache.atlas.odf.api.discoveryservice.DiscoveryServiceEndpoint; +import org.apache.atlas.odf.api.discoveryservice.DiscoveryServiceProperties; +import org.apache.atlas.odf.api.discoveryservice.DiscoveryServiceRequest; +import org.apache.atlas.odf.api.discoveryservice.SyncDiscoveryServiceBase; +import org.apache.atlas.odf.api.discoveryservice.sync.DiscoveryServiceSyncResponse; +import org.apache.atlas.odf.api.settings.validation.ValidationException; +import org.apache.atlas.odf.api.discoveryservice.*; + +public class HealthCheckServiceRuntime implements ServiceRuntime { + public static final String HEALTH_CHECK_RUNTIME_NAME = "HealthCheck"; + + @Override + public String getName() { + return HEALTH_CHECK_RUNTIME_NAME; + } + + @Override + public long getWaitTimeUntilAvailable() { + return 0; + } + + @Override + public DiscoveryService createDiscoveryServiceProxy(DiscoveryServiceProperties props) { + return new SyncDiscoveryServiceBase() { + + @Override + public DiscoveryServiceSyncResponse runAnalysis(DiscoveryServiceRequest request) { + DiscoveryServiceSyncResponse response = new DiscoveryServiceSyncResponse(); + response.setCode(DiscoveryServiceResponse.ResponseCode.OK); + response.setDetails("Health check service finished successfully"); + return response; + } + }; + } + + public static DiscoveryServiceProperties getHealthCheckServiceProperties() { + DiscoveryServiceProperties props = new DiscoveryServiceProperties(); + props.setId(ControlCenter.HEALTH_TEST_DISCOVERY_SERVICE_ID); + props.setDescription("Health check service"); + + DiscoveryServiceEndpoint ep = new DiscoveryServiceEndpoint(); + ep.setRuntimeName(HEALTH_CHECK_RUNTIME_NAME); + + props.setEndpoint(ep); + return props; + } + + @Override + public String getDescription() { + return "Internal runtime dedicated to health checks"; + } + + @Override + public void 
validate(DiscoveryServiceProperties props) throws ValidationException { + } + +} http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-core/src/main/java/org/apache/atlas/odf/core/controlcenter/JavaServiceRuntime.java ---------------------------------------------------------------------- diff --git a/odf/odf-core/src/main/java/org/apache/atlas/odf/core/controlcenter/JavaServiceRuntime.java b/odf/odf-core/src/main/java/org/apache/atlas/odf/core/controlcenter/JavaServiceRuntime.java new file mode 100755 index 0000000..61a29b1 --- /dev/null +++ b/odf/odf-core/src/main/java/org/apache/atlas/odf/core/controlcenter/JavaServiceRuntime.java @@ -0,0 +1,87 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.atlas.odf.core.controlcenter; + +import java.util.logging.Level; +import java.util.logging.Logger; + +import org.apache.atlas.odf.api.discoveryservice.sync.SyncDiscoveryService; +import org.apache.atlas.odf.api.settings.validation.ImplementationValidator; +import org.apache.atlas.odf.api.settings.validation.ValidationException; +import org.apache.atlas.odf.json.JSONUtils; +import org.apache.wink.json4j.JSONException; + +import org.apache.atlas.odf.api.discoveryservice.DiscoveryService; +import org.apache.atlas.odf.api.discoveryservice.DiscoveryServiceJavaEndpoint; +import org.apache.atlas.odf.api.discoveryservice.DiscoveryServiceProperties; +import org.apache.atlas.odf.api.discoveryservice.async.AsyncDiscoveryService; +import org.apache.atlas.odf.core.Utils; + +public class JavaServiceRuntime implements ServiceRuntime { + + Logger logger = Logger.getLogger(JavaServiceRuntime.class.getName()); + + public static final String NAME = "Java"; + + @Override + public String getName() { + return NAME; + } + + @Override + public long getWaitTimeUntilAvailable() { + // for now, always run + return 0; + } + + @Override + public DiscoveryService createDiscoveryServiceProxy(DiscoveryServiceProperties props) { + DiscoveryService service = null; + String className = null; + try { + className = JSONUtils.convert(props.getEndpoint(), DiscoveryServiceJavaEndpoint.class).getClassName(); + Class clazz = Class.forName(className); + Object o = clazz.newInstance(); + service = (DiscoveryService) o; + } catch (Exception e) { + logger.log(Level.FINE, "An error occurred while instatiating Java implementation", e); + logger.log(Level.WARNING, "Java implementation ''{0}'' for discovery service ''{1}'' could not be instantiated (internal error: ''{2}'')", + new Object[] { className, props.getId(), e.getMessage() }); + return null; + } + if (service instanceof SyncDiscoveryService) { + return new TransactionSyncDiscoveryServiceProxy((SyncDiscoveryService) service); + } 
else if (service instanceof AsyncDiscoveryService) { + return new TransactionAsyncDiscoveryServiceProxy((AsyncDiscoveryService) service); + } + return service; + } + + @Override + public String getDescription() { + return "The default Java runtime"; + } + + @Override + public void validate(DiscoveryServiceProperties props) throws ValidationException { + DiscoveryServiceJavaEndpoint javaEP; + try { + javaEP = JSONUtils.convert(props.getEndpoint(), DiscoveryServiceJavaEndpoint.class); + } catch (JSONException e) { + throw new ValidationException("Endpoint definition for Java service is not correct: " + Utils.getExceptionAsString(e)); + } + new ImplementationValidator().validate("Service.endpoint", javaEP.getClassName()); + } + +}