hadoop-zookeeper-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From maha...@apache.org
Subject svn commit: r739388 [1/4] - in /hadoop/zookeeper/trunk: ./ src/contrib/ src/contrib/bookkeeper/ src/contrib/bookkeeper/benchmark/ src/contrib/bookkeeper/benchmark/org/ src/contrib/bookkeeper/benchmark/org/apache/ src/contrib/bookkeeper/benchmark/org/ap...
Date Fri, 30 Jan 2009 19:30:28 GMT
Author: mahadev
Date: Fri Jan 30 19:30:27 2009
New Revision: 739388

URL: http://svn.apache.org/viewvc?rev=739388&view=rev
Log:
ZOOKEEPER-276. Bookkeeper contribution (Flavio and Luca Telloli via mahadev)

Added:
    hadoop/zookeeper/trunk/src/contrib/bookkeeper/
    hadoop/zookeeper/trunk/src/contrib/bookkeeper/README.txt
    hadoop/zookeeper/trunk/src/contrib/bookkeeper/benchmark/
    hadoop/zookeeper/trunk/src/contrib/bookkeeper/benchmark/org/
    hadoop/zookeeper/trunk/src/contrib/bookkeeper/benchmark/org/apache/
    hadoop/zookeeper/trunk/src/contrib/bookkeeper/benchmark/org/apache/bookkeeper/
    hadoop/zookeeper/trunk/src/contrib/bookkeeper/benchmark/org/apache/bookkeeper/benchmark/
    hadoop/zookeeper/trunk/src/contrib/bookkeeper/benchmark/org/apache/bookkeeper/benchmark/MySqlClient.java
    hadoop/zookeeper/trunk/src/contrib/bookkeeper/benchmark/org/apache/bookkeeper/benchmark/TestClient.java
    hadoop/zookeeper/trunk/src/contrib/bookkeeper/build.xml
    hadoop/zookeeper/trunk/src/contrib/bookkeeper/conf/
    hadoop/zookeeper/trunk/src/contrib/bookkeeper/conf/log4j.properties
    hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/
    hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/
    hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/
    hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/
    hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/
    hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/bookie/
    hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/bookie/Bookie.java
    hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/bookie/LedgerDescriptor.java
    hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/
    hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/AddCallback.java
    hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/BKException.java
    hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/BookKeeper.java
    hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/BookieHandle.java
    hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/ClientCBWorker.java
    hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/ErrorCodes.java
    hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/LedgerEntry.java
    hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/LedgerHandle.java
    hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/LedgerRecoveryMonitor.java
    hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/LedgerSequence.java
    hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/LedgerStream.java
    hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/QuorumEngine.java
    hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/QuorumOpMonitor.java
    hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/ReadCallback.java
    hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/proto/
    hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/proto/BookieClient.java
    hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/proto/BookieProtocol.java
    hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/proto/BookieServer.java
    hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/proto/NIOServerFactory.java
    hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/proto/ReadEntryCallback.java
    hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/proto/ServerStats.java
    hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/proto/WriteCallback.java
    hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/util/
    hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/util/ClientBase.java
    hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/util/LocalBookKeeper.java
    hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/util/Main.java
    hadoop/zookeeper/trunk/src/contrib/bookkeeper/test/
    hadoop/zookeeper/trunk/src/contrib/bookkeeper/test/org/
    hadoop/zookeeper/trunk/src/contrib/bookkeeper/test/org/apache/
    hadoop/zookeeper/trunk/src/contrib/bookkeeper/test/org/apache/bookkeeper/
    hadoop/zookeeper/trunk/src/contrib/bookkeeper/test/org/apache/bookkeeper/test/
    hadoop/zookeeper/trunk/src/contrib/bookkeeper/test/org/apache/bookkeeper/test/BookieClientTest.java
    hadoop/zookeeper/trunk/src/contrib/bookkeeper/test/org/apache/bookkeeper/test/BookieReadWriteTest.java
    hadoop/zookeeper/trunk/src/contrib/bookkeeper/test/org/apache/bookkeeper/test/LedgerRecoveryTest.java
    hadoop/zookeeper/trunk/src/contrib/bookkeeper/test/org/apache/bookkeeper/test/LoopbackClient.java
    hadoop/zookeeper/trunk/src/contrib/bookkeeper/test/org/apache/bookkeeper/test/NIOServerFactoryTest.java
Modified:
    hadoop/zookeeper/trunk/CHANGES.txt
    hadoop/zookeeper/trunk/src/contrib/build.xml
    hadoop/zookeeper/trunk/src/contrib/fatjar/build.xml
    hadoop/zookeeper/trunk/src/contrib/zkfuse/build.xml

Modified: hadoop/zookeeper/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/CHANGES.txt?rev=739388&r1=739387&r2=739388&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/CHANGES.txt (original)
+++ hadoop/zookeeper/trunk/CHANGES.txt Fri Jan 30 19:30:27 2009
@@ -131,6 +131,10 @@
    ZOOKEEPER-269. connectionloss- add more documentation to detail. (phunt and
 flavio via mahadev)
 
+NEW FEATURES:
+
+   ZOOKEEPER-276. Bookkeeper contribution (Flavio and Luca Telloli via mahadev)
+
 Release 3.0.0 - 2008-10-21
 
 Non-backward compatible changes:

Added: hadoop/zookeeper/trunk/src/contrib/bookkeeper/README.txt
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/bookkeeper/README.txt?rev=739388&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/bookkeeper/README.txt (added)
+++ hadoop/zookeeper/trunk/src/contrib/bookkeeper/README.txt Fri Jan 30 19:30:27 2009
@@ -0,0 +1,62 @@
+BookKeeper README
+
+1- Overview
+BookKeeper is a highly available logging service. As many critical services rely upon write-ahead logs to provide persistence along with high performance, an alternative to make such a service highly available despite the failures of individual servers is to offload write-ahead logs to an external service. 
+
+This is exactly what BookKeeper provides. With BookKeeper, a service (or application) writes to a set of servers dedicated to storing such logs. An example of such an application is the Namenode of the Hadoop Distributed File System. 
+
+The main components of BookKeeper are:
+* Client: Applications interact with BookKeeper through the interface of a BookKeeper client;
+* Ledger: A ledger is our equivalent to a log file. Clients read entries from and write entries to ledgers;  
+* Bookie: Bookies are BookKeeper servers and they store the content of ledgers. Typically there are multiple bookies implementing a ledger.
+
+2- How to compile
+Run "ant" from "trunk/src/contrib/bookkeeper". This will generate the bookkeeper jar in "trunk/build/contrib/bookkeeper".
+
+3- Setting up
+
+A typical BookKeeper configuration includes a set of bookies and a ZooKeeper ensemble, where the ZooKeeper instance stores metadata for BookKeeper. As an example of such metadata, BookKeeper clients learn about available bookies by consulting a ZooKeeper service. 
+
+To set up BookKeeper, follow these steps:
+* Once bookies and ZooKeeper servers are running, create two znodes: "/ledgers" and "/ledgers/available". 
+* To run a bookie, run the java class "org.apache.bookkeeper.proto.BookieServer". It takes 3 parameters: a port, one directory path for transaction logs, and one directory path for indexes and data. Here is an example: java -cp .:bookkeeper.jar:../ZooKeeper/zookeeper-dev.jar:/usr/local/apache-log4j-1.2.15/log4j-1.2.15.jar -Dlog4j.configuration=log4j.properties org.apache.bookkeeper.proto.BookieServer 3181 /disk1/bk/ /disk2/bk/
+* For each bookie b, if <host> is the host name of b and <port> is the bookie port, then create a znode "/ledgers/available/<host>:<port>".
+* It is ready to run! 
+
+For test purposes, there is a class named "org.apache.bookkeeper.util.LocalBookKeeper" which runs a custom number of BookKeeper servers, along with a ZooKeeper server, on a single node. A typical invocation would be: 
+java -cp <classpath> org.apache.bookkeeper.util.LocalBookKeeper <number-of-bookies>
+
+4- Developing applications
+
+BookKeeper is written in Java. When implementing an application that uses BookKeeper, follow these steps:
+
+a. Instantiate a BookKeeper object. The single parameter to the BookKeeper constructor is a list of ZooKeeper servers;
+b. Once we have a BookKeeper object, we can create a ledger with createLedger. The default call to createLedger takes a single parameter, which is supposed to be for password authentication, but currently it has no effect. A call to createLedger returns a ledger handle (type LedgerHandle);
+c. Once we have a ledger, we can write to the ledger by calling either addEntry or asyncAddEntry. The first call is synchronous, whereas the second call is asynchronous, and both write byte arrays as entries. To use the asynchronous version, the application has to implement the AddCallback interface;
+d. Ideally, once the application finishes writing to the ledger, it should close it by calling closeLedger on the BookKeeper object, passing the ledger handle. If it doesn't, then BookKeeper will try to recover the ledger when a client tries to open it. Closing the ledger properly avoids this recovery step; it is recommended but not mandatory;
+e. Before reading from a ledger, a client has to open it by calling openLedger on a BookKeeper object, and then call readEntries or asyncReadEntries to read entries. Both read calls take as input two entry numbers, n1 and n2, and return all entries from n1 through n2.   
+
+Here is a simple example of a method that creates a BookKeeper object, creates a ledger, writes an entry to the ledger, and closes it:
+
+BookKeeper bk;
+LedgerHandle lh;
+
+public void allInOne(String servers) throws KeeperException, IOException, InterruptedException{
+        bk = new BookKeeper(servers);
+        try{
+          lh = bk.createLedger(new byte[] {'a', 'b'});
+          bk.addEntry(lh, new byte[]{'a', 'b'});
+          bk.closeLedger(lh);
+        } catch (BKException e) {
+            e.printStackTrace();
+        }
+    }
+
+5- Selecting quorum mode and number of bookies (advanced)
+
+There are two methods to store ledgers with BookKeeper:
+
+a. Self-verifying: Each entry includes a digest that is used to guarantee that upon a read, the value read is the same as the one written. This mode requires n > 2t bookies, and quorums of size t + 1. By default, a call to createLedger uses this method and 3 servers;
+b. Generic: Entries do not include a digest, so this mode requires more replicas: n > 3t bookies and quorums of size 2t + 1. 
+
+The quorum mode and number of bookies can be selected through the createLedger method.

Added: hadoop/zookeeper/trunk/src/contrib/bookkeeper/benchmark/org/apache/bookkeeper/benchmark/MySqlClient.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/bookkeeper/benchmark/org/apache/bookkeeper/benchmark/MySqlClient.java?rev=739388&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/bookkeeper/benchmark/org/apache/bookkeeper/benchmark/MySqlClient.java (added)
+++ hadoop/zookeeper/trunk/src/contrib/bookkeeper/benchmark/org/apache/bookkeeper/benchmark/MySqlClient.java Fri Jan 30 19:30:27 2009
@@ -0,0 +1,137 @@
+package org.apache.bookkeeper.benchmark;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.FileOutputStream;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.HashMap;
+
+import org.apache.bookkeeper.client.BookKeeper;
+import org.apache.bookkeeper.client.LedgerHandle;
+import org.apache.bookkeeper.client.QuorumEngine;
+import org.apache.log4j.Logger;
+import org.apache.zookeeper.KeeperException;
+
+public class MySqlClient {
+	static Logger LOG = Logger.getLogger(QuorumEngine.class);
+
+	BookKeeper x;
+	LedgerHandle lh;
+	Integer entryId;
+	HashMap<Integer, Integer> map;
+
+	FileOutputStream fStream;
+	FileOutputStream fStreamLocal;
+	long start, lastId;
+	Connection con;
+	Statement stmt;
+	
+	
+	public MySqlClient(String hostport, String user, String pass) 
+			throws ClassNotFoundException {
+		entryId = 0;
+		map = new HashMap<Integer, Integer>();
+		Class.forName("com.mysql.jdbc.Driver");
+		// database is named "bookkeeper"
+		String url = "jdbc:mysql://" + hostport + "/bookkeeper";
+		try {
+			con = DriverManager.getConnection(url, user, pass);
+			stmt = con.createStatement();
+			// drop table and recreate it
+			stmt.execute("DROP TABLE IF EXISTS data;");
+			stmt.execute("create table data(transaction_id bigint PRIMARY KEY AUTO_INCREMENT, content TEXT);");
+			LOG.info("Database initialization terminated");
+		} catch (SQLException e) {
+			
+			// TODO Auto-generated catch block
+			e.printStackTrace();
+		}
+	}
+
+	public void closeHandle() throws KeeperException, InterruptedException, SQLException{
+		con.close();
+	}
+	/**
+	 * First parameter is an integer defining the length of the message 
+	 * Second parameter is the number of writes
+	 * Third parameter is host:port 
+	 * Fourth parameter is username
+	 * Fifth parameter is password
+	 * @param args
+	 * @throws ClassNotFoundException 
+	 * @throws SQLException 
+	 */
+	public static void main(String[] args) throws ClassNotFoundException, SQLException {		
+		int lenght = Integer.parseInt(args[1]);
+		StringBuffer sb = new StringBuffer();
+		while(lenght-- > 0){
+			sb.append('a');
+		}
+		try {
+			MySqlClient c = new MySqlClient(args[2], args[3], args[4]);
+			c.writeSameEntryBatch(sb.toString().getBytes(), Integer.parseInt(args[0]));
+			c.writeSameEntry(sb.toString().getBytes(), Integer.parseInt(args[0]));
+			c.closeHandle();
+		} catch (NumberFormatException e) {
+			e.printStackTrace();
+		} catch (InterruptedException e) {
+			e.printStackTrace();
+		} catch (KeeperException e) {
+			e.printStackTrace();
+		} 
+
+	}
+
+	/**	
+	 * 	Adds  data entry to the DB 
+	 * 	@param data 	the entry to be written, given as a byte array 
+	 * 	@param times	the number of times the entry should be written on the DB	*/
+	void writeSameEntryBatch(byte[] data, int times) throws InterruptedException, SQLException{
+		start = System.currentTimeMillis();
+		int count = times;
+		String content = new String(data);
+		System.out.println("Data: " + content + ", " + data.length);
+		while(count-- > 0){
+			stmt.addBatch("insert into data(content) values(\"" + content + "\");");
+		}
+		LOG.info("Finished writing batch SQL command in ms: " + (System.currentTimeMillis() - start));
+		start = System.currentTimeMillis();
+		stmt.executeBatch();
+		System.out.println("Finished " + times + " writes in ms: " + (System.currentTimeMillis() - start));       
+		LOG.info("Ended computation");
+	}
+
+	void writeSameEntry(byte[] data, int times) throws InterruptedException, SQLException{
+		start = System.currentTimeMillis();
+		int count = times;
+		String content = new String(data);
+		System.out.println("Data: " + content + ", " + data.length);
+		while(count-- > 0){
+			stmt.executeUpdate("insert into data(content) values(\"" + content + "\");");
+		}
+		System.out.println("Finished " + times + " writes in ms: " + (System.currentTimeMillis() - start));       
+		LOG.info("Ended computation");
+	}
+
+}
\ No newline at end of file

Added: hadoop/zookeeper/trunk/src/contrib/bookkeeper/benchmark/org/apache/bookkeeper/benchmark/TestClient.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/bookkeeper/benchmark/org/apache/bookkeeper/benchmark/TestClient.java?rev=739388&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/bookkeeper/benchmark/org/apache/bookkeeper/benchmark/TestClient.java (added)
+++ hadoop/zookeeper/trunk/src/contrib/bookkeeper/benchmark/org/apache/bookkeeper/benchmark/TestClient.java Fri Jan 30 19:30:27 2009
@@ -0,0 +1,269 @@
+package org.apache.bookkeeper.benchmark;
+/*
+ * 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * 
+ */
+
+
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.util.HashMap;
+
+import org.apache.bookkeeper.client.AddCallback;
+import org.apache.bookkeeper.client.BKException;
+import org.apache.bookkeeper.client.BookKeeper;
+import org.apache.bookkeeper.client.LedgerEntry;
+import org.apache.bookkeeper.client.LedgerHandle;
+import org.apache.bookkeeper.client.LedgerSequence;
+import org.apache.bookkeeper.client.QuorumEngine;
+import org.apache.bookkeeper.client.ReadCallback;
+import org.apache.bookkeeper.client.LedgerHandle.QMode;
+import org.apache.log4j.Logger;
+
+import org.apache.zookeeper.KeeperException;
+
+public class TestClient 
+    implements AddCallback, ReadCallback{
+    Logger LOG = Logger.getLogger(QuorumEngine.class);
+    
+    BookKeeper x;
+    LedgerHandle lh;
+    Integer entryId;
+    HashMap<Integer, Integer> map;
+    
+    FileOutputStream fStream;
+    FileOutputStream fStreamLocal;
+    long start, lastId;
+    
+    public TestClient() {
+        entryId = 0;
+        map = new HashMap<Integer, Integer>();
+    }
+    
+    public TestClient(String servers) throws KeeperException, IOException, InterruptedException{
+        this();
+        x = new BookKeeper(servers);
+        try{
+        lh = x.createLedger(new byte[] {'a', 'b'});
+        } catch (BKException e) {
+            System.out.println(e.toString());
+        }
+    }
+    
+    public TestClient(String servers, int ensSize, int qSize)
+    throws KeeperException, IOException, InterruptedException{
+        this();
+        x = new BookKeeper(servers);
+        try{
+        lh = x.createLedger(ensSize, new byte[] {'a', 'b'}, qSize, QMode.VERIFIABLE);
+        } catch (BKException e) {
+            System.out.println(e.toString());
+        }
+    }
+    
+    public TestClient(FileOutputStream fStream)
+    throws FileNotFoundException {
+        this.fStream = fStream;
+        this.fStreamLocal = new FileOutputStream("./local.log");
+    }
+    
+    
+    public Integer getFreshEntryId(int val){
+        ++this.entryId;
+        synchronized (map) {
+            map.put(this.entryId, val);
+        }
+        return this.entryId;
+    }
+    
+    public boolean removeEntryId(Integer id){
+        boolean retVal = false;
+        //int val;
+        synchronized (map) {
+            //val = map.get(id);
+            //if(--val == 0){
+                map.remove(id);
+                retVal = true;
+            //} else {
+                //map.put(id, val);
+            //}
+     
+            if(map.size() == 0) map.notifyAll();
+            else{
+                if(map.size() < 4)
+                    LOG.error(map.toString());
+            }
+        }
+        return retVal;
+    }
+
+    public void closeHandle() throws KeeperException, InterruptedException{
+        x.closeLedger(lh);
+    }
+    /**
+     * First parameter is an integer defining the length of the message 
+     * Second parameter is the number of writes
+     * @param args
+     */
+    public static void main(String[] args) {
+        
+        int lenght = Integer.parseInt(args[1]);
+        StringBuffer sb = new StringBuffer();
+        while(lenght-- > 0){
+            sb.append('a');
+        }
+        
+        Integer selection = Integer.parseInt(args[0]);
+        switch(selection){
+        case 0:           
+            StringBuffer servers_sb = new StringBuffer();
+            for (int i = 4; i < args.length; i++){
+                servers_sb.append(args[i] + " ");
+            }
+        
+            String servers = servers_sb.toString().trim().replace(' ', ',');
+            try {
+                /*int lenght = Integer.parseInt(args[1]);
+                StringBuffer sb = new StringBuffer();
+                while(lenght-- > 0){
+                    sb.append('a');
+                }*/
+                TestClient c = new TestClient(servers, Integer.parseInt(args[3]), Integer.parseInt(args[4]));
+                c.writeSameEntryBatch(sb.toString().getBytes(), Integer.parseInt(args[2]));
+                //c.writeConsecutiveEntriesBatch(Integer.parseInt(args[0]));
+                c.closeHandle();
+            } catch (NumberFormatException e) {
+                e.printStackTrace();
+            } catch (InterruptedException e) {
+                e.printStackTrace();
+            } catch (KeeperException e) {
+                e.printStackTrace();
+            } catch (IOException e) {
+                e.printStackTrace();
+            }
+            break;
+        case 1:
+            
+            try{
+                TestClient c = new TestClient(new FileOutputStream(args[2]));
+                c.writeSameEntryBatchFS(sb.toString().getBytes(), Integer.parseInt(args[3]));
+            } catch(FileNotFoundException e){
+                e.printStackTrace();
+            }
+            break;
+        case 2:
+            break;
+        }
+    }
+
+    void writeSameEntryBatch(byte[] data, int times) throws InterruptedException{
+        start = System.currentTimeMillis();
+        int count = times;
+        System.out.println("Data: " + new String(data) + ", " + data.length);
+        while(count-- > 0){
+            x.asyncAddEntry(lh, data, this, this.getFreshEntryId(2));
+        }
+        System.out.println("Finished " + times + " async writes in ms: " + (System.currentTimeMillis() - start));       
+        synchronized (map) {
+            if(map.size() != 0)
+                map.wait();
+        }
+        System.out.println("Finished processing in ms: " + (System.currentTimeMillis() - start));
+        /*Integer mon = Integer.valueOf(0);
+        synchronized(mon){
+            
+                try{                  
+                    x.asyncReadEntries(lh, 0, times - 1, this, mon);
+                    mon.wait();
+                } catch (BKException e){
+                    LOG.error(e);
+                }
+        } */
+        LOG.error("Ended computation");
+    }
+    
+    void writeConsecutiveEntriesBatch(int times) throws InterruptedException{
+        start = System.currentTimeMillis();
+        int count = times;
+        while(count-- > 0){
+            byte[] write = new byte[2];
+            int j = count%100;
+            int k = (count+1)%100;
+            write[0] = (byte) j;
+            write[1] = (byte) k;
+            x.asyncAddEntry(lh, write, this, this.getFreshEntryId(2));
+        }
+        System.out.println("Finished " + times + " async writes in ms: " + (System.currentTimeMillis() - start));       
+        synchronized (map) {
+            if(map.size() != 0)
+                map.wait();
+        }
+        System.out.println("Finished processing writes (ms): " + (System.currentTimeMillis() - start));
+        
+        Integer mon = Integer.valueOf(0);
+        synchronized(mon){
+            try{
+                x.asyncReadEntries(lh, 1, times - 1, this, mon);
+                mon.wait();
+            } catch (BKException e){
+                LOG.error(e);
+            }
+        }
+        LOG.error("Ended computation");
+    }
+
+    void writeSameEntryBatchFS(byte[] data, int times) {
+        int count = times;
+        System.out.println("Data: " + data.length + ", " + times);
+        try{
+            start = System.currentTimeMillis();
+            while(count-- > 0){
+                fStream.write(data);
+                fStreamLocal.write(data);
+                fStream.flush();
+            }
+            //fStream.flush();
+            fStream.close();
+            System.out.println("Finished processing writes (ms): " + (System.currentTimeMillis() - start));
+        } catch(IOException e){
+            e.printStackTrace();
+        }
+    }
+        
+    @Override
+    public void addComplete(int rc, long ledgerId, long entryId, Object ctx) {
+        this.removeEntryId((Integer) ctx);
+        //if((entryId - lastId) > 1) LOG.error("Gap: " + entryId + ", " + lastId);
+        //lastId = entryId;
+        //if(entryId > 199000) LOG.error("Add completed: " + ledgerId + ", " + entryId + ", " + map.toString());
+        //System.out.println((System.currentTimeMillis() - start));
+    }
+    @Override
+    public void readComplete(int rc, long ledgerId, LedgerSequence seq, Object ctx){
+        System.out.println("Read callback: " + rc);
+        while(seq.hasMoreElements()){
+            LedgerEntry le = seq.nextElement();
+            System.out.println(new String(le.getEntry()));
+        }
+        synchronized(ctx){
+            ctx.notify();
+        }
+    }
+}

Added: hadoop/zookeeper/trunk/src/contrib/bookkeeper/build.xml
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/bookkeeper/build.xml?rev=739388&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/bookkeeper/build.xml (added)
+++ hadoop/zookeeper/trunk/src/contrib/bookkeeper/build.xml Fri Jan 30 19:30:27 2009
@@ -0,0 +1,153 @@
+<!--
+   Licensed to the Apache Software Foundation (ASF) under one or more
+   contributor license agreements.  See the NOTICE file distributed with
+   this work for additional information regarding copyright ownership.
+   The ASF licenses this file to You under the Apache License, Version 2.0
+   (the "License"); you may not use this file except in compliance with
+   the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+-->
+
+<project name="bookkeeper" default="jar">
+  <import file="../build-contrib.xml"/>
+
+  <!-- Test configuration used by the test-init and junit.run targets below. -->
+	<property name="test.build.dir" value="${build.test}" />
+    <property name="test.src.dir" value="test"/>
+    <property name="test.log.dir" value="${test.build.dir}/logs" />
+    <property name="test.data.dir" value="${test.build.dir}/data" />
+    <property name="test.data.upgrade.dir" value="${test.data.dir}/upgrade" />
+    <property name="test.tmp.dir" value="${test.build.dir}/tmp" />
+    <property name="test.output" value="no" />
+    <property name="test.timeout" value="900000" />
+    <property name="test.junit.output.format" value="plain" />
+    <property name="test.junit.fork.mode" value="perTest" />
+    <property name="test.junit.printsummary" value="yes" />
+    <property name="test.junit.haltonfailure" value="no" />
+    <property name="test.junit.maxmem" value="512m" />
+
+  
+  <!-- Sets mainIsCompiled when ZooKeeperMain is found on the "classpath"
+       reference (presumably defined in build-contrib.xml; verify). -->
+  <available classname="org.apache.zookeeper.ZooKeeperMain"
+             classpathref="classpath"
+             property="mainIsCompiled"/>
+
+  <!-- Fails the build unless the ZooKeeper main classes have been compiled. -->
+  <target name="checkMainCompiled" unless="mainIsCompiled">
+    <fail message="ZooKeeper main must first be compiled (toplevel build.xml)"/>
+  </target>
+
+  <!-- jarname is zookeeper-${version}-${name}.jar when "version" is set... -->
+  <target name="versionedjarname" if="version">
+    <property name="jarname"
+              value="${build.dir}/zookeeper-${version}-${name}.jar"/>
+  </target>
+
+  <!-- ...and zookeeper-dev-${name}.jar otherwise. -->
+  <target name="unversionedjarname" unless="version">
+    <property name="jarname"
+              value="${build.dir}/zookeeper-dev-${name}.jar"/>
+  </target>
+
+  <target name="setjarname" depends="versionedjarname, unversionedjarname"/>
+
+  <!-- Override jar target to specify main class -->
+  <!-- NOTE(review): the jar also bundles ${build.test} (test classes), and
+       Main-Class names org.apache.zookeeper.util.FatJarMain, which is not among
+       the files this commit adds under bookkeeper; confirm both are intended. -->
+  <target name="jar" depends="checkMainCompiled, setjarname, compile">
+    <echo message="contrib: ${name}"/>
+
+    <jar jarfile="${jarname}">
+      <manifest>
+        <attribute name="Main-Class" value="org.apache.zookeeper.util.FatJarMain" />
+        <attribute name="Built-By" value="${user.name}"/>
+        <attribute name="Built-At" value="${build.time}"/>
+        <attribute name="Built-On" value="${host.name}" />
+        <attribute name="Implementation-Title" value="org.apache.zookeeper"/>
+        <attribute name="Implementation-Version" value="${revision}"/>
+        <attribute name="Implementation-Vendor" value="The Apache Software Foundation"/>
+      </manifest>
+      <fileset file="${zk.root}/LICENSE.txt" />
+      <fileset dir="${build.classes}"/>
+      <fileset dir="${build.test}"/>
+    </jar>
+  </target>
+
+  <!-- Compiles the tests, prepares the test directories, and runs JUnit. -->
+	<target name="test" depends="compile-test,test-init,test-category,junit.run" />
+
+  <!-- Compiles the sources under ./test into ${build.test}. -->
+	<target name="compile-test" depends="compile">
+  		<property name="target.jdk" value="${ant.java.version}" />	
+		<property name="src.test.local" location="${basedir}/test" />
+		<mkdir dir="${build.test}"/>
+		<javac srcdir="${src.test.local}" 
+			destdir="${build.test}" 
+			target="${target.jdk}" 
+			debug="on" >
+			<classpath refid="classpath" />
+		</javac>
+	</target>
+	
+    <!-- Recreates empty log, tmp and data directories for a test run. -->
+    <target name="test-init" depends="jar,compile-test">
+        <delete dir="${test.log.dir}" />
+        <delete dir="${test.tmp.dir}" />
+        <delete dir="${test.data.dir}" />
+        <mkdir dir="${test.log.dir}" />
+        <mkdir dir="${test.tmp.dir}" />
+        <mkdir dir="${test.data.dir}" />
+    </target>
+
+  <!-- Defaults test.category to empty, so junit.run matches every *Test.java. -->
+	<target name="test-category">
+         <property name="test.category" value=""/>
+    </target>
+
+  <!-- Runs the single class named by -Dtestcase if set, otherwise every
+       **/*${test.category}Test.java under ${test.src.dir}; fails the build
+       if any test fails or errors. -->
+	<target name="junit.run">
+		<echo message="${test.src.dir}" />
+        <junit showoutput="${test.output}"
+               printsummary="${test.junit.printsummary}"
+               haltonfailure="${test.junit.haltonfailure}"
+               fork="yes"
+               forkmode="${test.junit.fork.mode}"
+               maxmemory="${test.junit.maxmem}"
+               dir="${basedir}" timeout="${test.timeout}"
+               errorProperty="tests.failed" failureProperty="tests.failed">
+          <sysproperty key="build.test.dir" value="${test.tmp.dir}" />
+          <sysproperty key="test.data.dir" value="${test.data.dir}" />
+          <sysproperty key="log4j.configuration"
+                    value="file:${basedir}/conf/log4j.properties" />
+          <classpath refid="classpath"/>
+          <classpath>
+             <pathelement path="${build.test}" />
+          </classpath>
+          <formatter type="${test.junit.output.format}" />
+          <batchtest todir="${test.log.dir}" unless="testcase">
+              <fileset dir="${test.src.dir}"
+                     includes="**/*${test.category}Test.java"/>
+          </batchtest>
+          <batchtest todir="${test.log.dir}" if="testcase">
+              <fileset dir="${test.src.dir}" includes="**/${testcase}.java"/>
+          </batchtest>
+       </junit>
+            <fail if="tests.failed">Tests failed!</fail>
+    </target>
+
+  <!-- Copies build.xml and the test, benchmark and src trees into
+       ${dist.dir}/contrib/${name}. -->
+  <target name="package" depends="jar, zookeeperbuildcontrib.package"
+          unless="skip.contrib">
+
+    <copy file="${basedir}/build.xml" todir="${dist.dir}/contrib/${name}"/>
+
+    <mkdir dir="${dist.dir}/contrib/${name}/test"/>
+    <copy todir="${dist.dir}/contrib/${name}/test">
+      <fileset dir="${basedir}/test"/>
+    </copy>
+    <mkdir dir="${dist.dir}/contrib/${name}/benchmark"/>
+    <copy todir="${dist.dir}/contrib/${name}/benchmark">
+      <fileset dir="${basedir}/benchmark"/>
+    </copy>
+    <mkdir dir="${dist.dir}/contrib/${name}/src"/>
+    <copy todir="${dist.dir}/contrib/${name}/src">
+      <fileset dir="${basedir}/src"/>
+    </copy>
+  </target>
+
+</project>
+

Added: hadoop/zookeeper/trunk/src/contrib/bookkeeper/conf/log4j.properties
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/bookkeeper/conf/log4j.properties?rev=739388&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/bookkeeper/conf/log4j.properties (added)
+++ hadoop/zookeeper/trunk/src/contrib/bookkeeper/conf/log4j.properties Fri Jan 30 19:30:27 2009
@@ -0,0 +1,72 @@
+#
+# 
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+# 
+#   http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+# 
+#
+
+#
+# BookKeeper Logging Configuration
+#
+
+# Format is "<default threshold> (, <appender>)+"
+
+# DEFAULT: console appender only
+log4j.rootLogger=TRACE, CONSOLE
+
+# Example with rolling log file
+#log4j.rootLogger=DEBUG, CONSOLE, ROLLINGFILE
+
+# Example with rolling log file and tracing
+#log4j.rootLogger=TRACE, CONSOLE, ROLLINGFILE, TRACEFILE
+
+#
+# Log TRACE level and above messages to the console
+#
+log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender
+log4j.appender.CONSOLE.Threshold=TRACE
+log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout
+log4j.appender.CONSOLE.layout.ConversionPattern=%d{ISO8601} - %-5p - [%t:%C{1}@%L] - %m%n
+
+#
+# Add ROLLINGFILE to rootLogger to get log file output
+#    Log DEBUG level and above messages to a log file
+# (File/MaxFileSize/MaxBackupIndex are RollingFileAppender properties; a
+#  ConsoleAppender would ignore them and never write bookkeeper.log)
+log4j.appender.ROLLINGFILE=org.apache.log4j.RollingFileAppender
+log4j.appender.ROLLINGFILE.Threshold=DEBUG
+log4j.appender.ROLLINGFILE.File=bookkeeper.log
+
+# Max log file size of 10MB
+log4j.appender.ROLLINGFILE.MaxFileSize=10MB
+# uncomment the next line to keep up to 10 backup files (log4j default is 1)
+#log4j.appender.ROLLINGFILE.MaxBackupIndex=10
+
+log4j.appender.ROLLINGFILE.layout=org.apache.log4j.PatternLayout
+log4j.appender.ROLLINGFILE.layout.ConversionPattern=%d{ISO8601} - %-5p [%t:%C{1}@%L] - %m%n
+
+
+#
+# Add TRACEFILE to rootLogger to get log file output
+#    Log TRACE level and above messages to a log file
+log4j.appender.TRACEFILE=org.apache.log4j.FileAppender
+log4j.appender.TRACEFILE.Threshold=TRACE
+log4j.appender.TRACEFILE.File=bookkeeper_trace.log
+
+log4j.appender.TRACEFILE.layout=org.apache.log4j.PatternLayout
+### Notice we are including log4j's NDC here (%x)
+log4j.appender.TRACEFILE.layout.ConversionPattern=%d{ISO8601} - %-5p [%t:%C{1}@%L][%x] - %m%n

Added: hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/bookie/Bookie.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/bookie/Bookie.java?rev=739388&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/bookie/Bookie.java (added)
+++ hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/bookie/Bookie.java Fri Jan 30 19:30:27 2009
@@ -0,0 +1,343 @@
+package org.apache.bookkeeper.bookie;
+/*
+ * 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * 
+ */
+
+
+import java.io.File;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.ByteBuffer;
+import java.nio.channels.ClosedByInterruptException;
+import java.nio.channels.FileChannel;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.Random;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.apache.bookkeeper.client.AddCallback;
+import org.apache.log4j.Logger;
+
+
+
+/**
+ * Implements a bookie.
+ *
+ */
+
+public class Bookie extends Thread {
+    HashMap<Long, LedgerDescriptor> ledgers = new HashMap<Long, LedgerDescriptor>();
+    Logger LOG = Logger.getLogger(Bookie.class);
+    /**
+     * 4 byte signature followed by 2-byte major and 2-byte minor versions
+     */
+    private static byte ledgerHeader[] =  { 0x42, 0x6f, 0x6f, 0x6b, 0, 0, 0, 0};
+    
+    final File journalDirectory;
+
+    final File ledgerDirectories[];
+    
+    public static class NoLedgerException extends IOException {
+        private static final long serialVersionUID = 1L;
+        private long ledgerId;
+        public NoLedgerException(long ledgerId) {
+            this.ledgerId = ledgerId;
+        }
+        public long getLedgerId() {
+            return ledgerId;
+        }
+    }
+    public static class NoEntryException extends IOException {
+        private static final long serialVersionUID = 1L;
+        private long ledgerId;
+        private long entryId;
+        public NoEntryException(long ledgerId, long entryId) {
+            this.ledgerId = ledgerId;
+            this.entryId = entryId;
+        }
+        public long getLedger() {
+            return ledgerId;
+        }
+        public long getEntry() {
+            return entryId;
+        }
+    }
+
+    public Bookie(File journalDirectory, File ledgerDirectories[]) {
+        this.journalDirectory = journalDirectory;
+        this.ledgerDirectories = ledgerDirectories;
+        setDaemon(true);
+        LOG.debug("I'm starting a bookie with journal directory " + journalDirectory.getName());
+        start();
+    }
+
+    private void putHandle(LedgerDescriptor handle) {
+        synchronized (ledgers) {
+            handle.decRef();
+        }
+    }
+
+    private LedgerDescriptor getHandle(long ledgerId, boolean readonly) throws IOException {
+        LedgerDescriptor handle = null;
+        synchronized (ledgers) {
+            handle = ledgers.get(ledgerId);
+            if (handle == null) {
+                handle = createHandle(ledgerId, readonly);
+                ledgers.put(ledgerId, handle);
+            }
+            handle.incRef();
+        }
+        return handle;
+    }
+
+    private LedgerDescriptor createHandle(long ledgerId, boolean readOnly) throws IOException {
+        RandomAccessFile ledgerFile = null;
+        RandomAccessFile ledgerIndexFile = null;
+        String ledgerName = getLedgerName(ledgerId, false);
+        String ledgerIndexName = getLedgerName(ledgerId, true);
+        for (File d : ledgerDirectories) {
+            File lf = new File(d, ledgerName);
+            File lif = new File(d, ledgerIndexName);
+            if (lf.exists()) {
+                if (ledgerFile != null) {
+                    throw new IOException("Duplicate ledger file found for "
+                            + ledgerId);
+                }
+                ledgerFile = new RandomAccessFile(lf, "rw");
+            }
+            if (lif.exists()) {
+                if (ledgerIndexFile != null) {
+                    throw new IOException(
+                            "Duplicate ledger index file found for " + ledgerId);
+                }
+                ledgerIndexFile = new RandomAccessFile(lif, "rw");
+            }
+        }
+        if (ledgerFile == null && ledgerIndexFile == null) {
+            if (readOnly) {
+                throw new NoLedgerException(ledgerId);
+            }
+            File dirs[] = pickDirs(ledgerDirectories);
+            File lf = new File(dirs[0], ledgerName);
+            checkParents(lf);
+            ledgerFile = new RandomAccessFile(lf, "rw");
+            ledgerFile.write(ledgerHeader);
+            File lif = new File(dirs[1], ledgerIndexName);
+            checkParents(lif);
+            ledgerIndexFile = new RandomAccessFile(lif, "rw");
+        }
+        if (ledgerFile != null && ledgerIndexFile != null) {
+            return new LedgerDescriptor(ledgerId, ledgerFile.getChannel(),
+                    ledgerIndexFile.getChannel());
+        }
+        if (ledgerFile == null) {
+            throw new IOException("Found index but no data for " + ledgerId);
+        }
+        throw new IOException("Found data but no index for " + ledgerId);
+    }
+    
+    static final private void checkParents(File f) throws IOException {
+        File parent = f.getParentFile();
+        if (parent.exists()) {
+            return;
+        }
+        if (parent.mkdirs() == false) {
+            throw new IOException("Counldn't mkdirs for " + parent);
+        }
+    }
+
+    static final private Random rand = new Random();
+
+    static final private File[] pickDirs(File dirs[]) {
+        File rc[] = new File[2];
+        rc[0] = dirs[rand.nextInt(dirs.length)];
+        rc[1] = dirs[rand.nextInt(dirs.length)];
+        return rc;
+    }
+
+    static final private String getLedgerName(long ledgerId, boolean isIndex) {
+        int parent = (int) (ledgerId & 0xff);
+        int grandParent = (int) ((ledgerId & 0xff00) >> 8);
+        StringBuilder sb = new StringBuilder();
+        sb.append(Integer.toHexString(grandParent));
+        sb.append('/');
+        sb.append(Integer.toHexString(parent));
+        sb.append('/');
+        sb.append(Long.toHexString(ledgerId));
+        if (isIndex) {
+            sb.append(".idx");
+        }
+        return sb.toString();
+    }
+
+    static class QueueEntry {
+        QueueEntry(ByteBuffer entry, long ledgerId, long entryId, 
+                AddCallback cb, Object ctx) {
+            this.entry = entry.duplicate();
+            this.cb = cb;
+            this.ctx = ctx;
+            this.ledgerId = ledgerId;
+            this.entryId = entryId;
+        }
+
+        ByteBuffer entry;
+        
+        long ledgerId;
+        
+        long entryId;
+
+        AddCallback cb;
+
+        Object ctx;
+    }
+
+    LinkedBlockingQueue<QueueEntry> queue = new LinkedBlockingQueue<QueueEntry>();
+
+    public final static long preAllocSize = 4*1024*1024;
+    
+    public final static ByteBuffer zeros = ByteBuffer.allocate(512);
+    
+    public void run() {
+        LinkedList<QueueEntry> toFlush = new LinkedList<QueueEntry>();
+        ByteBuffer lenBuff = ByteBuffer.allocate(4);
+        try {
+            FileChannel logFile = new RandomAccessFile(new File(journalDirectory,
+                    Long.toHexString(System.currentTimeMillis()) + ".txn"),
+                    "rw").getChannel();
+            zeros.clear();
+            long nextPrealloc = preAllocSize;
+            logFile.write(zeros, nextPrealloc);
+            while (true) {
+                QueueEntry qe = null;
+                if (toFlush.isEmpty()) {
+                    qe = queue.take();
+                } else {
+                    qe = queue.poll();
+                    if (qe == null || toFlush.size() > 100) {
+                        logFile.force(false);
+                        for (QueueEntry e : toFlush) {
+                            e.cb.addComplete(0, e.ledgerId, e.entryId, e.ctx);
+                        }
+                        toFlush.clear();
+                    }
+                }
+                if (qe == null) {
+                    continue;
+                }
+                lenBuff.clear();
+                lenBuff.putInt(qe.entry.remaining());
+                lenBuff.flip();
+                logFile.write(new ByteBuffer[] { lenBuff, qe.entry });
+                if (logFile.position() > nextPrealloc) {
+                    nextPrealloc = (logFile.size() / preAllocSize + 1) * preAllocSize;
+                    zeros.clear();
+                    logFile.write(zeros, nextPrealloc);
+                }
+                toFlush.add(qe);
+            }
+        } catch (InterruptedException e) {
+            LOG.debug("Bookie thread exiting due to interrupt");
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+    }
+
+    public void shutdown() throws InterruptedException {
+        this.interrupt();
+        this.join();
+        for(LedgerDescriptor d: ledgers.values()) {
+            d.close();
+        }
+    }
+    
+    public void addEntry(ByteBuffer entry, AddCallback cb, Object ctx)
+            throws IOException {
+        long ledgerId = entry.getLong();
+        LedgerDescriptor handle = getHandle(ledgerId, false);
+        try {
+            entry.rewind();
+            long entryId = handle.addEntry(entry);
+            entry.rewind();
+            if (LOG.isTraceEnabled()) {
+                LOG.trace("Adding " + entryId + "@" + ledgerId);
+            }
+            queue.add(new QueueEntry(entry, ledgerId, entryId, cb, ctx));
+        } finally {
+            putHandle(handle);
+        }
+    }
+
+    public ByteBuffer readEntry(long ledgerId, long entryId) throws IOException {
+        LedgerDescriptor handle = getHandle(ledgerId, true);
+        try {
+            if (LOG.isTraceEnabled()) {
+                LOG.trace("Reading " + entryId + "@" + ledgerId);
+            }
+            return handle.readEntry(entryId);
+        } finally {
+            putHandle(handle);
+        }
+    }
+
+    // The rest of the code is test stuff
+    static class CounterCallback implements AddCallback {
+        int count;
+
+        synchronized public void addComplete(int rc, long l, long e, Object ctx) {
+            count--;
+            if (count == 0) {
+                notifyAll();
+            }
+        }
+
+        synchronized public void incCount() {
+            count++;
+        }
+
+        synchronized public void waitZero() throws InterruptedException {
+            while (count > 0) {
+                wait();
+            }
+        }
+    }
+
+    /**
+     * @param args
+     * @throws IOException
+     * @throws InterruptedException
+     */
+    public static void main(String[] args) throws IOException,
+            InterruptedException {
+        Bookie b = new Bookie(new File("/tmp"), new File[] { new File("/tmp") });
+        CounterCallback cb = new CounterCallback();
+        long start = System.currentTimeMillis();
+        for (int i = 0; i < 100000; i++) {
+            ByteBuffer buff = ByteBuffer.allocate(1024);
+            buff.putLong(1);
+            buff.putLong(i);
+            buff.limit(1024);
+            buff.position(0);
+            cb.incCount();
+            b.addEntry(buff, cb, null);
+        }
+        cb.waitZero();
+        long end = System.currentTimeMillis();
+        System.out.println("Took " + (end-start) + "ms");
+    }
+}

Added: hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/bookie/LedgerDescriptor.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/bookie/LedgerDescriptor.java?rev=739388&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/bookie/LedgerDescriptor.java (added)
+++ hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/bookie/LedgerDescriptor.java Fri Jan 30 19:30:27 2009
@@ -0,0 +1,154 @@
+package org.apache.bookkeeper.bookie;
+/*
+ * 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * 
+ */
+
+
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+
+import org.apache.log4j.Logger;
+
+
+
+/**
+ * Implements a ledger inside a bookie. In particular, it implements operations
+ * to write entries to a ledger and read entries from a ledger.
+ *
+ * The data channel holds records of a 4-byte length followed by the entry
+ * bytes. The index channel holds, at offset 8*entryId, the 8-byte position of
+ * that entry's record in the data channel; a slot reading 0 means "absent".
+ */
+public class LedgerDescriptor {
+    Logger LOG = Logger.getLogger(LedgerDescriptor.class);
+    LedgerDescriptor(long ledgerId, FileChannel ledger, FileChannel ledgerIndex) {
+        this.ledgerId = ledgerId;
+        this.ledger = ledger;
+        this.ledgerIndex = ledgerIndex;
+    }
+    private long ledgerId;
+    private FileChannel ledger;
+    private FileChannel ledgerIndex;
+    private int refCnt;
+    synchronized public void incRef() {
+        refCnt++;
+    }
+    synchronized public void decRef() {
+        refCnt--;
+    }
+    synchronized public int getRefCnt() {
+        return refCnt;
+    }
+    /** Position of entryId's 8-byte slot in the index channel. */
+    static private final long calcEntryOffset(long entryId) {
+        return 8L*entryId;
+    }
+
+    /**
+     * Appends an entry at the data channel's current position and records that
+     * position in the index. The entry must begin with its 8-byte ledger id
+     * followed by its 8-byte entry id.
+     *
+     * @param entry the entry; it is rewound and then fully consumed
+     * @return the entry id read from the entry
+     * @throws IOException if the entry belongs to another ledger, has a
+     *         negative entry id, or a write fails
+     */
+    long addEntry(ByteBuffer entry) throws IOException {
+        long ledgerId = entry.getLong();
+        if (ledgerId != this.ledgerId) {
+            throw new IOException("Entry for ledger " + ledgerId + " was sent to " + this.ledgerId);
+        }
+        long entryId = entry.getLong();
+        entry.rewind();
+        if (entryId < 0) {
+            // Would map to a negative index position; reject before writing data.
+            throw new IOException("Invalid entry id " + entryId + " for ledger " + this.ledgerId);
+        }
+
+        long entryOffset = ledger.position();
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Offset: " + entryOffset + ", " + entry.position() + ", " + calcEntryOffset(entryId) + ", " + entryId);
+        }
+
+        /*
+         * Write length of entry first, then the entry itself. The data goes in
+         * before the index slot so that the index never points at a record
+         * that has not been written yet.
+         */
+        ByteBuffer lenBuffer = ByteBuffer.allocate(4);
+        lenBuffer.putInt(entry.remaining());
+        lenBuffer.flip();
+        ledger.write(lenBuffer);
+        ledger.write(entry);
+
+        /*
+         * Store the record's position in the index slot of entryId.
+         */
+        ByteBuffer offsetBuffer = ByteBuffer.allocate(8);
+        offsetBuffer.putLong(entryOffset);
+        offsetBuffer.flip();
+        ledgerIndex.write(offsetBuffer, calcEntryOffset(entryId));
+
+        return entryId;
+    }
+
+    /**
+     * Reads an entry.
+     *
+     * @param entryId id of the entry, or -1 for the entry in the last index slot
+     * @return a buffer holding the entry bytes, ready to be read
+     * @throws Bookie.NoEntryException if the index has no slot or a zero slot
+     *         for the entry
+     * @throws IOException if the stored record is truncated or corrupt
+     */
+    ByteBuffer readEntry(long entryId) throws IOException {
+        long indexOffset;
+        if (entryId == -1) {
+            // The last slot is the final 8 bytes of the index.
+            indexOffset = ledgerIndex.size() - 8;
+        } else {
+            indexOffset = calcEntryOffset(entryId);
+        }
+        if (indexOffset < 0) {
+            // Empty index (entryId == -1) or a negative entry id.
+            throw new Bookie.NoEntryException(ledgerId, entryId);
+        }
+
+        ByteBuffer slot = ByteBuffer.allocate(8);
+        if (readFully(ledgerIndex, slot, indexOffset) != 8) {
+            throw new Bookie.NoEntryException(ledgerId, entryId);
+        }
+        slot.flip();
+        long offset = slot.getLong();
+        if (offset == 0) {
+            // Unwritten slots read back as zero; the data file starts with the
+            // ledger header, so no record is stored at position 0.
+            throw new Bookie.NoEntryException(ledgerId, entryId);
+        }
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Offset: " + offset);
+        }
+
+        /*
+         * Read the length
+         */
+        ByteBuffer lenBuffer = ByteBuffer.allocate(4);
+        if (readFully(ledger, lenBuffer, offset) != 4) {
+            throw new IOException("Truncated length of entry " + entryId
+                    + " in ledger " + ledgerId);
+        }
+        lenBuffer.flip();
+        int len = lenBuffer.getInt();
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Length of buffer: " + len);
+        }
+        if (len < 0) {
+            throw new IOException("Corrupt length " + len + " for entry " + entryId
+                    + " in ledger " + ledgerId);
+        }
+
+        /*
+         * Read the rest. We add 4 to skip the length
+         */
+        ByteBuffer buffer = ByteBuffer.allocate(len);
+        if (readFully(ledger, buffer, offset + 4) != len) {
+            throw new IOException("Truncated entry " + entryId + " in ledger "
+                    + ledgerId);
+        }
+        buffer.flip();
+        return buffer;
+    }
+
+    /**
+     * Reads from channel at position until buffer is full or EOF is reached;
+     * a single FileChannel.read may return fewer bytes than requested.
+     *
+     * @return the number of bytes read into buffer
+     */
+    private static int readFully(FileChannel channel, ByteBuffer buffer, long position)
+            throws IOException {
+        int total = 0;
+        while (buffer.hasRemaining()) {
+            int n = channel.read(buffer, position + total);
+            if (n <= 0) {
+                break;
+            }
+            total += n;
+        }
+        return total;
+    }
+
+    /** Closes both channels, logging (not throwing) any failure. */
+    void close() {
+        try {
+            ledger.close();
+        } catch (IOException e) {
+            LOG.warn("Error closing ledger " + ledgerId, e);
+        }
+        try {
+            ledgerIndex.close();
+        } catch (IOException e) {
+            LOG.warn("Error closing index for ledger " + ledgerId, e);
+        }
+    }
+}

Added: hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/AddCallback.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/AddCallback.java?rev=739388&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/AddCallback.java (added)
+++ hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/AddCallback.java Fri Jan 30 19:30:27 2009
@@ -0,0 +1,37 @@
+package org.apache.bookkeeper.client;
+/*
+ * 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * 
+ */
+
+/**
+ * Completion callback for add-entry operations. An implementation is handed
+ * to the add call together with an opaque context object, and is invoked
+ * once the add has finished.
+ */
+
+public interface AddCallback {
+    /**
+     * Invoked when an add operation completes.
+     *
+     * @param rc       result code of the operation; 0 ({@code BKException.Code.OK})
+     *                 means success
+     * @param ledgerId id of the ledger the entry was added to
+     * @param entryId  id assigned to the added entry
+     * @param ctx      the context object supplied with the add call, passed
+     *                 back unchanged
+     */
+    void addComplete(int rc, long ledgerId, long entryId, Object ctx);
+}

Added: hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/BKException.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/BKException.java?rev=739388&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/BKException.java (added)
+++ hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/BKException.java Fri Jan 30 19:30:27 2009
@@ -0,0 +1,130 @@
+package org.apache.bookkeeper.client;
+/*
+ * 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * 
+ */
+
+
+import java.lang.Exception;
+
+/**
+ * Implements BookKeeper exceptions. 
+ * 
+ */
+
+@SuppressWarnings("serial")
+public abstract class BKException extends Exception {
+
+    private int code;
+    public BKException(int code){
+        this.code = code;
+    }
+    
+    public static BKException create(int code){
+        switch(code){
+        case Code.ReadException:
+            return new BKReadException();
+        case Code.QuorumException:
+            return new BKQuorumException();
+        case Code.NoBookieAvailableException:
+            return new BKBookieException();
+        case Code.DigestNotInitializedException:
+            return new BKDigestNotInitializedException();
+        case Code.DigestMatchException:
+            return new BKDigestMatchException();
+        default:
+            return new BKIllegalOpException();
+        }
+    }
+    
+    public interface Code {
+        int OK = 0;
+        int ReadException = -1;
+        int QuorumException = -2;
+        int NoBookieAvailableException = -3;
+        int DigestNotInitializedException = -4;
+        int DigestMatchException = -5;
+        
+        int IllegalOpException = -100;
+    }
+    
+    public void setCode(int code){
+        this.code = code;
+    }
+    
+    public int getCode(){
+        return this.code;
+    }
+    
+    public String getMessage(int code){
+        switch(code){
+        case Code.OK:
+            return "No problem";
+        case Code.ReadException:
+            return "Error while reading ledger";
+        case Code.QuorumException:
+            return "Invalid quorum size on ensemble size";
+        case Code.NoBookieAvailableException:
+            return "Invalid quorum size on ensemble size";
+        case Code.DigestNotInitializedException:
+            return "Digest engine not initialized";
+        case Code.DigestMatchException:
+            return "Entry digest does not match";
+        default:
+            return "Invalid operation";
+        }
+    }
+    
+    public static class BKReadException extends BKException {
+        public BKReadException(){
+            super(Code.ReadException);
+        }   
+    }
+    
+    public static class BKQuorumException extends BKException {
+        public BKQuorumException(){
+            super(Code.QuorumException);
+        }   
+    }
+     
+    public static class BKBookieException extends BKException {
+        public BKBookieException(){
+            super(Code.NoBookieAvailableException);
+        }   
+    }
+    
+    public static class BKDigestNotInitializedException extends BKException {
+        public BKDigestNotInitializedException(){
+            super(Code.DigestNotInitializedException);
+        }   
+    }
+    
+    public static class BKDigestMatchException extends BKException {
+        public BKDigestMatchException(){
+            super(Code.DigestMatchException);
+        }   
+    }
+    
+    public static class BKIllegalOpException extends BKException {
+        public BKIllegalOpException(){
+            super(Code.IllegalOpException);
+        }   
+    }
+}
+    

Added: hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/BookKeeper.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/BookKeeper.java?rev=739388&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/BookKeeper.java (added)
+++ hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/BookKeeper.java Fri Jan 30 19:30:27 2009
@@ -0,0 +1,701 @@
+package org.apache.bookkeeper.client;
+/*
+ * 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * 
+ */
+
+
+import java.io.IOException;
+import java.io.ByteArrayOutputStream;
+import java.security.NoSuchAlgorithmException;
+import java.nio.ByteBuffer;
+import java.security.MessageDigest;
+import java.util.ArrayList;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.HashSet;
+import java.util.List;
+import java.util.HashMap;
+import java.util.Random;
+import java.net.InetSocketAddress;
+
+import org.apache.bookkeeper.client.BKException;
+import org.apache.bookkeeper.client.LedgerSequence;
+import org.apache.bookkeeper.client.BKException.Code;
+import org.apache.bookkeeper.client.LedgerHandle.QMode;
+import org.apache.bookkeeper.client.QuorumEngine.Operation;
+import org.apache.bookkeeper.client.QuorumEngine.Operation.AddOp;
+import org.apache.bookkeeper.client.QuorumEngine.Operation.ReadOp;
+import org.apache.bookkeeper.client.QuorumEngine.Operation.StopOp;
+import org.apache.log4j.Logger;
+
+import org.apache.zookeeper.data.Stat;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.ZooDefs.Ids;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.server.ZooKeeperServer;
+
+
+/**
+ * BookKeeper client. We assume there is one single writer
+ * to a ledger at any time.
+ *
+ * There are three possible operations: start a new ledger,
+ * write to a ledger, and read from a ledger.
+ *
+ * <p>Ledger metadata is kept in ZooKeeper under {@code prefix} followed by the
+ * zero-padded ledger id (for example {@code /ledgers/L0000000042}). Its children
+ * hold the ensemble members, the quorum size, the quorum mode and, once the
+ * ledger has been closed, the id of its last entry. Every ledger created or
+ * opened through this object is served by its own {@link QuorumEngine}, kept
+ * in {@link #engines} keyed by ledger id.
+ *
+ * <p>NOTE(review): {@link #engines} and {@link #bookieBlackList} are plain,
+ * unsynchronized collections; confirm instances are not shared across threads
+ * without external locking.
+ */
+public class BookKeeper
+implements ReadCallback, AddCallback, Watcher {
+
+    static final Logger LOG = Logger.getLogger(BookKeeper.class);
+
+    static public final String prefix = "/ledgers/L";
+    static public final String ensemble = "/ensemble";
+    static public final String quorumSize = "/quorum";
+    static public final String close = "/close";
+    static public final String quorumMode = "/mode";
+
+    ZooKeeper zk = null;
+    QuorumEngine engine = null;
+    MessageDigest md = null;
+    /** One quorum engine per created/opened ledger, keyed by ledger id. */
+    HashMap<Long, QuorumEngine> engines;
+    /** Bookies that {@link #getNewBookie} must never hand out. */
+    HashSet<InetSocketAddress> bookieBlackList;
+
+    LedgerSequence responseRead;
+    Long responseLong;
+
+    /**
+     * Creates a client and opens a ZooKeeper session.
+     *
+     * @param servers   ZooKeeper connection string
+     */
+    public BookKeeper(String servers)
+    throws KeeperException, IOException {
+        LOG.debug("Creating BookKeeper for servers " + servers);
+        // 10000 ms session timeout; this object is the default watcher.
+        this.zk = new ZooKeeper(servers, 10000, this);
+        this.engines = new HashMap<Long, QuorumEngine>();
+        // Lets clients blacklist bookies.
+        this.bookieBlackList = new HashSet<InetSocketAddress>();
+    }
+
+    /**
+     * Watcher method; only logs the event.
+     */
+    synchronized public void process(WatchedEvent event) {
+        LOG.info("Process: " + event.getType() + " " + event.getPath());
+    }
+
+    /**
+     * Helper for turning the asynchronous callbacks into synchronous calls:
+     * a counter of outstanding operations plus the result slots the
+     * callbacks fill in.
+     */
+    private static class RetCounter {
+        int i;
+        int rc;
+        int total;
+        LedgerSequence seq = null;
+
+        synchronized void inc() {
+            i++;
+            total++;
+        }
+
+        synchronized void dec() {
+            i--;
+            notifyAll();
+        }
+
+        /**
+         * Waits until the outstanding count drops to {@code limit}, or until
+         * a 15 s wait ends without the count having changed (in which case
+         * it returns even though operations are still outstanding).
+         */
+        synchronized void block(int limit) throws InterruptedException {
+            while (i > limit) {
+                int prev = i;
+                wait(15000);
+                if (i == prev) {
+                    break;
+                }
+            }
+        }
+
+        synchronized int total() {
+            return total;
+        }
+
+        void setrc(int rc) {
+            this.rc = rc;
+        }
+
+        int getrc() {
+            return rc;
+        }
+
+        void setSequence(LedgerSequence seq) {
+            this.seq = seq;
+        }
+
+        LedgerSequence getSequence() {
+            return seq;
+        }
+    }
+
+    /**
+     * Formats a ledger id the way ZooKeeper formats sequence numbers
+     * (zero-padded to 10 digits).
+     *
+     * @param id    znode id
+     */
+    private String getZKStringId(long id) {
+        return String.format("%010d", id);
+    }
+
+    /**
+     * Reads the quorum size stored in the ledger's metadata.
+     *
+     * @param lId   ledger identifier
+     * @return the quorum size written as a 4-byte int by {@link #createLedger}
+     */
+    private int readQuorumSize(long lId)
+    throws KeeperException, InterruptedException {
+        byte[] data = zk.getData(prefix + getZKStringId(lId) + quorumSize, false, null);
+        return ByteBuffer.wrap(data).getInt();
+    }
+
+    /**
+     * Reads the quorum mode stored in the ledger's metadata. The znode holds
+     * the mode's ordinal as a 4-byte int (see {@link #createLedger}); 1 maps
+     * to GENERIC, 2 to FREEFORM and any other value to VERIFIABLE.
+     *
+     * @param lId   ledger identifier
+     */
+    private QMode readQuorumMode(long lId)
+    throws KeeperException, InterruptedException {
+        byte[] data = zk.getData(prefix + getZKStringId(lId) + quorumMode, false, null);
+        switch (ByteBuffer.wrap(data).getInt()) {
+        case 1:
+            return QMode.GENERIC;
+        case 2:
+            return QMode.FREEFORM;
+        default:
+            return QMode.VERIFIABLE;
+        }
+    }
+
+    /**
+     * Creates a new ledger. To create a ledger, we need to specify the ensemble
+     * size, the quorum size, the operation mode, and a password. The ensemble size
+     * and the quorum size depend upon the operation mode. The operation mode can be
+     * GENERIC, VERIFIABLE, or FREEFORM (debugging). The password is used not only
+     * to authenticate access to a ledger, but also to verify entries in verifiable
+     * ledgers.
+     *
+     * @param ensSize   ensemble size
+     * @param qSize     quorum size
+     * @param mode      quorum mode: VERIFIABLE (default), GENERIC, or FREEFORM
+     * @param passwd    password
+     * @return the new ledger handle, or {@code null} if fewer than
+     *         {@code ensSize} usable bookies are available
+     * @throws BKException with {@code Code.QuorumException} if, for a
+     *         VERIFIABLE or GENERIC ledger, the ensemble cannot tolerate
+     *         at least one bookie failure
+     */
+    public LedgerHandle createLedger(int ensSize, int qSize, QMode mode, byte passwd[])
+    throws KeeperException, InterruptedException,
+    IOException, BKException {
+        // Check that the ensemble tolerates at least one bookie failure.
+        switch (mode) {
+        case VERIFIABLE:
+            // Tolerated failures t = (ensSize - 1) / 2, i.e. ensSize >= 2t + 1.
+            if ((ensSize - 1) / 2 < 1) {
+                LOG.error("Tolerates 0 bookie failures");
+                throw BKException.create(Code.QuorumException);
+            }
+            break;
+        case GENERIC:
+            // Tolerated failures t = (ensSize - 1) / 3, i.e. ensSize >= 3t + 1.
+            if ((ensSize - 1) / 3 < 1) {
+                LOG.error("Tolerates 0 bookie failures");
+                throw BKException.create(Code.QuorumException);
+            }
+            break;
+        case FREEFORM:
+            break;
+        }
+
+        // Create the ledger znode; its sequence number becomes the ledger id.
+        String path = zk.create(prefix, new byte[0],
+                Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL);
+        // The returned path is the prefix followed by the padded sequence number.
+        long lId = Long.parseLong(path.substring(prefix.length()));
+        LOG.debug("Created ledger node: " + path);
+        String ledgerPath = prefix + getZKStringId(lId);
+
+        // Bookies currently registered as available.
+        List<String> list = zk.getChildren("/ledgers/available", false);
+
+        LOG.debug("create: " + (ledgerPath + ensemble));
+        zk.create(ledgerPath + ensemble, new byte[0],
+                Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+
+        // Quorum size, stored as a 4-byte int.
+        ByteBuffer bb = ByteBuffer.allocate(4);
+        bb.putInt(qSize);
+        zk.create(ledgerPath + quorumSize, bb.array(),
+                Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+
+        // Quorum mode, stored as the 4-byte enum ordinal (decoded by readQuorumMode).
+        bb = ByteBuffer.allocate(4);
+        bb.putInt(mode.ordinal());
+        zk.create(ledgerPath + quorumMode, bb.array(),
+                Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+
+        LedgerHandle lh = new LedgerHandle(this, lId, 0, qSize, mode, passwd);
+        engines.put(lh.getId(), new QuorumEngine(lh));
+
+        // Pick ensSize distinct bookies at random from the available list.
+        Random r = new Random();
+        for (int i = 0; i < ensSize; i++) {
+            if (list.isEmpty()) {
+                LOG.error("Not enough bookies available");
+                engines.remove(lh.getId());
+                // NOTE(review): the ledger znodes created above are left in ZooKeeper.
+                return null;
+            }
+            // nextInt(n) draws from [0, n), so every remaining bookie is eligible.
+            String bookie = list.remove(r.nextInt(list.size()));
+            try {
+                LOG.info("Bookie: " + bookie);
+                lh.addBookie(parseAddr(bookie));
+                zk.create(ledgerPath + ensemble + "/" + bookie, new byte[0],
+                        Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+            } catch (IOException e) {
+                // This bookie could not be used; retry the same slot with another.
+                LOG.error(e);
+                i--;
+            }
+        }
+
+        LOG.debug("Created new ledger");
+        return lh;
+    }
+
+    /**
+     * Creates a new ledger. Default of 3 servers, and quorum of 2 servers,
+     * verifiable ledger.
+     *
+     * @param passwd    password
+     */
+    public LedgerHandle createLedger(byte passwd[])
+    throws KeeperException, BKException,
+    InterruptedException, IOException {
+        return createLedger(3, 2, QMode.VERIFIABLE, passwd);
+    }
+
+    /**
+     * Opens an existing ledger for reading. If the ledger has no close znode
+     * (it was not closed properly), it is recovered first.
+     *
+     * @param lId       ledger identifier
+     * @param passwd    password used to access the ledger
+     * @return a handle on the ledger, or {@code null} if the ledger does not exist
+     */
+    public LedgerHandle openLedger(long lId, byte passwd[])
+    throws KeeperException, InterruptedException, IOException, BKException {
+        String ledgerPath = prefix + getZKStringId(lId);
+
+        if (zk.exists(ledgerPath, false) == null) {
+            LOG.error("Ledger " + getZKStringId(lId) + " doesn't exist.");
+            return null;
+        }
+
+        int qSize = readQuorumSize(lId);
+
+        // Id of the last entry written, as saved by closeLedger.
+        LOG.debug("Close path: " + ledgerPath + close);
+        if (zk.exists(ledgerPath + close, false) == null) {
+            // NOTE(review): the recovery result is ignored; if recovery does not
+            // create the close znode, the getData below presumably throws
+            // KeeperException.NoNodeException.
+            recoverLedger(lId, passwd);
+        }
+        byte[] data = zk.getData(ledgerPath + close, false, null);
+        long last = ByteBuffer.wrap(data).getLong();
+
+        QMode qMode = readQuorumMode(lId);
+        if (qMode == QMode.GENERIC) {
+            LOG.info("Generic ledger");
+        } else if (qMode == QMode.VERIFIABLE) {
+            LOG.info("Verifiable ledger");
+        }
+
+        LedgerHandle lh = new LedgerHandle(this, lId, last, qSize, qMode, passwd);
+        engines.put(lh.getId(), new QuorumEngine(lh));
+
+        // Ensemble members are the children of "/ledgers/L<id>/ensemble".
+        List<String> list = zk.getChildren(ledgerPath + ensemble, false);
+        for (String s : list) {
+            try {
+                lh.addBookie(parseAddr(s));
+            } catch (IOException e) {
+                LOG.error(e);
+            }
+        }
+
+        return lh;
+    }
+
+    /**
+     * Parses a bookie name of the form {@code host:port}.
+     *
+     * @param s     bookie name as stored in ZooKeeper
+     * @return the corresponding socket address
+     * @throws IllegalArgumentException if {@code s} is not of the form
+     *         {@code host:port} or the port is not a valid number
+     */
+    private InetSocketAddress parseAddr(String s) {
+        String parts[] = s.split(":");
+        if (parts.length != 2) {
+            throw new IllegalArgumentException(s
+                    + " does not have the form host:port");
+        }
+        return new InetSocketAddress(parts[0], Integer.parseInt(parts[1]));
+    }
+
+    /**
+     * Initializes the message digest with the given algorithm.
+     *
+     * @param alg   algorithm name understood by {@link MessageDigest#getInstance(String)}
+     */
+    public void initMessageDigest(String alg)
+    throws NoSuchAlgorithmException {
+        md = MessageDigest.getInstance(alg);
+    }
+
+    /**
+     * Adds an entry synchronously to an open ledger.
+     *
+     * @param lh    handle of an open ledger
+     * @param data  entry payload
+     * @return the return code delivered to {@link #addComplete}, or -1 if
+     *         {@code lh} is null
+     */
+    public long addEntry(LedgerHandle lh, byte[] data)
+    throws InterruptedException {
+        LOG.debug("Adding entry " + data);
+        if (lh == null) {
+            return -1;
+        }
+        RetCounter counter = new RetCounter();
+        counter.inc();
+        Operation r = new AddOp(lh, data, this, counter);
+        engines.get(lh.getId()).sendOp(r);
+        // NOTE(review): block() gives up after 15 s without progress; rc then
+        // keeps its default value of 0.
+        counter.block(0);
+        return counter.getrc();
+    }
+
+    /**
+     * Adds an entry asynchronously to an open ledger. Does nothing if
+     * {@code lh} is null.
+     *
+     * @param lh    ledger handle returned with create
+     * @param data  array of bytes to be written
+     * @param cb    object implementing the add callback interface
+     * @param ctx   control object passed back to {@code cb}
+     */
+    public void asyncAddEntry(LedgerHandle lh, byte[] data, AddCallback cb, Object ctx)
+    throws InterruptedException {
+        LOG.debug("Adding entry asynchronously: " + data);
+        if (lh != null) {
+            AddOp r = new AddOp(lh, data, cb, ctx);
+            engines.get(lh.getId()).sendOp(r);
+        }
+    }
+
+    /**
+     * Reads a sequence of entries synchronously.
+     *
+     * @param lh            ledger handle returned with create
+     * @param firstEntry    id of first entry of sequence
+     * @param lastEntry     id of last entry of sequence
+     * @return the entries read
+     * @throws BKException with {@code Code.ReadException} if the range is
+     *         invalid or no sequence was delivered
+     */
+    public LedgerSequence readEntries(LedgerHandle lh, long firstEntry, long lastEntry)
+    throws InterruptedException, BKException {
+        // Little sanity check
+        if ((firstEntry > lh.getLast()) || (firstEntry > lastEntry))
+            throw BKException.create(Code.ReadException);
+
+        RetCounter counter = new RetCounter();
+        counter.inc();
+
+        Operation r = new ReadOp(lh, firstEntry, lastEntry, this, counter);
+        engines.get(lh.getId()).sendOp(r);
+        LOG.debug("Going to wait for read entries: " + counter.i);
+        counter.block(0);
+        LOG.debug("Done with waiting: " + counter.i + ", " + firstEntry);
+
+        if (counter.getSequence() == null) throw BKException.create(Code.ReadException);
+        return counter.getSequence();
+    }
+
+    /**
+     * Reads a sequence of entries asynchronously.
+     *
+     * @param lh            ledger handle
+     * @param firstEntry    id of first entry of sequence
+     * @param lastEntry     id of last entry of sequence
+     * @param cb            object implementing read callback interface
+     * @param ctx           control object passed back to {@code cb}
+     * @throws BKException with {@code Code.ReadException} if the range is invalid
+     */
+    public void asyncReadEntries(LedgerHandle lh, long firstEntry, long lastEntry, ReadCallback cb, Object ctx)
+    throws BKException, InterruptedException {
+        // Little sanity check
+        if ((firstEntry > lh.getLast()) || (firstEntry > lastEntry))
+            throw BKException.create(Code.ReadException);
+
+        Operation r = new ReadOp(lh, firstEntry, lastEntry, cb, ctx);
+        engines.get(lh.getId()).sendOp(r);
+    }
+
+    /**
+     * Closes a ledger: records its last entry id in the close znode, closes
+     * the handle, and stops the quorum engine serving this ledger.
+     *
+     * @param lh    handle of ledger to close
+     */
+    public void closeLedger(LedgerHandle lh)
+    throws KeeperException, InterruptedException {
+        ByteBuffer last = ByteBuffer.allocate(8);
+        last.putLong(lh.getLast());
+        LOG.info("Last saved on ZK is: " + lh.getLast());
+        String closePath = prefix + getZKStringId(lh.getId()) + close;
+        if (zk.exists(closePath, false) == null) {
+            zk.create(closePath,
+                    last.array(),
+                    Ids.OPEN_ACL_UNSAFE,
+                    CreateMode.PERSISTENT);
+        } else {
+            zk.setData(closePath, last.array(), -1);
+        }
+        lh.close();
+        // Stop only this ledger's engine; other open ledgers keep theirs.
+        QuorumEngine qe = engines.remove(lh.getId());
+        if (qe != null) {
+            qe.sendOp(new StopOp());
+        }
+    }
+
+    /**
+     * Checks whether the close node exists.
+     *
+     * @param ledgerId  id of the ledger to check
+     * @return true if the ledger's close znode exists
+     */
+    public boolean hasClosed(long ledgerId)
+    throws KeeperException, InterruptedException {
+        return zk.exists(prefix + getZKStringId(ledgerId) + close, false) != null;
+    }
+
+    /**
+     * Recovers a ledger that was not closed properly, using a
+     * {@link LedgerRecoveryMonitor} built from the ledger's ZooKeeper metadata.
+     *
+     * @param lId       ledger identifier
+     * @param passwd    password
+     * @return the result of {@code LedgerRecoveryMonitor.recover}
+     */
+    private boolean recoverLedger(long lId, byte passwd[])
+    throws KeeperException, InterruptedException, IOException, BKException {
+        LOG.info("Recovering ledger");
+
+        int qSize = readQuorumSize(lId);
+
+        // Ensemble members are the children of "/ledgers/L<id>/ensemble".
+        List<String> list =
+            zk.getChildren(prefix + getZKStringId(lId) + ensemble, false);
+        ArrayList<InetSocketAddress> addresses = new ArrayList<InetSocketAddress>();
+        for (String s : list) {
+            addresses.add(parseAddr(s));
+        }
+
+        QMode qMode = readQuorumMode(lId);
+
+        LedgerRecoveryMonitor lrm = new LedgerRecoveryMonitor(this, lId, qSize, addresses, qMode);
+        return lrm.recover(passwd);
+    }
+
+    /**
+     * Gets a replacement bookie.
+     *
+     * @param addrList  list of bookies to replace
+     * @return the first available bookie that is neither in {@code addrList}
+     *         nor blacklisted, or {@code null} if none is found or ZooKeeper
+     *         cannot be accessed
+     */
+    public InetSocketAddress getNewBookie(ArrayList<InetSocketAddress> addrList)
+    throws InterruptedException {
+        try {
+            // Children of "/ledgers/available" are the registered bookies.
+            List<String> list =
+                zk.getChildren("/ledgers/available", false);
+            for (String addr : list) {
+                InetSocketAddress nAddr = parseAddr(addr);
+                if (!addrList.contains(nAddr) &&
+                        !bookieBlackList.contains(nAddr))
+                    return nAddr;
+            }
+        } catch (KeeperException e) {
+            LOG.error("Problem accessing ZooKeeper: " + e);
+        }
+
+        return null;
+    }
+
+    /**
+     * Blacklists a bookie so that {@link #getNewBookie} never returns it.
+     *
+     * @param addr  address of bookie
+     */
+    void blackListBookie(InetSocketAddress addr) {
+        bookieBlackList.add(addr);
+    }
+
+    /**
+     * Implementation of the read callback for the synchronous read method.
+     * Stores the sequence (possibly null) in the {@link RetCounter} passed as
+     * context and releases the waiting reader.
+     *
+     * @param rc        return code
+     * @param ledger    ledger identifier
+     * @param seq       sequence of entries, or null if none was read
+     * @param ctx       the RetCounter created by {@link #readEntries}
+     */
+    public void readComplete(int rc,
+            long ledger,
+            LedgerSequence seq,
+            Object ctx) {
+        RetCounter counter = (RetCounter) ctx;
+        counter.setSequence(seq);
+        // seq may be null on failure; readEntries turns that into a ReadException.
+        if (seq != null) {
+            LOG.debug("Read complete: " + seq.size() + ", " + counter.i);
+        } else {
+            LOG.debug("Read complete without entries, rc: " + rc);
+        }
+        counter.dec();
+    }
+
+    /**
+     * Implementation of the add callback for the synchronous add method.
+     * Records the return code in the {@link RetCounter} passed as context and
+     * releases the waiting writer.
+     *
+     * @param rc        return code
+     * @param ledger    ledger identifier
+     * @param entry     entry identifier
+     * @param ctx       the RetCounter created by {@link #addEntry}
+     */
+    public void addComplete(int rc,
+            long ledger,
+            long entry,
+            Object ctx) {
+        RetCounter counter = (RetCounter) ctx;
+        counter.setrc(rc);
+        counter.dec();
+    }
+}



Mime
View raw message