Author: cdouglas Date: Wed Dec 3 17:16:07 2008 New Revision: 723181 URL: http://svn.apache.org/viewvc?rev=723181&view=rev Log: HADOOP-4758. Add a splitter for metrics contexts to support more than one type of collector. Added: hadoop/core/trunk/src/core/org/apache/hadoop/metrics/spi/CompositeContext.java Modified: hadoop/core/trunk/CHANGES.txt hadoop/core/trunk/src/core/org/apache/hadoop/metrics/ContextFactory.java hadoop/core/trunk/src/core/org/apache/hadoop/metrics/MetricsContext.java hadoop/core/trunk/src/core/org/apache/hadoop/metrics/MetricsUtil.java hadoop/core/trunk/src/core/org/apache/hadoop/metrics/spi/AbstractMetricsContext.java Modified: hadoop/core/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=723181&r1=723180&r2=723181&view=diff ============================================================================== --- hadoop/core/trunk/CHANGES.txt (original) +++ hadoop/core/trunk/CHANGES.txt Wed Dec 3 17:16:07 2008 @@ -158,6 +158,9 @@ HADOOP-4708. Add support for dfsadmin commands in TestCLI. (Boris Shkolnik via cdouglas) + HADOOP-4758. Add a splitter for metrics contexts to support more than one + type of collector. (cdouglas) + OPTIMIZATIONS HADOOP-3293. Fixes FileInputFormat to do provide locations for splits Modified: hadoop/core/trunk/src/core/org/apache/hadoop/metrics/ContextFactory.java URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/core/org/apache/hadoop/metrics/ContextFactory.java?rev=723181&r1=723180&r2=723181&view=diff ============================================================================== --- hadoop/core/trunk/src/core/org/apache/hadoop/metrics/ContextFactory.java (original) +++ hadoop/core/trunk/src/core/org/apache/hadoop/metrics/ContextFactory.java Wed Dec 3 17:16:07 2008 @@ -45,8 +45,8 @@ private static ContextFactory theFactory = null; private Map attributeMap = new HashMap(); - private Map contextMap = - new HashMap(); + private Map contextMap = + new HashMap(); // Used only when contexts, or the ContextFactory itself, cannot be // created. @@ -119,23 +119,29 @@ * @param contextName the name of the context * @return the named MetricsContext */ - public synchronized MetricsContext getContext(String contextName) - throws IOException, ClassNotFoundException, InstantiationException, IllegalAccessException - { - AbstractMetricsContext metricsContext = contextMap.get(contextName); + public synchronized MetricsContext getContext(String refName, String contextName) + throws IOException, ClassNotFoundException, + InstantiationException, IllegalAccessException { + MetricsContext metricsContext = contextMap.get(refName); if (metricsContext == null) { - String classNameAttribute = contextName + CONTEXT_CLASS_SUFFIX; + String classNameAttribute = refName + CONTEXT_CLASS_SUFFIX; String className = (String) getAttribute(classNameAttribute); if (className == null) { className = DEFAULT_CONTEXT_CLASSNAME; } Class contextClass = Class.forName(className); - metricsContext = (AbstractMetricsContext) contextClass.newInstance(); + metricsContext = (MetricsContext) contextClass.newInstance(); metricsContext.init(contextName, this); contextMap.put(contextName, metricsContext); } return metricsContext; } + + public synchronized MetricsContext getContext(String contextName) + throws IOException, ClassNotFoundException, InstantiationException, + IllegalAccessException { + return getContext(contextName, contextName); + } /** * Returns a "null" context - one which does nothing. Modified: hadoop/core/trunk/src/core/org/apache/hadoop/metrics/MetricsContext.java URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/core/org/apache/hadoop/metrics/MetricsContext.java?rev=723181&r1=723180&r2=723181&view=diff ============================================================================== --- hadoop/core/trunk/src/core/org/apache/hadoop/metrics/MetricsContext.java (original) +++ hadoop/core/trunk/src/core/org/apache/hadoop/metrics/MetricsContext.java Wed Dec 3 17:16:07 2008 @@ -31,7 +31,14 @@ * Default period in seconds at which data is sent to the metrics system. */ public static final int DEFAULT_PERIOD = 5; - + + /** + * Initialize this context. + * @param contextName The given name for this context + * @param factory The creator of this context + */ + public void init(String contextName, ContextFactory factory); + /** * Returns the context name. * Modified: hadoop/core/trunk/src/core/org/apache/hadoop/metrics/MetricsUtil.java URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/core/org/apache/hadoop/metrics/MetricsUtil.java?rev=723181&r1=723180&r2=723181&view=diff ============================================================================== --- hadoop/core/trunk/src/core/org/apache/hadoop/metrics/MetricsUtil.java (original) +++ hadoop/core/trunk/src/core/org/apache/hadoop/metrics/MetricsUtil.java Wed Dec 3 17:16:07 2008 @@ -41,15 +41,20 @@ */ private MetricsUtil() {} + public static MetricsContext getContext(String contextName) { + return getContext(contextName, contextName); + } + /** * Utility method to return the named context. * If the desired context cannot be created for any reason, the exception * is logged, and a null context is returned. */ - public static MetricsContext getContext(String contextName) { + public static MetricsContext getContext(String refName, String contextName) { MetricsContext metricsContext; try { - metricsContext = ContextFactory.getFactory().getContext(contextName); + metricsContext = + ContextFactory.getFactory().getContext(refName, contextName); if (!metricsContext.isMonitoring()) { metricsContext.startMonitoring(); } Modified: hadoop/core/trunk/src/core/org/apache/hadoop/metrics/spi/AbstractMetricsContext.java URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/core/org/apache/hadoop/metrics/spi/AbstractMetricsContext.java?rev=723181&r1=723180&r2=723181&view=diff ============================================================================== --- hadoop/core/trunk/src/core/org/apache/hadoop/metrics/spi/AbstractMetricsContext.java (original) +++ hadoop/core/trunk/src/core/org/apache/hadoop/metrics/spi/AbstractMetricsContext.java Wed Dec 3 17:16:07 2008 @@ -208,7 +208,7 @@ * @param recordName the name of the record * @return newly created instance of MetricsRecordImpl or subclass */ - protected MetricsRecordImpl newRecord(String recordName) { + protected MetricsRecord newRecord(String recordName) { return new MetricsRecordImpl(recordName, this); } Added: hadoop/core/trunk/src/core/org/apache/hadoop/metrics/spi/CompositeContext.java URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/core/org/apache/hadoop/metrics/spi/CompositeContext.java?rev=723181&view=auto ============================================================================== --- hadoop/core/trunk/src/core/org/apache/hadoop/metrics/spi/CompositeContext.java (added) +++ hadoop/core/trunk/src/core/org/apache/hadoop/metrics/spi/CompositeContext.java Wed Dec 3 17:16:07 2008 @@ -0,0 +1,186 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.metrics.spi; + +import java.io.IOException; +import java.lang.reflect.InvocationHandler; +import java.lang.reflect.Method; +import java.lang.reflect.Proxy; +import java.util.ArrayList; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +import org.apache.hadoop.metrics.ContextFactory; +import org.apache.hadoop.metrics.MetricsContext; +import org.apache.hadoop.metrics.MetricsException; +import org.apache.hadoop.metrics.MetricsRecord; +import org.apache.hadoop.metrics.MetricsUtil; +import org.apache.hadoop.metrics.Updater; + +public class CompositeContext extends AbstractMetricsContext { + + private static final Log LOG = LogFactory.getLog(CompositeContext.class); + private static final String ARITY_LABEL = "arity"; + private static final String SUB_FMT = "%s.sub%d"; + private final ArrayList subctxt = + new ArrayList(); + + public CompositeContext() { + } + + public void init(String contextName, ContextFactory factory) { + super.init(contextName, factory); + int nKids; + try { + String sKids = getAttribute(ARITY_LABEL); + nKids = Integer.valueOf(sKids); + } catch (Exception e) { + LOG.error("Unable to initialize composite metric " + contextName + + ": could not init arity", e); + return; + } + for (int i = 0; i < nKids; ++i) { + MetricsContext ctxt = MetricsUtil.getContext( + String.format(SUB_FMT, contextName, i), contextName); + if (null != ctxt) { + subctxt.add(ctxt); + } + } + } + + @Override + public MetricsRecord newRecord(String recordName) { + return (MetricsRecord) Proxy.newProxyInstance( + MetricsRecord.class.getClassLoader(), + new Class[] { MetricsRecord.class }, + new MetricsRecordDelegator(recordName, subctxt)); + } + + @Override + protected void emitRecord(String contextName, String recordName, + OutputRecord outRec) throws IOException { + for (MetricsContext ctxt : subctxt) { + try { + ((AbstractMetricsContext)ctxt).emitRecord( + contextName, recordName, outRec); + if (contextName == null || recordName == null || outRec == null) { + throw new IOException(contextName + ":" + recordName + ":" + outRec); + } + } catch (IOException e) { + LOG.warn("emitRecord failed: " + ctxt.getContextName(), e); + } + } + } + + @Override + protected void flush() throws IOException { + for (MetricsContext ctxt : subctxt) { + try { + ((AbstractMetricsContext)ctxt).flush(); + } catch (IOException e) { + LOG.warn("flush failed: " + ctxt.getContextName(), e); + } + } + } + + @Override + public void startMonitoring() throws IOException { + for (MetricsContext ctxt : subctxt) { + try { + ctxt.startMonitoring(); + } catch (IOException e) { + LOG.warn("startMonitoring failed: " + ctxt.getContextName(), e); + } + } + } + + @Override + public void stopMonitoring() { + for (MetricsContext ctxt : subctxt) { + ctxt.stopMonitoring(); + } + } + + /** + * Return true if all subcontexts are monitoring. + */ + @Override + public boolean isMonitoring() { + boolean ret = true; + for (MetricsContext ctxt : subctxt) { + ret &= ctxt.isMonitoring(); + } + return ret; + } + + @Override + public void close() { + for (MetricsContext ctxt : subctxt) { + ctxt.close(); + } + } + + @Override + public void registerUpdater(Updater updater) { + for (MetricsContext ctxt : subctxt) { + ctxt.registerUpdater(updater); + } + } + + @Override + public void unregisterUpdater(Updater updater) { + for (MetricsContext ctxt : subctxt) { + ctxt.unregisterUpdater(updater); + } + } + + private static class MetricsRecordDelegator implements InvocationHandler { + private static final Method m_getRecordName = initMethod(); + private static Method initMethod() { + try { + return MetricsRecord.class.getMethod("getRecordName", new Class[0]); + } catch (Exception e) { + throw new RuntimeException("Internal error", e); + } + } + + private final String recordName; + private final ArrayList subrecs; + + MetricsRecordDelegator(String recordName, ArrayList ctxts) { + this.recordName = recordName; + this.subrecs = new ArrayList(ctxts.size()); + for (MetricsContext ctxt : ctxts) { + subrecs.add(ctxt.createRecord(recordName)); + } + } + + public Object invoke(Object p, Method m, Object[] args) throws Throwable { + if (m_getRecordName.equals(m)) { + return recordName; + } + assert Void.TYPE.equals(m.getReturnType()); + for (MetricsRecord rec : subrecs) { + m.invoke(rec, args); + } + return null; + } + } + +}