accumulo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From e..@apache.org
Subject svn commit: r1383487 - in /accumulo/branches/ACCUMULO-722/distnn/src/main/java/org/apache/hadoop/hdfs: DNNFileSystem.java server/namenode/DistributedNamenodeProxy.java server/namenode/ZookeeperNameNode.java
Date Tue, 11 Sep 2012 17:30:12 GMT
Author: ecn
Date: Tue Sep 11 17:30:11 2012
New Revision: 1383487

URL: http://svn.apache.org/viewvc?rev=1383487&view=rev
Log:
ACCUMULO-722: clean-up existing code; now runs continuous ingest at scale with agitation, and will shutdown cleanly

Modified:
    accumulo/branches/ACCUMULO-722/distnn/src/main/java/org/apache/hadoop/hdfs/DNNFileSystem.java
    accumulo/branches/ACCUMULO-722/distnn/src/main/java/org/apache/hadoop/hdfs/server/namenode/DistributedNamenodeProxy.java
    accumulo/branches/ACCUMULO-722/distnn/src/main/java/org/apache/hadoop/hdfs/server/namenode/ZookeeperNameNode.java

Modified: accumulo/branches/ACCUMULO-722/distnn/src/main/java/org/apache/hadoop/hdfs/DNNFileSystem.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-722/distnn/src/main/java/org/apache/hadoop/hdfs/DNNFileSystem.java?rev=1383487&r1=1383486&r2=1383487&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-722/distnn/src/main/java/org/apache/hadoop/hdfs/DNNFileSystem.java (original)
+++ accumulo/branches/ACCUMULO-722/distnn/src/main/java/org/apache/hadoop/hdfs/DNNFileSystem.java Tue Sep 11 17:30:11 2012
@@ -61,6 +61,7 @@ public class DNNFileSystem extends FileS
 
   public void initialize(URI uri, Configuration conf) throws IOException {
     super.initialize(uri, conf);
+    log.info("URI: " + uri);
     setConf(conf);
     FakeNameNode fake = null;
     try {
@@ -588,5 +589,4 @@ public class DNNFileSystem extends FileS
     dfs.setBalancerBandwidth(bandwidth);
   }
   
-  
 }

Modified: accumulo/branches/ACCUMULO-722/distnn/src/main/java/org/apache/hadoop/hdfs/server/namenode/DistributedNamenodeProxy.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-722/distnn/src/main/java/org/apache/hadoop/hdfs/server/namenode/DistributedNamenodeProxy.java?rev=1383487&r1=1383486&r2=1383487&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-722/distnn/src/main/java/org/apache/hadoop/hdfs/server/namenode/DistributedNamenodeProxy.java (original)
+++ accumulo/branches/ACCUMULO-722/distnn/src/main/java/org/apache/hadoop/hdfs/server/namenode/DistributedNamenodeProxy.java Tue Sep 11 17:30:11 2012
@@ -34,7 +34,6 @@ import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.FileNotFoundException;
 import java.io.IOException;
-import java.net.URI;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -70,8 +69,9 @@ import org.apache.accumulo.core.data.Ran
 import org.apache.accumulo.core.data.Value;
 import org.apache.accumulo.core.util.ColumnFQ;
 import org.apache.accumulo.core.util.TextUtil;
-import org.apache.accumulo.core.util.UtilWaitThread;
 import org.apache.accumulo.server.util.time.SimpleTimer;
+import org.apache.commons.lang.NotImplementedException;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.ContentSummary;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
@@ -101,34 +101,24 @@ import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
 import org.apache.log4j.Logger;
-
-import sun.reflect.generics.reflectiveObjects.NotImplementedException;
+import org.apache.zookeeper.WatchedEvent;
 
 import com.netflix.curator.framework.CuratorFramework;
+import com.netflix.curator.framework.api.CuratorWatcher;
 
 public class DistributedNamenodeProxy implements FakeNameNode {
   Executor executor = Executors.newSingleThreadExecutor();
 
   public static class ConnectInfo {
-    public ConnectInfo(URI uri) {
-      String userInfo = uri.getUserInfo();
-      log.info("userInfo " + userInfo);
-      for (String part : userInfo.split(";")) {
-        String parts[] = part.split("=");
-        String attr = parts[0];
-        String value = parts[1];
-        if (attr.equals("user")) {
-          this.username = value;
-        } else if (attr.equals("pass")) {
-          this.passwd = value.getBytes();
-        } else if (attr.equals("keepers")) {
-          this.zookeepers = value;
-        } else if (attr.equals("instance")) {
-          this.instance = value;
-        } else {
-          throw new RuntimeException("unknown entry " + attr + " in authority information");
-        }
-      }
+    public ConnectInfo(Configuration conf) {
+      this.passwd = conf.get("dnn.user.password", "").getBytes();
+      if (passwd.length == 0)
+        throw new IllegalArgumentException("dnn.user.password not set");
+      this.username = conf.get("dnn.user.username", "root");
+      this.zookeepers = conf.get("dnn.zookeepers", "localhost"); 
+      this.instance = conf.get("dnn.instance.name", "");
+      if (instance.length() == 0)
+        throw new IllegalArgumentException("dnn.instance.name not set");
     }
     public String username;
     public byte[] passwd;
@@ -145,15 +135,31 @@ public class DistributedNamenodeProxy im
     Replicator() {
       targets = new HashSet<DatanodeInfo>();
     }
+
+    void start() {
+      zookeeper.getData().usingWatcher(new CuratorWatcher() {
+        @Override
+        public void process(WatchedEvent event) throws Exception {
+          scanDatanodes();
+          synchronized (this) {
+            this.notifyAll();
+          }
+        }});
+    }
     
     DatanodeInfo[] getReplicationTargets(int replicationFactor) throws IOException {
-      
-      // TODO: periodically scan the datanodes table to find new datanodes
-      while (targets.size() == 0) {
-        scanDatanodes();
-        if (targets.size() > 0)
-          break;
-        UtilWaitThread.sleep(250);
+
+      synchronized (this) {
+        if (targets.size() == 0) {
+          scanDatanodes();
+        }
+        while (targets.size() == 0) {
+          try {
+            wait(250);
+          } catch (InterruptedException e) {
+            //
+          }
+        }
       }
       
       List<DatanodeInfo> targetsCopy = new ArrayList<DatanodeInfo>();
@@ -179,8 +185,8 @@ public class DistributedNamenodeProxy im
     private void scanDatanodes() throws IOException {
       log.info("scanning datanodes table ..");
       HashSet<DatanodeInfo> updatedTargets = new HashSet<DatanodeInfo>();
-      BatchScanner scanner = createBatchScanner(datanodesTable, new Range());
-      infoIpcPort.fetch(scanner);
+      BatchScanner scanner = createBatchScanner(TABLES.DATANODES, new Range());
+      COLUMNS.IPC_PORT.fetch(scanner);
       try {
         for (Entry<Key,Value> entry : scanner) {
           String nodeName = entry.getKey().getRow().toString();
@@ -214,6 +220,16 @@ public class DistributedNamenodeProxy im
     }
   }
   
+  static private Scanner createScanner(Connector conn, String table, Range range) throws IOException {
+    try {
+      Scanner result = conn.createScanner(table, Constants.NO_AUTHS);
+      result.setRange(range);
+      return result;
+    } catch (TableNotFoundException ex) {
+      throw new IOException(ex);
+    }
+  }
+  
   private static BatchWriter createBatchWriter(Connector conn, String table) throws IOException {
     try {
       return conn.createBatchWriter(table, 10*1000, 1000, 4);
@@ -254,27 +270,31 @@ public class DistributedNamenodeProxy im
   private Random rand = new Random();
   private Replicator replicator = new Replicator();
   private final Connector conn;
-  private final static String namespaceTable = "namespace";
-  private final static String blocksTable = "blocks";
-  private final static String datanodesTable = "datanodes";
-  private final static Text infoFam = new Text("info");
-  private final static Text childrenFam = new Text("children");
-  private final static Text blocksFam = new Text("blocks");
-  private final static Text datanodesFam = new Text("datanodes");
-  private final static Text commandFam = new Text("command");
-
-  private final static ColumnFQ remaining = new ColumnFQ(infoFam, new Text("remaining"));
-  private final static ColumnFQ infoSize = new ColumnFQ(infoFam, new Text("size"));
-  private final static ColumnFQ isDir = new ColumnFQ(infoFam, new Text("isDir"));
-  private final static ColumnFQ infoCapacity = new ColumnFQ(infoFam, new Text("capacity"));
-  private final static ColumnFQ infoIpcPort = new ColumnFQ(infoFam, new Text("ipc_port"));
-  private final static ColumnFQ infoUsed = new ColumnFQ(infoFam, new Text("used"));
-  private final static ColumnFQ infoReplication = new ColumnFQ(infoFam, new Text("replication"));
-  private final static ColumnFQ infoBlockSize = new ColumnFQ(infoFam, new Text("blocksize"));
-  private final static ColumnFQ infoModificationTime = new ColumnFQ(infoFam, new Text("create_time"));
-  private final static ColumnFQ infoStorageID = new ColumnFQ(infoFam, new Text("storageID"));
-  private final static ColumnFQ infoPermission = new ColumnFQ(infoFam, new Text("permission"));
-  
+  static final class TABLES {
+    private final static String NAMESPACE = "namespace";
+    private final static String BLOCKS = "blocks";
+    private final static String DATANODES = "datanodes";
+  }
+  static final class FAMILIES {
+    private final static Text INFO = new Text("info");
+    private final static Text CHILDREN = new Text("children");
+    private final static Text BLOCKS = new Text("blocks");
+    private final static Text DATANODES = new Text("datanodes");
+    private final static Text COMMAND = new Text("command");
+  }
+  static final class COLUMNS {
+    private final static ColumnFQ REMAINING = new ColumnFQ(FAMILIES.INFO, new Text("remaining"));
+    private final static ColumnFQ SIZE = new ColumnFQ(FAMILIES.INFO, new Text("size"));
+    private final static ColumnFQ IS_DIR = new ColumnFQ(FAMILIES.INFO, new Text("isDir"));
+    private final static ColumnFQ CAPACITY = new ColumnFQ(FAMILIES.INFO, new Text("capacity"));
+    private final static ColumnFQ IPC_PORT = new ColumnFQ(FAMILIES.INFO, new Text("ipc_port"));
+    private final static ColumnFQ USED = new ColumnFQ(FAMILIES.INFO, new Text("used"));
+    private final static ColumnFQ REPLICATION = new ColumnFQ(FAMILIES.INFO, new Text("replication"));
+    private final static ColumnFQ BLOCK_SIZE = new ColumnFQ(FAMILIES.INFO, new Text("blocksize"));
+    private final static ColumnFQ MODIFICATION_TIME = new ColumnFQ(FAMILIES.INFO, new Text("mtime"));
+    private final static ColumnFQ STORAGE_ID = new ColumnFQ(FAMILIES.INFO, new Text("storageID"));
+    private final static ColumnFQ PERMISSION = new ColumnFQ(FAMILIES.INFO, new Text("permission"));
+  }
   private final static Value blank = new Value(new byte[]{});
   
   private final static int QUERY_THREADS = 10;
@@ -289,16 +309,17 @@ public class DistributedNamenodeProxy im
   
   private final CuratorFramework zookeeper;
   
-  public DistributedNamenodeProxy(CuratorFramework keeper, URI uri) throws IOException {
+  public DistributedNamenodeProxy(CuratorFramework keeper, Configuration conf) throws IOException {
     log.info("========= Distributed Name Node Proxy init =========");
-    ConnectInfo info = new ConnectInfo(uri);
+    ConnectInfo info = new ConnectInfo(conf);
     Instance instance = new ZooKeeperInstance(info.instance, info.zookeepers);
     zookeeper = keeper;
     try {
       this.conn = instance.getConnector(info.username, info.passwd);
     } catch (Exception e) {
       throw new IOException(e);
-    } 
+    }
+    replicator.start();
     //		String healthNodeHost = config.get("healthnode");
     //		if(healthNodeHost == null)
     //			throw new IOException("error: no healthnode address specified. add one to core-site.xml");
@@ -319,8 +340,8 @@ public class DistributedNamenodeProxy im
     log.info("using abandonBlock");
     
     // find the block's position in the list (probably the last one)
-    BatchScanner bs = createBatchScanner(namespaceTable, new Range(new Text(src)));
-    bs.fetchColumnFamily(blocksFam);
+    BatchScanner bs = createBatchScanner(TABLES.NAMESPACE, new Range(new Text(src)));
+    bs.fetchColumnFamily(FAMILIES.BLOCKS);
     
     // delete it from the file
     Mutation m = new Mutation(new Text(src));
@@ -330,7 +351,7 @@ public class DistributedNamenodeProxy im
         String parts[] = cq.split("_");
         long block = Long.parseLong(parts[1]);
         if (b.getBlockId() == block) {
-          m.putDelete(blocksFam, entry.getKey().getColumnQualifier());
+          m.putDelete(FAMILIES.BLOCKS, entry.getKey().getColumnQualifier());
         }
       }
     } finally {
@@ -341,7 +362,7 @@ public class DistributedNamenodeProxy im
     }
     
     // delete the block size and location information
-    BatchWriter bw = createBatchWriter(namespaceTable);
+    BatchWriter bw = createBatchWriter(TABLES.NAMESPACE);
     try {
       bw.addMutation(m);
     } catch (MutationsRejectedException e) {
@@ -353,10 +374,10 @@ public class DistributedNamenodeProxy im
         throw new RuntimeException(e);
       }
     }
-    bw = createBatchWriter(blocksTable);
+    bw = createBatchWriter(TABLES.BLOCKS);
     try {
       Text row = new Text("" + b.getBlockId());
-      bs = createBatchScanner(blocksTable, new Range(row));
+      bs = createBatchScanner(TABLES.BLOCKS, new Range(row));
       m = new Mutation(row);
       for (Entry<Key,Value> entry : bs) {
         m.putDelete(entry.getKey().getColumnFamily(), entry.getKey().getColumnQualifier());
@@ -394,9 +415,9 @@ public class DistributedNamenodeProxy im
     log.info("using addBlock " + src + " " + clientName);
 
     // get the last block ID and replication
-    BatchScanner bs = createBatchScanner(namespaceTable, new Range(new Text(src)));
-    bs.fetchColumnFamily(blocksFam);
-    infoReplication.fetch(bs);
+    BatchScanner bs = createBatchScanner(TABLES.NAMESPACE, new Range(new Text(src)));
+    bs.fetchColumnFamily(FAMILIES.BLOCKS);
+    COLUMNS.REPLICATION.fetch(bs);
     
     // TODO: fetch from configuration
     int defaultReplication = 3;
@@ -404,9 +425,9 @@ public class DistributedNamenodeProxy im
     int blockPos = 0;
     try {
       for (Entry<Key,Value> entry : bs) {
-        if (entry.getKey().getColumnFamily().equals(blocksFam))
+        if (entry.getKey().getColumnFamily().equals(FAMILIES.BLOCKS))
           blockPos++;
-        if (infoReplication.hasColumns(entry.getKey()))
+        if (COLUMNS.REPLICATION.hasColumns(entry.getKey()))
           replication = Integer.parseInt(entry.getValue().toString());
       }
     } finally {
@@ -437,8 +458,8 @@ public class DistributedNamenodeProxy im
     
     // record file to block mapping
     Mutation nameData = new Mutation(new Text(src.getBytes()));
-    nameData.put(blocksFam, new Text(String.format("%08d_%d", blockPos, blockID).getBytes()), blank);
-    BatchWriter bw = createBatchWriter(namespaceTable);
+    nameData.put(FAMILIES.BLOCKS, new Text(String.format("%08d_%d", blockPos, blockID).getBytes()), blank);
+    BatchWriter bw = createBatchWriter(TABLES.NAMESPACE);
     try {
       try {
         bw.addMutation(nameData);
@@ -456,8 +477,7 @@ public class DistributedNamenodeProxy im
   @Override
   public LocatedBlock append(String src, String clientName)
       throws IOException {
-    log.info("using append");
-    return null;
+    throw new NotImplementedException();
   }
   
   /** ------------ Data Node Protocol Methods -----------
@@ -476,13 +496,13 @@ public class DistributedNamenodeProxy im
         
         // update blocks table
         try {
-          final BatchWriter bw = createBatchWriter(blocksTable);
+          final BatchWriter bw = createBatchWriter(TABLES.BLOCKS);
           try {
             for(Block b : blocks) {
               if (ZookeeperNameNode.isZooBlockId(b.getBlockId()))
                 continue;
               Mutation blockData = new Mutation(new Text(Long.toString(b.getBlockId())));
-              infoBlockSize.put(blockData, new Value(Long.toString(b.getNumBytes()).getBytes()));
+              COLUMNS.BLOCK_SIZE.put(blockData, new Value(Long.toString(b.getNumBytes()).getBytes()));
               bw.addMutation(blockData);
             }
           } finally {
@@ -510,17 +530,17 @@ public class DistributedNamenodeProxy im
     log.info(registration.getName() + " reports blocks " + current);
     if (current.isEmpty())
       return null;
-    BatchWriter bw = createBatchWriter(blocksTable);
+    BatchWriter bw = createBatchWriter(TABLES.BLOCKS);
     Mutation m = new Mutation(registration.getName());
-    Scanner scan = createScanner(datanodesTable);
+    Scanner scan = createScanner(TABLES.DATANODES);
     scan.setRange(new Range(registration.getName()));
-    scan.fetchColumnFamily(blocksFam);
+    scan.fetchColumnFamily(FAMILIES.BLOCKS);
     try {
       for (Entry<Key,Value> entry : scan) {
         long block = Long.parseLong(entry.getKey().getColumnQualifier().toString());
         if (!current.remove(block)) {
           // found some block, not in the blocklist, remove the entry
-          m.putDelete(blocksFam, entry.getKey().getColumnQualifier());
+          m.putDelete(FAMILIES.BLOCKS, entry.getKey().getColumnQualifier());
         }
       }
       if (!m.getUpdates().isEmpty())
@@ -566,8 +586,8 @@ public class DistributedNamenodeProxy im
     
     // write complete status to namespace?
     // does this just help avoid mutations to existent complete files?
-    BatchScanner bs = createBatchScanner(namespaceTable, new Range(new Text(src)));
-    bs.fetchColumnFamily(blocksFam);
+    BatchScanner bs = createBatchScanner(TABLES.NAMESPACE, new Range(new Text(src)));
+    bs.fetchColumnFamily(FAMILIES.BLOCKS);
     List<Range> ranges = new ArrayList<Range>();
     try {
       for (Entry<Key,Value> entry : bs) {
@@ -580,8 +600,8 @@ public class DistributedNamenodeProxy im
     if (ranges.isEmpty())
       return true;
     long fileSize = 0;
-    BatchScanner blockScanner = createBatchScanner(blocksTable, ranges.toArray(new Range[]{}));
-    infoBlockSize.fetch(blockScanner);
+    BatchScanner blockScanner = createBatchScanner(TABLES.BLOCKS, ranges.toArray(new Range[]{}));
+    COLUMNS.BLOCK_SIZE.fetch(blockScanner);
     fileSize = 0;
     int count = 0;
     try {
@@ -603,8 +623,8 @@ public class DistributedNamenodeProxy im
     
     // write size to namespace table
     Mutation fileSizePut = new Mutation(new Text(src.getBytes()));
-    infoSize.put(fileSizePut, new Value(Long.toString(fileSize).getBytes()));
-    BatchWriter bw = createBatchWriter(namespaceTable);
+    COLUMNS.SIZE.put(fileSizePut, new Value(Long.toString(fileSize).getBytes()));
+    BatchWriter bw = createBatchWriter(TABLES.NAMESPACE);
     try {
       try {
         bw.addMutation(fileSizePut);
@@ -638,14 +658,14 @@ public class DistributedNamenodeProxy im
     // verify that parent directories exist
     byte[] parent = getParentPath(src);
     String isDirFlag = null;
-    ColumnFQ srcColumn = new ColumnFQ(childrenFam, new Text(src));
+    ColumnFQ srcColumn = new ColumnFQ(FAMILIES.CHILDREN, new Text(src));
     
-    BatchScanner bs = createBatchScanner(namespaceTable, new Range(new Text(parent)));
-    isDir.fetch(bs);
-    bs.fetchColumnFamily(childrenFam);
+    BatchScanner bs = createBatchScanner(TABLES.NAMESPACE, new Range(new Text(parent)));
+    COLUMNS.IS_DIR.fetch(bs);
+    bs.fetchColumnFamily(FAMILIES.CHILDREN);
     try {
       for (Entry<Key,Value> entry : bs) {
-        if (isDir.hasColumns(entry.getKey()))
+        if (COLUMNS.IS_DIR.hasColumns(entry.getKey()))
         {
           isDirFlag = new String(entry.getValue().get());
         }
@@ -673,29 +693,28 @@ public class DistributedNamenodeProxy im
     /*
      * not yet recorded:
      * 
-     * long length
-     * long modification_time
      * long access_time
      * String owner
      * String group
+     * Leases
      */
     
     // TODO: not atomic
     try {
-      BatchWriter bw = createBatchWriter(namespaceTable);
+      BatchWriter bw = createBatchWriter(TABLES.NAMESPACE);
       try {
         Mutation createRequest = new Mutation(new Text(src));
-        infoModificationTime.put(createRequest, now());
-        put(createRequest, infoReplication, Short.toString(replication));
-        put(createRequest, infoBlockSize, Long.toString(blockSize));
-        put(createRequest, infoPermission, masked.toString());
-        put(createRequest, isDir, "N");
+        COLUMNS.MODIFICATION_TIME.put(createRequest, now());
+        put(createRequest, COLUMNS.REPLICATION, Short.toString(replication));
+        put(createRequest, COLUMNS.BLOCK_SIZE, Long.toString(blockSize));
+        put(createRequest, COLUMNS.PERMISSION, masked.toString());
+        put(createRequest, COLUMNS.IS_DIR, "N");
         bw.addMutation(createRequest);
         
         // record existence of new file in parent dir now or on complete?
         Mutation childCreate = new Mutation(new Text(getParentPath(src)));
         // TODO: could store that this is a file in the Value
-        childCreate.put(childrenFam, new Text(src.getBytes()), blank);
+        childCreate.put(FAMILIES.CHILDREN, new Text(src.getBytes()), blank);
         bw.addMutation(childCreate);
       } finally {
         bw.close();
@@ -748,17 +767,17 @@ public class DistributedNamenodeProxy im
     byte[] parent = getParentPath(src);
     
     // determine whether this is a directory
-    BatchScanner bs = createBatchScanner(namespaceTable, new Range(new Text(src)));
-    isDir.fetch(bs);
-    bs.fetchColumnFamily(childrenFam);
+    BatchScanner bs = createBatchScanner(TABLES.NAMESPACE, new Range(new Text(src)));
+    COLUMNS.IS_DIR.fetch(bs);
+    bs.fetchColumnFamily(FAMILIES.CHILDREN);
     
     String isDir_ = null;
     ArrayList<Text> children = new ArrayList<Text>();
     try {
       for (Entry<Key,Value> entry : bs) {
-        if (isDir.hasColumns(entry.getKey())) {
+        if (COLUMNS.IS_DIR.hasColumns(entry.getKey())) {
           isDir_ = entry.getKey().getColumnQualifier().toString();
-        } else if (entry.getKey().getColumnFamily().equals(childrenFam)) {
+        } else if (entry.getKey().getColumnFamily().equals(FAMILIES.CHILDREN)) {
           children.add(entry.getKey().getColumnQualifier());
         }
       }
@@ -770,15 +789,15 @@ public class DistributedNamenodeProxy im
     
     Mutation childDelete = new Mutation(new Text(parent));
     Text srcText = new Text(src);
-    childDelete.putDelete(childrenFam, srcText);
-    infoModificationTime.put(childDelete, now());
+    childDelete.putDelete(FAMILIES.CHILDREN, srcText);
+    COLUMNS.MODIFICATION_TIME.put(childDelete, now());
     
     ArrayList<Mutation> deletes = new ArrayList<Mutation>();
     getDeletes(srcText, deletes);
     deletes.add(childDelete);
     
     // delete everything at once
-    BatchWriter nw = createBatchWriter(namespaceTable);
+    BatchWriter nw = createBatchWriter(TABLES.NAMESPACE);
     Set<Text> blocks = new HashSet<Text>();
     try {
       try {
@@ -786,7 +805,7 @@ public class DistributedNamenodeProxy im
         for (Mutation m : deletes) {
           for (ColumnUpdate update : m.getUpdates()) {
             byte cf[] = update.getColumnFamily();
-            if (blocksFam.compareTo(cf, 0, cf.length) == 0) {
+            if (FAMILIES.BLOCKS.compareTo(cf, 0, cf.length) == 0) {
               blocks.add(new Text(new String(update.getColumnQualifier()).split("_", 2)[1]));
             }
           }
@@ -799,20 +818,20 @@ public class DistributedNamenodeProxy im
       log.info("deleting blocks "+ blocks);
       Map<String, List<String>> hostBlockMap = new HashMap<String, List<String>>();
       // Now remove the blocks
-      BatchWriter bw = createBatchWriter(blocksTable);
+      BatchWriter bw = createBatchWriter(TABLES.BLOCKS);
       // scan back the entries that go with the blocks
       List<Range> ranges = new ArrayList<Range>();
       for (Text row : blocks) {
         ranges.add(new Range(row));
       }
-      bs = createBatchScanner(blocksTable, ranges.toArray(new Range[0]));
+      bs = createBatchScanner(TABLES.BLOCKS, ranges.toArray(new Range[0]));
       // delete everything that matches our block list
       for (Entry<Key,Value> entry : bs) {
         if (blocks.contains(entry.getKey().getRow())) {
           Mutation m = new Mutation(entry.getKey().getRow());
           m.putDelete(entry.getKey().getColumnFamily(), entry.getKey().getColumnQualifier());
           bw.addMutation(m);
-          if (entry.getKey().getColumnFamily().equals(datanodesFam)) {
+          if (entry.getKey().getColumnFamily().equals(FAMILIES.DATANODES)) {
             String host = entry.getKey().getColumnQualifier().toString();
             String block = entry.getKey().getRow().toString();
             List<String> blockList = null;
@@ -828,7 +847,7 @@ public class DistributedNamenodeProxy im
       log.info("Host -> block map " + hostBlockMap);
 
       // Create commands to remove the blocks on the datanodes at the next heartbeat
-      bw = createBatchWriter(datanodesTable);
+      bw = createBatchWriter(TABLES.DATANODES);
       for (Entry<String,List<String>> entry : hostBlockMap.entrySet()) {
         String host = entry.getKey();
         Block block[] = new Block[entry.getValue().size()];
@@ -838,7 +857,7 @@ public class DistributedNamenodeProxy im
         }
         Mutation m = new Mutation(host);
         DatanodeCommand cmd = new BlockCommand(DatanodeProtocol.DNA_INVALIDATE, block);
-        m.put(commandFam, new Text(UUID.randomUUID().toString()), new Value(serialize(cmd)));
+        m.put(FAMILIES.COMMAND, new Text(UUID.randomUUID().toString()), new Value(serialize(cmd)));
         bw.addMutation(m);
       }
       bw.close();
@@ -887,15 +906,15 @@ public class DistributedNamenodeProxy im
     // get blocks from namespace table
     Value fileSizeBytes = null;
     java.util.Map<Text, Value> IDs = new TreeMap<Text, Value>();
-    BatchScanner bs = createBatchScanner(namespaceTable, new Range(new Text(src)));
-    bs.fetchColumnFamily(blocksFam);
-    bs.fetchColumnFamily(infoFam);
+    BatchScanner bs = createBatchScanner(TABLES.NAMESPACE, new Range(new Text(src)));
+    bs.fetchColumnFamily(FAMILIES.BLOCKS);
+    bs.fetchColumnFamily(FAMILIES.INFO);
     try {
       log.info("getting blocks for " + src + " from namespace table");
       for (Entry<Key,Value> entry : bs) {
-        if (infoSize.hasColumns(entry.getKey())) {
+        if (COLUMNS.SIZE.hasColumns(entry.getKey())) {
             fileSizeBytes = entry.getValue();
-        } else if (entry.getKey().getColumnFamily().equals(blocksFam)) {
+        } else if (entry.getKey().getColumnFamily().equals(FAMILIES.BLOCKS)) {
           IDs.put(entry.getKey().getColumnQualifier(), entry.getValue());
         }
       }
@@ -932,19 +951,19 @@ public class DistributedNamenodeProxy im
       log.info("getting host data for block ...");
       long blockSize = 0;
       ArrayList<DatanodeInfo> dni = new ArrayList<DatanodeInfo>();
-      bs = createBatchScanner(blocksTable, new Range(idString));
-      bs.fetchColumnFamily(datanodesFam);
-      infoBlockSize.fetch(bs);
+      bs = createBatchScanner(TABLES.BLOCKS, new Range(idString));
+      bs.fetchColumnFamily(FAMILIES.DATANODES);
+      COLUMNS.BLOCK_SIZE.fetch(bs);
       try {
         for (Entry<Key,Value> entry : bs) {
-          if (infoBlockSize.hasColumns(entry.getKey())) {
+          if (COLUMNS.BLOCK_SIZE.hasColumns(entry.getKey())) {
             blockSize = Long.parseLong(new String(entry.getValue().get()));
             fileLength += blockSize;
             if (blockSize == 0) {
               underConst = true;
             }
             log.info("got size " + blockSize + " for block " + blockIDString);
-          } else if (entry.getKey().getColumnFamily().equals(datanodesFam)) {
+          } else if (entry.getKey().getColumnFamily().equals(FAMILIES.DATANODES)) {
             String host = entry.getKey().getColumnQualifier().toString();
             dni.add(new DatanodeInfo(new DatanodeID(host)));
             log.info("got host: " + new String(host) + " for block " + blockIDString);
@@ -983,7 +1002,28 @@ public class DistributedNamenodeProxy im
   @Override
   public ContentSummary getContentSummary(String path) throws IOException {
     log.info("using getContentSummary");
-    return null;
+    if (!path.endsWith("/"))
+      path += "/";
+    long summary[] = {0, 0, 1};
+    Text endRange = new Text(path);
+    endRange.append(new byte[]{(byte)0xff}, 0, 1);
+    Range range = new Range(new Text(path), true, endRange, false);
+    Scanner scanner = createScanner(conn, TABLES.NAMESPACE, range);
+    COLUMNS.SIZE.fetch(scanner);
+    COLUMNS.IS_DIR.fetch(scanner);
+    for (Entry<Key,Value> entry : scanner) {
+      Key key = entry.getKey();
+      if (COLUMNS.SIZE.hasColumns(key)) {
+        summary [0] += Long.parseLong(entry.getValue().toString());
+      }
+      if (COLUMNS.IS_DIR.hasColumns(key)) {
+        if ("Y".equals(entry.getValue().toString()))
+          summary[2]++;
+        else
+          summary[1]++;
+      }
+    }
+    return new ContentSummary(summary[0], summary[1], summary[2]);
   }
   
   @Override
@@ -1010,12 +1050,12 @@ public class DistributedNamenodeProxy im
   private void getDeletes(Text src, List<Mutation> deletes) throws IOException {
     
     // Maybe this list won't fit in memory?
-    BatchScanner bs = createBatchScanner(namespaceTable, new Range(new Text(src)));
+    BatchScanner bs = createBatchScanner(TABLES.NAMESPACE, new Range(new Text(src)));
     Mutation m = new Mutation(src);
     try {
       for (Entry<Key,Value> entry : bs) {
         Text columnFamily = entry.getKey().getColumnFamily();
-        if (columnFamily.equals(childrenFam)) {
+        if (columnFamily.equals(FAMILIES.CHILDREN)) {
           getDeletes(entry.getKey().getColumnQualifier(), deletes);
         }
         log.info("deleting " + src + " " + entry.getKey().getColumnFamily() + ":" + entry.getKey().getColumnQualifier());
@@ -1032,7 +1072,7 @@ public class DistributedNamenodeProxy im
   @Override
   public HdfsFileStatus getFileInfo(String src) throws IOException {
     log.info("using getFileInfo " + src);
-    BatchScanner bs = createBatchScanner(namespaceTable, new Range(src));
+    BatchScanner bs = createBatchScanner(TABLES.NAMESPACE, new Range(src));
     try {
       return loadFileStatus(src, bs.iterator());
     } finally {
@@ -1051,17 +1091,17 @@ public class DistributedNamenodeProxy im
     ArrayList<HdfsFileStatus> files = new ArrayList<HdfsFileStatus>();
     
     String isDirFlag = null;
-    BatchScanner bs = createBatchScanner(namespaceTable, new Range(new Text(src)));
-    bs.fetchColumnFamily(childrenFam);
-    isDir.fetch(bs);
+    BatchScanner bs = createBatchScanner(TABLES.NAMESPACE, new Range(new Text(src)));
+    bs.fetchColumnFamily(FAMILIES.CHILDREN);
+    COLUMNS.IS_DIR.fetch(bs);
     List<String> children = new ArrayList<String>();
     try {
       for (Entry<Key,Value> entry : bs) {
         log.info("Looking at entry " + entry.getKey() + " -> " + entry.getValue());
         String value = new String(entry.getValue().get());
-        if (isDir.hasColumns(entry.getKey())) {
+        if (COLUMNS.IS_DIR.hasColumns(entry.getKey())) {
           isDirFlag = value;
-        } else if (entry.getKey().getColumnFamily().equals(childrenFam)) {
+        } else if (entry.getKey().getColumnFamily().equals(FAMILIES.CHILDREN)) {
           children.add(entry.getKey().getColumnQualifier().toString());
         }
       }
@@ -1076,7 +1116,7 @@ public class DistributedNamenodeProxy im
     
     log.info("Looking at children " + children);
     for (String child : children) {
-      bs = createBatchScanner(namespaceTable, new Range(child));
+      bs = createBatchScanner(TABLES.NAMESPACE, new Range(child));
       try {
         HdfsFileStatus stat = loadFileStatus(child, bs.iterator());
         files.add(stat);
@@ -1084,7 +1124,6 @@ public class DistributedNamenodeProxy im
         bs.close();
       }
     }
-    log.info("files " + files);
     return new DirectoryListing(files.toArray(new HdfsFileStatus[files.size()]), 0);
   }
   
@@ -1121,17 +1160,17 @@ public class DistributedNamenodeProxy im
       Entry<Key, Value> entry = fileResult.next();
       log.info("looking at file data " + entry.getKey() + " -> " + entry.getValue());
       String value = new String(entry.getValue().get());
-      if (isDir.hasColumns(entry.getKey())) {
+      if (COLUMNS.IS_DIR.hasColumns(entry.getKey())) {
         isDirFlag = value;
-      } else if (infoSize.hasColumns(entry.getKey())) {
+      } else if (COLUMNS.SIZE.hasColumns(entry.getKey())) {
         length = Long.parseLong(value);
-      } else if (infoReplication.hasColumns(entry.getKey())) {
+      } else if (COLUMNS.REPLICATION.hasColumns(entry.getKey())) {
         block_replication = Integer.parseInt(value);
-      } else if (infoBlockSize.hasColumns(entry.getKey())) {
+      } else if (COLUMNS.BLOCK_SIZE.hasColumns(entry.getKey())) {
         blocksize = Long.parseLong(value);
-      } else if (infoModificationTime.hasColumns(entry.getKey())) {
+      } else if (COLUMNS.MODIFICATION_TIME.hasColumns(entry.getKey())) {
         modification_time = Long.parseLong(value);
-      } else if (infoPermission.hasColumns(entry.getKey())) {
+      } else if (COLUMNS.PERMISSION.hasColumns(entry.getKey())) {
         permissionString = new String(entry.getValue().get());
       }
       row = entry.getKey().getRow();
@@ -1143,6 +1182,7 @@ public class DistributedNamenodeProxy im
     
     boolean isdir = isDirFlag.equals("Y");
     FsPermission permission = FsPermission.getDefault();
+    log.info("permission string " + permissionString);
     if (permissionString != null) {
       permission = FsPermission.valueOf((isdir ? "d":"-") + permissionString);
     }
@@ -1172,11 +1212,11 @@ public class DistributedNamenodeProxy im
     // TODO: check permissions
     
     src = normalizePath(src);
-    BatchScanner bs = createBatchScanner(namespaceTable, new Range(new Text(src)));
-    isDir.fetch(bs);
+    BatchScanner bs = createBatchScanner(TABLES.NAMESPACE, new Range(new Text(src)));
+    COLUMNS.IS_DIR.fetch(bs);
     try {
       for (Entry<Key,Value> entry : bs) {
-        if (isDir.hasColumns(entry.getKey())) {
+        if (COLUMNS.IS_DIR.hasColumns(entry.getKey())) {
           if (new String(entry.getValue().get()).equals("Y"))
             return true;
           else
@@ -1191,15 +1231,15 @@ public class DistributedNamenodeProxy im
     // verify parent path exists
     byte[] parentPath = getParentPath(src);
     
-    bs = createBatchScanner(namespaceTable, new Range(new Text(parentPath)));
-    isDir.fetch(bs);
-    bs.fetchColumnFamily(childrenFam);
+    bs = createBatchScanner(TABLES.NAMESPACE, new Range(new Text(parentPath)));
+    COLUMNS.IS_DIR.fetch(bs);
+    bs.fetchColumnFamily(FAMILIES.CHILDREN);
     String isDirString = null;
     try {
       for (Entry<Key,Value> entry : bs) {
-        if (isDir.hasColumns(entry.getKey())) {
+        if (COLUMNS.IS_DIR.hasColumns(entry.getKey())) {
           isDirString = new String(entry.getValue().get());
-        } else if (entry.getKey().getColumnFamily().equals(childrenFam)) {
+        } else if (entry.getKey().getColumnFamily().equals(FAMILIES.CHILDREN)) {
         }
       }
     } finally {
@@ -1212,16 +1252,16 @@ public class DistributedNamenodeProxy im
       throw new IOException("error: parent " + src + " is not a directory");
     
     // edit namespace
-    BatchWriter bw = createBatchWriter(namespaceTable);
+    BatchWriter bw = createBatchWriter(TABLES.NAMESPACE);
     Mutation m = new Mutation(new Text(parentPath));
-    m.put(childrenFam, new Text(src), blank);
+    m.put(FAMILIES.CHILDREN, new Text(src), blank);
     //String dirName = getDirName(src);
     try {
       try {
         bw.addMutation(m);
         m = new Mutation(new Text(src));
-        isDir.put(m, new Value("Y".getBytes()));
-        infoModificationTime.put(m, new Value(Long.toString(System.currentTimeMillis()).getBytes()));
+        COLUMNS.IS_DIR.put(m, new Value("Y".getBytes()));
+        COLUMNS.MODIFICATION_TIME.put(m, now());
         bw.addMutation(m);
       } finally {
         bw.close();
@@ -1254,19 +1294,19 @@ public class DistributedNamenodeProxy im
       
       Mutation blockData = new Mutation(new Text(blockIDBytes));
       for(int i=0; i < hosts.length; i++)
-        blockData.put(datanodesFam, new Text(hosts[i].name.getBytes()), blank);
-      BatchWriter bw = createBatchWriter(blocksTable);
+        blockData.put(FAMILIES.DATANODES, new Text(hosts[i].name.getBytes()), blank);
+      BatchWriter bw = createBatchWriter(TABLES.BLOCKS);
       try {
         bw.addMutation(blockData);
       } finally {
         bw.close();
       }
       
-      bw = createBatchWriter(datanodesTable);
+      bw = createBatchWriter(TABLES.DATANODES);
       try {
         for(int i=0; i < hosts.length; i++) {
           Mutation host = new Mutation(new Text(hosts[i].name));
-          host.put(blocksFam, new Text(blockIDBytes), blank);
+          host.put(FAMILIES.BLOCKS, new Text(blockIDBytes), blank);
           bw.addMutation(host);
         }
       } finally {
@@ -1297,9 +1337,9 @@ public class DistributedNamenodeProxy im
     // record this datanode's info
     try {
       if (conn != null) {
-        BatchWriter bw = createBatchWriter(datanodesTable);
+        BatchWriter bw = createBatchWriter(TABLES.DATANODES);
         Mutation reg = new Mutation(new Text(registration.name.getBytes()));
-        infoStorageID.put(reg, new Value(registration.storageID.getBytes()));
+        COLUMNS.STORAGE_ID.put(reg, new Value(registration.storageID.getBytes()));
         try {
           try {
             bw.addMutation(reg);
@@ -1328,9 +1368,9 @@ public class DistributedNamenodeProxy im
   
   FileStatus getFileStatus(String src) throws IOException {
     FileStatus result = new FileStatus(false, false);
-    BatchScanner bs = createBatchScanner(namespaceTable, new Range(src));
+    BatchScanner bs = createBatchScanner(TABLES.NAMESPACE, new Range(src));
     try {
-      isDir.fetch(bs);
+      COLUMNS.IS_DIR.fetch(bs);
       for (Entry<Key,Value> entry : bs) {
         result.exists = true;
         if (new String(entry.getValue().get()).equals("Y")) {
@@ -1369,8 +1409,8 @@ public class DistributedNamenodeProxy im
       if (dstStatus.exists)
         delete(dst, true);
       // copy file information
-      BatchScanner bs = createBatchScanner(namespaceTable, new Range(src));
-      BatchWriter bw = createBatchWriter(namespaceTable);
+      BatchScanner bs = createBatchScanner(TABLES.NAMESPACE, new Range(src));
+      BatchWriter bw = createBatchWriter(TABLES.NAMESPACE);
       try {
         Mutation m = new Mutation(new Text(dst));
         Mutation s = new Mutation(new Text(src));
@@ -1383,11 +1423,11 @@ public class DistributedNamenodeProxy im
         bw.addMutation(s);
         // remove child link in src's parent
         m = new Mutation(new Text(getParentPath(src)));
-        m.putDelete(childrenFam, new Text(src));
+        m.putDelete(FAMILIES.CHILDREN, new Text(src));
         bw.addMutation(m);
         // add child link in dst's parent
         m = new Mutation(new Text(getParentPath(dst)));
-        m.put(childrenFam, new Text(dst), blank);
+        m.put(FAMILIES.CHILDREN, new Text(dst), blank);
       } finally {
         bs.close();
         bw.close();
@@ -1440,7 +1480,7 @@ public class DistributedNamenodeProxy im
         SendResult result = new SendResult();
         
         log.info("using sendHeartbeat");
-        if (!conn.tableOperations().exists(datanodesTable))
+        if (!conn.tableOperations().exists(TABLES.DATANODES))
           return result;
         // update datanodes table with info
         // skip this if none of the numbers have changed
@@ -1449,12 +1489,12 @@ public class DistributedNamenodeProxy im
             dfsUsed != lastDfsUsed ||
             remaining != lastRemaining) {
           try {
-            BatchWriter bw = createBatchWriter(conn, datanodesTable);
+            BatchWriter bw = createBatchWriter(conn, TABLES.DATANODES);
             Mutation m = new Mutation(new Text(registration.name.getBytes()));
-            infoCapacity.put(m, new Value(Long.toString(capacity).getBytes()));
-            infoUsed.put(m, new Value(Long.toString(dfsUsed).getBytes()));
-            infoIpcPort.put(m, new Value(Integer.toString(registration.getIpcPort()).getBytes()));
-            DistributedNamenodeProxy.remaining.put(m, new Value(Long.toString(remaining).getBytes()));
+            COLUMNS.CAPACITY.put(m, new Value(Long.toString(capacity).getBytes()));
+            COLUMNS.USED.put(m, new Value(Long.toString(dfsUsed).getBytes()));
+            COLUMNS.IPC_PORT.put(m, new Value(Integer.toString(registration.getIpcPort()).getBytes()));
+            COLUMNS.REMAINING.put(m, new Value(Long.toString(remaining).getBytes()));
             try {
               bw.addMutation(m);
             } finally {
@@ -1470,8 +1510,8 @@ public class DistributedNamenodeProxy im
         // return a list of commands for the data node
         List<DatanodeCommand> commands = new ArrayList<DatanodeCommand>();
         try {
-          BatchScanner bs = createBatchScanner(datanodesTable, new Range(registration.getName()));
-          bs.fetchColumnFamily(commandFam);
+          BatchScanner bs = createBatchScanner(TABLES.DATANODES, new Range(registration.getName()));
+          bs.fetchColumnFamily(FAMILIES.COMMAND);
           for (Entry<Key,Value> entry : bs) {
             Key key = entry.getKey();
             DatanodeCommand command = (DatanodeCommand)deserialize(entry.getValue().get());
@@ -1500,8 +1540,10 @@ public class DistributedNamenodeProxy im
     }
     try {
       if (future.isDone()) {
+        // Ok, we were able to do a heartbeat, so go ahead and delete the commands we're about to return
+        // TODO: this could fail, too.
         SendResult result = future.get();
-        BatchWriter bw = createBatchWriter(datanodesTable);
+        BatchWriter bw = createBatchWriter(TABLES.DATANODES);
         bw.addMutations(result.deletes);
         bw.close();
         return result.commands.toArray(new DatanodeCommand[0]);
@@ -1555,10 +1597,10 @@ public class DistributedNamenodeProxy im
     try {
       HdfsFileStatus fileInfo = getFileInfo(src);
       if (!fileInfo.getPermission().equals(permission)) {
-        BatchWriter bw = createBatchWriter(namespaceTable);
+        BatchWriter bw = createBatchWriter(TABLES.NAMESPACE);
         try {
           Mutation m = new Mutation(src);
-          infoPermission.put(m, new Value(permission.toString().getBytes()));
+          COLUMNS.PERMISSION.put(m, new Value(permission.toString().getBytes()));
           bw.addMutation(m);
         } finally {
           bw.close();
@@ -1578,8 +1620,28 @@ public class DistributedNamenodeProxy im
   @Override
   public boolean setReplication(String src, short replication)
       throws IOException {
-    unimplemented(src, replication);
-    return false;
+    log.info("using setReplicatoon");
+    try {
+      HdfsFileStatus fileInfo = getFileInfo(src);
+      if (fileInfo.isDir())
+        return false;
+      
+      if (fileInfo.getReplication() != replication) {
+        BatchWriter bw = createBatchWriter(TABLES.NAMESPACE);
+        try {
+          Mutation m = new Mutation(src);
+          COLUMNS.REPLICATION.put(m, new Value(Short.toString(replication).getBytes()));
+          bw.addMutation(m);
+        } finally {
+          bw.close();
+        }
+      }
+      return true;
+    } catch (FileNotFoundException ex) {
+      return false;
+    } catch (Exception ex) {
+      throw new IOException(ex);
+    }
   }
   
   @Override

Modified: accumulo/branches/ACCUMULO-722/distnn/src/main/java/org/apache/hadoop/hdfs/server/namenode/ZookeeperNameNode.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-722/distnn/src/main/java/org/apache/hadoop/hdfs/server/namenode/ZookeeperNameNode.java?rev=1383487&r1=1383486&r2=1383487&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-722/distnn/src/main/java/org/apache/hadoop/hdfs/server/namenode/ZookeeperNameNode.java (original)
+++ accumulo/branches/ACCUMULO-722/distnn/src/main/java/org/apache/hadoop/hdfs/server/namenode/ZookeeperNameNode.java Tue Sep 11 17:30:11 2012
@@ -62,11 +62,17 @@ import com.netflix.curator.framework.Cur
 import com.netflix.curator.framework.CuratorFrameworkFactory.Builder;
 import com.netflix.curator.retry.RetryUntilElapsed;
 
+// Provide a limited NameNode interface that stores data into zookeeper; 
+// Lazily create and redirect requests to non-metadata files to the Accumulo-based NameNode.
 public class ZookeeperNameNode implements FakeNameNode {
   static private Logger log = Logger.getLogger(ZookeeperNameNode.class); 
   
-  CuratorFramework keeper;
-  Random random = new Random();
+  private final Configuration conf;
+  private final CuratorFramework keeper;
+  private final Random random = new Random();
+  private final String instance;
+
+  private DistributedNamenodeProxy dist = null;
   
   public static class FileInfo implements Serializable {
     private static final long serialVersionUID = 1L;
@@ -85,6 +91,7 @@ public class ZookeeperNameNode implement
     public long size;
   }
   
+  // An object to store at a zookeeper node that represents a directory's metadata
   public static class DirInfo implements Serializable {
     private static final long serialVersionUID = 1L;
     
@@ -93,8 +100,10 @@ public class ZookeeperNameNode implement
     }
 
     public long createTime;
+    public String permission = "rwxrwxrwx";
   }
   
+  // An object to store at a zookeeeper node that represents block metadata
   public static class BlockInfo implements Serializable {
     private static final long serialVersionUID = 1L;
     
@@ -110,23 +119,20 @@ public class ZookeeperNameNode implement
     boolean complete;
   }
   
-  static Pattern isRoot = Pattern.compile("/accumulo(|/instance_id.*|/version.*|/walogArchive|/wal(/.*|$)|/recovery.*|/tables$|/tables/(\\!0|0|1|2)(/.*|$))");
+  // Metadata for these files are stored in zookeeper
+  static Pattern metaDataFileNames = Pattern.compile("/accumulo(|/instance_id.*|/version.*|/walogArchive|/wal(/.*|$)|/recovery.*|/tables$|/tables/(\\!0|\\!1|\\!2|\\!3)(/.*|$))");
   
   static private boolean isZooName(String path) {
-    boolean result = isRoot.matcher(path).matches();
+    boolean result = metaDataFileNames.matcher(path).matches();
     log.info("Looking at " + path + " isZooName " + result);
     return result;
   }
+  
+  // By convention, blockIds tracked in zookeeper are negative
   static public boolean isZooBlockId(long blockId) {
     return blockId < 0;
   }
   
-  private final URI uri;
-  private DistributedNamenodeProxy dist = null;
-  private final String instance;
-
-  private long start = System.currentTimeMillis();
-  
   private static URI getURI(Configuration conf) throws IOException {
     try {
       return new URI(conf.get("fs.default.name"));
@@ -140,16 +146,18 @@ public class ZookeeperNameNode implement
   }
 
   public ZookeeperNameNode(Configuration conf, URI uri) throws IOException {
-    ConnectInfo info = new ConnectInfo(uri);
+    this.conf = conf;
+    ConnectInfo info = new ConnectInfo(conf);
     instance = info.instance;
     Builder builder = CuratorFrameworkFactory.builder();
     builder.connectString(info.zookeepers);
+    // TODO: configure timeout to zookeeper
+    // TODO: get constant configuration 
     builder.retryPolicy(new RetryUntilElapsed(120*1000, 500));
     //builder.aclProvider(aclProvider);
     CuratorFramework client = builder.build();
     client.start();
     this.keeper = client;
-    this.uri = uri;
     try {
       findDatanodes();
     } catch (Exception e) {
@@ -157,12 +165,13 @@ public class ZookeeperNameNode implement
     }
   }
   
-  private static void unimplemented(Object ... args) {
+  private static void notImplementedWarning(Object ... args) {
     Throwable t = new Throwable();
     String method = t.getStackTrace()[1].getMethodName();
     log.warn(method + " unimplemented, args: " + Arrays.asList(args), t);
   }
   
+  // Create the truly distributed namenode connection, hopefully enough datanodes and tservers are running 
   private FakeNameNode dist() {
     try {
       if (dist == null) {
@@ -179,7 +188,7 @@ public class ZookeeperNameNode implement
           }
         }
         if (atLeastOneTserver)
-          dist = new DistributedNamenodeProxy(keeper, uri);
+          dist = new DistributedNamenodeProxy(keeper, conf);
       }
     } catch (Exception ex) {
       log.warn(ex, ex);
@@ -276,7 +285,8 @@ public class ZookeeperNameNode implement
         byte[] current = keeper.getData().forPath(path);
         log.info("Current value for " + src + " is " + new Text(current));
         if (overwrite) {
-          keeper.setData().forPath(path, data);
+          delete(src, true);
+          create(src, masked, clientName, overwrite, createParent, replication, blockSize);
         } else {
           throw new FileAlreadyExistsException(src);
         }
@@ -304,24 +314,65 @@ public class ZookeeperNameNode implement
   
   @Override
   public boolean recoverLease(String src, String clientName) throws IOException {
-    unimplemented(src, clientName);
+    notImplementedWarning(src, clientName);
     return true;
   }
   
   @Override
   public boolean setReplication(String src, short replication) throws IOException {
-    unimplemented(src, replication);
-    return true;
+    if (!isZooName(src))
+      return dist().setReplication(src, replication);
+    try {
+      String path = DNNConstants.NAMESPACE_PATH + src;
+      byte[] data = keeper.getData().forPath(path);
+      Object obj = deserialize(data);
+      if (obj == null) {
+        obj = new DirInfo(System.currentTimeMillis());
+      }
+      if (obj instanceof FileInfo) {
+        FileInfo info = (FileInfo)obj;
+        info.replication = replication;
+        keeper.setData().forPath(path, serialize(obj));
+        return true;
+      }
+      return false;
+    } catch (Exception ex) {
+      throw new IOException(ex);
+    }
   }
   
   @Override
   public void setPermission(String src, FsPermission permission) throws IOException {
-    unimplemented(src, permission);
+    if (!isZooName(src)) {
+      dist().setPermission(src, permission);
+      return;
+    }
+    try {
+      String path = DNNConstants.NAMESPACE_PATH + src;
+      byte[] data = keeper.getData().forPath(path);
+      Object obj = deserialize(data);
+      if (obj == null) {
+        obj = new DirInfo(System.currentTimeMillis());
+      }
+      if (obj instanceof FileInfo) {
+        FileInfo info = (FileInfo)obj;
+        info.permission = permission.toString();
+        obj = info;
+      }
+      if (obj instanceof DirInfo) {
+        DirInfo info = (DirInfo)obj;
+        info.permission = permission.toString();
+        obj = info;
+      }
+      keeper.setData().forPath(path, serialize(obj));
+    } catch (Exception ex) {
+      throw new IOException(ex);
+    }
   }
   
   @Override
   public void setOwner(String src, String username, String groupname) throws IOException {
-    unimplemented(src, username, groupname);
+    notImplementedWarning(src, username, groupname);
   }
   
   @Override
@@ -373,7 +424,7 @@ public class ZookeeperNameNode implement
       throw new IOException(e);
     }
     
-    short defaultReplication = 3; // TODO: read config
+    short defaultReplication = (short)conf.getInt("dfs.replication", 3);
     short replication = -1;
     try {
       String path = DNNConstants.NAMESPACE_PATH + src;
@@ -391,6 +442,8 @@ public class ZookeeperNameNode implement
     
     // DistibutedNameNode holds the positive blocks
     long blockID = -Math.abs(random.nextLong());
+    // probably never happen
+    if (blockID == 0) blockID = -1;
     Block b = new Block(blockID, 0, 0);
     List<String> replicas = randomList.subList(0, Math.min(replication, randomList.size()));
     List<DatanodeInfo> targets = new ArrayList<DatanodeInfo>();
@@ -472,7 +525,7 @@ public class ZookeeperNameNode implement
   
   @Override
   public void reportBadBlocks(LocatedBlock[] blocks) throws IOException {
-    unimplemented((Object[])blocks);
+    notImplementedWarning((Object[])blocks);
   }
 
   private Object getInfo(String path) throws Exception {
@@ -566,7 +619,6 @@ public class ZookeeperNameNode implement
     } catch (KeeperException.NoNodeException ex) {
       return;
     }
-    log.info("children of " + path + " is " + children);
     Object obj = deserialize(keeper.getData().forPath(path));
     if (removeBlocks && obj instanceof FileInfo) {
       // create the datanode command to (eventually) delete the blocks
@@ -691,57 +743,57 @@ public class ZookeeperNameNode implement
   
   @Override
   public long[] getStats() throws IOException {
-    unimplemented();
+    notImplementedWarning();
     return null;
   }
   
   @Override
   public DatanodeInfo[] getDatanodeReport(DatanodeReportType type) throws IOException {
-    unimplemented(type);
+    notImplementedWarning(type);
     return null;
   }
   
   @Override
   public long getPreferredBlockSize(String filename) throws IOException {
-    unimplemented(filename);
+    notImplementedWarning(filename);
     return 0;
   }
   
   @Override
   public boolean setSafeMode(SafeModeAction action) throws IOException {
-    unimplemented(action);
+    notImplementedWarning(action);
     return false;
   }
   
   @Override
   public void saveNamespace() throws IOException {
-   unimplemented();
+   notImplementedWarning();
   }
   
   @Override
   public void refreshNodes() throws IOException {
-    unimplemented();
+    notImplementedWarning();
   }
   
   @Override
   public void finalizeUpgrade() throws IOException {
-    unimplemented();
+    notImplementedWarning();
   }
   
   @Override
   public UpgradeStatusReport distributedUpgradeProgress(UpgradeAction action) throws IOException {
-    unimplemented();
+    notImplementedWarning();
     return null;
   }
   
   @Override
   public void metaSave(String filename) throws IOException {
-    unimplemented(filename);
+    notImplementedWarning(filename);
   }
   
   @Override
   public void setBalancerBandwidth(long bandwidth) throws IOException {
-    unimplemented(bandwidth);
+    notImplementedWarning(bandwidth);
   }
   
   @Override
@@ -767,47 +819,79 @@ public class ZookeeperNameNode implement
     return null;
   }
   
+  static private class Summary {
+    long length = 0;
+    long fileCount = 0;
+    long directoryCount = 0;
+  }
+  
   @Override
   public ContentSummary getContentSummary(String path) throws IOException {
-    unimplemented(path);
-    return null;
+    Summary result = new Summary();
+    DirectoryListing listing = this.getListing(path, null);
+    for (HdfsFileStatus child : listing.getPartialListing()) {
+      if (isZooName(child.getFullName(path))) {
+        zooRecurse(child.getFullName(path), result);
+      } else {
+        ContentSummary dnnSummary = dist.getContentSummary(path);
+        result.length += dnnSummary.getLength();
+        result.fileCount += dnnSummary.getFileCount();
+        result.directoryCount += dnnSummary.getDirectoryCount();
+      }
+    }
+    return new ContentSummary(result.length, result.fileCount, result.directoryCount);
+  }
+  
+  private void zooRecurse(String src, Summary summary) throws IOException {
+    HdfsFileStatus result = getFileInfo(src);
+    if (result.isDir()) {
+      summary.directoryCount++;
+      for (HdfsFileStatus child : getListing(src, null).getPartialListing()) {
+        zooRecurse(child.getFullName(src), summary);
+      }
+      return;
+    }
+    summary.fileCount++;
+    summary.length += result.getLen(); 
   }
   
   @Override
   public void setQuota(String path, long namespaceQuota, long diskspaceQuota) throws IOException {
-    unimplemented(path, namespaceQuota, diskspaceQuota);
+    notImplementedWarning(path, namespaceQuota, diskspaceQuota);
   }
   
   @Override
   public void fsync(String src, String client) throws IOException {
-    unimplemented(src, client);
+    FakeNameNode dist = dist();
+    if (dist != null)
+      dist.fsync(src, client);
   }
   
   @Override
   public void setTimes(String src, long mtime, long atime) throws IOException {
-    unimplemented(src, mtime, atime);
+    notImplementedWarning(src, mtime, atime);
   }
   
   @Override
   public Token<DelegationTokenIdentifier> getDelegationToken(Text renewer) throws IOException {
-    unimplemented(renewer);
+    notImplementedWarning(renewer);
     return null;
   }
   
   @Override
   public long renewDelegationToken(Token<DelegationTokenIdentifier> token) throws IOException {
-    unimplemented(token);
+    notImplementedWarning(token);
     return 0;
   }
   
   @Override
   public void cancelDelegationToken(Token<DelegationTokenIdentifier> token) throws IOException {
-    unimplemented(token);
+    notImplementedWarning(token);
   }
   
   @Override
   public long getProtocolVersion(String protocol, long clientVersion) throws IOException {
-    unimplemented(protocol, clientVersion);
+    notImplementedWarning(protocol, clientVersion);
     return 0;
   }
   
@@ -815,6 +899,7 @@ public class ZookeeperNameNode implement
   public DatanodeRegistration register(DatanodeRegistration registration) throws IOException {
     log.info("register " + registration);
     if (keeper != null) {
+      // TODO: don't need to *always* update zookeeper, right?
       log.info("registering in zookeeper as " + registration.name);
       ByteArrayOutputStream stream = new ByteArrayOutputStream();
       DataOutputStream data = new DataOutputStream(stream);
@@ -894,7 +979,7 @@ public class ZookeeperNameNode implement
   
   @Override
   public void blocksBeingWrittenReport(DatanodeRegistration registration, long[] blocks) throws IOException {
-    unimplemented(registration, new BlockListAsLongs(blocks));
+    notImplementedWarning(registration, new BlockListAsLongs(blocks));
   }
   
   @Override
@@ -925,7 +1010,7 @@ public class ZookeeperNameNode implement
   
   @Override
   public void errorReport(DatanodeRegistration registration, int errorCode, String msg) throws IOException {
-    unimplemented(registration, errorCode, msg);
+    notImplementedWarning(registration, errorCode, msg);
   }
   
   @Override
@@ -934,26 +1019,25 @@ public class ZookeeperNameNode implement
     // TODO: find out how to get namespace id
     // could store this in the info of the / entry
     NamespaceInfo nsi = new NamespaceInfo(384837986, 0, 0);
-    //throw new RuntimeException();
     return nsi;
   }
   
   @Override
   public UpgradeCommand processUpgradeCommand(UpgradeCommand comm) throws IOException {
-    unimplemented(comm);
+    notImplementedWarning(comm);
     return null;
   }
   
   @Override
   public long nextGenerationStamp(Block block, boolean fromNN) throws IOException {
-    unimplemented(block, fromNN);
+    notImplementedWarning(block, fromNN);
     return 0;
   }
   
   @Override
   public void commitBlockSynchronization(Block block, long newgenerationstamp, long newlength, boolean closeFile, boolean deleteblock, DatanodeID[] newtargets)
       throws IOException {
-    unimplemented(block, newgenerationstamp, newlength, closeFile, deleteblock, newtargets);
+    notImplementedWarning(block, newgenerationstamp, newlength, closeFile, deleteblock, newtargets);
   }
   
 }



Mime
View raw message