Return-Path: X-Original-To: apmail-brooklyn-commits-archive@minotaur.apache.org Delivered-To: apmail-brooklyn-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 048241821C for ; Wed, 19 Aug 2015 22:54:22 +0000 (UTC) Received: (qmail 48570 invoked by uid 500); 19 Aug 2015 22:54:21 -0000 Delivered-To: apmail-brooklyn-commits-archive@brooklyn.apache.org Received: (qmail 48546 invoked by uid 500); 19 Aug 2015 22:54:21 -0000 Mailing-List: contact commits-help@brooklyn.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@brooklyn.incubator.apache.org Delivered-To: mailing list commits@brooklyn.incubator.apache.org Received: (qmail 48537 invoked by uid 99); 19 Aug 2015 22:54:21 -0000 Received: from Unknown (HELO spamd2-us-west.apache.org) (209.188.14.142) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 19 Aug 2015 22:54:21 +0000 Received: from localhost (localhost [127.0.0.1]) by spamd2-us-west.apache.org (ASF Mail Server at spamd2-us-west.apache.org) with ESMTP id 515841AA82C for ; Wed, 19 Aug 2015 22:54:21 +0000 (UTC) X-Virus-Scanned: Debian amavisd-new at spamd2-us-west.apache.org X-Spam-Flag: NO X-Spam-Score: 1.775 X-Spam-Level: * X-Spam-Status: No, score=1.775 tagged_above=-999 required=6.31 tests=[KAM_ASCII_DIVIDERS=0.8, KAM_LAZY_DOMAIN_SECURITY=1, RCVD_IN_MSPIKE_H3=-0.01, RCVD_IN_MSPIKE_WL=-0.01, RP_MATCHES_RCVD=-0.006, URIBL_BLOCKED=0.001] autolearn=disabled Received: from mx1-us-east.apache.org ([10.40.0.8]) by localhost (spamd2-us-west.apache.org [10.40.0.9]) (amavisd-new, port 10024) with ESMTP id QDmPecW3smgC for ; Wed, 19 Aug 2015 22:54:07 +0000 (UTC) Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx1-us-east.apache.org (ASF Mail Server at mx1-us-east.apache.org) with SMTP id 219104C0FB for ; Wed, 19 Aug 2015 22:53:50 +0000 (UTC) Received: (qmail 46643 invoked by uid 99); 19 Aug 2015 
22:53:50 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 19 Aug 2015 22:53:50 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 508BCE7151; Wed, 19 Aug 2015 22:53:50 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: heneveld@apache.org To: commits@brooklyn.incubator.apache.org Date: Wed, 19 Aug 2015 22:54:08 -0000 Message-Id: <9e48ab247fad4bc7b0834a7683ffaa7a@git.apache.org> In-Reply-To: <65cc203565d44c9592e8c87850030791@git.apache.org> References: <65cc203565d44c9592e8c87850030791@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [20/36] incubator-brooklyn git commit: Rename o.a.b.sensor.feed to o.a.b.feed and o.a.b.core.feed http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/daf40919/software/base/src/main/java/org/apache/brooklyn/feed/jmx/JmxFeed.java ---------------------------------------------------------------------- diff --git a/software/base/src/main/java/org/apache/brooklyn/feed/jmx/JmxFeed.java b/software/base/src/main/java/org/apache/brooklyn/feed/jmx/JmxFeed.java new file mode 100644 index 0000000..b3b2994 --- /dev/null +++ b/software/base/src/main/java/org/apache/brooklyn/feed/jmx/JmxFeed.java @@ -0,0 +1,423 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.brooklyn.feed.jmx; + +import static com.google.common.base.Preconditions.checkNotNull; + +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.Callable; +import java.util.concurrent.TimeUnit; + +import javax.management.Notification; +import javax.management.NotificationFilter; +import javax.management.NotificationListener; +import javax.management.ObjectName; + +import org.apache.brooklyn.api.entity.EntityLocal; +import org.apache.brooklyn.config.ConfigKey; +import org.apache.brooklyn.core.config.ConfigKeys; +import org.apache.brooklyn.core.feed.AbstractFeed; +import org.apache.brooklyn.core.feed.AttributePollHandler; +import org.apache.brooklyn.core.feed.DelegatingPollHandler; +import org.apache.brooklyn.core.feed.PollHandler; +import org.apache.brooklyn.core.feed.Poller; +import org.apache.brooklyn.entity.software.base.SoftwareProcessImpl; +import org.apache.brooklyn.util.time.Duration; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.collect.HashMultimap; +import com.google.common.collect.Iterables; +import com.google.common.collect.Lists; +import com.google.common.collect.SetMultimap; +import com.google.common.collect.Sets; +import com.google.common.reflect.TypeToken; + + +/** + * Provides a feed of attribute values, by polling or subscribing over jmx. + * + * Example usage (e.g. in an entity that extends {@link SoftwareProcessImpl}): + *
+ * {@code
+ * private JmxFeed feed;
+ * 
+ * //@Override
+ * protected void connectSensors() {
+ *   super.connectSensors();
+ *   
+ *   feed = JmxFeed.builder()
+ *       .entity(this)
+ *       .period(500, TimeUnit.MILLISECONDS)
+ *       .pollAttribute(new JmxAttributePollConfig(ERROR_COUNT)
+ *           .objectName(requestProcessorMbeanName)
+ *           .attributeName("errorCount"))
+ *       .pollAttribute(new JmxAttributePollConfig(SERVICE_UP)
+ *           .objectName(serverMbeanName)
+ *           .attributeName("Started")
+ *           .onError(Functions.constant(false)))
+ *       .build();
+ * }
+ * 
+ * {@literal @}Override
+ * protected void disconnectSensors() {
+ *   super.disconnectSensors();
+ *   if (feed != null) feed.stop();
+ * }
+ * }
+ * 
+ * + * @author aled + */ +public class JmxFeed extends AbstractFeed { + + public static final Logger log = LoggerFactory.getLogger(JmxFeed.class); + + public static final long JMX_CONNECTION_TIMEOUT_MS = 120*1000; + + public static final ConfigKey HELPER = ConfigKeys.newConfigKey(JmxHelper.class, "helper"); + public static final ConfigKey OWN_HELPER = ConfigKeys.newBooleanConfigKey("ownHelper"); + public static final ConfigKey JMX_URI = ConfigKeys.newStringConfigKey("jmxUri"); + public static final ConfigKey JMX_CONNECTION_TIMEOUT = ConfigKeys.newLongConfigKey("jmxConnectionTimeout"); + + @SuppressWarnings("serial") + public static final ConfigKey>> ATTRIBUTE_POLLS = ConfigKeys.newConfigKey( + new TypeToken>>() {}, + "attributePolls"); + + @SuppressWarnings("serial") + public static final ConfigKey, JmxOperationPollConfig>> OPERATION_POLLS = ConfigKeys.newConfigKey( + new TypeToken, JmxOperationPollConfig>>() {}, + "operationPolls"); + + @SuppressWarnings("serial") + public static final ConfigKey>> NOTIFICATION_SUBSCRIPTIONS = ConfigKeys.newConfigKey( + new TypeToken>>() {}, + "notificationPolls"); + + public static Builder builder() { + return new Builder(); + } + + public static class Builder { + private EntityLocal entity; + private JmxHelper helper; + private long jmxConnectionTimeout = JMX_CONNECTION_TIMEOUT_MS; + private long period = 500; + private TimeUnit periodUnits = TimeUnit.MILLISECONDS; + private List> attributePolls = Lists.newArrayList(); + private List> operationPolls = Lists.newArrayList(); + private List> notificationSubscriptions = Lists.newArrayList(); + private String uniqueTag; + private volatile boolean built; + + public Builder entity(EntityLocal val) { + this.entity = val; + return this; + } + public Builder helper(JmxHelper val) { + this.helper = val; + return this; + } + public Builder period(Duration duration) { + return period(duration.toMilliseconds(), TimeUnit.MILLISECONDS); + } + public Builder period(long millis) { + return 
period(millis, TimeUnit.MILLISECONDS); + } + public Builder period(long val, TimeUnit units) { + this.period = val; + this.periodUnits = units; + return this; + } + public Builder pollAttribute(JmxAttributePollConfig config) { + attributePolls.add(config); + return this; + } + public Builder pollOperation(JmxOperationPollConfig config) { + operationPolls.add(config); + return this; + } + public Builder subscribeToNotification(JmxNotificationSubscriptionConfig config) { + notificationSubscriptions.add(config); + return this; + } + public Builder uniqueTag(String uniqueTag) { + this.uniqueTag = uniqueTag; + return this; + } + public JmxFeed build() { + built = true; + JmxFeed result = new JmxFeed(this); + result.setEntity(checkNotNull(entity, "entity")); + result.start(); + return result; + } + @Override + protected void finalize() { + if (!built) log.warn("JmxFeed.Builder created, but build() never called"); + } + } + + private final SetMultimap notificationListeners = HashMultimap.create(); + + /** + * For rebind; do not call directly; use builder + */ + public JmxFeed() { + } + + protected JmxFeed(Builder builder) { + super(); + if (builder.helper != null) { + JmxHelper helper = builder.helper; + setConfig(HELPER, helper); + setConfig(OWN_HELPER, false); + setConfig(JMX_URI, helper.getUrl()); + } + setConfig(JMX_CONNECTION_TIMEOUT, builder.jmxConnectionTimeout); + + SetMultimap> attributePolls = HashMultimap.>create(); + for (JmxAttributePollConfig config : builder.attributePolls) { + if (!config.isEnabled()) continue; + @SuppressWarnings({ "rawtypes", "unchecked" }) + JmxAttributePollConfig configCopy = new JmxAttributePollConfig(config); + if (configCopy.getPeriod() < 0) configCopy.period(builder.period, builder.periodUnits); + attributePolls.put(configCopy.getObjectName().getCanonicalName() + configCopy.getAttributeName(), configCopy); + } + setConfig(ATTRIBUTE_POLLS, attributePolls); + + SetMultimap, JmxOperationPollConfig> operationPolls = 
HashMultimap.,JmxOperationPollConfig>create(); + for (JmxOperationPollConfig config : builder.operationPolls) { + if (!config.isEnabled()) continue; + @SuppressWarnings({ "rawtypes", "unchecked" }) + JmxOperationPollConfig configCopy = new JmxOperationPollConfig(config); + if (configCopy.getPeriod() < 0) configCopy.period(builder.period, builder.periodUnits); + operationPolls.put(configCopy.buildOperationIdentity(), configCopy); + } + setConfig(OPERATION_POLLS, operationPolls); + + SetMultimap> notificationSubscriptions = HashMultimap.create(); + for (JmxNotificationSubscriptionConfig config : builder.notificationSubscriptions) { + if (!config.isEnabled()) continue; + notificationSubscriptions.put(config.getNotificationFilter(), config); + } + setConfig(NOTIFICATION_SUBSCRIPTIONS, notificationSubscriptions); + initUniqueTag(builder.uniqueTag, attributePolls, operationPolls, notificationSubscriptions); + } + + @Override + public void setEntity(EntityLocal entity) { + if (getConfig(HELPER) == null) { + JmxHelper helper = new JmxHelper(entity); + setConfig(HELPER, helper); + setConfig(OWN_HELPER, true); + setConfig(JMX_URI, helper.getUrl()); + } + super.setEntity(entity); + } + + public String getJmxUri() { + return getConfig(JMX_URI); + } + + protected JmxHelper getHelper() { + return getConfig(HELPER); + } + + @SuppressWarnings("unchecked") + protected Poller getPoller() { + return (Poller) super.getPoller(); + } + + @Override + protected boolean isConnected() { + return super.isConnected() && getHelper().isConnected(); + } + + @Override + protected void preStart() { + /* + * All actions on the JmxHelper are done async (through the poller's threading) so we don't + * block on start/rebind if the entity is unreachable + * (without this we get a 120s pause in JmxHelper.connect restarting) + */ + final SetMultimap> notificationSubscriptions = getConfig(NOTIFICATION_SUBSCRIPTIONS); + final SetMultimap, JmxOperationPollConfig> operationPolls = getConfig(OPERATION_POLLS); 
+ final SetMultimap> attributePolls = getConfig(ATTRIBUTE_POLLS); + + getPoller().submit(new Callable() { + public Void call() { + getHelper().connect(getConfig(JMX_CONNECTION_TIMEOUT)); + return null; + } + @Override public String toString() { return "Connect JMX "+getHelper().getUrl(); } + }); + + for (final NotificationFilter filter : notificationSubscriptions.keySet()) { + getPoller().submit(new Callable() { + public Void call() { + // TODO Could config.getObjectName have wildcards? Is this code safe? + Set> configs = notificationSubscriptions.get(filter); + NotificationListener listener = registerNotificationListener(configs); + ObjectName objectName = Iterables.get(configs, 0).getObjectName(); + notificationListeners.put(objectName, listener); + return null; + } + @Override public String toString() { return "Register JMX notifications: "+notificationSubscriptions.get(filter); } + }); + } + + // Setup polling of sensors: one poller per distinct key (keySet(), not keys(), which repeats a key once per config and would schedule duplicate pollers) + for (final String jmxAttributeName : attributePolls.keySet()) { + registerAttributePoller(attributePolls.get(jmxAttributeName)); + } + + // Setup polling of operations: likewise one poller per distinct operation identity + for (final List operationIdentifier : operationPolls.keySet()) { + registerOperationPoller(operationPolls.get(operationIdentifier)); + } + } + + @Override + protected void preStop() { + super.preStop(); + + for (Map.Entry entry : notificationListeners.entries()) { + unregisterNotificationListener(entry.getKey(), entry.getValue()); + } + notificationListeners.clear(); + } + + @Override + protected void postStop() { + super.postStop(); + JmxHelper helper = getHelper(); + Boolean ownHelper = getConfig(OWN_HELPER); + if (helper != null && Boolean.TRUE.equals(ownHelper)) helper.terminate(); // only terminate a helper this feed created; an unset flag must not cause an unboxing NPE + } + + /** + * Registers to poll a jmx-operation for an ObjectName, where all the given configs are for the same ObjectName + operation + parameters. 
+ */ + private void registerOperationPoller(Set> configs) { + Set> handlers = Sets.newLinkedHashSet(); + long minPeriod = Integer.MAX_VALUE; + + final ObjectName objectName = Iterables.get(configs, 0).getObjectName(); + final String operationName = Iterables.get(configs, 0).getOperationName(); + final List signature = Iterables.get(configs, 0).getSignature(); + final List params = Iterables.get(configs, 0).getParams(); + + for (JmxOperationPollConfig config : configs) { + handlers.add(new AttributePollHandler(config, getEntity(), this)); + if (config.getPeriod() > 0) minPeriod = Math.min(minPeriod, config.getPeriod()); + } + + getPoller().scheduleAtFixedRate( + new Callable() { + public Object call() throws Exception { + if (log.isDebugEnabled()) log.debug("jmx operation polling for {} sensors at {} -> {}", new Object[] {getEntity(), getJmxUri(), operationName}); + if (signature.size() == params.size()) { + return getHelper().operation(objectName, operationName, signature, params); + } else { + return getHelper().operation(objectName, operationName, params.toArray()); + } + } + }, + new DelegatingPollHandler(handlers), minPeriod); + } + + /** + * Registers to poll a jmx-attribute for an ObjectName, where all the given configs are for that same ObjectName + attribute. 
+ */ + private void registerAttributePoller(Set> configs) { + Set> handlers = Sets.newLinkedHashSet(); + long minPeriod = Integer.MAX_VALUE; + + final ObjectName objectName = Iterables.get(configs, 0).getObjectName(); + final String jmxAttributeName = Iterables.get(configs, 0).getAttributeName(); + + for (JmxAttributePollConfig config : configs) { + handlers.add(new AttributePollHandler(config, getEntity(), this)); + if (config.getPeriod() > 0) minPeriod = Math.min(minPeriod, config.getPeriod()); + } + + // TODO Not good calling this holding the synchronization lock + getPoller().scheduleAtFixedRate( + new Callable() { + public Object call() throws Exception { + if (log.isTraceEnabled()) log.trace("jmx attribute polling for {} sensors at {} -> {}", new Object[] {getEntity(), getJmxUri(), jmxAttributeName}); + return getHelper().getAttribute(objectName, jmxAttributeName); + } + }, + new DelegatingPollHandler(handlers), minPeriod); + } + + /** + * Registers to subscribe to notifications for an ObjectName, where all the given configs are for that same ObjectName + filter. 
+ */ + private NotificationListener registerNotificationListener(Set> configs) { + final List> handlers = Lists.newArrayList(); + + final ObjectName objectName = Iterables.get(configs, 0).getObjectName(); + final NotificationFilter filter = Iterables.get(configs, 0).getNotificationFilter(); + + for (final JmxNotificationSubscriptionConfig config : configs) { + AttributePollHandler handler = new AttributePollHandler(config, getEntity(), this) { + @Override protected Object transformValueOnSuccess(javax.management.Notification val) { + if (config.getOnNotification() != null) { + return config.getOnNotification().apply(val); + } else { + Object result = super.transformValueOnSuccess(val); + if (result instanceof javax.management.Notification) + return ((javax.management.Notification)result).getUserData(); + return result; + } + } + }; + handlers.add(handler); + } + final PollHandler compoundHandler = new DelegatingPollHandler(handlers); + + NotificationListener listener = new NotificationListener() { + @Override public void handleNotification(Notification notification, Object handback) { + compoundHandler.onSuccess(notification); + } + }; + getHelper().addNotificationListener(objectName, listener, filter); + + return listener; + } + + private void unregisterNotificationListener(ObjectName objectName, NotificationListener listener) { + try { + getHelper().removeNotificationListener(objectName, listener); + } catch (RuntimeException e) { + log.warn("Failed to unregister listener: "+objectName+", "+listener+"; continuing...", e); + } + } + + @Override + public String toString() { + return "JmxFeed["+(getManagementContext()!=null&&getManagementContext().isRunning()?getJmxUri():"mgmt-not-running")+"]"; + } +} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/daf40919/software/base/src/main/java/org/apache/brooklyn/feed/jmx/JmxHelper.java ---------------------------------------------------------------------- diff --git 
a/software/base/src/main/java/org/apache/brooklyn/feed/jmx/JmxHelper.java b/software/base/src/main/java/org/apache/brooklyn/feed/jmx/JmxHelper.java new file mode 100644 index 0000000..652ac76 --- /dev/null +++ b/software/base/src/main/java/org/apache/brooklyn/feed/jmx/JmxHelper.java @@ -0,0 +1,724 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.brooklyn.feed.jmx; + +import static com.google.common.base.Preconditions.checkNotNull; +import static org.apache.brooklyn.util.JavaGroovyEquivalents.groovyTruth; +import groovy.time.TimeDuration; + +import java.io.IOException; +import java.security.KeyStore; +import java.security.PrivateKey; +import java.security.cert.Certificate; +import java.util.Arrays; +import java.util.Collections; +import java.util.LinkedHashMap; +import java.util.Map; +import java.util.Set; +import java.util.WeakHashMap; +import java.util.concurrent.Callable; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; + +import javax.management.AttributeNotFoundException; +import javax.management.InstanceAlreadyExistsException; +import javax.management.InstanceNotFoundException; +import javax.management.InvalidAttributeValueException; +import javax.management.JMX; +import javax.management.ListenerNotFoundException; +import javax.management.MBeanServerConnection; +import javax.management.MalformedObjectNameException; +import javax.management.NotCompliantMBeanException; +import javax.management.NotificationFilter; +import javax.management.NotificationListener; +import javax.management.ObjectInstance; +import javax.management.ObjectName; +import javax.management.openmbean.CompositeData; +import javax.management.openmbean.TabularData; +import javax.management.remote.JMXConnector; +import javax.management.remote.JMXConnectorFactory; +import javax.management.remote.JMXServiceURL; +import javax.net.ssl.KeyManagerFactory; +import javax.net.ssl.SSLContext; +import javax.net.ssl.SSLSocketFactory; +import javax.net.ssl.TrustManager; + +import org.apache.brooklyn.api.entity.EntityLocal; +import org.apache.brooklyn.entity.java.JmxSupport; +import org.apache.brooklyn.entity.java.UsesJmx; +import org.apache.brooklyn.util.core.crypto.SecureKeys; +import org.apache.brooklyn.util.crypto.SslTrustUtils; 
+import org.apache.brooklyn.util.exceptions.Exceptions; +import org.apache.brooklyn.util.exceptions.RuntimeInterruptedException; +import org.apache.brooklyn.util.jmx.jmxmp.JmxmpAgent; +import org.apache.brooklyn.util.repeat.Repeater; +import org.apache.brooklyn.util.time.Duration; +import org.apache.brooklyn.util.time.Time; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.base.Preconditions; +import com.google.common.base.Throwables; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Iterables; + +public class JmxHelper { + + private static final Logger LOG = LoggerFactory.getLogger(JmxHelper.class); + + public static final String JMX_URL_FORMAT = "service:jmx:rmi:///jndi/rmi://%s:%d/%s"; + // first host:port may be ignored, so above is sufficient, but not sure + public static final String RMI_JMX_URL_FORMAT = "service:jmx:rmi://%s:%d/jndi/rmi://%s:%d/%s"; + // jmxmp + public static final String JMXMP_URL_FORMAT = "service:jmx:jmxmp://%s:%d"; + + // Tracks the MBeans we have failed to find, with a set keyed off the url + private static final Map> notFoundMBeansByUrl = Collections.synchronizedMap(new WeakHashMap>()); + + public static final Map CLASSES = ImmutableMap.builder() + .put("Integer", Integer.TYPE.getName()) + .put("Long", Long.TYPE.getName()) + .put("Boolean", Boolean.TYPE.getName()) + .put("Byte", Byte.TYPE.getName()) + .put("Character", Character.TYPE.getName()) + .put("Double", Double.TYPE.getName()) + .put("Float", Float.TYPE.getName()) + .put("GStringImpl", String.class.getName()) + .put("LinkedHashMap", Map.class.getName()) + .put("TreeMap", Map.class.getName()) + .put("HashMap", Map.class.getName()) + .put("ConcurrentHashMap", Map.class.getName()) + .put("TabularDataSupport", TabularData.class.getName()) + .put("CompositeDataSupport", CompositeData.class.getName()) + .build(); + + /** constructs a JMX URL suitable for connecting to 
the given entity, being smart about JMX/RMI vs JMXMP */ + public static String toJmxUrl(EntityLocal entity) { + String url = entity.getAttribute(UsesJmx.JMX_URL); + if (url != null) { + return url; + } else { + new JmxSupport(entity, null).setJmxUrl(); + url = entity.getAttribute(UsesJmx.JMX_URL); + return Preconditions.checkNotNull(url, "Could not find URL for "+entity); + } + } + + /** constructs an RMI/JMX URL with the given inputs + * (where the RMI Registry Port should be non-null, and at least one must be non-null); returns null, after logging a warning, if neither port is non-null and positive */ + public static String toRmiJmxUrl(String host, Integer jmxRmiServerPort, Integer rmiRegistryPort, String context) { + if (rmiRegistryPort != null && rmiRegistryPort > 0) { + if (jmxRmiServerPort!=null && jmxRmiServerPort > 0 && !jmxRmiServerPort.equals(rmiRegistryPort)) { + // we have an explicit known JMX RMI server port (e.g. because we are using the agent), + // distinct from the RMI registry port; the ports are boxed Integers, so compare with equals() because != would compare references + // (if the ports are the same, it is a short-hand, and don't use this syntax!) + return String.format(RMI_JMX_URL_FORMAT, host, jmxRmiServerPort, host, rmiRegistryPort, context); + } + return String.format(JMX_URL_FORMAT, host, rmiRegistryPort, context); + } else if (jmxRmiServerPort!=null && jmxRmiServerPort > 0) { + LOG.warn("No RMI registry port set for "+host+"; attempting to use JMX port for RMI lookup"); + return String.format(JMX_URL_FORMAT, host, jmxRmiServerPort, context); + } else { + LOG.warn("No RMI/JMX details set for "+host+"; returning null"); + return null; + } + } + + /** constructs a JMXMP URL for connecting to the given host and port */ + public static String toJmxmpUrl(String host, Integer jmxmpPort) { + return "service:jmx:jmxmp://"+host+(jmxmpPort!=null ? 
":"+jmxmpPort : ""); + } + + final EntityLocal entity; + final String url; + final String user; + final String password; + + private volatile transient JMXConnector connector; + private volatile transient MBeanServerConnection connection; + private transient boolean triedConnecting; + private transient boolean failedReconnecting; + private transient long failedReconnectingTime; + private int minTimeBetweenReconnectAttempts = 1000; + private final AtomicBoolean terminated = new AtomicBoolean(); + + // Tracks the MBeans we have failed to find for this JmxHelper's connection URL (so can log just once for each) + private final Set notFoundMBeans; + + public JmxHelper(EntityLocal entity) { + this(toJmxUrl(entity), entity, entity.getAttribute(UsesJmx.JMX_USER), entity.getAttribute(UsesJmx.JMX_PASSWORD)); + + if (entity.getAttribute(UsesJmx.JMX_URL) == null) { + entity.setAttribute(UsesJmx.JMX_URL, url); + } + } + + // TODO split this in to two classes, one for entities, and one entity-neutral + // (simplifying set of constructors below) + + public JmxHelper(String url) { + this(url, null, null); + } + + public JmxHelper(String url, String user, String password) { + this(url, null, user, password); + } + + public JmxHelper(String url, EntityLocal entity, String user, String password) { + this.url = url; + this.entity = entity; + this.user = user; + this.password = password; + + synchronized (notFoundMBeansByUrl) { + Set set = notFoundMBeansByUrl.get(url); + if (set == null) { + set = Collections.synchronizedSet(Collections.newSetFromMap(new WeakHashMap())); + notFoundMBeansByUrl.put(url, set); + } + notFoundMBeans = set; + } + } + + public void setMinTimeBetweenReconnectAttempts(int val) { + minTimeBetweenReconnectAttempts = val; + } + + public String getUrl(){ + return url; + } + + // ============== connection related calls ======================= + + //for testing purposes + protected MBeanServerConnection getConnection() { + return connection; + } + + /** + * Checks if 
the JmxHelper is connected. Returned value could be stale as soon + * as it is received. + * + * This method is thread safe. + * + * @return true if connected, false otherwise. + */ + public boolean isConnected() { + return connection!=null; + } + + /** + * Reconnects. If it already is connected, it disconnects first. + * + * @throws IOException + */ + public synchronized void reconnectWithRetryDampened() throws IOException { + // If we've already tried reconnecting very recently, don't try again immediately + if (failedReconnecting) { + long timeSince = (System.currentTimeMillis() - failedReconnectingTime); + if (timeSince < minTimeBetweenReconnectAttempts) { + String msg = "Not reconnecting to JMX at "+url+" because attempt failed "+Time.makeTimeStringRounded(timeSince)+" ago"; + throw new IllegalStateException(msg); + } + } + + reconnect(); + } + + public synchronized void reconnect() throws IOException { + disconnect(); + + try { + connect(); + failedReconnecting = false; + } catch (Exception e) { + if (failedReconnecting) { + if (LOG.isDebugEnabled()) LOG.debug("unable to re-connect to JMX url (repeated failure): {}: {}", url, e); + } else { + LOG.debug("unable to re-connect to JMX url {} (rethrowing): {}", url, e); + failedReconnecting = true; + } + failedReconnectingTime = System.currentTimeMillis(); + throw Throwables.propagate(e); + } + } + + /** attempts to connect immediately */ + @SuppressWarnings({ "rawtypes", "unchecked" }) + public synchronized void connect() throws IOException { + if (terminated.get()) throw new IllegalStateException("JMX Helper "+this+" already terminated"); + if (connection != null) return; + + triedConnecting = true; + if (connector != null) connector.close(); + JMXServiceURL serviceUrl = new JMXServiceURL(url); + Map env = getConnectionEnvVars(); + try { + connector = JMXConnectorFactory.connect(serviceUrl, env); + } catch (NullPointerException npe) { + //some software -- eg WSO2 -- will throw an NPE exception if the JMX 
connection can't be created, instead of an IOException. + //this is a break of contract with the JMXConnectorFactory.connect method, so this code verifies if the NPE is + //thrown by a known offender (wso2) and if so replaces the bad exception by a new IOException. + //ideally WSO2 will fix this bug and we can remove this code. + boolean thrownByWso2 = npe.getStackTrace()[0].toString().contains("org.wso2.carbon.core.security.CarbonJMXAuthenticator.authenticate"); + if (thrownByWso2) { + throw new IOException("Failed to connect to url "+url+". NullPointerException is thrown, but replaced by an IOException to fix a WSO2 JMX problem", npe); + } else { + throw npe; + } + } catch (IOException e) { + Exceptions.propagateIfFatal(e); + if (terminated.get()) { + throw new IllegalStateException("JMX Helper "+this+" already terminated", e); + } else { + throw e; + } + } + connection = connector.getMBeanServerConnection(); + + if (terminated.get()) { + disconnectNow(); + throw new IllegalStateException("JMX Helper "+this+" already terminated"); + } + } + + @SuppressWarnings({ "rawtypes", "unchecked" }) + public Map getConnectionEnvVars() { + Map env = new LinkedHashMap(); + + if (groovyTruth(user) && groovyTruth(password)) { + String[] creds = new String[] {user, password}; + env.put(JMXConnector.CREDENTIALS, creds); + } + + if (entity!=null && groovyTruth(entity.getConfig(UsesJmx.JMX_SSL_ENABLED))) { + env.put("jmx.remote.profiles", JmxmpAgent.TLS_JMX_REMOTE_PROFILES); + + PrivateKey key = entity.getConfig(UsesJmx.JMX_SSL_ACCESS_KEY); + Certificate cert = entity.getConfig(UsesJmx.JMX_SSL_ACCESS_CERT); + KeyStore ks = SecureKeys.newKeyStore(); + try { + KeyManagerFactory kmf = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm()); + if (key!=null) { + ks.setKeyEntry("brooklyn-jmx-access", key, "".toCharArray(), new Certificate[] { cert }); + } + kmf.init(ks, "".toCharArray()); + + TrustManager tms = + // TODO use root cert for trusting server + 
//trustStore!=null ? SecureKeys.getTrustManager(trustStore) : + SslTrustUtils.TRUST_ALL; + + SSLContext ctx = SSLContext.getInstance("TLSv1"); + ctx.init(kmf.getKeyManagers(), new TrustManager[] { tms }, null); + SSLSocketFactory ssf = ctx.getSocketFactory(); + env.put(JmxmpAgent.TLS_SOCKET_FACTORY_PROPERTY, ssf); + + } catch (Exception e) { + LOG.warn("Error setting key "+key+" for "+entity+": "+e, e); + } + } + + return env; + } + + /** + * Continuously attempts to connect for at least the indicated amount of time; or indefinitely if -1. This method + * is useful when you are not sure if the system you are trying to connect to already is up and running. + * + * This method doesn't throw an Exception, but returns true on success, false otherwise. + * + * TODO: What happens if already connected? + * + * @param timeoutMs + * @return + */ + public boolean connect(long timeoutMs) { + if (LOG.isDebugEnabled()) LOG.debug("Connecting to JMX URL: {} ({})", url, ((timeoutMs == -1) ? "indefinitely" : timeoutMs+"ms timeout")); + long startMs = System.currentTimeMillis(); + long endMs = (timeoutMs == -1) ? 
Long.MAX_VALUE : (startMs + timeoutMs); + long currentTime = startMs; + Throwable lastError = null; + int attempt = 0; + while (currentTime <= endMs) { + currentTime = System.currentTimeMillis(); + if (attempt != 0) sleep(100); //sleep 100 to prevent thrashing and facilitate interruption + if (LOG.isTraceEnabled()) LOG.trace("trying connection to {} at time {}", url, currentTime); + + try { + connect(); + return true; + } catch (Exception e) { + Exceptions.propagateIfFatal(e); + if (!terminated.get() && shouldRetryOn(e)) { + if (LOG.isDebugEnabled()) LOG.debug("Attempt {} failed connecting to {} ({})", new Object[] {attempt + 1, url, e.getMessage()}); + lastError = e; + } else { + throw Exceptions.propagate(e); + } + } + attempt++; + } + LOG.warn("unable to connect to JMX url: "+url, lastError); + return false; + } + + private boolean shouldRetryOn(Exception e) { + // Expect SecurityException, IOException, etc. + // But can also see things like javax.naming.ServiceUnavailableException with WSO2 app-servers. + // So let's not try to second guess strange behaviours that future entities will exhibit. + // + // However, if it was our request that was invalid then not worth retrying. + + if (e instanceof AttributeNotFoundException) return false; + if (e instanceof InstanceAlreadyExistsException) return false; + if (e instanceof InstanceNotFoundException) return false; + if (e instanceof InvalidAttributeValueException) return false; + if (e instanceof ListenerNotFoundException) return false; + if (e instanceof MalformedObjectNameException) return false; + if (e instanceof NotCompliantMBeanException) return false; + if (e instanceof InterruptedException) return false; + if (e instanceof RuntimeInterruptedException) return false; + + return true; + } + + /** + * A thread-safe version of {@link #disconnectNow()}. + * + * This method is threadsafe. 
+ */ + public synchronized void disconnect() { + disconnectNow(); + } + + /** + * Disconnects, preventing subsequent connections to be made. Method doesn't throw an exception. + * + * Can safely be called if already disconnected. + * + * This method is not threadsafe, but will thus not block if + * another thread is taking a long time for connections to timeout. + * + * Any concurrent requests will likely get an IOException - see + * {@linkplain http://docs.oracle.com/javase/7/docs/api/javax/management/remote/JMXConnector.html#close()}. + * + */ + public void terminate() { + terminated.set(true); + disconnectNow(); + } + + protected void disconnectNow() { + triedConnecting = false; + if (connector != null) { + if (LOG.isDebugEnabled()) LOG.debug("Disconnecting from JMX URL {}", url); + try { + connector.close(); + } catch (Exception e) { + // close attempts to connect to close cleanly; and if it can't, it throws; + // often we disconnect as part of shutdown, even if the other side has already stopped -- + // so swallow exceptions (no situations known where we need a clean closure on the remote side) + if (LOG.isDebugEnabled()) LOG.debug("Caught exception disconnecting from JMX at {} ({})", url, e.getMessage()); + if (LOG.isTraceEnabled()) LOG.trace("Details for exception disconnecting JMX", e); + } finally { + connector = null; + connection = null; + } + } + } + + /** + * Gets a usable MBeanServerConnection. + * + * Method is threadsafe. + * + * @returns the MBeanServerConnection + * @throws IllegalStateException if not connected. + */ + private synchronized MBeanServerConnection getConnectionOrFail() { + if (isConnected()) + return getConnection(); + + if (triedConnecting) { + throw new IllegalStateException("Failed to connect to JMX at "+url); + } else { + String msg = "Not connected (and not attempted to connect) to JMX at "+url+ + (failedReconnecting ? 
(" (last reconnect failure at "+ Time.makeDateString(failedReconnectingTime) + ")") : ""); + throw new IllegalStateException(msg); + } + } + + private T invokeWithReconnect(Callable task) { + try { + return task.call(); + } catch (Exception e) { + if (shouldRetryOn(e)) { + try { + reconnectWithRetryDampened(); + return task.call(); + } catch (Exception e2) { + throw Throwables.propagate(e2); + } + } else { + throw Throwables.propagate(e); + } + } + } + + // ====================== query related calls ======================================= + + /** + * Converts from an object name pattern to a real object name, by querying with findMBean; + * if no matching MBean can be found (or if more than one match found) then returns null. + * If the supplied object name is not a pattern then just returns that. If the + */ + public ObjectName toLiteralObjectName(ObjectName objectName) { + if (checkNotNull(objectName, "objectName").isPattern()) { + ObjectInstance bean = findMBean(objectName); + return (bean != null) ? 
bean.getObjectName() : null; + } else { + return objectName; + } + } + + public Set findMBeans(final ObjectName objectName) { + return invokeWithReconnect(new Callable>() { + public Set call() throws Exception { + return getConnectionOrFail().queryMBeans(objectName, null); + }}); + } + + public ObjectInstance findMBean(ObjectName objectName) { + Set beans = findMBeans(objectName); + if (beans.size() == 1) { + notFoundMBeans.remove(objectName); + return Iterables.getOnlyElement(beans); + } else { + boolean changed = notFoundMBeans.add(objectName); + + if (beans.size() > 1) { + if (changed) { + LOG.warn("JMX object name query returned {} values for {} at {}; ignoring all", + new Object[] {beans.size(), objectName.getCanonicalName(), url}); + } else { + if (LOG.isDebugEnabled()) LOG.debug("JMX object name query returned {} values for {} at {} (repeating); ignoring all", + new Object[] {beans.size(), objectName.getCanonicalName(), url}); + } + } else { + if (changed) { + LOG.warn("JMX object {} not found at {}", objectName.getCanonicalName(), url); + } else { + if (LOG.isDebugEnabled()) LOG.debug("JMX object {} not found at {} (repeating)", objectName.getCanonicalName(), url); + } + } + return null; + } + } + + public Set doesMBeanExistsEventually(final ObjectName objectName, Duration timeout) { + return doesMBeanExistsEventually(objectName, timeout.toMilliseconds(), TimeUnit.MILLISECONDS); + } + public Set doesMBeanExistsEventually(final ObjectName objectName, TimeDuration timeout) { + return doesMBeanExistsEventually(objectName, timeout.toMilliseconds(), TimeUnit.MILLISECONDS); + } + + public Set doesMBeanExistsEventually(final ObjectName objectName, long timeoutMillis) { + return doesMBeanExistsEventually(objectName, timeoutMillis, TimeUnit.MILLISECONDS); + } + + public Set doesMBeanExistsEventually(String objectName, Duration timeout) { + return doesMBeanExistsEventually(createObjectName(objectName), timeout); + } + public Set doesMBeanExistsEventually(String 
objectName, TimeDuration timeout) { + return doesMBeanExistsEventually(createObjectName(objectName), timeout); + } + + public Set doesMBeanExistsEventually(String objectName, long timeout, TimeUnit timeUnit) { + return doesMBeanExistsEventually(createObjectName(objectName), timeout, timeUnit); + } + + /** returns set of beans found, with retry, empty set if none after timeout */ + public Set doesMBeanExistsEventually(final ObjectName objectName, long timeout, TimeUnit timeUnit) { + final long timeoutMillis = timeUnit.toMillis(timeout); + final AtomicReference> beans = new AtomicReference>(ImmutableSet.of()); + try { + Repeater.create("Wait for "+objectName) + .limitTimeTo(timeout, timeUnit) + .every(500, TimeUnit.MILLISECONDS) + .until(new Callable() { + public Boolean call() { + connect(timeoutMillis); + beans.set(findMBeans(objectName)); + return !beans.get().isEmpty(); + }}) + .rethrowException() + .run(); + return beans.get(); + } catch (Exception e) { + throw Exceptions.propagate(e); + } + } + + public void assertMBeanExistsEventually(ObjectName objectName, Duration timeout) { + assertMBeanExistsEventually(objectName, timeout.toMilliseconds(), TimeUnit.MILLISECONDS); + } + public void assertMBeanExistsEventually(ObjectName objectName, TimeDuration timeout) { + assertMBeanExistsEventually(objectName, timeout.toMilliseconds(), TimeUnit.MILLISECONDS); + } + + public void assertMBeanExistsEventually(ObjectName objectName, long timeoutMillis) { + assertMBeanExistsEventually(objectName, timeoutMillis, TimeUnit.MILLISECONDS); + } + + public void assertMBeanExistsEventually(ObjectName objectName, long timeout, TimeUnit timeUnit) { + Set beans = doesMBeanExistsEventually(objectName, timeout, timeUnit); + if (beans.size() != 1) { + throw new IllegalStateException("MBean "+objectName+" not found within "+timeout+ + (beans.size() > 1 ? "; found multiple matches: "+beans : "")); + } + } + + /** + * Returns a specific attribute for a JMX {@link ObjectName}. 
+ */ + public Object getAttribute(ObjectName objectName, final String attribute) { + final ObjectName realObjectName = toLiteralObjectName(objectName); + + if (realObjectName != null) { + Object result = invokeWithReconnect(new Callable() { + public Object call() throws Exception { + return getConnectionOrFail().getAttribute(realObjectName, attribute); + }}); + + if (LOG.isTraceEnabled()) LOG.trace("From {}, for jmx attribute {}.{}, got value {}", new Object[] {url, objectName.getCanonicalName(), attribute, result}); + return result; + } else { + return null; + } + } + + public void setAttribute(String objectName, String attribute, Object val) { + setAttribute(createObjectName(objectName), attribute, val); + } + + public void setAttribute(ObjectName objectName, final String attribute, final Object val) { + final ObjectName realObjectName = toLiteralObjectName(objectName); + + if (realObjectName != null) { + invokeWithReconnect(new Callable() { + public Void call() throws Exception { + getConnectionOrFail().setAttribute(realObjectName, new javax.management.Attribute(attribute, val)); + return null; + }}); + if (LOG.isTraceEnabled()) LOG.trace("From {}, for jmx attribute {}.{}, set value {}", new Object[] {url, objectName.getCanonicalName(), attribute, val}); + } else { + if (LOG.isDebugEnabled()) LOG.debug("From {}, cannot set attribute {}.{}, because mbean not found", new Object[] {url, objectName.getCanonicalName(), attribute}); + } + } + + /** @see #operation(ObjectName, String, Object ...) */ + public Object operation(String objectName, String method, Object... arguments) { + return operation(createObjectName(objectName), method, arguments); + } + + /** + * Executes an operation on a JMX {@link ObjectName}. + */ + public Object operation(ObjectName objectName, final String method, final Object... 
arguments) { + final ObjectName realObjectName = toLiteralObjectName(objectName); + final String[] signature = new String[arguments.length]; + for (int i = 0; i < arguments.length; i++) { + Class clazz = arguments[i].getClass(); + signature[i] = (CLASSES.containsKey(clazz.getSimpleName()) ? CLASSES.get(clazz.getSimpleName()) : clazz.getName()); + } + + Object result = invokeWithReconnect(new Callable() { + public Object call() throws Exception { + return getConnectionOrFail().invoke(realObjectName, method, arguments, signature); + }}); + + if (LOG.isTraceEnabled()) LOG.trace("From {}, for jmx operation {}.{}({}), got value {}", new Object[] {url, realObjectName.getCanonicalName(), method, Arrays.asList(arguments), + result}); + return result; + } + + public void addNotificationListener(String objectName, NotificationListener listener) { + addNotificationListener(createObjectName(objectName), listener, null); + } + + public void addNotificationListener(String objectName, NotificationListener listener, NotificationFilter filter) { + addNotificationListener(createObjectName(objectName), listener, filter); + } + + public void addNotificationListener(ObjectName objectName, NotificationListener listener) { + addNotificationListener(objectName, listener, null); + } + + public void addNotificationListener(final ObjectName objectName, final NotificationListener listener, final NotificationFilter filter) { + invokeWithReconnect(new Callable() { + public Void call() throws Exception { + getConnectionOrFail().addNotificationListener(objectName, listener, filter, null); + return null; + }}); + } + + public void removeNotificationListener(String objectName, NotificationListener listener) { + removeNotificationListener(createObjectName(objectName), listener); + } + + public void removeNotificationListener(final ObjectName objectName, final NotificationListener listener) { + removeNotificationListener(objectName, listener, null); + } + + public void 
removeNotificationListener(final ObjectName objectName, final NotificationListener listener, final NotificationFilter filter) { + if (isConnected()) invokeWithReconnect(new Callable() { + public Void call() throws Exception { + getConnectionOrFail().removeNotificationListener(objectName, listener, filter, null); + return null; + }}); + } + + public M getProxyObject(String objectName, Class mbeanInterface) { + return getProxyObject(createObjectName(objectName), mbeanInterface); + } + + public M getProxyObject(ObjectName objectName, Class mbeanInterface) { + MBeanServerConnection connection = getConnectionOrFail(); + return JMX.newMBeanProxy(connection, objectName, mbeanInterface, false); + } + + public static ObjectName createObjectName(String name) { + try { + return new ObjectName(name); + } catch (MalformedObjectNameException e) { + throw Throwables.propagate(e); + } + } + + private static void sleep(long sleepTimeMillis) { + try { + Thread.sleep(sleepTimeMillis); + } catch (InterruptedException e) { + throw new RuntimeInterruptedException(e); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/daf40919/software/base/src/main/java/org/apache/brooklyn/feed/jmx/JmxNotificationFilters.java ---------------------------------------------------------------------- diff --git a/software/base/src/main/java/org/apache/brooklyn/feed/jmx/JmxNotificationFilters.java b/software/base/src/main/java/org/apache/brooklyn/feed/jmx/JmxNotificationFilters.java new file mode 100644 index 0000000..4b5d3b6 --- /dev/null +++ b/software/base/src/main/java/org/apache/brooklyn/feed/jmx/JmxNotificationFilters.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.brooklyn.feed.jmx; + +import javax.management.Notification; +import javax.management.NotificationFilter; +import javax.management.NotificationFilterSupport; + +public class JmxNotificationFilters { + + private JmxNotificationFilters() {} // instead use static utility methods + + /** + * Matches the given notification type. + * @see {@link NotificationFilterSupport#enableType(String)} + */ + public static NotificationFilter matchesType(String type) { + return matchesTypes(type); + } + + /** + * Matches any of the given notification types. + * @see {@link NotificationFilterSupport#enableType(String)} + */ + public static NotificationFilter matchesTypes(String... types) { + NotificationFilterSupport result = new NotificationFilterSupport(); + for (String type : types) { + result.enableType(type); + } + return result; + } + + /** + * @deprecated since 0.6.0; + * only works if this brooklyn class is on the classpath of the JVM that your + * subscribing to notifications on (because it tries to push the filter instance + * to that JVM). So of very limited use in real-world java processes to be managed. + * Therefore this will be deleted to avoid people hitting this surprising behaviour. 
+ */ + @SuppressWarnings("serial") + public static NotificationFilter matchesTypeRegex(final String typeRegex) { + return new NotificationFilter() { + @Override public boolean isNotificationEnabled(Notification notif) { + return notif.getType().matches(typeRegex); + } + }; + } +} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/daf40919/software/base/src/main/java/org/apache/brooklyn/feed/jmx/JmxNotificationSubscriptionConfig.java ---------------------------------------------------------------------- diff --git a/software/base/src/main/java/org/apache/brooklyn/feed/jmx/JmxNotificationSubscriptionConfig.java b/software/base/src/main/java/org/apache/brooklyn/feed/jmx/JmxNotificationSubscriptionConfig.java new file mode 100644 index 0000000..844bacb --- /dev/null +++ b/software/base/src/main/java/org/apache/brooklyn/feed/jmx/JmxNotificationSubscriptionConfig.java @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.brooklyn.feed.jmx; + +import javax.management.MalformedObjectNameException; +import javax.management.Notification; +import javax.management.NotificationFilter; +import javax.management.ObjectName; + +import org.apache.brooklyn.api.sensor.AttributeSensor; +import org.apache.brooklyn.core.feed.FeedConfig; +import org.apache.brooklyn.util.collections.MutableList; + +import com.google.common.base.Function; +import com.google.common.base.Functions; + +public class JmxNotificationSubscriptionConfig extends FeedConfig>{ + + private ObjectName objectName; + private NotificationFilter notificationFilter; + private Function onNotification; + + @SuppressWarnings({ "unchecked", "rawtypes" }) + public JmxNotificationSubscriptionConfig(AttributeSensor sensor) { + super(sensor); + onSuccess((Function)Functions.identity()); + } + + public JmxNotificationSubscriptionConfig(JmxNotificationSubscriptionConfig other) { + super(other); + this.objectName = other.objectName; + this.notificationFilter = other.notificationFilter; + this.onNotification = other.onNotification; + } + + public ObjectName getObjectName() { + return objectName; + } + + public NotificationFilter getNotificationFilter() { + return notificationFilter; + } + + public Function getOnNotification() { + return onNotification; + } + + public JmxNotificationSubscriptionConfig objectName(ObjectName val) { + this.objectName = val; return this; + } + + public JmxNotificationSubscriptionConfig objectName(String val) { + try { + return objectName(new ObjectName(val)); + } catch (MalformedObjectNameException e) { + throw new IllegalArgumentException("Invalid object name ("+val+")", e); + } + } + + public JmxNotificationSubscriptionConfig notificationFilter(NotificationFilter val) { + this.notificationFilter = val; return this; + } + + public JmxNotificationSubscriptionConfig onNotification(Function val) { + this.onNotification = val; return this; + } + + @Override + protected Object toStringPollSource() 
{ + return objectName; + } + + @Override + protected MutableList equalsFields() { + return super.equalsFields() + .appendIfNotNull(notificationFilter).appendIfNotNull(onNotification); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/daf40919/software/base/src/main/java/org/apache/brooklyn/feed/jmx/JmxOperationPollConfig.java ---------------------------------------------------------------------- diff --git a/software/base/src/main/java/org/apache/brooklyn/feed/jmx/JmxOperationPollConfig.java b/software/base/src/main/java/org/apache/brooklyn/feed/jmx/JmxOperationPollConfig.java new file mode 100644 index 0000000..107401d --- /dev/null +++ b/software/base/src/main/java/org/apache/brooklyn/feed/jmx/JmxOperationPollConfig.java @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.brooklyn.feed.jmx; + +import java.util.Collections; +import java.util.List; + +import javax.management.MalformedObjectNameException; +import javax.management.ObjectName; + +import org.apache.brooklyn.api.sensor.AttributeSensor; +import org.apache.brooklyn.core.feed.PollConfig; + +import com.google.common.base.Function; +import com.google.common.base.Functions; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Lists; + +public class JmxOperationPollConfig extends PollConfig>{ + + private ObjectName objectName; + private String operationName; + private List signature = Collections.emptyList(); + private List params = Collections.emptyList(); + + @SuppressWarnings({ "unchecked", "rawtypes" }) + public JmxOperationPollConfig(AttributeSensor sensor) { + super(sensor); + onSuccess((Function)Functions.identity()); + } + + public JmxOperationPollConfig(JmxOperationPollConfig other) { + super(other); + this.objectName = other.objectName; + this.operationName = other.operationName; + this.signature = other.signature != null ? ImmutableList.copyOf(other.signature) : null; + this.params = other.params != null ? 
ImmutableList.copyOf(other.params) : null; + } + + public ObjectName getObjectName() { + return objectName; + } + + public String getOperationName() { + return operationName; + } + + public List getSignature() { + return signature; + } + + public List getParams() { + return params; + } + + public JmxOperationPollConfig objectName(ObjectName val) { + this.objectName = val; return this; + } + + public JmxOperationPollConfig objectName(String val) { + try { + return objectName(new ObjectName(val)); + } catch (MalformedObjectNameException e) { + throw new IllegalArgumentException("Invalid object name ("+val+")", e); + } + } + + public JmxOperationPollConfig operationName(String val) { + this.operationName = val; return this; + } + + public JmxOperationPollConfig operationSignature(List val) { + this.signature = val; return this; + } + + public JmxOperationPollConfig operationParams(List val) { + this.params = val; return this; + } + + public List buildOperationIdentity() { + // FIXME Have a build() method for ensuring signature is set, and making class subsequently immutable? + return ImmutableList.of(operationName, buildSignature(), params); + } + + private List buildSignature() { + if (signature != null && signature.size() == params.size()) { + return signature; + } else { + List derivedSignature = Lists.newLinkedList(); + for (Object param : params) { + Class clazz = (param != null) ? param.getClass() : null; + String clazzName = (clazz != null) ? + (JmxHelper.CLASSES.containsKey(clazz.getSimpleName()) ? + JmxHelper.CLASSES.get(clazz.getSimpleName()) : clazz.getName()) : + Object.class.getName(); + derivedSignature.add(clazzName); + } + return derivedSignature; + } + } + + @Override protected String toStringBaseName() { return "jmx"; } + @Override protected String toStringPollSource() { return objectName+":"+operationName+(params!=null ? 
params : "[]"); } + +} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/daf40919/software/base/src/main/java/org/apache/brooklyn/feed/jmx/JmxValueFunctions.java ---------------------------------------------------------------------- diff --git a/software/base/src/main/java/org/apache/brooklyn/feed/jmx/JmxValueFunctions.java b/software/base/src/main/java/org/apache/brooklyn/feed/jmx/JmxValueFunctions.java new file mode 100644 index 0000000..5741099 --- /dev/null +++ b/software/base/src/main/java/org/apache/brooklyn/feed/jmx/JmxValueFunctions.java @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.brooklyn.feed.jmx; + +import java.util.List; +import java.util.Map; + +import javax.management.openmbean.CompositeData; +import javax.management.openmbean.TabularData; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.base.Function; +import com.google.common.collect.Maps; + +public class JmxValueFunctions { + + private static final Logger log = LoggerFactory.getLogger(JmxValueFunctions.class); + + /** + * @return a closure that converts a TabularDataSupport to a map. 
+ */ + public static Function tabularDataToMap() { + return new Function() { + @Override public Map apply(TabularData input) { + return tabularDataToMap(input); + }}; + } + + public static Function tabularDataToMapOfMaps() { + return new Function() { + @Override public Map apply(TabularData input) { + return tabularDataToMapOfMaps(input); + }}; + } + + public static Function compositeDataToMap() { + return new Function() { + @Override public Map apply(CompositeData input) { + return compositeDataToMap(input); + }}; + } + + public static Map tabularDataToMap(TabularData table) { + Map result = Maps.newLinkedHashMap(); + for (Object entry : table.values()) { + CompositeData data = (CompositeData) entry; //.getValue() + for (String key : data.getCompositeType().keySet()) { + Object old = result.put(key, data.get(key)); + if (old != null) { + log.warn("tablularDataToMap has overwritten key {}", key); + } + } + } + return result; + } + + public static Map, Map> tabularDataToMapOfMaps(TabularData table) { + Map, Map> result = Maps.newLinkedHashMap(); + for (Object k : table.keySet()) { + final Object[] kValues = ((List)k).toArray(); + CompositeData v = (CompositeData) table.get(kValues); + result.put((List)k, compositeDataToMap(v)); + } + return result; + } + + public static Map compositeDataToMap(CompositeData data) { + Map result = Maps.newLinkedHashMap(); + for (String key : data.getCompositeType().keySet()) { + Object old = result.put(key, data.get(key)); + if (old != null) { + log.warn("compositeDataToMap has overwritten key {}", key); + } + } + return result; + } +} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/daf40919/software/base/src/main/java/org/apache/brooklyn/sensor/feed/jmx/JmxAttributePollConfig.java ---------------------------------------------------------------------- diff --git a/software/base/src/main/java/org/apache/brooklyn/sensor/feed/jmx/JmxAttributePollConfig.java 
b/software/base/src/main/java/org/apache/brooklyn/sensor/feed/jmx/JmxAttributePollConfig.java deleted file mode 100644 index 02bbbeb..0000000 --- a/software/base/src/main/java/org/apache/brooklyn/sensor/feed/jmx/JmxAttributePollConfig.java +++ /dev/null @@ -1,74 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package org.apache.brooklyn.sensor.feed.jmx; - -import javax.management.MalformedObjectNameException; -import javax.management.ObjectName; - -import org.apache.brooklyn.api.sensor.AttributeSensor; -import org.apache.brooklyn.sensor.feed.PollConfig; - -import com.google.common.base.Function; -import com.google.common.base.Functions; - -public class JmxAttributePollConfig extends PollConfig>{ - - private ObjectName objectName; - private String attributeName; - - @SuppressWarnings({ "unchecked", "rawtypes" }) - public JmxAttributePollConfig(AttributeSensor sensor) { - super(sensor); - onSuccess((Function)Functions.identity()); - } - - public JmxAttributePollConfig(JmxAttributePollConfig other) { - super(other); - this.objectName = other.objectName; - this.attributeName = other.attributeName; - } - - public ObjectName getObjectName() { - return objectName; - } - - public String getAttributeName() { - return attributeName; - } - - public JmxAttributePollConfig objectName(ObjectName val) { - this.objectName = val; return this; - } - - public JmxAttributePollConfig objectName(String val) { - try { - return objectName(new ObjectName(val)); - } catch (MalformedObjectNameException e) { - throw new IllegalArgumentException("Invalid object name ("+val+")", e); - } - } - - public JmxAttributePollConfig attributeName(String val) { - this.attributeName = val; return this; - } - - @Override protected String toStringBaseName() { return "jmx"; } - @Override protected String toStringPollSource() { return objectName+":"+attributeName; } - -}