From: ecn@apache.org
To: commits@accumulo.apache.org
Subject: svn commit: r1378547 - in /accumulo/branches/ACCUMULO-722/distnn: ./ src/main/java/org/apache/hadoop/hdfs/ src/main/java/org/apache/hadoop/hdfs/server/datanode/ src/main/java/org/apache/hadoop/hdfs/server/namenode/
Date: Wed, 29 Aug 2012 13:37:54 -0000

Author: ecn
Date: Wed Aug 29 13:37:54 2012
New Revision: 1378547

URL: http://svn.apache.org/viewvc?rev=1378547&view=rev
Log:
ACCUMULO-722: move DistributedNamenodeProxy behind ZookeeperNameNode, remove SwitchingNameNode, implement abandonBlock, support incomplete file reading

Removed:
    accumulo/branches/ACCUMULO-722/distnn/src/main/java/org/apache/hadoop/hdfs/server/namenode/SwitchingNameNode.java
Modified:
    accumulo/branches/ACCUMULO-722/distnn/pom.xml
    accumulo/branches/ACCUMULO-722/distnn/src/main/java/org/apache/hadoop/hdfs/DNNConstants.java
    accumulo/branches/ACCUMULO-722/distnn/src/main/java/org/apache/hadoop/hdfs/DNNFileSystem.java
    accumulo/branches/ACCUMULO-722/distnn/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
    accumulo/branches/ACCUMULO-722/distnn/src/main/java/org/apache/hadoop/hdfs/server/namenode/DistributedNamenodeProxy.java
    accumulo/branches/ACCUMULO-722/distnn/src/main/java/org/apache/hadoop/hdfs/server/namenode/ZookeeperNameNode.java

Modified: accumulo/branches/ACCUMULO-722/distnn/pom.xml
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-722/distnn/pom.xml?rev=1378547&r1=1378546&r2=1378547&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-722/distnn/pom.xml (original)
+++ accumulo/branches/ACCUMULO-722/distnn/pom.xml Wed Aug 29 13:37:54 2012
@@ -4,6 +4,25 @@ distnn 0.0.1-SNAPSHOT distnn + + + + accumulo-1.4 + + 1.4.1 + + + + accumulo-1.5 + + true + + + 1.5.0-SNAPSHOT + + + + org.apache.hadoop @@ -11,24 +30,45 @@ 1.0.3 - org.apache.hadoop - hadoop-common - 2.0.0-alpha - - org.apache.accumulo accumulo-core -
1.4.1 + ${accumulo.version} org.apache.accumulo accumulo-server - 1.4.1 + ${accumulo.version} com.netflix.curator curator-framework 1.1.15 + + com.google.guava + guava + 12.0.1 + + + org.apache.hadoop + hadoop-core + 1.0.3 + + + org.apache.hadoop + zookeeper + 3.3.1 + + + javax.servlet + servlet-api + 2.4 + + + org.mortbay.jetty + jetty + [5.1,) + + \ No newline at end of file Modified: accumulo/branches/ACCUMULO-722/distnn/src/main/java/org/apache/hadoop/hdfs/DNNConstants.java URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-722/distnn/src/main/java/org/apache/hadoop/hdfs/DNNConstants.java?rev=1378547&r1=1378546&r2=1378547&view=diff ============================================================================== --- accumulo/branches/ACCUMULO-722/distnn/src/main/java/org/apache/hadoop/hdfs/DNNConstants.java (original) +++ accumulo/branches/ACCUMULO-722/distnn/src/main/java/org/apache/hadoop/hdfs/DNNConstants.java Wed Aug 29 13:37:54 2012 @@ -1,10 +1,8 @@ package org.apache.hadoop.hdfs; public class DNNConstants { - public static final String DNN = "/dnn"; - public static final String DATANODES_PATH = "/datanodes"; - public static final String NAMESPACE_PATH = "/namespace"; - public static final String BLOCKS_PATH = "/blocks"; - + public static final String DATANODES_PATH = DNN + "/datanodes"; + public static final String NAMESPACE_PATH = DNN + "/namespace"; + public static final String BLOCKS_PATH = DNN + "/blocks"; } Modified: accumulo/branches/ACCUMULO-722/distnn/src/main/java/org/apache/hadoop/hdfs/DNNFileSystem.java URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-722/distnn/src/main/java/org/apache/hadoop/hdfs/DNNFileSystem.java?rev=1378547&r1=1378546&r2=1378547&view=diff ============================================================================== --- accumulo/branches/ACCUMULO-722/distnn/src/main/java/org/apache/hadoop/hdfs/DNNFileSystem.java (original) +++ accumulo/branches/ACCUMULO-722/distnn/src/main/java/org/apache/hadoop/hdfs/DNNFileSystem.java Wed Aug 29 13:37:54 2012 @@ -4,7 +4,6 @@ import java.io.FileNotFoundException; import java.io.IOException; import java.net.URI; import java.util.ArrayList; -import java.util.Arrays; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.BlockLocation; @@ -27,11 +26,8 @@ import org.apache.hadoop.hdfs.protocol.H import org.apache.hadoop.hdfs.protocol.LocatedBlock; import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier; import org.apache.hadoop.hdfs.server.common.UpgradeStatusReport; -import org.apache.hadoop.hdfs.server.namenode.DistributedNamenodeProxy; -import org.apache.hadoop.hdfs.server.namenode.DistributedNamenodeProxy.ConnectInfo; import org.apache.hadoop.hdfs.server.namenode.FakeNameNode; import org.apache.hadoop.hdfs.server.namenode.NameNode; -import org.apache.hadoop.hdfs.server.namenode.SwitchingNameNode; import org.apache.hadoop.hdfs.server.namenode.ZookeeperNameNode; import org.apache.hadoop.io.Text; import org.apache.hadoop.security.AccessControlException; @@ -40,11 +36,6 @@ import org.apache.hadoop.security.token. 
import org.apache.hadoop.util.Progressable; import org.apache.log4j.Logger; -import com.netflix.curator.framework.CuratorFramework; -import com.netflix.curator.framework.CuratorFrameworkFactory; -import com.netflix.curator.framework.CuratorFrameworkFactory.Builder; -import com.netflix.curator.retry.RetryUntilElapsed; - // Basically a copy of DistributedFileSystem providing a different NN client implementation public class DNNFileSystem extends FileSystem { private static Logger log = Logger.getLogger(DNNFileSystem.class); @@ -71,22 +62,14 @@ public class DNNFileSystem extends FileS public void initialize(URI uri, Configuration conf) throws IOException { super.initialize(uri, conf); setConf(conf); - ConnectInfo info = new ConnectInfo(uri); - FakeNameNode fakefakefake = null; + FakeNameNode fake = null; try { - Builder builder = CuratorFrameworkFactory.builder().namespace(DNNConstants.DNN); - builder.connectString(info.zookeepers); - builder.retryPolicy(new RetryUntilElapsed(120*1000, 500)); - //builder.aclProvider(aclProvider); - CuratorFramework client = builder.build(); - client.start(); - ZookeeperNameNode zoo = new ZookeeperNameNode(client); - fakefakefake = SwitchingNameNode.create(zoo, info); + fake = new ZookeeperNameNode(conf, uri); } catch (Exception ex) { throw new IOException(ex); } - log.info("Creating DFSClient with fake name node " + fakefakefake); - this.dfs = new DFSClient(null, fakefakefake, conf, statistics); + log.info("Creating DFSClient with fake name node " + fake); + this.dfs = new DFSClient(null, fake, conf, statistics); this.uri = uri; this.workingDir = getHomeDirectory(); } Modified: accumulo/branches/ACCUMULO-722/distnn/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-722/distnn/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java?rev=1378547&r1=1378546&r2=1378547&view=diff ============================================================================== --- accumulo/branches/ACCUMULO-722/distnn/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java (original) +++ accumulo/branches/ACCUMULO-722/distnn/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java Wed Aug 29 13:37:54 2012 @@ -85,13 +85,12 @@ import org.apache.hadoop.hdfs.server.dat import org.apache.hadoop.hdfs.server.datanode.SecureDataNodeStarter.SecureResources; import org.apache.hadoop.hdfs.server.datanode.metrics.DataNodeInstrumentation; import org.apache.hadoop.hdfs.server.datanode.web.resources.DatanodeWebHdfsMethods; -import org.apache.hadoop.hdfs.server.namenode.DistributedNamenodeProxy; import org.apache.hadoop.hdfs.server.namenode.FSNamesystem; import org.apache.hadoop.hdfs.server.namenode.FileChecksumServlets; import org.apache.hadoop.hdfs.server.namenode.JspHelper; import org.apache.hadoop.hdfs.server.namenode.NameNode; import org.apache.hadoop.hdfs.server.namenode.StreamFile; -import org.apache.hadoop.hdfs.server.namenode.SwitchingNameNode; +import org.apache.hadoop.hdfs.server.namenode.ZookeeperNameNode; import org.apache.hadoop.hdfs.server.protocol.BalancerBandwidthCommand; import org.apache.hadoop.hdfs.server.protocol.BlockCommand; import org.apache.hadoop.hdfs.server.protocol.BlockMetaDataInfo; @@ -360,7 +359,7 @@ public class DataNode extends Configured // DatanodeProtocol.versionID, // nameNodeAddr, // conf); - this.namenode = SwitchingNameNode.create(conf); + this.namenode = new ZookeeperNameNode(conf); // get version and id info from the name-node NamespaceInfo nsInfo = 
handshake(); StartupOption startOpt = getStartupOption(conf); Modified: accumulo/branches/ACCUMULO-722/distnn/src/main/java/org/apache/hadoop/hdfs/server/namenode/DistributedNamenodeProxy.java URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-722/distnn/src/main/java/org/apache/hadoop/hdfs/server/namenode/DistributedNamenodeProxy.java?rev=1378547&r1=1378546&r2=1378547&view=diff ============================================================================== --- accumulo/branches/ACCUMULO-722/distnn/src/main/java/org/apache/hadoop/hdfs/server/namenode/DistributedNamenodeProxy.java (original) +++ accumulo/branches/ACCUMULO-722/distnn/src/main/java/org/apache/hadoop/hdfs/server/namenode/DistributedNamenodeProxy.java Wed Aug 29 13:37:54 2012 @@ -4,14 +4,10 @@ * add hierarchical locking * add support for permissions * add support for leasing - * add support for block generations? * * finish namenode actions: - * TEST::implement delete - * TEST::implement mkdirs * * finish datanode actions: - * error reporting * * * @@ -41,6 +37,7 @@ import java.io.IOException; import java.net.URI; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; @@ -49,8 +46,13 @@ import java.util.Map; import java.util.Map.Entry; import java.util.Random; import java.util.Set; +import java.util.TimerTask; import java.util.TreeMap; import java.util.UUID; +import java.util.concurrent.Callable; +import java.util.concurrent.Executor; +import java.util.concurrent.Executors; +import java.util.concurrent.FutureTask; import org.apache.accumulo.core.Constants; import org.apache.accumulo.core.client.BatchScanner; @@ -69,10 +71,9 @@ import org.apache.accumulo.core.data.Val import org.apache.accumulo.core.util.ColumnFQ; import org.apache.accumulo.core.util.TextUtil; import org.apache.accumulo.core.util.UtilWaitThread; -import org.apache.hadoop.conf.Configuration; +import org.apache.accumulo.server.util.time.SimpleTimer; import org.apache.hadoop.fs.ContentSummary; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.fs.UnresolvedLinkException; import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.hdfs.DFSUtil; import org.apache.hadoop.hdfs.DNNConstants; @@ -97,18 +98,17 @@ import org.apache.hadoop.hdfs.server.pro import org.apache.hadoop.hdfs.server.protocol.UpgradeCommand; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Writable; -import org.apache.hadoop.security.AccessControlException; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.token.Token; import org.apache.log4j.Logger; -import org.apache.zookeeper.CreateMode; -import org.apache.zookeeper.KeeperException; -import org.apache.zookeeper.WatchedEvent; -import org.apache.zookeeper.Watcher; -import org.apache.zookeeper.ZooDefs.Ids; -import org.apache.zookeeper.ZooKeeper; + +import sun.reflect.generics.reflectiveObjects.NotImplementedException; + +import com.netflix.curator.framework.CuratorFramework; public class DistributedNamenodeProxy implements FakeNameNode { + Executor executor = Executors.newSingleThreadExecutor(); + public static class ConnectInfo { public ConnectInfo(URI uri) { String userInfo = uri.getUserInfo(); @@ -138,55 +138,37 @@ public class DistributedNamenodeProxy im private static Logger log = Logger.getLogger(DistributedNamenodeProxy.class); - long start = System.currentTimeMillis(); - private class Replicator { - // TODO: make this respect configured replication 
settings - - private HashSet targets; + private HashSet targets; Replicator() { - targets = new HashSet(); + targets = new HashSet(); } - DatanodeInfo[] getReplicationTargets() throws IOException { + DatanodeInfo[] getReplicationTargets(int replicationFactor) throws IOException { // TODO: periodically scan the datanodes table to find new datanodes - if(targets.size() == 0) + while (targets.size() == 0) { scanDatanodes(); + if (targets.size() > 0) + break; + UtilWaitThread.sleep(250); + } + List targetsCopy = new ArrayList(); + synchronized (targets) { + targetsCopy.addAll(targets); + } // pick nodes at random // TODO: take into account whether a datanode is too full to host another block - // the old namenode would also have a hard limit on the total number - // of fs objects it could store - int replicationFactor = 1; + Collections.shuffle(targetsCopy); - if(targets.size() < replicationFactor) + if(targetsCopy.size() < 1) throw new IOException("unable to achieve required replication: too few datanodes running"); - HashSet targetSetNames = new HashSet(); - - HashSet targetSet = new HashSet(); - for(int i=0; i < replicationFactor; i++) { - int r = rand.nextInt(targets.size()); - - Iterator iter = targets.iterator(); - for(int j=0; j < r-1; iter.next()); - String target = iter.next(); - - // don't create two replicas on the same target - while(targetSetNames.contains(target)) { - r = rand.nextInt(targets.size()); - iter = targets.iterator(); - for(int j=0; j < r-1; iter.next()); - target = iter.next(); - } - - targetSet.add(new DatanodeInfo(new DatanodeID(target))); - } - - DatanodeInfo[] targetSetArray = targetSet.toArray(new DatanodeInfo[targetSet.size()]); + targetsCopy = targetsCopy.subList(0, Math.min(replicationFactor, targetsCopy.size())); + DatanodeInfo[] targetSetArray = targetsCopy.toArray(new DatanodeInfo[targetsCopy.size()]); return targetSetArray; } @@ -196,16 +178,29 @@ public class DistributedNamenodeProxy im // failed interactions with a datanode private void scanDatanodes() throws IOException { log.info("scanning datanodes table .."); - targets.clear(); + HashSet updatedTargets = new HashSet(); BatchScanner scanner = createBatchScanner(datanodesTable, new Range()); - ColumnFQ.fetch(scanner, remaining); + infoIpcPort.fetch(scanner); try { for (Entry entry : scanner) { - targets.add(entry.getKey().getRow().toString()); + String nodeName = entry.getKey().getRow().toString(); + int ipcPort = Integer.parseInt(entry.getValue().toString()); + updatedTargets.add(new DatanodeInfo(new DatanodeID(nodeName, "", -1, ipcPort))); } } finally { scanner.close(); } + try { + if (updatedTargets.isEmpty()) { + log.info("scanning datanodes from zookeeper .."); + for (String nodeName : zookeeper.getChildren().forPath(DNNConstants.DATANODES_PATH)) + updatedTargets.add(new DatanodeInfo(new DatanodeID(nodeName))); + } + } catch (Exception ex) { + log.warn(ex, ex); + } + log.info("there are " + updatedTargets.size() + " datanodes"); + targets = updatedTargets; } } @@ -258,7 +253,7 @@ public class DistributedNamenodeProxy im private Random rand = new Random(); private Replicator replicator = new Replicator(); - private Connector conn; + private final Connector conn; private final static String namespaceTable = "namespace"; private final static String blocksTable = "blocks"; private final static String datanodesTable = "datanodes"; @@ -267,11 +262,12 @@ public class DistributedNamenodeProxy im private final static Text blocksFam = new Text("blocks"); private final static Text datanodesFam = new 
Text("datanodes"); private final static Text commandFam = new Text("command"); + private final static ColumnFQ remaining = new ColumnFQ(infoFam, new Text("remaining")); - private final static ColumnFQ infoSize = new ColumnFQ(infoFam, new Text("size")); private final static ColumnFQ isDir = new ColumnFQ(infoFam, new Text("isDir")); private final static ColumnFQ infoCapacity = new ColumnFQ(infoFam, new Text("capacity")); + private final static ColumnFQ infoIpcPort = new ColumnFQ(infoFam, new Text("ipc_port")); private final static ColumnFQ infoUsed = new ColumnFQ(infoFam, new Text("used")); private final static ColumnFQ infoReplication = new ColumnFQ(infoFam, new Text("replication")); private final static ColumnFQ infoBlockSize = new ColumnFQ(infoFam, new Text("blocksize")); @@ -291,51 +287,18 @@ public class DistributedNamenodeProxy im private long lastRemaining = -1; - private ZooKeeper zookeeper = null; - private String instanceName = null; - private String keepers = null; - private final String username = "root"; - private byte[] passwd = "secret".getBytes(); - - private Connector getConnector() { - synchronized (this) { - if (conn == null) { - try { - Instance instance = new ZooKeeperInstance(instanceName, keepers); - conn = instance.getConnector(username, passwd); - } catch (Exception ex) { - conn = null; - log.warn("Unable to get connector " + ex); - } - } - } - return conn; - } - - public DistributedNamenodeProxy(Configuration conf) throws IOException { - instanceName = conf.get("accumulo.zookeeper.instance"); - keepers = conf.get("accumulo.zookeeper.keepers"); - zookeeper = new ZooKeeper(keepers, 30000, new Watcher() { - @Override - public void process(WatchedEvent arg0) { - log.info("zookeeper says " + arg0); - } - }); - for (String name : new String[] {DNNConstants.DNN, DNNConstants.BLOCKS_PATH, DNNConstants.DATANODES_PATH, DNNConstants.NAMESPACE_PATH}) { - try { - zookeeper.create(name, new byte[]{}, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); - } catch (KeeperException.NodeExistsException ex) { - // ya, ya, don't care - } catch (Exception ex) { - throw new IOException(ex); - } - } - } + private final CuratorFramework zookeeper; - public DistributedNamenodeProxy(Connector conn) throws IOException { + public DistributedNamenodeProxy(CuratorFramework keeper, URI uri) throws IOException { log.info("========= Distributed Name Node Proxy init ========="); - this.conn = conn; - + ConnectInfo info = new ConnectInfo(uri); + Instance instance = new ZooKeeperInstance(info.instance, info.zookeepers); + zookeeper = keeper; + try { + this.conn = instance.getConnector(info.username, info.passwd); + } catch (Exception e) { + throw new IOException(e); + } // String healthNodeHost = config.get("healthnode"); // if(healthNodeHost == null) // throw new IOException("error: no healthnode address specified. 
add one to core-site.xml"); @@ -355,6 +318,57 @@ public class DistributedNamenodeProxy im throws IOException { log.info("using abandonBlock"); + // find the block's position in the list (probably the last one) + BatchScanner bs = createBatchScanner(namespaceTable, new Range(new Text(src))); + bs.fetchColumnFamily(blocksFam); + + // delete it from the file + Mutation m = new Mutation(new Text(src)); + try { + for (Entry entry : bs) { + String cq = entry.getKey().getColumnQualifier().toString(); + String parts[] = cq.split("_"); + long block = Long.parseLong(parts[1]); + if (b.getBlockId() == block) { + m.putDelete(blocksFam, entry.getKey().getColumnQualifier()); + } + } + } finally { + bs.close(); + } + if (m.getUpdates().isEmpty()) { + throw new IOException("Block " + b.getBlockId() + " not found to abandon for " + src); + } + + // delete the block size and location information + BatchWriter bw = createBatchWriter(namespaceTable); + try { + bw.addMutation(m); + } catch (MutationsRejectedException e) { + throw new RuntimeException(e); + } finally { + try { + bw.close(); + } catch (MutationsRejectedException e) { + throw new RuntimeException(e); + } + } + bw = createBatchWriter(blocksTable); + try { + Text row = new Text("" + b.getBlockId()); + bs = createBatchScanner(blocksTable, new Range(row)); + m = new Mutation(row); + for (Entry entry : bs) { + m.putDelete(entry.getKey().getColumnFamily(), entry.getKey().getColumnQualifier()); + } + } finally { + try { + bw.close(); + } catch (MutationsRejectedException e) { + throw new RuntimeException(e); + } + bs.close(); + } } @Override @@ -378,6 +392,29 @@ public class DistributedNamenodeProxy im public LocatedBlock addBlock(String src, String clientName, DatanodeInfo[] excludeNodes) throws IOException { log.info("using addBlock " + src + " " + clientName); + + // get the last block ID and replication + BatchScanner bs = createBatchScanner(namespaceTable, new Range(new Text(src))); + bs.fetchColumnFamily(blocksFam); + infoReplication.fetch(bs); + + // TODO: fetch from configuration + int defaultReplication = 3; + int replication = -1; + int blockPos = 0; + try { + for (Entry entry : bs) { + if (entry.getKey().getColumnFamily().equals(blocksFam)) + blockPos++; + if (infoReplication.hasColumns(entry.getKey())) + replication = Integer.parseInt(entry.getValue().toString()); + } + } finally { + bs.close(); + } + if (replication < 1) { + replication = defaultReplication; + } // create new blocks on data nodes // zookeeper holds the negative numbered blocks @@ -387,7 +424,8 @@ public class DistributedNamenodeProxy im Block b = new Block(blockID, 0, 0); // choose a set of nodes on which to replicate block - DatanodeInfo[] targets = replicator.getReplicationTargets(); + DatanodeInfo[] targets = replicator.getReplicationTargets(replication); + log.info("replicating " + blockID + " to " + Arrays.asList(targets)); // TODO: get a lease to the first // TODO: can we record all this in the namespace table? 
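The rewritten Replicator.getReplicationTargets() above picks replica targets by shuffling a snapshot of the known datanodes and trimming to the requested replication factor, replacing the removed draw-an-index loop. A self-contained sketch of that selection step (plain strings instead of DatanodeInfo; ReplicaChooser and its names are illustrative, not part of this commit):

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

class ReplicaChooser {
  // Pick up to replicationFactor distinct targets at random, as the patch does:
  // snapshot, shuffle, trim. Throws if no datanodes are known at all.
  static List<String> choose(List<String> datanodes, int replicationFactor) throws IOException {
    if (datanodes.isEmpty())
      throw new IOException("unable to achieve required replication: too few datanodes running");
    List<String> copy = new ArrayList<String>(datanodes); // defensive copy, like targetsCopy
    Collections.shuffle(copy);                            // random placement across nodes
    return copy.subList(0, Math.min(replicationFactor, copy.size()));
  }
}

Shuffling once and taking a prefix guarantees distinct targets without the repeated re-draws the old loop needed to avoid duplicate replicas.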
@@ -397,19 +435,6 @@ public class DistributedNamenodeProxy im // record block to host mapping and vice versa recordBlockHosts(blockIDBytes, targets); - // get the last block ID - BatchScanner bs = createBatchScanner(namespaceTable, new Range(new Text(src))); - bs.fetchColumnFamily(blocksFam); - - int blockPos = 0; - try { - for (@SuppressWarnings("unused") Entry entry : bs) { - blockPos++; - } - } finally { - bs.close(); - } - // record file to block mapping Mutation nameData = new Mutation(new Text(src.getBytes())); nameData.put(blocksFam, new Text(String.format("%08d_%d", blockPos, blockID).getBytes()), blank); @@ -440,29 +465,35 @@ public class DistributedNamenodeProxy im */ @Override - public void blockReceived(DatanodeRegistration registration, Block[] blocks, String[] delHints) throws IOException { + public void blockReceived(DatanodeRegistration registration, final Block[] blocks, String[] delHints) throws IOException { log.info("using blockReceived"); - - // for each block we should have recorded its existence already - // we should also know about the datanode - - // update blocks table - BatchWriter bw = createBatchWriter(blocksTable); - try { - try { - for(Block b : blocks) { - Mutation blockData = new Mutation(new Text(Long.toString(b.getBlockId()))); - ColumnFQ.put(blockData, infoBlockSize, new Value(Long.toString(b.getNumBytes()).getBytes())); - bw.addMutation(blockData); + SimpleTimer.getInstance().schedule(new TimerTask() { + @Override + public void run() { + + // for each block we should have recorded its existence already + // we should also know about the datanode + + // update blocks table + try { + final BatchWriter bw = createBatchWriter(blocksTable); + try { + for(Block b : blocks) { + if (ZookeeperNameNode.isZooBlockId(b.getBlockId())) + continue; + Mutation blockData = new Mutation(new Text(Long.toString(b.getBlockId()))); + infoBlockSize.put(blockData, new Value(Long.toString(b.getNumBytes()).getBytes())); + bw.addMutation(blockData); + } + } finally { + bw.close(); + } + // update total file space ? + } catch (Exception ex) { + log.info(ex, ex); } - } finally { - bw.close(); } - // update total file space ? 
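The blockReceived() rewrite above returns to the datanode immediately and performs the blocks-table update from a scheduled task, skipping the negative (ZooKeeper-managed) block ids. A minimal sketch of that fire-and-forget shape, with java.util.Timer standing in for Accumulo's SimpleTimer and a placeholder comment where the Accumulo write would go:

import java.util.Timer;
import java.util.TimerTask;

class DeferredBlockReport {
  private static final Timer TIMER = new Timer("deferred-writes", true); // daemon thread

  static void blockReceived(final long[] blockIds) {
    TIMER.schedule(new TimerTask() {
      @Override
      public void run() {
        try {
          for (long id : blockIds) {
            if (id < 0)
              continue; // ZooKeeper-numbered block, not tracked in the blocks table
            // record the block's size in the blocks table here
          }
        } catch (Exception ex) {
          ex.printStackTrace(); // the patch logs and swallows failures inside the task
        }
      }
    }, 0); // run as soon as the timer thread is free
  }
}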
- } catch (MutationsRejectedException ex) { - throw new IOException(ex); - } - + }, 0); } @Override @@ -472,9 +503,13 @@ public class DistributedNamenodeProxy im BlockListAsLongs blist = new BlockListAsLongs(blocks); Set current = new HashSet(); for (int i = 0; i < blist.getNumberOfBlocks(); i++) { - current.add(blist.getBlockId(i)); + if (blist.getBlockId(i) > 0) { + current.add(blist.getBlockId(i)); + } } log.info(registration.getName() + " reports blocks " + current); + if (current.isEmpty()) + return null; BatchWriter bw = createBatchWriter(blocksTable); Mutation m = new Mutation(registration.getName()); Scanner scan = createScanner(datanodesTable); @@ -542,41 +577,33 @@ public class DistributedNamenodeProxy im } finally { bs.close(); } - log.info("have ranges " + ranges); if (ranges.isEmpty()) return true; long fileSize = 0; - retry: - while (true) { - BatchScanner blockScanner = createBatchScanner(blocksTable, ranges.toArray(new Range[]{})); - ColumnFQ.fetch(blockScanner, infoBlockSize); - fileSize = 0; - int count = 0; - try { - for (Entry entry : blockScanner) { - log.info("Looking at block sizes " + entry.getKey() + " -> " + entry.getValue()); - long blockSize = Long.parseLong(new String(entry.getValue().get())); - if (blockSize == 0) { - UtilWaitThread.sleep(250); - continue retry; - } - fileSize += blockSize; - count++; - } - } finally { - blockScanner.close(); - } - if (count != ranges.size()) { - log.info("Did not read block sizes for all blocks on file " + src + " read " + count + " but expected " + ranges.size()); - UtilWaitThread.sleep(250); - continue; + BatchScanner blockScanner = createBatchScanner(blocksTable, ranges.toArray(new Range[]{})); + infoBlockSize.fetch(blockScanner); + fileSize = 0; + int count = 0; + try { + for (Entry entry : blockScanner) { + log.info("Looking at block sizes " + entry.getKey() + " -> " + entry.getValue()); + long blockSize = Long.parseLong(new String(entry.getValue().get())); + if (blockSize == 0) + break; + fileSize += blockSize; + count++; } - break; + } finally { + blockScanner.close(); + } + if (count != ranges.size()) { + log.info("Did not read block sizes for all blocks on file " + src + " read " + count + " but expected " + ranges.size()); + return false; } // write size to namespace table Mutation fileSizePut = new Mutation(new Text(src.getBytes())); - ColumnFQ.put(fileSizePut, infoSize, new Value(Long.toString(fileSize).getBytes())); + infoSize.put(fileSizePut, new Value(Long.toString(fileSize).getBytes())); BatchWriter bw = createBatchWriter(namespaceTable); try { try { @@ -597,7 +624,7 @@ public class DistributedNamenodeProxy im } static private void put(Mutation m, ColumnFQ cfq, String value) { - ColumnFQ.put(m, cfq, new Value(value.getBytes())); + cfq.put(m, new Value(value.getBytes())); } static private Value now() { @@ -614,7 +641,7 @@ public class DistributedNamenodeProxy im ColumnFQ srcColumn = new ColumnFQ(childrenFam, new Text(src)); BatchScanner bs = createBatchScanner(namespaceTable, new Range(new Text(parent))); - ColumnFQ.fetch(bs, isDir); + isDir.fetch(bs); bs.fetchColumnFamily(childrenFam); try { for (Entry entry : bs) { @@ -658,7 +685,7 @@ public class DistributedNamenodeProxy im BatchWriter bw = createBatchWriter(namespaceTable); try { Mutation createRequest = new Mutation(new Text(src)); - ColumnFQ.put(createRequest, infoModificationTime, now()); + infoModificationTime.put(createRequest, now()); put(createRequest, infoReplication, Short.toString(replication)); put(createRequest, infoBlockSize, 
Long.toString(blockSize)); put(createRequest, infoPermission, masked.toString()); @@ -702,7 +729,7 @@ public class DistributedNamenodeProxy im } } - private BatchWriter createBatchWriter(String table) throws IOException { + public BatchWriter createBatchWriter(String table) throws IOException { return createBatchWriter(conn, table); } @@ -722,7 +749,7 @@ public class DistributedNamenodeProxy im // determine whether this is a directory BatchScanner bs = createBatchScanner(namespaceTable, new Range(new Text(src))); - ColumnFQ.fetch(bs, isDir); + isDir.fetch(bs); bs.fetchColumnFamily(childrenFam); String isDir_ = null; @@ -744,7 +771,7 @@ public class DistributedNamenodeProxy im Mutation childDelete = new Mutation(new Text(parent)); Text srcText = new Text(src); childDelete.putDelete(childrenFam, srcText); - ColumnFQ.put(childDelete, infoModificationTime, now()); + infoModificationTime.put(childDelete, now()); ArrayList deletes = new ArrayList(); getDeletes(srcText, deletes); @@ -796,6 +823,7 @@ public class DistributedNamenodeProxy im } } } + bs.close(); bw.close(); log.info("Host -> block map " + hostBlockMap); @@ -889,6 +917,7 @@ public class DistributedNamenodeProxy im throw new IOException("file not found: " + src); } + boolean underConst = false; fileLength = 0L; long blockOffset = 0L; for(Text id : IDs.keySet()) { @@ -905,12 +934,15 @@ public class DistributedNamenodeProxy im ArrayList dni = new ArrayList(); bs = createBatchScanner(blocksTable, new Range(idString)); bs.fetchColumnFamily(datanodesFam); - ColumnFQ.fetch(bs, infoBlockSize); + infoBlockSize.fetch(bs); try { for (Entry entry : bs) { if (infoBlockSize.hasColumns(entry.getKey())) { blockSize = Long.parseLong(new String(entry.getValue().get())); fileLength += blockSize; + if (blockSize == 0) { + underConst = true; + } log.info("got size " + blockSize + " for block " + blockIDString); } else if (entry.getKey().getColumnFamily().equals(datanodesFam)) { String host = entry.getKey().getColumnQualifier().toString(); @@ -935,17 +967,16 @@ public class DistributedNamenodeProxy im } // TODO: sort locatedBlocks by network-distance from client - boolean underConst = false; - log.info("Reporting file size of " + fileLength); - BatchWriter bw = createBatchWriter(namespaceTable); - try { - Mutation m = new Mutation(src); - ColumnFQ.put(m, infoSize, new Value(Long.toString(fileLength).getBytes())); - bw.addMutation(m); - bw.close(); - } catch (Exception ex) { - throw new IOException(ex); - } + log.info("Reporting file size of " + fileLength + " underConstruction = " + true); +// BatchWriter bw = createBatchWriter(namespaceTable); +// try { +// Mutation m = new Mutation(src); +// ColumnFQ.put(m, infoSize, new Value(Long.toString(fileLength).getBytes())); +// bw.addMutation(m); +// bw.close(); +// } catch (Exception ex) { +// throw new IOException(ex); +// } return new LocatedBlocks(fileLength, locatedBlocks, underConst); } @@ -1013,8 +1044,7 @@ public class DistributedNamenodeProxy im * This method is currently doing a lot of lookups ... 
*/ @Override - public DirectoryListing getListing(String src, byte[] startAfter) - throws AccessControlException, FileNotFoundException, UnresolvedLinkException, IOException { + public DirectoryListing getListing(String src, byte[] startAfter) throws IOException { log.info("using getListing " + src); // TODO: use startAfter and needLocation @@ -1023,7 +1053,7 @@ public class DistributedNamenodeProxy im String isDirFlag = null; BatchScanner bs = createBatchScanner(namespaceTable, new Range(new Text(src))); bs.fetchColumnFamily(childrenFam); - ColumnFQ.fetch(bs, isDir); + isDir.fetch(bs); List children = new ArrayList(); try { for (Entry entry : bs) { @@ -1143,7 +1173,7 @@ public class DistributedNamenodeProxy im src = normalizePath(src); BatchScanner bs = createBatchScanner(namespaceTable, new Range(new Text(src))); - ColumnFQ.fetch(bs, isDir); + isDir.fetch(bs); try { for (Entry entry : bs) { if (isDir.hasColumns(entry.getKey())) { @@ -1162,7 +1192,7 @@ public class DistributedNamenodeProxy im byte[] parentPath = getParentPath(src); bs = createBatchScanner(namespaceTable, new Range(new Text(parentPath))); - ColumnFQ.fetch(bs, isDir); + isDir.fetch(bs); bs.fetchColumnFamily(childrenFam); String isDirString = null; try { @@ -1190,8 +1220,8 @@ public class DistributedNamenodeProxy im try { bw.addMutation(m); m = new Mutation(new Text(src)); - ColumnFQ.put(m, isDir, new Value("Y".getBytes())); - ColumnFQ.put(m, infoModificationTime, new Value(Long.toString(System.currentTimeMillis()).getBytes())); + isDir.put(m, new Value("Y".getBytes())); + infoModificationTime.put(m, new Value(Long.toString(System.currentTimeMillis()).getBytes())); bw.addMutation(m); } finally { bw.close(); @@ -1213,7 +1243,7 @@ public class DistributedNamenodeProxy im @Override public UpgradeCommand processUpgradeCommand(UpgradeCommand comm) throws IOException { - log.info("using processUpgradeCommand"); + unimplemented(comm); return null; } @@ -1248,44 +1278,6 @@ public class DistributedNamenodeProxy im } - /** - * helpers - * both of these write to blocksTable and datanodesTable - * - * @param host - * @param hblocks - * @throws IOException - */ - private void recordHostBlocks(String host, long[] hblocks) throws IOException { - try { - if(hblocks.length == 0) - return; - - Mutation hostData = new Mutation(new Text(host)); - for(int i=0; i < hblocks.length; i++) - hostData.put(blocksFam, new Text(Long.toString(hblocks[i]).getBytes()), blank); - BatchWriter writer = createBatchWriter(datanodesTable); - try { - writer.addMutation(hostData); - } finally { - writer.close(); - } - - writer = createBatchWriter(blocksTable); - try { - for(int i=0; i < hblocks.length; i++) { - Mutation block = new Mutation(new Text(Long.toString(hblocks[i]).getBytes())); - block.put(datanodesFam, new Text(host.getBytes()), blank); - writer.addMutation(block); - } - } finally { - writer.close(); - } - } catch (MutationsRejectedException ex) { - throw new IOException(ex); - } - } - @Override public boolean recoverLease(String src, String clientName) throws IOException { unimplemented(src, clientName); @@ -1294,8 +1286,7 @@ public class DistributedNamenodeProxy im @Override public void refreshNodes() throws IOException { - log.info("using refreshNodes"); - + unimplemented(); } @Override @@ -1305,11 +1296,10 @@ public class DistributedNamenodeProxy im // record this datanode's info try { - Connector conn = getConnector(); if (conn != null) { BatchWriter bw = createBatchWriter(datanodesTable); Mutation reg = new Mutation(new 
Text(registration.name.getBytes())); - ColumnFQ.put(reg, infoStorageID, new Value(registration.storageID.getBytes())); + infoStorageID.put(reg, new Value(registration.storageID.getBytes())); try { try { bw.addMutation(reg); @@ -1321,7 +1311,7 @@ public class DistributedNamenodeProxy im } } } catch (Throwable ex) { - log.info("Ignoring exceptiong, maybe accumulo is not yet initialized? " + ex); + log.info("Ignoring exception, maybe accumulo is not yet initialized? " + ex); } // clients get this info in a list of targets from addBlock() return registration; @@ -1337,14 +1327,18 @@ public class DistributedNamenodeProxy im } FileStatus getFileStatus(String src) throws IOException { - BatchScanner bs = createBatchScanner(namespaceTable, new Range(src)); - ColumnFQ.fetch(bs, isDir); FileStatus result = new FileStatus(false, false); - for (Entry entry : bs) { - result.exists = true; - if (new String(entry.getValue().get()).equals("Y")) { - result.isDir = true; + BatchScanner bs = createBatchScanner(namespaceTable, new Range(src)); + try { + isDir.fetch(bs); + for (Entry entry : bs) { + result.exists = true; + if (new String(entry.getValue().get()).equals("Y")) { + result.isDir = true; + } } + } finally { + bs.close(); } return result; } @@ -1416,83 +1410,106 @@ public class DistributedNamenodeProxy im @Override public void renewLease(String clientName) throws IOException { - log.info("using renewLease"); - + unimplemented(clientName); } @Override public void reportBadBlocks(LocatedBlock[] blocks) throws IOException { - log.info("using reportBadBlocks"); + unimplemented((Object)blocks); } @Override public void saveNamespace() throws IOException { - log.info("using saveNamespace"); - + unimplemented(); } + private static class SendResult { + List commands = new ArrayList(); + List deletes = new ArrayList(); + } + + // try to send a heartbeat.. 
if it times out, do nothing: we are probably recovering the metadata tables @Override public DatanodeCommand[] sendHeartbeat(final DatanodeRegistration registration, final long capacity, final long dfsUsed, final long remaining, final int xmitsInProgress, final int xceiverCount) throws IOException { - log.info("using sendHeartbeat"); - if (System.currentTimeMillis() - start < 10*1000) - return new DatanodeCommand[0]; - - // update datanodes table with info - // skip this if none of the numbers have changed - // TODO: get last numbers from a lookup - if(capacity != lastCapacity || - dfsUsed != lastDfsUsed || - remaining != lastRemaining) { - Connector conn; - try { - conn = getConnector(); - } catch (Throwable ex) { - // probably not initialized - return new DatanodeCommand[0]; - } - try { - if (conn != null) { - BatchWriter bw = createBatchWriter(conn, datanodesTable); - Mutation m = new Mutation(new Text(registration.name.getBytes())); - ColumnFQ.put(m, infoCapacity, new Value(Long.toString(capacity).getBytes())); - ColumnFQ.put(m, infoUsed, new Value(Long.toString(dfsUsed).getBytes())); - ColumnFQ.put(m, DistributedNamenodeProxy.remaining, new Value(Long.toString(remaining).getBytes())); + + FutureTask future = new FutureTask(new Callable() { + @Override + public SendResult call() throws Exception { + SendResult result = new SendResult(); + + log.info("using sendHeartbeat"); + if (!conn.tableOperations().exists(datanodesTable)) + return result; + // update datanodes table with info + // skip this if none of the numbers have changed + // TODO: get last numbers from a lookup + if(capacity != lastCapacity || + dfsUsed != lastDfsUsed || + remaining != lastRemaining) { try { - bw.addMutation(m); - } finally { - bw.close(); + BatchWriter bw = createBatchWriter(conn, datanodesTable); + Mutation m = new Mutation(new Text(registration.name.getBytes())); + infoCapacity.put(m, new Value(Long.toString(capacity).getBytes())); + infoUsed.put(m, new Value(Long.toString(dfsUsed).getBytes())); + infoIpcPort.put(m, new Value(Integer.toString(registration.getIpcPort()).getBytes())); + DistributedNamenodeProxy.remaining.put(m, new Value(Long.toString(remaining).getBytes())); + try { + bw.addMutation(m); + } finally { + bw.close(); + } + } catch (Exception ex) { + log.error(ex, ex); } } - } catch (Exception ex) { - log.error(ex, ex); + lastCapacity = capacity; + lastDfsUsed = dfsUsed; + lastRemaining = remaining; + // return a list of commands for the data node + List commands = new ArrayList(); + try { + BatchScanner bs = createBatchScanner(datanodesTable, new Range(registration.getName())); + bs.fetchColumnFamily(commandFam); + for (Entry entry : bs) { + Key key = entry.getKey(); + DatanodeCommand command = (DatanodeCommand)deserialize(entry.getValue().get()); + log.info("found datanode Command " + command); + commands.add(command); + Mutation m = new Mutation(key.getRow()); + m.putDelete(key.getColumnFamily(), key.getColumnQualifier()); + result.deletes.add(m); + } + bs.close(); + } catch (Exception ex) { + throw new IOException(ex); + } + + return result; + } + }); + + executor.execute(future); + try { + synchronized(future) { + future.wait(1000); } + } catch (InterruptedException ex) { + // ignored } - lastCapacity = capacity; - lastDfsUsed = dfsUsed; - lastRemaining = remaining; - // return a list of commands for the data node - List commands = new ArrayList(); try { - BatchScanner bs = createBatchScanner(datanodesTable, new Range(registration.getName())); - bs.fetchColumnFamily(commandFam); - 
BatchWriter bw = createBatchWriter(datanodesTable); - for (Entry entry : bs) { - Key key = entry.getKey(); - DatanodeCommand command = (DatanodeCommand)deserialize(entry.getValue().get()); - log.info("found datanode Command " + command); - commands.add(command); - Mutation m = new Mutation(key.getRow()); - m.putDelete(key.getColumnFamily(), key.getColumnQualifier()); - bw.addMutation(m); + if (future.isDone()) { + SendResult result = future.get(); + BatchWriter bw = createBatchWriter(datanodesTable); + bw.addMutations(result.deletes); + bw.close(); + return result.commands.toArray(new DatanodeCommand[0]); } - bs.close(); - bw.close(); } catch (Exception ex) { - throw new IOException(ex); + log.error(ex, ex); } - return commands.toArray(new DatanodeCommand[0]); + return new DatanodeCommand[0]; } @@ -1528,8 +1545,7 @@ public class DistributedNamenodeProxy im @Override public void setOwner(String src, String username, String groupname) throws IOException { - log.info("using setOwner"); - + unimplemented(src, username, groupname); } @Override @@ -1542,7 +1558,7 @@ public class DistributedNamenodeProxy im BatchWriter bw = createBatchWriter(namespaceTable); try { Mutation m = new Mutation(src); - ColumnFQ.put(m, infoPermission, new Value(permission.toString().getBytes())); + infoPermission.put(m, new Value(permission.toString().getBytes())); bw.addMutation(m); } finally { bw.close(); @@ -1556,41 +1572,31 @@ public class DistributedNamenodeProxy im @Override public void setQuota(String path, long namespaceQuota, long diskspaceQuota) throws IOException { - log.info("using setQuota"); - + unimplemented(path, namespaceQuota, diskspaceQuota); } @Override public boolean setReplication(String src, short replication) throws IOException { - log.info("using setReplication"); + unimplemented(src, replication); return false; } @Override public boolean setSafeMode(SafeModeAction action) throws IOException { - log.info("using setSafeMode"); + unimplemented(action); return false; } @Override public void setTimes(String src, long mtime, long atime) throws IOException { - log.info("using setTimes"); - - } - - public void stop() { + unimplemented(src, mtime, atime); } @Override public NamespaceInfo versionRequest() throws IOException { - log.info("using versionRequest"); - // TODO: find out how to get namespace id - // could store this in the info of the / entry - NamespaceInfo nsi = new NamespaceInfo(384837986, 0, 0); - //throw new RuntimeException(); - return nsi; + throw new NotImplementedException(); } } Modified: accumulo/branches/ACCUMULO-722/distnn/src/main/java/org/apache/hadoop/hdfs/server/namenode/ZookeeperNameNode.java URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-722/distnn/src/main/java/org/apache/hadoop/hdfs/server/namenode/ZookeeperNameNode.java?rev=1378547&r1=1378546&r2=1378547&view=diff ============================================================================== --- accumulo/branches/ACCUMULO-722/distnn/src/main/java/org/apache/hadoop/hdfs/server/namenode/ZookeeperNameNode.java (original) +++ accumulo/branches/ACCUMULO-722/distnn/src/main/java/org/apache/hadoop/hdfs/server/namenode/ZookeeperNameNode.java Wed Aug 29 13:37:54 2012 @@ -8,6 +8,8 @@ import java.io.IOException; import java.io.ObjectInputStream; import java.io.ObjectOutputStream; import java.io.Serializable; +import java.net.URI; +import java.net.URISyntaxException; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -15,15 +17,16 @@ import java.util.HashMap; import java.util.HashSet; 
import java.util.List; import java.util.Map; -import java.util.Set; -import java.util.UUID; import java.util.Map.Entry; import java.util.Random; +import java.util.Set; import java.util.TreeMap; +import java.util.UUID; +import java.util.regex.Pattern; -import org.apache.accumulo.core.util.UtilWaitThread; import org.apache.commons.io.output.ByteArrayOutputStream; import org.apache.commons.lang.NotImplementedException; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.ContentSummary; import org.apache.hadoop.fs.FileAlreadyExistsException; import org.apache.hadoop.fs.permission.FsPermission; @@ -41,6 +44,7 @@ import org.apache.hadoop.hdfs.protocol.L import org.apache.hadoop.hdfs.protocol.LocatedBlocks; import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier; import org.apache.hadoop.hdfs.server.common.UpgradeStatusReport; +import org.apache.hadoop.hdfs.server.namenode.DistributedNamenodeProxy.ConnectInfo; import org.apache.hadoop.hdfs.server.protocol.BlockCommand; import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand; import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol; @@ -54,6 +58,9 @@ import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; import com.netflix.curator.framework.CuratorFramework; +import com.netflix.curator.framework.CuratorFrameworkFactory; +import com.netflix.curator.framework.CuratorFrameworkFactory.Builder; +import com.netflix.curator.retry.RetryUntilElapsed; public class ZookeeperNameNode implements FakeNameNode { static private Logger log = Logger.getLogger(ZookeeperNameNode.class); @@ -103,8 +110,51 @@ public class ZookeeperNameNode implement boolean complete; } - public ZookeeperNameNode(CuratorFramework client) { + static Pattern isRoot = Pattern.compile("/accumulo(|/instance_id.*|/version.*|/walogArchive|/wal(/.*|$)|/recovery.*|/tables$|/tables/(\\!0|0|1|2)(/.*|$))"); + + static private boolean isZooName(String path) { + boolean result = isRoot.matcher(path).matches(); + log.info("Looking at " + path + " isZooName " + result); + return result; + } + static public boolean isZooBlockId(long blockId) { + return blockId < 0; + } + + private final URI uri; + private DistributedNamenodeProxy dist = null; + private final String instance; + + private long start = System.currentTimeMillis(); + + private static URI getURI(Configuration conf) throws IOException { + try { + return new URI(conf.get("fs.default.name")); + } catch (URISyntaxException e) { + throw new IOException(e); + } + } + + public ZookeeperNameNode(Configuration conf) throws IOException { + this(conf, getURI(conf)); + } + + public ZookeeperNameNode(Configuration conf, URI uri) throws IOException { + ConnectInfo info = new ConnectInfo(uri); + instance = info.instance; + Builder builder = CuratorFrameworkFactory.builder(); + builder.connectString(info.zookeepers); + builder.retryPolicy(new RetryUntilElapsed(120*1000, 500)); + //builder.aclProvider(aclProvider); + CuratorFramework client = builder.build(); + client.start(); this.keeper = client; + this.uri = uri; + try { + findDatanodes(); + } catch (Exception e) { + // ignored + } } private static void unimplemented(Object ... 
args) { @@ -113,12 +163,41 @@ public class ZookeeperNameNode implement log.warn(method + " unimplemented, args: " + Arrays.asList(args), t); } + private FakeNameNode dist() { + try { + if (dist == null) { + String instanceId = new String(keeper.getData().forPath("/accumulo/instances/" + instance)); + log.info("Looking at instance " + instance + " id " + instanceId); + String tservers = "/accumulo/" + instanceId + "/tservers"; + List children = keeper.getChildren().forPath(tservers); + boolean atLeastOneTserver = false; + for (String child : children) { + List locks = keeper.getChildren().forPath(tservers + "/" + child); + if (locks != null && locks.size() > 1) { + atLeastOneTserver = true; + break; + } + } + if (atLeastOneTserver) + dist = new DistributedNamenodeProxy(keeper, uri); + } + } catch (Exception ex) { + log.warn(ex, ex); + } + return dist; + } + @Override public LocatedBlocks getBlockLocations(String src, long offset, long length) throws IOException { + log.info("getBlockLocations " + src); + if (!isZooName(src)) + return dist().getBlockLocations(src, offset, length); + log.info("getBlockLocation " + src + " offset " + offset + " length " + length); try { Map blocks = new TreeMap(); String blockpath = DNNConstants.NAMESPACE_PATH + src; + boolean underConstruction = false; for (String child : keeper.getChildren().forPath(blockpath)) { byte[] data = keeper.getData().forPath(blockpath + "/" + child); Object obj = deserialize(data); @@ -130,9 +209,12 @@ public class ZookeeperNameNode implement info = (BlockInfo)obj; blocks.put(child, info); log.info(src + " block " + info.id + " size " + info.size); + if (!info.complete) + underConstruction = true; } } } + Map datanodes = findDatanodes(); log.info("Got " + blocks.size() + " blocks for " + src); List lblocks = new ArrayList(); long currentOffset = 0; @@ -140,7 +222,11 @@ public class ZookeeperNameNode implement BlockInfo binfo = entry.getValue(); DatanodeInfo[] info = new DatanodeInfo[binfo.datanodes.length]; for (int j = 0; j < info.length; j++) { - info[j] = new DatanodeInfo(new DatanodeID(binfo.datanodes[j])); + DatanodeRegistration dn = datanodes.get(binfo.datanodes[j]); + if (dn != null) + info[j] = new DatanodeInfo(new DatanodeID(binfo.datanodes[j], dn.getStorageID(), dn.getInfoPort(), dn.getIpcPort())); + else + info[j] = new DatanodeInfo(new DatanodeID(binfo.datanodes[j])); } log.info("Found " + entry.getKey() + " "+ info.length + " locations for block " + binfo.id); if (currentOffset >= offset && currentOffset < offset + length) @@ -148,7 +234,7 @@ public class ZookeeperNameNode implement currentOffset += binfo.size; } log.info("Returning fileLength " + currentOffset + " for " + src); - return new LocatedBlocks(currentOffset, lblocks, false); + return new LocatedBlocks(currentOffset, lblocks, underConstruction); } catch (Exception ex) { throw new IOException(ex); } @@ -178,6 +264,10 @@ public class ZookeeperNameNode implement public void create(String src, FsPermission masked, String clientName, boolean overwrite, boolean createParent, short replication, long blockSize) throws IOException { log.info("creating " + src); + if (!isZooName(src)) { + dist().create(src, masked, clientName, overwrite, createParent, replication, blockSize); + return; + } try { FileInfo fileInfo = new FileInfo(blockSize, System.currentTimeMillis(), masked.toString(), replication, 0); byte[] data = serialize(fileInfo); @@ -236,7 +326,33 @@ public class ZookeeperNameNode implement @Override public void abandonBlock(Block b, String src, String holder) 
throws IOException { - unimplemented(b, src, holder); + log.info("abandonBlock " + b.getBlockId() + ": " + src); + if (!isZooName(src)) { + dist().abandonBlock(b, src, holder); + return; + } + try { + String path = DNNConstants.NAMESPACE_PATH + src; + for (String child : keeper.getChildren().forPath(path)) { + byte[] blockData = keeper.getData().forPath(path + "/" + child); + Object obj = deserialize(blockData); + if (obj instanceof BlockInfo) { + BlockInfo info = (BlockInfo) obj; + if (info.id == b.getBlockId()) { + keeper.delete().forPath(path + "/" + child); + try { + keeper.delete().forPath(DNNConstants.BLOCKS_PATH + "/" + b.getBlockName()); + } catch (KeeperException.NoNodeException ex) { + // ignored + } + return; + } + } + } + } catch (Exception ex) { + throw new IOException(ex); + } + throw new IOException("Unexpected error abandoning block " + b.getBlockId() + " block not found!"); } @Override @@ -246,6 +362,9 @@ public class ZookeeperNameNode implement @Override public LocatedBlock addBlock(String src, String clientName, DatanodeInfo[] excludedNodes) throws IOException { + log.info("addBlock " + src); + if (!isZooName(src)) + return dist().addBlock(src, clientName, excludedNodes); // get the list of online data nodes Map nodes; try { @@ -253,10 +372,19 @@ public class ZookeeperNameNode implement } catch (Exception e) { throw new IOException(e); } - int replication = 1; - if(nodes.size() < replication) - throw new IOException("unable to achieve required replication: too few datanodes running"); + short defaultReplication = 3; // TODO: read config + short replication = -1; + try { + String path = DNNConstants.NAMESPACE_PATH + src; + byte data[] = keeper.getData().forPath(path); + HdfsFileStatus status = decodeFile(path, data); + replication = status.getReplication(); + } catch (Exception ex) { + throw new IOException(ex); + } + if (replication < 0) + replication = defaultReplication; List randomList = new ArrayList(nodes.keySet()); Collections.shuffle(randomList); @@ -293,16 +421,20 @@ public class ZookeeperNameNode implement } } - private Map findDatanodes() throws Exception { - List children = keeper.getChildren().forPath(DNNConstants.DATANODES_PATH); + private synchronized Map findDatanodes() throws IOException { Map nodes = new HashMap(); - for (String child : children) { - byte[] data = keeper.getData().forPath(DNNConstants.DATANODES_PATH + "/" + child); - ByteArrayInputStream bais = new ByteArrayInputStream(data); - DataInputStream ds = new DataInputStream(bais); - DatanodeRegistration registration = new DatanodeRegistration(); - registration.readFields(ds); - nodes.put(child, registration); + try { + List children = keeper.getChildren().forPath(DNNConstants.DATANODES_PATH); + for (String child : children) { + byte[] data = keeper.getData().forPath(DNNConstants.DATANODES_PATH + "/" + child); + ByteArrayInputStream bais = new ByteArrayInputStream(data); + DataInputStream ds = new DataInputStream(bais); + DatanodeRegistration registration = new DatanodeRegistration(); + registration.readFields(ds); + nodes.put(child, registration); + } + } catch (Exception ex) { + throw new IOException(ex); } return nodes; } @@ -310,37 +442,31 @@ public class ZookeeperNameNode implement @Override public boolean complete(String src, String clientName) throws IOException { log.info("using complete " + src); + if (!isZooName(src)) + return dist().complete(src, clientName); String path = DNNConstants.NAMESPACE_PATH + src; - while (true) { - try { - boolean retry = false; - long length = 0; - for 
(String child : keeper.getChildren().forPath(path)) { - Object object = deserialize(keeper.getData().forPath(path + "/" + child)); - if (object instanceof BlockInfo) { - BlockInfo info = (BlockInfo)object; - Block block = new Block(info.id); - info = (BlockInfo)deserialize(keeper.getData().forPath(DNNConstants.BLOCKS_PATH + "/" + block.getBlockName())); - log.info("Block size for " + info.id + " is " + info.size); - length += info.size; - if (!info.complete) { - retry = true; - break; - } - } + try { + long length = 0; + for (String child : keeper.getChildren().forPath(path)) { + Object object = deserialize(keeper.getData().forPath(path + "/" + child)); + if (object instanceof BlockInfo) { + BlockInfo info = (BlockInfo)object; + Block block = new Block(info.id); + info = (BlockInfo)deserialize(keeper.getData().forPath(DNNConstants.BLOCKS_PATH + "/" + block.getBlockName())); + log.info("Block size for " + info.id + " is " + info.size); + length += info.size; + if (!info.complete) + return false; } - if (retry) { - UtilWaitThread.sleep(250); - continue; - } - FileInfo info = (FileInfo)deserialize(keeper.getData().forPath(path)); - info.size = length; - keeper.setData().forPath(path, serialize(info)); - log.info("updated file length of " + src + " to " + length); - return true; - } catch (Exception ex) { - log.error(ex, ex); } + FileInfo info = (FileInfo)deserialize(keeper.getData().forPath(path)); + info.size = length; + keeper.setData().forPath(path, serialize(info)); + log.info("updated file length of " + src + " to " + length); + return true; + } catch (Exception ex) { + log.error(ex, ex); + return false; } } @@ -373,6 +499,10 @@ public class ZookeeperNameNode implement @Override public boolean rename(String src, String dst) throws IOException { log.info("rename " + src + " -> " + dst); + if (isZooName(src) != isZooName(dst)) + throw new IOException("You cannot rename files across zookeeper metadata and accumulo metadata"); + if (!isZooName(src)) + return dist().rename(src, dst); try { Object srcInfo = getInfo(DNNConstants.NAMESPACE_PATH + src); Object dstInfo = getInfo(DNNConstants.NAMESPACE_PATH + dst); @@ -392,18 +522,20 @@ public class ZookeeperNameNode implement else delete(dst); } - recursivelyCopy(DNNConstants.NAMESPACE_PATH + src, DNNConstants.NAMESPACE_PATH + dst); + String nsSrc = DNNConstants.NAMESPACE_PATH + src; + recursivelyCopy(nsSrc, DNNConstants.NAMESPACE_PATH + dst); + recursivelyDelete(nsSrc, false); return true; } catch (Exception ex) { throw new IOException(ex); } } - private String basename(String src) { + private static String basename(String src) { return src.substring(src.lastIndexOf("/") + 1); } - private String getParent(String dst) { + private static String getParent(String dst) { return dst.substring(0, dst.lastIndexOf("/")); } @@ -414,23 +546,29 @@ public class ZookeeperNameNode implement @Override public boolean delete(String src, boolean recursive) throws IOException { + log.info("attempting delete " + src); + if (!isZooName(src)) + return dist().delete(src); + try { - recursivelyDelete(DNNConstants.NAMESPACE_PATH + src); + recursivelyDelete(DNNConstants.NAMESPACE_PATH + src, true); } catch (Exception ex) { throw new IOException(ex); } return true; } - private void recursivelyDelete(String path) throws Exception { + private void recursivelyDelete(String path, boolean removeBlocks) throws Exception { + log.info("deleting " + path); List children = null; try { children = keeper.getChildren().forPath(path); } catch (KeeperException.NoNodeException ex) { 
@@ -414,23 +546,29 @@ public class ZookeeperNameNode implement
   @Override
   public boolean delete(String src, boolean recursive) throws IOException {
+    log.info("attempting delete " + src);
+    if (!isZooName(src))
+      return dist().delete(src);
+
     try {
-      recursivelyDelete(DNNConstants.NAMESPACE_PATH + src);
+      recursivelyDelete(DNNConstants.NAMESPACE_PATH + src, true);
     } catch (Exception ex) {
       throw new IOException(ex);
     }
     return true;
   }
 
-  private void recursivelyDelete(String path) throws Exception {
+  private void recursivelyDelete(String path, boolean removeBlocks) throws Exception {
+    log.info("deleting " + path);
     List<String> children = null;
     try {
       children = keeper.getChildren().forPath(path);
     } catch (KeeperException.NoNodeException ex) {
       return;
     }
+    log.info("children of " + path + " is " + children);
     Object obj = deserialize(keeper.getData().forPath(path));
-    if (obj instanceof FileInfo) {
+    if (removeBlocks && obj instanceof FileInfo) {
       // create the datanode command to (eventually) delete the blocks
       Map<String,List<Long>> hostToBlockMap = new HashMap<String,List<Long>>();
       for (String child : children) {
@@ -443,7 +581,9 @@ public class ZookeeperNameNode implement
             hostToBlockMap.put(node, blocks = new ArrayList<Long>());
           blocks.add(block.id);
         }
-        keeper.delete().forPath(DNNConstants.BLOCKS_PATH + "/" + new Block(block.id, 0, 0).getBlockName());
+        String blockPath = DNNConstants.BLOCKS_PATH + "/" + new Block(block.id, 0, 0).getBlockName();
+        log.info("deleting " + blockPath);
+        keeper.delete().forPath(blockPath);
       }
     }
     for (Entry<String,List<Long>> entry : hostToBlockMap.entrySet()) {
@@ -457,14 +597,18 @@ public class ZookeeperNameNode implement
       }
     }
     for (String child : children) {
-      recursivelyDelete(path + "/" + child);
+      recursivelyDelete(path + "/" + child, removeBlocks);
     }
+    log.info("deleting " + path);
     keeper.delete().forPath(path);
   }
 
   @Override
   public boolean mkdirs(String src, FsPermission masked) throws IOException {
     log.info("mkdirs " + src);
+    if (!isZooName(src))
+      return dist().mkdirs(src, masked);
+
     try {
       DirInfo dirInfo = new DirInfo(System.currentTimeMillis());
       byte[] data = serialize(dirInfo);
@@ -520,6 +664,10 @@ public class ZookeeperNameNode implement
   @Override
   public DirectoryListing getListing(String src, byte[] startAfter) throws IOException {
+    log.info("getListing " + src);
+    if (!isZooName(src))
+      return dist().getListing(src, startAfter);
+
     try {
       String basePath = DNNConstants.NAMESPACE_PATH + src;
       List<String> children = keeper.getChildren().forPath(basePath);
@@ -538,73 +686,70 @@ public class ZookeeperNameNode implement
 
   @Override
   public void renewLease(String clientName) throws IOException {
-    // TODO Auto-generated method stub
-
+    log.info("renewLease " + clientName);
   }
 
   @Override
   public long[] getStats() throws IOException {
-    // TODO Auto-generated method stub
+    unimplemented();
     return null;
   }
 
   @Override
   public DatanodeInfo[] getDatanodeReport(DatanodeReportType type) throws IOException {
-    // TODO Auto-generated method stub
+    unimplemented(type);
     return null;
   }
 
   @Override
   public long getPreferredBlockSize(String filename) throws IOException {
-    // TODO Auto-generated method stub
+    unimplemented(filename);
     return 0;
   }
 
   @Override
   public boolean setSafeMode(SafeModeAction action) throws IOException {
-    // TODO Auto-generated method stub
+    unimplemented(action);
     return false;
   }
 
   @Override
   public void saveNamespace() throws IOException {
-    // TODO Auto-generated method stub
-
+    unimplemented();
   }
 
   @Override
   public void refreshNodes() throws IOException {
-    // TODO Auto-generated method stub
-
+    unimplemented();
   }
 
   @Override
   public void finalizeUpgrade() throws IOException {
-    // TODO Auto-generated method stub
-
+    unimplemented();
  }
 
   @Override
   public UpgradeStatusReport distributedUpgradeProgress(UpgradeAction action) throws IOException {
-    // TODO Auto-generated method stub
+    unimplemented();
     return null;
   }
 
   @Override
   public void metaSave(String filename) throws IOException {
-    // TODO Auto-generated method stub
-
+    unimplemented(filename);
   }
 
   @Override
   public void setBalancerBandwidth(long bandwidth) throws IOException {
-    // TODO Auto-generated method stub
-
+    unimplemented(bandwidth);
   }
 
   @Override
   public HdfsFileStatus getFileInfo(String src) throws IOException {
-    log.info("Get file status");
+    log.info("getFileInfo " + src);
+    if (!isZooName(src))
+      return dist().getFileInfo(src);
+
     try {
       byte[] data = keeper.getData().forPath(DNNConstants.NAMESPACE_PATH + src);
       HdfsFileStatus result = decodeFile(src, data);
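recursivelyDelete now takes a removeBlocks flag so rename, implemented above as copy-then-delete, can remove the old namespace entries without releasing blocks the copy still references. When blocks are removed, block ids are grouped by the datanode holding them so that one delete command per host can be queued under that datanode's ZooKeeper node. A simplified model of that grouping, with Replica as a hypothetical stand-in type:

    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    // Sketch only: host -> pending-delete-block-ids bookkeeping.
    class BlockReclaimSketch {
      static class Replica {
        final String host;
        final long blockId;
        Replica(String host, long blockId) {
          this.host = host;
          this.blockId = blockId;
        }
      }

      // Each replica contributes its block id to the owning datanode's list.
      static Map<String,List<Long>> groupByHost(List<Replica> replicas) {
        Map<String,List<Long>> hostToBlocks = new HashMap<String,List<Long>>();
        for (Replica r : replicas) {
          List<Long> blocks = hostToBlocks.get(r.host);
          if (blocks == null)
            hostToBlocks.put(r.host, blocks = new ArrayList<Long>());
          blocks.add(r.blockId);
        }
        return hostToBlocks;
      }
    }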
@@ -624,54 +769,51 @@ public class ZookeeperNameNode implement
 
   @Override
   public ContentSummary getContentSummary(String path) throws IOException {
-    // TODO Auto-generated method stub
+    unimplemented(path);
     return null;
   }
 
   @Override
   public void setQuota(String path, long namespaceQuota, long diskspaceQuota) throws IOException {
-    // TODO Auto-generated method stub
-
+    unimplemented(path, namespaceQuota, diskspaceQuota);
   }
 
   @Override
   public void fsync(String src, String client) throws IOException {
-    // TODO Auto-generated method stub
-
+    unimplemented(src, client);
   }
 
   @Override
   public void setTimes(String src, long mtime, long atime) throws IOException {
-    // TODO Auto-generated method stub
-
+    unimplemented(src, mtime, atime);
   }
 
   @Override
   public Token<DelegationTokenIdentifier> getDelegationToken(Text renewer) throws IOException {
-    // TODO Auto-generated method stub
+    unimplemented(renewer);
     return null;
   }
 
   @Override
   public long renewDelegationToken(Token<DelegationTokenIdentifier> token) throws IOException {
-    // TODO Auto-generated method stub
+    unimplemented(token);
     return 0;
   }
 
   @Override
   public void cancelDelegationToken(Token<DelegationTokenIdentifier> token) throws IOException {
-    // TODO Auto-generated method stub
-
+    unimplemented(token);
   }
 
   @Override
   public long getProtocolVersion(String protocol, long clientVersion) throws IOException {
-    // TODO Auto-generated method stub
+    unimplemented(protocol, clientVersion);
     return 0;
   }
 
   @Override
   public DatanodeRegistration register(DatanodeRegistration registration) throws IOException {
+    log.info("register " + registration);
     if (keeper != null) {
       log.info("registering in zookeeper as " + registration.name);
       ByteArrayOutputStream stream = new ByteArrayOutputStream();
@@ -698,12 +840,18 @@ public class ZookeeperNameNode implement
         throw new IOException(e);
       }
     }
+    findDatanodes();
+    FakeNameNode dist = dist();
+    if (dist != null) {
+      dist.register(registration);
+    }
     return registration;
   }
 
   @Override
   public DatanodeCommand[] sendHeartbeat(DatanodeRegistration registration, long capacity, long dfsUsed, long remaining, int xmitsInProgress,
       int xceiverCount) throws IOException {
+    log.info("sendHeartbeat " + registration);
     List<DatanodeCommand> commands = new ArrayList<DatanodeCommand>();
     try {
       String commandsPath = DNNConstants.DATANODES_PATH + "/" + registration.getName() + "/commands";
@@ -715,15 +863,30 @@ public class ZookeeperNameNode implement
     } catch (Exception e) {
       throw new IOException(e);
     }
+    findDatanodes();
+    if (dist != null) {
+      DatanodeCommand[] cmds = dist.sendHeartbeat(registration, capacity, dfsUsed, remaining, xmitsInProgress, xceiverCount);
+      if (cmds != null) {
+        for (DatanodeCommand cmd : cmds) {
+          commands.add(cmd);
+        }
+      }
+    }
     return commands.toArray(new DatanodeCommand[0]);
   }
 
   @Override
   public DatanodeCommand blockReport(DatanodeRegistration registration, long[] blocks) throws IOException {
+    log.info("blockReport " + registration);
+    if (dist != null) {
+      return dist.blockReport(registration, blocks);
+    }
     BlockListAsLongs blist = new BlockListAsLongs(blocks);
     Set<Long> current = new HashSet<Long>();
     for (int i = 0; i < blist.getNumberOfBlocks(); i++) {
-      current.add(blist.getBlockId(i));
+      if (isZooBlockId(blist.getBlockId(i))) {
+        current.add(blist.getBlockId(i));
+      }
     }
     log.info(registration.name + " reports " + current);
     return null;
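sendHeartbeat now merges two command sources: whatever has been queued for the datanode under its ZooKeeper commands node, and whatever the Accumulo-backed proxy returns. A small generic sketch of that merge, assuming either source may be empty or absent:

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;

    // Sketch only: combine queued ZooKeeper commands with proxy commands.
    class HeartbeatMergeSketch {
      static <T> List<T> merge(List<T> fromZooKeeper, T[] fromProxy) {
        List<T> all = new ArrayList<T>(fromZooKeeper);
        if (fromProxy != null)
          all.addAll(Arrays.asList(fromProxy));
        return all;
      }
    }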
@@ -731,32 +894,38 @@ public class ZookeeperNameNode implement
 
   @Override
   public void blocksBeingWrittenReport(DatanodeRegistration registration, long[] blocks) throws IOException {
-    // TODO Auto-generated method stub
-
+    unimplemented(registration, new BlockListAsLongs(blocks));
   }
 
   @Override
   public void blockReceived(DatanodeRegistration registration, Block[] blocks, String[] delHints) throws IOException {
     log.info("blockReceived " + Arrays.asList(blocks));
+    FakeNameNode dist = dist();
     for (Block block : blocks) {
-      String path = DNNConstants.BLOCKS_PATH + "/" + block.getBlockName();
-      try {
-        BlockInfo info = (BlockInfo)deserialize(keeper.getData().forPath(path));
-        info.size = block.getNumBytes();
-        info.complete = true;
-        byte[] data = serialize(info);
-        keeper.setData().forPath(path, data);
-        log.info("Block size updated on " + block + " to " + info.size);
-      } catch (Exception e) {
-        log.error(e, e);
+      if (!isZooBlockId(block.getBlockId()) && dist == null) {
+        throw new IOException("blockReceived for distributed name node, but there are no data nodes yet! " + block.getBlockId());
+      }
+      if (isZooBlockId(block.getBlockId())) {
+        String path = DNNConstants.BLOCKS_PATH + "/" + block.getBlockName();
+        try {
+          BlockInfo info = (BlockInfo)deserialize(keeper.getData().forPath(path));
+          info.size = block.getNumBytes();
+          info.complete = true;
+          byte[] data = serialize(info);
+          keeper.setData().forPath(path, data);
+          log.info("Block size updated on " + block + " to " + info.size);
+        } catch (Exception e) {
+          log.error(e, e);
+        }
       }
     }
+    if (dist != null)
+      dist.blockReceived(registration, blocks, delHints);
   }
 
   @Override
   public void errorReport(DatanodeRegistration registration, int errorCode, String msg) throws IOException {
-    // TODO Auto-generated method stub
-
+    unimplemented(registration, errorCode, msg);
   }
 
   @Override
@@ -771,21 +940,20 @@ public class ZookeeperNameNode implement
 
   @Override
   public UpgradeCommand processUpgradeCommand(UpgradeCommand comm) throws IOException {
-    // TODO Auto-generated method stub
+    unimplemented(comm);
     return null;
   }
 
   @Override
   public long nextGenerationStamp(Block block, boolean fromNN) throws IOException {
-    // TODO Auto-generated method stub
+    unimplemented(block, fromNN);
     return 0;
   }
 
   @Override
   public void commitBlockSynchronization(Block block, long newgenerationstamp, long newlength, boolean closeFile, boolean deleteblock,
       DatanodeID[] newtargets) throws IOException {
-    // TODO Auto-generated method stub
-
+    unimplemented(block, newgenerationstamp, newlength, closeFile, deleteblock, newtargets);
   }
 }
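Finally, block reports and blockReceived notifications are partitioned by block id, with isZooBlockId deciding which tier owns each block. Its implementation is not part of this commit, so the sketch below leaves it abstract:

    import java.util.HashSet;
    import java.util.Set;

    // Sketch only: retain just the ids the ZooKeeper tier manages; the rest
    // belong to the Accumulo-backed proxy.
    abstract class BlockIdFilterSketch {
      abstract boolean isZooBlockId(long id);

      Set<Long> zooManaged(long[] reported) {
        Set<Long> current = new HashSet<Long>();
        for (long id : reported) {
          if (isZooBlockId(id))
            current.add(id);
        }
        return current;
      }
    }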