lucene-commits mailing list archives

From markrmil...@apache.org
Subject svn commit: r1684711 - in /lucene/dev/trunk/solr: ./ core/src/java/org/apache/solr/core/ core/src/java/org/apache/solr/store/hdfs/ core/src/test/org/apache/solr/core/
Date Wed, 10 Jun 2015 16:51:56 GMT
Author: markrmiller
Date: Wed Jun 10 16:51:56 2015
New Revision: 1684711

URL: http://svn.apache.org/r1684711
Log:
SOLR-7458: Expose HDFS Block Locality Metrics via JMX

Added:
    lucene/dev/trunk/solr/core/src/java/org/apache/solr/store/hdfs/HdfsLocalityReporter.java   (with props)
Modified:
    lucene/dev/trunk/solr/CHANGES.txt
    lucene/dev/trunk/solr/core/src/java/org/apache/solr/core/HdfsDirectoryFactory.java
    lucene/dev/trunk/solr/core/src/java/org/apache/solr/core/SolrResourceLoader.java
    lucene/dev/trunk/solr/core/src/java/org/apache/solr/store/hdfs/HdfsDirectory.java
    lucene/dev/trunk/solr/core/src/test/org/apache/solr/core/HdfsDirectoryFactoryTest.java

Modified: lucene/dev/trunk/solr/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/CHANGES.txt?rev=1684711&r1=1684710&r2=1684711&view=diff
==============================================================================
--- lucene/dev/trunk/solr/CHANGES.txt (original)
+++ lucene/dev/trunk/solr/CHANGES.txt Wed Jun 10 16:51:56 2015
@@ -98,6 +98,8 @@ New Features
   necessarily returned in the final SolrDocument by returning a list of fields from 
   DocTransformer#getExtraRequestFields  (ryan)
 
+* SOLR-7458: Expose HDFS Block Locality Metrics via JMX (Mike Drob via Mark Miller)
+
 Bug Fixes
 ----------------------
 

Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/core/HdfsDirectoryFactory.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/core/HdfsDirectoryFactory.java?rev=1684711&r1=1684710&r2=1684711&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/core/HdfsDirectoryFactory.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/core/HdfsDirectoryFactory.java Wed Jun 10 16:51:56 2015
@@ -53,16 +53,20 @@ import org.apache.solr.store.blockcache.
 import org.apache.solr.store.blockcache.Cache;
 import org.apache.solr.store.blockcache.Metrics;
 import org.apache.solr.store.hdfs.HdfsDirectory;
+import org.apache.solr.store.hdfs.HdfsLocalityReporter;
 import org.apache.solr.store.hdfs.HdfsLockFactory;
 import org.apache.solr.util.HdfsUtil;
+import org.apache.solr.util.plugin.SolrCoreAware;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
 import com.google.common.cache.CacheBuilder;
 import com.google.common.cache.RemovalListener;
 import com.google.common.cache.RemovalNotification;
 
-public class HdfsDirectoryFactory extends CachingDirectoryFactory {
+public class HdfsDirectoryFactory extends CachingDirectoryFactory implements SolrCoreAware {
   public static Logger LOG = LoggerFactory
       .getLogger(HdfsDirectoryFactory.class);
   
@@ -108,7 +112,7 @@ public class HdfsDirectoryFactory extend
         }
       })
       .build();
-  
+
   private final static class MetricsHolder {
     // [JCIP SE, Goetz, 16.6] Lazy initialization
     // Won't load until MetricsHolder is referenced
@@ -126,6 +130,10 @@ public class HdfsDirectoryFactory extend
     tmpFsCache.cleanUp();
   }
 
+  private final static class LocalityHolder {
+    public static final HdfsLocalityReporter reporter = new HdfsLocalityReporter();
+  }
+
   @Override
   public void init(NamedList args) {
     params = SolrParams.toSolrParams(args);
@@ -177,6 +185,7 @@ public class HdfsDirectoryFactory extend
     boolean blockCacheGlobal = getConfig(BLOCKCACHE_GLOBAL, false); // default to false for back compat
     boolean blockCacheReadEnabled = getConfig(BLOCKCACHE_READ_ENABLED, true);
     
+    final HdfsDirectory hdfsDir;
     final Directory dir;
     if (blockCacheEnabled && dirContext != DirContext.META_DATA) {
       int numberOfBlocksPerBank = getConfig(NUMBEROFBLOCKSPERBANK, 16384);
@@ -204,12 +213,15 @@ public class HdfsDirectoryFactory extend
           bufferSize, bufferCount, blockCacheGlobal);
       
       Cache cache = new BlockDirectoryCache(blockCache, path, metrics, blockCacheGlobal);
-      HdfsDirectory hdfsDirectory = new HdfsDirectory(new Path(path), lockFactory, conf);
-      dir = new BlockDirectory(path, hdfsDirectory, cache, null, blockCacheReadEnabled, false);
+      hdfsDir = new HdfsDirectory(new Path(path), lockFactory, conf);
+      dir = new BlockDirectory(path, hdfsDir, cache, null, blockCacheReadEnabled, false);
     } else {
-      dir = new HdfsDirectory(new Path(path), lockFactory, conf);
+      hdfsDir = new HdfsDirectory(new Path(path), lockFactory, conf);
+      dir = hdfsDir;
     }
     
+    LocalityHolder.reporter.registerDirectory(hdfsDir);
+
     boolean nrtCachingDirectory = getConfig(NRTCACHINGDIRECTORY_ENABLE, true);
     if (nrtCachingDirectory) {
       double nrtCacheMaxMergeSizeMB = getConfig(NRTCACHINGDIRECTORY_MAXMERGESIZEMB, 16);
@@ -443,7 +455,17 @@ public class HdfsDirectoryFactory extend
 
   @Override
   public Collection<SolrInfoMBean> offerMBeans() {
-    return Arrays.<SolrInfoMBean>asList(MetricsHolder.metrics);
+    return Arrays.<SolrInfoMBean>asList(MetricsHolder.metrics, LocalityHolder.reporter);
+  }
+
+  @Override
+  public void inform(SolrCore core) {
+    setHost(core.getCoreDescriptor().getCoreContainer().getHostName());
+  }
+
+  @VisibleForTesting
+  void setHost(String hostname) {
+    LocalityHolder.reporter.setHost(hostname);
   }
 
   @Override

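The LocalityHolder added above reuses the initialization-on-demand holder idiom already used by MetricsHolder ([JCIP, Goetz, 16.6]). A minimal standalone sketch of the idiom, with illustrative names not taken from this commit:

    public class LazySingleton {
      // The JVM initializes Holder at most once, on first reference, and
      // class initialization is thread-safe, so INSTANCE is created lazily
      // without any explicit locking.
      private static final class Holder {
        static final LazySingleton INSTANCE = new LazySingleton();
      }

      private LazySingleton() {}

      public static LazySingleton get() {
        return Holder.INSTANCE; // first call triggers Holder initialization
      }
    }
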
Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/core/SolrResourceLoader.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/core/SolrResourceLoader.java?rev=1684711&r1=1684710&r2=1684711&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/core/SolrResourceLoader.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/core/SolrResourceLoader.java Wed Jun 10 16:51:56 2015
@@ -738,6 +738,7 @@ public class SolrResourceLoader implemen
     awareCompatibility.put( 
       SolrCoreAware.class, new Class[] {
         CodecFactory.class,
+        DirectoryFactory.class,
         ManagedIndexSchemaFactory.class,
         QueryResponseWriter.class,
         SearchComponent.class,

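This one-line whitelist entry is what permits HdfsDirectoryFactory (above) to implement SolrCoreAware: the loader refuses plugins that implement SolrCoreAware unless their type appears in this map. A hedged sketch of what any DirectoryFactory can now do (MyDirectoryFactory is hypothetical):

    public class MyDirectoryFactory extends StandardDirectoryFactory implements SolrCoreAware {
      @Override
      public void inform(SolrCore core) {
        // Invoked once the SolrCore is available: a safe point to read
        // core-level state, e.g. the container's host name as done above.
      }
    }
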
Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/store/hdfs/HdfsDirectory.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/store/hdfs/HdfsDirectory.java?rev=1684711&r1=1684710&r2=1684711&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/store/hdfs/HdfsDirectory.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/store/hdfs/HdfsDirectory.java Wed Jun 10 16:51:56 2015
@@ -100,6 +100,15 @@ public class HdfsDirectory extends BaseD
   public void close() throws IOException {
     LOG.info("Closing hdfs directory {}", hdfsDirPath);
     fileSystem.close();
+    isOpen = false;
+  }
+
+  /**
+   * Check whether this directory is open or closed. This check may return stale results in the form of false negatives.
+   * @return true if the directory is definitely closed, false if the directory is open or is pending closure
+   */
+  public boolean isClosed() {
+    return !isOpen;
   }
   
   @Override
@@ -241,4 +250,22 @@ public class HdfsDirectory extends BaseD
     LOG.debug("Sync called on {}", Arrays.toString(names.toArray()));
   }
   
+  @Override
+  public int hashCode() {
+    return hdfsDirPath.hashCode();
+  }
+  
+  @Override
+  public boolean equals(Object obj) {
+    if (obj == this) {
+      return true;
+    }
+    if (obj == null) {
+      return false;
+    }
+    if (!(obj instanceof HdfsDirectory)) {
+      return false;
+    }
+    return this.hdfsDirPath.equals(((HdfsDirectory) obj).hdfsDirPath);
+  }
 }

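The path-based hashCode/equals added above is what lets the new HdfsLocalityReporter (below) key its cache on HdfsDirectory: two instances opened on the same HDFS path now collapse to one cache entry. A small sketch of the behavior this buys (variables and paths are illustrative):

    ConcurrentMap<HdfsDirectory,String> cache = new ConcurrentHashMap<>();
    cache.put(dirA, "stats");              // dirA wraps hdfs://nn/solr/core1
    boolean hit = cache.containsKey(dirB); // true when dirB wraps the same
                                           // path, even though dirA != dirB
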
Added: lucene/dev/trunk/solr/core/src/java/org/apache/solr/store/hdfs/HdfsLocalityReporter.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/store/hdfs/HdfsLocalityReporter.java?rev=1684711&view=auto
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/store/hdfs/HdfsLocalityReporter.java (added)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/store/hdfs/HdfsLocalityReporter.java Wed Jun 10 16:51:56 2015
@@ -0,0 +1,212 @@
+package org.apache.solr.store.hdfs;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.IOException;
+import java.net.URL;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+import org.apache.hadoop.fs.BlockLocation;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.solr.common.util.NamedList;
+import org.apache.solr.common.util.SimpleOrderedMap;
+import org.apache.solr.core.SolrInfoMBean;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class HdfsLocalityReporter implements SolrInfoMBean {
+  public static final String LOCALITY_BYTES_TOTAL = "locality.bytes.total";
+  public static final String LOCALITY_BYTES_LOCAL = "locality.bytes.local";
+  public static final String LOCALITY_BYTES_RATIO = "locality.bytes.ratio";
+  public static final String LOCALITY_BLOCKS_TOTAL = "locality.blocks.total";
+  public static final String LOCALITY_BLOCKS_LOCAL = "locality.blocks.local";
+  public static final String LOCALITY_BLOCKS_RATIO = "locality.blocks.ratio";
+
+  public static final Logger logger = LoggerFactory.getLogger(HdfsLocalityReporter.class);
+
+  private String hostname;
+  private final ConcurrentMap<HdfsDirectory,ConcurrentMap<FileStatus,BlockLocation[]>> cache;
+
+  public HdfsLocalityReporter() {
+    cache = new ConcurrentHashMap<>();
+  }
+
+  /**
+   * Set the host name to use when determining locality
+   * @param hostname The name of this host; should correspond to what HDFS Data Nodes think this is.
+   */
+  public void setHost(String hostname) {
+    this.hostname = hostname;
+  }
+  
+  @Override
+  public String getName() {
+    return "hdfs-locality";
+  }
+
+  @Override
+  public String getVersion() {
+    return getClass().getPackage().getSpecificationVersion();
+  }
+
+  @Override
+  public String getDescription() {
+    return "Provides metrics for HDFS data locality.";
+  }
+
+  @Override
+  public Category getCategory() {
+    return Category.OTHER;
+  }
+
+  @Override
+  public String getSource() {
+    return null;
+  }
+
+  @Override
+  public URL[] getDocs() {
+    return null;
+  }
+
+  /**
+   * Provide statistics on HDFS block locality, both in terms of bytes and block counts.
+   */
+  @Override
+  public NamedList getStatistics() {
+    long totalBytes = 0;
+    long localBytes = 0;
+    int totalCount = 0;
+    int localCount = 0;
+
+    for (Iterator<HdfsDirectory> iterator = cache.keySet().iterator(); iterator.hasNext();) {
+      HdfsDirectory hdfsDirectory = iterator.next();
+
+      if (hdfsDirectory.isClosed()) {
+        iterator.remove();
+      } else {
+        try {
+          refreshDirectory(hdfsDirectory);
+          Map<FileStatus,BlockLocation[]> blockMap = cache.get(hdfsDirectory);
+
+          // For every block in every file in this directory, count it
+          for (BlockLocation[] locations : blockMap.values()) {
+            for (BlockLocation bl : locations) {
+              totalBytes += bl.getLength();
+              totalCount++;
+
+              if (Arrays.asList(bl.getHosts()).contains(hostname)) {
+                localBytes += bl.getLength();
+                localCount++;
+              }
+            }
+          }
+        } catch (IOException e) {
+          logger.warn("Could not retrieve locality information for {} due to exception: {}",
+              hdfsDirectory.getHdfsDirPath(), e);
+        }
+      }
+    }
+
+    return createStatistics(totalBytes, localBytes, totalCount, localCount);
+  }
+
+  /**
+   * Generate a statistics object based on the given measurements for all files monitored by this reporter.
+   * 
+   * @param totalBytes
+   *          The total bytes used
+   * @param localBytes
+   *          The amount of bytes found on local nodes
+   * @param totalCount
+   *          The total block count
+   * @param localCount
+   *          The amount of blocks found on local nodes
+   * @return HDFS block locality statistics
+   */
+  private NamedList<Number> createStatistics(long totalBytes, long localBytes, int totalCount, int localCount) {
+    NamedList<Number> statistics = new SimpleOrderedMap<Number>();
+
+    statistics.add(LOCALITY_BYTES_TOTAL, totalBytes);
+    statistics.add(LOCALITY_BYTES_LOCAL, localBytes);
+    if (localBytes == 0) {
+      statistics.add(LOCALITY_BYTES_RATIO, 0);
+    } else {
+      statistics.add(LOCALITY_BYTES_RATIO, localBytes / (double) totalBytes);
+    }
+    statistics.add(LOCALITY_BLOCKS_TOTAL, totalCount);
+    statistics.add(LOCALITY_BLOCKS_LOCAL, localCount);
+    if (localCount == 0) {
+      statistics.add(LOCALITY_BLOCKS_RATIO, 0);
+    } else {
+      statistics.add(LOCALITY_BLOCKS_RATIO, localCount / (double) totalCount);
+    }
+
+    return statistics;
+  }
+
+  /**
+   * Add a directory for block locality reporting. This directory will continue to be checked until its close method has
+   * been called.
+   * 
+   * @param dir
+   *          The directory to keep metrics on.
+   */
+  public void registerDirectory(HdfsDirectory dir) {
+    logger.info("Registering direcotry {} for locality metrics.", dir.getHdfsDirPath().toString());
+    cache.put(dir, new ConcurrentHashMap<FileStatus, BlockLocation[]>());
+  }
+
+  /**
+   * Update the cached block locations for the given directory. This includes deleting any files that no longer exist in
+   * the file system and adding any new files that have shown up.
+   * 
+   * @param dir
+   *          The directory to refresh
+   * @throws IOException
+   *           If there is a problem getting info from HDFS
+   */
+  private void refreshDirectory(HdfsDirectory dir) throws IOException {
+    Map<FileStatus,BlockLocation[]> directoryCache = cache.get(dir);
+    Set<FileStatus> cachedStatuses = directoryCache.keySet();
+
+    FileSystem fs = dir.getFileSystem();
+    FileStatus[] statuses = fs.listStatus(dir.getHdfsDirPath());
+    List<FileStatus> statusList = Arrays.asList(statuses);
+
+    logger.debug("Updating locality information for: {}", statusList);
+
+    // Keep only the files that still exist
+    cachedStatuses.retainAll(statusList);
+
+    // Fill in missing entries in the cache
+    for (FileStatus status : statusList) {
+      if (!status.isDirectory() && !directoryCache.containsKey(status)) {
+        BlockLocation[] locations = fs.getFileBlockLocations(status, 0, status.getLen());
+        directoryCache.put(status, locations);
+      }
+    }
+  }
+}

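To make the arithmetic in createStatistics concrete, a hedged worked example (the block counts and the 134217728-byte block size are illustrative): suppose a core's index spans three HDFS blocks, two of which have a replica on this host.

    locality.blocks.total = 3
    locality.blocks.local = 2
    locality.blocks.ratio = 2 / 3.0 = 0.666...
    locality.bytes.total  = 3 * 134217728 = 402653184
    locality.bytes.local  = 2 * 134217728 = 268435456
    locality.bytes.ratio  = 268435456 / 402653184.0 = 0.666...

Note the zero guards in createStatistics report a ratio of 0 rather than dividing, which also sidesteps 0/0 when a directory is registered but still empty.
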
Modified: lucene/dev/trunk/solr/core/src/test/org/apache/solr/core/HdfsDirectoryFactoryTest.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/test/org/apache/solr/core/HdfsDirectoryFactoryTest.java?rev=1684711&r1=1684710&r2=1684711&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/test/org/apache/solr/core/HdfsDirectoryFactoryTest.java (original)
+++ lucene/dev/trunk/solr/core/src/test/org/apache/solr/core/HdfsDirectoryFactoryTest.java Wed Jun 10 16:51:56 2015
@@ -17,23 +17,34 @@
 
 package org.apache.solr.core;
 
+import java.io.IOException;
 import java.nio.file.Path;
 import java.text.SimpleDateFormat;
+import java.util.Collections;
 import java.util.Date;
+import java.util.HashMap;
+import java.util.Iterator;
 import java.util.Locale;
+import java.util.Map;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.lucene.store.Directory;
+import org.apache.lucene.store.IndexOutput;
+import org.apache.lucene.store.LockFactory;
 import org.apache.lucene.store.NoLockFactory;
 import org.apache.solr.SolrTestCaseJ4;
 import org.apache.solr.cloud.hdfs.HdfsTestUtil;
 import org.apache.solr.common.util.NamedList;
 import org.apache.solr.core.DirectoryFactory.DirContext;
 import org.apache.solr.handler.SnapShooter;
+import org.apache.solr.store.hdfs.HdfsLocalityReporter;
 import org.apache.solr.util.BadHdfsThreadsFilter;
 import org.apache.solr.util.MockCoreContainer.MockCoreDescriptor;
+import org.junit.After;
 import org.junit.AfterClass;
+import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
@@ -165,5 +176,51 @@ public class HdfsDirectoryFactoryTest ex
     assertTrue(hdfs.isDirectory(currentIndexDirPath));
     assertTrue(!hdfs.isDirectory(oldIndexDirPath));
   }
-
+  
+  @Test
+  public void testLocalityReporter() throws Exception {
+    Configuration conf = HdfsTestUtil.getClientConfiguration(dfsCluster);
+    conf.set("dfs.permissions.enabled", "false");
+    
+    HdfsDirectoryFactory factory = new HdfsDirectoryFactory();
+    Map<String,String> props = new HashMap<String,String>();
+    props.put(HdfsDirectoryFactory.HDFS_HOME, HdfsTestUtil.getURI(dfsCluster) + "/solr");
+    props.put(HdfsDirectoryFactory.BLOCKCACHE_ENABLED, "false");
+    props.put(HdfsDirectoryFactory.NRTCACHINGDIRECTORY_ENABLE, "false");
+    factory.init(new NamedList<>(props));
+    
+    Iterator<SolrInfoMBean> it = factory.offerMBeans().iterator();
+    it.next(); // skip
+    SolrInfoMBean localityBean = it.next(); // brittle, but it's ok
+    
+    // Make sure we have the right bean.
+    assertEquals("hdfs-locality", localityBean.getName());
+    
+    // We haven't done anything, so there should be no data
+    NamedList<?> statistics = localityBean.getStatistics();
+    assertEquals(0l, statistics.get(HdfsLocalityReporter.LOCALITY_BYTES_TOTAL));
+    assertEquals(0, statistics.get(HdfsLocalityReporter.LOCALITY_BYTES_RATIO));
+    
+    // create a directory and a file
+    String path = HdfsTestUtil.getURI(dfsCluster) + "/solr3/";
+    Directory dir = factory.create(path, NoLockFactory.INSTANCE, DirContext.DEFAULT);
+    try(IndexOutput writer = dir.createOutput("output", null)) {
+      writer.writeLong(42l);
+    }
+    
+    final long long_bytes = Long.SIZE / Byte.SIZE;
+    
+    // no locality because hostname not set
+    statistics = localityBean.getStatistics();
+    assertEquals(long_bytes, statistics.get(HdfsLocalityReporter.LOCALITY_BYTES_TOTAL));
+    assertEquals(1, statistics.get(HdfsLocalityReporter.LOCALITY_BLOCKS_TOTAL));
+    assertEquals(0, statistics.get(HdfsLocalityReporter.LOCALITY_BLOCKS_LOCAL));
+    
+    // set hostname and check again
+    factory.setHost("127.0.0.1");
+    statistics = localityBean.getStatistics();
+    assertEquals(long_bytes, statistics.get(HdfsLocalityReporter.LOCALITY_BYTES_LOCAL));
+    
+    factory.close();
+  }
 }

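Once JMX reporting is enabled in solrconfig.xml, the statistics of the offered bean surface as MBean attributes. A hedged in-process sketch of reading one (the ObjectName pattern is an assumption; verify the actual registration with jconsole, and use a JMXConnector for remote servers):

    import java.lang.management.ManagementFactory;

    import javax.management.MBeanServer;
    import javax.management.ObjectName;

    public class LocalityProbe {
      public static void main(String[] args) throws Exception {
        MBeanServer server = ManagementFactory.getPlatformMBeanServer();
        // Assumed name; the domain and key properties depend on the
        // configured <jmx/> agent and the core name.
        ObjectName name = new ObjectName(
            "solr/collection1:type=hdfs-locality,id=org.apache.solr.store.hdfs.HdfsLocalityReporter");
        System.out.println(server.getAttribute(name, "locality.blocks.ratio"));
      }
    }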

