From: lahiru@apache.org
To: commits@airavata.apache.org
Reply-To: dev@airavata.apache.org
Subject: airavata git commit: [AIRAVATA-2065] Introduce more robust logging mechanism for Airavata [Forced Update!]
Date: Sat, 24 Sep 2016 14:06:09 +0000 (UTC)

Repository: airavata
Updated Branches:
  refs/heads/lahiru/AIRAVATA-2065 6786f8ee0 -> f7f1f9c09 (forced update)


[AIRAVATA-2065] Introduce more robust logging mechanism for Airavata

1. Add configurable KafkaAppender (default turned off) which will push the airavata logs to kafka topic.
2. Use configured kafka topic prefix and construct the topic name based on the airavata role running.
3. Bring back logback as the logging library.
4. Use git describe in the server version string and introduce new serverId with more metadata.
5. Added new class to handle specifically AWS deployment (use aws-metadata service).
6. Add MDC context to orchestrator and gfac thread executions.


Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/f7f1f9c0
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/f7f1f9c0
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/f7f1f9c0

Branch: refs/heads/lahiru/AIRAVATA-2065
Commit: f7f1f9c093ed66de182c72d9618ac9561a28e5fe
Parents: e27c54e
Author: Lahiru Ginnaliya Gamathige
Authored: Sun Aug 28 21:39:11 2016 -0700
Committer: Lahiru Ginnaliya Gamathige
Committed: Fri Sep 23 09:01:09 2016 -0700

----------------------------------------------------------------------
 modules/commons/pom.xml                         |  12 +-
 .../airavata/common/utils/BuildConstant.java    |  26 ++++
 .../airavata/common/logging/Exception.java      |  66 +++++++++
 .../airavata/common/logging/LogEntry.java       | 132 ++++++++++++++++++
 .../airavata/common/logging/MDCConstants.java   |  30 ++++
 .../apache/airavata/common/logging/MDCUtil.java |  32 +++++
 .../airavata/common/logging/ServerId.java       |  68 +++++++++
 .../common/logging/kafka/KafkaAppender.java     |  18 +--
 .../airavata/common/utils/AiravataZKUtils.java  |  13 +-
 .../common/utils/ApplicationSettings.java       |  34 +++--
 .../airavata/common/utils/AwsMetadata.java      | 137 +++++++++++++++++++
 .../apache/airavata/common/utils/JSONUtil.java  |   1 -
 .../airavata/common/utils/ServerSettings.java   |  34 +++++
 .../main/resources/airavata-server.properties   |   8 +-
 modules/distribution/pom.xml                    |  19 +--
 .../gfac/bes/provider/impl/BESProvider.java     |   7 +-
 .../impl/JSDLGeneratorTestWithMyProxyAuth.java  |   2 +-
 .../apache/airavata/gfac/core/GFacUtils.java    |   2 +-
 .../gfac/impl/task/BESJobSubmissionTask.java    |   7 +-
 .../gfac/monitor/email/EmailBasedMonitor.java   |   6 +-
 .../services/impl/BigRed2TestWithSSHAuth.java   |   2 +-
 .../impl/GSISSHProviderTestWithMyProxyAuth.java |   2 +-
 .../impl/SSHProviderTestWithSSHAuth.java        |   2 +-
 .../airavata/gfac/server/GfacServerHandler.java |  42 +++---
 .../server/OrchestratorServerHandler.java       |  31 +++--
 modules/registry/registry-core/pom.xml          |  27 ----
 .../org/apache/airavata/server/ServerMain.java  |  36 ++++-
 .../utils/PropertyReader.java                   |   3 +-
 .../core/AbstractThriftDeserializer.java        |   3 +-
 pom.xml                                         |  65 +++++++++
 30 files changed, 750 insertions(+), 117 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/airavata/blob/f7f1f9c0/modules/commons/pom.xml
----------------------------------------------------------------------
diff --git a/modules/commons/pom.xml b/modules/commons/pom.xml
index a2136c6..f4231ce 100644
--- a/modules/commons/pom.xml
+++ b/modules/commons/pom.xml
@@ -115,7 +115,6 @@
 org.slf4j
 slf4j-log4j12
-test
 org.apache.curator
@@ -127,12 +126,21 @@
 libthrift
 ${thrift.version}
-
 com.google.code.gson
 gson
 ${google.gson.version}
+
+org.apache.kafka
+kafka-clients
+${kafka-clients.version}
+
+
+ch.qos.logback
+logback-classic
+${logback.version}
+

http://git-wip-us.apache.org/repos/asf/airavata/blob/f7f1f9c0/modules/commons/src/main/java-templates/org/apache/airavata/common/utils/BuildConstant.java
----------------------------------------------------------------------
diff --git a/modules/commons/src/main/java-templates/org/apache/airavata/common/utils/BuildConstant.java b/modules/commons/src/main/java-templates/org/apache/airavata/common/utils/BuildConstant.java
new file mode 100644
index 0000000..8cf5ddf
--- /dev/null
+++ b/modules/commons/src/main/java-templates/org/apache/airavata/common/utils/BuildConstant.java
@@ -0,0 +1,26 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.common.utils;
+
+public class BuildConstant {
+    public static final String VERSION = "${git-describe}";
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/airavata/blob/f7f1f9c0/modules/commons/src/main/java/org/apache/airavata/common/logging/Exception.java
----------------------------------------------------------------------
diff --git a/modules/commons/src/main/java/org/apache/airavata/common/logging/Exception.java b/modules/commons/src/main/java/org/apache/airavata/common/logging/Exception.java
new file mode 100644
index 0000000..cea0c95
--- /dev/null
+++ b/modules/commons/src/main/java/org/apache/airavata/common/logging/Exception.java
@@ -0,0 +1,66 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.airavata.common.logging;
+
+
+public class Exception {
+
+    private String message;
+
+    private String[] stackTrace;
+
+    private String className;
+
+    public Exception(String message, String[] stackTrace) {
+        this.message = message;
+        this.stackTrace = stackTrace;
+    }
+
+    public Exception(String message, String[] stackTrace, String className) {
+        this.message = message;
+        this.stackTrace = stackTrace;
+        this.className = className;
+    }
+
+    public String getMessage() {
+        return message;
+    }
+
+    public void setMessage(String message) {
+        this.message = message;
+    }
+
+    public String[] getStackTrace() {
+        return stackTrace;
+    }
+
+    public void setStackTrace(String[] stackTrace) {
+        this.stackTrace = stackTrace;
+    }
+
+    public String getClassName() {
+        return className;
+    }
+
+    public void setClassName(String className) {
+        this.className = className;
+    }
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/f7f1f9c0/modules/commons/src/main/java/org/apache/airavata/common/logging/LogEntry.java
----------------------------------------------------------------------
diff --git a/modules/commons/src/main/java/org/apache/airavata/common/logging/LogEntry.java b/modules/commons/src/main/java/org/apache/airavata/common/logging/LogEntry.java
new file mode 100644
index 0000000..72fc4a0
--- /dev/null
+++ b/modules/commons/src/main/java/org/apache/airavata/common/logging/LogEntry.java
@@ -0,0 +1,132 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.airavata.common.logging;
+
+
+import java.lang.*;
+import java.util.Map;
+
+public class LogEntry {
+
+    private ServerId serverId;
+
+    private String message;
+
+    private String timestamp;
+
+    private String level;
+
+    private String loggerName;
+
+    private Map mdc;
+
+    private String threadName;
+
+    private Exception exception;
+
+    public LogEntry(ServerId serverId, String message, String timestamp, String level, String loggerName, Map mdc, String threadName, Exception exception) {
+        this.serverId = serverId;
+        this.message = message;
+        this.timestamp = timestamp;
+        this.level = level;
+        this.loggerName = loggerName;
+        this.mdc = mdc;
+        this.threadName = threadName;
+        this.exception = exception;
+    }
+
+    public LogEntry(ServerId serverId, String message, String timestamp, String level, String loggerName, Map mdc, String threadName) {
+        this.serverId = serverId;
+        this.message = message;
+        this.timestamp = timestamp;
+        this.level = level;
+        this.loggerName = loggerName;
+        this.mdc = mdc;
+        this.threadName = threadName;
+    }
+
+
+    public ServerId getServerId() {
+        return serverId;
+    }
+
+    public void setServerId(ServerId serverId) {
+        this.serverId = serverId;
+    }
+
+    public String getMessage() {
+        return message;
+    }
+
+    public void setMessage(String message) {
+        this.message = message;
+    }
+
+    public String getTimestamp() {
+        return timestamp;
+    }
+
+    public void setTimestamp(String timestamp) {
+        this.timestamp = timestamp;
+    }
+
+    public String getLevel() {
+        return level;
+    }
+
+    public void setLevel(String level) {
+        this.level = level;
+    }
+
+    public String getLoggerName() {
+        return loggerName;
+    }
+
+    public void setLoggerName(String loggerName) {
+        this.loggerName = loggerName;
+    }
+
+    public Map getMdc() {
+        return mdc;
+    }
+
+    public void setMdc(Map mdc) {
+        this.mdc = mdc;
+    }
+
+    public String getThreadName() {
+        return threadName;
+    }
+
+    public void setThreadName(String threadName) {
+        this.threadName = threadName;
+    }
+
+    public Exception getException() {
+        return exception;
+    }
+
+    public void setException(Exception exception) {
+        this.exception = exception;
+    }
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/f7f1f9c0/modules/commons/src/main/java/org/apache/airavata/common/logging/MDCConstants.java
----------------------------------------------------------------------
diff --git a/modules/commons/src/main/java/org/apache/airavata/common/logging/MDCConstants.java b/modules/commons/src/main/java/org/apache/airavata/common/logging/MDCConstants.java
new file mode 100644
index 0000000..487bce0
--- /dev/null
+++ b/modules/commons/src/main/java/org/apache/airavata/common/logging/MDCConstants.java
@@ -0,0 +1,30 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.airavata.common.logging;
+
+
+public class MDCConstants {
+    public static final String EXPERIMENT_ID = "experiment_id";
+    public static final String GATEWAY_ID = "gateway_id";
+    public static final String EXPERIMENT_NAME = "experiment_name";
+    public static final String PROCESS_ID = "process_id";
+    public static final String TOKEN_ID = "token_id";
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/f7f1f9c0/modules/commons/src/main/java/org/apache/airavata/common/logging/MDCUtil.java
----------------------------------------------------------------------
diff --git a/modules/commons/src/main/java/org/apache/airavata/common/logging/MDCUtil.java b/modules/commons/src/main/java/org/apache/airavata/common/logging/MDCUtil.java
new file mode 100644
index 0000000..4549d25
--- /dev/null
+++ b/modules/commons/src/main/java/org/apache/airavata/common/logging/MDCUtil.java
@@ -0,0 +1,32 @@
+///
+// Copyright (c) 2016. Highfive Technologies, Inc.
+///
+package org.apache.airavata.common.logging;
+import org.slf4j.MDC;
+
+import java.util.Map;
+
+public class MDCUtil {
+    public static Runnable wrapWithMDC(Runnable r) {
+        Map mdc = MDC.getCopyOfContextMap();
+        return () -> {
+            Map oldMdc = MDC.getCopyOfContextMap();
+
+            if (mdc == null) {
+                MDC.clear();
+            } else {
+                MDC.setContextMap(mdc);
+            }
+            try {
+                r.run();
+            } finally {
+                if (oldMdc == null) {
+                    MDC.clear();
+                } else {
+                    MDC.setContextMap(oldMdc);
+                }
+            }
+
+        };
+    }
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/f7f1f9c0/modules/commons/src/main/java/org/apache/airavata/common/logging/ServerId.java
----------------------------------------------------------------------
diff --git a/modules/commons/src/main/java/org/apache/airavata/common/logging/ServerId.java b/modules/commons/src/main/java/org/apache/airavata/common/logging/ServerId.java
new file mode 100644
index 0000000..9611302
--- /dev/null
+++ b/modules/commons/src/main/java/org/apache/airavata/common/logging/ServerId.java
@@ -0,0 +1,68 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.airavata.common.logging;
+
+public class ServerId {
+    private String serverId;
+    private String hostName;
+    private String version;
+
+    private String[] roles; // gfac, orchestrator, apiserver,
+
+    public ServerId(String serverId, String hostName, String version, String[] roles) {
+        this.serverId = serverId;
+        this.hostName = hostName;
+        this.version = version;
+        this.roles = roles;
+    }
+
+    public String getServerId() {
+        return serverId;
+    }
+
+    public void setServerId(String serverId) {
+        this.serverId = serverId;
+    }
+
+    public String getHostName() {
+        return hostName;
+    }
+
+    public void setHostName(String hostName) {
+        this.hostName = hostName;
+    }
+
+    public String getVersion() {
+        return version;
+    }
+
+    public void setVersion(String version) {
+        this.version = version;
+    }
+
+    public String[] getRoles() {
+        return roles;
+    }
+
+    public void setRoles(String[] roles) {
+        this.roles = roles;
+    }
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/f7f1f9c0/modules/commons/src/main/java/org/apache/airavata/common/logging/kafka/KafkaAppender.java
----------------------------------------------------------------------
diff --git a/modules/commons/src/main/java/org/apache/airavata/common/logging/kafka/KafkaAppender.java b/modules/commons/src/main/java/org/apache/airavata/common/logging/kafka/KafkaAppender.java
index 06649c6..6bc1c1d 100644
--- a/modules/commons/src/main/java/org/apache/airavata/common/logging/kafka/KafkaAppender.java
+++ b/modules/commons/src/main/java/org/apache/airavata/common/logging/kafka/KafkaAppender.java
@@ -62,7 +62,7 @@ public class KafkaAppender extends UnsynchronizedAppenderBase {
         props.put("producer.type", "async");
         props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
         props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
-        this.kafkaTopic = getKafkaTopic(kafkaTopicPrefix);
+        this.kafkaTopic = createKafkaTopic(kafkaTopicPrefix);
         logger.info("Starting kafka producer: bootstrap-server:{}, topic : {}", kafkaHost, this.kafkaTopic);
         this.producer = new KafkaProducer<>(props);
         if(ServerSettings.isRunningOnAws()) {
@@ -100,20 +100,12 @@ public class KafkaAppender extends UnsynchronizedAppenderBase {
         return Arrays.stream(stackTraceElement).map(StackTraceElementProxy::getSTEAsString).toArray(String[]::new);
     }
-    private String getKafkaTopic(String kafkaTopicPrefix) {
-        final StringBuilder stringBuffer = new StringBuilder("");
+    private String createKafkaTopic(String kafkaTopicPrefix) {
         final String[] serverRoles = ServerSettings.getServerRoles();
-        if (serverRoles.length == 4) {
-            return kafkaTopicPrefix + "_all";
+        if (serverRoles.length >= 4) {
+            return String.format("%s_all_logs", kafkaTopicPrefix);
         }
-        for (String role : ServerSettings.getServerRoles()) {
-            stringBuffer.append("_");
-            stringBuffer.append(role);
-            stringBuffer.append("_logs");
-            // do not support multiple roles yet, topic name will become complex
-            break;
-        }
-        return kafkaTopicPrefix + stringBuffer.toString();
+        return String.format("%s_%s_logs", kafkaTopicPrefix, ServerSettings.getServerRoles()[0]);
     }
     public void close() {

http://git-wip-us.apache.org/repos/asf/airavata/blob/f7f1f9c0/modules/commons/src/main/java/org/apache/airavata/common/utils/AiravataZKUtils.java
----------------------------------------------------------------------
diff --git a/modules/commons/src/main/java/org/apache/airavata/common/utils/AiravataZKUtils.java b/modules/commons/src/main/java/org/apache/airavata/common/utils/AiravataZKUtils.java
index 5faf985..75f91fd 100644
--- a/modules/commons/src/main/java/org/apache/airavata/common/utils/AiravataZKUtils.java
+++ b/modules/commons/src/main/java/org/apache/airavata/common/utils/AiravataZKUtils.java
@@ -119,13 +119,14 @@ public class AiravataZKUtils implements Watcher {
     public static void startEmbeddedZK(ServerCnxnFactory cnxnFactory) {
         if (ServerSettings.isEmbeddedZK()) {
             ServerConfig serverConfig = new ServerConfig();
-            URL resource = AiravataZKUtils.class.getClassLoader().getResource("zoo.cfg");
-            if (resource == null) {
-                logger.error("There is no zoo.cfg file in the classpath... Failed to start Zookeeper Server");
-                System.exit(1);
-            }
+            URL resource = ApplicationSettings.loadFile("zoo.cfg");
             try {
+                if (resource == null) {
+                    logger.error("There is no zoo.cfg file in the classpath... Failed to start Zookeeper Server");
+                    System.exit(1);
+                }
                 serverConfig.parse(resource.getPath());
+
             } catch (QuorumPeerConfig.ConfigException e) {
                 logger.error("Error while starting embedded Zookeeper", e);
                 System.exit(2);
@@ -193,7 +194,5 @@ public class AiravataZKUtils implements Watcher {
         } else {
             return bytesToLong(curatorClient.getData().storingStatIn(exists).forPath(cancelDeliveryTagPath));
         }
-
-
     }
 }

http://git-wip-us.apache.org/repos/asf/airavata/blob/f7f1f9c0/modules/commons/src/main/java/org/apache/airavata/common/utils/ApplicationSettings.java
----------------------------------------------------------------------
diff --git a/modules/commons/src/main/java/org/apache/airavata/common/utils/ApplicationSettings.java b/modules/commons/src/main/java/org/apache/airavata/common/utils/ApplicationSettings.java
index 9ce0786..dc7944f 100644
--- a/modules/commons/src/main/java/org/apache/airavata/common/utils/ApplicationSettings.java
+++ b/modules/commons/src/main/java/org/apache/airavata/common/utils/ApplicationSettings.java
@@ -25,9 +25,11 @@ import org.apache.airavata.common.exception.ApplicationSettingsException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import java.io.File;
 import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
+import java.net.MalformedURLException;
 import java.net.URL;
 import java.util.*;
 import java.util.regex.Matcher;
@@ -35,10 +37,12 @@ import java.util.regex.Pattern;
 public class ApplicationSettings {
     public static final String SERVER_PROPERTIES="airavata-server.properties";
-
-    public static String ADDITIONAL_SETTINGS_FILES = "external.settings";
+    public static final String AIRAVATA_CONFIG_DIR = "airavata.config.dir";
+
+    public static String ADDITIONAL_SETTINGS_FILES = "external.settings";
     protected Properties properties = new Properties();
+
     private Exception propertyLoadException;
@@ -63,7 +67,6 @@ public class ApplicationSettings {
     private void loadProperties() {
         URL url = getPropertyFileURL();
         try {
-
             properties.load(url.openStream());
             logger.info("Settings loaded from "+url.toString());
             URL[] externalSettingsFileURLs = getExternalSettingsFileURLs();
@@ -77,7 +80,7 @@ public class ApplicationSettings {
     }
     protected URL getPropertyFileURL() {
-        return ApplicationSettings.class.getClassLoader().getResource(SERVER_PROPERTIES);
+        return ApplicationSettings.loadFile(SERVER_PROPERTIES);
     }
     protected URL[] getExternalSettingsFileURLs(){
@@ -86,7 +89,7 @@ public class ApplicationSettings {
             String externalSettingsFileNames = getSettingImpl(ADDITIONAL_SETTINGS_FILES);
             String[] externalSettingFiles = externalSettingsFileNames.split(",");
             for (String externalSettingFile : externalSettingFiles) {
-                URL externalSettingFileURL = ApplicationSettings.class.getClassLoader().getResource(externalSettingFile);
+                URL externalSettingFileURL = ApplicationSettings.loadFile(externalSettingFile);
                 if (externalSettingFileURL==null){
                     logger.warn("Could not file external settings file "+externalSettingFile);
                 }else{
@@ -313,11 +316,11 @@ public class ApplicationSettings {
      * Static methods which will be used by the users
      */
-    public static String getSetting(String key) throws ApplicationSettingsException{
+    public static String getSetting(String key) throws ApplicationSettingsException {
         return getInstance().getSettingImpl(key);
     }
-
-    public static String getSetting(String key, String defaultValue){
+
+    public static String getSetting(String key, String defaultValue) {
         return getInstance().getSettingImpl(key,defaultValue);
     }
@@ -426,4 +429,19 @@ public class ApplicationSettings {
     public static ShutdownStrategy getShutdownStrategy() throws Exception{
         return getInstance().getShutdownStrategyImpl();
     }
+
+    public static URL loadFile(String fileName) {
+        final URL resource = ApplicationSettings.class.getClassLoader().getResource(fileName);
+        if(resource == null) {
+            if(System.getProperty(AIRAVATA_CONFIG_DIR) != null) {
+                final String airavataConfigDir = System.getProperty(AIRAVATA_CONFIG_DIR);
+                try {
+                    return new File(airavataConfigDir + File.separator + fileName).toURI().toURL();
+                } catch (MalformedURLException e) {
+                    logger.error("Error parsing the file from airavata.config.dir", airavataConfigDir);
+                }
+            }
+        }
+        return resource;
+    }
 }

http://git-wip-us.apache.org/repos/asf/airavata/blob/f7f1f9c0/modules/commons/src/main/java/org/apache/airavata/common/utils/AwsMetadata.java
----------------------------------------------------------------------
diff --git a/modules/commons/src/main/java/org/apache/airavata/common/utils/AwsMetadata.java b/modules/commons/src/main/java/org/apache/airavata/common/utils/AwsMetadata.java
new file mode 100644
index 0000000..f9b4a65
--- /dev/null
+++ b/modules/commons/src/main/java/org/apache/airavata/common/utils/AwsMetadata.java
@@ -0,0 +1,137 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.airavata.common.utils;
+
+
+import com.google.common.base.Preconditions;
+import com.google.common.net.InetAddresses;
+import com.google.gson.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.InputStreamReader;
+import java.net.InetAddress;
+import java.net.URI;
+import java.net.URISyntaxException;
+
+public class AwsMetadata {
+    private static final Logger log = LoggerFactory.getLogger(AwsMetadata.class);
+    private static final String METADATA_URI_BASE = "http://169.254.169.254/";
+    private static final String REGION_SUFFIX = "/latest/dynamic/instance-identity/document";
+    private static final String ZONE_SUFFIX = "/latest/meta-data/placement/availability-zone";
+    private static final String PUBLIC_IPV4_SUFFIX = "/latest/meta-data/public-ipv4";
+    private static final String PRIVATE_IPV4_SUFFIX = "latest/meta-data/local-ipv4";
+    private static final String HOSTNAME_SUFFIX = "/latest/meta-data/hostname";
+    private static final String ID_SUFFIX = "/latest/meta-data/instance-id";
+
+    private final URI baseUri;
+
+    private String id;
+    private String region;
+    private String hostname;
+    private String zone;
+    private InetAddress publicIp;
+    private InetAddress privateIp;
+
+    public AwsMetadata() {
+        try {
+            baseUri = new URI(METADATA_URI_BASE);
+        } catch (URISyntaxException e) {
+            Preconditions.checkState(false, "Unexpected URI Syntax Exception: {}", e);
+            throw new RuntimeException(e);
+        }
+    }
+
+    public String getRegion() {
+        if (region == null) {
+            try {
+                String dynamicData = getMetadataAt(REGION_SUFFIX);
+                if (dynamicData != null) {
+                    final JsonObject asJsonObject = new JsonParser().parse(dynamicData).getAsJsonObject();
+                    region = asJsonObject.get("region").getAsString();
+                }
+            } catch (ClassCastException e) {
+                log.error("Unable to get region, expecting a JSON Object", e);
+            }
+        }
+        return region;
+    }
+
+    public String getZoneName() {
+        if (zone == null) {
+            zone = getMetadataAt(ZONE_SUFFIX);
+        }
+        return zone;
+    }
+
+    public String getId() {
+        if (id == null) {
+            id = getMetadataAt(ID_SUFFIX);
+        }
+
+        return id;
+    }
+
+    public String getHostname() {
+        if (hostname == null) {
+            hostname = getMetadataAt(HOSTNAME_SUFFIX);
+        }
+        return hostname;
+    }
+
+    public InetAddress getPublicIpAddress() {
+        if (publicIp == null) {
+            String ip = getMetadataAt(PUBLIC_IPV4_SUFFIX);
+            if (ip != null) {
+                publicIp = InetAddresses.forString(ip);
+            }
+        }
+        return publicIp;
+    }
+
+    public InetAddress getInternalIpAddress() {
+        if (privateIp == null) {
+            String ip = getMetadataAt(PRIVATE_IPV4_SUFFIX);
+            if (ip != null) {
+                privateIp = InetAddresses.forString(ip);
+            }
+        }
+        return privateIp;
+    }
+
+    private String getMetadataAt(String suffix) {
+        try {
+            URI resolved = baseUri.resolve(suffix);
+            StringBuilder builder = new StringBuilder();
+            String line = null;
+            try (BufferedReader reader = new BufferedReader(new InputStreamReader(resolved.toURL().openStream()))) {
+                while ((line = reader.readLine()) != null) {
+                    builder.append(line);
+                }
+                return builder.toString();
+            }
+        } catch (Exception e) {
+            // ignore for now to make sure local servers don't go verbose
+        }
+        return null;
+    }
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/f7f1f9c0/modules/commons/src/main/java/org/apache/airavata/common/utils/JSONUtil.java
----------------------------------------------------------------------
diff --git a/modules/commons/src/main/java/org/apache/airavata/common/utils/JSONUtil.java b/modules/commons/src/main/java/org/apache/airavata/common/utils/JSONUtil.java
index 222e5a2..adfc0c2 100644
--- a/modules/commons/src/main/java/org/apache/airavata/common/utils/JSONUtil.java
+++ b/modules/commons/src/main/java/org/apache/airavata/common/utils/JSONUtil.java
@@ -29,7 +29,6 @@ import com.google.gson.JsonObject;
 import com.google.gson.JsonParser;
 import com.google.gson.JsonPrimitive;
 import com.google.gson.stream.JsonReader;
-
 import java.io.File;
 import java.io.FileReader;
 import java.io.IOException;

http://git-wip-us.apache.org/repos/asf/airavata/blob/f7f1f9c0/modules/commons/src/main/java/org/apache/airavata/common/utils/ServerSettings.java
----------------------------------------------------------------------
diff --git a/modules/commons/src/main/java/org/apache/airavata/common/utils/ServerSettings.java b/modules/commons/src/main/java/org/apache/airavata/common/utils/ServerSettings.java
index bb11264..3ac2a6e 100644
--- a/modules/commons/src/main/java/org/apache/airavata/common/utils/ServerSettings.java
+++ b/modules/commons/src/main/java/org/apache/airavata/common/utils/ServerSettings.java
@@ -27,6 +27,8 @@ import org.slf4j.LoggerFactory;
 import java.net.InetAddress;
 import java.net.UnknownHostException;
+import java.util.HashMap;
+import java.util.Map;
 public class ServerSettings extends ApplicationSettings {
@@ -108,6 +110,15 @@ public class ServerSettings extends ApplicationSettings {
     private static final String EMAIL_BASED_MONITOR_STORE_PROTOCOL = "email.based.monitor.store.protocol";
     private static final String ENABLE_EMAIL_BASED_MONITORING = "enable.email.based.monitoring";
+    private static final String IS_RUNNING_ON_AWS = "isRunningOnAws";
+    private static final String ENABLE_KAFKA_LOGGING = "enable.kafka.logging";
+    private static final String KAFKA_BROKER_LIST = "kafka.broker.list";
+    private static final String KAFKA_TOPIC_PREFIX = "kafka.topic.prefix";
+    private static final String SERVER_ROLES = "server.roles";
+
+    // todo until AIRAVATA-2066 is finished, keep server side list configurations here.
+ private static Map<String, String[]> listConfigurations = new HashMap<>(); + private static boolean stopAllThreads = false; private static boolean emailBaseNotificationEnable; private static String outputLocation; @@ -405,4 +416,27 @@ public class ServerSettings extends ApplicationSettings { public static Boolean isEnableSharing() throws ApplicationSettingsException { return Boolean.parseBoolean(getSetting(ENABLE_SHARING)); } + public static boolean isRunningOnAws() { + return Boolean.valueOf(getSetting(IS_RUNNING_ON_AWS, "false")); + } + + public static String getKafkaBrokerList() { + return getSetting(KAFKA_BROKER_LIST, null); + } + + public static String getKafkaTopicPrefix() { + return getSetting(KAFKA_TOPIC_PREFIX, "all"); + } + + public static boolean isEnabledKafkaLogging() { + return Boolean.valueOf(getSetting(ENABLE_KAFKA_LOGGING, "false")); + } + + public static void setServerRoles(String[] roles) { + listConfigurations.put(SERVER_ROLES, roles); + } + + public static String[] getServerRoles() { + return listConfigurations.get(SERVER_ROLES); + } } http://git-wip-us.apache.org/repos/asf/airavata/blob/f7f1f9c0/modules/configuration/server/src/main/resources/airavata-server.properties ---------------------------------------------------------------------- diff --git a/modules/configuration/server/src/main/resources/airavata-server.properties b/modules/configuration/server/src/main/resources/airavata-server.properties index 2ce3701..a0582f9 100644 --- a/modules/configuration/server/src/main/resources/airavata-server.properties +++ b/modules/configuration/server/src/main/resources/airavata-server.properties @@ -284,4 +284,10 @@ authorization.policy=airavata-default-xacml-policy #### authorization cache related configuration #### authz.cache.enabled=true authz.cache.manager.class=org.apache.airavata.api.server.security.authzcache.DefaultAuthzCacheManager -in.memory.cache.size=1000 \ No newline at end of file +in.memory.cache.size=1000 + +# Kafka Logging related configuration 
+isRunningOnAws= false +kafka.broker.list= localhost:9092 +kafka.topic.prefix= local +enable.kafka.logging= false http://git-wip-us.apache.org/repos/asf/airavata/blob/f7f1f9c0/modules/distribution/pom.xml ---------------------------------------------------------------------- diff --git a/modules/distribution/pom.xml b/modules/distribution/pom.xml index 78d9bb9..d096739 100644 --- a/modules/distribution/pom.xml +++ b/modules/distribution/pom.xml @@ -164,14 +164,6 @@ jcl-over-slf4j - org.slf4j - slf4j-log4j12 - - - log4j - log4j - - com.amazonaws aws-java-sdk 1.9.0 @@ -559,7 +551,16 @@ curator-framework ${curator.version} - + + org.apache.kafka + kafka-clients + ${kafka-clients.version} + + + ch.qos.logback + logback-classic + ${logback.version} + org.apache.airavata http://git-wip-us.apache.org/repos/asf/airavata/blob/f7f1f9c0/modules/gfac/gfac-bes/src/main/java/org/apache/airavata/gfac/bes/provider/impl/BESProvider.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-bes/src/main/java/org/apache/airavata/gfac/bes/provider/impl/BESProvider.java b/modules/gfac/gfac-bes/src/main/java/org/apache/airavata/gfac/bes/provider/impl/BESProvider.java index b287b8e..efd2c5f 100644 --- a/modules/gfac/gfac-bes/src/main/java/org/apache/airavata/gfac/bes/provider/impl/BESProvider.java +++ b/modules/gfac/gfac-bes/src/main/java/org/apache/airavata/gfac/bes/provider/impl/BESProvider.java @@ -144,10 +144,9 @@ public class BESProvider extends AbstractProvider implements GFacProvider, JobDetails jobDetails = new JobDetails(); FactoryClient factory = new FactoryClient(eprt, secProperties); - log.info(String.format("Activity Submitting to %s ... \n", - factoryUrl)); + log.info("Activity Submitting to {} ... 
\n", factoryUrl); CreateActivityResponseDocument response = factory.createActivity(cad); - log.info(String.format("Activity Submitted to %s \n", factoryUrl)); + log.info("Activity Submitted to {} \n", factoryUrl); EndpointReferenceType activityEpr = response.getCreateActivityResponse().getActivityIdentifier(); @@ -353,7 +352,7 @@ public class BESProvider extends AbstractProvider implements GFacProvider, } protected String formatStatusMessage(String activityUrl, String status) { - return String.format("Activity %s is %s.\n", activityUrl, status); + return String.format("Activity {} is {}.\n", activityUrl, status); } protected String subStatusAsString(ActivityStatusType statusType) { http://git-wip-us.apache.org/repos/asf/airavata/blob/f7f1f9c0/modules/gfac/gfac-bes/src/test/java/org/apache/airavata/core/gfac/services/impl/JSDLGeneratorTestWithMyProxyAuth.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-bes/src/test/java/org/apache/airavata/core/gfac/services/impl/JSDLGeneratorTestWithMyProxyAuth.java b/modules/gfac/gfac-bes/src/test/java/org/apache/airavata/core/gfac/services/impl/JSDLGeneratorTestWithMyProxyAuth.java index cf9b82a..90f67a1 100644 --- a/modules/gfac/gfac-bes/src/test/java/org/apache/airavata/core/gfac/services/impl/JSDLGeneratorTestWithMyProxyAuth.java +++ b/modules/gfac/gfac-bes/src/test/java/org/apache/airavata/core/gfac/services/impl/JSDLGeneratorTestWithMyProxyAuth.java @@ -106,7 +106,7 @@ import static org.junit.Assert.assertTrue; // } // // protected GFacConfiguration getGFACConfig() throws Exception{ -// URL resource = BESProviderTestWithMyProxyAuth.class.getClassLoader().getResource(org.apache.airavata.common.utils.Constants.GFAC_CONFIG_XML); +// URL resource = ApplicationSettings.loadFile(org.apache.airavata.common.utils.Constants.GFAC_CONFIG_XML); // System.out.println(resource.getFile()); // GFacConfiguration gFacConfiguration = GFacConfiguration.create(new 
File(resource.getPath()),null,null); // return gFacConfiguration; http://git-wip-us.apache.org/repos/asf/airavata/blob/f7f1f9c0/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacUtils.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacUtils.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacUtils.java index cc1f17b..7e2154e 100644 --- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacUtils.java +++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacUtils.java @@ -809,7 +809,7 @@ public class GFacUtils { public static File createJobFile(TaskContext taskContext, JobDescriptor jobDescriptor, JobManagerConfiguration jobManagerConfiguration) throws GFacException { try { TransformerFactory factory = TransformerFactory.newInstance(); - URL resource = GFacUtils.class.getClassLoader().getResource(jobManagerConfiguration.getJobDescriptionTemplateName()); + URL resource = ApplicationSettings.loadFile(jobManagerConfiguration.getJobDescriptionTemplateName()); if (resource == null) { String error = "System configuration file '" + jobManagerConfiguration.getJobDescriptionTemplateName() http://git-wip-us.apache.org/repos/asf/airavata/blob/f7f1f9c0/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/BESJobSubmissionTask.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/BESJobSubmissionTask.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/BESJobSubmissionTask.java index a4dcb5d..b5e1582 100644 --- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/BESJobSubmissionTask.java +++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/BESJobSubmissionTask.java @@ -151,10 +151,9 @@ public 
class BESJobSubmissionTask implements JobSubmissionTask { jobDetails.setProcessId(taskContext.getProcessId()); FactoryClient factory = new FactoryClient(eprt, secProperties); - log.info(String.format("Activity Submitting to %s ... \n", - factoryUrl)); + log.info("Activity Submitting to {} ... \n", factoryUrl); CreateActivityResponseDocument response = factory.createActivity(cad); - log.info(String.format("Activity Submitted to %s \n", factoryUrl)); + log.info("Activity Submitted to {} \n", factoryUrl); EndpointReferenceType activityEpr = response.getCreateActivityResponse().getActivityIdentifier(); @@ -356,7 +355,7 @@ public class BESJobSubmissionTask implements JobSubmissionTask { } protected String formatStatusMessage(String activityUrl, String status) { - return String.format("Activity %s is %s.\n", activityUrl, status); + return String.format("Activity {} is {}.\n", activityUrl, status); } protected void waitUntilDone(EndpointReferenceType factoryEpr, EndpointReferenceType activityEpr, ProcessContext processContext, DefaultClientConfiguration secProperties) throws Exception { http://git-wip-us.apache.org/repos/asf/airavata/blob/f7f1f9c0/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/email/EmailBasedMonitor.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/email/EmailBasedMonitor.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/email/EmailBasedMonitor.java index b24aa75..bbcd635 100644 --- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/email/EmailBasedMonitor.java +++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/email/EmailBasedMonitor.java @@ -91,7 +91,7 @@ public class EmailBasedMonitor implements JobMonitor, Runnable{ properties = new Properties(); properties.put("mail.store.protocol", storeProtocol); timer = new Timer("CancelJobHandler", true); - 
long period = 1000*60*5; // five minute delay between successive task executions. + long period = 1000 * 60 * 5; // five minute delay between successive task executions. timer.schedule(new CancelTimerTask(), 0 , period); } @@ -117,7 +117,7 @@ public class EmailBasedMonitor implements JobMonitor, Runnable{ } @Override public void monitor(String jobId, TaskContext taskContext) { - log.info("[EJM]: Added monitor Id : " + jobId + " to email based monitor map"); + log.info("[EJM]: Added monitor Id : {} to email based monitor map", jobId); jobMonitorMap.put(jobId, taskContext); taskContext.getParentProcessContext().setPauseTaskExecution(true); } @@ -203,7 +203,7 @@ public class EmailBasedMonitor implements JobMonitor, Runnable{ continue; } else { quite = false; - log.info("[EJM]: " + jobMonitorMap.size() + " job/s in job monitor map"); + log.info("[EJM]: {} job/s in job monitor map", jobMonitorMap.size()); } if (!store.isConnected()) { store.connect(); http://git-wip-us.apache.org/repos/asf/airavata/blob/f7f1f9c0/modules/gfac/gfac-impl/src/test/java/org/apache/airavata/core/gfac/services/impl/BigRed2TestWithSSHAuth.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-impl/src/test/java/org/apache/airavata/core/gfac/services/impl/BigRed2TestWithSSHAuth.java b/modules/gfac/gfac-impl/src/test/java/org/apache/airavata/core/gfac/services/impl/BigRed2TestWithSSHAuth.java index 4aecd57..fbad51c 100644 --- a/modules/gfac/gfac-impl/src/test/java/org/apache/airavata/core/gfac/services/impl/BigRed2TestWithSSHAuth.java +++ b/modules/gfac/gfac-impl/src/test/java/org/apache/airavata/core/gfac/services/impl/BigRed2TestWithSSHAuth.java @@ -91,7 +91,7 @@ // System.out.println("Use -Dssh.username=xxx -Dssh.password=yyy -Dssh.keypass=zzz " + // "-Dprivate.ssh.key -Dpublic.ssh.key -Dssh.working.directory "); // } -// URL resource = 
BigRed2TestWithSSHAuth.class.getClassLoader().getResource(org.apache.airavata.common.utils.Constants.GFAC_CONFIG_XML); +// URL resource = ApplicationSettings.loadFile(org.apache.airavata.common.utils.Constants.GFAC_CONFIG_XML); // assert resource != null; // System.out.println(resource.getFile()); // GFacConfiguration gFacConfiguration = GFacConfiguration.create(new File(resource.getPath()), null); http://git-wip-us.apache.org/repos/asf/airavata/blob/f7f1f9c0/modules/gfac/gfac-impl/src/test/java/org/apache/airavata/core/gfac/services/impl/GSISSHProviderTestWithMyProxyAuth.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-impl/src/test/java/org/apache/airavata/core/gfac/services/impl/GSISSHProviderTestWithMyProxyAuth.java b/modules/gfac/gfac-impl/src/test/java/org/apache/airavata/core/gfac/services/impl/GSISSHProviderTestWithMyProxyAuth.java index 4ca5684..9d352a4 100644 --- a/modules/gfac/gfac-impl/src/test/java/org/apache/airavata/core/gfac/services/impl/GSISSHProviderTestWithMyProxyAuth.java +++ b/modules/gfac/gfac-impl/src/test/java/org/apache/airavata/core/gfac/services/impl/GSISSHProviderTestWithMyProxyAuth.java @@ -90,7 +90,7 @@ // "E.g :- mvn clean install -Dmyproxy.username=xxx -Dmyproxy.password=xxx -Dgsi.working.directory=/path<<<<<<<"); // throw new Exception("Need my proxy user name password to run tests."); // } -// URL resource = GSISSHProviderTestWithMyProxyAuth.class.getClassLoader().getResource(org.apache.airavata.common.utils.Constants.GFAC_CONFIG_XML); +// URL resource = ApplicationSettings.loadFile(org.apache.airavata.common.utils.Constants.GFAC_CONFIG_XML); // assert resource != null; // System.out.println(resource.getFile()); // GFacConfiguration gFacConfiguration = GFacConfiguration.create(new File(resource.getPath()), null); 
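Note on the logging hunks above (BESProvider, BESJobSubmissionTask, EmailBasedMonitor): they move from String.format and string concatenation to SLF4J-style "{}" placeholders. The two placeholder styles are not interchangeable: "{}" is substituted by the logger, while String.format only understands "%s"-style specifiers. The sketch below illustrates the difference using only the JDK. PlaceholderDemo and slf4jStyle are hypothetical names, and slf4jStyle is a simplified stand-in for the logger's substitution, not the SLF4J implementation (no escaping, no special Throwable handling). The URL and status values are made up.

```java
// Stdlib-only illustration; PlaceholderDemo and slf4jStyle are hypothetical names.
final class PlaceholderDemo {

    // Simplified stand-in for logger-side "{}" substitution.
    // Each "{}" is replaced, left to right, by the next argument.
    static String slf4jStyle(String pattern, Object... args) {
        StringBuilder out = new StringBuilder();
        int from = 0;
        for (Object arg : args) {
            int at = pattern.indexOf("{}", from);
            if (at < 0) {
                break; // no placeholder left: remaining arguments are dropped
            }
            out.append(pattern, from, at).append(arg);
            from = at + 2;
        }
        return out.append(pattern.substring(from)).toString();
    }

    public static void main(String[] args) {
        String url = "https://example.org/bes"; // made-up value
        String status = "RUNNING";              // made-up value

        // "{}" handled by the logger-style substitution.
        System.out.println(slf4jStyle("Activity {} is {}.", url, status));

        // "%s" handled by String.format.
        System.out.println(String.format("Activity %s is %s.", url, status));

        // Mixing the styles leaves the placeholders untouched in both directions.
        System.out.println(String.format("Activity {} is {}.", url, status));
        System.out.println(slf4jStyle("experimentId: %s", "exp-1"));
    }
}
```

In other words, a message passed to log.info(...) should use "{}", and a message built with String.format(...) should keep "%s".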
http://git-wip-us.apache.org/repos/asf/airavata/blob/f7f1f9c0/modules/gfac/gfac-impl/src/test/java/org/apache/airavata/core/gfac/services/impl/SSHProviderTestWithSSHAuth.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-impl/src/test/java/org/apache/airavata/core/gfac/services/impl/SSHProviderTestWithSSHAuth.java b/modules/gfac/gfac-impl/src/test/java/org/apache/airavata/core/gfac/services/impl/SSHProviderTestWithSSHAuth.java index 4aa0df1..bdbadda 100644 --- a/modules/gfac/gfac-impl/src/test/java/org/apache/airavata/core/gfac/services/impl/SSHProviderTestWithSSHAuth.java +++ b/modules/gfac/gfac-impl/src/test/java/org/apache/airavata/core/gfac/services/impl/SSHProviderTestWithSSHAuth.java @@ -54,7 +54,7 @@ // @Before // public void setUp() throws Exception { // -// URL resource = SSHProviderTestWithSSHAuth.class.getClassLoader().getResource(org.apache.airavata.common.utils.Constants.GFAC_CONFIG_XML); +// URL resource = ApplicationSettings.loadFile(org.apache.airavata.common.utils.Constants.GFAC_CONFIG_XML); // GFacConfiguration gFacConfiguration = GFacConfiguration.create(new File(resource.getPath()),null); //// gFacConfiguration.s // //have to set InFlwo Handlers and outFlowHandlers http://git-wip-us.apache.org/repos/asf/airavata/blob/f7f1f9c0/modules/gfac/gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java b/modules/gfac/gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java index cd484bb..db571d7 100644 --- a/modules/gfac/gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java +++ b/modules/gfac/gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java @@ -23,6 +23,8 @@ package org.apache.airavata.gfac.server; import 
org.apache.airavata.common.exception.AiravataException; import org.apache.airavata.common.exception.AiravataStartupException; import org.apache.airavata.common.exception.ApplicationSettingsException; +import org.apache.airavata.common.logging.MDCConstants; +import org.apache.airavata.common.logging.MDCUtil; import org.apache.airavata.common.utils.AiravataUtils; import org.apache.airavata.common.utils.ServerSettings; import org.apache.airavata.common.utils.ThriftUtils; @@ -53,6 +55,7 @@ import org.apache.airavata.registry.cpi.ExperimentCatalogModelType; import org.apache.airavata.registry.cpi.RegistryException; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.utils.ZKPaths; +import org.apache.log4j.MDC; import org.apache.thrift.TBase; import org.apache.thrift.TException; import org.apache.zookeeper.CreateMode; @@ -64,6 +67,7 @@ import org.slf4j.LoggerFactory; import java.util.ArrayList; import java.util.Calendar; import java.util.List; +import java.util.Map; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -152,13 +156,11 @@ public class GfacServerHandler implements GfacService.Iface { */ public boolean submitProcess(String processId, String gatewayId, String tokenId) throws TException { - requestCount++; - log.info("-----------------------------------" + requestCount + "-----------------------------------------"); - log.info(processId, "GFac Received submit job request for the Process: {} process: {}", processId, - processId); - + MDC.put(MDCConstants.PROCESS_ID, processId); + MDC.put(MDCConstants.GATEWAY_ID, gatewayId); + MDC.put(MDCConstants.TOKEN_ID, tokenId); try { - executorService.execute(new GFacWorker(processId, gatewayId, tokenId)); + executorService.execute(MDCUtil.wrapWithMDC(new GFacWorker(processId, gatewayId, tokenId))); } catch (GFacException e) { log.error("Failed to submit process", e); @@ -184,28 +186,29 @@ public class GfacServerHandler 
implements GfacService.Iface { } - public void onMessage(MessageContext message) { - log.info(" Message Received with message id '" + message.getMessageId() - + "' and with message type '" + message.getType()); - if (message.getType().equals(MessageType.LAUNCHPROCESS)) { + public void onMessage(MessageContext messageContext) { + MDC.put(MDCConstants.GATEWAY_ID, messageContext.getGatewayId()); + log.info(" Message Received with message id '" + messageContext.getMessageId() + + "' and with message type '" + messageContext.getType()); + if (messageContext.getType().equals(MessageType.LAUNCHPROCESS)) { ProcessStatus status = new ProcessStatus(); status.setState(ProcessState.STARTED); try { ProcessSubmitEvent event = new ProcessSubmitEvent(); - TBase messageEvent = message.getEvent(); + TBase messageEvent = messageContext.getEvent(); byte[] bytes = ThriftUtils.serializeThriftObject(messageEvent); ThriftUtils.createThriftFromBytes(bytes, event); - if (message.isRedeliver()) { + if (messageContext.isRedeliver()) { // check the process is already active in this instance. 
if (Factory.getGfacContext().getProcess(event.getProcessId()) != null) { // update deliver tag try { - updateDeliveryTag(curatorClient, gfacServerName, event, message ); + updateDeliveryTag(curatorClient, gfacServerName, event, messageContext ); return; } catch (Exception e) { log.error("Error while updating delivery tag for redelivery message , messageId : " + - message.getMessageId(), e); - processLaunchSubscriber.sendAck(message.getDeliveryTag()); + messageContext.getMessageId(), e); + processLaunchSubscriber.sendAck(messageContext.getDeliveryTag()); } } else { // read process status from registry @@ -220,8 +223,9 @@ public class GfacServerHandler implements GfacService.Iface { Factory.getDefaultExpCatalog().update(ExperimentCatalogModelType.PROCESS_STATUS, status, event .getProcessId()); publishProcessStatus(event, status); + MDC.put(MDCConstants.EXPERIMENT_ID, event.getExperimentId()); try { - createProcessZKNode(curatorClient, gfacServerName, event, message); + createProcessZKNode(curatorClient, gfacServerName, event, messageContext); boolean isCancel = setCancelWatcher(curatorClient, event.getExperimentId(), event.getProcessId()); if (isCancel) { if (status.getState() == ProcessState.STARTED) { @@ -237,7 +241,7 @@ public class GfacServerHandler implements GfacService.Iface { status.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime()); Factory.getDefaultExpCatalog().update(ExperimentCatalogModelType.PROCESS_STATUS, status, event.getProcessId()); publishProcessStatus(event, status); - processLaunchSubscriber.sendAck(message.getDeliveryTag()); + processLaunchSubscriber.sendAck(messageContext.getDeliveryTag()); return; } else { setCancelData(event.getExperimentId(),event.getProcessId()); @@ -246,7 +250,7 @@ public class GfacServerHandler implements GfacService.Iface { submitProcess(event.getProcessId(), event.getGatewayId(), event.getTokenId()); } catch (Exception e) { log.error(e.getMessage(), e); - 
processLaunchSubscriber.sendAck(message.getDeliveryTag()); + processLaunchSubscriber.sendAck(messageContext.getDeliveryTag()); } } catch (TException e) { log.error(e.getMessage(), e); //nobody is listening so nothing to throw @@ -254,6 +258,8 @@ public class GfacServerHandler implements GfacService.Iface { log.error("Error while updating experiment status", e); } catch (AiravataException e) { log.error("Error while publishing process status", e); + } finally { + MDC.clear(); } } } http://git-wip-us.apache.org/repos/asf/airavata/blob/f7f1f9c0/modules/orchestrator/orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java ---------------------------------------------------------------------- diff --git a/modules/orchestrator/orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java b/modules/orchestrator/orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java index 725a0b1..699e17e 100644 --- a/modules/orchestrator/orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java +++ b/modules/orchestrator/orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java @@ -23,6 +23,8 @@ package org.apache.airavata.orchestrator.server; import org.apache.airavata.common.exception.AiravataException; import org.apache.airavata.common.exception.ApplicationSettingsException; +import org.apache.airavata.common.logging.MDCConstants; +import org.apache.airavata.common.logging.MDCUtil; import org.apache.airavata.common.utils.AiravataUtils; import org.apache.airavata.common.utils.ServerSettings; import org.apache.airavata.common.utils.ThriftUtils; @@ -62,6 +64,7 @@ import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.retry.ExponentialBackoffRetry; import 
org.apache.curator.utils.ZKPaths; +import org.apache.log4j.MDC; import org.apache.thrift.TBase; import org.apache.thrift.TException; import org.apache.zookeeper.data.Stat; @@ -225,7 +228,7 @@ public class OrchestratorServerHandler implements OrchestratorService.Iface { status.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime()); OrchestratorUtils.updageAndPublishExperimentStatus(experimentId, status, publisher, gatewayId); log.info("expId: {}, Launched experiment ", experimentId); - OrchestratorServerThreadPoolExecutor.getCachedThreadPool().execute(new SingleAppExperimentRunner(experimentId, token, gatewayId)); + OrchestratorServerThreadPoolExecutor.getCachedThreadPool().execute(MDCUtil.wrapWithMDC(new SingleAppExperimentRunner(experimentId, token, gatewayId))); } else if (executionType == ExperimentType.WORKFLOW) { //its a workflow execution experiment log.debug(experimentId, "Launching workflow experiment {}.", experimentId); @@ -591,7 +594,7 @@ public class OrchestratorServerHandler implements OrchestratorService.Iface { @Override public void onMessage(MessageContext messageContext) { - + MDC.put(MDCConstants.GATEWAY_ID, messageContext.getGatewayId()); switch (messageContext.getType()) { case EXPERIMENT: launchExperiment(messageContext); @@ -604,6 +607,7 @@ public class OrchestratorServerHandler implements OrchestratorService.Iface { log.error("Orchestrator got un-support message type : " + messageContext.getType()); break; } + MDC.clear(); } private void cancelExperiment(MessageContext messageContext) { @@ -611,6 +615,7 @@ public class OrchestratorServerHandler implements OrchestratorService.Iface { byte[] bytes = ThriftUtils.serializeThriftObject(messageContext.getEvent()); ExperimentSubmitEvent expEvent = new ExperimentSubmitEvent(); ThriftUtils.createThriftFromBytes(bytes, expEvent); + log.info("Cancelling experiment with experimentId: %s gateway Id: %s", expEvent.getExperimentId(), expEvent.getGatewayId()); 
terminateExperiment(expEvent.getExperimentId(), expEvent.getGatewayId()); } catch (TException e) { log.error("Experiment cancellation failed due to Thrift conversion error", e); @@ -622,13 +627,16 @@ public class OrchestratorServerHandler implements OrchestratorService.Iface { } private void launchExperiment(MessageContext messageContext) { + ExperimentSubmitEvent expEvent = new ExperimentSubmitEvent(); try { - byte[] bytes = ThriftUtils.serializeThriftObject(messageContext.getEvent()); - ExperimentSubmitEvent expEvent = new ExperimentSubmitEvent(); - ThriftUtils.createThriftFromBytes(bytes, expEvent); - if (messageContext.isRedeliver()) { + byte[] bytes = ThriftUtils.serializeThriftObject(messageContext.getEvent()); + ThriftUtils.createThriftFromBytes(bytes, expEvent); + MDC.put(MDCConstants.EXPERIMENT_ID, expEvent.getExperimentId()); + log.info("Launching experiment with experimentId: %s gateway Id: %s", expEvent.getExperimentId(), expEvent.getGatewayId()); + if (messageContext.isRedeliver()) { ExperimentModel experimentModel = (ExperimentModel) experimentCatalog. get(ExperimentCatalogModelType.EXPERIMENT, expEvent.getExperimentId()); + MDC.put(MDCConstants.EXPERIMENT_NAME, experimentModel.getExperimentName()); if (experimentModel.getExperimentStatus().get(0).getState() == ExperimentState.CREATED) { launchExperiment(expEvent.getExperimentId(), expEvent.getGatewayId()); } @@ -636,11 +644,18 @@ public class OrchestratorServerHandler implements OrchestratorService.Iface { launchExperiment(expEvent.getExperimentId(), expEvent.getGatewayId()); } } catch (TException e) { - log.error("Experiment launch failed due to Thrift conversion error", e); + String logMessage = expEvent.getExperimentId() != null && expEvent.getGatewayId() != null ? 
+ String.format("Experiment launch failed due to Thrift conversion error, experimentId: %s, gatewayId: %s", + expEvent.getExperimentId(), expEvent.getGatewayId()): "Experiment launch failed due to Thrift conversion error"; + log.error(logMessage, e); } catch (RegistryException e) { - log.error("Experiment launch failed due to registry access issue", e); + String logMessage = expEvent.getExperimentId() != null && expEvent.getGatewayId() != null ? + String.format("Experiment launch failed due to registry access issue, experimentId: %s, gatewayId: %s", + expEvent.getExperimentId(), expEvent.getGatewayId()): "Experiment launch failed due to registry access issue"; + log.error(logMessage, e); }finally { experimentSubscriber.sendAck(messageContext.getDeliveryTag()); + MDC.clear(); } } http://git-wip-us.apache.org/repos/asf/airavata/blob/f7f1f9c0/modules/registry/registry-core/pom.xml ---------------------------------------------------------------------- diff --git a/modules/registry/registry-core/pom.xml b/modules/registry/registry-core/pom.xml index aa9a61b..23db8c6 100644 --- a/modules/registry/registry-core/pom.xml +++ b/modules/registry/registry-core/pom.xml @@ -107,33 +107,6 @@ - - org.apache.openjpa - openjpa-maven-plugin - 2.2.0 - - **/entities/*.class - **/entities/XML*.class - true - true - - - - enhancer - process-classes - - enhance - - - - - - org.apache.openjpa - openjpa - 2.2.0 - - - http://git-wip-us.apache.org/repos/asf/airavata/blob/f7f1f9c0/modules/server/src/main/java/org/apache/airavata/server/ServerMain.java ---------------------------------------------------------------------- diff --git a/modules/server/src/main/java/org/apache/airavata/server/ServerMain.java b/modules/server/src/main/java/org/apache/airavata/server/ServerMain.java index 99387de..faba1e4 100644 --- a/modules/server/src/main/java/org/apache/airavata/server/ServerMain.java +++ b/modules/server/src/main/java/org/apache/airavata/server/ServerMain.java @@ -20,13 +20,18 @@ */ package 
org.apache.airavata.server; +import ch.qos.logback.classic.LoggerContext; +import org.apache.airavata.api.Airavata; +import org.apache.airavata.common.exception.AiravataException; import org.apache.airavata.common.exception.ApplicationSettingsException; +import org.apache.airavata.common.logging.kafka.KafkaAppender; import org.apache.airavata.common.utils.*; import org.apache.airavata.common.utils.ApplicationSettings.ShutdownStrategy; import org.apache.airavata.common.utils.IServer.ServerStatus; import org.apache.airavata.common.utils.StringUtil.CommandLineParameters; import org.apache.commons.cli.ParseException; import org.apache.zookeeper.server.ServerCnxnFactory; +import org.slf4j.ILoggerFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -154,15 +159,39 @@ public class ServerMain { // }); // } - public static void main(String args[]) throws ParseException, IOException { - CommandLineParameters commandLineParameters = StringUtil.getCommandLineParser(args); + public static void main(String args[]) throws ParseException, IOException, AiravataException { + ServerSettings.mergeSettingsCommandLineArgs(args); + ServerSettings.setServerRoles(ApplicationSettings.getSetting(SERVERS_KEY, "all").split(",")); + + if (ServerSettings.isEnabledKafkaLogging()) { + final ILoggerFactory iLoggerFactory = LoggerFactory.getILoggerFactory(); + if (iLoggerFactory instanceof LoggerContext) { + final KafkaAppender kafkaAppender = new KafkaAppender(ServerSettings.getKafkaBrokerList(), + ServerSettings.getKafkaTopicPrefix()); + kafkaAppender.setContext((LoggerContext) iLoggerFactory); + kafkaAppender.setName("kafka-appender"); + kafkaAppender.clearAllFilters(); + kafkaAppender.start(); + // Until AIRAVATA-2073 use for airavata and zookeeper, add others if required + ((LoggerContext) iLoggerFactory).getLogger("org.apache.airavata").addAppender(kafkaAppender); + ((LoggerContext) iLoggerFactory).getLogger("org.apache.zookeeper").addAppender(kafkaAppender); + ((LoggerContext) 
iLoggerFactory).getLogger("org.apache.derby").addAppender(kafkaAppender); + ((LoggerContext) iLoggerFactory).getLogger("org.apache.commons").addAppender(kafkaAppender); + } else { + logger.warn("Kafka logging is enabled but cannot find logback LoggerContext, found", iLoggerFactory.getClass().toString()); + throw new AiravataException("Kafka logging is enabled but cannot find logback LoggerContext"); + } + } else { + logger.info("Kafka logging is disabled in airavata server configurations"); + } + + CommandLineParameters commandLineParameters = StringUtil.getCommandLineParser(args); if (commandLineParameters.getArguments().contains(STOP_COMMAND_STR)){ performServerStopRequest(commandLineParameters); }else{ AiravataZKUtils.startEmbeddedZK(cnxnFactory); performServerStart(args); } - } @@ -173,7 +202,6 @@ public class ServerMain { for (String string : args) { logger.info("Server Arguments: " + string); } - ServerSettings.mergeSettingsCommandLineArgs(args); String serverNames; try { serverNames = ApplicationSettings.getSetting(SERVERS_KEY); http://git-wip-us.apache.org/repos/asf/airavata/blob/f7f1f9c0/modules/test-suite/multi-tenanted-airavata/src/main/java/org/apache/airavata/testsuite/multitenantedairavata/utils/PropertyReader.java ---------------------------------------------------------------------- diff --git a/modules/test-suite/multi-tenanted-airavata/src/main/java/org/apache/airavata/testsuite/multitenantedairavata/utils/PropertyReader.java b/modules/test-suite/multi-tenanted-airavata/src/main/java/org/apache/airavata/testsuite/multitenantedairavata/utils/PropertyReader.java index f7b8e1f..9df1745 100644 --- a/modules/test-suite/multi-tenanted-airavata/src/main/java/org/apache/airavata/testsuite/multitenantedairavata/utils/PropertyReader.java +++ b/modules/test-suite/multi-tenanted-airavata/src/main/java/org/apache/airavata/testsuite/multitenantedairavata/utils/PropertyReader.java @@ -21,6 +21,7 @@ package 
org.apache.airavata.testsuite.multitenantedairavata.utils;

+import org.apache.airavata.common.utils.ApplicationSettings;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

@@ -41,7 +42,7 @@ public class PropertyReader {
     }

     protected void loadProperties() throws IOException {
-        URL airavataURL = PropertyFileType.class.getClassLoader().getResource(TestFrameworkConstants.AIRAVATA_CLIENT_PROPERTIES);
+        URL airavataURL = ApplicationSettings.loadFile(TestFrameworkConstants.AIRAVATA_CLIENT_PROPERTIES);
         if (airavataURL != null){
             airavataClientProperties.load(airavataURL.openStream());
         }

http://git-wip-us.apache.org/repos/asf/airavata/blob/f7f1f9c0/modules/user-profile/user-profile-core/src/main/java/org/apache/airavata/userprofile/core/AbstractThriftDeserializer.java
----------------------------------------------------------------------
diff --git a/modules/user-profile/user-profile-core/src/main/java/org/apache/airavata/userprofile/core/AbstractThriftDeserializer.java b/modules/user-profile/user-profile-core/src/main/java/org/apache/airavata/userprofile/core/AbstractThriftDeserializer.java
index 83af52b..14a4b9c 100644
--- a/modules/user-profile/user-profile-core/src/main/java/org/apache/airavata/userprofile/core/AbstractThriftDeserializer.java
+++ b/modules/user-profile/user-profile-core/src/main/java/org/apache/airavata/userprofile/core/AbstractThriftDeserializer.java
@@ -91,8 +91,7 @@ public abstract class AbstractThriftDeserializer1.15 false 1.5.3 + 0.8.2.2 + 1.1.6 @@ -434,6 +436,16 @@ curator-framework ${curator.version} + + org.apache.kafka + kafka-clients + ${kafka-clients.version} + + + ch.qos.logback + logback-classic + ${logback.version} + @@ -480,6 +492,39 @@ + org.codehaus.mojo + templating-maven-plugin + 1.0.0 + + + filtering-java-templates + + filter-sources + + + + + + com.lukegb.mojo + gitdescribe-maven-plugin + 3.0 + + + + gitdescribe + + git-describe + initialize + + + --tags + + git-describe + + + + + org.apache.maven.plugins
maven-remote-resources-plugin @@ -542,6 +587,26 @@ + + com.lukegb.mojo + gitdescribe-maven-plugin + 3.0 + + + + gitdescribe + + git-describe + initialize + + + --tags + + git-describe + + + +