accumulo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From e..@apache.org
Subject svn commit: r1371432 [4/4] - in /accumulo/branches/ACCUMULO-722/distnn: ./ src/ src/main/ src/main/java/ src/main/java/org/ src/main/java/org/apache/ src/main/java/org/apache/hadoop/ src/main/java/org/apache/hadoop/hdfs/ src/main/java/org/apache/hadoop...
Date Thu, 09 Aug 2012 20:21:33 GMT
Added: accumulo/branches/ACCUMULO-722/distnn/src/main/java/org/apache/hadoop/hdfs/server/namenode/ZookeeperNameNode.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-722/distnn/src/main/java/org/apache/hadoop/hdfs/server/namenode/ZookeeperNameNode.java?rev=1371432&view=auto
==============================================================================
--- accumulo/branches/ACCUMULO-722/distnn/src/main/java/org/apache/hadoop/hdfs/server/namenode/ZookeeperNameNode.java
(added)
+++ accumulo/branches/ACCUMULO-722/distnn/src/main/java/org/apache/hadoop/hdfs/server/namenode/ZookeeperNameNode.java
Thu Aug  9 20:21:32 2012
@@ -0,0 +1,791 @@
+package org.apache.hadoop.hdfs.server.namenode;
+
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.Map.Entry;
+import java.util.Random;
+import java.util.TreeMap;
+
+import org.apache.accumulo.core.util.UtilWaitThread;
+import org.apache.commons.io.output.ByteArrayOutputStream;
+import org.apache.commons.lang.NotImplementedException;
+import org.apache.hadoop.fs.ContentSummary;
+import org.apache.hadoop.fs.FileAlreadyExistsException;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdfs.DNNConstants;
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
+import org.apache.hadoop.hdfs.protocol.DatanodeID;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.DirectoryListing;
+import org.apache.hadoop.hdfs.protocol.FSConstants.DatanodeReportType;
+import org.apache.hadoop.hdfs.protocol.FSConstants.SafeModeAction;
+import org.apache.hadoop.hdfs.protocol.FSConstants.UpgradeAction;
+import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
+import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
+import org.apache.hadoop.hdfs.server.common.UpgradeStatusReport;
+import org.apache.hadoop.hdfs.server.protocol.BlockCommand;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
+import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
+import org.apache.hadoop.hdfs.server.protocol.UpgradeCommand;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.security.token.Token;
+import org.apache.log4j.Logger;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+
+import com.netflix.curator.framework.CuratorFramework;
+
+public class ZookeeperNameNode implements FakeNameNode {
+  static private Logger log = Logger.getLogger(ZookeeperNameNode.class); 
+  
+  CuratorFramework keeper;
+  Random random = new Random();
+  
+  public static class FileInfo implements Serializable {
+    private static final long serialVersionUID = 1L;
+    
+    public FileInfo(long blocksize, long createTime, String permission, short replication,
long size) {
+      this.blocksize = blocksize;
+      this.createTime = createTime;
+      this.permission = permission;
+      this.replication = replication;
+      this.size = size;
+    }
+    public long blocksize;
+    public long createTime;
+    public String permission;
+    public short replication;
+    public long size;
+  }
+  
+  public static class DirInfo implements Serializable {
+    private static final long serialVersionUID = 1L;
+    
+    public DirInfo(long createTime) {
+      this.createTime = createTime;
+    }
+
+    public long createTime;
+  }
+  
+  public static class BlockInfo implements Serializable {
+    private static final long serialVersionUID = 1L;
+    
+    public BlockInfo(long id, String[] datanodes, long size) {
+      this.id = id;
+      this.datanodes = datanodes;
+      this.size = size;
+      this.complete = false;
+    }
+    long id;
+    String[] datanodes;
+    long size;
+    boolean complete;
+  }
+  
+  public ZookeeperNameNode(CuratorFramework client) {
+    this.keeper = client;
+  }
+  
+  private static void unimplemented(Object ... args) {
+    Throwable t = new Throwable();
+    String method = t.getStackTrace()[1].getMethodName();
+    log.warn(method + " unimplemented, args: " + Arrays.asList(args), t);
+  }
+  
+  @Override
+  public LocatedBlocks getBlockLocations(String src, long offset, long length) throws IOException
{
+    log.info("getBlockLocation " + src + " offset " + offset + " length " + length);
+    try {
+      Map<String, BlockInfo> blocks = new TreeMap<String, BlockInfo>();
+      String blockpath = DNNConstants.NAMESPACE_PATH + src;
+      for (String child : keeper.getChildren().forPath(blockpath)) {
+        byte[] data = keeper.getData().forPath(blockpath + "/" + child);
+        Object obj = deserialize(data);
+        if (obj instanceof BlockInfo) {
+          BlockInfo info = (BlockInfo)obj;
+          data = keeper.getData().forPath(DNNConstants.BLOCKS_PATH + "/" + new Block(info.id).getBlockName());
+          obj = deserialize(data);
+          if (obj instanceof BlockInfo) {
+            info = (BlockInfo)obj;
+            blocks.put(child, info);
+            log.info(src + " block " + info.id + " size " + info.size);
+          }
+        }
+      }
+      log.info("Got " + blocks.size() + " blocks for " + src);
+      List<LocatedBlock> lblocks = new ArrayList<LocatedBlock>();
+      long currentOffset = 0;
+      for (Entry<String,BlockInfo> entry : blocks.entrySet()) {
+        BlockInfo binfo = entry.getValue();
+        DatanodeInfo[] info = new DatanodeInfo[binfo.datanodes.length];
+        for (int j = 0; j < info.length; j++) {
+          info[j] = new DatanodeInfo(new DatanodeID(binfo.datanodes[j]));
+        }
+        log.info("Found " + entry.getKey() + " "+ info.length + " locations for block " +
binfo.id);
+        if (currentOffset >= offset && currentOffset < offset + length)
+          lblocks.add(new LocatedBlock(new Block(binfo.id, binfo.size, 0), info, currentOffset));
+        currentOffset += binfo.size;
+      }
+      log.info("Returning fileLength " + currentOffset + " for " + src);
+      return new LocatedBlocks(currentOffset, lblocks, false);
+    } catch (Exception ex) {
+      throw new IOException(ex);
+    }
+  }
+  
+  public static Object deserialize(byte[] data) throws IOException, ClassNotFoundException
{
+    if (data.length == 0)
+      return new DirInfo(System.currentTimeMillis());
+    ByteArrayInputStream streamer = new ByteArrayInputStream(data);
+    ObjectInputStream deserializer = new ObjectInputStream(streamer);
+    try{ 
+      return deserializer.readObject();
+    } finally {
+      deserializer.close();
+    }
+  }
+  
+  public static byte[] serialize(Object obj) throws IOException {
+    ByteArrayOutputStream streamer = new ByteArrayOutputStream();
+    ObjectOutputStream serializer = new ObjectOutputStream(streamer);
+    serializer.writeObject(obj);
+    serializer.close();
+    return streamer.toByteArray();
+  }
+  
+  @Override
+  public void create(String src, FsPermission masked, String clientName, boolean overwrite,
boolean createParent, short replication, long blockSize)
+      throws IOException {
+    log.info("creating " + src);
+    try {
+      FileInfo fileInfo = new FileInfo(blockSize, System.currentTimeMillis(), masked.toString(),
replication, 0);
+      byte[] data = serialize(fileInfo);
+      String path = DNNConstants.NAMESPACE_PATH + src;
+      try {
+        byte[] current = keeper.getData().forPath(path);
+        log.info("Current value for " + src + " is " + new Text(current));
+        if (overwrite) {
+          keeper.setData().forPath(path, data);
+        } else {
+          throw new FileAlreadyExistsException(src);
+        }
+      } catch (KeeperException.NoNodeException node) {
+        if (createParent) {
+          keeper.create().creatingParentsIfNeeded().forPath(path, data);
+        } else {
+          keeper.create().forPath(path, data);
+        }
+      }
+    } catch (Exception ex) {
+      throw new IOException(ex);
+    }
+  }
+  
+  @Override
+  public void create(String src, FsPermission masked, String clientName, boolean overwrite,
short replication, long blockSize) throws IOException {
+    create(src, masked, clientName, overwrite, true, replication, blockSize);
+  }
+  
+  @Override
+  public LocatedBlock append(String src, String clientName) throws IOException {
+    throw new NotImplementedException();
+  }
+  
+  @Override
+  public boolean recoverLease(String src, String clientName) throws IOException {
+    unimplemented(src, clientName);
+    return true;
+  }
+  
+  @Override
+  public boolean setReplication(String src, short replication) throws IOException {
+    unimplemented(src, replication);
+    return true;
+  }
+  
+  @Override
+  public void setPermission(String src, FsPermission permission) throws IOException {
+    unimplemented(src, permission);
+  }
+  
+  @Override
+  public void setOwner(String src, String username, String groupname) throws IOException
{
+    unimplemented(src, username, groupname);
+  }
+  
+  @Override
+  public void abandonBlock(Block b, String src, String holder) throws IOException {
+    unimplemented(b, src, holder);    
+  }
+  
+  @Override
+  public LocatedBlock addBlock(String src, String clientName) throws IOException {
+    return addBlock(src, clientName, new DatanodeInfo[]{});
+  }
+  
+  @Override
+  public LocatedBlock addBlock(String src, String clientName, DatanodeInfo[] excludedNodes)
throws IOException {
+    // get the list of online data nodes
+    Map<String, DatanodeRegistration> nodes;
+    try {
+      nodes = findDatanodes();
+    } catch (Exception e) {
+      throw new IOException(e);
+    }
+    int replication = 1;
+    
+    if(nodes.size() < replication)
+      throw new IOException("unable to achieve required replication: too few datanodes running");
+    
+    List<String> randomList = new ArrayList<String>(nodes.keySet());
+    Collections.shuffle(randomList);
+    
+    // DistibutedNameNode holds the positive blocks
+    long blockID = -Math.abs(random.nextLong());
+    Block b = new Block(blockID, 0, 0);
+    List<String> replicas = randomList.subList(0, Math.min(replication, randomList.size()));
+    List<DatanodeInfo> targets = new ArrayList<DatanodeInfo>();
+    for (String replica : replicas) {
+      targets.add(new DatanodeInfo(new DatanodeID(replica)));
+    }
+    recordBlock(src, b, targets);
+    LocatedBlock newBlock = new LocatedBlock(b, targets.toArray(new DatanodeInfo[targets.size()]));
+    log.info("added block " + b + " on " + targets + " for " + src);
+    return newBlock;
+  }
+
+  private void recordBlock(String src, Block b, List<DatanodeInfo> targets) throws
IOException {
+    try {
+      String[] datanodes = new String[targets.size()];
+      for (int i = 0; i < targets.size(); i++) {
+        datanodes[i] = targets.get(i).name;
+      }
+      BlockInfo blockInfo = new BlockInfo(b.getBlockId(), datanodes, 0);
+      byte[] data = serialize(blockInfo);
+      for (String datanode : datanodes) {
+        keeper.create().forPath(DNNConstants.DATANODES_PATH + "/" + datanode + "/blocks/"
+ b.getBlockName(), data);
+      }
+      keeper.create().forPath(DNNConstants.BLOCKS_PATH + "/" + b.getBlockName(), data);
+      keeper.create().withMode(CreateMode.PERSISTENT_SEQUENTIAL).forPath(DNNConstants.NAMESPACE_PATH
+ src + "/blocks-", data);
+    } catch (Exception ex) {
+      throw new IOException(ex);
+    }
+  }
+
+  private Map<String, DatanodeRegistration> findDatanodes() throws Exception {
+    List<String> children = keeper.getChildren().forPath(DNNConstants.DATANODES_PATH);
+    Map<String, DatanodeRegistration> nodes = new HashMap<String, DatanodeRegistration>();
+    for (String child : children) {
+      byte[] data = keeper.getData().forPath(DNNConstants.DATANODES_PATH + "/" + child);
+      ByteArrayInputStream bais = new ByteArrayInputStream(data);
+      DataInputStream ds = new DataInputStream(bais);
+      DatanodeRegistration registration = new DatanodeRegistration();
+      registration.readFields(ds);
+      nodes.put(child, registration);
+    }
+    return nodes;
+  }
+  
+  @Override
+  public boolean complete(String src, String clientName) throws IOException {
+    log.info("using complete " + src);
+    String path = DNNConstants.NAMESPACE_PATH + src;
+    while (true) {
+      try {
+        boolean retry = false;
+        long length = 0;
+        for (String child : keeper.getChildren().forPath(path)) {
+          Object object = deserialize(keeper.getData().forPath(path + "/" + child));
+          if (object instanceof BlockInfo) {
+            BlockInfo info = (BlockInfo)object;
+            Block block = new Block(info.id);
+            info = (BlockInfo)deserialize(keeper.getData().forPath(DNNConstants.BLOCKS_PATH
+ "/" + block.getBlockName()));
+            log.info("Block size for " + info.id + " is " + info.size);
+            length += info.size;
+            if (!info.complete) {
+              retry = true;
+              break;
+            }
+          }
+        }
+        if (retry) {
+          UtilWaitThread.sleep(250);
+          continue;
+        }
+        FileInfo info = (FileInfo)deserialize(keeper.getData().forPath(path));
+        info.size = length;
+        keeper.setData().forPath(path, serialize(info));
+        log.info("updated file length of " + src + " to " + length);
+        return true;
+      } catch (Exception ex) {
+        log.error(ex, ex);
+      }
+    }
+  }
+  
+  @Override
+  public void reportBadBlocks(LocatedBlock[] blocks) throws IOException {
+    unimplemented((Object[])blocks);
+  }
+
+  private Object getInfo(String path) throws Exception {
+    byte data[];
+    try {
+      data = keeper.getData().forPath(path);
+    } catch (KeeperException.NoNodeException ex) {
+      return null;
+    }
+    if (data == null) return null;
+    return deserialize(data);
+  }
+  
+  private void recursivelyCopy(String src, String dst) throws Exception {
+    byte[] data = keeper.getData().forPath(src);
+    keeper.create().forPath(dst, data);
+    List<String> children = keeper.getChildren().forPath(src);
+    Collections.sort(children);
+    for (String child : children) {
+      recursivelyCopy(src + "/" + child, dst + "/" + child);
+    }
+  }
+  
+  @Override
+  public boolean rename(String src, String dst) throws IOException {
+    log.info("rename " + src + " -> " + dst);
+    try {
+      Object srcInfo = getInfo(DNNConstants.NAMESPACE_PATH + src);
+      Object dstInfo = getInfo(DNNConstants.NAMESPACE_PATH + dst);
+      String parent = getParent(dst);
+      Object parentInfo = getInfo(DNNConstants.NAMESPACE_PATH + parent);
+      if (srcInfo == null)
+        throw new FileNotFoundException(src);
+      if (!(srcInfo instanceof FileInfo))
+        throw new IOException(src + " is a directory");
+      if (parentInfo == null)
+        throw new IOException(parent + " does not exist");
+      if (!(parentInfo instanceof DirInfo))
+        throw new IOException(parent + " is not a directory");
+      if (dstInfo != null) {
+        if (dstInfo instanceof DirInfo)
+          return rename(src, dst + "/" + basename(src));
+        else
+          delete(dst);
+      }
+      recursivelyCopy(DNNConstants.NAMESPACE_PATH + src, DNNConstants.NAMESPACE_PATH + dst);
+      return true;
+    } catch (Exception ex) {
+      throw new IOException(ex);
+    }
+  }
+  
+  private String basename(String src) {
+    return src.substring(src.lastIndexOf("/") + 1);
+  }
+
+  private String getParent(String dst) {
+    return dst.substring(0, dst.lastIndexOf("/"));
+  }
+
+  @Override
+  public boolean delete(String src) throws IOException {
+    return delete(src, true);
+  }
+  
+  @Override
+  public boolean delete(String src, boolean recursive) throws IOException {
+    try {
+      recursivelyDelete(DNNConstants.NAMESPACE_PATH + src);
+    } catch (Exception ex) {
+      throw new IOException(ex);
+    }
+    return true;
+  }
+  
+  private void recursivelyDelete(String path) throws Exception {
+    List<String> children = null;
+    try {
+      children = keeper.getChildren().forPath(path);
+    } catch (KeeperException.NoNodeException ex) {
+      return;
+    }
+    Object obj = deserialize(keeper.getData().forPath(path));
+    if (obj instanceof FileInfo) {
+      // create the datanode command to (eventually) delete the blocks
+      Map<String, List<Long>> hostToBlockMap = new HashMap<String, List<Long>>();
+      for (String child : children) {
+        Object childObject = deserialize(keeper.getData().forPath(path + "/" + child));
+        if (childObject instanceof BlockInfo) {
+          BlockInfo block = (BlockInfo)childObject;
+          for (String node : block.datanodes) {
+            List<Long> blocks = hostToBlockMap.get(node);
+            if (blocks == null)
+              hostToBlockMap.put(node, blocks = new ArrayList<Long>());
+            blocks.add(block.id);
+          }
+          keeper.delete().forPath(DNNConstants.BLOCKS_PATH + "/" + new Block(block.id, 0,
0).getBlockName());
+        }
+      }
+      for (Entry<String,List<Long>> entry : hostToBlockMap.entrySet()) {
+        String host = entry.getKey();
+        List<Block> blocks = new ArrayList<Block>();
+        for (Long blockId : entry.getValue())
+          blocks.add(new Block(blockId, 0, 0));
+        DatanodeCommand cmd = new BlockCommand(DatanodeProtocol.DNA_INVALIDATE, blocks.toArray(new
Block[0]));
+        byte[] data = DistributedNamenodeProxy.serialize(cmd);
+        keeper.create().forPath(DNNConstants.DATANODES_PATH + "/" + host + "/commands/" +
UUID.randomUUID().toString(), data);
+      }
+    }
+    for (String child : children) {
+      recursivelyDelete(path + "/" + child);
+    }
+    keeper.delete().forPath(path);
+  }
+
+  @Override
+  public boolean mkdirs(String src, FsPermission masked) throws IOException {
+    log.info("mkdirs " + src);
+    try {
+      DirInfo dirInfo = new DirInfo(System.currentTimeMillis());
+      byte[] data = serialize(dirInfo);
+      String path = DNNConstants.NAMESPACE_PATH + src;
+      keeper.create().creatingParentsIfNeeded().forPath(path, data);
+    } catch (Exception ex) {
+      throw new IOException(ex);
+    }
+    return true;
+  }
+  
+  HdfsFileStatus decodeFile(String name, byte[] data) throws IOException, ClassNotFoundException
{
+    Object obj;
+    try {
+      obj = deserialize(data);
+    } catch (Exception ex) {
+      log.error(ex, ex);
+      return null;
+    }
+    if (obj instanceof FileInfo) {
+      FileInfo fileInfo = (FileInfo)obj;
+      
+      long length = fileInfo.size;
+      boolean isdir = false;
+      int block_replication = fileInfo.replication;
+      long blocksize = fileInfo.blocksize;
+      long modification_time = fileInfo.createTime;
+      long access_time = fileInfo.createTime;
+      FsPermission permission = FsPermission.valueOf("-" + fileInfo.permission);
+      String owner = "hdfs";
+      String group = "supergroup";
+      byte[] path = name.getBytes();
+      log.info(String.format("length %d isdir %s replication %d path %s", length, isdir,
block_replication, new String(path)));
+      return new HdfsFileStatus(length, isdir, block_replication, blocksize, modification_time,
access_time, permission, owner, group, path);
+    }
+    if (obj instanceof DirInfo) {
+      DirInfo dirInfo = (DirInfo)obj;
+      long length = 0;
+      boolean isdir = true;
+      int block_replication = 0;
+      long blocksize = 0;
+      long modification_time = dirInfo.createTime;
+      long access_time = dirInfo.createTime;
+      FsPermission permission = FsPermission.getDefault();
+      String owner = "hdfs";
+      String group = "supergroup";
+      byte[] path = name.getBytes();
+      log.info(String.format("length %d isdir %s replication %d path %s", length, isdir,
block_replication, new String(path)));
+      return new HdfsFileStatus(length, isdir, block_replication, blocksize, modification_time,
access_time, permission, owner, group, path);
+    }
+    return null;
+  }
+  
+  @Override
+  public DirectoryListing getListing(String src, byte[] startAfter) throws IOException {
+    try {
+      String basePath = DNNConstants.NAMESPACE_PATH + src;
+      List<String> children = keeper.getChildren().forPath(basePath);
+      List<HdfsFileStatus> stats = new ArrayList<HdfsFileStatus>();
+      for (String child : children) {
+        byte data[] = keeper.getData().forPath(basePath + "/" + child);
+        HdfsFileStatus status = decodeFile(child, data);
+        if (status != null)
+          stats.add(status);
+      }
+      return new DirectoryListing(stats.toArray(new HdfsFileStatus[]{}), 0);
+    } catch (Exception ex) {
+      throw new IOException(ex);
+    }
+  }
+  
+  @Override
+  public void renewLease(String clientName) throws IOException {
+    // TODO Auto-generated method stub
+    
+  }
+  
+  @Override
+  public long[] getStats() throws IOException {
+    // TODO Auto-generated method stub
+    return null;
+  }
+  
+  @Override
+  public DatanodeInfo[] getDatanodeReport(DatanodeReportType type) throws IOException {
+    // TODO Auto-generated method stub
+    return null;
+  }
+  
+  @Override
+  public long getPreferredBlockSize(String filename) throws IOException {
+    // TODO Auto-generated method stub
+    return 0;
+  }
+  
+  @Override
+  public boolean setSafeMode(SafeModeAction action) throws IOException {
+    // TODO Auto-generated method stub
+    return false;
+  }
+  
+  @Override
+  public void saveNamespace() throws IOException {
+    // TODO Auto-generated method stub
+    
+  }
+  
+  @Override
+  public void refreshNodes() throws IOException {
+    // TODO Auto-generated method stub
+    
+  }
+  
+  @Override
+  public void finalizeUpgrade() throws IOException {
+    // TODO Auto-generated method stub
+    
+  }
+  
+  @Override
+  public UpgradeStatusReport distributedUpgradeProgress(UpgradeAction action) throws IOException
{
+    // TODO Auto-generated method stub
+    return null;
+  }
+  
+  @Override
+  public void metaSave(String filename) throws IOException {
+    // TODO Auto-generated method stub
+    
+  }
+  
+  @Override
+  public void setBalancerBandwidth(long bandwidth) throws IOException {
+    // TODO Auto-generated method stub
+    
+  }
+  
+  @Override
+  public HdfsFileStatus getFileInfo(String src) throws IOException {
+    log.info("Get file status");
+    try {
+      byte[] data = keeper.getData().forPath(DNNConstants.NAMESPACE_PATH + src);
+      HdfsFileStatus result = decodeFile(src, data);
+      if (result != null) {
+        log.info("Returning info for file " + result.getLocalName());
+      } else {
+        log.info("No file " + src);
+      }
+      return result;
+    } catch (KeeperException.NoNodeException e) {
+      // ignore
+    } catch (Exception e) {
+      log.error(e, e);
+    }
+    return null;
+  }
+  
+  @Override
+  public ContentSummary getContentSummary(String path) throws IOException {
+    // TODO Auto-generated method stub
+    return null;
+  }
+  
+  @Override
+  public void setQuota(String path, long namespaceQuota, long diskspaceQuota) throws IOException
{
+    // TODO Auto-generated method stub
+    
+  }
+  
+  @Override
+  public void fsync(String src, String client) throws IOException {
+    // TODO Auto-generated method stub
+    
+  }
+  
+  @Override
+  public void setTimes(String src, long mtime, long atime) throws IOException {
+    // TODO Auto-generated method stub
+    
+  }
+  
+  @Override
+  public Token<DelegationTokenIdentifier> getDelegationToken(Text renewer) throws IOException
{
+    // TODO Auto-generated method stub
+    return null;
+  }
+  
+  @Override
+  public long renewDelegationToken(Token<DelegationTokenIdentifier> token) throws IOException
{
+    // TODO Auto-generated method stub
+    return 0;
+  }
+  
+  @Override
+  public void cancelDelegationToken(Token<DelegationTokenIdentifier> token) throws
IOException {
+    // TODO Auto-generated method stub
+    
+  }
+  
+  @Override
+  public long getProtocolVersion(String protocol, long clientVersion) throws IOException
{
+    // TODO Auto-generated method stub
+    return 0;
+  }
+  
+  @Override
+  public DatanodeRegistration register(DatanodeRegistration registration) throws IOException
{
+    if (keeper != null) {
+      log.info("registering in zookeeper as " + registration.name);
+      ByteArrayOutputStream stream = new ByteArrayOutputStream();
+      DataOutputStream data = new DataOutputStream(stream);
+      registration.write(data);
+      data.close();
+      try {
+        try {
+          for (String name : new String[]{DNNConstants.DNN, DNNConstants.DATANODES_PATH})
{
+            keeper.create().forPath(name);
+          }
+        } catch (KeeperException.NodeExistsException ex) {
+          // expected
+        }
+        String path = DNNConstants.DATANODES_PATH + "/" + registration.name;
+        try {
+          keeper.setData().forPath(path, stream.toByteArray());
+        } catch (KeeperException.NoNodeException e) {
+          keeper.create().forPath(path, stream.toByteArray());
+          keeper.create().forPath(path + "/blocks");
+          keeper.create().forPath(path + "/commands");
+        }
+      } catch (Exception e) {
+        throw new IOException(e);
+      }
+    }
+    return registration;
+  }
+  
+  @Override
+  public DatanodeCommand[] sendHeartbeat(DatanodeRegistration registration, long capacity,
long dfsUsed, long remaining, int xmitsInProgress, int xceiverCount)
+      throws IOException {
+    List<DatanodeCommand> commands = new ArrayList<DatanodeCommand>();
+    try {
+      String commandsPath = DNNConstants.DATANODES_PATH + "/" + registration.getName() +
"/commands";
+      for (String child : keeper.getChildren().forPath(commandsPath)) {
+        byte[] data = keeper.getData().forPath(commandsPath + "/" + child);
+        commands.add((DatanodeCommand)DistributedNamenodeProxy.deserialize(data));
+        keeper.delete().forPath(commandsPath + "/" + child);
+      }
+    } catch (Exception e) {
+      throw new IOException(e);
+    }
+    return commands.toArray(new DatanodeCommand[0]);
+  }
+  
+  @Override
+  public DatanodeCommand blockReport(DatanodeRegistration registration, long[] blocks) throws
IOException {
+    BlockListAsLongs blist = new BlockListAsLongs(blocks);
+    Set<Long> current = new HashSet<Long>();
+    for (int i = 0; i < blist.getNumberOfBlocks(); i++) {
+      current.add(blist.getBlockId(i));
+    }
+    log.info(registration.name + " reports " + current);
+    return null;
+  }
+  
+  @Override
+  public void blocksBeingWrittenReport(DatanodeRegistration registration, long[] blocks)
throws IOException {
+    // TODO Auto-generated method stub
+    
+  }
+  
+  @Override
+  public void blockReceived(DatanodeRegistration registration, Block[] blocks, String[] delHints)
throws IOException {
+    log.info("blockRecieved " + Arrays.asList(blocks));
+    for (Block block : blocks) {
+      String path = DNNConstants.BLOCKS_PATH + "/" + block.getBlockName();
+      try {
+        BlockInfo info = (BlockInfo)deserialize(keeper.getData().forPath(path));
+        info.size = block.getNumBytes();
+        info.complete = true;
+        byte[] data = serialize(info);
+        keeper.setData().forPath(path, data);
+        log.info("Block size updated on " + block + " to " + info.size);
+      } catch (Exception e) {
+        log.error(e, e);
+      }
+    }
+  }
+  
+  @Override
+  public void errorReport(DatanodeRegistration registration, int errorCode, String msg) throws
IOException {
+    // TODO Auto-generated method stub
+    
+  }
+  
+  @Override
+  public NamespaceInfo versionRequest() throws IOException {
+    log.info("using versionRequest");
+    // TODO: find out how to get namespace id
+    // could store this in the info of the / entry
+    NamespaceInfo nsi = new NamespaceInfo(384837986, 0, 0);
+    //throw new RuntimeException();
+    return nsi;
+  }
+  
+  @Override
+  public UpgradeCommand processUpgradeCommand(UpgradeCommand comm) throws IOException {
+    // TODO Auto-generated method stub
+    return null;
+  }
+  
+  @Override
+  public long nextGenerationStamp(Block block, boolean fromNN) throws IOException {
+    // TODO Auto-generated method stub
+    return 0;
+  }
+  
+  @Override
+  public void commitBlockSynchronization(Block block, long newgenerationstamp, long newlength,
boolean closeFile, boolean deleteblock, DatanodeID[] newtargets)
+      throws IOException {
+    // TODO Auto-generated method stub
+    
+  }
+  
+}

Propchange: accumulo/branches/ACCUMULO-722/distnn/src/main/java/org/apache/hadoop/hdfs/server/namenode/ZookeeperNameNode.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: accumulo/branches/ACCUMULO-722/distnn/src/main/java/org/apache/hadoop/hdfs/server/protocol/HealthProtocol.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-722/distnn/src/main/java/org/apache/hadoop/hdfs/server/protocol/HealthProtocol.java?rev=1371432&view=auto
==============================================================================
--- accumulo/branches/ACCUMULO-722/distnn/src/main/java/org/apache/hadoop/hdfs/server/protocol/HealthProtocol.java
(added)
+++ accumulo/branches/ACCUMULO-722/distnn/src/main/java/org/apache/hadoop/hdfs/server/protocol/HealthProtocol.java
Thu Aug  9 20:21:32 2012
@@ -0,0 +1,21 @@
+/**
+ * this class is the portion of DatanodeProtocol that has to do with health
+ */
+package org.apache.hadoop.hdfs.server.protocol;
+
+import java.io.IOException;
+
+import org.apache.hadoop.ipc.VersionedProtocol;
+
+public interface HealthProtocol extends VersionedProtocol {
+
+	public static final long versionID = 0;
+
+	public DatanodeCommand[] heartbeat(DatanodeRegistration reg);
+	
+	public void errorReport(DatanodeRegistration registration,
+            int errorCode, 
+            String msg) throws IOException;
+	
+	
+}

Propchange: accumulo/branches/ACCUMULO-722/distnn/src/main/java/org/apache/hadoop/hdfs/server/protocol/HealthProtocol.java
------------------------------------------------------------------------------
    svn:eol-style = native



Mime
View raw message