incubator-blur-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From amccu...@apache.org
Subject [7/7] git commit: Making the index import distributed in the hope of making it faster.
Date Sun, 12 Apr 2015 02:44:14 GMT
Making the index import distributed in the hope of making it faster.


Project: http://git-wip-us.apache.org/repos/asf/incubator-blur/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-blur/commit/d2224777
Tree: http://git-wip-us.apache.org/repos/asf/incubator-blur/tree/d2224777
Diff: http://git-wip-us.apache.org/repos/asf/incubator-blur/diff/d2224777

Branch: refs/heads/master
Commit: d2224777b7804bed7dde781be41d19b7a32c3ef5
Parents: dec4fa7
Author: Aaron McCurry <amccurry@gmail.com>
Authored: Sat Apr 11 22:43:59 2015 -0400
Committer: Aaron McCurry <amccurry@gmail.com>
Committed: Sat Apr 11 22:43:59 2015 -0400

----------------------------------------------------------------------
 .../apache/blur/server/FilteredBlurServer.java  |  10 ++
 .../org/apache/blur/server/TableContext.java    |  83 -------------
 .../blur/thrift/BlurControllerServer.java       |  87 ++++++++++++--
 .../org/apache/blur/thrift/BlurShardServer.java |   7 +-
 .../java/org/apache/blur/thrift/TableAdmin.java | 116 ++++++++++++++++---
 .../server/cache/ThriftCacheServerTest.java     |  11 ++
 6 files changed, 205 insertions(+), 109 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/d2224777/blur-core/src/main/java/org/apache/blur/server/FilteredBlurServer.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/server/FilteredBlurServer.java b/blur-core/src/main/java/org/apache/blur/server/FilteredBlurServer.java
index 1bff212..ff40e5a 100644
--- a/blur-core/src/main/java/org/apache/blur/server/FilteredBlurServer.java
+++ b/blur-core/src/main/java/org/apache/blur/server/FilteredBlurServer.java
@@ -317,4 +317,14 @@ public class FilteredBlurServer implements Iface {
     return _iface.configurationPerServer(thriftServerPlusPort, configName);
   }
 
+  @Override
+  public void validateIndex(String table, List<String> externalIndexPaths) throws BlurException,
TException {
+    _iface.validateIndex(table, externalIndexPaths);
+  }
+
+  @Override
+  public void loadIndex(String table, List<String> externalIndexPaths) throws BlurException,
TException {
+    _iface.loadIndex(table, externalIndexPaths);
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/d2224777/blur-core/src/main/java/org/apache/blur/server/TableContext.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/server/TableContext.java b/blur-core/src/main/java/org/apache/blur/server/TableContext.java
index ed62b9b..f4bae7c 100644
--- a/blur-core/src/main/java/org/apache/blur/server/TableContext.java
+++ b/blur-core/src/main/java/org/apache/blur/server/TableContext.java
@@ -58,20 +58,14 @@ import org.apache.blur.manager.writer.BlurIndexSimpleWriter;
 //import org.apache.blur.manager.writer.BlurNRTIndex;
 import org.apache.blur.manager.writer.SharedMergeScheduler;
 import org.apache.blur.server.cache.ThriftCache;
-import org.apache.blur.store.hdfs.HdfsDirectory;
 import org.apache.blur.thrift.generated.Blur.Iface;
 import org.apache.blur.thrift.generated.ScoreType;
 import org.apache.blur.thrift.generated.TableDescriptor;
 import org.apache.blur.utils.BlurConstants;
 import org.apache.blur.utils.BlurUtil;
-import org.apache.blur.utils.ShardUtil;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.util.ReflectionUtils;
-import org.apache.lucene.index.DirectoryReader;
 import org.apache.lucene.index.IndexDeletionPolicy;
 import org.apache.lucene.index.KeepOnlyLastCommitDeletionPolicy;
 import org.apache.lucene.index.Term;
@@ -426,83 +420,6 @@ public class TableContext implements Cloneable {
     }
   }
 
-  public void loadData(String location) throws IOException {
-    Path path = new Path(location);
-    FileSystem fileSystem = path.getFileSystem(_configuration);
-
-    validateLoad(path, fileSystem);
-
-    FileStatus[] listStatus = fileSystem.listStatus(path);
-    for (FileStatus fileStatus : listStatus) {
-      loadShard(fileStatus.getPath(), fileSystem);
-    }
-
-    // printFS(path, fileSystem);
-
-  }
-
-  private void validateLoad(Path path, FileSystem fileSystem) throws IOException {
-    TableDescriptor descriptor = getDescriptor();
-    int shardCount = descriptor.getShardCount();
-    FileStatus[] listStatus = fileSystem.listStatus(path);
-    int count = 0;
-    for (FileStatus fileStatus : listStatus) {
-      Path shardPath = fileStatus.getPath();
-      String shardId = shardPath.getName();
-      int shardIndex = ShardUtil.getShardIndex(shardId);
-      if (shardIndex >= shardCount) {
-        throw new IOException("Too many shards [" + shardIndex + "].");
-      }
-      count++;
-      validateIndexesExist(shardPath, fileSystem);
-    }
-    if (shardCount != count) {
-      throw new IOException("Not enough shards [" + count + "] should be [" + shardCount
+ "].");
-    }
-  }
-
-  private void validateIndexesExist(Path shardPath, FileSystem fileSystem) throws IOException
{
-    FileStatus[] listStatus = fileSystem.listStatus(shardPath, new PathFilter() {
-      @Override
-      public boolean accept(Path path) {
-        return path.getName().endsWith(".commit");
-      }
-    });
-    for (FileStatus fileStatus : listStatus) {
-      Path path = fileStatus.getPath();
-      HdfsDirectory directory = new HdfsDirectory(_configuration, path);
-      try {
-        if (!DirectoryReader.indexExists(directory)) {
-          throw new IOException("Path [" + path + "] is not a valid index.");
-        }
-      } finally {
-        directory.close();
-      }
-    }
-  }
-
-  private void loadShard(Path newLoadShardPath, FileSystem fileSystem) throws IOException
{
-    Path tablePath = getTablePath();
-    Path shardPath = new Path(tablePath, newLoadShardPath.getName());
-    FileStatus[] listStatus = fileSystem.listStatus(newLoadShardPath, new PathFilter() {
-      @Override
-      public boolean accept(Path path) {
-        return path.getName().endsWith(".commit");
-      }
-    });
-
-    for (FileStatus fileStatus : listStatus) {
-      Path src = fileStatus.getPath();
-      Path dst = new Path(shardPath, src.getName());
-      if (fileSystem.rename(src, dst)) {
-        LOG.info("Successfully moved [{0}] to [{1}].", src, dst);
-      } else {
-        LOG.info("Could not move [{0}] to [{1}].", src, dst);
-        throw new IOException("Could not move [" + src + "] to [" + dst + "].");
-      }
-    }
-  }
-
   public Set<String> getDiscoverableFields() {
     return _discoverableFields;
   }

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/d2224777/blur-core/src/main/java/org/apache/blur/thrift/BlurControllerServer.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/thrift/BlurControllerServer.java b/blur-core/src/main/java/org/apache/blur/thrift/BlurControllerServer.java
index 8bc8da4..b24bb03 100644
--- a/blur-core/src/main/java/org/apache/blur/thrift/BlurControllerServer.java
+++ b/blur-core/src/main/java/org/apache/blur/thrift/BlurControllerServer.java
@@ -112,12 +112,17 @@ import org.apache.blur.utils.BlurExecutorCompletionService;
 import org.apache.blur.utils.BlurIterator;
 import org.apache.blur.utils.BlurUtil;
 import org.apache.blur.utils.ForkJoin;
+import org.apache.blur.utils.ShardUtil;
 import org.apache.blur.utils.ForkJoin.Merger;
 import org.apache.blur.utils.ForkJoin.ParallelCall;
 import org.apache.blur.zookeeper.WatchChildren;
 import org.apache.blur.zookeeper.WatchChildren.OnChange;
 import org.apache.blur.zookeeper.WatchNodeExistance;
 import org.apache.blur.zookeeper.ZookeeperPathConstants;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.ZooDefs.Ids;
@@ -1053,14 +1058,6 @@ public class BlurControllerServer extends TableAdmin implements Iface
{
     });
   }
 
-  private String getCluster(String table) throws BlurException, TException {
-    TableDescriptor describe = describe(table);
-    if (describe == null) {
-      throw new BException("Table [" + table + "] not found.");
-    }
-    return describe.cluster;
-  }
-
   public static Schema merge(Schema result, Schema schema) {
     Map<String, Map<String, ColumnDefinition>> destColumnFamilies = result.getFamilies();
     Map<String, Map<String, ColumnDefinition>> srcColumnFamilies = schema.getFamilies();
@@ -1802,4 +1799,78 @@ public class BlurControllerServer extends TableAdmin implements Iface
{
     }
   }
 
+  @Override
+  public void loadData(String table, String location) throws BlurException, TException {
+    try {
+      String cluster = getCluster(table);
+      TableDescriptor tableDescriptor = _clusterStatus.getTableDescriptor(true, cluster,
table);
+      TableContext tableContext = TableContext.create(tableDescriptor);
+      Configuration configuration = tableContext.getConfiguration();
+      Path path = new Path(location);
+      FileSystem fileSystem = path.getFileSystem(configuration);
+      validateOrLoadIndexes(cluster, table, path, fileSystem, tableDescriptor, configuration,
false);
+      validateOrLoadIndexes(cluster, table, path, fileSystem, tableDescriptor, configuration,
true);
+    } catch (IOException e) {
+      throw new BException(e.getMessage(), e);
+    }
+  }
+
+  private void validateOrLoadIndexes(String cluster, final String table, Path path, FileSystem
fileSystem,
+      TableDescriptor descriptor, Configuration configuration, final boolean load) throws
IOException, BlurException,
+      TException {
+    int shardCount = descriptor.getShardCount();
+    FileStatus[] listStatus = fileSystem.listStatus(path);
+    int count = 0;
+    final Map<String, List<String>> shardServerPathMap = new HashMap<String,
List<String>>();
+    Map<String, String> shardServerLayout = shardServerLayout(descriptor.getName());
+    for (FileStatus fileStatus : listStatus) {
+      Path shardPath = fileStatus.getPath();
+      String shardId = shardPath.getName();
+      String server = shardServerLayout.get(shardId);
+
+      int shardIndex = ShardUtil.getShardIndex(shardId);
+      if (shardIndex >= shardCount) {
+        throw new IOException("Too many shards [" + shardIndex + "].");
+      }
+      List<String> paths = shardServerPathMap.get(server);
+      if (paths == null) {
+        shardServerPathMap.put(server, paths = new ArrayList<String>());
+      }
+      paths.add(shardPath.toString());
+      count++;
+    }
+    if (shardCount != count) {
+      throw new IOException("Not enough shards [" + count + "] should be [" + shardCount
+ "].");
+    }
+    try {
+      scatter(cluster, new BlurCommand<Void>() {
+        @Override
+        public Void call(Client client) throws BlurException, TException {
+          throw new RuntimeException("Not Used.");
+        }
+
+        @Override
+        public Void call(Client client, Connection connection) throws BlurException, TException
{
+          String server = connection.getHost() + ":" + connection.getPort();
+          List<String> externalIndexPaths = shardServerPathMap.get(server);
+          if (externalIndexPaths == null || externalIndexPaths.isEmpty()) {
+            return null;
+          }
+          if (load) {
+            client.loadIndex(table, externalIndexPaths);
+          } else {
+            client.validateIndex(table, externalIndexPaths);
+          }
+          return null;
+        }
+
+      });
+    } catch (Exception e) {
+      LOG.error("Unknown error while trying to validate indexes for table [{0}]", e, table);
+      if (e instanceof BlurException) {
+        throw (BlurException) e;
+      }
+      throw new BException("Unknown error while trying to validate indexes for table [{0}]",
e, table);
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/d2224777/blur-core/src/main/java/org/apache/blur/thrift/BlurShardServer.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/thrift/BlurShardServer.java b/blur-core/src/main/java/org/apache/blur/thrift/BlurShardServer.java
index d0e8cfc..625ed26 100644
--- a/blur-core/src/main/java/org/apache/blur/thrift/BlurShardServer.java
+++ b/blur-core/src/main/java/org/apache/blur/thrift/BlurShardServer.java
@@ -721,7 +721,7 @@ public class BlurShardServer extends TableAdmin implements Iface {
     try {
       List<String> tableListByCluster = tableListByCluster(_cluster);
       List<String> writableTables = new ArrayList<String>();
-      for (String table :tableListByCluster) {
+      for (String table : tableListByCluster) {
         if (!_clusterStatus.isReadOnly(true, _cluster, table)) {
           writableTables.add(table);
         }
@@ -770,4 +770,9 @@ public class BlurShardServer extends TableAdmin implements Iface {
     return result;
   }
 
+  @Override
+  public void loadData(String table, String location) throws BlurException, TException {
+    throw new RuntimeException("Shard servers do not support this call.");
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/d2224777/blur-core/src/main/java/org/apache/blur/thrift/TableAdmin.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/thrift/TableAdmin.java b/blur-core/src/main/java/org/apache/blur/thrift/TableAdmin.java
index f2c29ba..c55ce0a 100644
--- a/blur-core/src/main/java/org/apache/blur/thrift/TableAdmin.java
+++ b/blur-core/src/main/java/org/apache/blur/thrift/TableAdmin.java
@@ -40,6 +40,7 @@ import org.apache.blur.log.Log;
 import org.apache.blur.log.LogFactory;
 import org.apache.blur.manager.clusterstatus.ClusterStatus;
 import org.apache.blur.server.TableContext;
+import org.apache.blur.store.hdfs.HdfsDirectory;
 import org.apache.blur.thirdparty.thrift_0_9_0.TException;
 import org.apache.blur.thrift.generated.ArgumentDescriptor;
 import org.apache.blur.thrift.generated.Blur.Iface;
@@ -57,9 +58,15 @@ import org.apache.blur.trace.Trace;
 import org.apache.blur.trace.TraceStorage;
 import org.apache.blur.utils.MemoryReporter;
 import org.apache.blur.utils.ShardUtil;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 import org.apache.log4j.xml.DOMConfigurator;
+import org.apache.lucene.index.DirectoryReader;
 import org.apache.zookeeper.ZooKeeper;
 
 public abstract class TableAdmin implements Iface {
@@ -714,27 +721,10 @@ public abstract class TableAdmin implements Iface {
   }
 
   @Override
-  public void loadData(String table, String location) throws BlurException, TException {
-    TableContext tableContext = getTableContext(table);
-    try {
-      tableContext.loadData(location);
-    } catch (IOException e) {
-      throw new BException(e.getMessage(), e);
-    }
-  }
-
-  @Override
   public void bulkMutateStart(String bulkId) throws BlurException, TException {
     // TODO Start transaction here...
   }
 
-  // @Override
-  // public void bulkMutateFinish(String bulkId, boolean apply, boolean
-  // blockUntilComplete) throws BlurException,
-  // TException {
-  // throw new RuntimeException("Not implemented.");
-  // }
-
   @Override
   public String configurationPerServer(String thriftServerPlusPort, String configName) throws
BlurException, TException {
     if (thriftServerPlusPort == null || thriftServerPlusPort.equals(_nodeName)) {
@@ -756,4 +746,96 @@ public abstract class TableAdmin implements Iface {
     _nodeName = nodeName;
   }
 
+  protected String getCluster(String table) throws BlurException, TException {
+    TableDescriptor describe = describe(table);
+    if (describe == null) {
+      throw new BException("Table [" + table + "] not found.");
+    }
+    return describe.cluster;
+  }
+
+  @Override
+  public void loadIndex(String table, List<String> externalIndexPaths) throws BlurException,
TException {
+    try {
+      if (externalIndexPaths == null || externalIndexPaths.isEmpty()) {
+        return;
+      }
+      String cluster = getCluster(table);
+      TableDescriptor tableDescriptor = _clusterStatus.getTableDescriptor(true, cluster,
table);
+      TableContext tableContext = TableContext.create(tableDescriptor);
+      Configuration configuration = tableContext.getConfiguration();
+      Path tablePath = tableContext.getTablePath();
+      FileSystem fileSystem = tablePath.getFileSystem(configuration);
+      for (String externalPath : externalIndexPaths) {
+        Path newLoadShardPath = new Path(externalPath);
+        loadShard(newLoadShardPath, fileSystem, tablePath);
+      }
+    } catch (IOException e) {
+      throw new BException(e.getMessage(), e);
+    }
+  }
+
+  private void loadShard(Path newLoadShardPath, FileSystem fileSystem, Path tablePath) throws
IOException {
+    Path shardPath = new Path(tablePath, newLoadShardPath.getName());
+    FileStatus[] listStatus = fileSystem.listStatus(newLoadShardPath, new PathFilter() {
+      @Override
+      public boolean accept(Path path) {
+        return path.getName().endsWith(".commit");
+      }
+    });
+
+    for (FileStatus fileStatus : listStatus) {
+      Path src = fileStatus.getPath();
+      Path dst = new Path(shardPath, src.getName());
+      if (fileSystem.rename(src, dst)) {
+        LOG.info("Successfully moved [{0}] to [{1}].", src, dst);
+      } else {
+        LOG.info("Could not move [{0}] to [{1}].", src, dst);
+        throw new IOException("Could not move [" + src + "] to [" + dst + "].");
+      }
+    }
+  }
+
+  @Override
+  public void validateIndex(String table, List<String> externalIndexPaths) throws BlurException,
TException {
+    try {
+      if (externalIndexPaths == null || externalIndexPaths.isEmpty()) {
+        return;
+      }
+      String cluster = getCluster(table);
+      TableDescriptor tableDescriptor = _clusterStatus.getTableDescriptor(true, cluster,
table);
+      TableContext tableContext = TableContext.create(tableDescriptor);
+      Configuration configuration = tableContext.getConfiguration();
+      Path tablePath = tableContext.getTablePath();
+      FileSystem fileSystem = tablePath.getFileSystem(configuration);
+      for (String externalPath : externalIndexPaths) {
+        Path shardPath = new Path(externalPath);
+        validateIndexesExist(shardPath, fileSystem, configuration);
+      }
+    } catch (IOException e) {
+      throw new BException(e.getMessage(), e);
+    }
+  }
+
+  private void validateIndexesExist(Path shardPath, FileSystem fileSystem, Configuration
configuration)
+      throws IOException {
+    FileStatus[] listStatus = fileSystem.listStatus(shardPath, new PathFilter() {
+      @Override
+      public boolean accept(Path path) {
+        return path.getName().endsWith(".commit");
+      }
+    });
+    for (FileStatus fileStatus : listStatus) {
+      Path path = fileStatus.getPath();
+      HdfsDirectory directory = new HdfsDirectory(configuration, path);
+      try {
+        if (!DirectoryReader.indexExists(directory)) {
+          throw new IOException("Path [" + path + "] is not a valid index.");
+        }
+      } finally {
+        directory.close();
+      }
+    }
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/d2224777/blur-core/src/test/java/org/apache/blur/server/cache/ThriftCacheServerTest.java
----------------------------------------------------------------------
diff --git a/blur-core/src/test/java/org/apache/blur/server/cache/ThriftCacheServerTest.java
b/blur-core/src/test/java/org/apache/blur/server/cache/ThriftCacheServerTest.java
index bd43b45..73ef9ed 100644
--- a/blur-core/src/test/java/org/apache/blur/server/cache/ThriftCacheServerTest.java
+++ b/blur-core/src/test/java/org/apache/blur/server/cache/ThriftCacheServerTest.java
@@ -515,6 +515,17 @@ public class ThriftCacheServerTest {
           TException {
         throw new RuntimeException("Not implemented.");
       }
+
+      @Override
+      public void validateIndex(String table, List<String> externalIndexPaths) throws
BlurException, TException {
+        throw new RuntimeException("Not implemented.");
+      }
+
+      @Override
+      public void loadIndex(String table, List<String> externalIndexPaths) throws BlurException,
TException {
+        throw new RuntimeException("Not implemented.");
+      }
+
     };
   }
 


Mime
View raw message