Return-Path: Delivered-To: apmail-incubator-cassandra-commits-archive@minotaur.apache.org Received: (qmail 29637 invoked from network); 2 Mar 2009 14:20:20 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (140.211.11.2) by minotaur.apache.org with SMTP; 2 Mar 2009 14:20:20 -0000 Received: (qmail 25667 invoked by uid 500); 2 Mar 2009 14:20:20 -0000 Delivered-To: apmail-incubator-cassandra-commits-archive@incubator.apache.org Received: (qmail 25655 invoked by uid 500); 2 Mar 2009 14:20:20 -0000 Mailing-List: contact cassandra-commits-help@incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: cassandra-dev@incubator.apache.org Delivered-To: mailing list cassandra-commits@incubator.apache.org Delivered-To: moderator for cassandra-commits@incubator.apache.org Received: (qmail 49475 invoked by uid 99); 2 Mar 2009 06:13:30 -0000 X-ASF-Spam-Status: No, hits=-2000.0 required=10.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r749205 [1/16] - in /incubator/cassandra/src/org/apache/cassandra: analytics/ cli/ concurrent/ config/ continuations/ cql/ cql/common/ cql/compiler/ cql/compiler/common/ cql/compiler/parse/ cql/compiler/sem/ cql/driver/ cql/execution/ dht/ ... 
Date: Mon, 02 Mar 2009 06:12:49 -0000 To: cassandra-commits@incubator.apache.org From: pmalik@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20090302061254.69B372388999@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: pmalik Date: Mon Mar 2 06:12:46 2009 New Revision: 749205 URL: http://svn.apache.org/viewvc?rev=749205&view=rev Log: Added Cassandra sources Added: incubator/cassandra/src/org/apache/cassandra/analytics/ incubator/cassandra/src/org/apache/cassandra/analytics/AnalyticsContext.java incubator/cassandra/src/org/apache/cassandra/analytics/AnalyticsException.java incubator/cassandra/src/org/apache/cassandra/analytics/DBAnalyticsSource.java incubator/cassandra/src/org/apache/cassandra/analytics/IAnalyticsSource.java incubator/cassandra/src/org/apache/cassandra/analytics/MetricValue.java incubator/cassandra/src/org/apache/cassandra/analytics/MetricsRecord.java incubator/cassandra/src/org/apache/cassandra/analytics/OutputRecord.java incubator/cassandra/src/org/apache/cassandra/analytics/VMAnalyticsSource.java incubator/cassandra/src/org/apache/cassandra/cli/ incubator/cassandra/src/org/apache/cassandra/cli/Cli.g incubator/cassandra/src/org/apache/cassandra/cli/Cli.tokens incubator/cassandra/src/org/apache/cassandra/cli/CliClient.java incubator/cassandra/src/org/apache/cassandra/cli/CliCompiler.java incubator/cassandra/src/org/apache/cassandra/cli/CliLexer.java incubator/cassandra/src/org/apache/cassandra/cli/CliMain.java incubator/cassandra/src/org/apache/cassandra/cli/CliOptions.java incubator/cassandra/src/org/apache/cassandra/cli/CliParser.java incubator/cassandra/src/org/apache/cassandra/cli/CliSessionState.java incubator/cassandra/src/org/apache/cassandra/cli/Cli__.g incubator/cassandra/src/org/apache/cassandra/concurrent/ incubator/cassandra/src/org/apache/cassandra/concurrent/AIOExecutorService.java incubator/cassandra/src/org/apache/cassandra/concurrent/Context.java 
incubator/cassandra/src/org/apache/cassandra/concurrent/ContinuationContext.java incubator/cassandra/src/org/apache/cassandra/concurrent/ContinuationStage.java incubator/cassandra/src/org/apache/cassandra/concurrent/ContinuationsExecutor.java incubator/cassandra/src/org/apache/cassandra/concurrent/DebuggableScheduledThreadPoolExecutor.java incubator/cassandra/src/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java incubator/cassandra/src/org/apache/cassandra/concurrent/IContinuable.java incubator/cassandra/src/org/apache/cassandra/concurrent/IStage.java incubator/cassandra/src/org/apache/cassandra/concurrent/MultiThreadedStage.java incubator/cassandra/src/org/apache/cassandra/concurrent/RejectedExecutionHandler.java incubator/cassandra/src/org/apache/cassandra/concurrent/SingleThreadedContinuationStage.java incubator/cassandra/src/org/apache/cassandra/concurrent/SingleThreadedStage.java incubator/cassandra/src/org/apache/cassandra/concurrent/StageManager.java incubator/cassandra/src/org/apache/cassandra/concurrent/ThreadFactoryImpl.java incubator/cassandra/src/org/apache/cassandra/concurrent/ThreadLocalContext.java incubator/cassandra/src/org/apache/cassandra/config/ incubator/cassandra/src/org/apache/cassandra/config/CFMetaData.java incubator/cassandra/src/org/apache/cassandra/config/DatabaseDescriptor.java incubator/cassandra/src/org/apache/cassandra/continuations/ incubator/cassandra/src/org/apache/cassandra/continuations/CAgent.java incubator/cassandra/src/org/apache/cassandra/continuations/ContinuationClassTransformer.java incubator/cassandra/src/org/apache/cassandra/continuations/Suspendable.java incubator/cassandra/src/org/apache/cassandra/cql/ incubator/cassandra/src/org/apache/cassandra/cql/common/ incubator/cassandra/src/org/apache/cassandra/cql/common/BindOperand.java incubator/cassandra/src/org/apache/cassandra/cql/common/CExpr.java incubator/cassandra/src/org/apache/cassandra/cql/common/CType.java 
incubator/cassandra/src/org/apache/cassandra/cql/common/ColumnMapExpr.java incubator/cassandra/src/org/apache/cassandra/cql/common/ColumnRangeQueryRSD.java incubator/cassandra/src/org/apache/cassandra/cql/common/ConstantOperand.java incubator/cassandra/src/org/apache/cassandra/cql/common/CqlResult.java incubator/cassandra/src/org/apache/cassandra/cql/common/DMLPlan.java incubator/cassandra/src/org/apache/cassandra/cql/common/ExplainPlan.java incubator/cassandra/src/org/apache/cassandra/cql/common/OperandDef.java incubator/cassandra/src/org/apache/cassandra/cql/common/Pair.java incubator/cassandra/src/org/apache/cassandra/cql/common/Plan.java incubator/cassandra/src/org/apache/cassandra/cql/common/QueryPlan.java incubator/cassandra/src/org/apache/cassandra/cql/common/RowSourceDef.java incubator/cassandra/src/org/apache/cassandra/cql/common/SetColumnMap.java incubator/cassandra/src/org/apache/cassandra/cql/common/SetSuperColumnMap.java incubator/cassandra/src/org/apache/cassandra/cql/common/SetUniqueKey.java incubator/cassandra/src/org/apache/cassandra/cql/common/SuperColumnMapExpr.java incubator/cassandra/src/org/apache/cassandra/cql/common/SuperColumnRangeQueryRSD.java incubator/cassandra/src/org/apache/cassandra/cql/common/UniqueKeyQueryRSD.java incubator/cassandra/src/org/apache/cassandra/cql/common/Utils.java incubator/cassandra/src/org/apache/cassandra/cql/compiler/ incubator/cassandra/src/org/apache/cassandra/cql/compiler/common/ incubator/cassandra/src/org/apache/cassandra/cql/compiler/common/CompilerErrorMsg.java incubator/cassandra/src/org/apache/cassandra/cql/compiler/common/CqlCompiler.java incubator/cassandra/src/org/apache/cassandra/cql/compiler/parse/ incubator/cassandra/src/org/apache/cassandra/cql/compiler/parse/Cql.g incubator/cassandra/src/org/apache/cassandra/cql/compiler/parse/Cql.tokens incubator/cassandra/src/org/apache/cassandra/cql/compiler/parse/CqlLexer.java incubator/cassandra/src/org/apache/cassandra/cql/compiler/parse/CqlParser.java 
incubator/cassandra/src/org/apache/cassandra/cql/compiler/parse/Cql__.g incubator/cassandra/src/org/apache/cassandra/cql/compiler/parse/ParseError.java incubator/cassandra/src/org/apache/cassandra/cql/compiler/parse/ParseException.java incubator/cassandra/src/org/apache/cassandra/cql/compiler/sem/ incubator/cassandra/src/org/apache/cassandra/cql/compiler/sem/SemanticException.java incubator/cassandra/src/org/apache/cassandra/cql/compiler/sem/SemanticPhase.java incubator/cassandra/src/org/apache/cassandra/cql/driver/ incubator/cassandra/src/org/apache/cassandra/cql/driver/CqlDriver.java incubator/cassandra/src/org/apache/cassandra/cql/execution/ incubator/cassandra/src/org/apache/cassandra/cql/execution/RuntimeErrorMsg.java incubator/cassandra/src/org/apache/cassandra/dht/ incubator/cassandra/src/org/apache/cassandra/dht/BootStrapper.java incubator/cassandra/src/org/apache/cassandra/dht/BootstrapInitiateMessage.java incubator/cassandra/src/org/apache/cassandra/dht/BootstrapMetadata.java incubator/cassandra/src/org/apache/cassandra/dht/BootstrapMetadataMessage.java incubator/cassandra/src/org/apache/cassandra/dht/BootstrapMetadataVerbHandler.java incubator/cassandra/src/org/apache/cassandra/dht/BootstrapSourceTarget.java incubator/cassandra/src/org/apache/cassandra/dht/LeaveJoinProtocolHelper.java incubator/cassandra/src/org/apache/cassandra/dht/LeaveJoinProtocolImpl.java incubator/cassandra/src/org/apache/cassandra/dht/Range.java incubator/cassandra/src/org/apache/cassandra/gms/ incubator/cassandra/src/org/apache/cassandra/gms/ApplicationState.java incubator/cassandra/src/org/apache/cassandra/gms/EndPointState.java incubator/cassandra/src/org/apache/cassandra/gms/FailureDetector.java incubator/cassandra/src/org/apache/cassandra/gms/FailureDetectorMBean.java incubator/cassandra/src/org/apache/cassandra/gms/GossipDigest.java incubator/cassandra/src/org/apache/cassandra/gms/GossipDigestAck2Message.java 
incubator/cassandra/src/org/apache/cassandra/gms/GossipDigestAckMessage.java incubator/cassandra/src/org/apache/cassandra/gms/GossipDigestSynMessage.java incubator/cassandra/src/org/apache/cassandra/gms/Gossiper.java incubator/cassandra/src/org/apache/cassandra/gms/HeartBeatState.java incubator/cassandra/src/org/apache/cassandra/gms/IEndPointStateChangePublisher.java incubator/cassandra/src/org/apache/cassandra/gms/IEndPointStateChangeSubscriber.java incubator/cassandra/src/org/apache/cassandra/gms/IFailureDetectionEventListener.java incubator/cassandra/src/org/apache/cassandra/gms/IFailureDetector.java incubator/cassandra/src/org/apache/cassandra/gms/IFailureNotification.java incubator/cassandra/src/org/apache/cassandra/gms/JoinMessage.java incubator/cassandra/src/org/apache/cassandra/gms/PureRandom.java incubator/cassandra/src/org/apache/cassandra/gms/VersionGenerator.java incubator/cassandra/src/org/apache/cassandra/io/ incubator/cassandra/src/org/apache/cassandra/io/AIORandomAccessFile.java incubator/cassandra/src/org/apache/cassandra/io/BufferedRandomAccessFile.java incubator/cassandra/src/org/apache/cassandra/io/ChecksumManager.java incubator/cassandra/src/org/apache/cassandra/io/ChecksumRandomAccessFile.java incubator/cassandra/src/org/apache/cassandra/io/Coordinate.java incubator/cassandra/src/org/apache/cassandra/io/DataInputBuffer.java incubator/cassandra/src/org/apache/cassandra/io/DataOutputBuffer.java incubator/cassandra/src/org/apache/cassandra/io/FastBufferedInputStream.java incubator/cassandra/src/org/apache/cassandra/io/FastBufferedOutputStream.java incubator/cassandra/src/org/apache/cassandra/io/ICompactSerializer.java incubator/cassandra/src/org/apache/cassandra/io/IFileReader.java incubator/cassandra/src/org/apache/cassandra/io/IFileWriter.java incubator/cassandra/src/org/apache/cassandra/io/IndexHelper.java incubator/cassandra/src/org/apache/cassandra/io/SSTable.java incubator/cassandra/src/org/apache/cassandra/io/SequenceFile.java Added: 
incubator/cassandra/src/org/apache/cassandra/analytics/AnalyticsContext.java URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/analytics/AnalyticsContext.java?rev=749205&view=auto ============================================================================== --- incubator/cassandra/src/org/apache/cassandra/analytics/AnalyticsContext.java (added) +++ incubator/cassandra/src/org/apache/cassandra/analytics/AnalyticsContext.java Mon Mar 2 06:12:46 2009 @@ -0,0 +1,788 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.analytics; + +import java.io.IOException; +import java.net.DatagramPacket; +import java.net.DatagramSocket; +import java.net.InetSocketAddress; +import java.net.SocketAddress; +import java.net.SocketException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Random; +import java.util.Set; +import java.util.Timer; +import java.util.TimerTask; +import java.util.TreeMap; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.service.IComponentShutdown; +import org.apache.cassandra.service.StorageService; +import org.apache.cassandra.utils.LogUtil; +import org.apache.log4j.Logger; + + + +/** + * Context for sending metrics to Ganglia. This class drives the entire metric collection process. 
+ * + * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com ) & Karthik Ranganathan ( kranganathan@facebook.com ) + */ +public class AnalyticsContext implements IComponentShutdown +{ + private static Logger logger_ = Logger.getLogger(AnalyticsContext.class); + + private static final String PERIOD_PROPERTY = "period"; + private static final String SERVERS_PROPERTY = "servers"; + private static final String UNITS_PROPERTY = "units"; + private static final String SLOPE_PROPERTY = "slope"; + private static final String TMAX_PROPERTY = "tmax"; + private static final String DMAX_PROPERTY = "dmax"; + + private static final String DEFAULT_UNITS = ""; + private static final String DEFAULT_SLOPE = "both"; + private static final int DEFAULT_TMAX = 60; + private static final int DEFAULT_DMAX = 0; + private static final int DEFAULT_PORT = 8649; + private static final int BUFFER_SIZE = 1500; // as per libgmond.c + + private static final Map typeTable_ = new HashMap(5); + + private Map bufferedData_ = new HashMap(); + /* Keeps the MetricRecord for each abstraction that implements IAnalyticsSource */ + private Map recordMap_ = new HashMap(); + private Map attributeMap_ = new HashMap(); + private Set updaters = new HashSet(1); + private List metricsServers_; + + private Map unitsTable_; + private Map slopeTable_; + private Map tmaxTable_; + private Map dmaxTable_; + + /* singleton instance */ + private static AnalyticsContext instance_; + /* Used to lock the factory for creation of StorageService instance */ + private static Lock createLock_ = new ReentrantLock(); + + /** + * Default period in seconds at which data is sent to the metrics system. + */ + private static final int DEFAULT_PERIOD = 5; + + /** + * Port to which we should write the data. 
+ */ + private int port_ = DEFAULT_PORT; + + private Timer timer = null; + private int period_ = DEFAULT_PERIOD; + private volatile boolean isMonitoring = false; + private byte[] buffer_ = new byte[BUFFER_SIZE]; + private int offset_; + + private DatagramSocket datagramSocket_; + + static class TagMap extends TreeMap + { + private static final long serialVersionUID = 3546309335061952993L; + TagMap() + { + super(); + } + TagMap(TagMap orig) + { + super(orig); + } + } + + static class MetricMap extends TreeMap + { + private static final long serialVersionUID = -7495051861141631609L; + } + + static class RecordMap extends HashMap + { + private static final long serialVersionUID = 259835619700264611L; + } + + static + { + typeTable_.put(String.class, "string"); + typeTable_.put(Byte.class, "int8"); + typeTable_.put(Short.class, "int16"); + typeTable_.put(Integer.class, "int32"); + typeTable_.put(Float.class, "float"); + } + + + /** + * Creates a new instance of AnalyticsReporter + */ + public AnalyticsContext() + { + StorageService.instance().registerComponentForShutdown(this); + } + + /** + * Initializes the context. + */ + public void init(String contextName, String serverSpecList) + { + String periodStr = getAttribute(PERIOD_PROPERTY); + + if (periodStr != null) + { + int period = 0; + try + { + period = Integer.parseInt(periodStr); + } + catch (NumberFormatException nfe) + { + } + + if (period <= 0) + { + throw new AnalyticsException("Invalid period: " + periodStr); + } + + setPeriod(period); + } + + metricsServers_ = parse(serverSpecList, port_); + unitsTable_ = getAttributeTable(UNITS_PROPERTY); + slopeTable_ = getAttributeTable(SLOPE_PROPERTY); + tmaxTable_ = getAttributeTable(TMAX_PROPERTY); + dmaxTable_ = getAttributeTable(DMAX_PROPERTY); + + try + { + datagramSocket_ = new DatagramSocket(); + } + catch (SocketException se) + { + se.printStackTrace(); + } + } + + /** + * Sends a record to the metrics system. 
+ */ + public void emitRecord(String recordName, OutputRecord outRec) throws IOException + { + // emit each metric in turn + for (String metricName : outRec.getMetricNames()) + { + Object metric = outRec.getMetric(metricName); + String type = (String) typeTable_.get(metric.getClass()); + emitMetric(metricName, type, metric.toString()); + } + } + + /** + * Helper which actually writes the metric in XDR format. + * + * @param name + * @param type + * @param value + * @throws IOException + */ + private void emitMetric(String name, String type, String value) throws IOException + { + String units = getUnits(name); + int slope = getSlope(name); + int tmax = getTmax(name); + int dmax = getDmax(name); + offset_ = 0; + + xdr_int(0); // metric_user_defined + xdr_string(type); + xdr_string(name); + xdr_string(value); + xdr_string(units); + xdr_int(slope); + xdr_int(tmax); + xdr_int(dmax); + + for (InetSocketAddress socketAddress : metricsServers_) + { + DatagramPacket packet = new DatagramPacket(buffer_, offset_, socketAddress); + datagramSocket_.send(packet); + } + } + + private String getUnits(String metricName) + { + String result = (String) unitsTable_.get(metricName); + if (result == null) + { + result = DEFAULT_UNITS; + } + + return result; + } + + private int getSlope(String metricName) + { + String slopeString = (String) slopeTable_.get(metricName); + if (slopeString == null) + { + slopeString = DEFAULT_SLOPE; + } + + return ("zero".equals(slopeString) ? 
0 : 3); // see gmetric.c + } + + private int getTmax(String metricName) + { + String tmaxString = (String) tmaxTable_.get(metricName); + if (tmaxString == null) + { + return DEFAULT_TMAX; + } + else + { + return Integer.parseInt(tmaxString); + } + } + + private int getDmax(String metricName) + { + String dmaxString = (String) dmaxTable_.get(metricName); + if (dmaxString == null) + { + return DEFAULT_DMAX; + } + else + { + return Integer.parseInt(dmaxString); + } + } + + /** + * Puts a string into the buffer by first writing the size of the string + * as an int, followed by the bytes of the string, padded if necessary to + * a multiple of 4. + */ + private void xdr_string(String s) + { + byte[] bytes = s.getBytes(); + int len = bytes.length; + xdr_int(len); + System.arraycopy(bytes, 0, buffer_, offset_, len); + offset_ += len; + pad(); + } + + /** + * Pads the buffer with zero bytes up to the nearest multiple of 4. + */ + private void pad() + { + int newOffset = ((offset_ + 3) / 4) * 4; + while (offset_ < newOffset) + { + buffer_[offset_++] = 0; + } + } + + /** + * Puts an integer into the buffer as 4 bytes, big-endian. + */ + private void xdr_int(int i) + { + buffer_[offset_++] = (byte) ((i >> 24) & 0xff); + buffer_[offset_++] = (byte) ((i >> 16) & 0xff); + buffer_[offset_++] = (byte) ((i >> 8) & 0xff); + buffer_[offset_++] = (byte) (i & 0xff); + } + + + + /** + * Returns the names of all the factory's attributes. + * + * @return the attribute names + */ + public String[] getAttributeNames() + { + String[] result = new String[attributeMap_.size()]; + int i = 0; + // for (String attributeName : attributeMap.keySet()) { + Iterator it = attributeMap_.keySet().iterator(); + while (it.hasNext()) + { + result[i++] = it.next(); + } + return result; + } + + /** + * Sets the named factory attribute to the specified value, creating it + * if it did not already exist. If the value is null, this is the same as + * calling removeAttribute. 
+ * + * @param attributeName the attribute name + * @param value the new attribute value + */ + public void setAttribute(String attributeName, Object value) + { + attributeMap_.put(attributeName, value); + } + + /** + * Removes the named attribute if it exists. + * + * @param attributeName the attribute name + */ + public void removeAttribute(String attributeName) + { + attributeMap_.remove(attributeName); + } + + /** + * Returns the value of the named attribute, or null if there is no + * attribute of that name. + * + * @param attributeName the attribute name + * @return the attribute value + */ + public String getAttribute(String attributeName) + { + return (String)attributeMap_.get(attributeName); + } + + + /** + * Returns an attribute-value map derived from the factory attributes + * by finding all factory attributes that begin with + * contextName.tableName. The returned map consists of + * those attributes with the contextName and tableName stripped off. + */ + protected Map getAttributeTable(String tableName) + { + String prefix = tableName + "."; + Map result = new HashMap(); + for (String attributeName : getAttributeNames()) + { + if (attributeName.startsWith(prefix)) + { + String name = attributeName.substring(prefix.length()); + String value = (String) getAttribute(attributeName); + result.put(name, value); + } + } + return result; + } + + /** + * Starts or restarts monitoring, the emitting of metrics records. + */ + public void startMonitoring() throws IOException { + if (!isMonitoring) + { + startTimer(); + isMonitoring = true; + } + } + + /** + * Stops monitoring. This does not free buffered data. + * @see #close() + */ + public void stopMonitoring() { + if (isMonitoring) + { + shutdown(); + isMonitoring = false; + } + } + + /** + * Returns true if monitoring is currently in progress. + */ + public boolean isMonitoring() { + return isMonitoring; + } + + /** + * Stops monitoring and frees buffered data, returning this + * object to its initial state. 
+ */ + public void close() + { + stopMonitoring(); + clearUpdaters(); + } + + /** + * Creates a new AbstractMetricsRecord instance with the given recordName. + * Throws an exception if the metrics implementation is configured with a fixed + * set of record names and recordName is not in that set. + * + * @param recordName the name of the record + * @throws AnalyticsException if recordName conflicts with configuration data + */ + public final void createRecord(String recordName) + { + if (bufferedData_.get(recordName) == null) + { + bufferedData_.put(recordName, new RecordMap()); + } + recordMap_.put(recordName, new MetricsRecord(recordName, this)); + } + + /** + * Return the MetricsRecord associated with this record name. + * @param recordName the name of the record + * @return newly created instance of MetricsRecordImpl or subclass + */ + public MetricsRecord getMetricsRecord(String recordName) + { + return recordMap_.get(recordName); + } + + /** + * Registers a callback to be called at time intervals determined by + * the configuration. + * + * @param updater object to be run periodically; it should update + * some metrics records + */ + public void registerUpdater(final IAnalyticsSource updater) + { + if (!updaters.contains(updater)) { + updaters.add(updater); + } + } + + /** + * Removes a callback, if it exists. 
+ * + * @param updater object to be removed from the callback list + */ + public void unregisterUpdater(IAnalyticsSource updater) + { + updaters.remove(updater); + } + + private void clearUpdaters() + { + updaters.clear(); + } + + /** + * Starts timer if it is not already started + */ + private void startTimer() + { + if (timer == null) + { + timer = new Timer("Timer thread for monitoring AnalyticsContext", true); + TimerTask task = new TimerTask() + { + public void run() + { + try + { + timerEvent(); + } + catch (IOException ioe) + { + ioe.printStackTrace(); + } + } + }; + long millis = period_ * 1000; + timer.scheduleAtFixedRate(task, millis, millis); + } + } + + /** + * Stops timer if it is running + */ + public void shutdown() + { + if (timer != null) + { + timer.cancel(); + timer = null; + } + } + + /** + * Timer callback. + */ + private void timerEvent() throws IOException + { + if (isMonitoring) + { + Collection myUpdaters; + + // we dont need to synchronize as there will not be any + // addition or removal of listeners + myUpdaters = new ArrayList(updaters); + + // Run all the registered updates without holding a lock + // on this context + for (IAnalyticsSource updater : myUpdaters) + { + try + { + updater.doUpdates(this); + } + catch (Throwable throwable) + { + throwable.printStackTrace(); + } + } + emitRecords(); + } + } + + /** + * Emits the records. + */ + private void emitRecords() throws IOException + { + for (String recordName : bufferedData_.keySet()) + { + RecordMap recordMap = bufferedData_.get(recordName); + synchronized (recordMap) + { + for (TagMap tagMap : recordMap.keySet()) + { + MetricMap metricMap = recordMap.get(tagMap); + OutputRecord outRec = new OutputRecord(tagMap, metricMap); + emitRecord(recordName, outRec); + } + } + } + flush(); + } + + /** + * Called each period after all records have been emitted, this method does nothing. + * Subclasses may override it in order to perform some kind of flush. 
+ */ + protected void flush() throws IOException + { + } + + /** + * Called by MetricsRecordImpl.update(). Creates or updates a row in + * the internal table of metric data. + */ + protected void update(MetricsRecord record) + { + String recordName = record.getRecordName(); + TagMap tagTable = record.getTagTable(); + Map metricUpdates = record.getMetricTable(); + + RecordMap recordMap = getRecordMap(recordName); + synchronized (recordMap) + { + MetricMap metricMap = recordMap.get(tagTable); + if (metricMap == null) + { + metricMap = new MetricMap(); + TagMap tagMap = new TagMap(tagTable); // clone tags + recordMap.put(tagMap, metricMap); + } + for (String metricName : metricUpdates.keySet()) + { + MetricValue updateValue = metricUpdates.get(metricName); + Number updateNumber = updateValue.getNumber(); + Number currentNumber = metricMap.get(metricName); + if (currentNumber == null || updateValue.isAbsolute()) + { + metricMap.put(metricName, updateNumber); + } + else + { + Number newNumber = sum(updateNumber, currentNumber); + metricMap.put(metricName, newNumber); + } + } + } + } + + private RecordMap getRecordMap(String recordName) + { + return bufferedData_.get(recordName); + } + + /** + * Adds two numbers, coercing the second to the type of the first. + * + */ + private Number sum(Number a, Number b) + { + if (a instanceof Integer) + { + return new Integer(a.intValue() + b.intValue()); + } + else if (a instanceof Float) + { + return new Float(a.floatValue() + b.floatValue()); + } + else if (a instanceof Short) + { + return new Short((short)(a.shortValue() + b.shortValue())); + } + else if (a instanceof Byte) + { + return new Byte((byte)(a.byteValue() + b.byteValue())); + } + else + { + // should never happen + throw new AnalyticsException("Invalid number type"); + } + } + + /** + * Called by MetricsRecordImpl.remove(). Removes any matching row in + * the internal table of metric data. A row matches if it has the same + * tag names and tag values. 
+ */ + protected void remove(MetricsRecord record) + { + String recordName = record.getRecordName(); + TagMap tagTable = record.getTagTable(); + + RecordMap recordMap = getRecordMap(recordName); + + recordMap.remove(tagTable); + } + + /** + * Returns the timer period. + */ + public int getPeriod() + { + return period_; + } + + /** + * Sets the timer period + */ + protected void setPeriod(int period) + { + this.period_ = period; + } + + /** + * Sets the default port to listen on + */ + public void setPort(int port) + { + port_ = port; + } + + /** + * Parses a space and/or comma separated sequence of server specifications + * of the form hostname or hostname:port. If + * the specs string is null, defaults to localhost:defaultPort. + * + * @return a list of InetSocketAddress objects. + */ + private static List parse(String specs, int defaultPort) + { + List result = new ArrayList(1); + if (specs == null) { + result.add(new InetSocketAddress("localhost", defaultPort)); + } + else { + String[] specStrings = specs.split("[ ,]+"); + for (String specString : specStrings) { + int colon = specString.indexOf(':'); + if (colon < 0 || colon == specString.length() - 1) + { + result.add(new InetSocketAddress(specString, defaultPort)); + } else + { + String hostname = specString.substring(0, colon); + int port = Integer.parseInt(specString.substring(colon+1)); + result.add(new InetSocketAddress(hostname, port)); + } + } + } + return result; + } + + /** + * Starts up the analytics context and registers the VM metrics. 
+ */
    public void start()
    {
        // register the vm analytics object with the analytics context to update the data
        registerUpdater(new VMAnalyticsSource());

        init("analyticsContext", DatabaseDescriptor.getGangliaServers());

        try
        {
            startMonitoring();
        }
        catch (IOException e)
        {
            logger_.error(LogUtil.throwableToString(e));
        }
    }

    /**
     * Stops the analytics context; equivalent to close().
     */
    public void stop()
    {
        close();
    }

    /**
     * Factory method that returns the singleton instance of the
     * AnalyticsContext class, creating it on first use.
     */
    public static AnalyticsContext instance()
    {
        // instance_ is not volatile, so the unlocked first check of the previous
        // double-checked form could observe a partially constructed object.
        // Always going through the lock makes the publication safe.
        createLock_.lock();
        try
        {
            if (instance_ == null)
            {
                instance_ = new AnalyticsContext();
            }
            return instance_;
        }
        finally
        {
            createLock_.unlock();
        }
    }
}
 Added: incubator/cassandra/src/org/apache/cassandra/analytics/AnalyticsException.java URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/analytics/AnalyticsException.java?rev=749205&view=auto ============================================================================== --- incubator/cassandra/src/org/apache/cassandra/analytics/AnalyticsException.java (added) +++ incubator/cassandra/src/org/apache/cassandra/analytics/AnalyticsException.java Mon Mar 2 06:12:46 2009 @@ -0,0 +1,47 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.analytics; +
/**
 * General-purpose, unchecked exception raised by the analytics package.
 *
 * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com ) & Karthik Ranganathan ( kranganathan@facebook.com )
 */
public class AnalyticsException extends RuntimeException
{

    private static final long serialVersionUID = -1643257498540498497L;

    /**
     * Creates a new AnalyticsException without a detail message.
     */
    public AnalyticsException()
    {
    }

    /**
     * Creates a new AnalyticsException with the given detail message.
     *
     * @param message an error message
     */
    public AnalyticsException(String message)
    {
        super(message);
    }

    /**
     * Creates a new AnalyticsException that keeps the underlying failure, so
     * that callers translating a lower-level exception do not lose its stack
     * trace.
     *
     * @param message an error message
     * @param cause the exception that triggered this one; may be null
     */
    public AnalyticsException(String message, Throwable cause)
    {
        super(message, cause);
    }

    /**
     * Creates a new AnalyticsException wrapping the given cause.
     *
     * @param cause the exception that triggered this one; may be null
     */
    public AnalyticsException(Throwable cause)
    {
        super(cause);
    }

}
 Added: incubator/cassandra/src/org/apache/cassandra/analytics/DBAnalyticsSource.java URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/analytics/DBAnalyticsSource.java?rev=749205&view=auto ============================================================================== --- incubator/cassandra/src/org/apache/cassandra/analytics/DBAnalyticsSource.java (added) +++ incubator/cassandra/src/org/apache/cassandra/analytics/DBAnalyticsSource.java Mon Mar 2 06:12:46 2009 @@ -0,0 +1,182 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.analytics; + +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; + +/** + * This class sets up the analytics package to report metrics into + * Ganglia for the various DB operations such as: reads per second, + * average read latency, writes per second, average write latency. + * + * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com ) & Karthik Ranganathan ( kranganathan@facebook.com ) + */ +public class DBAnalyticsSource implements IAnalyticsSource +{ + private static final String METRIC_READ_OPS = "Read Operations"; + private static final String RECORD_READ_OPS = "ReadOperationsRecord"; + private static final String TAG_READOPS = "ReadOperationsTag"; + private static final String TAG_READ_OPS = "ReadOperationsTagValue"; + + private static final String METRIC_READ_AVG = "Average Read Latency"; + private static final String RECORD_READ_AVG = "ReadLatencyRecord"; + private static final String TAG_READAVG = "AverageReadLatencyTag"; + private static final String TAG_READ_AVG = "ReadLatencyTagValue"; + + private static final String METRIC_WRITE_OPS = "Write Operations"; + private static final String RECORD_WRITE_OPS = "WriteOperationsRecord"; + private static final String TAG_WRITEOPS = "WriteOperationsTag"; + private static final String TAG_WRITE_OPS = "WriteOperationsTagValue"; + + private static final String METRIC_WRITE_AVG = "Average Write Latency"; + private static final String RECORD_WRITE_AVG = "WriteLatencyRecord"; + private static final String TAG_WRITEAVG = "AverageWriteLatencyTag"; + private static final String TAG_WRITE_AVG = "WriteLatencyTagValue"; + + /* keep track of the number of read operations */ + private AtomicInteger readOperations_ = new AtomicInteger(0); + + /* keep track of the number of read latencies */ + private 
AtomicLong readLatencies_ = new AtomicLong(0); + + /* keep track of the number of write operations */ + private AtomicInteger writeOperations_ = new AtomicInteger(0); + + /* keep track of the number of write latencies */ + private AtomicLong writeLatencies_ = new AtomicLong(0); + + /** + * Create all the required records we intend to display, and + * register with the AnalyticsContext. + */ + public DBAnalyticsSource() + { + /* register with the AnalyticsContext */ + AnalyticsContext.instance().registerUpdater(this); + /* set the units for the metric type */ + AnalyticsContext.instance().setAttribute("units." + METRIC_READ_OPS, "r/s"); + /* create the record */ + AnalyticsContext.instance().createRecord(RECORD_READ_OPS); + + /* set the units for the metric type */ + AnalyticsContext.instance().setAttribute("units." + METRIC_READ_AVG, "ms"); + /* create the record */ + AnalyticsContext.instance().createRecord(RECORD_READ_AVG); + + /* set the units for the metric type */ + AnalyticsContext.instance().setAttribute("units." + METRIC_WRITE_OPS, "w/s"); + /* create the record */ + AnalyticsContext.instance().createRecord(RECORD_WRITE_OPS); + + /* set the units for the metric type */ + AnalyticsContext.instance().setAttribute("units." 
+ METRIC_WRITE_AVG, "ms"); + /* create the record */ + AnalyticsContext.instance().createRecord(RECORD_WRITE_AVG); + } + + /** + * Update each of the records with the relevant data + * + * @param context the reference to the context which has called this callback + */ + public void doUpdates(AnalyticsContext context) + { + // update the read operations record + MetricsRecord readUsageRecord = context.getMetricsRecord(RECORD_READ_OPS); + int period = context.getPeriod(); + + if(readUsageRecord != null) + { + if ( readOperations_.get() > 0 ) + { + readUsageRecord.setTag(TAG_READOPS, TAG_READ_OPS); + readUsageRecord.setMetric(METRIC_READ_OPS, readOperations_.get() / period); + readUsageRecord.update(); + } + } + + // update the read latency record + MetricsRecord readLatencyRecord = context.getMetricsRecord(RECORD_READ_AVG); + if(readLatencyRecord != null) + { + if ( readOperations_.get() > 0 ) + { + readLatencyRecord.setTag(TAG_READAVG, TAG_READ_AVG); + readLatencyRecord.setMetric(METRIC_READ_AVG, readLatencies_.get() / readOperations_.get() ); + readLatencyRecord.update(); + } + } + + // update the write operations record + MetricsRecord writeUsageRecord = context.getMetricsRecord(RECORD_WRITE_OPS); + if(writeUsageRecord != null) + { + if ( writeOperations_.get() > 0 ) + { + writeUsageRecord.setTag(TAG_WRITEOPS, TAG_WRITE_OPS); + writeUsageRecord.setMetric(METRIC_WRITE_OPS, writeOperations_.get() / period); + writeUsageRecord.update(); + } + } + + // update the write latency record + MetricsRecord writeLatencyRecord = context.getMetricsRecord(RECORD_WRITE_AVG); + if(writeLatencyRecord != null) + { + if ( writeOperations_.get() > 0 ) + { + writeLatencyRecord.setTag(TAG_WRITEAVG, TAG_WRITE_AVG); + writeLatencyRecord.setMetric(METRIC_WRITE_AVG, writeLatencies_.get() / writeOperations_.get() ); + writeLatencyRecord.update(); + } + } + + clear(); + } + + /** + * Reset all the metric records + */ + private void clear() + { + readOperations_.set(0); + 
readLatencies_.set(0); + writeOperations_.set(0); + writeLatencies_.set(0); + } + + /** + * Update the read statistics. + */ + public void updateReadStatistics(long latency) + { + readOperations_.incrementAndGet(); + readLatencies_.addAndGet(latency); + } + + /** + * Update the write statistics. + */ + public void updateWriteStatistics(long latency) + { + writeOperations_.incrementAndGet(); + writeLatencies_.addAndGet(latency); + } +} Added: incubator/cassandra/src/org/apache/cassandra/analytics/IAnalyticsSource.java URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/analytics/IAnalyticsSource.java?rev=749205&view=auto ============================================================================== --- incubator/cassandra/src/org/apache/cassandra/analytics/IAnalyticsSource.java (added) +++ incubator/cassandra/src/org/apache/cassandra/analytics/IAnalyticsSource.java Mon Mar 2 06:12:46 2009 @@ -0,0 +1,35 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.analytics; + +/** + * Call-back interface. See AnalyticsContext.registerUpdater(). + * This callback is called at a regular (pre-registered time interval) in + * order to update the metric values. 
+ * + * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com ) & Karthik Ranganathan ( kranganathan@facebook.com ) + */ +public interface IAnalyticsSource +{ + /** + * Timer-based call-back from the metric library. + */ + public abstract void doUpdates(AnalyticsContext context); + +} Added: incubator/cassandra/src/org/apache/cassandra/analytics/MetricValue.java URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/analytics/MetricValue.java?rev=749205&view=auto ============================================================================== --- incubator/cassandra/src/org/apache/cassandra/analytics/MetricValue.java (added) +++ incubator/cassandra/src/org/apache/cassandra/analytics/MetricValue.java Mon Mar 2 06:12:46 2009 @@ -0,0 +1,77 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.analytics; + + +/** + * A Number that is either an absolute or an incremental amount. 
/**
 * A Number that is either an absolute or an incremental amount.
 * Instances are immutable: both the number and the kind are fixed at
 * construction time.
 *
 * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com ) & Karthik Ranganathan ( kranganathan@facebook.com )
 */
public class MetricValue
{
    public static final boolean ABSOLUTE = false;
    public static final boolean INCREMENT = true;

    private final boolean isIncrement;
    private final Number number;

    /**
     * Creates a new instance of MetricValue.
     *
     * @param number the value carried by this metric
     * @param isIncrement INCREMENT (true) if the value is an incremental
     *                    amount, ABSOLUTE (false) if it is an absolute one
     */
    public MetricValue(Number number, boolean isIncrement)
    {
        this.number = number;
        this.isIncrement = isIncrement;
    }

    /**
     * Checks if this metric is an incremental amount.
     *
     * @return true if the value of this metric is an increment, false otherwise
     */
    public boolean isIncrement()
    {
        return isIncrement;
    }

    /**
     * Checks if the value of this metric is an absolute value. This is the
     * inverse of isIncrement.
     *
     * @return true if the value of this metric is absolute, false if it is an increment
     */
    public boolean isAbsolute()
    {
        return !isIncrement;
    }

    /**
     * Returns the number value of the metric.
     *
     * @return the Number value of this metric, exactly as passed to the constructor
     */
    public Number getNumber()
    {
        return number;
    }
}
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.analytics; + +import java.util.LinkedHashMap; +import java.util.Map; + + +/** + * This class keeps a back-pointer to the AnalyticsContext + * and delegates back to it on update and remove(). + * + * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com ) & Karthik Ranganathan ( kranganathan@facebook.com ) + */ +public class MetricsRecord { + + private AnalyticsContext.TagMap tagTable = new AnalyticsContext.TagMap(); + private Map metricTable = new LinkedHashMap(); + + private String recordName; + private AnalyticsContext context; + + + /** + * Creates a new instance of MetricsRecord + * + * @param recordName name of this record + * @param context the context which this record is a part of + */ + protected MetricsRecord(String recordName, AnalyticsContext context) + { + this.recordName = recordName; + this.context = context; + } + + /** + * Returns the record name. + * + * @return the record name + */ + public String getRecordName() { + return recordName; + } + + /** + * Sets the named tag to the specified value. 
+ * + * @param tagName name of the tag + * @param tagValue new value of the tag + * @throws MetricsException if the tagName conflicts with the configuration + */ + public void setTag(String tagName, String tagValue) { + if (tagValue == null) { + tagValue = ""; + } + tagTable.put(tagName, tagValue); + } + + /** + * Sets the named tag to the specified value. + * + * @param tagName name of the tag + * @param tagValue new value of the tag + * @throws MetricsException if the tagName conflicts with the configuration + */ + public void setTag(String tagName, int tagValue) { + tagTable.put(tagName, new Integer(tagValue)); + } + + /** + * Sets the named tag to the specified value. + * + * @param tagName name of the tag + * @param tagValue new value of the tag + * @throws MetricsException if the tagName conflicts with the configuration + */ + public void setTag(String tagName, short tagValue) { + tagTable.put(tagName, new Short(tagValue)); + } + + /** + * Sets the named tag to the specified value. + * + * @param tagName name of the tag + * @param tagValue new value of the tag + * @throws MetricsException if the tagName conflicts with the configuration + */ + public void setTag(String tagName, byte tagValue) + { + tagTable.put(tagName, new Byte(tagValue)); + } + + /** + * Sets the named metric to the specified value. + * + * @param metricName name of the metric + * @param metricValue new value of the metric + * @throws MetricsException if the metricName or the type of the metricValue + * conflicts with the configuration + */ + public void setMetric(String metricName, int metricValue) + { + setAbsolute(metricName, new Integer(metricValue)); + } + + /** + * Sets the named metric to the specified value. 
+ * + * @param metricName name of the metric + * @param metricValue new value of the metric + * @throws MetricsException if the metricName or the type of the metricValue + * conflicts with the configuration + */ + public void setMetric(String metricName, short metricValue) + { + setAbsolute(metricName, new Short(metricValue)); + } + + /** + * Sets the named metric to the specified value. + * + * @param metricName name of the metric + * @param metricValue new value of the metric + * @throws MetricsException if the metricName or the type of the metricValue + * conflicts with the configuration + */ + public void setMetric(String metricName, byte metricValue) + { + setAbsolute(metricName, new Byte(metricValue)); + } + + /** + * Sets the named metric to the specified value. + * + * @param metricName name of the metric + * @param metricValue new value of the metric + * @throws MetricsException if the metricName or the type of the metricValue + * conflicts with the configuration + */ + public void setMetric(String metricName, float metricValue) + { + setAbsolute(metricName, new Float(metricValue)); + } + + /** + * Increments the named metric by the specified value. + * + * @param metricName name of the metric + * @param metricValue incremental value + * @throws MetricsException if the metricName or the type of the metricValue + * conflicts with the configuration + */ + public void incrMetric(String metricName, int metricValue) + { + setIncrement(metricName, new Integer(metricValue)); + } + + /** + * Increments the named metric by the specified value. + * + * @param metricName name of the metric + * @param metricValue incremental value + * @throws MetricsException if the metricName or the type of the metricValue + * conflicts with the configuration + */ + public void incrMetric(String metricName, short metricValue) + { + setIncrement(metricName, new Short(metricValue)); + } + + /** + * Increments the named metric by the specified value. 
+ * + * @param metricName name of the metric + * @param metricValue incremental value + * @throws MetricsException if the metricName or the type of the metricValue + * conflicts with the configuration + */ + public void incrMetric(String metricName, byte metricValue) + { + setIncrement(metricName, new Byte(metricValue)); + } + + /** + * Increments the named metric by the specified value. + * + * @param metricName name of the metric + * @param metricValue incremental value + * @throws MetricsException if the metricName or the type of the metricValue + * conflicts with the configuration + */ + public void incrMetric(String metricName, float metricValue) + { + setIncrement(metricName, new Float(metricValue)); + } + + /** + * Sets the value of the metric identified by metricName with the + * number metricValue. + * + * @param metricName name of the metric + * @param metricValue number value to which it should be updated + */ + private void setAbsolute(String metricName, Number metricValue) + { + metricTable.put(metricName, new MetricValue(metricValue, MetricValue.ABSOLUTE)); + } + + /** + * Increments the value of the metric identified by metricName with the + * number metricValue. + * + * @param metricName name of the metric + * @param metricValue number value by which it should be incremented + */ + private void setIncrement(String metricName, Number metricValue) + { + metricTable.put(metricName, new MetricValue(metricValue, MetricValue.INCREMENT)); + } + + /** + * Updates the table of buffered data which is to be sent periodically. + * If the tag values match an existing row, that row is updated; + * otherwise, a new row is added. + */ + public void update() + { + context.update(this); + } + + /** + * Removes the row, if it exists, in the buffered data table having tags + * that equal the tags that have been set on this record. 
+ */ + public void remove() + { + context.remove(this); + } + + AnalyticsContext.TagMap getTagTable() + { + return tagTable; + } + + Map getMetricTable() + { + return metricTable; + } +} Added: incubator/cassandra/src/org/apache/cassandra/analytics/OutputRecord.java URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/analytics/OutputRecord.java?rev=749205&view=auto ============================================================================== --- incubator/cassandra/src/org/apache/cassandra/analytics/OutputRecord.java (added) +++ incubator/cassandra/src/org/apache/cassandra/analytics/OutputRecord.java Mon Mar 2 06:12:46 2009 @@ -0,0 +1,82 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.analytics; + +import java.util.Collections; +import java.util.Set; + +/** + * Represents a record of metric data to be sent to a metrics system. 
+ * + * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com ) & Karthik Ranganathan ( kranganathan@facebook.com ) + */ +public class OutputRecord +{ + private AnalyticsContext.TagMap tagMap; + private AnalyticsContext.MetricMap metricMap; + + /** + * Creates a new instance of OutputRecord + */ + OutputRecord(AnalyticsContext.TagMap tagMap, AnalyticsContext.MetricMap metricMap) + { + this.tagMap = tagMap; + this.metricMap = metricMap; + } + + /** + * Returns the set of tag names. + */ + public Set getTagNames() + { + return Collections.unmodifiableSet(tagMap.keySet()); + } + + /** + * Returns a tag object which is can be a String, Integer, Short or Byte. + * + * @return the tag value, or null if there is no such tag + */ + public Object getTag(String name) + { + return tagMap.get(name); + } + + /** + * Returns the set of metric names. + * + * @return the set of metric names + */ + public Set getMetricNames() + { + return Collections.unmodifiableSet(metricMap.keySet()); + } + + /** + * Returns the metric object which can be a Float, Integer, Short or Byte. + * + * @param name name of the metric for which the value is being requested + * @return return the tag value, or null if there is no such tag + */ + public Number getMetric(String name) + { + return (Number) metricMap.get(name); + } + +} \ No newline at end of file Added: incubator/cassandra/src/org/apache/cassandra/analytics/VMAnalyticsSource.java URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/analytics/VMAnalyticsSource.java?rev=749205&view=auto ============================================================================== --- incubator/cassandra/src/org/apache/cassandra/analytics/VMAnalyticsSource.java (added) +++ incubator/cassandra/src/org/apache/cassandra/analytics/VMAnalyticsSource.java Mon Mar 2 06:12:46 2009 @@ -0,0 +1,86 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.analytics; + +import java.lang.management.ManagementFactory; +import java.lang.management.MemoryMXBean; +import java.lang.management.MemoryUsage; +import java.util.HashMap; +import java.util.Map; + +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.service.StorageService; +import org.apache.cassandra.utils.LogUtil; +import org.apache.log4j.Logger; + + +/** + * This class sets up the analytics package to report metrics into + * Ganglia for VM heap utilization. + * + * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com ) & Karthik Ranganathan ( kranganathan@facebook.com ) + */ + +public class VMAnalyticsSource implements IAnalyticsSource +{ + private static final String METRIC_MEMUSAGE = "VM Heap Utilization"; + private static final String RECORD_MEMUSAGE = "MemoryUsageRecord"; + private static final String TAG_MEMUSAGE = "MemoryUsageTag"; + private static final String TAG_MEMUSAGE_MEMUSED = "MemoryUsedTagValue"; + + /** + * Setup the Ganglia record to display the VM heap utilization. + */ + public VMAnalyticsSource() + { + // set the units for the metric type + AnalyticsContext.instance().setAttribute("units." 
+ METRIC_MEMUSAGE, "MB"); + // create the record + AnalyticsContext.instance().createRecord(RECORD_MEMUSAGE); + } + + /** + * Update the VM heap utilization record with the relevant data. + * + * @param context the reference to the context which has called this callback + */ + public void doUpdates(AnalyticsContext context) + { + // update the memory used record + MetricsRecord memUsageRecord = context.getMetricsRecord(RECORD_MEMUSAGE); + if(memUsageRecord != null) + { + updateUsedMemory(memUsageRecord); + } + } + + private void updateUsedMemory(MetricsRecord memUsageRecord) + { + memUsageRecord.setTag(TAG_MEMUSAGE, TAG_MEMUSAGE_MEMUSED); + memUsageRecord.setMetric(METRIC_MEMUSAGE, getMemoryUsed()); + memUsageRecord.update(); + } + + private float getMemoryUsed() + { + MemoryMXBean memoryMxBean = ManagementFactory.getMemoryMXBean(); + MemoryUsage memUsage = memoryMxBean.getHeapMemoryUsage(); + return (float)memUsage.getUsed()/(1024 * 1024); + } +} Added: incubator/cassandra/src/org/apache/cassandra/cli/Cli.g URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/cli/Cli.g?rev=749205&view=auto ============================================================================== --- incubator/cassandra/src/org/apache/cassandra/cli/Cli.g (added) +++ incubator/cassandra/src/org/apache/cassandra/cli/Cli.g Mon Mar 2 06:12:46 2009 @@ -0,0 +1,234 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +// +// ANTLR Grammar for the Cassandra Command Line Interface (CLI). +// +// Note: This grammar handles all but the CQL statements. CQL +// statements are detected separately (based on the first token) +// and directly sent to server-side for processing. +// + +grammar Cli; + +options { + output=AST; + ASTLabelType=CommonTree; + backtrack=true; +} + +// +// Nodes in the AST +// +tokens { + // + // Top-level nodes. These typically correspond to + // various top-level CLI statements. + // + NODE_CONNECT; + NODE_DESCRIBE_TABLE; + NODE_EXIT; + NODE_HELP; + NODE_NO_OP; + NODE_SHOW_CLUSTER_NAME; + NODE_SHOW_CONFIG_FILE; + NODE_SHOW_VERSION; + NODE_SHOW_TABLES; + NODE_THRIFT_GET; + NODE_THRIFT_SET; + + // Internal Nodes. + NODE_COLUMN_ACCESS; + NODE_ID_LIST; +} + +@parser::header { +package com.facebook.infrastructure.cli; +} + +@lexer::header { +package com.facebook.infrastructure.cli; +} + +// +// Parser Section +// + +// the root node +root: stmt SEMICOLON? EOF -> stmt; + +stmt + : connectStmt + | exitStmt + | describeTable + | getStmt + | helpStmt + | setStmt + | showStmt + | -> ^(NODE_NO_OP) + ; + +connectStmt + : K_CONNECT host SLASH port -> ^(NODE_CONNECT host port) + ; + +helpStmt + : K_HELP -> ^(NODE_HELP) + | '?' 
-> ^(NODE_HELP) + ; + +exitStmt + : K_QUIT -> ^(NODE_EXIT) + | K_EXIT -> ^(NODE_EXIT) + ; + +getStmt + : K_THRIFT K_GET columnFamilyExpr -> ^(NODE_THRIFT_GET columnFamilyExpr) + ; + +setStmt + : K_THRIFT K_SET columnFamilyExpr '=' value -> ^(NODE_THRIFT_SET columnFamilyExpr value) + ; + +showStmt + : showClusterName + | showVersion + | showConfigFile + | showTables + ; + +showClusterName + : K_SHOW K_CLUSTER K_NAME -> ^(NODE_SHOW_CLUSTER_NAME) + ; + +showConfigFile + : K_SHOW K_CONFIG K_FILE -> ^(NODE_SHOW_CONFIG_FILE) + ; + +showVersion + : K_SHOW K_VERSION -> ^(NODE_SHOW_VERSION) + ; + +showTables + : K_SHOW K_TABLES -> ^(NODE_SHOW_TABLES) + ; + +describeTable + : K_DESCRIBE K_TABLE table -> ^(NODE_DESCRIBE_TABLE table); + +columnFamilyExpr + : table DOT columnFamily '[' rowKey ']' + ( '[' a+=columnOrSuperColumn ']' + ('[' a+=columnOrSuperColumn ']')? + )? + -> ^(NODE_COLUMN_ACCESS table columnFamily rowKey ($a+)?) + ; + +table: Identifier; + +columnFamily: Identifier; + +rowKey: StringLiteral; + +value: StringLiteral; + +columnOrSuperColumn: StringLiteral; + +host: id+=Identifier (id+=DOT id+=Identifier)* -> ^(NODE_ID_LIST $id+); + +port: IntegerLiteral; + +// +// Lexer Section +// + +// +// Keywords (in alphabetical order for convenience) +// +// CLI is case-insensitive with respect to these keywords. +// However, they MUST be listed in upper case here. 
+// +K_CONFIG: 'CONFIG'; +K_CONNECT: 'CONNECT'; +K_CLUSTER: 'CLUSTER'; +K_DESCRIBE: 'DESCRIBE'; +K_GET: 'GET'; +K_HELP: 'HELP'; +K_EXIT: 'EXIT'; +K_FILE: 'FILE'; +K_NAME: 'NAME'; +K_QUIT: 'QUIT'; +K_SET: 'SET'; +K_SHOW: 'SHOW'; +K_TABLE: 'TABLE'; +K_TABLES: 'TABLES'; +K_THRIFT: 'THRIFT'; +K_VERSION: 'VERSION'; + +// private syntactic rules +fragment +Letter + : 'a'..'z' + | 'A'..'Z' + ; + +fragment +Digit + : '0'..'9' + ; + +// syntactic Elements +Identifier + : Letter ( Letter | Digit | '_')* + ; + + +// literals +StringLiteral + : + '\'' (~'\'')* '\'' ( '\'' (~'\'')* '\'' )* + ; + +IntegerLiteral + : Digit+; + + +// +// syntactic elements +// + +DOT + : '.' + ; + +SLASH + : '/' + ; + +SEMICOLON + : ';' + ; + +WS + : (' '|'\r'|'\t'|'\n') {$channel=HIDDEN;} // whitepace + ; + +COMMENT + : '--' (~('\n'|'\r'))* { $channel=HIDDEN; } + | '/*' (options {greedy=false;} : .)* '*/' { $channel=HIDDEN; } + ; Added: incubator/cassandra/src/org/apache/cassandra/cli/Cli.tokens URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/cli/Cli.tokens?rev=749205&view=auto ============================================================================== --- incubator/cassandra/src/org/apache/cassandra/cli/Cli.tokens (added) +++ incubator/cassandra/src/org/apache/cassandra/cli/Cli.tokens Mon Mar 2 06:12:46 2009 @@ -0,0 +1,43 @@ +NODE_SHOW_CONFIG_FILE=10 +K_TABLES=32 +K_VERSION=31 +K_EXIT=22 +NODE_EXIT=6 +K_FILE=30 +K_GET=24 +K_CONNECT=18 +K_CONFIG=29 +SEMICOLON=17 +Digit=40 +Identifier=36 +NODE_THRIFT_GET=13 +K_SET=25 +StringLiteral=37 +NODE_HELP=7 +NODE_NO_OP=8 +NODE_THRIFT_SET=14 +K_DESCRIBE=33 +NODE_SHOW_VERSION=11 +NODE_ID_LIST=16 +WS=41 +NODE_CONNECT=4 +SLASH=19 +K_THRIFT=23 +NODE_SHOW_TABLES=12 +K_CLUSTER=27 +K_HELP=20 +K_SHOW=26 +NODE_DESCRIBE_TABLE=5 +K_TABLE=34 +IntegerLiteral=38 +NODE_SHOW_CLUSTER_NAME=9 +COMMENT=42 +DOT=35 +K_NAME=28 +Letter=39 +NODE_COLUMN_ACCESS=15 +K_QUIT=21 +'?'=43 +'='=44 +'['=45 +']'=46 Added: 
incubator/cassandra/src/org/apache/cassandra/cli/CliClient.java URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/cli/CliClient.java?rev=749205&view=auto ============================================================================== --- incubator/cassandra/src/org/apache/cassandra/cli/CliClient.java (added) +++ incubator/cassandra/src/org/apache/cassandra/cli/CliClient.java Mon Mar 2 06:12:46 2009 @@ -0,0 +1,323 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.cassandra.cli; + +import com.facebook.thrift.*; + +import org.antlr.runtime.tree.*; +import org.apache.cassandra.cql.common.Utils; +import org.apache.cassandra.service.Cassandra; +import org.apache.cassandra.service.CassandraException; +import org.apache.cassandra.service.CqlResult_t; +import org.apache.cassandra.service.column_t; +import org.apache.cassandra.service.Cassandra.Client; +import org.apache.cassandra.utils.LogUtil; + +import java.util.*; + +// Cli Client Side Library +public class CliClient +{ + private Cassandra.Client thriftClient_ = null; + private CliSessionState css_ = null; + + public CliClient(CliSessionState css, Cassandra.Client thriftClient) + { + css_ = css; + thriftClient_ = thriftClient; + } + + // Execute a CLI Statement + public void executeCLIStmt(String stmt) throws TException + { + CommonTree ast = null; + + ast = CliCompiler.compileQuery(stmt); + + switch (ast.getType()) { + case CliParser.NODE_EXIT: + cleanupAndExit(); + break; + case CliParser.NODE_THRIFT_GET: + executeGet(ast); + break; + case CliParser.NODE_HELP: + printCmdHelp(); + break; + case CliParser.NODE_THRIFT_SET: + executeSet(ast); + break; + case CliParser.NODE_SHOW_CLUSTER_NAME: + executeShowProperty(ast, "cluster name"); + break; + case CliParser.NODE_SHOW_CONFIG_FILE: + executeShowProperty(ast, "config file"); + break; + case CliParser.NODE_SHOW_VERSION: + executeShowProperty(ast, "version"); + break; + case CliParser.NODE_SHOW_TABLES: + executeShowTables(ast); + break; + case CliParser.NODE_DESCRIBE_TABLE: + executeDescribeTable(ast); + break; + case CliParser.NODE_CONNECT: + executeConnect(ast); + break; + case CliParser.NODE_NO_OP: + // comment lines come here; they are treated as no ops. + break; + default: + css_.err.println("Invalid Statement (Type: " + ast.getType() + ")"); + break; + } + } + + private void printCmdHelp() + { + css_.out.println("List of all CLI commands:"); + css_.out.println("? 
Same as help."); + css_.out.println("connect / Connect to Cassandra's thrift service."); + css_.out.println("describe table Describe table."); + css_.out.println("exit Exit CLI."); + css_.out.println("explain plan [||