Return-Path: X-Original-To: apmail-helix-commits-archive@minotaur.apache.org Delivered-To: apmail-helix-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 5AEF110DD3 for ; Wed, 19 Mar 2014 02:06:42 +0000 (UTC) Received: (qmail 46748 invoked by uid 500); 19 Mar 2014 02:06:40 -0000 Delivered-To: apmail-helix-commits-archive@helix.apache.org Received: (qmail 46703 invoked by uid 500); 19 Mar 2014 02:06:40 -0000 Mailing-List: contact commits-help@helix.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@helix.apache.org Delivered-To: mailing list commits@helix.apache.org Received: (qmail 46688 invoked by uid 99); 19 Mar 2014 02:06:39 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 19 Mar 2014 02:06:39 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id 1EB9298396D; Wed, 19 Mar 2014 02:06:39 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: zzhang@apache.org To: commits@helix.apache.org Date: Wed, 19 Mar 2014 02:06:40 -0000 Message-Id: <279d47410d5c45659d0a655c3b4c4e4a@git.apache.org> In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [2/2] git commit: [HELIX-319] refactor MonitoringClient to accommodate distributed monitoring server [HELIX-319] refactor MonitoringClient to accommodate distributed monitoring server Project: http://git-wip-us.apache.org/repos/asf/helix/repo Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/c0b1780d Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/c0b1780d Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/c0b1780d Branch: refs/heads/helix-monitoring Commit: c0b1780dc472ac3234dbbb6488015211c3e66ed2 Parents: db4c10a Author: zzhang Authored: Tue Mar 18 19:06:19 2014 -0700 Committer: zzhang Committed: Tue Mar 18 19:06:19 2014 -0700 ---------------------------------------------------------------------- .../helix/monitoring/MonitoringClient.java | 12 +- .../helix/monitoring/MonitoringEvent.java | 96 +++- .../org/apache/helix/MonitoringTestHelper.java | 114 ---- .../monitoring/RiemannMonitoringClient.java | 555 ------------------- .../helix/monitoring/riemann/ClientUtil.java | 69 +++ .../monitoring/riemann/RawRiemannClient.java | 229 ++++++++ .../riemann/RiemannClientWrapper.java | 234 ++++++++ helix-monitor-server/pom.xml | 5 - .../apache/helix/monitoring/RiemannAgent.java | 137 ----- .../monitoring/RiemannAgentStateModel.java | 54 -- .../RiemannAgentStateModelFactory.java | 29 - .../helix/monitoring/RiemannAlertProxy.java | 111 ---- .../apache/helix/monitoring/RiemannConfigs.java | 116 ---- .../monitoring/RiemannMonitoringServer.java | 73 --- .../monitoring/riemann/HelixAlertMessenger.java | 112 ++++ .../helix/monitoring/riemann/RiemannAgent.java | 169 ++++++ .../monitoring/riemann/RiemannConfigs.java | 116 ++++ .../riemann/RiemannMonitoringServer.java | 74 +++ .../helix/monitoring/IntegrationTest.java | 206 ------- .../helix/monitoring/MonitoringTestHelper.java | 136 +++++ .../monitoring/TestClientServerMonitoring.java | 188 ------- .../helix/monitoring/TestRiemannAgent.java | 127 ----- .../helix/monitoring/TestRiemannAlertProxy.java | 105 ---- .../monitoring/TestRiemannMonitoringServer.java | 78 --- .../monitoring/riemann/IntegrationTest.java | 199 +++++++ .../riemann/TestClientServerMonitoring.java | 181 ++++++ .../riemann/TestHelixAlertMessenger.java | 110 ++++ .../monitoring/riemann/TestRiemannAgent.java | 117 ++++ .../riemann/TestRiemannClientWrapper.java | 134 +++++ .../riemann/TestRiemannMonitoringServer.java | 82 +++ 30 files changed, 2047 insertions(+), 1921 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/helix/blob/c0b1780d/helix-core/src/main/java/org/apache/helix/monitoring/MonitoringClient.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/MonitoringClient.java b/helix-core/src/main/java/org/apache/helix/monitoring/MonitoringClient.java index 743f8b4..a055354 100644 --- a/helix-core/src/main/java/org/apache/helix/monitoring/MonitoringClient.java +++ b/helix-core/src/main/java/org/apache/helix/monitoring/MonitoringClient.java @@ -21,8 +21,6 @@ package org.apache.helix.monitoring; import java.util.concurrent.TimeUnit; -import org.apache.helix.api.id.ResourceId; - /** * Interface for a client that can register with a monitoring server and send events for monitoring */ @@ -40,30 +38,26 @@ public interface MonitoringClient { /** * Send an event - * @param resource * @param e the event - * @param batch true if this should be part of a batch operation * @return true if the event was sent (or queued for batching), false otherwise */ - boolean send(ResourceId resource, MonitoringEvent e, boolean batch); + boolean send(MonitoringEvent e); /** * Send an event and flush any outstanding messages - * @param resource * @param e the event * @return true if events were successfully sent, false otherwise */ - boolean sendAndFlush(ResourceId resource, MonitoringEvent e); + boolean sendAndFlush(MonitoringEvent e); /** * Schedule an operation to run - * @param resource * @param interval the frequency * @param delay the amount of time to wait before the first execution * @param unit the unit of time to use * @param r the code to run */ - void every(ResourceId resource, long interval, long delay, TimeUnit unit, Runnable r); + void every(long interval, long delay, TimeUnit unit, Runnable r); /** * Check if there is a valid connection to a monitoring server http://git-wip-us.apache.org/repos/asf/helix/blob/c0b1780d/helix-core/src/main/java/org/apache/helix/monitoring/MonitoringEvent.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/MonitoringEvent.java b/helix-core/src/main/java/org/apache/helix/monitoring/MonitoringEvent.java index 80006fb..2044a3a 100644 --- a/helix-core/src/main/java/org/apache/helix/monitoring/MonitoringEvent.java +++ b/helix-core/src/main/java/org/apache/helix/monitoring/MonitoringEvent.java @@ -19,10 +19,14 @@ package org.apache.helix.monitoring; * under the License. */ +import java.util.Arrays; import java.util.Collection; import java.util.List; import java.util.Map; +import java.util.Set; +import org.apache.helix.api.Scope; +import org.apache.helix.api.Scope.ScopeType; import org.apache.helix.api.id.ClusterId; import org.apache.helix.api.id.ControllerId; import org.apache.helix.api.id.ParticipantId; @@ -32,6 +36,7 @@ import org.apache.helix.api.id.SpectatorId; import com.google.common.collect.Lists; import com.google.common.collect.Maps; +import com.google.common.collect.Sets; /** * A generic monitoring event based on Helix constructs. This is based on Riemann's EventDSL. @@ -49,8 +54,10 @@ public class MonitoringEvent { private Float _floatMetric; private Double _doubleMetric; private Float _ttl; - private List _tags; - private Map _attributes; + private final List _tags; + private final Map _attributes; + private String _shardingStr; + private final Set _shardingScopes; /** * Create an empty MonitoringEvent @@ -70,6 +77,8 @@ public class MonitoringEvent { _ttl = null; _tags = Lists.newLinkedList(); _attributes = Maps.newHashMap(); + _shardingStr = null; + _shardingScopes = Sets.newHashSet(); } /** @@ -253,13 +262,69 @@ public class MonitoringEvent { return this; } - // below are a set of package-private getters for each of the fields + /** + * Set sharding key using string + * @param shardingStr + * @return MonitoringEvent + */ + public MonitoringEvent shardingString(String shardingStr) { + _shardingStr = shardingStr; + return this; + } + + /** + * Set sharding key using scopes + * @param scopes + * @return MonitoringEvent + */ + public MonitoringEvent shardingScopes(ScopeType... scopes) { + _shardingScopes.clear(); + _shardingScopes.addAll(Arrays.asList(scopes)); + return this; + } + + /** + * Return sharding key which is used by MonitoringClient to choose MonitoringServer + * @return sharding key + */ + public String shardingKey() { + // if shardingStr exists, use shardingStr + if (_shardingStr != null) { + return _shardingStr; + } + + // if shardingStr doesn't exist, use shardingScopes + if (_shardingScopes.isEmpty()) { + _shardingScopes.addAll(Arrays.asList(ScopeType.CLUSTER, ScopeType.RESOURCE)); + } - String host() { + StringBuilder sb = new StringBuilder(); + if (_shardingScopes.contains(ScopeType.CLUSTER)) { + sb.append(_clusterId == null ? "%" : _clusterId.stringify()); + } + if (_shardingScopes.contains(ScopeType.RESOURCE)) { + sb.append("|"); + sb.append(_resourceId == null ? "%" : _resourceId.stringify()); + } + if (_shardingScopes.contains(ScopeType.PARTITION)) { + sb.append("|"); + sb.append(_partitionId == null ? "%" : _partitionId.stringify()); + } + if (_shardingScopes.contains(ScopeType.PARTICIPANT)) { + sb.append("|"); + sb.append(_host == null ? "%" : _host); + } + + return sb.toString(); + } + + // below are used for converting MonitoringEvent to Riemann EventDSL + + public String host() { return _host; } - String service() { + public String service() { if (_clusterId == null) { _clusterId = ClusterId.from("%"); } @@ -269,42 +334,45 @@ public class MonitoringEvent { if (_partitionId == null) { _partitionId = PartitionId.from("%"); } + if (_name == null) { + _name = "%"; + } return String.format("%s|%s|%s|%s", _clusterId, _resourceId, _partitionId, _name); } - String eventState() { + public String eventState() { return _eventState; } - String description() { + public String description() { return _description; } - Long time() { + public Long time() { return _time; } - Long longMetric() { + public Long longMetric() { return _longMetric; } - Float floatMetric() { + public Float floatMetric() { return _floatMetric; } - Double doubleMetric() { + public Double doubleMetric() { return _doubleMetric; } - Float ttl() { + public Float ttl() { return _ttl; } - List tags() { + public List tags() { return _tags; } - Map attributes() { + public Map attributes() { return _attributes; } } http://git-wip-us.apache.org/repos/asf/helix/blob/c0b1780d/helix-core/src/test/java/org/apache/helix/MonitoringTestHelper.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/MonitoringTestHelper.java b/helix-core/src/test/java/org/apache/helix/MonitoringTestHelper.java deleted file mode 100644 index c17dab0..0000000 --- a/helix-core/src/test/java/org/apache/helix/MonitoringTestHelper.java +++ /dev/null @@ -1,114 +0,0 @@ -package org.apache.helix; - -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -import java.io.IOException; -import java.net.ServerSocket; - -import org.I0Itec.zkclient.NetworkUtil; - -public class MonitoringTestHelper { - static final int MAX_PORT = 65535; - - /** - * generate a default riemann.config - * @param riemannPort - * @return - */ - public static String getRiemannConfigString(int riemannPort) { - StringBuilder sb = new StringBuilder(); - sb.append("(logging/init :file \"/dev/null\")\n\n") - .append("(tcp-server :host \"0.0.0.0\" :port " + riemannPort + ")\n\n") - .append("(instrumentation {:interval 1})\n\n") - .append("; (udp-server :host \"0.0.0.0\")\n") - .append("; (ws-server :host \"0.0.0.0\")\n") - .append("; (repl-server :host \"0.0.0.0\")\n\n") - .append("(periodically-expire 1)\n\n") - .append( - "(let [index (default :ttl 3 (update-index (index)))]\n (streams\n (expired prn)\n index))\n"); - - return sb.toString(); - } - - /** - * generate a test config for checking latency - * @param proxyPort - * @return - */ - public static String getLatencyCheckConfigString(int proxyPort) - { - StringBuilder sb = new StringBuilder(); - sb.append("(require 'riemann.config)\n") - .append("(require 'clj-http.client)\n\n") - .append("(defn parse-double\n \"Convert a string into a double\"\n ") - .append("[instr]\n (Double/parseDouble instr))\n\n") - .append("(defn check-95th-latency\n \"Check if the 95th percentile latency is within expectations\"\n ") - .append("[e]\n (let [latency (parse-double (:latency95 e))]\n ") - .append("(if (> latency 1.0) \n ; Report if the 95th percentile latency exceeds 1.0s\n ") - .append("(do (prn (:host e) \"has an unacceptable 95th percentile latency of\" latency)\n ") - .append("(let [alert-name-str (str \"(\" (:cluster e) \".%.\" (:host e) \")(latency95)>(1000)\" )\n ") - .append("proxy-url (str \"http://localhost:\" " + proxyPort + " )]\n ") - .append("(clj-http.client/post proxy-url {:body alert-name-str }))))))\n\n") - .append("(streams\n (where\n ; Only process services containing LatencyReport\n ") - .append("(and (service #\".*LatencyReport.*\") (not (state \"expired\")))\n ") - .append("check-95th-latency))\n"); - - return sb.toString(); - } - - /** - * find an available tcp port - * @return - */ - public static int availableTcpPort() { - ServerSocket ss = null; - try { - ss = new ServerSocket(0); - ss.setReuseAddress(true); - return ss.getLocalPort(); - } catch (IOException e) { - // ok - } finally { - if (ss != null) { - try { - ss.close(); - } catch (IOException e) { - // should not be thrown - } - } - } - return -1; - } - - /** - * find the first available port starting from startPort inclusive - * @param startPort - * @return - */ - public static int availableTcpPort(int startPort) { - int port = startPort; - for (; port <= MAX_PORT; port++) { - if (NetworkUtil.isPortFree(port)) - break; - } - - return port > MAX_PORT ? -1 : port; - } -} http://git-wip-us.apache.org/repos/asf/helix/blob/c0b1780d/helix-monitor-client/src/main/java/org/apache/helix/monitoring/RiemannMonitoringClient.java ---------------------------------------------------------------------- diff --git a/helix-monitor-client/src/main/java/org/apache/helix/monitoring/RiemannMonitoringClient.java b/helix-monitor-client/src/main/java/org/apache/helix/monitoring/RiemannMonitoringClient.java deleted file mode 100644 index 591bcb9..0000000 --- a/helix-monitor-client/src/main/java/org/apache/helix/monitoring/RiemannMonitoringClient.java +++ /dev/null @@ -1,555 +0,0 @@ -package org.apache.helix.monitoring; - -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -import java.io.IOException; -import java.net.UnknownHostException; -import java.util.List; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.TimeUnit; - -import org.apache.helix.HelixDataAccessor; -import org.apache.helix.HelixManager; -import org.apache.helix.HelixManagerFactory; -import org.apache.helix.InstanceType; -import org.apache.helix.api.id.ClusterId; -import org.apache.helix.api.id.ResourceId; -import org.apache.helix.model.IdealState; -import org.apache.helix.model.InstanceConfig; -import org.apache.helix.spectator.RoutingTableProvider; -import org.apache.log4j.Logger; - -import com.aphyr.riemann.client.AbstractRiemannClient; -import com.aphyr.riemann.client.EventDSL; -import com.aphyr.riemann.client.RiemannBatchClient; -import com.aphyr.riemann.client.RiemannClient; -import com.aphyr.riemann.client.UnsupportedJVMException; -import com.google.common.collect.Lists; - -/** - * A Riemann-based monitoring client Thread safety note: connect and disconnect are serialized to - * ensure that there is no attempt to connect or disconnect with an inconsistent state. The send - * routines are not protected for performance reasons, and so a single send/flush may fail. - */ -public class RiemannMonitoringClient implements MonitoringClient { - private static final Logger LOG = Logger.getLogger(RiemannMonitoringClient.class); - public static final String DEFAULT_MONITORING_SERVICE_NAME = "MonitoringService"; - - /** - * Contains information about a RiemannClient inside a MonitoringClient - */ - class MonitoringClientInfo { - /** - * host/port of riemann server to which this client connects - */ - String _host; - int _port; - - /** - * riemann client - */ - RiemannClient _client; - - /** - * batch rieman client, null if batch is not enabled - */ - RiemannBatchClient _batchClient; - - /** - * list of periodic tasks scheduled on this riemann client - */ - final List _scheduledItems; - - public MonitoringClientInfo() { - _host = null; - _port = -1; - _client = null; - _batchClient = null; - _scheduledItems = Lists.newArrayList(); - } - - } - - private int _batchSize; - private final ResourceId _monitoringServiceName; - private final ClusterId _monitoringCluster; - private int _monitoringServicePartitionNum; - - private final HelixManager _spectator; - private final RoutingTableProvider _routingTableProvider; - private final Map _clientMap; - - /** - * Create a non-batched monitoring client - * @param zkAddr - * @param monitoringClusterId - */ - public RiemannMonitoringClient(String zkAddr, ClusterId monitoringClusterId) { - this(zkAddr, monitoringClusterId, ResourceId.from(DEFAULT_MONITORING_SERVICE_NAME), 1); - } - - /** - * Create a monitoring client that supports batching - * @param clusterId - * the cluster to monitor - * @param accessor - * an accessor for the cluster - * @param batchSize - * the number of events in a batch - * @throws Exception - */ - public RiemannMonitoringClient(String zkAddr, ClusterId monitoringClusterId, - ResourceId monitoringServiceName, int batchSize) { - _batchSize = batchSize > 0 ? batchSize : 1; - _monitoringServiceName = monitoringServiceName; - _monitoringCluster = monitoringClusterId; - _monitoringServicePartitionNum = 0; - _clientMap = new ConcurrentHashMap(); - - _spectator = - HelixManagerFactory.getZKHelixManager(monitoringClusterId.stringify(), null, - InstanceType.SPECTATOR, zkAddr); - _routingTableProvider = new RoutingTableProvider(); - } - - @Override - public void connect() throws Exception { - if (isConnected()) { - LOG.error("Already connected to Riemann!"); - return; - } - - // Connect spectator to the cluster being monitored - _spectator.connect(); - _spectator.addExternalViewChangeListener(_routingTableProvider); - - // Get partition number of monitoring service - HelixDataAccessor accessor = _spectator.getHelixDataAccessor(); - IdealState idealState = - accessor.getProperty(accessor.keyBuilder().idealStates(_monitoringServiceName.stringify())); - if (idealState == null) { - throw new IllegalArgumentException("Resource for MonitoringService: " - + _monitoringServiceName + " doesn't exist in cluster: " + _monitoringCluster); - } - - _monitoringServicePartitionNum = idealState.getNumPartitions(); - - if (_monitoringServicePartitionNum <= 0) { - throw new IllegalArgumentException("Invalid partition number of MonitoringService: " - + _monitoringServiceName + " in cluster: " + _monitoringCluster + ", was " - + _monitoringServicePartitionNum); - } - } - - @Override - public void disconnect() { - // disconnect internal riemann clients - for (ResourceId resource : _clientMap.keySet()) { - disconnectInternal(resource); - } - - _spectator.disconnect(); - _monitoringServicePartitionNum = 0; - } - - @Override - public boolean isConnected() { - return _spectator.isConnected(); - } - - /** - * Flush a riemann client for a resource - * @param resource - * @return - */ - private boolean flush(ResourceId resource) { - if (!isConnected()) { - LOG.error("Tried to flush a Riemann client that is not connected!"); - return false; - } - - AbstractRiemannClient c = getClient(resource, true); - if (c == null) { - LOG.warn("Fail to get riemann client for resource: " + resource); - return false; - } - - try { - c.flush(); - return true; - } catch (IOException e) { - LOG.error("Problem flushing the Riemann event queue for resource: " + resource, e); - } - return false; - } - - @Override - public boolean flush() { - boolean succeed = true; - for (ResourceId resource : _clientMap.keySet()) { - succeed = succeed && flush(resource); - } - - return succeed; - } - - @Override - public boolean send(ResourceId resource, MonitoringEvent event, boolean batch) { - if (!isConnected()) { - LOG.error("Riemann connection must be active in order to send an event!"); - return false; - } - - if (!isConnected(resource)) { - connect(resource, null, event); - } else { - AbstractRiemannClient c = getClient(resource, batch); - convertEvent(c, event).send(); - } - - return true; - } - - @Override - public boolean sendAndFlush(ResourceId resource, MonitoringEvent event) { - - boolean sendResult = send(resource, event, true); - if (sendResult) { - return flush(resource); - } - return false; - } - - /** - * Batch should be enabled for either all or none of riemann clients - */ - @Override - public boolean isBatchingEnabled() { - return _batchSize > 1; - } - - @Override - public int getBatchSize() { - return _batchSize; - } - - /** - * Check if a riemann client for given resource is connected - * @param resource - * @return true if riemann client is connected, false otherwise - */ - private boolean isConnected(ResourceId resource) { - if (!isConnected()) { - return false; - } - - MonitoringClientInfo clientInfo = _clientMap.get(resource); - return clientInfo != null && clientInfo._client != null && clientInfo._client.isConnected(); - } - - @Override - public synchronized void every(ResourceId resource, long interval, long delay, TimeUnit unit, - Runnable r) { - if (!isConnected()) { - LOG.error("Riemann client must be connected in order to send events!"); - return; - } - - ScheduledItem scheduledItem = new ScheduledItem(); - scheduledItem.interval = interval; - scheduledItem.delay = delay; - scheduledItem.unit = unit; - scheduledItem.r = r; - - if (isConnected(resource)) { - MonitoringClientInfo clientInfo = _clientMap.get(resource); - clientInfo._scheduledItems.add(scheduledItem); - getClient(resource).every(interval, delay, unit, r); - } else { - connect(resource, scheduledItem, null); - } - } - - /** - * Connect a riemann client to riemann server given a resource - * @param resource - * @param scheduledItem - * @param pendingEvent - */ - private void connect(ResourceId resource, ScheduledItem scheduledItem, - MonitoringEvent pendingEvent) { - // Hash by resourceId - int partitionKey = resource.hashCode() % _monitoringServicePartitionNum; - List instances = - _routingTableProvider.getInstances(_monitoringServiceName.stringify(), - _monitoringServiceName + "_" + partitionKey, "ONLINE"); - - if (instances.size() == 0) { - LOG.error("Riemann monitoring server for resource: " + resource + " at partitionKey: " - + partitionKey + " is not available"); - return; - } - - InstanceConfig instanceConfig = instances.get(0); - String host = instanceConfig.getHostName(); - int port = Integer.parseInt(instanceConfig.getPort()); - - // Do the connect asynchronously as a tcp establishment could take time - doConnectAsync(resource, host, port, scheduledItem, pendingEvent); - } - - /** - * Get a raw, non-batched Riemann client. WARNING: do not cache this, as it may be disconnected - * without notice - * @return RiemannClient - */ - private RiemannClient getClient(ResourceId resource) { - MonitoringClientInfo clientInfo = _clientMap.get(resource); - return clientInfo == null ? null : clientInfo._client; - } - - /** - * Get a batched Riemann client (if batching is supported) WARNING: do not cache this, as it may - * be disconnected without notice - * @return RiemannBatchClient - */ - private RiemannBatchClient getBatchClient(ResourceId resource) { - MonitoringClientInfo clientInfo = _clientMap.get(resource); - return clientInfo == null ? null : clientInfo._batchClient; - } - - /** - * Get a Riemann client WARNING: do not cache this, as it may be disconnected without notice - * @param batch - * true if the client is preferred to support batching, false otherwise - * @return AbstractRiemannClient - */ - private AbstractRiemannClient getClient(ResourceId resource, boolean batch) { - if (batch && isBatchingEnabled()) { - return getBatchClient(resource); - } else { - return getClient(resource); - } - } - - /** - * Based on the contents of the leader node, connect to a Riemann server - * @param leader - * node containing host/port - */ - private void doConnectAsync(final ResourceId resource, final String host, final int port, - final ScheduledItem scheduledItem, final MonitoringEvent pendingEvent) { - new Thread() { - @Override - public void run() { - synchronized (RiemannMonitoringClient.this) { - if (resource != null && host != null && port != -1) { - connectInternal(resource, host, port, scheduledItem, pendingEvent); - } else { - LOG.error("Fail to doConnectAsync becaue of invalid arguments, resource: " + resource - + ", host: " + host + ", port: " + port); - } - } - } - }.start(); - } - - /** - * Establishment of a connection to a Riemann server - * @param resource - * @param host - * @param port - * @param scheduledItem - * @param pendingEvent - */ - private synchronized void connectInternal(ResourceId resource, String host, int port, - ScheduledItem scheduledItem, MonitoringEvent pendingEvent) { - MonitoringClientInfo clientInfo = _clientMap.get(resource); - if (clientInfo != null && clientInfo._host.equals(host) && clientInfo._port == port - && clientInfo._client != null && clientInfo._client.isConnected()) { - LOG.info("Riemann client for resource: " + resource + " already connected on " + host + ":" - + port); - - // We might have to reschedule tasks - if (scheduledItem != null) { - clientInfo._scheduledItems.add(scheduledItem); - clientInfo._client.every(scheduledItem.interval, scheduledItem.delay, scheduledItem.unit, - scheduledItem.r); - } - - // Sending over pending event - if (pendingEvent != null) { - convertEvent(clientInfo._client, pendingEvent).send(); - } - - return; - } - - // Disconnect from previous riemann server - disconnectInternal(resource); - - // Connect to new riemann server - RiemannClient client = null; - RiemannBatchClient batchClient = null; - try { - client = RiemannClient.tcp(host, port); - client.connect(); - } catch (IOException e) { - LOG.error("Error establishing a connection!", e); - - } - - if (client != null && getBatchSize() > 1) { - try { - batchClient = new RiemannBatchClient(_batchSize, client); - } catch (UnknownHostException e) { - _batchSize = 1; - LOG.error("Could not resolve host", e); - } catch (UnsupportedJVMException e) { - _batchSize = 1; - LOG.warn("Batching not enabled because of incompatible JVM", e); - } - } - - if (clientInfo == null) { - clientInfo = new MonitoringClientInfo(); - } - - clientInfo._host = host; - clientInfo._port = port; - clientInfo._client = client; - clientInfo._batchClient = batchClient; - if (scheduledItem != null) { - clientInfo._scheduledItems.add(scheduledItem); - } - _clientMap.put(resource, clientInfo); - - // We might have to reschedule tasks - for (ScheduledItem item : clientInfo._scheduledItems) { - client.every(item.interval, item.delay, item.unit, item.r); - } - - // Send over pending event - if (pendingEvent != null) { - convertEvent(client, pendingEvent).send(); - } - } - - /** - * Teardown of a connection to a Riemann server - */ - private synchronized void disconnectInternal(ResourceId resource) { - MonitoringClientInfo clientInfo = _clientMap.get(resource); - if (clientInfo == null) { - return; - } - - RiemannBatchClient batchClient = clientInfo._batchClient; - RiemannClient client = clientInfo._client; - - clientInfo._batchClient = null; - clientInfo._client = null; - - try { - if (batchClient != null && batchClient.isConnected()) { - batchClient.scheduler().shutdown(); - batchClient.disconnect(); - } else if (client != null && client.isConnected()) { - client.scheduler().shutdown(); - client.disconnect(); - } - } catch (IOException e) { - LOG.error("Disconnection error", e); - } - } - - /** - * Change a helix monitoring event into a Riemann event - * @param c Riemann client - * @param helixEvent helix event - * @return Riemann EventDSL - */ - private EventDSL convertEvent(AbstractRiemannClient c, MonitoringEvent helixEvent) { - EventDSL event = c.event(); - if (helixEvent.host() != null) { - event.host(helixEvent.host()); - } - if (helixEvent.service() != null) { - event.service(helixEvent.service()); - } - if (helixEvent.eventState() != null) { - event.state(helixEvent.eventState()); - } - if (helixEvent.description() != null) { - event.description(helixEvent.description()); - } - if (helixEvent.time() != null) { - event.time(helixEvent.time()); - } - if (helixEvent.ttl() != null) { - event.ttl(helixEvent.ttl()); - } - if (helixEvent.longMetric() != null) { - event.metric(helixEvent.longMetric()); - } else if (helixEvent.floatMetric() != null) { - event.metric(helixEvent.floatMetric()); - } else if (helixEvent.doubleMetric() != null) { - event.metric(helixEvent.doubleMetric()); - } - if (!helixEvent.tags().isEmpty()) { - event.tags(helixEvent.tags()); - } - if (!helixEvent.attributes().isEmpty()) { - event.attributes.putAll(helixEvent.attributes()); - } - return event; - } - - /** - * Wrapper for a task that should be run to a schedule - */ - private static class ScheduledItem { - long interval; - long delay; - TimeUnit unit; - Runnable r; - - @Override - public boolean equals(Object other) { - if (other instanceof ScheduledItem) { - ScheduledItem that = (ScheduledItem) other; - return interval == that.interval && delay == that.delay && unit == that.unit && r == that.r; - } - return false; - } - - @Override - public int hashCode() { - return toString().hashCode(); - } - - @Override - public String toString() { - return String.format("interval: %d|delay: %d|timeunit: %s|runnable: %s", interval, delay, - unit.toString(), r.toString()); - } - } -} http://git-wip-us.apache.org/repos/asf/helix/blob/c0b1780d/helix-monitor-client/src/main/java/org/apache/helix/monitoring/riemann/ClientUtil.java ---------------------------------------------------------------------- diff --git a/helix-monitor-client/src/main/java/org/apache/helix/monitoring/riemann/ClientUtil.java b/helix-monitor-client/src/main/java/org/apache/helix/monitoring/riemann/ClientUtil.java new file mode 100644 index 0000000..0c12ca3 --- /dev/null +++ b/helix-monitor-client/src/main/java/org/apache/helix/monitoring/riemann/ClientUtil.java @@ -0,0 +1,69 @@ +package org.apache.helix.monitoring.riemann; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import org.apache.helix.monitoring.MonitoringEvent; + +import com.aphyr.riemann.client.AbstractRiemannClient; +import com.aphyr.riemann.client.EventDSL; + +public class ClientUtil { + /** + * Change a helix monitoring event into a Riemann event + * @param c Riemann client + * @param helixEvent helix event + * @return Riemann EventDSL + */ + public static EventDSL convertEvent(AbstractRiemannClient c, MonitoringEvent helixEvent) { + EventDSL event = c.event(); + if (helixEvent.host() != null) { + event.host(helixEvent.host()); + } + if (helixEvent.service() != null) { + event.service(helixEvent.service()); + } + if (helixEvent.eventState() != null) { + event.state(helixEvent.eventState()); + } + if (helixEvent.description() != null) { + event.description(helixEvent.description()); + } + if (helixEvent.time() != null) { + event.time(helixEvent.time()); + } + if (helixEvent.ttl() != null) { + event.ttl(helixEvent.ttl()); + } + if (helixEvent.longMetric() != null) { + event.metric(helixEvent.longMetric()); + } else if (helixEvent.floatMetric() != null) { + event.metric(helixEvent.floatMetric()); + } else if (helixEvent.doubleMetric() != null) { + event.metric(helixEvent.doubleMetric()); + } + if (!helixEvent.tags().isEmpty()) { + event.tags(helixEvent.tags()); + } + if (!helixEvent.attributes().isEmpty()) { + event.attributes.putAll(helixEvent.attributes()); + } + return event; + } +} http://git-wip-us.apache.org/repos/asf/helix/blob/c0b1780d/helix-monitor-client/src/main/java/org/apache/helix/monitoring/riemann/RawRiemannClient.java ---------------------------------------------------------------------- diff --git a/helix-monitor-client/src/main/java/org/apache/helix/monitoring/riemann/RawRiemannClient.java b/helix-monitor-client/src/main/java/org/apache/helix/monitoring/riemann/RawRiemannClient.java new file mode 100644 index 0000000..4fe5902 --- /dev/null +++ b/helix-monitor-client/src/main/java/org/apache/helix/monitoring/riemann/RawRiemannClient.java @@ -0,0 +1,229 @@ +package org.apache.helix.monitoring.riemann; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import java.io.IOException; +import java.net.UnknownHostException; +import java.util.Random; +import java.util.concurrent.TimeUnit; + +import org.apache.helix.monitoring.MonitoringEvent; +import org.apache.log4j.Logger; + +import com.aphyr.riemann.client.AbstractRiemannClient; +import com.aphyr.riemann.client.RiemannBatchClient; +import com.aphyr.riemann.client.RiemannClient; +import com.aphyr.riemann.client.UnsupportedJVMException; + +/** + * Simple wrapper around RiemannClient that does auto reconnect + */ +class RawRiemannClient { + private static final Logger LOG = Logger.getLogger(RawRiemannClient.class); + private static final int HEARTBEAT_PERIOD = 10; + private static final int TIMEOUT_LIMIT = 3; + + enum State { + DISCONNECTED, + CONNECTED, + RECONNECTING + } + + private RiemannClient _rclient; + private RiemannBatchClient _brclient; + private volatile State _state = State.DISCONNECTED; + private final String _host; + private final int _port; + private int _batchSize; + private Thread _reconnectThread; + + public RawRiemannClient(String host, int port) { + this(host, port, 1); + } + + public RawRiemannClient(String host, int port, int batchSize) { + _host = host; + _port = port; + _batchSize = batchSize > 0 ? batchSize : 1; + } + + private synchronized boolean doConnect() { + if (_state == State.CONNECTED) { + return true; + } + + try { + _rclient = RiemannClient.tcp(_host, _port); + _rclient.connect(); + if (_rclient != null && _batchSize > 1) { + try { + _brclient = new RiemannBatchClient(_batchSize, _rclient); + } catch (UnknownHostException e) { + _batchSize = 1; + LOG.error("Could not resolve host", e); + } catch (UnsupportedJVMException e) { + _batchSize = 1; + LOG.warn("Batching not enabled because of incompatible JVM", e); + } + } + + Random random = new Random(); + _rclient.every(HEARTBEAT_PERIOD, random.nextInt(HEARTBEAT_PERIOD), TimeUnit.SECONDS, + new Runnable() { + + @Override + public void run() { + try { + _rclient.event().service("heartbeat").ttl(TIMEOUT_LIMIT * HEARTBEAT_PERIOD) + .sendWithAck(); + _state = State.CONNECTED; + } catch (Exception e) { + LOG.error("Exception in send heatbeat to riemann server: " + _host + ":" + _port, e); + _state = State.RECONNECTING; + } + } + }); + _state = State.CONNECTED; + return true; + } catch (IOException e) { + LOG.error("Fail to connect to riemann server: " + _host + ":" + _port); + } + + return false; + } + + /** + * Make a connection to Riemann server; if fails, start a thread for retrying + */ + public synchronized void connect() { + boolean success = doConnect(); + if (!success) { + _reconnectThread = new Thread(new Runnable() { + + @Override + public void run() { + LOG.info("Start reconnect thread"); + Random random = new Random(); + try { + while (!Thread.currentThread().isInterrupted()) { + boolean success = doConnect(); + if (success) { + break; + } + + TimeUnit.SECONDS.sleep(HEARTBEAT_PERIOD + random.nextInt() % HEARTBEAT_PERIOD); + + } + } catch (InterruptedException e) { + LOG.info("Reconnect thread is interrupted"); + } finally { + LOG.info("Terminate reconnect thread"); + } + } + }); + + _reconnectThread.start(); + } + } + + /** + * Disconnect from Riemann server + */ + public synchronized void disconnect() { + try { + if (_reconnectThread != null) { + _reconnectThread.interrupt(); + } + + if (_rclient != null) { + _rclient.scheduler().shutdown(); + _rclient.disconnect(); + } + } catch (IOException e) { + LOG.error("Fail to disconnect rclient for " + _host + ":" + _port, e); + } + _state = State.DISCONNECTED; + } + + public boolean isConnected() { + return _state == State.CONNECTED; + } + + private AbstractRiemannClient client() { + if (isBatchEnabled()) { + return _brclient; + } else { + return _rclient; + } + } + + public boolean send(MonitoringEvent event) { + if (!isConnected()) { + if (LOG.isDebugEnabled()) { + LOG.debug("Fail to send event: " + event + " to " + _host + ":" + _port + + ", because state is not connected, was: " + _state); + } + return false; + } + + try { + ClientUtil.convertEvent(client(), event).send(); + return true; + } catch (Exception e) { + LOG.error("Fail to send event: " + event + " to " + _host + ":" + _port, e); + } + return false; + } + + public boolean flush() { + if (!isConnected()) { + return false; + } + + try { + client().flush(); + return true; + } catch (IOException e) { + LOG.error("Problem flushing the Riemann event queue", e); + } + return false; + } + + public boolean sendAndFlush(MonitoringEvent event) { + boolean success = send(event); + if (success) { + return flush(); + } + + return false; + } + + public int getBatchSize() { + return _batchSize; + } + + public boolean isBatchEnabled() { + return _batchSize > 1; + } + + public State getState() { + return _state; + } +} http://git-wip-us.apache.org/repos/asf/helix/blob/c0b1780d/helix-monitor-client/src/main/java/org/apache/helix/monitoring/riemann/RiemannClientWrapper.java ---------------------------------------------------------------------- diff --git a/helix-monitor-client/src/main/java/org/apache/helix/monitoring/riemann/RiemannClientWrapper.java b/helix-monitor-client/src/main/java/org/apache/helix/monitoring/riemann/RiemannClientWrapper.java new file mode 100644 index 0000000..bd6517f --- /dev/null +++ b/helix-monitor-client/src/main/java/org/apache/helix/monitoring/riemann/RiemannClientWrapper.java @@ -0,0 +1,234 @@ +package org.apache.helix.monitoring.riemann; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +import org.apache.helix.monitoring.MonitoringClient; +import org.apache.helix.monitoring.MonitoringEvent; +import org.apache.log4j.Logger; + +/** + * Wrapper around a list of RawRiemannClients; if one client fails, try the next one in list + */ +public class RiemannClientWrapper implements MonitoringClient { + private static final Logger LOG = Logger.getLogger(RiemannClientWrapper.class); + + /** + * A list of "host:port" addresses for Riemann servers + */ + private final List _rsHosts; + private boolean _isConnected; + private List _rclients; + private int _batchSize; + + private ScheduledThreadPoolExecutor _pool; + + public RiemannClientWrapper(List rsHosts) { + this(rsHosts, 1); + } + + public RiemannClientWrapper(List rsHosts, int batchSize) { + _rsHosts = rsHosts; + Collections.sort(_rsHosts); + _batchSize = batchSize > 0 ? batchSize : 1; + _isConnected = false; + } + + // Returns the pool for this client. Creates the pool on first use + private synchronized ScheduledThreadPoolExecutor pool() { + if (_pool == null) { + _pool = new ScheduledThreadPoolExecutor(1); + _pool.setContinueExistingPeriodicTasksAfterShutdownPolicy(false); + _pool.setExecuteExistingDelayedTasksAfterShutdownPolicy(false); + } + + return _pool; + } + + @Override + public synchronized void connect() throws Exception { + if (_isConnected) { + LOG.warn("Client already connected"); + return; + } + + _rclients = new ArrayList(); + for (String rsHost : _rsHosts) { + String[] splits = rsHost.split(":"); + + if (splits == null || splits.length != 2) { + throw new IllegalArgumentException("Invalid Riemann server: " + rsHost); + } + + String host = splits[0]; + int port = Integer.parseInt(splits[1]); + + RawRiemannClient rclient = new RawRiemannClient(host, port, _batchSize); + rclient.connect(); + + /** + * If any Riemann client doesn't support batch, set it to 1 + */ + if (rclient.isConnected() && rclient.getBatchSize() == 1) { + _batchSize = 1; + } + _rclients.add(rclient); + } + + _isConnected = true; + } + + @Override + public synchronized void disconnect() { + if (!_isConnected || _rclients == null) { + LOG.warn("Client already disconnected"); + return; + } + + for (RawRiemannClient rclient : _rclients) { + rclient.disconnect(); + } + _rclients = null; + _isConnected = false; + } + + /** + * Get raw client based on event's sharding key + * @param event + * @return + */ + private RawRiemannClient client(MonitoringEvent event) { + String shardingKey = event.shardingKey(); + int baseIdx = shardingKey.hashCode() % _rsHosts.size(); + + // find the first rclient in CONNECTED state and send + for (int i = 0; i < _rsHosts.size(); i++) { + int idx = (baseIdx + i) % _rsHosts.size(); + RawRiemannClient rclient = _rclients.get(idx); + if (rclient.isConnected()) { + return rclient; + } + } + return null; + } + + @Override + public boolean send(MonitoringEvent event) { + if (!_isConnected || _rclients == null) { + LOG.warn("Client is not connected. Fail to send event: " + event); + return false; + } + + RawRiemannClient rclient = client(event); + if (rclient != null) { + return rclient.send(event); + } + + LOG.error("Fail to send event: " + event + ", no rclient is available"); + return false; + } + + @Override + public void every(long interval, long delay, TimeUnit unit, Runnable r) { + pool().scheduleAtFixedRate(r, delay, interval, unit); + } + + @Override + public boolean sendAndFlush(MonitoringEvent event) { + if (!_isConnected || _rclients == null) { + LOG.warn("Client is not connected. Fail to send event: " + event); + return false; + } + + RawRiemannClient rclient = client(event); + if (rclient != null) { + boolean success = rclient.send(event); + if (success) { + return rclient.flush(); + } + } + LOG.error("Fail to send event: " + event + ", no rclient is available"); + return false; + } + + @Override + public boolean isConnected() { + return _isConnected; + } + + @Override + public boolean isBatchingEnabled() { + if (!_isConnected || _rclients == null) { + LOG.warn("Client is not connected"); + return false; + } + + /** + * Batch should be enabled for all or none raw clients + */ + for (RawRiemannClient rclient : _rclients) { + if (rclient.isConnected()) { + return rclient.isBatchEnabled(); + } + } + return false; + } + + /** + * Return batch size if connected or 1 otherwise + */ + @Override + public int getBatchSize() { + if (!_isConnected || _rclients == null) { + LOG.warn("Client is not connected"); + return 1; + } + + /** + * Batch size should be the same for all raw clients + */ + for (RawRiemannClient rclient : _rclients) { + if (rclient.isConnected()) { + return rclient.getBatchSize(); + } + } + + return 1; + } + + @Override + public boolean flush() { + if (!_isConnected || _rclients == null) { + LOG.warn("Client is not connected"); + return false; + } + + boolean success = true; + for (RawRiemannClient rclient : _rclients) { + success &= rclient.flush(); + } + return success; + } +} http://git-wip-us.apache.org/repos/asf/helix/blob/c0b1780d/helix-monitor-server/pom.xml ---------------------------------------------------------------------- diff --git a/helix-monitor-server/pom.xml b/helix-monitor-server/pom.xml index 2cbddfd..041a390 100644 --- a/helix-monitor-server/pom.xml +++ b/helix-monitor-server/pom.xml @@ -54,11 +54,6 @@ under the License. 0.2.4 - org.eclipse.jetty.aggregate - jetty-all-server - 8.1.14.v20131031 - - factual clj-helix 0.1.0 http://git-wip-us.apache.org/repos/asf/helix/blob/c0b1780d/helix-monitor-server/src/main/java/org/apache/helix/monitoring/RiemannAgent.java ---------------------------------------------------------------------- diff --git a/helix-monitor-server/src/main/java/org/apache/helix/monitoring/RiemannAgent.java b/helix-monitor-server/src/main/java/org/apache/helix/monitoring/RiemannAgent.java deleted file mode 100644 index 61e3d6c..0000000 --- a/helix-monitor-server/src/main/java/org/apache/helix/monitoring/RiemannAgent.java +++ /dev/null @@ -1,137 +0,0 @@ -package org.apache.helix.monitoring; - -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -import java.io.IOException; -import java.net.InetAddress; -import java.util.Random; -import java.util.concurrent.TimeUnit; - -import org.apache.helix.HelixManager; -import org.apache.helix.HelixManagerFactory; -import org.apache.helix.InstanceType; -import org.apache.log4j.Logger; - -import com.aphyr.riemann.client.RiemannClient; - -public class RiemannAgent { - private static final Logger LOG = Logger.getLogger(RiemannAgent.class); - - static final String STATEMODEL_NAME = "OnlineOffline"; - - final Random _random; - final String _zkAddr; - final String _clusterName; - final String _instanceName; - final int _riemannPort; - final HelixManager _participant; - final RiemannClient _client; - - public RiemannAgent(String zkAddr, String clusterName, int riemannPort) throws IOException { - _random = new Random(); - _zkAddr = zkAddr; - _clusterName = clusterName; - _instanceName = - String.format("%s_%d", InetAddress.getLocalHost().getCanonicalHostName(), riemannPort); - _riemannPort = riemannPort; - _participant = - HelixManagerFactory.getZKHelixManager(clusterName, _instanceName, InstanceType.PARTICIPANT, - zkAddr); - _client = RiemannClient.tcp("localhost", riemannPort); - } - - public void start() throws Exception { - LOG.info("Starting RiemannAgent. zk: " + _zkAddr + ", cluster: " + _clusterName - + ", instance: " + _instanceName + ", riemannPort: " + _riemannPort); - - // Wait until riemann port is connected - int timeout = 30 * 1000; - long startT = System.currentTimeMillis(); - while ((System.currentTimeMillis() - startT) < timeout) { - try { - _client.connect(); - _client.event().service("heartbeat").state("running").ttl(20).sendWithAck(); - break; - } catch (IOException e) { - int sleep = _random.nextInt(3000) + 3000; - LOG.info("Wait " + sleep + "ms for riemann server to come up"); - TimeUnit.MILLISECONDS.sleep(sleep); - } - } - - if (!_client.isConnected()) { - String err = - "Fail to connect to reimann server on localhost:" + _riemannPort + " in " + timeout - + "ms"; - LOG.error(err); - throw new RuntimeException(err); - } - LOG.info("RiemannAgent connected to local riemann server on port: " + _riemannPort); - - // Start helix participant - _participant.getStateMachineEngine().registerStateModelFactory(STATEMODEL_NAME, - new RiemannAgentStateModelFactory()); - _participant.connect(); - - // Monitor riemann server - _client.every(10, 0, TimeUnit.SECONDS, new Runnable() { - - @Override - public void run() { - try { - // Send heartbeat metrics - _client.event().service("heartbeat").state("running").ttl(20).sendWithAck(); - } catch (Exception e) { - LOG.error("Exception in send heatbeat to local riemann server, shutdown RiemannAgent: " - + _instanceName, e); - asyncShutdown(); - } - } - }); - - } - - /** - * Do shutdown asynchronously - */ - void asyncShutdown() { - new Thread(new Runnable() { - - @Override - public void run() { - shutdown(); - } - }).start(); - } - - public void shutdown() { - LOG.info("Shutting down RiemannAgent. zk: " + _zkAddr + ", cluster: " + _clusterName - + ", instance: " + _instanceName + ", riemannPort: " + _riemannPort); - - try { - _client.scheduler().shutdown(); - _client.disconnect(); - } catch (IOException e) { - LOG.error("Exception in disconnect riemann client", e); - } - - _participant.disconnect(); - } -} http://git-wip-us.apache.org/repos/asf/helix/blob/c0b1780d/helix-monitor-server/src/main/java/org/apache/helix/monitoring/RiemannAgentStateModel.java ---------------------------------------------------------------------- diff --git a/helix-monitor-server/src/main/java/org/apache/helix/monitoring/RiemannAgentStateModel.java b/helix-monitor-server/src/main/java/org/apache/helix/monitoring/RiemannAgentStateModel.java deleted file mode 100644 index 2e32cef..0000000 --- a/helix-monitor-server/src/main/java/org/apache/helix/monitoring/RiemannAgentStateModel.java +++ /dev/null @@ -1,54 +0,0 @@ -package org.apache.helix.monitoring; - -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -import org.apache.helix.NotificationContext; -import org.apache.helix.model.Message; -import org.apache.helix.participant.statemachine.StateModel; -import org.apache.helix.participant.statemachine.StateModelInfo; -import org.apache.helix.participant.statemachine.Transition; -import org.apache.log4j.Logger; - -@StateModelInfo(initialState = "OFFLINE", states = { - "DROPPED", "OFFLINE", "ONLINE" -}) -public class RiemannAgentStateModel extends StateModel { - private static final Logger LOG = Logger.getLogger(RiemannAgentStateModel.class); - - void logTransition(Message message) { - String toState = message.getToState(); - String fromState = message.getFromState(); - String resourceName = message.getResourceName(); - String partittionName = message.getPartitionName(); - - LOG.info("Become " + toState + " from " + fromState + " for resource: " + resourceName - + ", partition: " + partittionName); - } - - @Transition(to = "ONLINE", from = "OFFLINE") - public void onBecomeOnlineFromOffline(Message message, NotificationContext context) { - logTransition(message); - } - - @Transition(to = "OFFLINE", from = "ONLINE") - public void onBecomeOfflineFromOnline(Message message, NotificationContext context) { - logTransition(message); - } -} http://git-wip-us.apache.org/repos/asf/helix/blob/c0b1780d/helix-monitor-server/src/main/java/org/apache/helix/monitoring/RiemannAgentStateModelFactory.java ---------------------------------------------------------------------- diff --git a/helix-monitor-server/src/main/java/org/apache/helix/monitoring/RiemannAgentStateModelFactory.java b/helix-monitor-server/src/main/java/org/apache/helix/monitoring/RiemannAgentStateModelFactory.java deleted file mode 100644 index a5865ad..0000000 --- a/helix-monitor-server/src/main/java/org/apache/helix/monitoring/RiemannAgentStateModelFactory.java +++ /dev/null @@ -1,29 +0,0 @@ -package org.apache.helix.monitoring; - -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -import org.apache.helix.participant.statemachine.StateModelFactory; - -public class RiemannAgentStateModelFactory extends StateModelFactory { - @Override - public RiemannAgentStateModel createNewStateModel(String partitionName) { - return new RiemannAgentStateModel(); - } -} http://git-wip-us.apache.org/repos/asf/helix/blob/c0b1780d/helix-monitor-server/src/main/java/org/apache/helix/monitoring/RiemannAlertProxy.java ---------------------------------------------------------------------- diff --git a/helix-monitor-server/src/main/java/org/apache/helix/monitoring/RiemannAlertProxy.java b/helix-monitor-server/src/main/java/org/apache/helix/monitoring/RiemannAlertProxy.java deleted file mode 100644 index 5fe8ebe..0000000 --- a/helix-monitor-server/src/main/java/org/apache/helix/monitoring/RiemannAlertProxy.java +++ /dev/null @@ -1,111 +0,0 @@ -package org.apache.helix.monitoring; - -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -import java.io.IOException; -import java.io.InputStream; -import java.io.StringWriter; -import java.nio.charset.Charset; -import java.util.UUID; - -import javax.servlet.ServletException; -import javax.servlet.http.HttpServletRequest; -import javax.servlet.http.HttpServletResponse; - -import org.apache.commons.io.IOUtils; -import org.apache.helix.BaseDataAccessor; -import org.apache.helix.HelixDataAccessor; -import org.apache.helix.PropertyKey; -import org.apache.helix.ZNRecord; -import org.apache.helix.controller.alert.AlertName; -import org.apache.helix.manager.zk.ZKHelixDataAccessor; -import org.apache.helix.model.Message; -import org.apache.helix.model.Message.MessageType; -import org.apache.log4j.Logger; -import org.eclipse.jetty.server.Request; -import org.eclipse.jetty.server.Server; -import org.eclipse.jetty.server.handler.AbstractHandler; - -/** - * Accept alerts from local riemann server and forward it to helix-controller - */ -public class RiemannAlertProxy { - private static final Logger LOG = Logger.getLogger(RiemannAlertProxy.class); - - class RiemannAlertProxyHandler extends AbstractHandler { - @Override - public void handle(String target, Request baseRequest, HttpServletRequest request, - HttpServletResponse response) throws IOException, ServletException { - // Read content-body - InputStream inputStream = request.getInputStream(); - StringWriter writer = new StringWriter(); - IOUtils.copy(inputStream, writer, Charset.defaultCharset().toString()); - String alertNameStr = writer.toString(); - LOG.info("Handling alert: " + alertNameStr); - - // Send alert message to the controller of cluster being monitored - try { - AlertName alertName = AlertName.from(alertNameStr); - String clusterName = alertName.getScope().getClusterId().stringify(); - HelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName, _baseAccessor); - PropertyKey.Builder keyBuilder = accessor.keyBuilder(); - Message message = new Message(MessageType.ALERT, UUID.randomUUID().toString()); - message.setAttribute(Message.Attributes.ALERT_NAME, alertNameStr); - message.setTgtSessionId("*"); - message.setTgtName("controller"); - accessor.setProperty(keyBuilder.controllerMessage(message.getId()), message); - } catch (Exception e) { - LOG.error("Fail to send alert to cluster being monitored: " + alertNameStr, e); - } - - // return ok - response.setStatus(HttpServletResponse.SC_OK); - baseRequest.setHandled(true); - } - } - - final int _proxyPort; - final Server _server; - final BaseDataAccessor _baseAccessor; - final AbstractHandler _handler; - - public RiemannAlertProxy(int proxyPort, BaseDataAccessor baseAccessor) { - _proxyPort = proxyPort; - _server = new Server(proxyPort); - _baseAccessor = baseAccessor; - _handler = new RiemannAlertProxyHandler(); - } - - public void start() throws Exception { - LOG.info("Starting RiemannAlertProxy on port: " + _proxyPort); - _server.setHandler(_handler); - _server.start(); - - } - - public void shutdown() { - try { - LOG.info("Stopping RiemannAlertProxy on port: " + _proxyPort); - _server.stop(); - } catch (Exception e) { - LOG.error("Fail to stop RiemannAlertProxy", e); - } - } -} http://git-wip-us.apache.org/repos/asf/helix/blob/c0b1780d/helix-monitor-server/src/main/java/org/apache/helix/monitoring/RiemannConfigs.java ---------------------------------------------------------------------- diff --git a/helix-monitor-server/src/main/java/org/apache/helix/monitoring/RiemannConfigs.java b/helix-monitor-server/src/main/java/org/apache/helix/monitoring/RiemannConfigs.java deleted file mode 100644 index 193b763..0000000 --- a/helix-monitor-server/src/main/java/org/apache/helix/monitoring/RiemannConfigs.java +++ /dev/null @@ -1,116 +0,0 @@ -package org.apache.helix.monitoring; - -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -import java.io.File; -import java.io.FileNotFoundException; -import java.io.PrintWriter; -import java.util.List; - -import org.apache.helix.model.MonitoringConfig; -import org.apache.log4j.Logger; - -import com.google.common.collect.Lists; - -/** - * Riemann configs - */ -public class RiemannConfigs { - private static final Logger LOG = Logger.getLogger(RiemannConfigs.class); - private static final String DEFAULT_CONFIG_DIR = "riemannconfigs"; - public static final String DEFAULT_RIEMANN_CONFIG = "riemann.config"; - - private final String _configDir; - private final List _configs; - - RiemannConfigs(String configDir, List configs) { - _configDir = configDir; - _configs = configs; - } - - /** - * persist configs to riemann config dir - */ - public void persistConfigs() { - // create the directory - File dir = new File(_configDir); - if (!dir.exists()) { - dir.mkdir(); - } - - for (MonitoringConfig config : _configs) { - String configData = config.getConfig(); - String fileName = _configDir + "/" + config.getId(); - try { - PrintWriter writer = new PrintWriter(fileName); - writer.println(configData); - writer.close(); - - // make sure this is cleaned up eventually - File file = new File(fileName); - file.deleteOnExit(); - } catch (FileNotFoundException e) { - LOG.error("Could not write " + config.getId(), e); - } - } - } - - public String getConfigDir() { - return _configDir; - } - - public static class Builder { - private final List _configs; - private final String _configDir; - - /** - * By default, configs will be placed in "{systemTmpDir}/riemannconfigs" - */ - public Builder() { - this(System.getProperty("java.io.tmpdir") + "/" + DEFAULT_CONFIG_DIR); - } - - public Builder(String configDir) { - _configDir = configDir; - _configs = Lists.newArrayList(); - } - - public Builder addConfig(MonitoringConfig monitoringConfig) { - _configs.add(monitoringConfig); - return this; - } - - public Builder addConfigs(List monitoringConfigs) { - _configs.addAll(monitoringConfigs); - return this; - } - - public RiemannConfigs build() { - // Check default riemann config exists - for (MonitoringConfig config : _configs) { - if (config.getId().equals(DEFAULT_RIEMANN_CONFIG)) { - return new RiemannConfigs(_configDir, _configs); - } - } - throw new IllegalArgumentException("Missing default riemann config: " - + DEFAULT_RIEMANN_CONFIG); - } - } -} http://git-wip-us.apache.org/repos/asf/helix/blob/c0b1780d/helix-monitor-server/src/main/java/org/apache/helix/monitoring/RiemannMonitoringServer.java ---------------------------------------------------------------------- diff --git a/helix-monitor-server/src/main/java/org/apache/helix/monitoring/RiemannMonitoringServer.java b/helix-monitor-server/src/main/java/org/apache/helix/monitoring/RiemannMonitoringServer.java deleted file mode 100644 index d4f11a5..0000000 --- a/helix-monitor-server/src/main/java/org/apache/helix/monitoring/RiemannMonitoringServer.java +++ /dev/null @@ -1,73 +0,0 @@ -package org.apache.helix.monitoring; - -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -import org.apache.log4j.Logger; - -import clojure.lang.RT; -import clojure.lang.Symbol; - -/** - * A monitoring server implementation that uses Riemann - */ -public class RiemannMonitoringServer implements MonitoringServer { - private static final Logger LOG = Logger.getLogger(RiemannMonitoringServer.class); - - private volatile boolean _isStarted; - private final RiemannConfigs _config; - - /** - * Create a monitoring server - * @param config - */ - public RiemannMonitoringServer(RiemannConfigs config) { - LOG.info("Construct RiemannMonitoringServer with configDir: " + config.getConfigDir()); - _config = config; - config.persistConfigs(); - _isStarted = false; - } - - @Override - public synchronized void start() { - LOG.info("Starting Riemann server with configDir: " + _config.getConfigDir()); - - // start Riemann - RT.var("clojure.core", "require").invoke(Symbol.intern("riemann.bin")); - RT.var("clojure.core", "require").invoke(Symbol.intern(RiemannConfigs.DEFAULT_RIEMANN_CONFIG)); - RT.var("riemann.bin", "-main").invoke(_config.getConfigDir()); - _isStarted = true; - } - - @Override - public synchronized void stop() { - if (!_isStarted) { - LOG.error("Tried to stop Riemann when not started!"); - return; - } - LOG.info("Stopping Riemann server"); - RT.var("riemann.config", "stop!").invoke(); - _isStarted = false; - } - - @Override - public boolean isStarted() { - return _isStarted; - } -} http://git-wip-us.apache.org/repos/asf/helix/blob/c0b1780d/helix-monitor-server/src/main/java/org/apache/helix/monitoring/riemann/HelixAlertMessenger.java ---------------------------------------------------------------------- diff --git a/helix-monitor-server/src/main/java/org/apache/helix/monitoring/riemann/HelixAlertMessenger.java b/helix-monitor-server/src/main/java/org/apache/helix/monitoring/riemann/HelixAlertMessenger.java new file mode 100644 index 0000000..cbc209a --- /dev/null +++ b/helix-monitor-server/src/main/java/org/apache/helix/monitoring/riemann/HelixAlertMessenger.java @@ -0,0 +1,112 @@ +package org.apache.helix.monitoring.riemann; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import java.util.LinkedList; +import java.util.Queue; +import java.util.UUID; +import java.util.concurrent.TimeUnit; + +import org.apache.helix.BaseDataAccessor; +import org.apache.helix.HelixDataAccessor; +import org.apache.helix.PropertyKey; +import org.apache.helix.ZNRecord; +import org.apache.helix.controller.alert.AlertName; +import org.apache.helix.manager.zk.ZKHelixDataAccessor; +import org.apache.helix.manager.zk.ZNRecordSerializer; +import org.apache.helix.manager.zk.ZkBaseDataAccessor; +import org.apache.helix.manager.zk.ZkClient; +import org.apache.helix.model.Message; +import org.apache.helix.model.Message.MessageType; +import org.apache.log4j.Logger; + +/** + * Accept alerts from local Riemann server and forward it to helix-controller + */ +public class HelixAlertMessenger { + private static final Logger LOG = Logger.getLogger(HelixAlertMessenger.class); + private static final int DEFAULT_MAX_ALERT_COUNT = 1; + private static final TimeUnit DEFAULT_TIME_UNIT = TimeUnit.MINUTES; + + private final ZkClient _zkclient; + private final BaseDataAccessor _baseAccessor; + + /** + * A queue that keeps track of timestamps (in millisecond) of last N alerts sent + */ + private final Queue _queue; + private final int _maxAlertCount; + private final TimeUnit _timeUnit; + + public HelixAlertMessenger(String zkHosts) { + this(zkHosts, DEFAULT_MAX_ALERT_COUNT, DEFAULT_TIME_UNIT); + } + + public HelixAlertMessenger(String zkHosts, int maxAlertCount, TimeUnit timeUnit) { + _zkclient = + new ZkClient(zkHosts, ZkClient.DEFAULT_SESSION_TIMEOUT, + ZkClient.DEFAULT_CONNECTION_TIMEOUT, new ZNRecordSerializer()); + _baseAccessor = new ZkBaseDataAccessor(_zkclient); + _queue = new LinkedList(); + _maxAlertCount = maxAlertCount; + _timeUnit = timeUnit; + } + + /** + * Send alert to helix controller; throttle if necessary + * not thread-safe + * @param alertNameStr + */ + public void onAlert(String alertNameStr) { + LOG.info("Handling alert: " + alertNameStr); + + // throttling + long now = System.currentTimeMillis(); + if (_queue.size() >= _maxAlertCount) { + if (_queue.peek() + _timeUnit.toMillis(_maxAlertCount) > now) { + if (LOG.isDebugEnabled()) { + LOG.debug("Throttling alert: " + alertNameStr); + } + return; + } else { + _queue.remove(); + } + } + + // Send alert message to the controller of cluster being monitored + try { + AlertName alertName = AlertName.from(alertNameStr); + String clusterName = alertName.getScope().getClusterId().stringify(); + HelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName, _baseAccessor); + PropertyKey.Builder keyBuilder = accessor.keyBuilder(); + Message message = new Message(MessageType.ALERT, UUID.randomUUID().toString()); + message.setAttribute(Message.Attributes.ALERT_NAME, alertNameStr); + message.setTgtSessionId("*"); + message.setTgtName("controller"); + accessor.setProperty(keyBuilder.controllerMessage(message.getId()), message); + + // record the timestamp + _queue.add(now); + + } catch (Exception e) { + LOG.error("Fail to send alert to cluster being monitored: " + alertNameStr, e); + } + } +} http://git-wip-us.apache.org/repos/asf/helix/blob/c0b1780d/helix-monitor-server/src/main/java/org/apache/helix/monitoring/riemann/RiemannAgent.java ---------------------------------------------------------------------- diff --git a/helix-monitor-server/src/main/java/org/apache/helix/monitoring/riemann/RiemannAgent.java b/helix-monitor-server/src/main/java/org/apache/helix/monitoring/riemann/RiemannAgent.java new file mode 100644 index 0000000..3e7efca --- /dev/null +++ b/helix-monitor-server/src/main/java/org/apache/helix/monitoring/riemann/RiemannAgent.java @@ -0,0 +1,169 @@ +package org.apache.helix.monitoring.riemann; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import java.io.IOException; +import java.net.InetAddress; +import java.util.Random; +import java.util.concurrent.TimeUnit; + +import org.apache.helix.HelixManager; +import org.apache.helix.HelixManagerFactory; +import org.apache.helix.InstanceType; +import org.apache.log4j.Logger; + +import com.aphyr.riemann.client.RiemannClient; + +/** + * Start a Helix participant that joins cluster and represents local Riemann server + */ +public class RiemannAgent { + private static final Logger LOG = Logger.getLogger(RiemannAgent.class); + private static final int HEARTBEAT_PERIOD = 10; + private static final int TIMEOUT_LIMIT = 3; + + private final String _zkAddr; + private final String _clusterName; + private final String _instanceName; + private final int _riemannPort; + private HelixManager _participant; + private final RiemannClient _client; + private Thread _reconnectThread; + + public RiemannAgent(String zkAddr, String clusterName, int riemannPort) throws IOException { + _zkAddr = zkAddr; + _clusterName = clusterName; + _instanceName = + String.format("%s_%d", InetAddress.getLocalHost().getCanonicalHostName(), riemannPort); + _riemannPort = riemannPort; + _client = RiemannClient.tcp("localhost", riemannPort); + } + + private synchronized boolean doStart() throws Exception { + try { + _client.connect(); + _client.event().service("heartbeat").state("running").ttl(TIMEOUT_LIMIT * HEARTBEAT_PERIOD) + .sendWithAck(); + LOG.info("RiemannAgent connected to local riemann server on localhost:" + _riemannPort); + _participant = + HelixManagerFactory.getZKHelixManager(_clusterName, _instanceName, + InstanceType.PARTICIPANT, _zkAddr); + _participant.connect(); + + // Monitoring Riemann server + Random random = new Random(); + _client.every(HEARTBEAT_PERIOD, random.nextInt(HEARTBEAT_PERIOD), TimeUnit.SECONDS, + new Runnable() { + + @Override + public void run() { + try { + // Send heartbeat metrics + _client.event().service("heartbeat").state("running") + .ttl(TIMEOUT_LIMIT * HEARTBEAT_PERIOD).sendWithAck(); + if (_participant == null) { + _participant = + HelixManagerFactory.getZKHelixManager(_clusterName, _instanceName, + InstanceType.PARTICIPANT, _zkAddr); + _participant.connect(); + } + } catch (Exception e) { + LOG.error( + "Exception in send heatbeat to local riemann server, shutdown RiemannAgent: " + + _instanceName, e); + + if (_participant != null) { + _participant.disconnect(); + _participant = null; + } + } + } + }); + + return true; + } catch (IOException e) { + LOG.error("Fail to connect to Riemann server on localhost:" + _riemannPort); + } + return false; + } + + /** + * Try connect local Riemann server; if fails, start a thread to retry async + * @throws Exception + */ + public synchronized void start() throws Exception { + LOG.info("Starting RiemannAgent. zk: " + _zkAddr + ", cluster: " + _clusterName + + ", instance: " + _instanceName + ", riemannPort: " + _riemannPort); + + boolean success = doStart(); + if (!success) { + _reconnectThread = new Thread(new Runnable() { + + @Override + public void run() { + LOG.info("Start reconnect thread"); + Random random = new Random(); + try { + while (!Thread.currentThread().isInterrupted()) { + boolean success = doStart(); + if (success) { + break; + } + + TimeUnit.SECONDS.sleep(HEARTBEAT_PERIOD + random.nextInt() % HEARTBEAT_PERIOD); + + } + } catch (InterruptedException e) { + LOG.info("Reconnect thread is interrupted"); + } catch (Exception e) { + LOG.error("Fail to start RiemannAgent", e); + } finally { + LOG.info("Terminate reconnect thread"); + } + + } + }); + _reconnectThread.start(); + } + + } + + public synchronized void shutdown() { + LOG.info("Shutting down RiemannAgent. zk: " + _zkAddr + ", cluster: " + _clusterName + + ", instance: " + _instanceName + ", riemannPort: " + _riemannPort); + + if (_reconnectThread != null) { + _reconnectThread.interrupt(); + _reconnectThread = null; + } + + try { + _client.scheduler().shutdown(); + _client.disconnect(); + } catch (IOException e) { + LOG.error("Exception in disconnect riemann client", e); + } + + if (_participant != null) { + _participant.disconnect(); + _participant = null; + } + } +} http://git-wip-us.apache.org/repos/asf/helix/blob/c0b1780d/helix-monitor-server/src/main/java/org/apache/helix/monitoring/riemann/RiemannConfigs.java ---------------------------------------------------------------------- diff --git a/helix-monitor-server/src/main/java/org/apache/helix/monitoring/riemann/RiemannConfigs.java b/helix-monitor-server/src/main/java/org/apache/helix/monitoring/riemann/RiemannConfigs.java new file mode 100644 index 0000000..43b957e --- /dev/null +++ b/helix-monitor-server/src/main/java/org/apache/helix/monitoring/riemann/RiemannConfigs.java @@ -0,0 +1,116 @@ +package org.apache.helix.monitoring.riemann; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import java.io.File; +import java.io.FileNotFoundException; +import java.io.PrintWriter; +import java.util.List; + +import org.apache.helix.model.MonitoringConfig; +import org.apache.log4j.Logger; + +import com.google.common.collect.Lists; + +/** + * Riemann configs + */ +public class RiemannConfigs { + private static final Logger LOG = Logger.getLogger(RiemannConfigs.class); + public static final String DEFAULT_CONFIG_DIR = "riemannconfigs"; + public static final String DEFAULT_RIEMANN_CONFIG = "riemann.config"; + + private final String _configDir; + private final List _configs; + + RiemannConfigs(String configDir, List configs) { + _configDir = configDir; + _configs = configs; + } + + /** + * persist configs to riemann config dir + */ + public void persistConfigs() { + // create the directory + File dir = new File(_configDir); + if (!dir.exists()) { + dir.mkdir(); + } + + for (MonitoringConfig config : _configs) { + String configData = config.getConfig(); + String fileName = _configDir + "/" + config.getId(); + try { + PrintWriter writer = new PrintWriter(fileName); + writer.println(configData); + writer.close(); + + // make sure this is cleaned up eventually + File file = new File(fileName); + file.deleteOnExit(); + } catch (FileNotFoundException e) { + LOG.error("Could not write " + config.getId(), e); + } + } + } + + public String getConfigDir() { + return _configDir; + } + + public static class Builder { + private final List _configs; + private final String _configDir; + + /** + * By default, configs will be placed in "{systemTmpDir}/riemannconfigs" + */ + public Builder() { + this(System.getProperty("java.io.tmpdir") + "/" + DEFAULT_CONFIG_DIR); + } + + public Builder(String configDir) { + _configDir = configDir; + _configs = Lists.newArrayList(); + } + + public Builder addConfig(MonitoringConfig monitoringConfig) { + _configs.add(monitoringConfig); + return this; + } + + public Builder addConfigs(List monitoringConfigs) { + _configs.addAll(monitoringConfigs); + return this; + } + + public RiemannConfigs build() { + // Check default riemann config exists + for (MonitoringConfig config : _configs) { + if (config.getId().equals(DEFAULT_RIEMANN_CONFIG)) { + return new RiemannConfigs(_configDir, _configs); + } + } + throw new IllegalArgumentException("Missing default riemann config: " + + DEFAULT_RIEMANN_CONFIG); + } + } +}