accumulo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ctubb...@apache.org
Subject [54/61] [abbrv] accumulo git commit: ACCUMULO-722: move DistributedNameNodeProxy behind ZookeeperNameNode, remove SwitchingNameNode, implement abandonBlock, support incomplete file reading
Date Thu, 03 Mar 2016 22:00:19 GMT
ACCUMULO-722: move DistributedNameNodeProxy behind ZookeeperNameNode, remove SwitchingNameNode, implement abandonBlock, support incomplete file reading

git-svn-id: https://svn.apache.org/repos/asf/accumulo/branches/ACCUMULO-722@1378547 13f79535-47bb-0310-9956-ffa450edef68


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/4efec671
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/4efec671
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/4efec671

Branch: refs/heads/ACCUMULO-722
Commit: 4efec67192c63e2149a9caf818682b0b8c4f8036
Parents: 76337f2
Author: Eric C. Newton <ecn@apache.org>
Authored: Wed Aug 29 13:37:54 2012 +0000
Committer: Eric C. Newton <ecn@apache.org>
Committed: Wed Aug 29 13:37:54 2012 +0000

----------------------------------------------------------------------
 distnn/pom.xml                                  |  54 +-
 .../org/apache/hadoop/hdfs/DNNConstants.java    |   8 +-
 .../org/apache/hadoop/hdfs/DNNFileSystem.java   |  25 +-
 .../hadoop/hdfs/server/datanode/DataNode.java   |   5 +-
 .../namenode/DistributedNamenodeProxy.java      | 614 ++++++++++---------
 .../hdfs/server/namenode/SwitchingNameNode.java | 145 -----
 .../hdfs/server/namenode/ZookeeperNameNode.java | 376 ++++++++----
 7 files changed, 638 insertions(+), 589 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/4efec671/distnn/pom.xml
----------------------------------------------------------------------
diff --git a/distnn/pom.xml b/distnn/pom.xml
index f68f36b..9f7cf0e 100644
--- a/distnn/pom.xml
+++ b/distnn/pom.xml
@@ -4,6 +4,25 @@
   <artifactId>distnn</artifactId>
   <version>0.0.1-SNAPSHOT</version>
   <name>distnn</name>
+
+	<profiles>
+		<profile>
+			<id>accumulo-1.4</id>
+			<properties>
+				<accumulo.version>1.4.1</accumulo.version>
+			</properties>
+		</profile>
+		<profile>
+			<id>accumulo-1.5</id>
+			<activation>
+				<activeByDefault>true</activeByDefault>
+			</activation>
+			<properties>
+				<accumulo.version>1.5.0-SNAPSHOT</accumulo.version>
+			</properties>
+		</profile>
+	</profiles>
+  
   <dependencies>
   	<dependency>
   		<groupId>org.apache.hadoop</groupId>
@@ -11,24 +30,45 @@
   		<version>1.0.3</version>
   	</dependency>
   	<dependency>
-  		<groupId>org.apache.hadoop</groupId>
-  		<artifactId>hadoop-common</artifactId>
-  		<version>2.0.0-alpha</version>
-  	</dependency>
-  	<dependency>
   		<groupId>org.apache.accumulo</groupId>
   		<artifactId>accumulo-core</artifactId>
-  		<version>1.4.1</version>
+  		<version>${accumulo.version}</version>
   	</dependency>
   	<dependency>
   		<groupId>org.apache.accumulo</groupId>
   		<artifactId>accumulo-server</artifactId>
-  		<version>1.4.1</version>
+  		<version>${accumulo.version}</version>
   	</dependency>
   	<dependency>
   		<groupId>com.netflix.curator</groupId>
   		<artifactId>curator-framework</artifactId>
   		<version>1.1.15</version>
   	</dependency>
+  	<dependency>
+  		<groupId>com.google.guava</groupId>
+  		<artifactId>guava</artifactId>
+  		<version>12.0.1</version>
+  	</dependency>
+      <dependency>
+        <groupId>org.apache.hadoop</groupId>
+        <artifactId>hadoop-core</artifactId>
+        <version>1.0.3</version>
+      </dependency>
+      <dependency>
+        <groupId>org.apache.hadoop</groupId>
+        <artifactId>zookeeper</artifactId>
+        <version>3.3.1</version>
+      </dependency>
+      <dependency>
+        <groupId>javax.servlet</groupId>
+        <artifactId>servlet-api</artifactId>
+        <version>2.4</version>
+      </dependency>
+      <dependency>
+        <groupId>org.mortbay.jetty</groupId>
+        <artifactId>jetty</artifactId>
+        <version>[5.1,)</version>
+      </dependency>
+
   </dependencies>
 </project>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/accumulo/blob/4efec671/distnn/src/main/java/org/apache/hadoop/hdfs/DNNConstants.java
----------------------------------------------------------------------
diff --git a/distnn/src/main/java/org/apache/hadoop/hdfs/DNNConstants.java b/distnn/src/main/java/org/apache/hadoop/hdfs/DNNConstants.java
index bfb15e6..658e44b 100644
--- a/distnn/src/main/java/org/apache/hadoop/hdfs/DNNConstants.java
+++ b/distnn/src/main/java/org/apache/hadoop/hdfs/DNNConstants.java
@@ -1,10 +1,8 @@
 package org.apache.hadoop.hdfs;
 
 public class DNNConstants {
-
   public static final String DNN = "/dnn";
-  public static final String DATANODES_PATH = "/datanodes";
-  public static final String NAMESPACE_PATH = "/namespace";
-  public static final String BLOCKS_PATH = "/blocks";
-  
+  public static final String DATANODES_PATH = DNN + "/datanodes";
+  public static final String NAMESPACE_PATH = DNN + "/namespace";
+  public static final String BLOCKS_PATH = DNN + "/blocks";
 }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/4efec671/distnn/src/main/java/org/apache/hadoop/hdfs/DNNFileSystem.java
----------------------------------------------------------------------
diff --git a/distnn/src/main/java/org/apache/hadoop/hdfs/DNNFileSystem.java b/distnn/src/main/java/org/apache/hadoop/hdfs/DNNFileSystem.java
index aa7c042..2bbf1db 100644
--- a/distnn/src/main/java/org/apache/hadoop/hdfs/DNNFileSystem.java
+++ b/distnn/src/main/java/org/apache/hadoop/hdfs/DNNFileSystem.java
@@ -4,7 +4,6 @@ import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.net.URI;
 import java.util.ArrayList;
-import java.util.Arrays;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.BlockLocation;
@@ -27,11 +26,8 @@ import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
 import org.apache.hadoop.hdfs.server.common.UpgradeStatusReport;
-import org.apache.hadoop.hdfs.server.namenode.DistributedNamenodeProxy;
-import org.apache.hadoop.hdfs.server.namenode.DistributedNamenodeProxy.ConnectInfo;
 import org.apache.hadoop.hdfs.server.namenode.FakeNameNode;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
-import org.apache.hadoop.hdfs.server.namenode.SwitchingNameNode;
 import org.apache.hadoop.hdfs.server.namenode.ZookeeperNameNode;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.security.AccessControlException;
@@ -40,11 +36,6 @@ import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.util.Progressable;
 import org.apache.log4j.Logger;
 
-import com.netflix.curator.framework.CuratorFramework;
-import com.netflix.curator.framework.CuratorFrameworkFactory;
-import com.netflix.curator.framework.CuratorFrameworkFactory.Builder;
-import com.netflix.curator.retry.RetryUntilElapsed;
-
 // Basically a copy of DistributedFileSystem providing a different NN client implementation
 public class DNNFileSystem extends FileSystem {
   private static Logger log = Logger.getLogger(DNNFileSystem.class);
@@ -71,22 +62,14 @@ public class DNNFileSystem extends FileSystem {
   public void initialize(URI uri, Configuration conf) throws IOException {
     super.initialize(uri, conf);
     setConf(conf);
-    ConnectInfo info = new ConnectInfo(uri);
-    FakeNameNode fakefakefake = null;
+    FakeNameNode fake = null;
     try {
-      Builder builder = CuratorFrameworkFactory.builder().namespace(DNNConstants.DNN);
-      builder.connectString(info.zookeepers);
-      builder.retryPolicy(new RetryUntilElapsed(120*1000, 500));
-      //builder.aclProvider(aclProvider);
-      CuratorFramework client = builder.build();
-      client.start();
-      ZookeeperNameNode zoo = new ZookeeperNameNode(client);
-      fakefakefake = SwitchingNameNode.create(zoo, info);
+      fake = new ZookeeperNameNode(conf, uri);
     } catch (Exception ex) {
       throw new IOException(ex);
     }
-    log.info("Creating DFSClient with fake name node " + fakefakefake);
-    this.dfs = new DFSClient(null, fakefakefake, conf, statistics);
+    log.info("Creating DFSClient with fake name node " + fake);
+    this.dfs = new DFSClient(null, fake, conf, statistics);
     this.uri = uri;
     this.workingDir = getHomeDirectory();
   }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/4efec671/distnn/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
----------------------------------------------------------------------
diff --git a/distnn/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/distnn/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
index 5f274c6..22ba507 100644
--- a/distnn/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
+++ b/distnn/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
@@ -85,13 +85,12 @@ import org.apache.hadoop.hdfs.server.datanode.FSDataset.VolumeInfo;
 import org.apache.hadoop.hdfs.server.datanode.SecureDataNodeStarter.SecureResources;
 import org.apache.hadoop.hdfs.server.datanode.metrics.DataNodeInstrumentation;
 import org.apache.hadoop.hdfs.server.datanode.web.resources.DatanodeWebHdfsMethods;
-import org.apache.hadoop.hdfs.server.namenode.DistributedNamenodeProxy;
 import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
 import org.apache.hadoop.hdfs.server.namenode.FileChecksumServlets;
 import org.apache.hadoop.hdfs.server.namenode.JspHelper;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.hdfs.server.namenode.StreamFile;
-import org.apache.hadoop.hdfs.server.namenode.SwitchingNameNode;
+import org.apache.hadoop.hdfs.server.namenode.ZookeeperNameNode;
 import org.apache.hadoop.hdfs.server.protocol.BalancerBandwidthCommand;
 import org.apache.hadoop.hdfs.server.protocol.BlockCommand;
 import org.apache.hadoop.hdfs.server.protocol.BlockMetaDataInfo;
@@ -360,7 +359,7 @@ public class DataNode extends Configured
 //                       DatanodeProtocol.versionID,
 //                       nameNodeAddr, 
 //                       conf);
-    this.namenode =  SwitchingNameNode.create(conf);
+    this.namenode =  new ZookeeperNameNode(conf);
     // get version and id info from the name-node
     NamespaceInfo nsInfo = handshake();
     StartupOption startOpt = getStartupOption(conf);

http://git-wip-us.apache.org/repos/asf/accumulo/blob/4efec671/distnn/src/main/java/org/apache/hadoop/hdfs/server/namenode/DistributedNamenodeProxy.java
----------------------------------------------------------------------
diff --git a/distnn/src/main/java/org/apache/hadoop/hdfs/server/namenode/DistributedNamenodeProxy.java b/distnn/src/main/java/org/apache/hadoop/hdfs/server/namenode/DistributedNamenodeProxy.java
index a17c0ca..71a85d3 100644
--- a/distnn/src/main/java/org/apache/hadoop/hdfs/server/namenode/DistributedNamenodeProxy.java
+++ b/distnn/src/main/java/org/apache/hadoop/hdfs/server/namenode/DistributedNamenodeProxy.java
@@ -4,14 +4,10 @@
  *  add hierarchical locking
  * 	add support for permissions
  * 	add support for leasing
- * 	add support for block generations?
  * 
  * finish namenode actions:
- * 	TEST::implement delete
- * 	TEST::implement mkdirs
  * 
  * finish datanode actions:
- * 	error reporting
  * 
  * 
  * 
@@ -41,6 +37,7 @@ import java.io.IOException;
 import java.net.URI;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
@@ -49,8 +46,13 @@ import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Random;
 import java.util.Set;
+import java.util.TimerTask;
 import java.util.TreeMap;
 import java.util.UUID;
+import java.util.concurrent.Callable;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
+import java.util.concurrent.FutureTask;
 
 import org.apache.accumulo.core.Constants;
 import org.apache.accumulo.core.client.BatchScanner;
@@ -69,10 +71,9 @@ import org.apache.accumulo.core.data.Value;
 import org.apache.accumulo.core.util.ColumnFQ;
 import org.apache.accumulo.core.util.TextUtil;
 import org.apache.accumulo.core.util.UtilWaitThread;
-import org.apache.hadoop.conf.Configuration;
+import org.apache.accumulo.server.util.time.SimpleTimer;
 import org.apache.hadoop.fs.ContentSummary;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.UnresolvedLinkException;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.DNNConstants;
@@ -97,18 +98,17 @@ import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
 import org.apache.hadoop.hdfs.server.protocol.UpgradeCommand;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.security.AccessControlException;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
 import org.apache.log4j.Logger;
-import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.WatchedEvent;
-import org.apache.zookeeper.Watcher;
-import org.apache.zookeeper.ZooDefs.Ids;
-import org.apache.zookeeper.ZooKeeper;
+
+import sun.reflect.generics.reflectiveObjects.NotImplementedException;
+
+import com.netflix.curator.framework.CuratorFramework;
 
 public class DistributedNamenodeProxy implements FakeNameNode {
+  Executor executor = Executors.newSingleThreadExecutor();
+
   public static class ConnectInfo {
     public ConnectInfo(URI uri) {
       String userInfo = uri.getUserInfo();
@@ -138,55 +138,37 @@ public class DistributedNamenodeProxy implements FakeNameNode {
 
   private static Logger log = Logger.getLogger(DistributedNamenodeProxy.class);
   
-  long start = System.currentTimeMillis();
-
   private class Replicator {
     
-    // TODO: make this respect configured replication settings
-    
-    private HashSet<String> targets;
+    private HashSet<DatanodeInfo> targets;
     
     Replicator() {
-      targets = new HashSet<String>();
+      targets = new HashSet<DatanodeInfo>();
     }
     
-    DatanodeInfo[] getReplicationTargets() throws IOException {
+    DatanodeInfo[] getReplicationTargets(int replicationFactor) throws IOException {
       
       // TODO: periodically scan the datanodes table to find new datanodes
-      if(targets.size() == 0)
+      while (targets.size() == 0) {
         scanDatanodes();
+        if (targets.size() > 0)
+          break;
+        UtilWaitThread.sleep(250);
+      }
       
+      List<DatanodeInfo> targetsCopy = new ArrayList<DatanodeInfo>();
+      synchronized (targets) {
+        targetsCopy.addAll(targets);
+      }
       // pick nodes at random
       // TODO: take into account whether a datanode is too full to host another block
-      // the old namenode would also have a hard limit on the total number
-      // of fs objects it could store 
-      int replicationFactor = 1;
+      Collections.shuffle(targetsCopy);
       
-      if(targets.size() < replicationFactor)
+      if(targetsCopy.size() < 1)
         throw new IOException("unable to achieve required replication: too few datanodes running");
       
-      HashSet<String> targetSetNames = new HashSet<String>();
-      
-      HashSet<DatanodeInfo> targetSet = new HashSet<DatanodeInfo>();
-      for(int i=0; i < replicationFactor; i++) {
-        int r = rand.nextInt(targets.size());
-        
-        Iterator<String> iter = targets.iterator();
-        for(int j=0; j < r-1; iter.next());
-        String target = iter.next();
-        
-        // don't create two replicas on the same target
-        while(targetSetNames.contains(target)) {
-          r = rand.nextInt(targets.size());
-          iter = targets.iterator();
-          for(int j=0; j < r-1; iter.next());
-          target = iter.next();
-        }
-        
-        targetSet.add(new DatanodeInfo(new DatanodeID(target)));
-      }
-      
-      DatanodeInfo[] targetSetArray = targetSet.toArray(new DatanodeInfo[targetSet.size()]);
+      targetsCopy = targetsCopy.subList(0, Math.min(replicationFactor, targetsCopy.size()));
+      DatanodeInfo[] targetSetArray = targetsCopy.toArray(new DatanodeInfo[targetsCopy.size()]);
       return targetSetArray;
     }
     
@@ -196,16 +178,29 @@ public class DistributedNamenodeProxy implements FakeNameNode {
     // failed interactions with a datanode
     private void scanDatanodes() throws IOException {
       log.info("scanning datanodes table ..");
-      targets.clear();
+      HashSet<DatanodeInfo> updatedTargets = new HashSet<DatanodeInfo>();
       BatchScanner scanner = createBatchScanner(datanodesTable, new Range());
-      ColumnFQ.fetch(scanner, remaining);
+      infoIpcPort.fetch(scanner);
       try {
         for (Entry<Key,Value> entry : scanner) {
-          targets.add(entry.getKey().getRow().toString());
+          String nodeName = entry.getKey().getRow().toString();
+          int ipcPort = Integer.parseInt(entry.getValue().toString());
+          updatedTargets.add(new DatanodeInfo(new DatanodeID(nodeName, "", -1, ipcPort)));
         }
       } finally {
         scanner.close();
       }
+      try {
+        if (updatedTargets.isEmpty()) {
+          log.info("scanning datanodes from zookeeper ..");
+          for (String nodeName : zookeeper.getChildren().forPath(DNNConstants.DATANODES_PATH))
+            updatedTargets.add(new DatanodeInfo(new DatanodeID(nodeName)));
+        }
+      } catch (Exception ex) {
+        log.warn(ex, ex);
+      }
+      log.info("there are " + updatedTargets.size() + " datanodes");
+      targets = updatedTargets;
     }
   } 
   
@@ -258,7 +253,7 @@ public class DistributedNamenodeProxy implements FakeNameNode {
   
   private Random rand = new Random();
   private Replicator replicator = new Replicator();
-  private Connector conn;
+  private final Connector conn;
   private final static String namespaceTable = "namespace";
   private final static String blocksTable = "blocks";
   private final static String datanodesTable = "datanodes";
@@ -267,11 +262,12 @@ public class DistributedNamenodeProxy implements FakeNameNode {
   private final static Text blocksFam = new Text("blocks");
   private final static Text datanodesFam = new Text("datanodes");
   private final static Text commandFam = new Text("command");
+
   private final static ColumnFQ remaining = new ColumnFQ(infoFam, new Text("remaining"));
-  
   private final static ColumnFQ infoSize = new ColumnFQ(infoFam, new Text("size"));
   private final static ColumnFQ isDir = new ColumnFQ(infoFam, new Text("isDir"));
   private final static ColumnFQ infoCapacity = new ColumnFQ(infoFam, new Text("capacity"));
+  private final static ColumnFQ infoIpcPort = new ColumnFQ(infoFam, new Text("ipc_port"));
   private final static ColumnFQ infoUsed = new ColumnFQ(infoFam, new Text("used"));
   private final static ColumnFQ infoReplication = new ColumnFQ(infoFam, new Text("replication"));
   private final static ColumnFQ infoBlockSize = new ColumnFQ(infoFam, new Text("blocksize"));
@@ -291,51 +287,18 @@ public class DistributedNamenodeProxy implements FakeNameNode {
   
   private long lastRemaining = -1;
   
-  private ZooKeeper zookeeper = null;
-  private String instanceName = null;
-  private String keepers = null;
-  private final String username = "root";
-  private byte[] passwd = "secret".getBytes();
-  
-  private Connector getConnector() {
-    synchronized (this) {
-      if (conn == null) {
-        try {
-          Instance instance = new ZooKeeperInstance(instanceName, keepers);
-          conn = instance.getConnector(username, passwd);
-        } catch (Exception ex) {
-          conn = null;
-          log.warn("Unable to get connector " + ex);
-        }
-      }
-    }
-    return conn;
-  }
-  
-  public DistributedNamenodeProxy(Configuration conf) throws IOException {
-    instanceName = conf.get("accumulo.zookeeper.instance");
-    keepers = conf.get("accumulo.zookeeper.keepers");
-    zookeeper = new ZooKeeper(keepers, 30000, new Watcher() {
-      @Override
-      public void process(WatchedEvent arg0) {
-        log.info("zookeeper says " + arg0);
-      }
-    });
-    for (String name : new String[] {DNNConstants.DNN, DNNConstants.BLOCKS_PATH, DNNConstants.DATANODES_PATH, DNNConstants.NAMESPACE_PATH}) {
-      try {
-        zookeeper.create(name, new byte[]{}, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
-      } catch (KeeperException.NodeExistsException ex) {
-        // ya, ya, don't care
-      } catch (Exception ex) {
-        throw new IOException(ex);
-      } 
-    }
-  }
+  private final CuratorFramework zookeeper;
   
-  public DistributedNamenodeProxy(Connector conn) throws IOException {
+  public DistributedNamenodeProxy(CuratorFramework keeper, URI uri) throws IOException {
     log.info("========= Distributed Name Node Proxy init =========");
-    this.conn = conn;
-    
+    ConnectInfo info = new ConnectInfo(uri);
+    Instance instance = new ZooKeeperInstance(info.instance, info.zookeepers);
+    zookeeper = keeper;
+    try {
+      this.conn = instance.getConnector(info.username, info.passwd);
+    } catch (Exception e) {
+      throw new IOException(e);
+    } 
     //		String healthNodeHost = config.get("healthnode");
     //		if(healthNodeHost == null)
     //			throw new IOException("error: no healthnode address specified. add one to core-site.xml");
@@ -355,6 +318,57 @@ public class DistributedNamenodeProxy implements FakeNameNode {
       throws IOException {
     log.info("using abandonBlock");
     
+    // find the block's position in the list (probably the last one)
+    BatchScanner bs = createBatchScanner(namespaceTable, new Range(new Text(src)));
+    bs.fetchColumnFamily(blocksFam);
+    
+    // delete it from the file
+    Mutation m = new Mutation(new Text(src));
+    try {
+      for (Entry<Key,Value> entry : bs) {
+        String cq = entry.getKey().getColumnQualifier().toString();
+        String parts[] = cq.split("_");
+        long block = Long.parseLong(parts[1]);
+        if (b.getBlockId() == block) {
+          m.putDelete(blocksFam, entry.getKey().getColumnQualifier());
+        }
+      }
+    } finally {
+      bs.close();
+    }
+    if (m.getUpdates().isEmpty()) {
+      throw new IOException("Block " + b.getBlockId() + " not found to abandon for " + src);
+    }
+    
+    // delete the block size and location information
+    BatchWriter bw = createBatchWriter(namespaceTable);
+    try {
+      bw.addMutation(m);
+    } catch (MutationsRejectedException e) {
+      throw new RuntimeException(e);
+    } finally {
+      try {
+        bw.close();
+      } catch (MutationsRejectedException e) {
+        throw new RuntimeException(e);
+      }
+    }
+    bw = createBatchWriter(blocksTable);
+    try {
+      Text row = new Text("" + b.getBlockId());
+      bs = createBatchScanner(blocksTable, new Range(row));
+      m = new Mutation(row);
+      for (Entry<Key,Value> entry : bs) {
+        m.putDelete(entry.getKey().getColumnFamily(), entry.getKey().getColumnQualifier());
+      }
+    } finally {
+      try {
+        bw.close();
+      } catch (MutationsRejectedException e) {
+        throw new RuntimeException(e);
+      }
+      bs.close();
+    }
   }
   
   @Override
@@ -378,6 +392,29 @@ public class DistributedNamenodeProxy implements FakeNameNode {
   public LocatedBlock addBlock(String src, String clientName, DatanodeInfo[] excludeNodes)
       throws IOException {
     log.info("using addBlock " + src + " " + clientName);
+
+    // get the last block ID and replication
+    BatchScanner bs = createBatchScanner(namespaceTable, new Range(new Text(src)));
+    bs.fetchColumnFamily(blocksFam);
+    infoReplication.fetch(bs);
+    
+    // TODO: fetch from configuration
+    int defaultReplication = 3;
+    int replication = -1;
+    int blockPos = 0;
+    try {
+      for (Entry<Key,Value> entry : bs) {
+        if (entry.getKey().getColumnFamily().equals(blocksFam))
+          blockPos++;
+        if (infoReplication.hasColumns(entry.getKey()))
+          replication = Integer.parseInt(entry.getValue().toString());
+      }
+    } finally {
+      bs.close();
+    }
+    if (replication < 1) {
+      replication = defaultReplication;
+    }
     
     // create new blocks on data nodes
     //   zookeeper holds the negative numbered blocks
@@ -387,7 +424,8 @@ public class DistributedNamenodeProxy implements FakeNameNode {
     Block b = new Block(blockID, 0, 0);
     
     // choose a set of nodes on which to replicate block
-    DatanodeInfo[] targets = replicator.getReplicationTargets(); 
+    DatanodeInfo[] targets = replicator.getReplicationTargets(replication); 
+    log.info("replicating " + blockID + " to " + Arrays.asList(targets));
     
     // TODO: get a lease to the first
     // TODO: can we record all this in the namespace table?
@@ -397,19 +435,6 @@ public class DistributedNamenodeProxy implements FakeNameNode {
     // record block to host mapping and vice versa
     recordBlockHosts(blockIDBytes, targets);
     
-    // get the last block ID
-    BatchScanner bs = createBatchScanner(namespaceTable, new Range(new Text(src)));
-    bs.fetchColumnFamily(blocksFam);
-    
-    int blockPos = 0;
-    try {
-      for (@SuppressWarnings("unused") Entry<Key,Value> entry : bs) {
-        blockPos++;
-      }
-    } finally {
-      bs.close();
-    }
-    
     // record file to block mapping
     Mutation nameData = new Mutation(new Text(src.getBytes()));
     nameData.put(blocksFam, new Text(String.format("%08d_%d", blockPos, blockID).getBytes()), blank);
@@ -440,29 +465,35 @@ public class DistributedNamenodeProxy implements FakeNameNode {
    */
   
   @Override
-  public void blockReceived(DatanodeRegistration registration, Block[] blocks, String[] delHints) throws IOException {
+  public void blockReceived(DatanodeRegistration registration, final Block[] blocks, String[] delHints) throws IOException {
     log.info("using blockReceived");
-    
-    // for each block we should have recorded its existence already
-    // we should also know about the datanode
-    
-    // update blocks table
-    BatchWriter bw = createBatchWriter(blocksTable);
-    try {
-      try {
-        for(Block b : blocks) {
-          Mutation blockData = new Mutation(new Text(Long.toString(b.getBlockId())));
-          ColumnFQ.put(blockData, infoBlockSize, new Value(Long.toString(b.getNumBytes()).getBytes()));
-          bw.addMutation(blockData);
+    SimpleTimer.getInstance().schedule(new TimerTask() {
+      @Override
+      public void run() {
+        
+        // for each block we should have recorded its existence already
+        // we should also know about the datanode
+        
+        // update blocks table
+        try {
+          final BatchWriter bw = createBatchWriter(blocksTable);
+          try {
+            for(Block b : blocks) {
+              if (ZookeeperNameNode.isZooBlockId(b.getBlockId()))
+                continue;
+              Mutation blockData = new Mutation(new Text(Long.toString(b.getBlockId())));
+              infoBlockSize.put(blockData, new Value(Long.toString(b.getNumBytes()).getBytes()));
+              bw.addMutation(blockData);
+            }
+          } finally {
+            bw.close();
+          }
+          // update total file space ?
+        } catch (Exception ex) {
+          log.info(ex, ex);
         }
-      } finally {
-        bw.close();
       }
-      // update total file space ?
-    } catch (MutationsRejectedException ex) {
-      throw new IOException(ex);
-    }
-    
+    }, 0);
   }
   
   @Override
@@ -472,9 +503,13 @@ public class DistributedNamenodeProxy implements FakeNameNode {
     BlockListAsLongs blist = new BlockListAsLongs(blocks);
     Set<Long> current = new HashSet<Long>();
     for (int i = 0; i < blist.getNumberOfBlocks(); i++) {
-      current.add(blist.getBlockId(i));
+      if (blist.getBlockId(i) > 0) {
+        current.add(blist.getBlockId(i));
+      }
     }
     log.info(registration.getName() + " reports blocks " + current);
+    if (current.isEmpty())
+      return null;
     BatchWriter bw = createBatchWriter(blocksTable);
     Mutation m = new Mutation(registration.getName());
     Scanner scan = createScanner(datanodesTable);
@@ -542,41 +577,33 @@ public class DistributedNamenodeProxy implements FakeNameNode {
     } finally {
       bs.close();
     }
-    log.info("have ranges " + ranges);
     if (ranges.isEmpty())
       return true;
     long fileSize = 0;
-    retry:
-    while (true) {
-      BatchScanner blockScanner = createBatchScanner(blocksTable, ranges.toArray(new Range[]{}));
-      ColumnFQ.fetch(blockScanner, infoBlockSize);
-      fileSize = 0;
-      int count = 0;
-      try {
-        for (Entry<Key,Value> entry : blockScanner) {
-          log.info("Looking at block sizes " + entry.getKey() + " -> " + entry.getValue());
-          long blockSize = Long.parseLong(new String(entry.getValue().get()));
-          if (blockSize == 0) {
-            UtilWaitThread.sleep(250);
-            continue retry;
-          }
-          fileSize += blockSize;
-          count++;
-        }
-      } finally {
-        blockScanner.close();
-      }
-      if (count != ranges.size()) {
-        log.info("Did not read block sizes for all blocks on file " + src + " read " + count + " but expected " + ranges.size());
-        UtilWaitThread.sleep(250);
-        continue;
+    BatchScanner blockScanner = createBatchScanner(blocksTable, ranges.toArray(new Range[]{}));
+    infoBlockSize.fetch(blockScanner);
+    fileSize = 0;
+    int count = 0;
+    try {
+      for (Entry<Key,Value> entry : blockScanner) {
+        log.info("Looking at block sizes " + entry.getKey() + " -> " + entry.getValue());
+        long blockSize = Long.parseLong(new String(entry.getValue().get()));
+        if (blockSize == 0) 
+          break;
+        fileSize += blockSize;
+        count++;
       }
-      break;
+    } finally {
+      blockScanner.close();
+    }
+    if (count != ranges.size()) {
+      log.info("Did not read block sizes for all blocks on file " + src + " read " + count + " but expected " + ranges.size());
+      return false;
     }
     
     // write size to namespace table
     Mutation fileSizePut = new Mutation(new Text(src.getBytes()));
-    ColumnFQ.put(fileSizePut, infoSize, new Value(Long.toString(fileSize).getBytes()));
+    infoSize.put(fileSizePut, new Value(Long.toString(fileSize).getBytes()));
     BatchWriter bw = createBatchWriter(namespaceTable);
     try {
       try {
@@ -597,7 +624,7 @@ public class DistributedNamenodeProxy implements FakeNameNode {
   }
   
   static private void put(Mutation m, ColumnFQ cfq, String value) {
-    ColumnFQ.put(m, cfq, new Value(value.getBytes()));
+    cfq.put(m, new Value(value.getBytes()));
   }
   
   static private Value now() {
@@ -614,7 +641,7 @@ public class DistributedNamenodeProxy implements FakeNameNode {
     ColumnFQ srcColumn = new ColumnFQ(childrenFam, new Text(src));
     
     BatchScanner bs = createBatchScanner(namespaceTable, new Range(new Text(parent)));
-    ColumnFQ.fetch(bs, isDir);
+    isDir.fetch(bs);
     bs.fetchColumnFamily(childrenFam);
     try {
       for (Entry<Key,Value> entry : bs) {
@@ -658,7 +685,7 @@ public class DistributedNamenodeProxy implements FakeNameNode {
       BatchWriter bw = createBatchWriter(namespaceTable);
       try {
         Mutation createRequest = new Mutation(new Text(src));
-        ColumnFQ.put(createRequest, infoModificationTime, now());
+        infoModificationTime.put(createRequest, now());
         put(createRequest, infoReplication, Short.toString(replication));
         put(createRequest, infoBlockSize, Long.toString(blockSize));
         put(createRequest, infoPermission, masked.toString());
@@ -702,7 +729,7 @@ public class DistributedNamenodeProxy implements FakeNameNode {
     }
   }
   
-  private BatchWriter createBatchWriter(String table) throws IOException {
+  /** Creates a BatchWriter on the given table using this proxy's connector; widened from private to public here. */
+  public BatchWriter createBatchWriter(String table) throws IOException {
     return createBatchWriter(conn, table);
   }
   
@@ -722,7 +749,7 @@ public class DistributedNamenodeProxy implements FakeNameNode {
     
     // determine whether this is a directory
     BatchScanner bs = createBatchScanner(namespaceTable, new Range(new Text(src)));
-    ColumnFQ.fetch(bs, isDir);
+    isDir.fetch(bs);
     bs.fetchColumnFamily(childrenFam);
     
     String isDir_ = null;
@@ -744,7 +771,7 @@ public class DistributedNamenodeProxy implements FakeNameNode {
     Mutation childDelete = new Mutation(new Text(parent));
     Text srcText = new Text(src);
     childDelete.putDelete(childrenFam, srcText);
-    ColumnFQ.put(childDelete, infoModificationTime, now());
+    infoModificationTime.put(childDelete, now());
     
     ArrayList<Mutation> deletes = new ArrayList<Mutation>();
     getDeletes(srcText, deletes);
@@ -796,6 +823,7 @@ public class DistributedNamenodeProxy implements FakeNameNode {
           }
         }
       }
+      bs.close();
       bw.close();
       log.info("Host -> block map " + hostBlockMap);
 
@@ -889,6 +917,7 @@ public class DistributedNamenodeProxy implements FakeNameNode {
       throw new IOException("file not found: " + src);
     }
     
+    boolean underConst = false;
     fileLength = 0L;
     long blockOffset = 0L;
     for(Text id : IDs.keySet()) {
@@ -905,12 +934,15 @@ public class DistributedNamenodeProxy implements FakeNameNode {
       ArrayList<DatanodeInfo> dni = new ArrayList<DatanodeInfo>();
       bs = createBatchScanner(blocksTable, new Range(idString));
       bs.fetchColumnFamily(datanodesFam);
-      ColumnFQ.fetch(bs, infoBlockSize);
+      infoBlockSize.fetch(bs);
       try {
         for (Entry<Key,Value> entry : bs) {
           if (infoBlockSize.hasColumns(entry.getKey())) {
             blockSize = Long.parseLong(new String(entry.getValue().get()));
             fileLength += blockSize;
+            if (blockSize == 0) {
+              underConst = true;
+            }
             log.info("got size " + blockSize + " for block " + blockIDString);
           } else if (entry.getKey().getColumnFamily().equals(datanodesFam)) {
             String host = entry.getKey().getColumnQualifier().toString();
@@ -935,17 +967,16 @@ public class DistributedNamenodeProxy implements FakeNameNode {
     }
     
     // TODO: sort locatedBlocks by network-distance from client
-    boolean underConst = false;
-    log.info("Reporting file size of " + fileLength);
-    BatchWriter bw = createBatchWriter(namespaceTable);
-    try {
-      Mutation m = new Mutation(src);
-      ColumnFQ.put(m, infoSize, new Value(Long.toString(fileLength).getBytes()));
-      bw.addMutation(m);
-      bw.close();
-    } catch (Exception ex) {
-      throw new IOException(ex);
-    }
+    log.info("Reporting file size of " + fileLength + " underConstruction = " + true);
+//    BatchWriter bw = createBatchWriter(namespaceTable);
+//    try {
+//      Mutation m = new Mutation(src);
+//      ColumnFQ.put(m, infoSize, new Value(Long.toString(fileLength).getBytes()));
+//      bw.addMutation(m);
+//      bw.close();
+//    } catch (Exception ex) {
+//      throw new IOException(ex);
+//    }
     return new LocatedBlocks(fileLength, locatedBlocks, underConst);
   }
   
@@ -1013,8 +1044,7 @@ public class DistributedNamenodeProxy implements FakeNameNode {
    * This method is currently doing a lot of lookups ...
    */
   @Override
-  public DirectoryListing getListing(String src, byte[] startAfter)
-      throws AccessControlException, FileNotFoundException, UnresolvedLinkException, IOException {
+  public DirectoryListing getListing(String src, byte[] startAfter) throws IOException {
     log.info("using getListing " + src);
     // TODO: use startAfter and needLocation
     
@@ -1023,7 +1053,7 @@ public class DistributedNamenodeProxy implements FakeNameNode {
     String isDirFlag = null;
     BatchScanner bs = createBatchScanner(namespaceTable, new Range(new Text(src)));
     bs.fetchColumnFamily(childrenFam);
-    ColumnFQ.fetch(bs, isDir);
+    isDir.fetch(bs);
     List<String> children = new ArrayList<String>();
     try {
       for (Entry<Key,Value> entry : bs) {
@@ -1143,7 +1173,7 @@ public class DistributedNamenodeProxy implements FakeNameNode {
     
     src = normalizePath(src);
     BatchScanner bs = createBatchScanner(namespaceTable, new Range(new Text(src)));
-    ColumnFQ.fetch(bs, isDir);
+    isDir.fetch(bs);
     try {
       for (Entry<Key,Value> entry : bs) {
         if (isDir.hasColumns(entry.getKey())) {
@@ -1162,7 +1192,7 @@ public class DistributedNamenodeProxy implements FakeNameNode {
     byte[] parentPath = getParentPath(src);
     
     bs = createBatchScanner(namespaceTable, new Range(new Text(parentPath)));
-    ColumnFQ.fetch(bs, isDir);
+    isDir.fetch(bs);
     bs.fetchColumnFamily(childrenFam);
     String isDirString = null;
     try {
@@ -1190,8 +1220,8 @@ public class DistributedNamenodeProxy implements FakeNameNode {
       try {
         bw.addMutation(m);
         m = new Mutation(new Text(src));
-        ColumnFQ.put(m, isDir, new Value("Y".getBytes()));
-        ColumnFQ.put(m, infoModificationTime, new Value(Long.toString(System.currentTimeMillis()).getBytes()));
+        isDir.put(m, new Value("Y".getBytes()));
+        infoModificationTime.put(m, new Value(Long.toString(System.currentTimeMillis()).getBytes()));
         bw.addMutation(m);
       } finally {
         bw.close();
@@ -1213,7 +1243,7 @@ public class DistributedNamenodeProxy implements FakeNameNode {
   @Override
   public UpgradeCommand processUpgradeCommand(UpgradeCommand comm)
       throws IOException {
-    log.info("using processUpgradeCommand");
+    // Upgrade commands are not supported: report the call via unimplemented() and answer with no command.
+    unimplemented(comm);
     return null;
   }
   
@@ -1248,44 +1278,6 @@ public class DistributedNamenodeProxy implements FakeNameNode {
     
   }
   
-  /**
-   * helpers 
-   * both of these write to blocksTable and datanodesTable
-   * 
-   * @param host
-   * @param hblocks
-   * @throws IOException
-   */
-  private void recordHostBlocks(String host, long[] hblocks) throws IOException {
-    try {
-      if(hblocks.length == 0)
-        return;
-      
-      Mutation hostData = new Mutation(new Text(host));
-      for(int i=0; i < hblocks.length; i++)
-        hostData.put(blocksFam, new Text(Long.toString(hblocks[i]).getBytes()), blank);
-      BatchWriter writer = createBatchWriter(datanodesTable);
-      try {
-        writer.addMutation(hostData);
-      } finally {
-        writer.close();
-      }
-      
-      writer = createBatchWriter(blocksTable);
-      try {
-        for(int i=0; i < hblocks.length; i++) {
-          Mutation block = new Mutation(new Text(Long.toString(hblocks[i]).getBytes()));
-          block.put(datanodesFam, new Text(host.getBytes()), blank);
-          writer.addMutation(block);
-        }
-      } finally {
-        writer.close();
-      }
-    } catch (MutationsRejectedException ex) {
-      throw new IOException(ex);
-    }
-  }
-  
   @Override
   public boolean recoverLease(String src, String clientName) throws IOException {
     unimplemented(src, clientName);
@@ -1294,8 +1286,7 @@ public class DistributedNamenodeProxy implements FakeNameNode {
   
   @Override
   public void refreshNodes() throws IOException {
-    log.info("using refreshNodes");
-    
+    // Not supported by this proxy; the call is only reported via unimplemented().
+    unimplemented();
   }
   
   @Override
@@ -1305,11 +1296,10 @@ public class DistributedNamenodeProxy implements FakeNameNode {
     
     // record this datanode's info
     try {
-      Connector conn = getConnector();
       if (conn != null) {
         BatchWriter bw = createBatchWriter(datanodesTable);
         Mutation reg = new Mutation(new Text(registration.name.getBytes()));
-        ColumnFQ.put(reg, infoStorageID, new Value(registration.storageID.getBytes()));
+        infoStorageID.put(reg, new Value(registration.storageID.getBytes()));
         try {
           try {
             bw.addMutation(reg);
@@ -1321,7 +1311,7 @@ public class DistributedNamenodeProxy implements FakeNameNode {
         }
       }
     } catch (Throwable ex) {
-      log.info("Ignoring exceptiong, maybe accumulo is not yet initialized? " + ex);
+      log.info("Ignoring exception, maybe accumulo is not yet initialized? " + ex);
     }
     // clients get this info in a list of targets from addBlock()
     return registration;
@@ -1337,14 +1327,18 @@ public class DistributedNamenodeProxy implements FakeNameNode {
   }
   
   FileStatus getFileStatus(String src) throws IOException {
-    BatchScanner bs = createBatchScanner(namespaceTable, new Range(src));
-    ColumnFQ.fetch(bs, isDir);
     FileStatus result = new FileStatus(false, false);
-    for (Entry<Key,Value> entry : bs) {
-      result.exists = true;
-      if (new String(entry.getValue().get()).equals("Y")) {
-        result.isDir = true;
+    // Scan src's row in the namespace table for the isDir column: any entry means src exists,
+    // and a value of "Y" marks it as a directory. The scanner is closed even if the scan fails.
+    BatchScanner bs = createBatchScanner(namespaceTable, new Range(src));
+    try {
+      isDir.fetch(bs);
+      for (Entry<Key,Value> entry : bs) {
+        result.exists = true;
+        if (new String(entry.getValue().get()).equals("Y")) {
+          result.isDir = true;
+        }
       }
+    } finally {
+      bs.close();
     }
     return result;
   }
@@ -1416,83 +1410,106 @@ public class DistributedNamenodeProxy implements FakeNameNode {
   
   @Override
   public void renewLease(String clientName) throws IOException {
-    log.info("using renewLease");
-    
+    // Leases are not tracked by this proxy; the call is only reported via unimplemented().
+    unimplemented(clientName);
   }
   
   @Override
   public void reportBadBlocks(LocatedBlock[] blocks) throws IOException {
-    log.info("using reportBadBlocks");
+    // Not supported. The (Object) cast passes the array as a single varargs element
+    // instead of letting Java spread it into the Object... parameter.
+    unimplemented((Object)blocks);
   }
   
   @Override
   public void saveNamespace() throws IOException {
-    log.info("using saveNamespace");
-    
+    // Not supported by this proxy; the call is only reported via unimplemented().
+    unimplemented();
   }
   
+  /** Outcome of one heartbeat lookup, produced on the executor and consumed by sendHeartbeat. */
+  private static class SendResult {
+    // commands handed back to the datanode
+    List<DatanodeCommand> commands = new ArrayList<DatanodeCommand>();
+    // deletes that remove the returned commands from the datanodes table
+    List<Mutation> deletes = new ArrayList<Mutation>();
+  }
+  
+  /**
+   * Heartbeat from a datanode: records its capacity figures in the datanodes table (only when
+   * they changed since the last call) and returns the commands queued for it there.
+   * The metadata work runs on the executor with a one second budget; if it times out we are
+   * probably recovering the metadata tables, so no commands are returned and the queued ones
+   * stay in the table for the next heartbeat.
+   */
   @Override
   public DatanodeCommand[] sendHeartbeat(final DatanodeRegistration registration,
       final long capacity, final long dfsUsed, final long remaining, final int xmitsInProgress,
       final int xceiverCount) throws IOException {
-    log.info("using sendHeartbeat");
-    if (System.currentTimeMillis() - start < 10*1000)
-      return new DatanodeCommand[0];
-    
-    // update datanodes table with info
-    // skip this if none of the numbers have changed
-    // TODO: get last numbers from a lookup
-    if(capacity != lastCapacity || 
-        dfsUsed != lastDfsUsed ||
-        remaining != lastRemaining) {
-      Connector conn;
-      try {
-        conn = getConnector();
-      } catch (Throwable ex) {
-        // probably not initialized
-        return new DatanodeCommand[0];
-      }
-      try {
-        if (conn != null) {
-          BatchWriter bw = createBatchWriter(conn, datanodesTable);
-          Mutation m = new Mutation(new Text(registration.name.getBytes()));
-          ColumnFQ.put(m, infoCapacity, new Value(Long.toString(capacity).getBytes()));
-          ColumnFQ.put(m, infoUsed, new Value(Long.toString(dfsUsed).getBytes()));
-          ColumnFQ.put(m, DistributedNamenodeProxy.remaining, new Value(Long.toString(remaining).getBytes()));
+    
+    FutureTask<SendResult> future = new FutureTask<SendResult>(new Callable<SendResult>() {
+      @Override
+      public SendResult call() throws Exception {
+        SendResult result = new SendResult();
+        
+        log.info("using sendHeartbeat");
+        if (!conn.tableOperations().exists(datanodesTable))
+          return result;
+        // update datanodes table with info
+        // skip this if none of the numbers have changed
+        // TODO: get last numbers from a lookup
+        if(capacity != lastCapacity || 
+            dfsUsed != lastDfsUsed ||
+            remaining != lastRemaining) {
           try {
-            bw.addMutation(m);
-          } finally {
-            bw.close();
+            BatchWriter bw = createBatchWriter(conn, datanodesTable);
+            Mutation m = new Mutation(new Text(registration.name.getBytes()));
+            infoCapacity.put(m, new Value(Long.toString(capacity).getBytes()));
+            infoUsed.put(m, new Value(Long.toString(dfsUsed).getBytes()));
+            infoIpcPort.put(m, new Value(Integer.toString(registration.getIpcPort()).getBytes()));
+            DistributedNamenodeProxy.remaining.put(m, new Value(Long.toString(remaining).getBytes()));
+            try {
+              bw.addMutation(m);
+            } finally {
+              bw.close();
+            }
+          } catch (Exception ex) {
+            log.error(ex, ex);
           }
         }
-      } catch (Exception ex) {
-        log.error(ex, ex);
+        lastCapacity = capacity;
+        lastDfsUsed = dfsUsed;
+        lastRemaining = remaining;
+        // collect the commands queued for this datanode and the deletes that consume them;
+        // the caller applies the deletes only once the commands are actually returned
+        BatchScanner bs = createBatchScanner(datanodesTable, new Range(registration.getName()));
+        try {
+          bs.fetchColumnFamily(commandFam);
+          for (Entry<Key,Value> entry : bs) {
+            Key key = entry.getKey();
+            DatanodeCommand command = (DatanodeCommand)deserialize(entry.getValue().get());
+            log.info("found datanode Command " + command);
+            // must go into the result (not a local list), otherwise the command is deleted undelivered
+            result.commands.add(command);
+            Mutation m = new Mutation(key.getRow());
+            m.putDelete(key.getColumnFamily(), key.getColumnQualifier());
+            result.deletes.add(m);
+          }
+        } finally {
+          bs.close();
+        }
+
+        return result;
       }
+    });
+    
+    executor.execute(future);
+    SendResult result;
+    try {
+      // Block on the future itself (FutureTask never notifies its own monitor, so Object.wait
+      // on it just sleeps the whole timeout); get() returns as soon as the task completes.
+      result = future.get(1, java.util.concurrent.TimeUnit.SECONDS);
+    } catch (java.util.concurrent.TimeoutException ex) {
+      // still running: nothing is deleted, so the queued commands are offered again next time
+      return new DatanodeCommand[0];
+    } catch (InterruptedException ex) {
+      Thread.currentThread().interrupt();
+      return new DatanodeCommand[0];
+    } catch (java.util.concurrent.ExecutionException ex) {
+      log.error(ex, ex);
+      return new DatanodeCommand[0];
     }
-    lastCapacity = capacity;
-    lastDfsUsed = dfsUsed;
-    lastRemaining = remaining;
-    // return a list of commands for the data node
-    List<DatanodeCommand> commands = new ArrayList<DatanodeCommand>();
     try {
-      BatchScanner bs = createBatchScanner(datanodesTable, new Range(registration.getName()));
-      bs.fetchColumnFamily(commandFam);
-      BatchWriter bw = createBatchWriter(datanodesTable);
-      for (Entry<Key,Value> entry : bs) {
-        Key key = entry.getKey();
-        DatanodeCommand command = (DatanodeCommand)deserialize(entry.getValue().get());
-        log.info("found datanode Command " + command);
-        commands.add(command);
-        Mutation m = new Mutation(key.getRow());
-        m.putDelete(key.getColumnFamily(), key.getColumnQualifier());
-        bw.addMutation(m);
+      BatchWriter bw = createBatchWriter(datanodesTable);
+      try {
+        bw.addMutations(result.deletes);
+      } finally {
+        bw.close();
       }
-      bs.close();
-      bw.close();
+      return result.commands.toArray(new DatanodeCommand[0]);
     } catch (Exception ex) {
-      throw new IOException(ex);
+      log.error(ex, ex);
     }
-    return commands.toArray(new DatanodeCommand[0]);
+    return new DatanodeCommand[0];
   }
   
   
@@ -1528,8 +1545,7 @@ public class DistributedNamenodeProxy implements FakeNameNode {
   @Override
   public void setOwner(String src, String username, String groupname)
       throws IOException {
-    log.info("using setOwner");
-    
+    // Ownership changes are not supported; the call is only reported via unimplemented().
+    unimplemented(src, username, groupname);    
   }
   
   @Override
@@ -1542,7 +1558,7 @@ public class DistributedNamenodeProxy implements FakeNameNode {
         BatchWriter bw = createBatchWriter(namespaceTable);
         try {
           Mutation m = new Mutation(src);
-          ColumnFQ.put(m, infoPermission, new Value(permission.toString().getBytes()));
+          infoPermission.put(m, new Value(permission.toString().getBytes()));
           bw.addMutation(m);
         } finally {
           bw.close();
@@ -1556,41 +1572,31 @@ public class DistributedNamenodeProxy implements FakeNameNode {
   @Override
   public void setQuota(String path, long namespaceQuota, long diskspaceQuota)
       throws IOException {
-    log.info("using setQuota");
-    
+    // Quotas are not supported; the call is only reported via unimplemented().
+    unimplemented(path, namespaceQuota, diskspaceQuota);
   }
   
   @Override
   public boolean setReplication(String src, short replication)
       throws IOException {
-    log.info("using setReplication");
+    // Not supported: report the call and answer false (replication unchanged).
+    unimplemented(src, replication);
     return false;
   }
   
   @Override
   public boolean setSafeMode(SafeModeAction action) throws IOException {
-    log.info("using setSafeMode");
+    // Not supported: report the call and answer false for every action.
+    unimplemented(action);
     return false;
   }
   
   
   @Override
   public void setTimes(String src, long mtime, long atime) throws IOException {
-    log.info("using setTimes");
-    
-  }
-  
-  public void stop() {
+    // Modification/access times are not updated by this proxy; the call is only reported.
+    unimplemented(src, mtime, atime);
   }
   
   @Override
   public NamespaceInfo versionRequest() throws IOException {
-    log.info("using versionRequest");
-    // TODO: find out how to get namespace id
-    // could store this in the info of the / entry
-    NamespaceInfo nsi = new NamespaceInfo(384837986, 0, 0);
-    //throw new RuntimeException();
-    return nsi;
+    // This proxy no longer answers version requests; presumably they are served by
+    // ZookeeperNameNode, which now fronts this class — confirm before relying on it.
+    throw new NotImplementedException();
   }
   
 }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/4efec671/distnn/src/main/java/org/apache/hadoop/hdfs/server/namenode/SwitchingNameNode.java
----------------------------------------------------------------------
diff --git a/distnn/src/main/java/org/apache/hadoop/hdfs/server/namenode/SwitchingNameNode.java b/distnn/src/main/java/org/apache/hadoop/hdfs/server/namenode/SwitchingNameNode.java
deleted file mode 100644
index 0711624..0000000
--- a/distnn/src/main/java/org/apache/hadoop/hdfs/server/namenode/SwitchingNameNode.java
+++ /dev/null
@@ -1,145 +0,0 @@
-package org.apache.hadoop.hdfs.server.namenode;
-
-import java.lang.reflect.InvocationHandler;
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
-import java.lang.reflect.Proxy;
-import java.net.URI;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.regex.Pattern;
-
-import org.apache.accumulo.core.client.Connector;
-import org.apache.accumulo.core.client.Instance;
-import org.apache.accumulo.core.client.ZooKeeperInstance;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hdfs.DNNConstants;
-import org.apache.hadoop.hdfs.protocol.Block;
-import org.apache.hadoop.hdfs.server.namenode.DistributedNamenodeProxy.ConnectInfo;
-import org.apache.log4j.Logger;
-
-import com.netflix.curator.framework.CuratorFramework;
-import com.netflix.curator.framework.CuratorFrameworkFactory;
-import com.netflix.curator.framework.CuratorFrameworkFactory.Builder;
-import com.netflix.curator.retry.RetryUntilElapsed;
-
-public class SwitchingNameNode {
-  private static final Logger log = Logger.getLogger(SwitchingNameNode.class);
-  static Pattern isRoot = Pattern.compile("/accumulo(|/instance_id.*|/version.*|/walogArchive|/recovery.*|/tables$|/tables/(\\!0|0|1|2)(/.*|$))");
-  
-  static private boolean isZooName(String path) {
-    boolean result = isRoot.matcher(path).matches();
-    log.info("Looking at " + path + " isZooName " + result);
-    return result;
-  }
-  
-  public static FakeNameNode create(final Configuration conf) {
-    try {
-      URI uri = new URI(conf.get("fs.default.name"));
-      ConnectInfo info = new ConnectInfo(uri);
-      Builder builder = CuratorFrameworkFactory.builder().namespace(DNNConstants.DNN);
-      builder.connectString(info.zookeepers);
-      builder.retryPolicy(new RetryUntilElapsed(120*1000, 500));
-      //builder.aclProvider(aclProvider);
-      CuratorFramework client = builder.build();
-      client.start();
-      ZookeeperNameNode zoo = new ZookeeperNameNode(client);
-      return SwitchingNameNode.create(zoo, info);
-    } catch (Exception ex) {
-      throw new RuntimeException(ex);
-    }
-  }
-  
-  
-  public static FakeNameNode create(final FakeNameNode zoonode, final ConnectInfo info) {
-    
-    return (FakeNameNode) Proxy.newProxyInstance(SwitchingNameNode.class.getClassLoader(), new Class<?>[] {FakeNameNode.class}, new InvocationHandler() {
-      FakeNameNode distributed = null;
-      
-      @Override
-      public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
-        log.info("calling " + method.getName());
-        if (method.getName().equals("toString")) {
-          return "FakeNameNode " + zoonode + " dynamic name node@ " + info.instance;
-        }
-        try {
-          // handle versionRequest with the zoonode
-          if (method.getName().equals("versionRequest")) {
-            return method.invoke(zoonode, args);
-          }
-          // split blocks manually to the separate nodes
-          if (method.getName().equals("blockReport")) {
-            long[] blocks = (long[])args[1];
-            long[] zooblocks = new long[blocks.length];
-            long[] distblocks = new long[blocks.length];
-            int zoocount = 0;
-            int distcount = 0;
-            for (long block : blocks)
-              if (block < 0)
-                zooblocks[zoocount++] = block;
-              else
-                distblocks[distcount++] = block;
-            zooblocks = Arrays.copyOf(zooblocks, zoocount);
-            distblocks = Arrays.copyOf(distblocks, distcount);
-            Object result = null;
-            if (zooblocks.length > 0) {
-              args[1] = zooblocks;
-              log.info("Calling zoo blockReport with " + zooblocks.length + " blocks");
-              result = method.invoke(zoonode, args);
-            }
-            if (distblocks.length > 0 && distributed != null) {
-              args[1] = distblocks;
-              log.info("Calling dist blockReport with " + distblocks.length + " blocks");
-              result = method.invoke(distributed, args);
-            }
-            return result;
-          }
-          if (method.getName().equals("blockReceived")) {
-            Block[] blocks = (Block[])args[1];
-            List<Block> zooblocks = new ArrayList<Block>();
-            List<Block> distblocks = new ArrayList<Block>();
-            for (Block block : blocks) {
-              if (block.getBlockId() < 0)
-                zooblocks.add(block);
-              else
-                distblocks.add(block);
-            }
-            Object result = null;
-            if (!zooblocks.isEmpty()) {
-              args[1] = zooblocks.toArray(new Block[0]);
-              result = method.invoke(zoonode, args);
-            }
-            if (distributed != null && !distblocks.isEmpty()) {
-              args[1] = distblocks.toArray(new Block[0]);
-              result = method.invoke(distributed, args);
-            }
-            return result;
-          }
-          // dispatch on filename
-          if (args != null && args.length > 0 && (args[0] instanceof String)) {
-            if (isZooName((String) args[0]))
-              return method.invoke(zoonode, args);
-          }
-          // try to send to the dnn, then to znn
-          synchronized (this) {
-            if (distributed == null) {
-              try {
-                Instance zinst = new ZooKeeperInstance(info.instance, info.zookeepers);
-                Connector conn = zinst.getConnector(info.username, info.passwd);
-                distributed = new DistributedNamenodeProxy(conn);
-              } catch (Exception ex) {
-                log.warn("error invoking " + method.getName() + ", invoking zookeeper version");
-                return method.invoke(zoonode, args);
-              }
-            }
-          }
-          return method.invoke(distributed, args);
-        } catch (InvocationTargetException ex) {
-          throw ex.getCause();
-        }
-      }
-    });
-    
-  }
-}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/4efec671/distnn/src/main/java/org/apache/hadoop/hdfs/server/namenode/ZookeeperNameNode.java
----------------------------------------------------------------------
diff --git a/distnn/src/main/java/org/apache/hadoop/hdfs/server/namenode/ZookeeperNameNode.java b/distnn/src/main/java/org/apache/hadoop/hdfs/server/namenode/ZookeeperNameNode.java
index 632d781..64826cd 100644
--- a/distnn/src/main/java/org/apache/hadoop/hdfs/server/namenode/ZookeeperNameNode.java
+++ b/distnn/src/main/java/org/apache/hadoop/hdfs/server/namenode/ZookeeperNameNode.java
@@ -8,6 +8,8 @@ import java.io.IOException;
 import java.io.ObjectInputStream;
 import java.io.ObjectOutputStream;
 import java.io.Serializable;
+import java.net.URI;
+import java.net.URISyntaxException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -15,15 +17,16 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
-import java.util.UUID;
 import java.util.Map.Entry;
 import java.util.Random;
+import java.util.Set;
 import java.util.TreeMap;
+import java.util.UUID;
+import java.util.regex.Pattern;
 
-import org.apache.accumulo.core.util.UtilWaitThread;
 import org.apache.commons.io.output.ByteArrayOutputStream;
 import org.apache.commons.lang.NotImplementedException;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.ContentSummary;
 import org.apache.hadoop.fs.FileAlreadyExistsException;
 import org.apache.hadoop.fs.permission.FsPermission;
@@ -41,6 +44,7 @@ import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
 import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
 import org.apache.hadoop.hdfs.server.common.UpgradeStatusReport;
+import org.apache.hadoop.hdfs.server.namenode.DistributedNamenodeProxy.ConnectInfo;
 import org.apache.hadoop.hdfs.server.protocol.BlockCommand;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
@@ -54,6 +58,9 @@ import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
 
 import com.netflix.curator.framework.CuratorFramework;
+import com.netflix.curator.framework.CuratorFrameworkFactory;
+import com.netflix.curator.framework.CuratorFrameworkFactory.Builder;
+import com.netflix.curator.retry.RetryUntilElapsed;
 
 public class ZookeeperNameNode implements FakeNameNode {
   static private Logger log = Logger.getLogger(ZookeeperNameNode.class); 
@@ -103,8 +110,51 @@ public class ZookeeperNameNode implements FakeNameNode {
     boolean complete;
   }
   
-  public ZookeeperNameNode(CuratorFramework client) {
+  /**
+   * Paths matching this pattern are served from ZooKeeper by this class; other paths are
+   * delegated to the Accumulo-backed proxy returned by dist().
+   */
+  static Pattern isRoot = Pattern.compile("/accumulo(|/instance_id.*|/version.*|/walogArchive|/wal(/.*|$)|/recovery.*|/tables$|/tables/(\\!0|0|1|2)(/.*|$))");
+  
+  static private boolean isZooName(String path) {
+    boolean result = isRoot.matcher(path).matches();
+    log.info("Looking at " + path + " isZooName " + result);
+    return result;
+  }
+  /** Blocks managed by this ZooKeeper name node are identified by negative block ids. */
+  static public boolean isZooBlockId(long blockId) {
+    return blockId < 0;
+  }
+  
+  private final URI uri;
+  private DistributedNamenodeProxy dist = null;
+  private final String instance;
+
+  private long start = System.currentTimeMillis();
+  
+  /** Reads the file system URI from fs.default.name; fails with an IOException if it is unset or malformed. */
+  private static URI getURI(Configuration conf) throws IOException {
+    String name = conf.get("fs.default.name");
+    if (name == null)
+      throw new IOException("fs.default.name is not set");
+    try {
+      return new URI(name);
+    } catch (URISyntaxException e) {
+      throw new IOException(e);
+    }
+  }
+  
+  public ZookeeperNameNode(Configuration conf) throws IOException {
+    this(conf, getURI(conf));
+  }
+
+  /**
+   * Starts a Curator client against the ZooKeeper quorum named in uri (parsed by ConnectInfo)
+   * and remembers the Accumulo instance name so the distributed proxy can be created lazily.
+   */
+  public ZookeeperNameNode(Configuration conf, URI uri) throws IOException {
+    ConnectInfo info = new ConnectInfo(uri);
+    instance = info.instance;
+    Builder builder = CuratorFrameworkFactory.builder();
+    builder.connectString(info.zookeepers);
+    // keep retrying for up to 120 seconds, sleeping 500ms between attempts
+    builder.retryPolicy(new RetryUntilElapsed(120*1000, 500));
+    //builder.aclProvider(aclProvider);
+    CuratorFramework client = builder.build();
+    client.start();
     this.keeper = client;
+    this.uri = uri;
+    try {
+      findDatanodes();
+    } catch (Exception e) {
+      // best effort: a failure here must not stop construction, but leave a trace of it
+      log.warn("Unable to find datanodes at startup: " + e);
+    }
   }
   
   private static void unimplemented(Object ... args) {
@@ -113,12 +163,41 @@ public class ZookeeperNameNode implements FakeNameNode {
     log.warn(method + " unimplemented, args: " + Arrays.asList(args), t);
   }
   
+  /**
+   * Lazily creates the Accumulo-backed name node once at least one tablet server appears to
+   * hold a lock in ZooKeeper. Until then, or if any ZooKeeper lookup fails (logged at warn),
+   * it returns null and the check is repeated on the next call.
+   */
+  private FakeNameNode dist() {
+    try {
+      if (dist == null) {
+        String instanceId = new String(keeper.getData().forPath("/accumulo/instances/" + instance));
+        log.info("Looking at instance " + instance + " id " + instanceId);
+        String tservers = "/accumulo/" + instanceId + "/tservers";
+        List<String> children = keeper.getChildren().forPath(tservers);
+        boolean atLeastOneTserver = false;
+        for (String child : children) {
+          List<String> locks = keeper.getChildren().forPath(tservers + "/" + child);
+          // NOTE(review): a tablet server normally holds a single lock node, so "> 1" may have
+          // been meant as "non-empty" — confirm against the tserver lock layout.
+          if (locks != null && locks.size() > 1) {
+            atLeastOneTserver = true;
+            break;
+          }
+        }
+        if (atLeastOneTserver)
+          dist = new DistributedNamenodeProxy(keeper, uri);
+      }
+    } catch (Exception ex) {
+      log.warn(ex, ex);
+    }
+    // NOTE(review): callers dereference this result directly, so null surfaces as a NullPointerException.
+    return dist;
+  }
+  
   @Override
   public LocatedBlocks getBlockLocations(String src, long offset, long length) throws IOException {
+    log.info("getBlockLocations " + src);
+    if (!isZooName(src)) // paths outside the zookeeper namespace go to the distributed name node
+      return dist().getBlockLocations(src, offset, length);
+    
     log.info("getBlockLocation " + src + " offset " + offset + " length " + length);
     try {
       Map<String, BlockInfo> blocks = new TreeMap<String, BlockInfo>();
       String blockpath = DNNConstants.NAMESPACE_PATH + src;
+      boolean underConstruction = false; // set when any of the file's BlockInfo entries is not yet complete
       for (String child : keeper.getChildren().forPath(blockpath)) {
         byte[] data = keeper.getData().forPath(blockpath + "/" + child);
         Object obj = deserialize(data);
@@ -130,9 +209,12 @@ public class ZookeeperNameNode implements FakeNameNode {
             info = (BlockInfo)obj;
             blocks.put(child, info);
             log.info(src + " block " + info.id + " size " + info.size);
+            if (!info.complete)
+              underConstruction = true;
           }
         }
       }
+      Map<String,DatanodeRegistration> datanodes = findDatanodes(); // registrations read from DATANODES_PATH, keyed by child name
       log.info("Got " + blocks.size() + " blocks for " + src);
       List<LocatedBlock> lblocks = new ArrayList<LocatedBlock>();
       long currentOffset = 0;
@@ -140,7 +222,11 @@ public class ZookeeperNameNode implements FakeNameNode {
         BlockInfo binfo = entry.getValue();
         DatanodeInfo[] info = new DatanodeInfo[binfo.datanodes.length];
         for (int j = 0; j < info.length; j++) {
-          info[j] = new DatanodeInfo(new DatanodeID(binfo.datanodes[j]));
+          DatanodeRegistration dn = datanodes.get(binfo.datanodes[j]);
+          if (dn != null) // registered datanode: include its storage id and info/ipc ports
+            info[j] = new DatanodeInfo(new DatanodeID(binfo.datanodes[j], dn.getStorageID(), dn.getInfoPort(), dn.getIpcPort()));
+          else // no registration found: only the datanode name is known
+            info[j] = new DatanodeInfo(new DatanodeID(binfo.datanodes[j]));
         }
         log.info("Found " + entry.getKey() + " "+ info.length + " locations for block " + binfo.id);
         if (currentOffset >= offset && currentOffset < offset + length)
@@ -148,7 +234,7 @@ public class ZookeeperNameNode implements FakeNameNode {
         currentOffset += binfo.size;
       }
       log.info("Returning fileLength " + currentOffset + " for " + src);
-      return new LocatedBlocks(currentOffset, lblocks, false);
+      return new LocatedBlocks(currentOffset, lblocks, underConstruction); // NOTE(review): a block is selected only when it STARTS inside [offset, offset + length), so a block that begins before offset but covers it is omitted; HDFS returns every overlapping block - confirm
     } catch (Exception ex) {
       throw new IOException(ex);
     }
@@ -178,6 +264,10 @@ public class ZookeeperNameNode implements FakeNameNode {
   public void create(String src, FsPermission masked, String clientName, boolean overwrite, boolean createParent, short replication, long blockSize)
       throws IOException {
     log.info("creating " + src);
+    if (!isZooName(src)) { // paths outside the zookeeper namespace are created by the distributed name node
+      dist().create(src, masked, clientName, overwrite, createParent, replication, blockSize);
+      return;
+    }
     try {
       FileInfo fileInfo = new FileInfo(blockSize, System.currentTimeMillis(), masked.toString(), replication, 0);
       byte[] data = serialize(fileInfo);
@@ -236,7 +326,33 @@ public class ZookeeperNameNode implements FakeNameNode {
   
   @Override
   public void abandonBlock(Block b, String src, String holder) throws IOException {
-    unimplemented(b, src, holder);    
+    log.info("abandonBlock " + b.getBlockId() + ": " + src);
+    if (!isZooName(src)) { // delegate non-zookeeper paths to the distributed name node
+      dist().abandonBlock(b, src, holder);
+      return;
+    }
+    try {
+      String path = DNNConstants.NAMESPACE_PATH + src;
+      for (String child : keeper.getChildren().forPath(path)) { // look for the child znode holding a BlockInfo with b's id
+        byte[] blockData = keeper.getData().forPath(path + "/" + child);
+        Object obj = deserialize(blockData);
+        if (obj instanceof BlockInfo) {
+          BlockInfo info = (BlockInfo) obj;
+          if (info.id == b.getBlockId()) {
+            keeper.delete().forPath(path + "/" + child); // remove the block from the file's block list
+            try {
+              keeper.delete().forPath(DNNConstants.BLOCKS_PATH + "/" + b.getBlockName());
+            } catch (KeeperException.NoNodeException ex) {
+              // the BLOCKS_PATH entry does not exist; nothing further to remove
+            }
+            return;
+          }
+        }
+      }
+    } catch (Exception ex) {
+      throw new IOException(ex);
+    }
+    throw new IOException("Unexpected error abandoning block " + b.getBlockId() + " block not found!"); // no child of the file carried b's id
   }
   
   @Override
@@ -246,6 +362,9 @@ public class ZookeeperNameNode implements FakeNameNode {
   
   @Override
   public LocatedBlock addBlock(String src, String clientName, DatanodeInfo[] excludedNodes) throws IOException {
+    log.info("addBlock " + src);
+    if (!isZooName(src)) // delegate non-zookeeper paths to the distributed name node
+      return dist().addBlock(src, clientName, excludedNodes);
     // get the list of online data nodes
     Map<String, DatanodeRegistration> nodes;
     try {
@@ -253,10 +372,19 @@ public class ZookeeperNameNode implements FakeNameNode {
     } catch (Exception e) {
       throw new IOException(e);
     }
-    int replication = 1;
     
-    if(nodes.size() < replication)
-      throw new IOException("unable to achieve required replication: too few datanodes running");
+    short defaultReplication = 3; // TODO: read from configuration instead of hard-coding 3
+    short replication = -1;
+    try {
+      String path = DNNConstants.NAMESPACE_PATH + src;
+      byte data[] = keeper.getData().forPath(path);
+      HdfsFileStatus status = decodeFile(path, data); // NOTE(review): getFileInfo passes src (not the namespace path) to decodeFile - confirm the difference is harmless
+      replication = status.getReplication();
+    } catch (Exception ex) {
+      throw new IOException(ex);
+    }
+    if (replication < 0) // only when the stored file status carries a negative replication
+      replication = defaultReplication; // NOTE(review): the old "too few datanodes" check is gone; confirm the selection below copes with nodes.size() < replication
     
     List<String> randomList = new ArrayList<String>(nodes.keySet());
     Collections.shuffle(randomList);
@@ -293,16 +421,20 @@ public class ZookeeperNameNode implements FakeNameNode {
     }
   }
 
-  private Map<String, DatanodeRegistration> findDatanodes() throws Exception {
-    List<String> children = keeper.getChildren().forPath(DNNConstants.DATANODES_PATH);
+  private synchronized Map<String, DatanodeRegistration> findDatanodes() throws IOException { // reads every DATANODES_PATH child as a DatanodeRegistration, keyed by child name
     Map<String, DatanodeRegistration> nodes = new HashMap<String, DatanodeRegistration>();
-    for (String child : children) {
-      byte[] data = keeper.getData().forPath(DNNConstants.DATANODES_PATH + "/" + child);
-      ByteArrayInputStream bais = new ByteArrayInputStream(data);
-      DataInputStream ds = new DataInputStream(bais);
-      DatanodeRegistration registration = new DatanodeRegistration();
-      registration.readFields(ds);
-      nodes.put(child, registration);
+    try {
+      List<String> children = keeper.getChildren().forPath(DNNConstants.DATANODES_PATH);
+      for (String child : children) {
+        byte[] data = keeper.getData().forPath(DNNConstants.DATANODES_PATH + "/" + child);
+        ByteArrayInputStream bais = new ByteArrayInputStream(data);
+        DataInputStream ds = new DataInputStream(bais);
+        DatanodeRegistration registration = new DatanodeRegistration();
+        registration.readFields(ds);
+        nodes.put(child, registration);
+      }
+    } catch (Exception ex) { // wrap ZooKeeper and decoding failures as IOException
+      throw new IOException(ex);
     }
     return nodes;
   }
@@ -310,37 +442,31 @@ public class ZookeeperNameNode implements FakeNameNode {
   @Override
   public boolean complete(String src, String clientName) throws IOException {
     log.info("using complete " + src);
+    if (!isZooName(src)) // delegate non-zookeeper paths to the distributed name node
+      return dist().complete(src, clientName);
     String path = DNNConstants.NAMESPACE_PATH + src;
-    while (true) {
-      try {
-        boolean retry = false;
-        long length = 0;
-        for (String child : keeper.getChildren().forPath(path)) {
-          Object object = deserialize(keeper.getData().forPath(path + "/" + child));
-          if (object instanceof BlockInfo) {
-            BlockInfo info = (BlockInfo)object;
-            Block block = new Block(info.id);
-            info = (BlockInfo)deserialize(keeper.getData().forPath(DNNConstants.BLOCKS_PATH + "/" + block.getBlockName()));
-            log.info("Block size for " + info.id + " is " + info.size);
-            length += info.size;
-            if (!info.complete) {
-              retry = true;
-              break;
-            }
-          }
-        }
-        if (retry) {
-          UtilWaitThread.sleep(250);
-          continue;
+    try {
+      long length = 0;
+      for (String child : keeper.getChildren().forPath(path)) {
+        Object object = deserialize(keeper.getData().forPath(path + "/" + child));
+        if (object instanceof BlockInfo) {
+          BlockInfo info = (BlockInfo)object;
+          Block block = new Block(info.id);
+          info = (BlockInfo)deserialize(keeper.getData().forPath(DNNConstants.BLOCKS_PATH + "/" + block.getBlockName())); // re-read from BLOCKS_PATH, the copy blockReceived() sizes and marks complete
+          log.info("Block size for " + info.id + " is " + info.size);
+          length += info.size;
+          if (!info.complete)
+            return false; // a block is still being written: not complete yet
         }
-        FileInfo info = (FileInfo)deserialize(keeper.getData().forPath(path));
-        info.size = length;
-        keeper.setData().forPath(path, serialize(info));
-        log.info("updated file length of " + src + " to " + length);
-        return true;
-      } catch (Exception ex) {
-        log.error(ex, ex);
       }
+      FileInfo info = (FileInfo)deserialize(keeper.getData().forPath(path));
+      info.size = length;
+      keeper.setData().forPath(path, serialize(info)); // record the summed block sizes as the file length
+      log.info("updated file length of " + src + " to " + length);
+      return true;
+    } catch (Exception ex) {
+      log.error(ex, ex);
+      return false; // NOTE(review): any failure is reported as "not complete yet", so a persistent error may leave the client retrying - consider rethrowing
     }
   }
   
@@ -373,6 +499,10 @@ public class ZookeeperNameNode implements FakeNameNode {
   @Override
   public boolean rename(String src, String dst) throws IOException {
     log.info("rename " + src + " -> " + dst);
+    if (isZooName(src) != isZooName(dst)) // one path is in zookeeper, the other belongs to the distributed name node
+      throw new IOException("You cannot rename files across zookeeper metadata and accumulo metadata");
+    if (!isZooName(src)) // both paths are non-zookeeper: delegate
+      return dist().rename(src, dst);
     try {
       Object srcInfo = getInfo(DNNConstants.NAMESPACE_PATH + src);
       Object dstInfo = getInfo(DNNConstants.NAMESPACE_PATH + dst);
@@ -392,18 +522,20 @@ public class ZookeeperNameNode implements FakeNameNode {
         else
           delete(dst);
       }
-      recursivelyCopy(DNNConstants.NAMESPACE_PATH + src, DNNConstants.NAMESPACE_PATH + dst);
+      String nsSrc = DNNConstants.NAMESPACE_PATH + src;
+      recursivelyCopy(nsSrc, DNNConstants.NAMESPACE_PATH + dst);
+      recursivelyDelete(nsSrc, false); // remove the source znodes only; removeBlocks=false leaves BLOCKS_PATH entries, presumably still referenced by the copy
       return true;
     } catch (Exception ex) {
       throw new IOException(ex);
     }
   }
   
-  private String basename(String src) {
+  private static String basename(String src) { // text after the last '/', or all of src when it has none
     return src.substring(src.lastIndexOf("/") + 1);
   }
 
-  private String getParent(String dst) {
+  private static String getParent(String dst) { // text before the last '/'; NOTE(review): throws StringIndexOutOfBoundsException when dst has no '/'
     return dst.substring(0, dst.lastIndexOf("/"));
   }
 
@@ -414,23 +546,29 @@ public class ZookeeperNameNode implements FakeNameNode {
   
   @Override
   public boolean delete(String src, boolean recursive) throws IOException {
+    log.info("attempting delete " + src);
+    if (!isZooName(src))
+      return dist().delete(src, recursive); // forward the caller's recursive flag rather than the one-argument form
+    
     try {
-      recursivelyDelete(DNNConstants.NAMESPACE_PATH + src);
+      recursivelyDelete(DNNConstants.NAMESPACE_PATH + src, true); // NOTE(review): 'recursive' is not honoured for zookeeper paths; children are always removed
     } catch (Exception ex) {
       throw new IOException(ex);
     }
     return true;
   }
   
-  private void recursivelyDelete(String path) throws Exception {
+  private void recursivelyDelete(String path, boolean removeBlocks) throws Exception { // removeBlocks=false skips BLOCKS_PATH cleanup and datanode delete commands (rename uses this)
+    log.info("deleting " + path);
     List<String> children = null;
     try {
       children = keeper.getChildren().forPath(path);
     } catch (KeeperException.NoNodeException ex) {
       return;
     }
+    log.info("children of " + path + " is " + children);
     Object obj = deserialize(keeper.getData().forPath(path));
-    if (obj instanceof FileInfo) {
+    if (removeBlocks && obj instanceof FileInfo) {
       // create the datanode command to (eventually) delete the blocks
       Map<String, List<Long>> hostToBlockMap = new HashMap<String, List<Long>>();
       for (String child : children) {
@@ -443,7 +581,9 @@ public class ZookeeperNameNode implements FakeNameNode {
               hostToBlockMap.put(node, blocks = new ArrayList<Long>());
             blocks.add(block.id);
           }
-          keeper.delete().forPath(DNNConstants.BLOCKS_PATH + "/" + new Block(block.id, 0, 0).getBlockName());
+          String blockPath = DNNConstants.BLOCKS_PATH + "/" + new Block(block.id, 0, 0).getBlockName();
+          log.info("deleting " + blockPath);
+          keeper.delete().forPath(blockPath);
         }
       }
       for (Entry<String,List<Long>> entry : hostToBlockMap.entrySet()) {
@@ -457,14 +597,18 @@ public class ZookeeperNameNode implements FakeNameNode {
       }
     }
     for (String child : children) {
-      recursivelyDelete(path + "/" + child);
+      recursivelyDelete(path + "/" + child, removeBlocks);
     }
+    log.info("deleting " + path); // NOTE(review): repeats the log line emitted on entry
     keeper.delete().forPath(path);
   }
 
   @Override
   public boolean mkdirs(String src, FsPermission masked) throws IOException {
     log.info("mkdirs " + src);
+    if (!isZooName(src)) // delegate non-zookeeper paths to the distributed name node
+      return dist().mkdirs(src, masked);
+    
     try {
       DirInfo dirInfo = new DirInfo(System.currentTimeMillis());
       byte[] data = serialize(dirInfo);
@@ -520,6 +664,10 @@ public class ZookeeperNameNode implements FakeNameNode {
   
   @Override
   public DirectoryListing getListing(String src, byte[] startAfter) throws IOException {
+    log.info("getListing " + src);
+    if (!isZooName(src)) // delegate non-zookeeper paths to the distributed name node
+      return dist().getListing(src, startAfter);
+    
     try {
       String basePath = DNNConstants.NAMESPACE_PATH + src;
       List<String> children = keeper.getChildren().forPath(basePath);
@@ -538,73 +686,70 @@ public class ZookeeperNameNode implements FakeNameNode {
   
   @Override
   public void renewLease(String clientName) throws IOException {
-    // TODO Auto-generated method stub
-    
+    log.info("renewLease " + clientName); // leases are not tracked here; the call is only logged
   }
   
   @Override
   public long[] getStats() throws IOException {
-    // TODO Auto-generated method stub
+    unimplemented(); // NOTE(review): unimplemented() is defined elsewhere; presumably logs or throws for unsupported calls - confirm
     return null;
   }
   
   @Override
   public DatanodeInfo[] getDatanodeReport(DatanodeReportType type) throws IOException {
-    // TODO Auto-generated method stub
+    unimplemented(type);
     return null;
   }
   
   @Override
   public long getPreferredBlockSize(String filename) throws IOException {
-    // TODO Auto-generated method stub
+    unimplemented(filename);
     return 0;
   }
   
   @Override
   public boolean setSafeMode(SafeModeAction action) throws IOException {
-    // TODO Auto-generated method stub
+    unimplemented(action);
     return false;
   }
   
   @Override
   public void saveNamespace() throws IOException {
-    // TODO Auto-generated method stub
-    
+   unimplemented();
   }
   
   @Override
   public void refreshNodes() throws IOException {
-    // TODO Auto-generated method stub
-    
+    unimplemented();
   }
   
   @Override
   public void finalizeUpgrade() throws IOException {
-    // TODO Auto-generated method stub
-    
+    unimplemented();
   }
   
   @Override
   public UpgradeStatusReport distributedUpgradeProgress(UpgradeAction action) throws IOException {
-    // TODO Auto-generated method stub
+    unimplemented(); // NOTE(review): 'action' is not passed along, unlike the other stubs
     return null;
   }
   
   @Override
   public void metaSave(String filename) throws IOException {
-    // TODO Auto-generated method stub
-    
+    unimplemented(filename);
   }
   
   @Override
   public void setBalancerBandwidth(long bandwidth) throws IOException {
-    // TODO Auto-generated method stub
-    
+    unimplemented(bandwidth);
   }
   
   @Override
   public HdfsFileStatus getFileInfo(String src) throws IOException {
-    log.info("Get file status");
+    log.info("getFileInfo " + src);
+    if (!isZooName(src)) // delegate non-zookeeper paths to the distributed name node
+      return dist().getFileInfo(src);
+    
     try {
       byte[] data = keeper.getData().forPath(DNNConstants.NAMESPACE_PATH + src);
       HdfsFileStatus result = decodeFile(src, data);
@@ -624,54 +769,51 @@ public class ZookeeperNameNode implements FakeNameNode {
   
   @Override
   public ContentSummary getContentSummary(String path) throws IOException {
-    // TODO Auto-generated method stub
+    unimplemented(path);
     return null;
   }
   
   @Override
   public void setQuota(String path, long namespaceQuota, long diskspaceQuota) throws IOException {
-    // TODO Auto-generated method stub
-    
+    unimplemented(path, namespaceQuota, diskspaceQuota);
   }
   
   @Override
   public void fsync(String src, String client) throws IOException {
-    // TODO Auto-generated method stub
-    
+    unimplemented(src, client); // NOTE(review): path-based, yet not delegated to dist() for non-zookeeper paths (same for getContentSummary, setQuota, setTimes)
   }
   
   @Override
   public void setTimes(String src, long mtime, long atime) throws IOException {
-    // TODO Auto-generated method stub
-    
+    unimplemented(src, mtime, atime);
   }
   
   @Override
   public Token<DelegationTokenIdentifier> getDelegationToken(Text renewer) throws IOException {
-    // TODO Auto-generated method stub
+    unimplemented(renewer);
     return null;
   }
   
   @Override
   public long renewDelegationToken(Token<DelegationTokenIdentifier> token) throws IOException {
-    // TODO Auto-generated method stub
+    unimplemented(token);
     return 0;
   }
   
   @Override
   public void cancelDelegationToken(Token<DelegationTokenIdentifier> token) throws IOException {
-    // TODO Auto-generated method stub
-    
+    unimplemented(token);
   }
   
   @Override
   public long getProtocolVersion(String protocol, long clientVersion) throws IOException {
-    // TODO Auto-generated method stub
+    unimplemented(protocol, clientVersion);
     return 0;
   }
   
   @Override
   public DatanodeRegistration register(DatanodeRegistration registration) throws IOException {
+    log.info("register " + registration);
     if (keeper != null) {
       log.info("registering in zookeeper as " + registration.name);
       ByteArrayOutputStream stream = new ByteArrayOutputStream();
@@ -698,12 +840,18 @@ public class ZookeeperNameNode implements FakeNameNode {
         throw new IOException(e);
       }
     }
+    // also register with the distributed name node, when dist() returns one
+    FakeNameNode dist = dist();
+    if (dist != null) {
+      dist.register(registration);
+    }
     return registration;
   }
   
   @Override
   public DatanodeCommand[] sendHeartbeat(DatanodeRegistration registration, long capacity, long dfsUsed, long remaining, int xmitsInProgress, int xceiverCount)
       throws IOException {
+    log.info("sendHeartbeat " + registration);
     List<DatanodeCommand> commands = new ArrayList<DatanodeCommand>();
     try {
       String commandsPath = DNNConstants.DATANODES_PATH + "/" + registration.getName() + "/commands";
@@ -715,15 +863,30 @@ public class ZookeeperNameNode implements FakeNameNode {
     } catch (Exception e) {
       throw new IOException(e);
     }
+    // merge in any commands the distributed name node has for this datanode
+    if (dist != null) { // NOTE(review): reads the dist field directly, while register() and blockReceived() call dist()
+      DatanodeCommand[] cmds = dist.sendHeartbeat(registration, capacity, dfsUsed, remaining, xmitsInProgress, xceiverCount);
+      if (cmds != null) {
+        for (DatanodeCommand cmd : cmds) {
+          commands.add(cmd);
+        }
+      }
+    }
     return commands.toArray(new DatanodeCommand[0]);
   }
   
   @Override
   public DatanodeCommand blockReport(DatanodeRegistration registration, long[] blocks) throws IOException {
+    log.info("blockReport " + registration);
+    if (dist != null) { // NOTE(review): reads the dist field directly (register/blockReceived call dist()); when set, the zookeeper bookkeeping below is skipped entirely
+      return dist.blockReport(registration, blocks);
+    }
     BlockListAsLongs blist = new BlockListAsLongs(blocks);
     Set<Long> current = new HashSet<Long>();
     for (int i = 0; i < blist.getNumberOfBlocks(); i++) {
-      current.add(blist.getBlockId(i));
+      if (isZooBlockId(blist.getBlockId(i))) { // only zookeeper-managed block ids are collected; 'current' is only logged below
+        current.add(blist.getBlockId(i));
+      }
     }
     log.info(registration.name + " reports " + current);
     return null;
@@ -731,32 +894,38 @@ public class ZookeeperNameNode implements FakeNameNode {
   
   @Override
   public void blocksBeingWrittenReport(DatanodeRegistration registration, long[] blocks) throws IOException {
-    // TODO Auto-generated method stub
-    
+    unimplemented(registration, new BlockListAsLongs(blocks)); // NOTE(review): this was a silent no-op before; confirm datanodes do not depend on it succeeding
   }
   
   @Override
   public void blockReceived(DatanodeRegistration registration, Block[] blocks, String[] delHints) throws IOException {
     log.info("blockRecieved " + Arrays.asList(blocks));
+    FakeNameNode dist = dist();
     for (Block block : blocks) {
-      String path = DNNConstants.BLOCKS_PATH + "/" + block.getBlockName();
-      try {
-        BlockInfo info = (BlockInfo)deserialize(keeper.getData().forPath(path));
-        info.size = block.getNumBytes();
-        info.complete = true;
-        byte[] data = serialize(info);
-        keeper.setData().forPath(path, data);
-        log.info("Block size updated on " + block + " to " + info.size);
-      } catch (Exception e) {
-        log.error(e, e);
+      if (!isZooBlockId(block.getBlockId()) && dist == null) { // NOTE(review): thrown mid-loop, after earlier zookeeper blocks of this batch were already marked complete
+        throw new IOException("blockReceived for distributed name node, but there are no data nodes yet! " + block.getBlockId());
+      }
+      if (isZooBlockId(block.getBlockId())) {
+        String path = DNNConstants.BLOCKS_PATH + "/" + block.getBlockName();
+        try {
+          BlockInfo info = (BlockInfo)deserialize(keeper.getData().forPath(path));
+          info.size = block.getNumBytes();
+          info.complete = true; // complete() reads this flag from the BLOCKS_PATH entry
+          byte[] data = serialize(info);
+          keeper.setData().forPath(path, data);
+          log.info("Block size updated on " + block + " to " + info.size);
+        } catch (Exception e) {
+          log.error(e, e);
+        }
       }
     }
+    if (dist != null) // the whole batch, zookeeper blocks included, is forwarded
+      dist.blockReceived(registration, blocks, delHints);
   }
   
   @Override
   public void errorReport(DatanodeRegistration registration, int errorCode, String msg) throws IOException {
-    // TODO Auto-generated method stub
-    
+    unimplemented(registration, errorCode, msg); // NOTE(review): this was a silent no-op before; confirm datanode error reports are not expected to succeed here
   }
   
   @Override
@@ -771,21 +940,20 @@ public class ZookeeperNameNode implements FakeNameNode {
   
   @Override
   public UpgradeCommand processUpgradeCommand(UpgradeCommand comm) throws IOException {
-    // TODO Auto-generated method stub
+    unimplemented(comm);
     return null;
   }
   
   @Override
   public long nextGenerationStamp(Block block, boolean fromNN) throws IOException {
-    // TODO Auto-generated method stub
+    unimplemented(block, fromNN); // NOTE(review): presumably part of block recovery together with commitBlockSynchronization, which is also unimplemented - confirm recovery is not needed
     return 0;
   }
   
   @Override
   public void commitBlockSynchronization(Block block, long newgenerationstamp, long newlength, boolean closeFile, boolean deleteblock, DatanodeID[] newtargets)
       throws IOException {
-    // TODO Auto-generated method stub
-    
+    unimplemented(block, newgenerationstamp, newlength, closeFile, deleteblock, newtargets);
   }
   
 }


Mime
View raw message