accumulo-commits mailing list archives

From e..@apache.org
Subject svn commit: r1371432 [3/4] - in /accumulo/branches/ACCUMULO-722/distnn: ./ src/ src/main/ src/main/java/ src/main/java/org/ src/main/java/org/apache/ src/main/java/org/apache/hadoop/ src/main/java/org/apache/hadoop/hdfs/ src/main/java/org/apache/hadoop...
Date Thu, 09 Aug 2012 20:21:33 GMT
Added: accumulo/branches/ACCUMULO-722/distnn/src/main/java/org/apache/hadoop/hdfs/server/namenode/DistributedNamenodeProxy.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-722/distnn/src/main/java/org/apache/hadoop/hdfs/server/namenode/DistributedNamenodeProxy.java?rev=1371432&view=auto
==============================================================================
--- accumulo/branches/ACCUMULO-722/distnn/src/main/java/org/apache/hadoop/hdfs/server/namenode/DistributedNamenodeProxy.java (added)
+++ accumulo/branches/ACCUMULO-722/distnn/src/main/java/org/apache/hadoop/hdfs/server/namenode/DistributedNamenodeProxy.java Thu Aug  9 20:21:32 2012
@@ -0,0 +1,1596 @@
+/**
+ * To Do
+ * 
+ *  add hierarchical locking
+ *  add support for permissions
+ *  add support for leasing
+ *  add support for block generations?
+ * 
+ * finish namenode actions:
+ *  TEST::implement delete
+ *  TEST::implement mkdirs
+ * 
+ * finish datanode actions:
+ *  error reporting
+ * 
+ * store this kind of stuff in namespaceTable for each file:
+ * 
+ * Path path
+ * long length
+ * boolean isdir
+ * short block_replication
+ * long blocksize
+ * long modification_time
+ * long access_time
+ * FsPermission permission
+ * String owner
+ * String group
+ * 
+ */
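+
+/*
+ * Illustrative sketch (not part of the original notes): with the ColumnFQ constants
+ * declared in this class, a single file entry in the namespace table might look roughly
+ * like this (values hypothetical):
+ *
+ *   row = /user/foo/bar.txt
+ *     info:isDir        -> "N"
+ *     info:size         -> "134217728"
+ *     info:replication  -> "1"
+ *     info:blocksize    -> "67108864"
+ *     info:create_time  -> "1344543692000"
+ *     info:permission   -> "rw-r--r--"
+ *     blocks:00000000_<blockID> -> (blank)
+ *
+ * while the parent directory's row carries one children:<child path> entry per child.
+ */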
+
+package org.apache.hadoop.hdfs.server.namenode;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Random;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.UUID;
+
+import org.apache.accumulo.core.Constants;
+import org.apache.accumulo.core.client.BatchScanner;
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Instance;
+import org.apache.accumulo.core.client.MutationsRejectedException;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.accumulo.core.client.ZooKeeperInstance;
+import org.apache.accumulo.core.data.ColumnUpdate;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.util.ColumnFQ;
+import org.apache.accumulo.core.util.TextUtil;
+import org.apache.accumulo.core.util.UtilWaitThread;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.ContentSummary;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.UnresolvedLinkException;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdfs.DFSUtil;
+import org.apache.hadoop.hdfs.DNNConstants;
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
+import org.apache.hadoop.hdfs.protocol.DatanodeID;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.DirectoryListing;
+import org.apache.hadoop.hdfs.protocol.FSConstants.DatanodeReportType;
+import org.apache.hadoop.hdfs.protocol.FSConstants.SafeModeAction;
+import org.apache.hadoop.hdfs.protocol.FSConstants.UpgradeAction;
+import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
+import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
+import org.apache.hadoop.hdfs.server.common.UpgradeStatusReport;
+import org.apache.hadoop.hdfs.server.protocol.BlockCommand;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
+import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
+import org.apache.hadoop.hdfs.server.protocol.UpgradeCommand;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.security.AccessControlException;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.log4j.Logger;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooDefs.Ids;
+import org.apache.zookeeper.ZooKeeper;
+
+public class DistributedNamenodeProxy implements FakeNameNode {
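+  // A hedged note on the connection URI (inferred from the parsing below, not spelled out
+  // in the original): the URI's user-info portion is expected to carry semicolon-separated
+  // key=value pairs, e.g. a hypothetical
+  //   dnn://user=root;pass=secret;keepers=zkhost:2181;instance=test@somehost/
+  // which would yield username "root", passwd "secret", zookeepers "zkhost:2181" and
+  // instance "test".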
+  public static class ConnectInfo {
+    public ConnectInfo(URI uri) {
+      String userInfo = uri.getUserInfo();
+      if (userInfo == null)
+        throw new RuntimeException("no connection information found in the authority of " + uri);
+      log.info("userInfo " + userInfo);
+      for (String part : userInfo.split(";")) {
+        String parts[] = part.split("=");
+        String attr = parts[0];
+        String value = parts[1];
+        if (attr.equals("user")) {
+          this.username = value;
+        } else if (attr.equals("pass")) {
+          this.passwd = value.getBytes();
+        } else if (attr.equals("keepers")) {
+          this.zookeepers = value;
+        } else if (attr.equals("instance")) {
+          this.instance = value;
+        } else {
+          throw new RuntimeException("unknown entry " + attr + " in authority information");
+        }
+      }
+    }
+    public String username;
+    public byte[] passwd;
+    public String zookeepers;
+    public String instance;
+  }
+
+  private static Logger log = Logger.getLogger(DistributedNamenodeProxy.class);
+  
+  long start = System.currentTimeMillis();
+
+  private class Replicator {
+    
+    // TODO: make this respect configured replication settings
+    
+    private HashSet<String> targets;
+    
+    Replicator() {
+      targets = new HashSet<String>();
+    }
+    
+    DatanodeInfo[] getReplicationTargets() throws IOException {
+      
+      // TODO: periodically scan the datanodes table to find new datanodes
+      if(targets.size() == 0)
+        scanDatanodes();
+      
+      // pick nodes at random
+      // TODO: take into account whether a datanode is too full to host another block
+      // the old namenode would also have a hard limit on the total number
+      // of fs objects it could store 
+      int replicationFactor = 1;
+      
+      if(targets.size() < replicationFactor)
+        throw new IOException("unable to achieve required replication: too few datanodes running");
+      
+      HashSet<String> targetSetNames = new HashSet<String>();
+      
+      HashSet<DatanodeInfo> targetSet = new HashSet<DatanodeInfo>();
+      for(int i=0; i < replicationFactor; i++) {
+        int r = rand.nextInt(targets.size());
+        
+        // advance the iterator to the randomly chosen target
+        Iterator<String> iter = targets.iterator();
+        for(int j=0; j < r; j++)
+          iter.next();
+        String target = iter.next();
+        
+        // don't create two replicas on the same target
+        while(targetSetNames.contains(target)) {
+          r = rand.nextInt(targets.size());
+          iter = targets.iterator();
+          for(int j=0; j < r; j++)
+            iter.next();
+          target = iter.next();
+        }
+        
+        targetSetNames.add(target);
+        targetSet.add(new DatanodeInfo(new DatanodeID(target)));
+      }
+      
+      DatanodeInfo[] targetSetArray = targetSet.toArray(new DatanodeInfo[targetSet.size()]);
+      return targetSetArray;
+    }
+    
+    
+    // rescan the datanodes table to keep the set of datanodes up to date
+    // other client code can also help us remove nodes from this set when reporting
+    // failed interactions with a datanode
+    private void scanDatanodes() throws IOException {
+      log.info("scanning datanodes table ..");
+      targets.clear();
+      BatchScanner scanner = createBatchScanner(datanodesTable, new Range());
+      ColumnFQ.fetch(scanner, remaining);
+      try {
+        for (Entry<Key,Value> entry : scanner) {
+          targets.add(entry.getKey().getRow().toString());
+        }
+      } finally {
+        scanner.close();
+      }
+    }
+  } 
+  
+  static private BatchScanner createBatchScanner(Connector conn, String table, Range ... ranges) throws IOException {
+    try {
+      BatchScanner result = conn.createBatchScanner(table, Constants.NO_AUTHS, QUERY_THREADS);
+      result.setRanges(Arrays.asList(ranges));
+      return result;
+    } catch (TableNotFoundException ex) {
+      throw new IOException(ex);
+    }
+  }
+  
+  private static BatchWriter createBatchWriter(Connector conn, String table) throws IOException {
+    try {
+      return conn.createBatchWriter(table, 10*1000, 1000, 4);
+    } catch (TableNotFoundException ex) {
+      throw new IOException(ex);
+    }
+  }
+  
+  private static byte[] getParentPath(String src) {
+    if(src.equals("/"))
+      return "/".getBytes();
+    
+    String[] components = src.split(Path.SEPARATOR);
+    
+    StringBuilder sb = new StringBuilder();
+    for(int i=0; i < components.length - 1; i++)
+      sb.append(components[i] + "/");
+    
+    if(sb.length() > 1)
+      sb.deleteCharAt(sb.length()-1);
+    
+    return sb.toString().getBytes();
+  }
+  
+  public static String normalizePath(String src) {
+    if (src.length() > 1 && src.endsWith("/")) {
+      src = src.substring(0, src.length() - 1);
+    }
+    return src;
+  }
+  
+  private static void unimplemented(Object ... args) {
+    Throwable t = new Throwable();
+    String method = t.getStackTrace()[1].getMethodName();
+    log.warn(method + " unimplemented, args: " + Arrays.asList(args), t);
+  }
+  
+  private Random rand = new Random();
+  private Replicator replicator = new Replicator();
+  private Connector conn;
+  private final static String namespaceTable = "namespace";
+  private final static String blocksTable = "blocks";
+  private final static String datanodesTable = "datanodes";
+  private final static Text infoFam = new Text("info");
+  private final static Text childrenFam = new Text("children");
+  private final static Text blocksFam = new Text("blocks");
+  private final static Text datanodesFam = new Text("datanodes");
+  private final static Text commandFam = new Text("command");
+  private final static ColumnFQ remaining = new ColumnFQ(infoFam, new Text("remaining"));
+  
+  private final static ColumnFQ infoSize = new ColumnFQ(infoFam, new Text("size"));
+  private final static ColumnFQ isDir = new ColumnFQ(infoFam, new Text("isDir"));
+  private final static ColumnFQ infoCapacity = new ColumnFQ(infoFam, new Text("capacity"));
+  private final static ColumnFQ infoUsed = new ColumnFQ(infoFam, new Text("used"));
+  private final static ColumnFQ infoReplication = new ColumnFQ(infoFam, new Text("replication"));
+  private final static ColumnFQ infoBlockSize = new ColumnFQ(infoFam, new Text("blocksize"));
+  private final static ColumnFQ infoModificationTime = new ColumnFQ(infoFam, new Text("create_time"));
+  private final static ColumnFQ infoStorageID = new ColumnFQ(infoFam, new Text("storageID"));
+  private final static ColumnFQ infoPermission = new ColumnFQ(infoFam, new Text("permission"));
+  
+  private final static Value blank = new Value(new byte[]{});
+  
+  private final static int QUERY_THREADS = 10;
+  
+  //private HealthProtocol healthServer;
+
+  private long lastCapacity = -1;
+  
+  private long lastDfsUsed = -1;
+  
+  private long lastRemaining = -1;
+  
+  private ZooKeeper zookeeper = null;
+  private String instanceName = null;
+  private String keepers = null;
+  private final String username = "root";
+  private byte[] passwd = "secret".getBytes();
+  
+  private Connector getConnector() {
+    synchronized (this) {
+      if (conn == null) {
+        try {
+          Instance instance = new ZooKeeperInstance(instanceName, keepers);
+          conn = instance.getConnector(username, passwd);
+        } catch (Exception ex) {
+          conn = null;
+          log.warn("Unable to get connector " + ex);
+        }
+      }
+    }
+    return conn;
+  }
+  
+  public DistributedNamenodeProxy(Configuration conf) throws IOException {
+    instanceName = conf.get("accumulo.zookeeper.instance");
+    keepers = conf.get("accumulo.zookeeper.keepers");
+    zookeeper = new ZooKeeper(keepers, 30000, new Watcher() {
+      @Override
+      public void process(WatchedEvent arg0) {
+        log.info("zookeeper says " + arg0);
+      }
+    });
+    for (String name : new String[] {DNNConstants.DNN, DNNConstants.BLOCKS_PATH, DNNConstants.DATANODES_PATH, DNNConstants.NAMESPACE_PATH}) {
+      try {
+        zookeeper.create(name, new byte[]{}, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+      } catch (KeeperException.NodeExistsException ex) {
+        // node already exists; nothing to do
+      } catch (Exception ex) {
+        throw new IOException(ex);
+      } 
+    }
+  }
+  
+  public DistributedNamenodeProxy(Connector conn) throws IOException {
+    log.info("========= Distributed Name Node Proxy init =========");
+    this.conn = conn;
+    
+    //		String healthNodeHost = config.get("healthnode");
+    //		if(healthNodeHost == null)
+    //			throw new IOException("error: no healthnode address specified. add one to core-site.xml");
+    
+    //InetSocketAddress healthNodeAddr = new InetSocketAddress(healthNodeHost, 9090);
+    
+    
+    //healthServer = (HealthProtocol)RPC.getProxy(HealthProtocol.class,
+    //		HealthProtocol.versionID, healthNodeAddr, config,
+    //	        NetUtils.getSocketFactory(config, HealthProtocol.class));
+    
+  }
+  
+  
+  @Override
+  public void abandonBlock(Block b, String src, String holder)
+      throws IOException {
+    log.info("using abandonBlock");
+    
+  }
+  
+  @Override
+  public LocatedBlock addBlock(String arg0, String arg1) throws IOException {
+    return addBlock(arg0, arg1, new DatanodeInfo[0]);
+  }
+  
+  /**
+   * The client would like to obtain an additional block for the indicated
+   * filename (which is being written-to).  Return an array that consists
+   * of the block, plus a set of machines.  The first on this list should
+   * be where the client writes data.  Subsequent items in the list must
+   * be provided in the connection to the first datanode.
+   *
+   * Make sure the previous blocks have been reported by datanodes and
+   * are replicated.  Will return an empty 2-elt array if we want the
+   * client to "try again later".
+   * @throws IOException 
+   */
+  @Override
+  public LocatedBlock addBlock(String src, String clientName, DatanodeInfo[] excludeNodes)
+      throws IOException {
+    log.info("using addBlock " + src + " " + clientName);
+    
+    // create new blocks on data nodes
+    //   zookeeper holds the negative numbered blocks
+    long blockID = rand.nextLong() & Long.MAX_VALUE; // Math.abs(Long.MIN_VALUE) stays negative
+    byte[] blockIDBytes = Long.toString(blockID).getBytes();
+    
+    Block b = new Block(blockID, 0, 0);
+    
+    // choose a set of nodes on which to replicate block
+    DatanodeInfo[] targets = replicator.getReplicationTargets(); 
+    
+    // TODO: get a lease to the first
+    // TODO: can we record all this in the namespace table?
+    // i.e. do we ever need to lookup blocks without knowing the associated files?
+    // blockReceived() doesn't know the file mapping
+    
+    // record block to host mapping and vice versa
+    recordBlockHosts(blockIDBytes, targets);
+    
+    // get the last block ID
+    BatchScanner bs = createBatchScanner(namespaceTable, new Range(new Text(src)));
+    bs.fetchColumnFamily(blocksFam);
+    
+    int blockPos = 0;
+    try {
+      for (@SuppressWarnings("unused") Entry<Key,Value> entry : bs) {
+        blockPos++;
+      }
+    } finally {
+      bs.close();
+    }
+    
+    // record file to block mapping
+    Mutation nameData = new Mutation(new Text(src.getBytes()));
+    nameData.put(blocksFam, new Text(String.format("%08d_%d", blockPos, blockID).getBytes()), blank);
+    BatchWriter bw = createBatchWriter(namespaceTable);
+    try {
+      try {
+        bw.addMutation(nameData);
+      } finally {
+        bw.close();  
+      }
+    } catch (MutationsRejectedException ex) {
+      throw new IOException(ex);
+    }
+    LocatedBlock newBlock = new LocatedBlock(b, targets);
+    log.info("addBlock new block for " + src + " " + b.getBlockName());
+    return newBlock;
+  }
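+  
+  // Reading of the write path (a summary, not documented in the original): a client calls
+  // create(), then addBlock() once per block it writes; addBlock() records each block under
+  // the file's row in the namespace table with a qualifier of the form
+  // <zero-padded position>_<blockID> (e.g. "00000001_<blockID>"), so complete(),
+  // getBlockLocations() and delete() can recover the block order by splitting on "_".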
+  
+  @Override
+  public LocatedBlock append(String src, String clientName)
+      throws IOException {
+    log.info("using append");
+    return null;
+  }
+  
+  /** ------------ Data Node Protocol Methods -----------
+   * 
+   */
+  
+  @Override
+  public void blockReceived(DatanodeRegistration registration, Block[] blocks, String[] delHints) throws IOException {
+    log.info("using blockReceived");
+    
+    // for each block we should have recorded its existence already
+    // we should also know about the datanode
+    
+    // update blocks table
+    BatchWriter bw = createBatchWriter(blocksTable);
+    try {
+      try {
+        for(Block b : blocks) {
+          Mutation blockData = new Mutation(new Text(Long.toString(b.getBlockId())));
+          ColumnFQ.put(blockData, infoBlockSize, new Value(Long.toString(b.getNumBytes()).getBytes()));
+          bw.addMutation(blockData);
+        }
+      } finally {
+        bw.close();
+      }
+      // update total file space ?
+    } catch (MutationsRejectedException ex) {
+      throw new IOException(ex);
+    }
+    
+  }
+  
+  @Override
+  public DatanodeCommand blockReport(DatanodeRegistration registration,
+      long[] blocks) throws IOException {
+    log.info("using blockReport");
+    BlockListAsLongs blist = new BlockListAsLongs(blocks);
+    Set<Long> current = new HashSet<Long>();
+    for (int i = 0; i < blist.getNumberOfBlocks(); i++) {
+      current.add(blist.getBlockId(i));
+    }
+    log.info(registration.getName() + " reports blocks " + current);
+    // stale entries found below live in the datanodes table (row = datanode name)
+    BatchWriter bw = createBatchWriter(datanodesTable);
+    Mutation m = new Mutation(registration.getName());
+    Scanner scan = createScanner(datanodesTable);
+    scan.setRange(new Range(registration.getName()));
+    scan.fetchColumnFamily(blocksFam);
+    try {
+      for (Entry<Key,Value> entry : scan) {
+        long block = Long.parseLong(entry.getKey().getColumnQualifier().toString());
+        if (!current.remove(block)) {
+          // this block is recorded in the table but was not reported; remove the entry
+          m.putDelete(blocksFam, entry.getKey().getColumnQualifier());
+        }
+      }
+      if (!m.getUpdates().isEmpty())
+        bw.addMutation(m);
+    } catch (Exception ex) {
+      throw new IOException(ex);
+    } finally {
+      try {
+        bw.close();
+      } catch (MutationsRejectedException ex) {
+        throw new IOException(ex);
+      }
+    }
+    List<Block> deleteList = new ArrayList<Block>(current.size());
+    for (Long id : current) {
+      deleteList.add(new Block(id));
+    }
+    log.debug("Asking " + registration.getName() + " to delete blocks " + deleteList);
+    return new BlockCommand(DatanodeProtocol.DNA_INVALIDATE, deleteList.toArray(new Block[0]));
+  }
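+  
+  // Summary of blockReport above (an interpretation, not from the original comments): table
+  // entries for blocks the datanode no longer reports are deleted from the datanodes table,
+  // and any reported block with no corresponding table entry is returned in a
+  // DNA_INVALIDATE command so the datanode discards it.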
+  
+  @Override
+  public void blocksBeingWrittenReport(DatanodeRegistration arg0, long[] arg1) throws IOException {
+    unimplemented(arg0, arg1);
+  }
+  
+  @Override
+  public void cancelDelegationToken(Token<DelegationTokenIdentifier> token) throws IOException {
+    unimplemented(token);
+  }
+  
+  @Override
+  public void commitBlockSynchronization(Block block,
+      long newgenerationstamp, long newlength, boolean closeFile,
+      boolean deleteblock, DatanodeID[] newtargets) throws IOException {
+    log.info("using commitBlockSynchronization");
+    
+  }
+  
+  @Override
+  public boolean complete(String src, String clientName) throws IOException {
+    log.info("using complete " + src);
+    
+    // write complete status to namespace?
+    // does this just help avoid mutations to existing, completed files?
+    BatchScanner bs = createBatchScanner(namespaceTable, new Range(new Text(src)));
+    bs.fetchColumnFamily(blocksFam);
+    List<Range> ranges = new ArrayList<Range>();
+    try {
+      for (Entry<Key,Value> entry : bs) {
+        String orderBlock = entry.getKey().getColumnQualifier().toString();
+        ranges.add(new Range(orderBlock.split("_")[1]));
+      }
+    } finally {
+      bs.close();
+    }
+    log.info("have ranges " + ranges);
+    if (ranges.isEmpty())
+      return true;
+    long fileSize = 0;
+    retry:
+    while (true) {
+      BatchScanner blockScanner = createBatchScanner(blocksTable, ranges.toArray(new Range[]{}));
+      ColumnFQ.fetch(blockScanner, infoBlockSize);
+      fileSize = 0;
+      int count = 0;
+      try {
+        for (Entry<Key,Value> entry : blockScanner) {
+          log.info("Looking at block sizes " + entry.getKey() + " -> " + entry.getValue());
+          long blockSize = Long.parseLong(new String(entry.getValue().get()));
+          if (blockSize == 0) {
+            UtilWaitThread.sleep(250);
+            continue retry;
+          }
+          fileSize += blockSize;
+          count++;
+        }
+      } finally {
+        blockScanner.close();
+      }
+      if (count != ranges.size()) {
+        log.info("Did not read block sizes for all blocks on file " + src + " read " + count + " but expected " + ranges.size());
+        UtilWaitThread.sleep(250);
+        continue;
+      }
+      break;
+    }
+    
+    // write size to namespace table
+    Mutation fileSizePut = new Mutation(new Text(src.getBytes()));
+    ColumnFQ.put(fileSizePut, infoSize, new Value(Long.toString(fileSize).getBytes()));
+    BatchWriter bw = createBatchWriter(namespaceTable);
+    try {
+      try {
+        bw.addMutation(fileSizePut);
+      } finally {
+        bw.close();
+      }
+    } catch (MutationsRejectedException ex) {
+      throw new IOException(ex);
+    }
+    return true;
+  }
+  
+  public int computeDatanodeWork() {
+    log.info("using computeDatanodeWork");
+    // TODO: how is this number used by the caller?
+    return 0;
+  }
+  
+  static private void put(Mutation m, ColumnFQ cfq, String value) {
+    ColumnFQ.put(m, cfq, new Value(value.getBytes()));
+  }
+  
+  static private Value now() {
+    return new Value(Long.toString(System.currentTimeMillis()).getBytes());
+  }
+  
+  @Override
+  public void create(String src, FsPermission masked, String clientName, boolean overwrite, boolean createParent, short replication, long blockSize) throws IOException {
+    log.info("using create");
+    
+    // verify that parent directories exist
+    byte[] parent = getParentPath(src);
+    String isDirFlag = null;
+    ColumnFQ srcColumn = new ColumnFQ(childrenFam, new Text(src));
+    
+    BatchScanner bs = createBatchScanner(namespaceTable, new Range(new Text(parent)));
+    ColumnFQ.fetch(bs, isDir);
+    bs.fetchColumnFamily(childrenFam);
+    try {
+      for (Entry<Key,Value> entry : bs) {
+        if (isDir.hasColumns(entry.getKey()))
+        {
+          isDirFlag = new String(entry.getValue().get());
+        }
+        
+        if (srcColumn.hasColumns(entry.getKey())) {
+          throw new IOException("file exists: " + src);
+        }
+      }
+    } finally {
+      bs.close();
+    }
+    if (isDirFlag == null) {
+      if (!createParent)
+        throw new IOException("Parent directory does not exist " + new Text(parent));
+      this.mkdirs(new String(parent), FsPermission.getDefault());
+    }
+    if ("N".equals(isDirFlag)) {
+      throw new IOException("Parent entry is not a directory " + new Text(parent));
+    }
+    
+    // TODO: check access permissions
+    
+    // edit namespace table to create this file
+    
+    /*
+     * not yet recorded:
+     * 
+     * long length
+     * long modification_time
+     * long access_time
+     * String owner
+     * String group
+     */
+    
+    // TODO: not atomic
+    try {
+      BatchWriter bw = createBatchWriter(namespaceTable);
+      try {
+        Mutation createRequest = new Mutation(new Text(src));
+        ColumnFQ.put(createRequest, infoModificationTime, now());
+        put(createRequest, infoReplication, Short.toString(replication));
+        put(createRequest, infoBlockSize, Long.toString(blockSize));
+        put(createRequest, infoPermission, masked.toString());
+        put(createRequest, isDir, "N");
+        bw.addMutation(createRequest);
+        
+        // record existence of new file in parent dir now or on complete?
+        Mutation childCreate = new Mutation(new Text(getParentPath(src)));
+        // TODO: could store that this is a file in the Value
+        childCreate.put(childrenFam, new Text(src.getBytes()), blank);
+        bw.addMutation(childCreate);
+      } finally {
+        bw.close();
+      }
+    } catch (MutationsRejectedException ex) {
+      throw new IOException(ex);
+    }
+    
+    // TODO: verify replication
+    
+    // TODO: grant a lease to the client??
+    // GFS grants the lease to the primary datanode, not the client
+  }
+  
+  @Override
+  public void create(String src, FsPermission masked, String clientName,
+      boolean overwrite, short replication, long blockSize)
+          throws IOException {
+    create(src, masked, clientName, overwrite, true, replication, blockSize);
+  }
+  
+  private BatchScanner createBatchScanner(String table, Range ... ranges) throws IOException {
+    return createBatchScanner(conn, table, ranges);
+  }
+  
+  private Scanner createScanner(String table) throws IOException {
+    try {
+      return conn.createScanner(table, Constants.NO_AUTHS);
+    } catch (TableNotFoundException ex) {
+      throw new IOException(ex);
+    }
+  }
+  
+  private BatchWriter createBatchWriter(String table) throws IOException {
+    return createBatchWriter(conn, table);
+  }
+  
+  @Override
+  public boolean delete(String src) throws IOException {
+    log.info("using delete");
+    // NameNode code calls this with recursive=true as default
+    return delete(src, true);
+  }
+  
+  @Override
+  public boolean delete(String src, boolean recursive) throws IOException {
+    log.info("using delete " + src);
+    // check permissions - how?
+    
+    byte[] parent = getParentPath(src);
+    
+    // determine whether this is a directory
+    BatchScanner bs = createBatchScanner(namespaceTable, new Range(new Text(src)));
+    ColumnFQ.fetch(bs, isDir);
+    bs.fetchColumnFamily(childrenFam);
+    
+    String isDir_ = null;
+    ArrayList<Text> children = new ArrayList<Text>();
+    try {
+      for (Entry<Key,Value> entry : bs) {
+        if (isDir.hasColumns(entry.getKey())) {
+          isDir_ = entry.getKey().getColumnQualifier().toString();
+        } else if (entry.getKey().getColumnFamily().equals(childrenFam)) {
+          children.add(entry.getKey().getColumnQualifier());
+        }
+      }
+    } finally {
+      bs.close();
+    }
+    if("Y".equals(isDir_) && !recursive && children.size() > 0)
+      throw new IOException("can't delete directory. not empty");
+    
+    Mutation childDelete = new Mutation(new Text(parent));
+    Text srcText = new Text(src);
+    childDelete.putDelete(childrenFam, srcText);
+    ColumnFQ.put(childDelete, infoModificationTime, now());
+    
+    ArrayList<Mutation> deletes = new ArrayList<Mutation>();
+    getDeletes(srcText, deletes);
+    deletes.add(childDelete);
+    
+    // delete everything at once
+    BatchWriter nw = createBatchWriter(namespaceTable);
+    Set<Text> blocks = new HashSet<Text>();
+    try {
+      try {
+        nw.addMutations(deletes);
+        for (Mutation m : deletes) {
+          for (ColumnUpdate update : m.getUpdates()) {
+            byte cf[] = update.getColumnFamily();
+            if (blocksFam.compareTo(cf, 0, cf.length) == 0) {
+              blocks.add(new Text(new String(update.getColumnQualifier()).split("_", 2)[1]));
+            }
+          }
+        }
+      } finally {
+        nw.close();
+      }
+      if (blocks.isEmpty())
+        return true;
+      log.info("deleting blocks "+ blocks);
+      Map<String, List<String>> hostBlockMap = new HashMap<String, List<String>>();
+      // Now remove the blocks
+      BatchWriter bw = createBatchWriter(blocksTable);
+      // scan back the entries that go with the blocks
+      List<Range> ranges = new ArrayList<Range>();
+      for (Text row : blocks) {
+        ranges.add(new Range(row));
+      }
+      bs = createBatchScanner(blocksTable, ranges.toArray(new Range[0]));
+      // delete everything that matches our block list
+      for (Entry<Key,Value> entry : bs) {
+        if (blocks.contains(entry.getKey().getRow())) {
+          Mutation m = new Mutation(entry.getKey().getRow());
+          m.putDelete(entry.getKey().getColumnFamily(), entry.getKey().getColumnQualifier());
+          bw.addMutation(m);
+          if (entry.getKey().getColumnFamily().equals(datanodesFam)) {
+            String host = entry.getKey().getColumnQualifier().toString();
+            String block = entry.getKey().getRow().toString();
+            List<String> blockList = null;
+            if ((blockList = hostBlockMap.get(host)) == null) {
+              hostBlockMap.put(host, blockList = new ArrayList<String>());
+            }
+            blockList.add(block);
+          }
+        }
+      }
+      bs.close();
+      bw.close();
+      log.info("Host -> block map " + hostBlockMap);
+
+      // Create commands to remove the blocks on the datanodes at the next heartbeat
+      bw = createBatchWriter(datanodesTable);
+      for (Entry<String,List<String>> entry : hostBlockMap.entrySet()) {
+        String host = entry.getKey();
+        Block block[] = new Block[entry.getValue().size()];
+        int i = 0;
+        for (String blockString : entry.getValue()) {
+          block[i++] = new Block(Long.parseLong(blockString), 0, 0);
+        }
+        Mutation m = new Mutation(host);
+        DatanodeCommand cmd = new BlockCommand(DatanodeProtocol.DNA_INVALIDATE, block);
+        m.put(commandFam, new Text(UUID.randomUUID().toString()), new Value(serialize(cmd)));
+        bw.addMutation(m);
+      }
+      bw.close();
+    } catch (MutationsRejectedException ex) {
+      throw new IOException(ex);
+    } catch (Exception ex) {
+      log.info(ex, ex);
+    }
+    // TODO: when to return false?
+    return true;
+  }
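+  
+  // Reading of the delete flow above (a summary, not from the original comments): namespace
+  // rows are removed first, then the matching rows in the blocks table, and finally a
+  // serialized DNA_INVALIDATE BlockCommand is queued under command:<uuid> in the datanodes
+  // table for each affected host, to be picked up by that host's next sendHeartbeat() call.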
+  
+  @Override
+  public UpgradeStatusReport distributedUpgradeProgress(UpgradeAction action)
+      throws IOException {
+    log.info("using distributedUpgradeProgress");
+    return null;
+  }
+  
+  @Override
+  public void errorReport(DatanodeRegistration registration, int errorCode,
+      String msg) throws IOException {
+    log.info("using errorReport");
+    
+  }
+  
+  @Override
+  public void finalizeUpgrade() throws IOException {
+    log.info("using finalizeUpgrade");
+    
+  }
+  
+  @Override
+  public void fsync(String src, String client) throws IOException {
+    log.info("using fsync");
+    
+  }
+  
+  @Override
+  public LocatedBlocks getBlockLocations(String src, long offset, long length)
+      throws IOException {
+    log.info("using getBlockLocations: " + src + " " + offset + " " + length);
+    
+    ArrayList<LocatedBlock> locatedBlocks = new ArrayList<LocatedBlock>();
+    
+    // get blocks from namespace table
+    Value fileSizeBytes = null;
+    Map<Text, Value> IDs = new TreeMap<Text, Value>();
+    BatchScanner bs = createBatchScanner(namespaceTable, new Range(new Text(src)));
+    bs.fetchColumnFamily(blocksFam);
+    bs.fetchColumnFamily(infoFam);
+    try {
+      log.info("getting blocks for " + src + " from namespace table");
+      for (Entry<Key,Value> entry : bs) {
+        if (infoSize.hasColumns(entry.getKey())) {
+            fileSizeBytes = entry.getValue();
+        } else if (entry.getKey().getColumnFamily().equals(blocksFam)) {
+          IDs.put(entry.getKey().getColumnQualifier(), entry.getValue());
+        }
+      }
+    } finally {
+      bs.close();
+    }
+    
+    long fileLength = 0;
+    
+    if(fileSizeBytes == null)
+      throw new IOException("file not found: " + src);
+    
+    fileLength = Long.parseLong(new String(fileSizeBytes.get()));
+    log.info("File " + src + " is " +fileLength + " bytes long");
+    
+    // TODO: calculate which blocks we need for the offset and length
+    // TODO: could store the actual long id in the value of the table
+    if(IDs.size() == 0) {
+      throw new IOException("file not found: " + src);
+    }
+    
+    fileLength = 0L;
+    long blockOffset = 0L;
+    for(Text id : IDs.keySet()) {
+      // remove block position indicator
+      String idString = id.toString().split("_")[1];
+      
+      String blockIDString = new String(idString);
+      log.info("found block: " + blockIDString);
+      
+      // lookup host and length information for each block
+      // TODO: can we join this data into the namespace table?
+      log.info("getting host data for block ...");
+      long blockSize = 0;
+      ArrayList<DatanodeInfo> dni = new ArrayList<DatanodeInfo>();
+      bs = createBatchScanner(blocksTable, new Range(idString));
+      bs.fetchColumnFamily(datanodesFam);
+      ColumnFQ.fetch(bs, infoBlockSize);
+      try {
+        for (Entry<Key,Value> entry : bs) {
+          if (infoBlockSize.hasColumns(entry.getKey())) {
+            blockSize = Long.parseLong(new String(entry.getValue().get()));
+            fileLength += blockSize;
+            log.info("got size " + blockSize + " for block " + blockIDString);
+          } else if (entry.getKey().getColumnFamily().equals(datanodesFam)) {
+            String host = entry.getKey().getColumnQualifier().toString();
+            dni.add(new DatanodeInfo(new DatanodeID(host)));
+            log.info("got host: " + host + " for block " + blockIDString);
+          }
+        }
+      } finally {
+        bs.close();
+      }
+      
+      // TODO: add generation	
+      locatedBlocks.add(
+          new LocatedBlock(
+              new Block(Long.parseLong(idString), blockSize, 0), 
+              dni.toArray(new DatanodeInfo[dni.size()]),
+              blockOffset
+              )		
+          );
+      
+      blockOffset += blockSize;
+    }
+    
+    // TODO: sort locatedBlocks by network-distance from client
+    boolean underConst = false;
+    log.info("Reporting file size of " + fileLength);
+    BatchWriter bw = createBatchWriter(namespaceTable);
+    try {
+      Mutation m = new Mutation(src);
+      ColumnFQ.put(m, infoSize, new Value(Long.toString(fileLength).getBytes()));
+      bw.addMutation(m);
+      bw.close();
+    } catch (Exception ex) {
+      throw new IOException(ex);
+    }
+    return new LocatedBlocks(fileLength, locatedBlocks, underConst);
+  }
+  
+  @Override
+  public ContentSummary getContentSummary(String path) throws IOException {
+    log.info("using getContentSummary");
+    return null;
+  }
+  
+  @Override
+  public DatanodeInfo[] getDatanodeReport(DatanodeReportType type)
+      throws IOException {
+    log.info("using getDatanodeReport");
+    return null;
+  }
+  
+  @Override
+  public Token<DelegationTokenIdentifier> getDelegationToken(Text renewer) throws IOException {
+    unimplemented(renewer);
+    return null;
+  }
+  
+  /**
+   * recursively create delete objects for src and all children
+   * 
+   * @param src the path whose namespace entries should be deleted
+   * @param deletes the list to which delete mutations are appended
+   * @throws IOException 
+   */
+  private void getDeletes(Text src, List<Mutation> deletes) throws IOException {
+    
+    // Maybe this list won't fit in memory?
+    BatchScanner bs = createBatchScanner(namespaceTable, new Range(new Text(src)));
+    Mutation m = new Mutation(src);
+    try {
+      for (Entry<Key,Value> entry : bs) {
+        Text columnFamily = entry.getKey().getColumnFamily();
+        if (columnFamily.equals(childrenFam)) {
+          getDeletes(entry.getKey().getColumnQualifier(), deletes);
+        }
+        log.info("deleting " + src + " " + entry.getKey().getColumnFamily() + ":" + entry.getKey().getColumnQualifier());
+        if (!src.toString().equals("/"))
+          m.putDelete(entry.getKey().getColumnFamily(), entry.getKey().getColumnQualifier());
+      }
+      if (!m.getUpdates().isEmpty())
+        deletes.add(m);
+    } finally {
+      bs.close();
+    }
+  }
+  
+  @Override
+  public HdfsFileStatus getFileInfo(String src) throws IOException {
+    log.info("using getFileInfo " + src);
+    BatchScanner bs = createBatchScanner(namespaceTable, new Range(src));
+    try {
+      return loadFileStatus(src, bs.iterator());
+    } finally {
+      bs.close();
+    }
+  }
+  
+  /**
+   * This method is currently doing a lot of lookups ...
+   */
+  @Override
+  public DirectoryListing getListing(String src, byte[] startAfter)
+      throws AccessControlException, FileNotFoundException, UnresolvedLinkException, IOException {
+    log.info("using getListing " + src);
+    // TODO: use startAfter and needLocation
+    
+    ArrayList<HdfsFileStatus> files = new ArrayList<HdfsFileStatus>();
+    
+    String isDirFlag = null;
+    BatchScanner bs = createBatchScanner(namespaceTable, new Range(new Text(src)));
+    bs.fetchColumnFamily(childrenFam);
+    ColumnFQ.fetch(bs, isDir);
+    List<String> children = new ArrayList<String>();
+    try {
+      for (Entry<Key,Value> entry : bs) {
+        log.info("Looking at entry " + entry.getKey() + " -> " + entry.getValue());
+        String value = new String(entry.getValue().get());
+        if (isDir.hasColumns(entry.getKey())) {
+          isDirFlag = value;
+        } else if (entry.getKey().getColumnFamily().equals(childrenFam)) {
+          children.add(entry.getKey().getColumnQualifier().toString());
+        }
+      }
+    } finally {
+      bs.close();
+    }
+    if(isDirFlag == null)
+      throw new IOException("directory not found: " + src);
+    
+    if(isDirFlag.equals("N"))
+      throw new IOException(src + " is not a directory");
+    
+    log.info("Looking at children " + children);
+    for (String child : children) {
+      bs = createBatchScanner(namespaceTable, new Range(child));
+      try {
+        HdfsFileStatus stat = loadFileStatus(child, bs.iterator());
+        files.add(stat);
+      } finally {
+        bs.close();
+      }
+    }
+    log.info("files " + files);
+    return new DirectoryListing(files.toArray(new HdfsFileStatus[files.size()]), 0);
+  }
+  
+  @Override
+  public long getPreferredBlockSize(String filename) throws IOException {
+    log.info("using getPreferredBlockSize");
+    return 0;
+  }
+  
+  @Override
+  public long getProtocolVersion(String protocol, long clientVersion)
+      throws IOException {
+    log.info("using getProtocolVersion");
+    return 0;
+  }
+  
+  @Override
+  public long[] getStats() throws IOException {
+    log.info("using getStats");
+    return null;
+  }
+  
+  private HdfsFileStatus loadFileStatus(String src, Iterator<Entry<Key,Value>> fileResult) throws IOException {
+    log.info("loadFileStatus " + src);
+    String isDirFlag = null;
+    long modification_time = 0;
+    long blocksize = 0;
+    int block_replication = 0;
+    long length = 0;
+    String permissionString = null;
+    
+    Text row = null;
+    while (fileResult.hasNext()) {
+      Entry<Key, Value> entry = fileResult.next();
+      log.info("looking at file data " + entry.getKey() + " -> " + entry.getValue());
+      String value = new String(entry.getValue().get());
+      if (isDir.hasColumns(entry.getKey())) {
+        isDirFlag = value;
+      } else if (infoSize.hasColumns(entry.getKey())) {
+        length = Long.parseLong(value);
+      } else if (infoReplication.hasColumns(entry.getKey())) {
+        block_replication = Integer.parseInt(value);
+      } else if (infoBlockSize.hasColumns(entry.getKey())) {
+        blocksize = Long.parseLong(value);
+      } else if (infoModificationTime.hasColumns(entry.getKey())) {
+        modification_time = Long.parseLong(value);
+      } else if (infoPermission.hasColumns(entry.getKey())) {
+        permissionString = new String(entry.getValue().get());
+      }
+      row = entry.getKey().getRow();
+    }
+    if (isDirFlag == null) {
+      log.info("did not find is_dir for " + src);
+      throw new FileNotFoundException(src);
+    }
+    
+    boolean isdir = isDirFlag.equals("Y");
+    FsPermission permission = FsPermission.getDefault();
+    if (permissionString != null) {
+      permission = FsPermission.valueOf((isdir ? "d":"-") + permissionString);
+    }
+    
+    // TODO
+    long access_time = modification_time;
+    String user = UserGroupInformation.getCurrentUser().getShortUserName();
+    String group = UserGroupInformation.getCurrentUser().getGroupNames()[0];
+    return new HdfsFileStatus(length, isdir, block_replication, blocksize, modification_time, access_time, permission, user, group, 
+        TextUtil.getBytes(row));
+  }
+  
+  @Override
+  public void metaSave(String filename) throws IOException {
+    log.info("using metaSave");
+    
+  }
+  
+  @Override
+  public boolean mkdirs(String src, FsPermission masked) throws IOException {
+    log.info("using mkdirs " + src);
+    
+    if (!DFSUtil.isValidName(src)) {
+      throw new IOException("Invalid directory name: " + src);
+    }
+    
+    // TODO: check permissions
+    
+    src = normalizePath(src);
+    BatchScanner bs = createBatchScanner(namespaceTable, new Range(new Text(src)));
+    ColumnFQ.fetch(bs, isDir);
+    try {
+      for (Entry<Key,Value> entry : bs) {
+        if (isDir.hasColumns(entry.getKey())) {
+          if (new String(entry.getValue().get()).equals("Y"))
+            return true;
+          else
+            throw new IOException(src + " already exists and is not a directory");
+        }
+      }
+    } finally {
+      bs.close();
+    }
+    
+    // TODO: get locks ...
+    // verify parent path exists
+    byte[] parentPath = getParentPath(src);
+    
+    bs = createBatchScanner(namespaceTable, new Range(new Text(parentPath)));
+    ColumnFQ.fetch(bs, isDir);
+    bs.fetchColumnFamily(childrenFam);
+    String isDirString = null;
+    try {
+      for (Entry<Key,Value> entry : bs) {
+        if (isDir.hasColumns(entry.getKey())) {
+          isDirString = new String(entry.getValue().get());
+        }
+      }
+    } finally {
+      bs.close();
+    }
+    
+    if(isDirString == null) 
+      mkdirs(new String(parentPath), masked);
+    else if(isDirString.equals("N")) 
+      throw new IOException("error: parent " + new String(parentPath) + " is not a directory");
+    
+    // edit namespace
+    BatchWriter bw = createBatchWriter(namespaceTable);
+    Mutation m = new Mutation(new Text(parentPath));
+    m.put(childrenFam, new Text(src), blank);
+    //String dirName = getDirName(src);
+    try {
+      try {
+        bw.addMutation(m);
+        m = new Mutation(new Text(src));
+        ColumnFQ.put(m, isDir, new Value("Y".getBytes()));
+        ColumnFQ.put(m, infoModificationTime, new Value(Long.toString(System.currentTimeMillis()).getBytes()));
+        bw.addMutation(m);
+      } finally {
+        bw.close();
+      }
+    } catch (MutationsRejectedException ex) {
+      throw new IOException(ex);
+    }
+    return true;
+  }
+  
+  @Override
+  public long nextGenerationStamp(Block arg0, boolean arg1) throws IOException {
+    unimplemented(arg0, arg1);
+    return 0;
+  }
+  
+  
+  
+  @Override
+  public UpgradeCommand processUpgradeCommand(UpgradeCommand comm)
+      throws IOException {
+    log.info("using processUpgradeCommand");
+    return null;
+  }
+  
+  private void recordBlockHosts(byte[] blockIDBytes, DatanodeInfo[] hosts) throws IOException {
+    try {
+      if(hosts.length == 0)
+        return;
+      
+      Mutation blockData = new Mutation(new Text(blockIDBytes));
+      for(int i=0; i < hosts.length; i++)
+        blockData.put(datanodesFam, new Text(hosts[i].name.getBytes()), blank);
+      BatchWriter bw = createBatchWriter(blocksTable);
+      try {
+        bw.addMutation(blockData);
+      } finally {
+        bw.close();
+      }
+      
+      bw = createBatchWriter(datanodesTable);
+      try {
+        for(int i=0; i < hosts.length; i++) {
+          Mutation host = new Mutation(new Text(hosts[i].name));
+          host.put(blocksFam, new Text(blockIDBytes), blank);
+          bw.addMutation(host);
+        }
+      } finally {
+        bw.close();
+      }
+    } catch (MutationsRejectedException ex) {
+      throw new IOException(ex);
+    }
+    
+  }
+  
+  /**
+   * helper: like recordBlockHosts above, this records the block-to-host mapping in both
+   * the blocksTable and the datanodesTable, but starting from a host and its block list
+   * 
+   * @param host
+   * @param hblocks
+   * @throws IOException
+   */
+  private void recordHostBlocks(String host, long[] hblocks) throws IOException {
+    try {
+      if(hblocks.length == 0)
+        return;
+      
+      Mutation hostData = new Mutation(new Text(host));
+      for(int i=0; i < hblocks.length; i++)
+        hostData.put(blocksFam, new Text(Long.toString(hblocks[i]).getBytes()), blank);
+      BatchWriter writer = createBatchWriter(datanodesTable);
+      try {
+        writer.addMutation(hostData);
+      } finally {
+        writer.close();
+      }
+      
+      writer = createBatchWriter(blocksTable);
+      try {
+        for(int i=0; i < hblocks.length; i++) {
+          Mutation block = new Mutation(new Text(Long.toString(hblocks[i]).getBytes()));
+          block.put(datanodesFam, new Text(host.getBytes()), blank);
+          writer.addMutation(block);
+        }
+      } finally {
+        writer.close();
+      }
+    } catch (MutationsRejectedException ex) {
+      throw new IOException(ex);
+    }
+  }
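+  
+  // Illustrative layout (an assumption drawn from the two methods above): the mapping is
+  // stored in both directions, e.g. for a hypothetical block 42 held by host "dn1:50010":
+  //   blocksTable     row "42"         ->  datanodes:dn1:50010 = (blank)
+  //   datanodesTable  row "dn1:50010"  ->  blocks:42           = (blank)
+  // so hosts can be found by block and blocks by host with a single-row scan either way.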
+  
+  @Override
+  public boolean recoverLease(String src, String clientName) throws IOException {
+    unimplemented(src, clientName);
+    return false;
+  }
+  
+  @Override
+  public void refreshNodes() throws IOException {
+    log.info("using refreshNodes");
+    
+  }
+  
+  @Override
+  public DatanodeRegistration register(DatanodeRegistration registration)
+      throws IOException {
+    log.info("using register");
+    
+    // record this datanode's info
+    try {
+      Connector conn = getConnector();
+      if (conn != null) {
+        BatchWriter bw = createBatchWriter(datanodesTable);
+        Mutation reg = new Mutation(new Text(registration.name.getBytes()));
+        ColumnFQ.put(reg, infoStorageID, new Value(registration.storageID.getBytes()));
+        try {
+          try {
+            bw.addMutation(reg);
+          } finally {
+            bw.close();
+          }
+        } catch (MutationsRejectedException ex) {
+          throw new IOException(ex);
+        }
+      }
+    } catch (Throwable ex) {
+      log.info("Ignoring exception, maybe accumulo is not yet initialized? " + ex);
+    }
+    // clients get this info in a list of targets from addBlock()
+    return registration;
+  }
+  
+  private static class FileStatus {
+    boolean exists;
+    boolean isDir;
+    public FileStatus(boolean exists, boolean isDir) {
+      this.exists = exists;
+      this.isDir = isDir;
+    }
+  }
+  
+  FileStatus getFileStatus(String src) throws IOException {
+    BatchScanner bs = createBatchScanner(namespaceTable, new Range(src));
+    ColumnFQ.fetch(bs, isDir);
+    FileStatus result = new FileStatus(false, false);
+    try {
+      for (Entry<Key,Value> entry : bs) {
+        result.exists = true;
+        if (new String(entry.getValue().get()).equals("Y")) {
+          result.isDir = true;
+        }
+      }
+    } finally {
+      bs.close();
+    }
+    return result;
+  }
+  
+  @Override
+  public boolean rename(String src, String dst) throws IOException {
+    log.info("using rename " + src + " -> " + dst);
+    
+    try {
+      FileStatus srcStatus = getFileStatus(src);
+      if (!srcStatus.exists)
+        throw new FileNotFoundException(src);
+      if (srcStatus.isDir)
+        return false;
+      FileStatus dstStatus = getFileStatus(dst);
+      String parent = new String(getParentPath(dst));
+      FileStatus parentStatus = getFileStatus(parent);
+      if (!dstStatus.exists && !parentStatus.exists) {
+        throw new FileNotFoundException(dst);
+      }
+      if (!parentStatus.isDir) {
+        throw new IOException(parent + " is not a directory");
+      }
+      if (dstStatus.isDir) {
+        return rename(src, dst + "/" + basename(src));
+      }
+      // remove conflicting destination file, if any
+      if (dstStatus.exists)
+        delete(dst, true);
+      // copy file information
+      BatchScanner bs = createBatchScanner(namespaceTable, new Range(src));
+      BatchWriter bw = createBatchWriter(namespaceTable);
+      try {
+        Mutation m = new Mutation(new Text(dst));
+        Mutation s = new Mutation(new Text(src));
+        for (Entry<Key,Value> entry: bs) {
+          Key key = entry.getKey();
+          m.put(key.getColumnFamily(), key.getColumnQualifier(), entry.getValue());
+          s.putDelete(key.getColumnFamily(), key.getColumnQualifier());
+        }
+        bw.addMutation(m);
+        bw.addMutation(s);
+        // remove child link in src's parent
+        m = new Mutation(new Text(getParentPath(src)));
+        m.putDelete(childrenFam, new Text(src));
+        bw.addMutation(m);
+        // add child link in dst's parent
+        m = new Mutation(new Text(getParentPath(dst)));
+        m.put(childrenFam, new Text(dst), blank);
+        bw.addMutation(m);
+      } finally {
+        bs.close();
+        bw.close();
+      }
+    } catch (Exception ex) {
+      throw new IOException(ex);
+    }
+    return true;
+  }
+  
+  private String basename(String src) {
+    return src.substring(src.lastIndexOf("/") + 1);
+  }
+
+  @Override
+  public long renewDelegationToken(Token<DelegationTokenIdentifier> token) throws IOException {
+    unimplemented(token);
+    return 0;
+  }
+  
+  @Override
+  public void renewLease(String clientName) throws IOException {
+    log.info("using renewLease");
+    
+  }
+  
+  @Override
+  public void reportBadBlocks(LocatedBlock[] blocks) throws IOException {
+    log.info("using reportBadBlocks");
+  }
+  
+  @Override
+  public void saveNamespace() throws IOException {
+    log.info("using saveNamespace");
+    
+  }
+  
+  @Override
+  public DatanodeCommand[] sendHeartbeat(final DatanodeRegistration registration,
+      final long capacity, final long dfsUsed, final long remaining, final int xmitsInProgress,
+      final int xceiverCount) throws IOException {
+    log.info("using sendHeartbeat");
+    if (System.currentTimeMillis() - start < 10*1000)
+      return new DatanodeCommand[0];
+    
+    // update datanodes table with info
+    // skip this if none of the numbers have changed
+    // TODO: get last numbers from a lookup
+    if(capacity != lastCapacity || 
+        dfsUsed != lastDfsUsed ||
+        remaining != lastRemaining) {
+      Connector conn;
+      try {
+        conn = getConnector();
+      } catch (Throwable ex) {
+        // probably not initialized
+        return new DatanodeCommand[0];
+      }
+      try {
+        if (conn != null) {
+          BatchWriter bw = createBatchWriter(conn, datanodesTable);
+          Mutation m = new Mutation(new Text(registration.name.getBytes()));
+          ColumnFQ.put(m, infoCapacity, new Value(Long.toString(capacity).getBytes()));
+          ColumnFQ.put(m, infoUsed, new Value(Long.toString(dfsUsed).getBytes()));
+          ColumnFQ.put(m, DistributedNamenodeProxy.remaining, new Value(Long.toString(remaining).getBytes()));
+          try {
+            bw.addMutation(m);
+          } finally {
+            bw.close();
+          }
+        }
+      } catch (Exception ex) {
+        log.error(ex, ex);
+      }
+    }
+    lastCapacity = capacity;
+    lastDfsUsed = dfsUsed;
+    lastRemaining = remaining;
+    // return a list of commands for the data node
+    List<DatanodeCommand> commands = new ArrayList<DatanodeCommand>();
+    try {
+      BatchScanner bs = createBatchScanner(datanodesTable, new Range(registration.getName()));
+      bs.fetchColumnFamily(commandFam);
+      BatchWriter bw = createBatchWriter(datanodesTable);
+      for (Entry<Key,Value> entry : bs) {
+        Key key = entry.getKey();
+        DatanodeCommand command = (DatanodeCommand)deserialize(entry.getValue().get());
+        log.info("found datanode Command " + command);
+        commands.add(command);
+        Mutation m = new Mutation(key.getRow());
+        m.putDelete(key.getColumnFamily(), key.getColumnQualifier());
+        bw.addMutation(m);
+      }
+      bs.close();
+      bw.close();
+    } catch (Exception ex) {
+      throw new IOException(ex);
+    }
+    return commands.toArray(new DatanodeCommand[0]);
+  }
+  
+  
+  public static Object deserialize(byte[] data) throws IOException {
+    ByteArrayInputStream streamer = new ByteArrayInputStream(data);
+    DataInputStream deserializer = new DataInputStream(streamer);
+    String className = deserializer.readUTF();
+    try {
+      Writable w = (Writable)DistributedNamenodeProxy.class.getClassLoader().loadClass(className).getConstructor().newInstance();
+      w.readFields(deserializer);
+      return w;
+    } catch (Exception ex) {
+      throw new IOException(ex);
+    } finally {
+      deserializer.close();
+    }
+  }
+  
+  public static byte[] serialize(Writable obj) throws IOException {
+    ByteArrayOutputStream streamer = new ByteArrayOutputStream();
+    DataOutputStream serializer = new DataOutputStream(streamer);
+    serializer.writeUTF(obj.getClass().getName());
+    obj.write(serializer);
+    serializer.close();
+    return streamer.toByteArray();
+  }
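+  
+  // Hedged usage sketch: serialize() prefixes the Writable's class name so deserialize()
+  // can reconstruct the instance reflectively, e.g. (values hypothetical)
+  //
+  //   byte[] data = serialize(new BlockCommand(DatanodeProtocol.DNA_INVALIDATE, blocks));
+  //   DatanodeCommand cmd = (DatanodeCommand) deserialize(data);
+  //
+  // which is how delete() queues commands in the datanodes table and sendHeartbeat()
+  // replays them.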
+
+  @Override
+  public void setBalancerBandwidth(long bandwidth) throws IOException {
+    unimplemented(bandwidth);
+  }
+  
+  @Override
+  public void setOwner(String src, String username, String groupname)
+      throws IOException {
+    log.info("using setOwner");
+    
+  }
+  
+  @Override
+  public void setPermission(String src, FsPermission permission)
+      throws IOException {
+    log.info("using setPermission");
+    try {
+      HdfsFileStatus fileInfo = getFileInfo(src);
+      if (!fileInfo.getPermission().equals(permission)) {
+        BatchWriter bw = createBatchWriter(namespaceTable);
+        try {
+          Mutation m = new Mutation(src);
+          ColumnFQ.put(m, infoPermission, new Value(permission.toString().getBytes()));
+          bw.addMutation(m);
+        } finally {
+          bw.close();
+        }
+      }
+    } catch (Exception ex) {
+      throw new IOException(ex);
+    }
+  }
+  
+  @Override
+  public void setQuota(String path, long namespaceQuota, long diskspaceQuota)
+      throws IOException {
+    log.info("using setQuota");
+    
+  }
+  
+  @Override
+  public boolean setReplication(String src, short replication)
+      throws IOException {
+    log.info("using setReplication");
+    return false;
+  }
+  
+  @Override
+  public boolean setSafeMode(SafeModeAction action) throws IOException {
+    log.info("using setSafeMode");
+    return false;
+  }
+  
+  
+  @Override
+  public void setTimes(String src, long mtime, long atime) throws IOException {
+    log.info("using setTimes");
+    
+  }
+  
+  public void stop() {
+  }
+  
+  @Override
+  public NamespaceInfo versionRequest() throws IOException {
+    log.info("using versionRequest");
+    // TODO: find out how to get namespace id
+    // could store this in the info of the / entry
+    NamespaceInfo nsi = new NamespaceInfo(384837986, 0, 0);
+    //throw new RuntimeException();
+    return nsi;
+  }
+  
+}

Propchange: accumulo/branches/ACCUMULO-722/distnn/src/main/java/org/apache/hadoop/hdfs/server/namenode/DistributedNamenodeProxy.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: accumulo/branches/ACCUMULO-722/distnn/src/main/java/org/apache/hadoop/hdfs/server/namenode/FakeNameNode.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-722/distnn/src/main/java/org/apache/hadoop/hdfs/server/namenode/FakeNameNode.java?rev=1371432&view=auto
==============================================================================
--- accumulo/branches/ACCUMULO-722/distnn/src/main/java/org/apache/hadoop/hdfs/server/namenode/FakeNameNode.java (added)
+++ accumulo/branches/ACCUMULO-722/distnn/src/main/java/org/apache/hadoop/hdfs/server/namenode/FakeNameNode.java Thu Aug  9 20:21:32 2012
@@ -0,0 +1,8 @@
+package org.apache.hadoop.hdfs.server.namenode;
+
+import org.apache.hadoop.hdfs.protocol.ClientProtocol;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
+
+public interface FakeNameNode extends ClientProtocol, DatanodeProtocol {
+  
+}

Propchange: accumulo/branches/ACCUMULO-722/distnn/src/main/java/org/apache/hadoop/hdfs/server/namenode/FakeNameNode.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: accumulo/branches/ACCUMULO-722/distnn/src/main/java/org/apache/hadoop/hdfs/server/namenode/SwitchingNameNode.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-722/distnn/src/main/java/org/apache/hadoop/hdfs/server/namenode/SwitchingNameNode.java?rev=1371432&view=auto
==============================================================================
--- accumulo/branches/ACCUMULO-722/distnn/src/main/java/org/apache/hadoop/hdfs/server/namenode/SwitchingNameNode.java (added)
+++ accumulo/branches/ACCUMULO-722/distnn/src/main/java/org/apache/hadoop/hdfs/server/namenode/SwitchingNameNode.java Thu Aug  9 20:21:32 2012
@@ -0,0 +1,145 @@
+package org.apache.hadoop.hdfs.server.namenode;
+
+import java.lang.reflect.InvocationHandler;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.lang.reflect.Proxy;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.regex.Pattern;
+
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Instance;
+import org.apache.accumulo.core.client.ZooKeeperInstance;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.DNNConstants;
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.server.namenode.DistributedNamenodeProxy.ConnectInfo;
+import org.apache.log4j.Logger;
+
+import com.netflix.curator.framework.CuratorFramework;
+import com.netflix.curator.framework.CuratorFrameworkFactory;
+import com.netflix.curator.framework.CuratorFrameworkFactory.Builder;
+import com.netflix.curator.retry.RetryUntilElapsed;
+
+public class SwitchingNameNode {
+  private static final Logger log = Logger.getLogger(SwitchingNameNode.class);
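+  // Paths belonging to Accumulo itself (instance id, version, walog archive, recovery, and the
+  // !0/0/1/2 metadata tables); these are routed to the zookeeper-backed namenode.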
+  static Pattern isRoot = Pattern.compile("/accumulo(|/instance_id.*|/version.*|/walogArchive|/recovery.*|/tables$|/tables/(\\!0|0|1|2)(/.*|$))");
+  
+  static private boolean isZooName(String path) {
+    boolean result = isRoot.matcher(path).matches();
+    log.info("Looking at " + path + " isZooName " + result);
+    return result;
+  }
+  
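+  // Builds the zookeeper-backed namenode from the connect info embedded in fs.default.name
+  // and wraps it in the switching proxy created below.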
+  public static FakeNameNode create(final Configuration conf) {
+    try {
+      URI uri = new URI(conf.get("fs.default.name"));
+      ConnectInfo info = new ConnectInfo(uri);
+      Builder builder = CuratorFrameworkFactory.builder().namespace(DNNConstants.DNN);
+      builder.connectString(info.zookeepers);
+      builder.retryPolicy(new RetryUntilElapsed(120*1000, 500));
+      //builder.aclProvider(aclProvider);
+      CuratorFramework client = builder.build();
+      client.start();
+      ZookeeperNameNode zoo = new ZookeeperNameNode(client);
+      return SwitchingNameNode.create(zoo, info);
+    } catch (Exception ex) {
+      throw new RuntimeException(ex);
+    }
+  }
+  
+  
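+  // Returns a dynamic proxy that sends Accumulo-internal paths and negative block ids to the
+  // zookeeper namenode and routes everything else to a lazily created DistributedNamenodeProxy.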
+  public static FakeNameNode create(final FakeNameNode zoonode, final ConnectInfo info) {
+    
+    return (FakeNameNode) Proxy.newProxyInstance(SwitchingNameNode.class.getClassLoader(), new Class<?>[] {FakeNameNode.class}, new InvocationHandler() {
+      FakeNameNode distributed = null;
+      
+      @Override
+      public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
+        log.info("calling " + method.getName());
+        if (method.getName().equals("toString")) {
+          return "FakeNameNode " + zoonode + " dynamic name node@ " + info.instance;
+        }
+        try {
+          // handle versionRequest with the zoonode
+          if (method.getName().equals("versionRequest")) {
+            return method.invoke(zoonode, args);
+          }
+          // split blocks manually to the separate nodes
+          if (method.getName().equals("blockReport")) {
+            long[] blocks = (long[])args[1];
+            long[] zooblocks = new long[blocks.length];
+            long[] distblocks = new long[blocks.length];
+            int zoocount = 0;
+            int distcount = 0;
+            for (long block : blocks)
+              if (block < 0)
+                zooblocks[zoocount++] = block;
+              else
+                distblocks[distcount++] = block;
+            zooblocks = Arrays.copyOf(zooblocks, zoocount);
+            distblocks = Arrays.copyOf(distblocks, distcount);
+            Object result = null;
+            if (zooblocks.length > 0) {
+              args[1] = zooblocks;
+              log.info("Calling zoo blockReport with " + zooblocks.length + " blocks");
+              result = method.invoke(zoonode, args);
+            }
+            if (distblocks.length > 0 && distributed != null) {
+              args[1] = distblocks;
+              log.info("Calling dist blockReport with " + distblocks.length + " blocks");
+              result = method.invoke(distributed, args);
+            }
+            return result;
+          }
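+          // blockReceived: partition the reported blocks by the sign of their ids, mirroring blockReport above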
+          if (method.getName().equals("blockReceived")) {
+            Block[] blocks = (Block[])args[1];
+            List<Block> zooblocks = new ArrayList<Block>();
+            List<Block> distblocks = new ArrayList<Block>();
+            for (Block block : blocks) {
+              if (block.getBlockId() < 0)
+                zooblocks.add(block);
+              else
+                distblocks.add(block);
+            }
+            Object result = null;
+            if (!zooblocks.isEmpty()) {
+              args[1] = zooblocks.toArray(new Block[0]);
+              result = method.invoke(zoonode, args);
+            }
+            if (distributed != null && !distblocks.isEmpty()) {
+              args[1] = distblocks.toArray(new Block[0]);
+              result = method.invoke(distributed, args);
+            }
+            return result;
+          }
+          // dispatch on filename
+          if (args != null && args.length > 0 && (args[0] instanceof String)) {
+            if (isZooName((String) args[0]))
+              return method.invoke(zoonode, args);
+          }
+          // otherwise try the distributed namenode (dnn) first, falling back to the zookeeper namenode (znn)
+          synchronized (this) {
+            if (distributed == null) {
+              try {
+                Instance zinst = new ZooKeeperInstance(info.instance, info.zookeepers);
+                Connector conn = zinst.getConnector(info.username, info.passwd);
+                distributed = new DistributedNamenodeProxy(conn);
+              } catch (Exception ex) {
+                log.warn("error invoking " + method.getName() + ", invoking zookeeper version");
+                return method.invoke(zoonode, args);
+              }
+            }
+          }
+          return method.invoke(distributed, args);
+        } catch (InvocationTargetException ex) {
+          throw ex.getCause();
+        }
+      }
+    });
+    
+  }
+}

Propchange: accumulo/branches/ACCUMULO-722/distnn/src/main/java/org/apache/hadoop/hdfs/server/namenode/SwitchingNameNode.java
------------------------------------------------------------------------------
    svn:eol-style = native


