incubator-blur-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From amccu...@apache.org
Subject git commit: Adding a new contrib project that will attempt to keep index blocks local to the shard servers that are serving the indexes.
Date Fri, 01 May 2015 12:56:57 GMT
Repository: incubator-blur
Updated Branches:
  refs/heads/master 32ef7b394 -> 4d934d786


Adding a new contrib project that will attempt to keep index blocks local to the shard servers
that are serving the indexes.


Project: http://git-wip-us.apache.org/repos/asf/incubator-blur/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-blur/commit/4d934d78
Tree: http://git-wip-us.apache.org/repos/asf/incubator-blur/tree/4d934d78
Diff: http://git-wip-us.apache.org/repos/asf/incubator-blur/diff/4d934d78

Branch: refs/heads/master
Commit: 4d934d786df2af86c85a077678870dee25328219
Parents: 32ef7b3
Author: Aaron McCurry <amccurry@gmail.com>
Authored: Fri May 1 08:56:50 2015 -0400
Committer: Aaron McCurry <amccurry@gmail.com>
Committed: Fri May 1 08:56:50 2015 -0400

----------------------------------------------------------------------
 contrib/blur-block-placement-policy/pom.xml     |  16 +-
 .../BlurBlockPlacementPolicyDefault.java        | 236 ++++++++++++++++
 .../BlurBlockPlacementStatusDefault.java        |  59 ++++
 .../blockmanagement/DefaultServerLookup.java    | 269 +++++++++++++++++++
 .../server/blockmanagement/ServerLookup.java    |  33 +++
 .../BlurBlockPlacementPolicyDefaultTest.java    | 168 ++++++++++++
 .../DefaultServerLookupTest.java                |  88 ++++++
 .../blockmanagement/TestServerLookup.java       |  79 ++++++
 8 files changed, 937 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/4d934d78/contrib/blur-block-placement-policy/pom.xml
----------------------------------------------------------------------
diff --git a/contrib/blur-block-placement-policy/pom.xml b/contrib/blur-block-placement-policy/pom.xml
index 42efce4..ceac7fb 100644
--- a/contrib/blur-block-placement-policy/pom.xml
+++ b/contrib/blur-block-placement-policy/pom.xml
@@ -53,17 +53,6 @@
 			<scope>test</scope>
 		</dependency>
 		<dependency>
-			<groupId>org.apache.blur</groupId>
-			<artifactId>blur-thrift</artifactId>
-			<version>${projectVersion}</version>
-			<exclusions>
-				<exclusion>
-					<groupId>org.apache.httpcomponents</groupId>
-					<artifactId>httpclient</artifactId>
-				</exclusion>
-			</exclusions>
-		</dependency>
-		<dependency>
 			<groupId>log4j</groupId>
 			<artifactId>log4j</artifactId>
 			<version>${log4j.version}</version>
@@ -75,6 +64,11 @@
 			<version>${projectVersion}</version>
 		</dependency>
 		<dependency>
+			<groupId>org.apache.blur</groupId>
+			<artifactId>blur-thrift</artifactId>
+			<version>${projectVersion}</version>
+		</dependency>
+		<dependency>
 			<groupId>log4j</groupId>
 			<artifactId>log4j</artifactId>
 			<version>${log4j.version}</version>

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/4d934d78/contrib/blur-block-placement-policy/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlurBlockPlacementPolicyDefault.java
----------------------------------------------------------------------
diff --git a/contrib/blur-block-placement-policy/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlurBlockPlacementPolicyDefault.java
b/contrib/blur-block-placement-policy/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlurBlockPlacementPolicyDefault.java
new file mode 100644
index 0000000..aa5c84d
--- /dev/null
+++ b/contrib/blur-block-placement-policy/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlurBlockPlacementPolicyDefault.java
@@ -0,0 +1,236 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.blockmanagement;
+
+import java.lang.reflect.Constructor;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Random;
+import java.util.Set;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.StorageType;
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.server.namenode.FSClusterStats;
+import org.apache.hadoop.net.NetworkTopology;
+import org.apache.hadoop.net.Node;
+
+public class BlurBlockPlacementPolicyDefault extends BlockPlacementPolicyDefault {
+
+  public static final String BLUR_BLOCK_PLACEMENT_SERVER_LOOKUP = "blur.block.placement.server.lookup";
+
+  private static final Log LOG = LogFactory.getLog(BlurBlockPlacementPolicyDefault.class);
+
+  private static final ServerLookup DEFAULT = new ServerLookup(null, null) {
+
+    @Override
+    public boolean isPathSupported(String srcPath) {
+      return false;
+    }
+
+    @Override
+    public String getShardServer(String srcPath) {
+      throw new RuntimeException("Not implemented.");
+    }
+
+    @Override
+    public DatanodeDescriptor getDatanodeDescriptor(String shardServer) {
+      throw new RuntimeException("Not implemented.");
+    }
+  };
+
+  private ServerLookup _serverLookup;
+  private Random _random;
+
+  @Override
+  public void initialize(Configuration conf, FSClusterStats stats, NetworkTopology clusterMap,
+      Host2NodesMap host2datanodeMap) {
+    LOG.info("initialize");
+    super.initialize(conf, stats, clusterMap, host2datanodeMap);
+    _random = new Random();
+    Class<?> c = conf.getClass(BLUR_BLOCK_PLACEMENT_SERVER_LOOKUP, DefaultServerLookup.class);
+    if (host2datanodeMap == null) {
+      _serverLookup = DEFAULT;
+    } else {
+      try {
+        Constructor<?> constructor = c.getConstructor(new Class[] { Configuration.class,
Host2NodesMap.class });
+        _serverLookup = (ServerLookup) constructor.newInstance(conf, host2datanodeMap);
+      } catch (Exception e) {
+        throw new RuntimeException(e);
+      }
+    }
+  }
+
+  @Override
+  public DatanodeStorageInfo[] chooseTarget(String srcPath, int numOfReplicas, Node writer,
+      List<DatanodeStorageInfo> chosenNodes, boolean returnChosenNodes, Set<Node>
excludedNodes, long blocksize,
+      BlockStoragePolicy storagePolicy) {
+    LOG.info("chooseTarget");
+    if (_serverLookup.isPathSupported(srcPath)) {
+      String shardServer = _serverLookup.getShardServer(srcPath);
+      if (shardServer == null) {
+        return super.chooseTarget(srcPath, numOfReplicas, writer, chosenNodes, returnChosenNodes,
excludedNodes,
+            blocksize, storagePolicy);
+      }
+      DatanodeDescriptor shardServerDatanodeDescriptor = _serverLookup.getDatanodeDescriptor(shardServer);
+      if (shardServerDatanodeDescriptor == null) {
+        return super.chooseTarget(srcPath, numOfReplicas, writer, chosenNodes, returnChosenNodes,
excludedNodes,
+            blocksize, storagePolicy);
+      }
+      if (isAlreadyChosen(chosenNodes, shardServerDatanodeDescriptor)) {
+        return super.chooseTarget(srcPath, numOfReplicas, writer, chosenNodes, returnChosenNodes,
excludedNodes,
+            blocksize, storagePolicy);
+      } else {
+        DatanodeStorageInfo[] shardServerStorageInfos = shardServerDatanodeDescriptor.getStorageInfos();
+        if (shardServerStorageInfos == null || shardServerStorageInfos.length == 0) {
+          return super.chooseTarget(srcPath, numOfReplicas, writer, chosenNodes, returnChosenNodes,
excludedNodes,
+              blocksize, storagePolicy);
+        }
+        DatanodeStorageInfo shardServerStorageInfo = choseOne(shardServerStorageInfos);
+        if (numOfReplicas - 1 == 0) {
+          if (returnChosenNodes) {
+            List<DatanodeStorageInfo> copy = new ArrayList<DatanodeStorageInfo>(chosenNodes);
+            copy.add(shardServerStorageInfo);
+            return copy.toArray(new DatanodeStorageInfo[copy.size()]);
+          } else {
+            return new DatanodeStorageInfo[] { shardServerStorageInfo };
+          }
+        }
+        if (chosenNodes == null) {
+          chosenNodes = new ArrayList<DatanodeStorageInfo>();
+        }
+        chosenNodes.add(shardServerStorageInfo);
+        if (returnChosenNodes) {
+          return super.chooseTarget(srcPath, numOfReplicas - 1, writer, chosenNodes, returnChosenNodes,
excludedNodes,
+              blocksize, storagePolicy);
+        } else {
+          DatanodeStorageInfo[] datanodeStorageInfos = super.chooseTarget(srcPath, numOfReplicas
- 1, writer,
+              chosenNodes, returnChosenNodes, excludedNodes, blocksize, storagePolicy);
+          DatanodeStorageInfo[] result = new DatanodeStorageInfo[datanodeStorageInfos.length
+ 1];
+          System.arraycopy(datanodeStorageInfos, 0, result, 1, datanodeStorageInfos.length);
+          result[0] = shardServerStorageInfo;
+          return result;
+        }
+      }
+    } else {
+      return super.chooseTarget(srcPath, numOfReplicas, writer, chosenNodes, returnChosenNodes,
excludedNodes,
+          blocksize, storagePolicy);
+    }
+  }
+
+  @Override
+  public BlockPlacementStatus verifyBlockPlacement(String srcPath, LocatedBlock lBlk, int
numberOfReplicas) {
+    LOG.info("verifyBlockPlacement");
+    if (_serverLookup.isPathSupported(srcPath)) {
+      String shardServer = _serverLookup.getShardServer(srcPath);
+      if (shardServer != null) {
+        return super.verifyBlockPlacement(srcPath, lBlk, numberOfReplicas);
+      }
+      DatanodeDescriptor shardServerDatanodeDescriptor = _serverLookup.getDatanodeDescriptor(shardServer);
+      String shardServerDatanodeUuid = shardServerDatanodeDescriptor.getDatanodeUuid();
+      DatanodeInfo[] locations = lBlk.getLocations();
+      for (DatanodeInfo info : locations) {
+        String datanodeUuid = info.getDatanodeUuid();
+        if (shardServerDatanodeUuid.equals(datanodeUuid)) {
+          // then one of the locations is on the shard server.
+          return super.verifyBlockPlacement(srcPath, lBlk, numberOfReplicas);
+        }
+      }
+      // none of the replicas are on a shard server.
+      BlockPlacementStatus blockPlacementStatus = super.verifyBlockPlacement(srcPath, lBlk,
numberOfReplicas);
+      if (blockPlacementStatus.isPlacementPolicySatisfied()) {
+        // default block placement is satisfied, but we are not.
+        return new BlurBlockPlacementStatusDefault(blockPlacementStatus, shardServer);
+      } else {
+        // both are unsatisfied
+        return new BlurBlockPlacementStatusDefault(blockPlacementStatus, shardServer);
+      }
+    } else {
+      return super.verifyBlockPlacement(srcPath, lBlk, numberOfReplicas);
+    }
+  }
+
+  @Override
+  public DatanodeStorageInfo chooseReplicaToDelete(BlockCollection bc, Block block, short
replicationFactor,
+      Collection<DatanodeStorageInfo> first, Collection<DatanodeStorageInfo>
second, List<StorageType> excessTypes) {
+    LOG.info("chooseReplicaToDelete rep [" + replicationFactor + "]");
+
+    String path = bc.getName();
+
+    String shardServer = _serverLookup.getShardServer(path);
+    if (shardServer == null) {
+      return super.chooseReplicaToDelete(bc, block, replicationFactor, first, second, excessTypes);
+    }
+    DatanodeDescriptor shardServerDatanodeDescriptor = _serverLookup.getDatanodeDescriptor(shardServer);
+
+    if (replicationFactor > 1) {
+      Collection<DatanodeStorageInfo> firstCopy = new ArrayList<DatanodeStorageInfo>();
+      for (DatanodeStorageInfo info : first) {
+        DatanodeDescriptor datanodeDescriptor = info.getDatanodeDescriptor();
+        if (!datanodeDescriptor.equals(shardServerDatanodeDescriptor)) {
+          firstCopy.add(info);
+        }
+      }
+
+      Collection<DatanodeStorageInfo> secondCopy = new ArrayList<DatanodeStorageInfo>();
+      for (DatanodeStorageInfo info : second) {
+        DatanodeDescriptor datanodeDescriptor = info.getDatanodeDescriptor();
+        if (!datanodeDescriptor.equals(shardServerDatanodeDescriptor)) {
+          secondCopy.add(info);
+        }
+      }
+
+      return super.chooseReplicaToDelete(bc, block, replicationFactor, firstCopy, secondCopy,
excessTypes);
+    } else {
+      for (DatanodeStorageInfo info : first) {
+        DatanodeDescriptor datanodeDescriptor = info.getDatanodeDescriptor();
+        if (!datanodeDescriptor.equals(shardServerDatanodeDescriptor)) {
+          return info;
+        }
+      }
+
+      for (DatanodeStorageInfo info : second) {
+        DatanodeDescriptor datanodeDescriptor = info.getDatanodeDescriptor();
+        if (!datanodeDescriptor.equals(shardServerDatanodeDescriptor)) {
+          return info;
+        }
+      }
+      throw new RuntimeException("Should never happen!!!");
+    }
+  }
+
+  private DatanodeStorageInfo choseOne(DatanodeStorageInfo[] storageInfos) {
+    synchronized (_random) {
+      return storageInfos[_random.nextInt(storageInfos.length)];
+    }
+  }
+
+  private boolean isAlreadyChosen(List<DatanodeStorageInfo> chosenNodes, DatanodeDescriptor
datanodeDescriptor) {
+    for (DatanodeStorageInfo info : chosenNodes) {
+      if (info.getDatanodeDescriptor().equals(datanodeDescriptor)) {
+        return true;
+      }
+    }
+    return false;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/4d934d78/contrib/blur-block-placement-policy/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlurBlockPlacementStatusDefault.java
----------------------------------------------------------------------
diff --git a/contrib/blur-block-placement-policy/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlurBlockPlacementStatusDefault.java
b/contrib/blur-block-placement-policy/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlurBlockPlacementStatusDefault.java
new file mode 100644
index 0000000..1bf015d
--- /dev/null
+++ b/contrib/blur-block-placement-policy/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlurBlockPlacementStatusDefault.java
@@ -0,0 +1,59 @@
+package org.apache.hadoop.hdfs.server.blockmanagement;
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementStatus;
+
+public class BlurBlockPlacementStatusDefault implements BlockPlacementStatus {
+
+  private final BlockPlacementStatus _original;
+  private final String _shardServer;
+  private final boolean _origPlacementPolicySatisfied;
+
+  public BlurBlockPlacementStatusDefault(BlockPlacementStatus original, String shardServer)
{
+    _original = original;
+    _shardServer = shardServer;
+    if (_original != null) {
+      _origPlacementPolicySatisfied = _original.isPlacementPolicySatisfied();
+    } else {
+      _origPlacementPolicySatisfied = true;
+    }
+  }
+
+  @Override
+  public boolean isPlacementPolicySatisfied() {
+    if (_shardServer == null) {
+      return _origPlacementPolicySatisfied;
+    } else {
+      return false;
+    }
+  }
+
+  @Override
+  public String getErrorDescription() {
+    if (isPlacementPolicySatisfied()) {
+      return null;
+    }
+    String message = "Block should located on shard server [" + _shardServer + "] for best
performance";
+    if (_origPlacementPolicySatisfied) {
+      return message + ".";
+    } else {
+      return message + " AND " + _original.getErrorDescription() + ".";
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/4d934d78/contrib/blur-block-placement-policy/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DefaultServerLookup.java
----------------------------------------------------------------------
diff --git a/contrib/blur-block-placement-policy/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DefaultServerLookup.java
b/contrib/blur-block-placement-policy/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DefaultServerLookup.java
new file mode 100644
index 0000000..39d3de7
--- /dev/null
+++ b/contrib/blur-block-placement-policy/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DefaultServerLookup.java
@@ -0,0 +1,269 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.blockmanagement;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.blur.thirdparty.thrift_0_9_0.TException;
+import org.apache.blur.thrift.BlurClient;
+import org.apache.blur.thrift.generated.Blur.Iface;
+import org.apache.blur.thrift.generated.BlurException;
+import org.apache.blur.thrift.generated.ShardState;
+import org.apache.blur.thrift.generated.TableDescriptor;
+import org.apache.blur.utils.BlurConstants;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+public class DefaultServerLookup extends ServerLookup {
+
+  public static final String BLUR_ZK_CONNECTIONS = "blur.zk.connections";
+
+  private static final String SHARD_PATTERN = "/" + BlurConstants.SHARD_PREFIX;
+
+  private static Log LOG = LogFactory.getLog(DefaultServerLookup.class);
+
+  private final Host2NodesMap _host2datanodeMap;
+  private final Map<String, Iface> _clients = new ConcurrentHashMap<String, Iface>();
+  private final Map<Path, ShardTableInfo> _pathToShardMapping = new ConcurrentHashMap<Path,
ShardTableInfo>();
+  private final String _baseUri;
+  private final List<Thread> _daemons = new ArrayList<Thread>();
+  private final AtomicBoolean _running = new AtomicBoolean();
+  private final long _pollTime;
+
+  static class ShardTableInfo {
+    final String tableName;
+    final String shardServer;
+    final String zkConnectionStr;
+
+    ShardTableInfo(String tableName, String shardServer, String zkConnectionStr) {
+      this.tableName = tableName;
+      this.shardServer = shardServer;
+      this.zkConnectionStr = zkConnectionStr;
+    }
+
+    @Override
+    public String toString() {
+      return "tableName=" + tableName + ", shardServer=" + shardServer + ", zkConnectionStr="
+ zkConnectionStr + "";
+    }
+  }
+
+  public DefaultServerLookup(Configuration conf, Host2NodesMap host2datanodeMap) {
+    super(conf, host2datanodeMap);
+    Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
+      @Override
+      public void run() {
+        _running.set(false);
+      }
+    }));
+    _running.set(true);
+    _pollTime = 1000;
+    try {
+      FileSystem fileSystem = FileSystem.get(conf);
+      _baseUri = fileSystem.getUri().toString();
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+    _host2datanodeMap = host2datanodeMap;
+    String[] cons = conf.getStrings(BLUR_ZK_CONNECTIONS);
+    if (cons != null) {
+      for (String con : cons) {
+        LOG.info("Blur client setup for [" + con + "]");
+        _clients.put(con, BlurClient.getClientFromZooKeeperConnectionStr(con));
+        startWatchDaemon(con);
+        startCleanupDaemon(con);
+      }
+    }
+  }
+
+  private void startCleanupDaemon(final String con) {
+    Runnable runnable = new Runnable() {
+      @Override
+      public void run() {
+        while (_running.get()) {
+          try {
+            runLayoutCleanup(con);
+          } catch (BlurException e) {
+            LOG.error("Unknown error.", e);
+          } catch (TException e) {
+            LOG.error("Unknown error.", e);
+          }
+          try {
+            Thread.sleep(_pollTime);
+          } catch (InterruptedException e) {
+            LOG.error("Unknown interruption.", e);
+            return;
+          }
+        }
+      }
+    };
+    Thread thread = new Thread(runnable);
+    thread.setDaemon(true);
+    thread.setName("Blur Cleanup for [" + con + "]");
+    thread.start();
+    _daemons.add(thread);
+  }
+
+  private void startWatchDaemon(final String con) {
+    Runnable runnable = new Runnable() {
+      @Override
+      public void run() {
+        while (_running.get()) {
+          try {
+            runLayoutUpdate(con);
+          } catch (BlurException e) {
+            LOG.error("Unknown error.", e);
+          } catch (TException e) {
+            LOG.error("Unknown error.", e);
+          }
+          try {
+            Thread.sleep(_pollTime);
+          } catch (InterruptedException e) {
+            LOG.error("Unknown interruption.", e);
+            return;
+          }
+        }
+      }
+    };
+    Thread thread = new Thread(runnable);
+    thread.setDaemon(true);
+    thread.setName("Blur Watch for [" + con + "]");
+    thread.start();
+    _daemons.add(thread);
+  }
+
+  protected void runLayoutCleanup(String zkConnectionStr) throws BlurException, TException
{
+    Iface iface = _clients.get(zkConnectionStr);
+    List<String> tableList = iface.tableList();
+    Collection<String> enabledTables = new HashSet<String>();
+    for (String table : tableList) {
+      TableDescriptor tableDescriptor = iface.describe(table);
+      String name = tableDescriptor.getName();
+      if (tableDescriptor.isEnabled()) {
+        enabledTables.add(name);
+      }
+    }
+    cleanUp(enabledTables);
+  }
+
+  private void cleanUp(Collection<String> enabledTables) {
+    Set<Entry<Path, ShardTableInfo>> entrySet = _pathToShardMapping.entrySet();
+    Iterator<Entry<Path, ShardTableInfo>> iterator = entrySet.iterator();
+    while (iterator.hasNext()) {
+      Entry<Path, ShardTableInfo> entry = iterator.next();
+      ShardTableInfo shardTableInfo = entry.getValue();
+      String tableName = shardTableInfo.tableName;
+      if (!enabledTables.contains(tableName)) {
+        LOG.info("Removing ShardTableInfo [" + shardTableInfo + "]");
+        iterator.remove();
+      }
+    }
+  }
+
+  private void runLayoutUpdate(String zkConnectionStr) throws BlurException, TException {
+    Iface iface = _clients.get(zkConnectionStr);
+    List<String> tableList = iface.tableList();
+    for (String table : tableList) {
+      TableDescriptor tableDescriptor = iface.describe(table);
+      if (!tableDescriptor.isEnabled()) {
+        continue;
+      }
+      String tableUri = tableDescriptor.getTableUri();
+      Path tablePath = new Path(tableUri);
+      String name = tableDescriptor.getName();
+      Map<String, Map<String, ShardState>> shardServerLayoutState = iface.shardServerLayoutState(name);
+      for (Entry<String, Map<String, ShardState>> entry : shardServerLayoutState.entrySet())
{
+        String shardId = entry.getKey();
+        Path shardPath = new Path(tablePath, shardId);
+        Map<String, ShardState> serverStateMap = entry.getValue();
+        String shardServer = getShardServer(serverStateMap);
+        if (shardServer != null) {
+          String shardServerHostName = getHostName(shardServer);
+          ShardTableInfo shardTableInfo = new ShardTableInfo(name, shardServerHostName, zkConnectionStr);
+          LOG.info("Adding mapping [" + shardPath + "] to ShardTableInfo [" + shardTableInfo
+ "]");
+          _pathToShardMapping.put(shardPath, shardTableInfo);
+        }
+      }
+    }
+  }
+
+  private String getHostName(String shardServer) {
+    int indexOf = shardServer.indexOf(':');
+    if (indexOf < 0) {
+      return shardServer;
+    }
+    return shardServer.substring(0, indexOf);
+  }
+
+  private String getShardServer(Map<String, ShardState> serverStateMap) {
+    for (Entry<String, ShardState> entry : serverStateMap.entrySet()) {
+      String server = entry.getKey();
+      ShardState shardState = entry.getValue();
+      if (shardState == ShardState.OPEN) {
+        return server;
+      }
+    }
+    return null;
+  }
+
+  @Override
+  public String getShardServer(String srcPath) {
+    int indexOf = srcPath.indexOf(SHARD_PATTERN);
+    if (indexOf < 0) {
+      return null;
+    }
+    int end = srcPath.indexOf('/', indexOf + 1);
+    if (end < 0) {
+      return null;
+    } else {
+      Path shardPath = new Path(_baseUri + srcPath.substring(0, end));
+      ShardTableInfo shardTableInfo = _pathToShardMapping.get(shardPath);
+      LOG.info("Path [" + srcPath + "] Resolved to [" + shardPath.toString() + "] on ShardTableInfo
[" + shardTableInfo
+          + "]");
+      if (shardTableInfo == null) {
+        return null;
+      }
+      return shardTableInfo.shardServer;
+    }
+  }
+
+  @Override
+  public boolean isPathSupported(String srcPath) {
+    if (!srcPath.contains(SHARD_PATTERN)) {
+      return false;
+    }
+    return getShardServer(srcPath) != null;
+  }
+
+  @Override
+  public DatanodeDescriptor getDatanodeDescriptor(String shardServer) {
+    return _host2datanodeMap.getDataNodeByHostName(shardServer);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/4d934d78/contrib/blur-block-placement-policy/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ServerLookup.java
----------------------------------------------------------------------
diff --git a/contrib/blur-block-placement-policy/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ServerLookup.java
b/contrib/blur-block-placement-policy/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ServerLookup.java
new file mode 100644
index 0000000..8a3cb2c
--- /dev/null
+++ b/contrib/blur-block-placement-policy/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ServerLookup.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.blockmanagement;
+
+import org.apache.hadoop.conf.Configuration;
+
+public abstract class ServerLookup {
+
+  public ServerLookup(Configuration conf, Host2NodesMap host2datanodeMap) {
+
+  }
+
+  public abstract boolean isPathSupported(String srcPath);
+
+  public abstract String getShardServer(String srcPath);
+
+  public abstract DatanodeDescriptor getDatanodeDescriptor(String shardServer);
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/4d934d78/contrib/blur-block-placement-policy/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/BlurBlockPlacementPolicyDefaultTest.java
----------------------------------------------------------------------
diff --git a/contrib/blur-block-placement-policy/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/BlurBlockPlacementPolicyDefaultTest.java
b/contrib/blur-block-placement-policy/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/BlurBlockPlacementPolicyDefaultTest.java
new file mode 100644
index 0000000..2aea340
--- /dev/null
+++ b/contrib/blur-block-placement-policy/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/BlurBlockPlacementPolicyDefaultTest.java
@@ -0,0 +1,168 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.blockmanagement;
+
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.Arrays;
+
+import org.apache.blur.MiniCluster;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.BlockLocation;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class BlurBlockPlacementPolicyDefaultTest {
+
+  private static final String DFS_BLOCKREPORT_INTERVAL_MSEC = "dfs.blockreport.intervalMsec";
+  private static final String DFS_BLOCK_REPLICATOR_CLASSNAME = "dfs.block.replicator.classname";
+  private static MiniCluster _miniCluster;
+
+  @BeforeClass
+  public static void setupClass() {
+    _miniCluster = new MiniCluster();
+    Configuration conf = new Configuration();
+    conf.set(DFS_BLOCK_REPLICATOR_CLASSNAME, BlurBlockPlacementPolicyDefault.class.getName());
+    conf.set(BlurBlockPlacementPolicyDefault.BLUR_BLOCK_PLACEMENT_SERVER_LOOKUP, TestServerLookup.class.getName());
+    conf.setLong(DFS_BLOCKREPORT_INTERVAL_MSEC, 10 * 1000);
+    boolean format = true;
+    String path = "./target/tmp/BlurBlockPlacementPolicyDefaultTest";
+    String[] racks = new String[] { "/r1", "/r1", "/r2", "/r2", "/r3", "/r3" };
+    _miniCluster.startDfs(conf, format, path, racks);
+  }
+
+  @AfterClass
+  public static void teardownClass() {
+    _miniCluster.shutdownDfs();
+  }
+
+  @Test
+  public void test1() throws IOException, InterruptedException {
+    FileSystem fileSystem = _miniCluster.getFileSystem();
+    String rootStr = fileSystem.getUri().toString();
+    Path root = new Path(rootStr + "/");
+    fileSystem.mkdirs(new Path(root, "/test/table/shard-00000000"));
+    Path p = writeFile(fileSystem, "/test/table/shard-00000000/test1");
+    String shardServer = "host4.foo.com";
+    assertBlocksExistOnShardServer(fileSystem, p, shardServer);
+    setReplication(fileSystem, p, 4);
+    assertBlocksExistOnShardServer(fileSystem, p, shardServer);
+    setReplication(fileSystem, p, 5);
+    assertBlocksExistOnShardServer(fileSystem, p, shardServer);
+    setReplication(fileSystem, p, 1);
+    assertBlocksExistOnShardServer(fileSystem, p, shardServer);
+  }
+
+  @Test
+  public void test2() throws IOException, InterruptedException {
+    FileSystem fileSystem = _miniCluster.getFileSystem();
+    String rootStr = fileSystem.getUri().toString();
+    Path root = new Path(rootStr + "/");
+    fileSystem.mkdirs(new Path(root, "/test/table/shard-00000000"));
+
+    String shardServer = "host4.foo.com";
+    Path p = writeFileNotOnShardServer(fileSystem, "/testfile", shardServer);
+    Path dst = new Path(root, "/test/table/shard-00000000/test2");
+    fileSystem.rename(p, dst);
+    p = dst;
+
+    setReplication(fileSystem, p, 2);
+
+    assertBlocksExistOnShardServer(fileSystem, p, shardServer);
+    setReplication(fileSystem, p, 4);
+    assertBlocksExistOnShardServer(fileSystem, p, shardServer);
+    setReplication(fileSystem, p, 5);
+    assertBlocksExistOnShardServer(fileSystem, p, shardServer);
+    setReplication(fileSystem, p, 1);
+    assertBlocksExistOnShardServer(fileSystem, p, shardServer);
+  }
+
+  private Path writeFileNotOnShardServer(FileSystem fileSystem, String path, String shardServer)
throws IOException {
+    String rootStr = fileSystem.getUri().toString();
+    Path p = new Path(rootStr + path);
+    boolean fail = true;
+    OUTER: while (fail) {
+      fail = false;
+      FSDataOutputStream outputStream = fileSystem.create(p, (short) 1);
+      byte[] buf = new byte[1000];
+      for (int i = 0; i < 1000; i++) {
+        outputStream.write(buf);
+      }
+      outputStream.close();
+      FileStatus fileStatus = fileSystem.getFileStatus(p);
+      BlockLocation[] blockLocations = fileSystem.getFileBlockLocations(p, 0, fileStatus.getLen());
+      for (BlockLocation blockLocation : blockLocations) {
+        fail = Arrays.asList(blockLocation.getHosts()).contains(shardServer);
+        if (fail) {
+          continue OUTER;
+        }
+      }
+    }
+    return p;
+  }
+
+  private void setReplication(FileSystem fileSystem, Path p, int rep) throws IOException,
InterruptedException {
+    fileSystem.setReplication(p, (short) rep);
+    waitForReplication(fileSystem, p, rep);
+  }
+
+  private void waitForReplication(FileSystem fileSystem, Path p, int replicas) throws IOException,
InterruptedException {
+    FileStatus fileStatus = fileSystem.getFileStatus(p);
+    boolean fail = true;
+    while (fail) {
+      fail = false;
+      BlockLocation[] blockLocations = fileSystem.getFileBlockLocations(p, 0, fileStatus.getLen());
+      for (BlockLocation blockLocation : blockLocations) {
+        System.out.println(blockLocation);
+        String[] hosts = blockLocation.getHosts();
+        if (hosts.length != replicas) {
+          fail = true;
+        }
+      }
+      Thread.sleep(1000);
+    }
+  }
+
+  private void assertBlocksExistOnShardServer(FileSystem fileSystem, Path p, String shardServer)
throws IOException {
+    FileStatus fileStatus = fileSystem.getFileStatus(p);
+    BlockLocation[] blockLocations = fileSystem.getFileBlockLocations(p, 0, fileStatus.getLen());
+    for (BlockLocation blockLocation : blockLocations) {
+      System.out.println(blockLocation);
+      String[] hosts = blockLocation.getHosts();
+      assertTrue(Arrays.asList(hosts).contains(shardServer));
+    }
+  }
+
+  private Path writeFile(FileSystem fileSystem, String path) throws IOException {
+    String rootStr = fileSystem.getUri().toString();
+    Path p = new Path(rootStr + path);
+    FSDataOutputStream outputStream = fileSystem.create(p);
+    byte[] buf = new byte[1000];
+    for (int i = 0; i < 1000; i++) {
+      outputStream.write(buf);
+    }
+    outputStream.close();
+    return p;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/4d934d78/contrib/blur-block-placement-policy/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/DefaultServerLookupTest.java
----------------------------------------------------------------------
diff --git a/contrib/blur-block-placement-policy/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/DefaultServerLookupTest.java
b/contrib/blur-block-placement-policy/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/DefaultServerLookupTest.java
new file mode 100644
index 0000000..4a34c4d
--- /dev/null
+++ b/contrib/blur-block-placement-policy/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/DefaultServerLookupTest.java
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.blockmanagement;
+
+import static org.junit.Assert.*;
+
+import java.io.IOException;
+
+import org.apache.blur.MiniCluster;
+import org.apache.blur.thirdparty.thrift_0_9_0.TException;
+import org.apache.blur.thrift.BlurClient;
+import org.apache.blur.thrift.generated.Blur.Iface;
+import org.apache.blur.thrift.generated.BlurException;
+import org.apache.blur.thrift.generated.TableDescriptor;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class DefaultServerLookupTest {
+
+  private static MiniCluster _miniCluster;
+
+  @BeforeClass
+  public static void setupClass() {
+    _miniCluster = new MiniCluster();
+    _miniCluster.startBlurCluster("./target/tmp/DefaultServerLookupTest", 1, 2, true);
+  }
+
+  @AfterClass
+  public static void teardownClass() {
+    _miniCluster.shutdownBlurCluster();
+  }
+
+  @Test
+  public void test() throws BlurException, TException, IOException, InterruptedException
{
+    String zkConnectionString = _miniCluster.getZkConnectionString();
+    Configuration configuration = new Configuration(_miniCluster.getConfiguration());
+    configuration.set(DefaultServerLookup.BLUR_ZK_CONNECTIONS, zkConnectionString);
+    DefaultServerLookup defaultServerLookup = new DefaultServerLookup(configuration, null);
+    Thread.sleep(1000);
+    assertFalse(defaultServerLookup.isPathSupported("/test"));
+
+    String tableUri = _miniCluster.getFileSystemUri().toString() + "/blur/tables/test";
+
+    Iface client = BlurClient.getClientFromZooKeeperConnectionStr(zkConnectionString);
+
+    TableDescriptor tableDescriptor = new TableDescriptor();
+    tableDescriptor.setName("test");
+    tableDescriptor.setShardCount(3);
+    tableDescriptor.setTableUri(tableUri);
+    client.createTable(tableDescriptor);
+
+    FileSystem fileSystem = _miniCluster.getFileSystem();
+    Path shardPath = new Path(tableUri, "shard-00000000");
+    FileStatus[] listStatus = fileSystem.listStatus(shardPath);
+    for (FileStatus fileStatus : listStatus) {
+      System.out.println(fileStatus.getPath());
+    }
+    Path filePath = new Path(shardPath, "write.lock");
+
+    assertTrue(defaultServerLookup.isPathSupported(filePath.toUri().getPath()));
+
+    client.disableTable(tableDescriptor.getName());
+
+    Thread.sleep(15000);
+
+    assertFalse(defaultServerLookup.isPathSupported(filePath.toUri().getPath()));
+
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/4d934d78/contrib/blur-block-placement-policy/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestServerLookup.java
----------------------------------------------------------------------
diff --git a/contrib/blur-block-placement-policy/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestServerLookup.java
b/contrib/blur-block-placement-policy/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestServerLookup.java
new file mode 100644
index 0000000..5d26312
--- /dev/null
+++ b/contrib/blur-block-placement-policy/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestServerLookup.java
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.blockmanagement;
+
+import java.lang.reflect.Field;
+import java.util.Collection;
+import java.util.Map;
+import java.util.concurrent.locks.ReadWriteLock;
+
+import org.apache.hadoop.conf.Configuration;
+
+public class TestServerLookup extends ServerLookup {
+
+  private final Map<String, DatanodeDescriptor[]> _map;
+  private final ReadWriteLock _hostmapLock;
+
+  @SuppressWarnings("unchecked")
+  public TestServerLookup(Configuration conf, Host2NodesMap host2datanodeMap) {
+    super(conf, host2datanodeMap);
+    Class<? extends Host2NodesMap> c = host2datanodeMap.getClass();
+    try {
+      {
+        Field declaredField = c.getDeclaredField("map");
+        declaredField.setAccessible(true);
+        _map = (Map<String, DatanodeDescriptor[]>) declaredField.get(host2datanodeMap);
+      }
+      {
+        Field declaredField = c.getDeclaredField("hostmapLock");
+        declaredField.setAccessible(true);
+        _hostmapLock = (ReadWriteLock) declaredField.get(host2datanodeMap);
+      }
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  @Override
+  public boolean isPathSupported(String srcPath) {
+    return srcPath.contains("/shard-");
+  }
+
+  @Override
+  public String getShardServer(String srcPath) {
+    return "host4.foo.com";
+  }
+
+  @Override
+  public DatanodeDescriptor getDatanodeDescriptor(String shardServer) {
+    _hostmapLock.writeLock().lock();
+    try {
+      Collection<DatanodeDescriptor[]> values = _map.values();
+      for (DatanodeDescriptor[] array : values) {
+        for (DatanodeDescriptor datanodeDescriptor : array) {
+          if (shardServer.equals(datanodeDescriptor.getHostName())) {
+            return datanodeDescriptor;
+          }
+        }
+      }
+      return null;
+    } finally {
+      _hostmapLock.writeLock().unlock();
+    }
+  }
+
+}


Mime
View raw message