tajo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jihoon...@apache.org
Subject [1/2] TAJO-333: Add metric system to Tajo. (hyoungjunkim via jihoon)
Date Sat, 14 Dec 2013 03:33:48 GMT
Updated Branches:
  refs/heads/master 1d0d458bf -> 62c49c05f


http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/62c49c05/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/util/metrics/reporter/TajoMetricsScheduledReporter.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/util/metrics/reporter/TajoMetricsScheduledReporter.java
b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/util/metrics/reporter/TajoMetricsScheduledReporter.java
new file mode 100644
index 0000000..c9d25c5
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/util/metrics/reporter/TajoMetricsScheduledReporter.java
@@ -0,0 +1,206 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.util.metrics.reporter;
+
+import com.codahale.metrics.MetricFilter;
+import com.codahale.metrics.MetricRegistry;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.tajo.util.metrics.GroupNameMetricsFilter;
+import org.apache.tajo.util.metrics.MetricsFilterList;
+import org.apache.tajo.util.metrics.RegexpMetricsFilter;
+
+import java.io.Closeable;
+import java.util.HashSet;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Base class for metrics reporters that publish the contents of a {@link MetricRegistry}
+ * periodically from a dedicated single-threaded scheduler.
+ *
+ * <p>Options are read from the property map handed to {@link #init}, under the prefix
+ * {@code <metricsName>.<reporterName>.}: {@code period} sets the reporting period in seconds and
+ * every key starting with {@code regexp.} contributes one expression to a regexp metric filter.
+ */
+public abstract class TajoMetricsScheduledReporter extends TajoMetricsReporter implements Closeable {
+  private static final Log LOG = LogFactory.getLog(TajoMetricsScheduledReporter.class);
+
+  /** Option name, appended to the reporter's property prefix, holding the reporting period. */
+  public static final String PERIOD_KEY = "period";
+
+  /** Reporting period, in seconds, used when the properties do not configure one. */
+  private static final long DEFAULT_PERIOD = 60;
+
+  protected MetricRegistry registry;
+  protected ScheduledExecutorService executor;
+  protected MetricFilter filter;
+  protected double durationFactor;
+  protected String durationUnit;
+  protected double rateFactor;
+  protected String rateUnit;
+  protected Map<String, String> metricsProperties;
+  protected String metricsName;
+  protected String metricsPropertyKey;
+  protected String hostAndPort;
+  protected long period;
+
+  /**
+   * @return the reporter name used as the second component of this reporter's property prefix
+   */
+  protected abstract String getReporterName();
+
+  /** Hook invoked at the very end of {@link #init}, once every field has been populated. */
+  protected abstract void afterInit();
+
+  /** Creates daemon threads at normal priority named {@code metrics-<name>-thread-<n>}. */
+  private static class NamedThreadFactory implements ThreadFactory {
+    private final ThreadGroup group;
+    private final AtomicInteger threadNumber = new AtomicInteger(1);
+    private final String namePrefix;
+
+    private NamedThreadFactory(String name) {
+      final SecurityManager s = System.getSecurityManager();
+      this.group = (s != null) ? s.getThreadGroup() : Thread.currentThread().getThreadGroup();
+      this.namePrefix = "metrics-" + name + "-thread-";
+    }
+
+    @Override
+    public Thread newThread(Runnable r) {
+      final Thread t = new Thread(group, r, namePrefix + threadNumber.getAndIncrement(), 0);
+      t.setDaemon(true);
+      if (t.getPriority() != Thread.NORM_PRIORITY) {
+        t.setPriority(Thread.NORM_PRIORITY);
+      }
+      return t;
+    }
+  }
+
+  /**
+   * Returns the reporting period. The value is in seconds when it comes from {@link #init} or
+   * {@link #start()}; after {@link #start(long, TimeUnit)} it is the raw number passed by the
+   * caller, whose unit is not recorded.
+   */
+  public long getPeriod() {
+    return period;
+  }
+
+  /**
+   * Prepares the reporter: creates the scheduler thread, builds the metric filter and reads the
+   * reporting period from the properties.
+   *
+   * @param registry          registry whose metrics will be reported
+   * @param metricsName       metrics group name; used for the thread name, the group filter and
+   *                          the property prefix
+   * @param hostAndPort       identifier stored for use by subclasses
+   * @param metricsProperties reporter options; may be {@code null}, in which case defaults apply
+   * @throws NumberFormatException if the configured period is not a valid integer
+   */
+  public void init(MetricRegistry registry,
+                   String metricsName,
+                   String hostAndPort,
+                   Map<String, String> metricsProperties) {
+    this.registry = registry;
+    this.metricsName = metricsName;
+    this.executor = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory(metricsName));
+    // The rate factor is derived from SECONDS, so the unit label must be derived from SECONDS as
+    // well; labelling it with MILLISECONDS made reports claim a unit the values were not in.
+    this.rateFactor = TimeUnit.SECONDS.toSeconds(1);
+    this.rateUnit = calculateRateUnit(TimeUnit.SECONDS);
+    this.durationFactor = 1.0 / TimeUnit.MILLISECONDS.toNanos(1);
+    this.durationUnit = TimeUnit.MILLISECONDS.toString().toLowerCase(Locale.US);
+    this.metricsProperties = metricsProperties;
+    this.metricsPropertyKey = metricsName + "." + getReporterName() + ".";
+    this.hostAndPort = hostAndPort;
+
+    MetricsFilterList filterList = new MetricsFilterList();
+    filterList.addMetricFilter(new GroupNameMetricsFilter(metricsName));
+
+    String regexpFilterKey = metricsPropertyKey + "regexp.";
+    Set<String> regexpExpressions = new HashSet<String>();
+
+    // The map is optional (the period lookup below already tolerated null), so guard the scan too.
+    if (metricsProperties != null) {
+      for (Map.Entry<String, String> entry : metricsProperties.entrySet()) {
+        if (entry.getKey().startsWith(regexpFilterKey)) {
+          regexpExpressions.add(entry.getValue());
+        }
+      }
+    }
+
+    if (!regexpExpressions.isEmpty()) {
+      filterList.addMetricFilter(new RegexpMetricsFilter(regexpExpressions));
+    }
+    this.filter = filterList;
+
+    this.period = DEFAULT_PERIOD;
+    if (metricsProperties != null) {
+      String configuredPeriod = metricsProperties.get(metricsPropertyKey + PERIOD_KEY);
+      if (configuredPeriod != null) {
+        // period is a long, so parse it as one instead of narrowing through int.
+        this.period = Long.parseLong(configuredPeriod);
+      }
+    }
+    afterInit();
+  }
+
+  /** Starts the reporter using the current period, interpreted as seconds. */
+  public void start() {
+    start(period, TimeUnit.SECONDS);
+  }
+
+  /**
+   * Starts the reporter polling at the given period. The first report is issued one full period
+   * after this call.
+   *
+   * @param period the amount of time between polls
+   * @param unit   the unit for {@code period}
+   */
+  public void start(long period, TimeUnit unit) {
+    this.period = period;
+    executor.scheduleAtFixedRate(new Runnable() {
+      @Override
+      public void run() {
+        try {
+          report();
+        } catch (Exception e) {
+          // An exception escaping a fixed-rate task cancels all later runs, so it is logged here
+          // and reporting continues at the next tick.
+          LOG.warn("Metric report error:" + e.getMessage(), e);
+        }
+      }
+    }, period, period, unit);
+  }
+
+  /**
+   * Stops the reporter and shuts down its thread of execution, waiting up to one second for a
+   * report in progress to finish. Does nothing if {@link #init} was never called.
+   */
+  public void stop() {
+    if (executor == null) {
+      return;
+    }
+    executor.shutdown();
+    try {
+      executor.awaitTermination(1, TimeUnit.SECONDS);
+    } catch (InterruptedException e) {
+      // Stop waiting, but keep the interrupt visible to the caller.
+      Thread.currentThread().interrupt();
+    }
+  }
+
+  /**
+   * Stops the reporter and shuts down its thread of execution.
+   */
+  @Override
+  public void close() {
+    stop();
+  }
+
+  /**
+   * Report the current values of all metrics in the registry that pass this reporter's filter.
+   */
+  public void report() {
+    report(registry.getGauges(filter),
+        registry.getCounters(filter),
+        registry.getHistograms(filter),
+        registry.getMeters(filter),
+        registry.getTimers(filter));
+  }
+
+  /** @return the singular, lower-case name of the time unit used to label rates */
+  protected String getRateUnit() {
+    return rateUnit;
+  }
+
+  /** @return the lower-case name of the time unit used to label durations */
+  protected String getDurationUnit() {
+    return durationUnit;
+  }
+
+  /** Scales {@code duration} by {@code durationFactor} (set in {@link #init}). */
+  protected double convertDuration(double duration) {
+    return duration * durationFactor;
+  }
+
+  /** Scales {@code rate} by {@code rateFactor} (set in {@link #init}). */
+  protected double convertRate(double rate) {
+    return rate * rateFactor;
+  }
+
+  /** Lower-cases the unit name and drops its final character, e.g. SECONDS becomes "second". */
+  private String calculateRateUnit(TimeUnit unit) {
+    final String s = unit.toString().toLowerCase(Locale.US);
+    return s.substring(0, s.length() - 1);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/62c49c05/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoWorker.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoWorker.java
b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoWorker.java
index 1f3445a..986f453 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoWorker.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoWorker.java
@@ -18,6 +18,7 @@
 
 package org.apache.tajo.worker;
 
+import com.codahale.metrics.Gauge;
 import com.google.protobuf.ServiceException;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -50,6 +51,7 @@ import org.apache.tajo.storage.v2.DiskUtil;
 import org.apache.tajo.util.CommonTestingUtil;
 import org.apache.tajo.util.NetUtils;
 import org.apache.tajo.util.TajoIdUtils;
+import org.apache.tajo.util.metrics.TajoSystemMetrics;
 import org.apache.tajo.webapp.StaticHttpServer;
 
 import java.io.*;
@@ -121,6 +123,8 @@ public class TajoWorker extends CompositeService {
 
   private DeletionService deletionService;
 
+  private TajoSystemMetrics workerSystemMetrics;
+
   public TajoWorker() throws Exception {
     super(TajoWorker.class.getName());
   }
@@ -261,6 +265,33 @@ public class TajoWorker extends CompositeService {
 
   }
 
+  /**
+   * Creates and starts the "worker" metrics group, then registers two gauges on it: the number
+   * of running queries and the number of running tasks. Each gauge reports zero while the
+   * service it reads from is still null.
+   */
+  private void initWorkerMetrics() {
+    workerSystemMetrics = new TajoSystemMetrics(systemConf, "worker", workerContext.getWorkerName());
+    workerSystemMetrics.start();
+
+    Gauge<Integer> runningQueries = new Gauge<Integer>() {
+      @Override
+      public Integer getValue() {
+        if (queryMasterManagerService == null) {
+          return 0;
+        }
+        return queryMasterManagerService.getQueryMaster().getQueryMasterTasks().size();
+      }
+    };
+    workerSystemMetrics.register("querymaster", "runningQueries", runningQueries);
+
+    Gauge<Integer> runningTasks = new Gauge<Integer>() {
+      @Override
+      public Integer getValue() {
+        if (taskRunnerManager == null) {
+          return 0;
+        }
+        return taskRunnerManager.getNumTasks();
+      }
+    };
+    workerSystemMetrics.register("task", "runningTasks", runningTasks);
+  }
+
   public WorkerContext getWorkerContext() {
     return workerContext;
   }
@@ -268,6 +299,7 @@ public class TajoWorker extends CompositeService {
   @Override
   public void start() {
     super.start();
+    initWorkerMetrics();
   }
 
   @Override
@@ -302,6 +334,10 @@ public class TajoWorker extends CompositeService {
       }
     }
 
+    if(workerSystemMetrics != null) {
+      workerSystemMetrics.stop();
+    }
+
     if(deletionService != null) deletionService.stop();
     super.stop();
     LOG.info("TajoWorker main thread exiting");
@@ -432,6 +468,10 @@ public class TajoWorker extends CompositeService {
     public boolean isTaskRunnerMode() {
       return taskRunnerMode;
     }
+
+    public TajoSystemMetrics getWorkerSystemMetrics() {
+      return workerSystemMetrics;
+    }
   }
 
   public void stopWorkerForce() {

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/62c49c05/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoWorkerManagerService.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoWorkerManagerService.java
b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoWorkerManagerService.java
index d10b53d..bd2dc4d 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoWorkerManagerService.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoWorkerManagerService.java
@@ -111,6 +111,7 @@ public class TajoWorkerManagerService extends CompositeService
   public void executeExecutionBlock(RpcController controller,
                                     TajoWorkerProtocol.RunExecutionBlockRequestProto request,
                                     RpcCallback<PrimitiveProtos.BoolProto> done) {
+    workerContext.getWorkerSystemMetrics().counter("query", "executedExecutionBlocksNum").inc();
     try {
       String[] params = new String[7];
       params[0] = "standby";  //mode(never used)

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/62c49c05/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TaskRunner.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TaskRunner.java
b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TaskRunner.java
index 1920f25..18c312f 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TaskRunner.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TaskRunner.java
@@ -352,7 +352,7 @@ public class TaskRunner extends AbstractService {
                     taskRunnerManager.stopTask(getId());
                   }
                 } else {
-
+                  taskRunnerManager.getWorkerContext().getWorkerSystemMetrics().counter("query",
"task").inc();
                   LOG.info("Accumulated Received Task: " + (++receivedNum));
 
                   QueryUnitAttemptId taskAttemptId = new QueryUnitAttemptId(taskRequest.getId());

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/62c49c05/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TaskRunnerManager.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TaskRunnerManager.java
b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TaskRunnerManager.java
index 1ea213d..6523a4f 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TaskRunnerManager.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TaskRunnerManager.java
@@ -43,6 +43,10 @@ public class TaskRunnerManager extends CompositeService {
     this.workerContext = workerContext;
   }
 
+  public TajoWorker.WorkerContext getWorkerContext() {
+    return workerContext;
+  }
+
   @Override
   public void init(Configuration conf) {
     tajoConf = (TajoConf)conf;

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/62c49c05/tajo-core/tajo-core-backend/src/main/resources/tajo-metrics.properties
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/resources/tajo-metrics.properties b/tajo-core/tajo-core-backend/src/main/resources/tajo-metrics.properties
new file mode 100644
index 0000000..4ae6a6c
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/resources/tajo-metrics.properties
@@ -0,0 +1,75 @@
+##
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+###############################################################################
+# reporter definition
+# syntax: reporter.<name>=<class>
+reporter.null=org.apache.tajo.util.metrics.reporter.NullReporter
+reporter.file=org.apache.tajo.util.metrics.reporter.MetricsFileScheduledReporter
+reporter.console=org.apache.tajo.util.metrics.reporter.MetricsConsoleScheduledReporter
+reporter.ganglia=org.apache.tajo.util.metrics.reporter.GangliaReporter
+###############################################################################
+
+###############################################################################
+# syntax: <metrics group name>.reporters=<reporter name1>[,<reporter name2>,...]
+# syntax: <metrics group name>.<reporter name>.<options>=<value>
+###############################################################################
+
+###############################################################################
+# tajo master
+###############################################################################
+tajomaster.reporters=null
+
+#tajomaster.reporters=file,console
+#tajomaster.console.period=60
+#tajomaster.file.filename=/tmp/tajo/tajomaster-metrics.out
+#tajomaster.file.period=60
+#tajomaster.ganglia.server=my.ganglia.com
+#tajomaster.ganglia.port=8649
+#tajomaster.ganglia.period=60
+###############################################################################
+
+###############################################################################
+# tajo master-jvm
+###############################################################################
+tajomaster-jvm.reporters=null
+#tajomaster-jvm.reporters=console
+#tajomaster-jvm.console.period=60
+#tajomaster-jvm.file.filename=/tmp/tajo/tajomaster-jvm-metrics.out
+#tajomaster-jvm.file.period=60
+###############################################################################
+
+###############################################################################
+# worker
+###############################################################################
+worker.reporters=null
+#worker.reporters=file,console
+#worker.console.period=60
+#worker.file.filename=/tmp/tajo/worker-metrics.out
+#worker.file.period=60
+###############################################################################
+
+###############################################################################
+# worker-jvm
+###############################################################################
+worker-jvm.reporters=null
+#worker-jvm.reporters=console
+#worker-jvm.console.period=60
+#worker-jvm.file.filename=/tmp/tajo/worker-jvm-metrics.out
+#worker-jvm.file.period=60
+###############################################################################

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/62c49c05/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/util/metrics/TestMetricsFilter.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/util/metrics/TestMetricsFilter.java
b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/util/metrics/TestMetricsFilter.java
new file mode 100644
index 0000000..b70512c
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/util/metrics/TestMetricsFilter.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.util.metrics;
+
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.junit.Assert.assertTrue;
+
+public class TestMetricsFilter {
+  @Test
+  public void testGroupNameMetricsFilter() {
+    GroupNameMetricsFilter filter = new GroupNameMetricsFilter("tajomaster");
+
+    assertTrue(filter.matches("tajomaster.JVM.Heap.memFree", null));
+    assertTrue(!filter.matches("tajomaster01.JVM.Heap.memFree", null));
+    assertTrue(!filter.matches("server.tajomaster.JVM.Heap.memFree", null));
+    assertTrue(!filter.matches("tajworker.JVM.Heap.memFree", null));
+  }
+
+  @Test
+  public void testRegexpMetricsFilter() {
+    List<String> filterExpressions = new ArrayList<String>();
+    filterExpressions.add("JVM");
+    filterExpressions.add("Query");
+
+    RegexpMetricsFilter filter = new RegexpMetricsFilter(filterExpressions);
+
+    assertTrue(filter.matches("tajomaster.JVM.Heap.memFree", null));
+    assertTrue(filter.matches("tajomaster.Query.numQuery", null));
+
+    assertTrue(!filter.matches("tajomaster.resource.numWorker", null));
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/62c49c05/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/util/metrics/TestSystemMetrics.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/util/metrics/TestSystemMetrics.java
b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/util/metrics/TestSystemMetrics.java
new file mode 100644
index 0000000..a4af64c
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/util/metrics/TestSystemMetrics.java
@@ -0,0 +1,133 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.util.metrics;
+
+import com.codahale.metrics.Counter;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.util.CommonTestingUtil;
+import org.apache.tajo.util.metrics.reporter.TajoMetricsScheduledReporter;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.BufferedReader;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.InputStreamReader;
+import java.util.*;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+/** End-to-end test of {@link TajoSystemMetrics} with a file reporter configured from a property file. */
+public class TestSystemMetrics {
+  Path testPropertyFile;
+  Path metricsOutputFile;
+
+  /**
+   * Writes a metrics property file that declares the reporters and binds the
+   * {@code test-file-group} group to a file reporter with a 5 second period.
+   */
+  @Before
+  public void setUp() throws Exception {
+    testPropertyFile =
+        new Path(CommonTestingUtil.getTestDir(), System.currentTimeMillis() + ".properties");
+
+    metricsOutputFile =
+        new Path(CommonTestingUtil.getTestDir(), System.currentTimeMillis() + ".out");
+
+    FileOutputStream out = new FileOutputStream(testPropertyFile.toUri().getPath());
+    try {
+      out.write("reporter.null=org.apache.tajo.util.metrics.reporter.NullReporter\n".getBytes());
+      out.write("reporter.file=org.apache.tajo.util.metrics.reporter.MetricsFileScheduledReporter\n".getBytes());
+      out.write("reporter.console=org.apache.tajo.util.metrics.reporter.MetricsConsoleScheduledReporter\n".getBytes());
+
+      out.write("test-file-group.reporters=file\n".getBytes());
+      out.write("test-console-group.reporters=console\n".getBytes());
+      out.write("test-find-console-group.reporters=console,file\n".getBytes());
+
+      out.write(("test-file-group.file.filename=" + metricsOutputFile.toUri().getPath() + "\n").getBytes());
+      out.write("test-file-group.file.period=5\n".getBytes());
+    } finally {
+      // Release the descriptor even when a write fails; the stream was previously never closed.
+      out.close();
+    }
+  }
+
+  /**
+   * Checks that the configured reporter and period are picked up, that counters are registered
+   * under {@code <group>.<context>.<item>}, that they are grouped by context, and that a forced
+   * report writes two lines to the output file.
+   */
+  @Test
+  public void testMetricsReporter() throws Exception {
+    TajoConf tajoConf = new TajoConf();
+    tajoConf.set("tajo.metrics.property.file", testPropertyFile.toUri().getPath());
+    TajoSystemMetrics tajoSystemMetrics = new TajoSystemMetrics(tajoConf, "test-file-group", "localhost");
+    tajoSystemMetrics.start();
+
+    try {
+      Collection<TajoMetricsScheduledReporter> reporters = tajoSystemMetrics.getMetricsReporters();
+
+      assertEquals(1, reporters.size());
+
+      TajoMetricsScheduledReporter reporter = reporters.iterator().next();
+      assertEquals(5, reporter.getPeriod());
+
+      for (int i = 0; i < 10; i++) {
+        tajoSystemMetrics.counter("test-group01", "test-item1").inc();
+        tajoSystemMetrics.counter("test-group01", "test-item2").inc(2);
+        tajoSystemMetrics.counter("test-group02", "test-item1").inc(3);
+      }
+
+      SortedMap<String, Counter> counterMap = tajoSystemMetrics.getRegistry().getCounters();
+      Counter counter1 = counterMap.get("test-file-group.test-group01.test-item1");
+      assertNotNull(counter1);
+      assertEquals(10, counter1.getCount());
+
+      Counter counter2 = counterMap.get("test-file-group.test-group01.test-item2");
+      assertNotNull(counter2);
+      assertEquals(20, counter2.getCount());
+
+      //test findMetricsItemGroup method
+      Map<String, Map<String, Counter>> groupItems = reporter.findMetricsItemGroup(counterMap);
+      assertEquals(2, groupItems.size());
+
+      Map<String, Counter> group01Items = groupItems.get("test-file-group.test-group01");
+      assertEquals(2, group01Items.size());
+
+      counter1 = group01Items.get("test-item1");
+      assertNotNull(counter1);
+      assertEquals(10, counter1.getCount());
+
+      counter2 = group01Items.get("test-item2");
+      assertNotNull(counter2);
+      assertEquals(20, counter2.getCount());
+
+      Map<String, Counter> group02Items = groupItems.get("test-file-group.test-group02");
+      assertEquals(1, group02Items.size());
+
+      reporter.report();
+
+      List<String> lines = new ArrayList<String>();
+      BufferedReader reader = new BufferedReader(new InputStreamReader(
+          new FileInputStream(metricsOutputFile.toUri().getPath())));
+      try {
+        String line;
+        while ((line = reader.readLine()) != null) {
+          lines.add(line);
+        }
+      } finally {
+        // Close the reader before tearDown deletes the file it is reading.
+        reader.close();
+      }
+
+      assertEquals(2, lines.size());
+    } finally {
+      // Shut the metrics system down so its reporter thread does not outlive the test.
+      tajoSystemMetrics.stop();
+    }
+  }
+
+  /** Removes the property file and the metrics output file created for the test. */
+  @After
+  public void tearDown() throws Exception {
+    FileSystem fs = testPropertyFile.getFileSystem(new Configuration());
+    fs.delete(testPropertyFile, false);
+    fs.delete(metricsOutputFile, false);
+  }
+}


Mime
View raw message