hadoop-zookeeper-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From maha...@apache.org
Subject svn commit: r987314 [9/16] - in /hadoop/zookeeper/trunk: ./ src/contrib/hedwig/ src/contrib/hedwig/client/ src/contrib/hedwig/client/src/ src/contrib/hedwig/client/src/main/ src/contrib/hedwig/client/src/main/cpp/ src/contrib/hedwig/client/src/main/cpp...
Date Thu, 19 Aug 2010 21:25:22 GMT
Added: hadoop/zookeeper/trunk/src/contrib/hedwig/scripts/hw.bash
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/hedwig/scripts/hw.bash?rev=987314&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/hedwig/scripts/hw.bash (added)
+++ hadoop/zookeeper/trunk/src/contrib/hedwig/scripts/hw.bash Thu Aug 19 21:25:13 2010
@@ -0,0 +1,734 @@
+#!/usr/bin/env bash
+
+#  Licensed to the Apache Software Foundation (ASF) under one or more
+#  contributor license agreements.  See the NOTICE file distributed with
+#  this work for additional information regarding copyright ownership.
+#  The ASF licenses this file to You under the Apache License, Version 2.0
+#  (the "License"); you may not use this file except in compliance with
+#  the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+
+# This file is a collection of script functions. To use, just run this file
+# with the function you want to execute as the first argument and arguments to
+# the function following that.
+#
+# For instance, to install the prereq jars:
+#
+#   ./hw.bash install-zk
+#
+# Or to run BookKeeper:
+#
+#   ./hw.bash bk 3181 /tmp/bk/{journal,ledgers}
+
+# Note: unbuffer can cause all sorts of funky problems, especially when dealing
+# with high volumes of output from multiple sources!  Problems aren't just
+# about causing your terminal to get garbled; they may be as severe as killing
+# this script itself.
+
+set -o errexit # -o nounset
+
+dark_blue="\e[0;34m";     bright_blue="\e[1;34m"
+dark_green="\e[0;32m";    bright_green="\e[1;32m"
+dark_cyan="\e[0;36m";     bright_cyan="\e[1;36m"
+dark_red="\e[0;31m";      bright_red="\e[1;31m"
+dark_magenta="\e[0;35m";  bright_magenta="\e[1;35m"
+brown="\e[0;33m";         yellow="\e[1;33m"
+black="\e[0;30m";         dark_gray="\e[1;30m";
+bold_white="\e[1;37m"     white="\e[0;37m"
+normal_color="\e[0m"
+
+# Default server ports used
+PROFILER_PORT=9874
+SERVER_PORT=9875
+SSL_SERVER_PORT=9876
+ZOOKEEPER_PORT=9877
+BOOKKEEPER_PORT=9878
+
+: ${script:=$0}
+serverdir="$(readlink -f "$(dirname "$script")/../server/")" || serverdir=
+cmd="$(basename "$script")"
+jarbase=server-1.0-SNAPSHOT-jar-with-dependencies.jar
+if [[ -f "$(dirname "$script")/$jarbase" ]]
+then jar="$(readlink -f "$(dirname "$script")/$jarbase")"
+else jar="$serverdir/target/$jarbase"
+fi
+already_pushed=false
+: ${push_jar:=false} ${push_script:=true} ${use_yjp=false} ${unbuffer:=false}
+: ${loglevel:=} ${dbg:=false} ${attach:=false} ${logconf:=} ${asserts:=false}
+if $unbuffer
+then unbuffercmd='unbuffer -p'
+else unbuffercmd=
+fi
+if $dbg
+then set -x
+fi
+if $asserts
+then JAVAFLAGS=-ea
+fi
+
+#
+# General utilities.
+#
+
+trace() { echo "$@" ; eval "$@" ; }
+
# Emit one line per trailing argument, each preceded by the given prefix
# (first argument).
prefix-words() {
  local pre="$1" word
  shift
  for word in "$@"; do
    echo "$pre" "$word"
  done
}
+
# Shell-quote each argument for safe embedding in a remote command line,
# delegating to the sibling `quote` python script (which escapes \, " and $
# and wraps each word in double quotes).
quote(){
  "$(dirname "$script")/quote" "$@"
}
+
+# Retrieve the substring of a string with a given prefix and suffix.
+# For example, substr "Hello World!" "ell" "rld" returns "o Wo".
+# Everything before the prefix and after the suffix (inclusive)
+# is stripped off and the remaining substring is returned.
# See the usage comment above: print the part of $1 strictly between the
# *last* occurrence of prefix $2 and the last occurrence of suffix $3 after it.
# Fixes vs. the original: usage errors now go to stderr with a non-zero
# return (previously they were printed to stdout, silently polluting
# $(substr ...) captures); nopref is now a local instead of leaking as a
# global; portable -ne replaces the bash-only == inside [ ].
substr() {
  if [ $# -ne 3 ]
  then
    echo "Usage: substr string prefix suffix" >&2
    return 1
  fi
  # Drop everything up to and including the last occurrence of the prefix...
  local nopref="${1#${1%${2}*}${2}}"
  # ...then drop from the last occurrence of the suffix to the end.
  echo "${nopref%${3}*}"
}
+
# Must test connectivity via ssh because we may be firewalled.
# Block (polling once per second) until PORT on the machine behind ssh
# profile PROF accepts TCP connections. Usage: wait-connect PROF:PORT
wait-connect() {
  local prof="${1%:*}" port="${1#*:}"
  # NOTE(review): $host is assigned but never used below — dead code? confirm.
  local host="$(ssh-hosts $prof)"
  while ! echo | ssh "$prof" nc -w 1 localhost "$port"
  do sleep 1
  done
}
+
+#
+# Java runners.
+#
+
# Launch with yourkit profiler.
# When $use_yjp is true, run java with the YourKit agent loaded (install root
# overridable via $YJP, defaulting to ~/yjp-8.0.15); otherwise plain java.
# $JAVAFLAGS is passed through unquoted in both cases so it may hold several
# flags.
java-yjp() {
  if $use_yjp
  then LD_LIBRARY_PATH="${YJP:-$HOME/yjp-8.0.15}/bin/linux-x86-32/" \
       java -agentlib:yjpagent $JAVAFLAGS "$@"
  else java $JAVAFLAGS "$@"
  fi
}

# Command wrapper: when $attach is true, prepend the JDWP debug-server agent
# (listening on $PROFILER_PORT) to JAVAFLAGS before running the wrapped
# command; otherwise run it unchanged.
with-attach() {
  if $attach
  then JAVAFLAGS="-agentlib:jdwp=transport=dt_socket,server=y,address=$PROFILER_PORT $JAVAFLAGS" "$@"
  else "$@"
  fi
}
+
# Command wrapper controlling log4j configuration. When $loglevel is set,
# synthesize a log4j.properties with that root level (overwriting any
# caller-supplied $logconf); when $logconf is set (either way), write it into
# a fresh temp dir and prepend that dir to CLASSPATH so log4j finds it; then
# run the wrapped command. With neither set, run the command unchanged.
with-logging() {
  if [[ $loglevel ]] ; then
    logconf="
log4j.rootLogger=$loglevel, A1
log4j.logger.org.apache.zookeeper = ERROR
log4j.logger.org.apache.bookkeeper.client.QuorumOpMonitor = ERROR
log4j.logger.org.apache.bookkeeper.proto.BookieClient = ERROR

# A1 is set to be a ConsoleAppender.
log4j.appender.A1=org.apache.log4j.ConsoleAppender

# A1 uses PatternLayout.
log4j.appender.A1.layout=org.apache.log4j.PatternLayout
log4j.appender.A1.layout.ConversionPattern=%d %-4r [%t] %-5p %c %x - %m%n
"
  fi
  if [[ "$logconf" ]] ; then
    # One temp dir per launch; dkill garbage-collects these.
    mkdir -p /tmp/$USER/logging/
    local logdir="$(mktemp -d -p /tmp/$USER/logging/)"
    echo "$logconf" > "$logdir/log4j.properties"
    CLASSPATH="$logdir:$CLASSPATH" "$@"
  else
    "$@"
  fi
}
+
+try-ulimit() { ulimit -n $((1024**2)) || true ; }
+j() { CLASSPATH="$jar" with-logging with-attach java-yjp "$@" ; }
+bk() { j org.apache.bookkeeper.proto.BookieServer "$@" ; }
+#bk() { j com.yahoo.pubsub.client.benchmark.FakeBookie "$@" ; }
+zk() { j org.apache.zookeeper.server.quorum.QuorumPeerMain "$@" ; }
+zkc() { j org.apache.zookeeper.ZooKeeperMain -server "$@" ; }
+hw() { try-ulimit; j org.apache.hedwig.server.netty.PubSubServer "$@" ; }
+hwc() { try-ulimit; hwc-soft "$@" ; }
+hwc-soft() { j org.apache.hedwig.client.benchmark.HedwigBenchmark "$@" ; }
+
# Run a ZooKeeper CLI on the given remote host (via the pushed copy of this
# script), connected to the ZK server listening there on the given port.
# Commands may be fed over stdin (see setup-bk and wait-sub).
ssh-zkc() {
  local server="$1" port="$2"
  ssh "$server" "hedwig/hw.bash zkc localhost:$port"
}
+
+#
+# Setup
+#
+
# Get/build the ZK dependencies that must be manually installed and place those
# jars in the current directory.
# Downloads and unpacks the official 3.2.0 release tarball (if not already
# unpacked here), builds its test classes, and produces binary + sources jars
# for zookeeper, zookeeper-test and bookkeeper in the current directory.
get-zk() {
  local stagedir="$(pwd)" dstdir="$(pwd)" ver=3.2.0
  local unpacked="$stagedir/zookeeper-$ver/"
  local url="http://archive.apache.org/dist/hadoop/zookeeper/zookeeper-$ver/zookeeper-$ver.tar.gz"

  if [[ ! -d "$unpacked" ]]
  then 
       echo $url
       wget -q -O - "$url" | tar xzf - -C "$stagedir"
  fi
  ant -q -buildfile "$unpacked/build.xml" compile-test
  # The test jar must also carry the config/ and data/ fixtures.
  cp -r "$unpacked/src/java/test/"{config,data}/ "$unpacked/build/testclasses/"
  jar cf zookeeper-test-$ver.jar -C "$(readlink -f "$unpacked/build/testclasses/")" .
  cp "$unpacked/zookeeper-$ver.jar" .
  cp "$unpacked/contrib/bookkeeper/zookeeper-$ver-bookkeeper.jar" bookkeeper-$ver.jar
  jar cf zookeeper-test-$ver-sources.jar -C "$(readlink -f "$unpacked/src/java/test/")" .
  jar cf zookeeper-$ver-sources.jar -C "$(readlink -f "$unpacked/src/java/main/")" .
  jar cf bookkeeper-$ver-sources.jar -C "$(readlink -f "$unpacked/src/contrib/bookkeeper/src/java/")" .
}

# Like get-zk, but builds BookKeeper jars from a sibling ZooKeeper svn
# checkout (../Zookeeper relative to the server dir) instead of a release.
get-bk() {
  local svn="$serverdir/../Zookeeper/" svnver=3.3.0-SNAPSHOT
  ant -q -buildfile "$svn/build.xml" compile-test
  ant -q -buildfile "$svn/src/contrib/bookkeeper/build.xml"
  jar cf bookkeeper-$svnver-sources.jar -C "$(readlink -f "$svn/src/contrib/bookkeeper/src/java/")" .
  cp "$svn/build/contrib/bookkeeper/zookeeper-dev-bookkeeper.jar" bookkeeper-$svnver.jar
}
+
# Install the jars from the current directory, as obtained by get-zk.
# For now, we will use the checked in ZK/BK jars in the server/lib directory.
# When an official ZK/BK release for those changes is done, then we can 
# modify the get-zk function to get the latest code.
# Installs both the binary and -sources jar of each artifact into the local
# maven repository.
# NOTE(review): ver=SNAPSHOT expects files named e.g. zookeeper-SNAPSHOT.jar,
# while get-zk above produces 3.2.0-versioned names — confirm which naming
# the checked-in server/lib jars use.
install-zk-bk() {
  for pkg in zookeeper zookeeper-test bookkeeper ; do
    # groupId drops the "-test" suffix (zookeeper-test -> org.apache.zookeeper).
    local grp="${pkg%-*}" ver=SNAPSHOT
    for srcs in '' -sources
    do trace mvn -q install:install-file -Dfile="$pkg-$ver$srcs.jar" \
        -DgroupId=org.apache.$grp -DartifactId=$pkg -Dpackaging=jar \
        -Dversion=$ver ${srcs:+-Dclassifier=sources}
    done
  done
}
+
# Copy the given self-extracting JDK installer to every host in $hosts, run
# it, and put its bin/ on PATH via .bashrc (idempotently).
setup-java() {
  # wget 'http://cds.sun.com/is-bin/INTERSHOP.enfinity/WFS/CDS-CDS_Developer-Site/en_US/-/USD/VerifyItem-Start/jdk-7-ea-linux-i586.bin?BundledLineItemUUID=dXBIBe.m0.UAAAEiZUUKrYfz&OrderID=BnZIBe.mencAAAEiU0UKrYfz&ProductID=O29IBe.py.oAAAEhK1kP50GU&FileName=/jdk-7-ea-linux-i586.bin'
  local jdk="$1"
  parscp "$jdk" ^:/tmp/jdk6
  parssh "
    echo yes | /tmp/jdk6 > /tmp/java-install-log &&
    if ! fgrep jdk1.6.0 ~/.bashrc > /dev/null
    then echo 'export PATH=~/jdk1.6.0_14/bin/:\$PATH' >> ~/.bashrc
    fi
  "
}

# Copy the given YourKit zip to every host in $hosts, unpack it, and export
# YJP from .bashrc (idempotently) so java-yjp can find it.
setup-yjp() {
  local pkg="$1"
  parscp "$pkg" ^:/tmp/yjp.zip
  parssh "
    yes A | unzip -q /tmp/yjp.zip
    if ! fgrep YJP= ~/.bashrc > /dev/null
    then echo 'export YJP=~/yjp-8.0.15/' >> ~/.bashrc
    fi
  "
}
+
# Usage: setup-bk ZKSERVER ZKSERVER_PORT
#
# Create the /ledgers and /ledgers/available ZK nodes on the given ZK server.
# The bookie servers will register themselves once they are up on ZK but they
# need these nodes to exist first.
# The trailing `|| true` tolerates the nodes already existing.
setup-bk() {
  local server="$1" port="$2"
  # NOTE(review): the shift discards any extra args — nothing uses them here.
  shift 2
  ssh-zkc "$server" "$port" << EOF || true
create /ledgers 0
create /ledgers/available 0
EOF
}
+
# Get rid of duplicate files in a jar.
# Unpacks the jar (later duplicate entries overwrite earlier ones) into a
# scratch dir and re-creates it from the unpacked tree.
strip-jar() {
  local jar="${1:-$jar}" tmpdir=/tmp/$USER/jar
  rm -rf "$tmpdir"
  mkdir -p "$tmpdir"
  (
    cd "$tmpdir"
    jar xf "$jar"
    jar cf "$jar" .
  )
}

# Inspect the current logging level.
# Extracts log4j.properties from the jar and prints its rootLogger line.
get-logging() {
  local jar="${1:-$jar}" tmpdir=/tmp/$USER/jar
  mkdir -p "$tmpdir"
  (
    cd "$tmpdir"
    jar xf "$jar" log4j.properties
    grep rootLogger= log4j.properties
  )
}

# Adjust the log level but without modifying the original source tree or going
# through the full rebuild process.
# Rewrites the rootLogger level inside the jar's log4j.properties in place.
set-logging() {
  local level="$1" tmpdir=/tmp/$USER/jarlog
  mkdir -p "$tmpdir"
  (
    cd "$tmpdir"
    jar xf "$jar" log4j.properties
    sed -i "s/\(rootLogger\)=[[:alpha:]]\+/\1=$level/" log4j.properties
    jar uf "$jar" log4j.properties
  )
}
+
+#
+# General testbed tools.
+#
+
# Print the hosts from the $hosts environment variable, one per line,
# de-duplicated and sorted. Fails loudly when $hosts is empty or unset.
hosts() {
  if [[ -z "$hosts" ]]
  then echo '$hosts not set' 1>&2 ; return 1
  fi
  # Unquoted expansion word-splits on whitespace; one word per line.
  printf '%s\n' $hosts | sort -u
}
+
# Run the given command with the raw $hosts string appended as arguments.
hostargs() { "$@" $hosts ; }
# Run a command on one host via ssh, forwarding the tuning env vars; every
# output line is prefixed with the profile name, followed by a colorized
# [SUCCESS]/[FAILURE] marker (the trailing `&& false` propagates failure
# through the pipeline).
tagssh() {
  local prof="$1"
  shift
  {
    ssh "$prof" "export use_yjp=$use_yjp loglevel=$loglevel asserts=$asserts logconf=$(quote "$logconf"); attach=$attach; $@" 2>&1 &&
    echo -e "$bright_green[SUCCESS]$normal_color" ||
    echo -e "$bright_red[FAILURE: $?]$normal_color" && false
  } | $unbuffercmd sed "s/^/$prof: /g"
}
# Run a command on (or scp files to) every host in $hosts in parallel; xargs
# replaces ^ with each host. Set xargs= to run serially (see dstatus/tops).
parssh() { hosts | xargs ${xargs--P0} -i^ "$script" tagssh ^ "$@" ; }
parscp() { hosts | xargs ${xargs--P0} -i^ scp "$@" ; }
+
# Resolves profile names to actual hostnames according to the user's .ssh/config.
# Accepts profile names (optionally with a ":suffix", which is preserved) and
# prints the resolved names space-separated on one line. Unknown profiles are
# passed through unchanged.
# NOTE(review): the embedded script is Python 2 only (print statement,
# file()) — confirm python2 is what `python` resolves to on these machines.
ssh-hosts() {
  python -c "
from __future__ import with_statement
import sys, os
hosts = {}
with file(os.environ['HOME'] + '/.ssh/config') as f:
  for line in f:
    words = line.split('#')[0].split()
    if len(words) == 2:
      if words[0] == 'Host': key = words[1]
      if words[0] == 'HostName': hosts[key] = words[1]
for profile in sys.argv[1:]:
  parts = profile.split(':', 1)
  key, rest = parts[0], ('' if len(parts) == 1 else ':' + parts[1])
  print hosts.get(key, key) + rest,
" "$@"
}
+
+#
+# Hedwig testbed tools.
+#
+
# Usage: hosts='wilbur2 wilbur3 wilbur4' ./hw.bash push
# Copy this script and/or the fat jar (per $push_script/$push_jar) into
# hedwig/ on every host in $hosts, at most once per script run.
push() {
  if ! $already_pushed ; then
    local files=()
    if $push_script ; then files+=( "$script" ) ; fi
    if $push_jar ; then files+=( "$jar" ) ; fi
    if (( ${#files[@]} > 0 )) ; then
      parssh 'mkdir -p hedwig/'
      parscp -q "${files[@]}" ^:hedwig/
    fi
    already_pushed=true
  fi
}
+
# Kill processes and garbage-collect the log4j temp directories.
dkill() { parssh 'pkill java; rm -rf /tmp/$USER/logging/' ; }

# Pass in any argument to get a long listing.
# Print which of the known server ports are in LISTEN state locally; with any
# argument, also show the owning processes (ps) instead of just the port list.
lstatus() {
  for port in 2181 3181 4080 {9874..9878} ; do
    if /usr/sbin/lsof -i tcp:$port 2>&1 | fgrep 'LISTEN' > /dev/null ; then
      if (( $# > 0 )) ; then
        echo "$port:"
        # `| cat` defeats ps's terminal-width truncation.
        ps u $(/usr/sbin/lsof -t -i tcp:$port) | cat
      else
        echo -n "$port "
      fi
    fi
  done
  # Finish the single-line (short) listing with a newline.
  (( $# > 0 )) || echo
}

# Pass in any argument to get a long listing.
# Run lstatus on every host in $hosts (serially, so output stays grouped).
dstatus() {
  push
  xargs= parssh "hedwig/hw.bash lstatus $@"
}

# See if anything is running on each machine.
tops() {
  xargs= parssh '
    echo
    hostname
    echo =====
    top -b -n 1 | fgrep -A3 COMMAND
  '
}

# Familiarize this machine with the given hosts' keys.
warmup() {
  hosts | xargs -P0 -i^ ssh -o StrictHostKeyChecking=no ^ hostname
}
+
# Add yourself to nopasswd sudoers for all hosts in $hosts.
# Prompts once for the password and replays it to each host via pexpect,
# filtering the password back out of the echoed output.
# NOTE(review): requires the third-party pexpect module and Python 2 syntax —
# confirm availability on the control machine.
setup-sudo() {
  local cmd='sudo su - -c "
  if ! fgrep \"$USER ALL=(yahoo) NOPASSWD:ALL\" /etc/sudoers >& /dev/null
  then echo -e \"$USER ALL=(ALL) ALL\n$USER ALL=(yahoo) NOPASSWD:ALL\" >> /etc/sudoers
  fi"'
  python -c '
import getpass, os, pexpect, sys
pw = getpass.getpass()
for host in os.environ["hosts"].split():
  c = pexpect.spawn(sys.argv[1], [host] + sys.argv[2:])
  i = c.expect(["Password:", pexpect.EOF])
  if i == 0: c.sendline(pw); c.read()
  filtered = filter(lambda x: pw not in x, c.before.split("\n"))
  sys.stdout.write("\n".join(filtered).lstrip("\n"))
  ' ssh "$cmd"
}
+
# Raise the hard open-files limit to 1M in /etc/security/limits.conf on every
# host in $hosts (idempotently).
# Fix: this previously invoked bare `ssh '...'` with no host argument, so ssh
# parsed the command string as a hostname and the function could never work;
# the sibling cluster-wide helpers all fan out via parssh.
setup-limits() {
  parssh '
    if ! sudo fgrep "* hard nofile $((1024**2))" /etc/security/limits.conf >& /dev/null
    then sudo su - -c "echo \"* hard nofile $((1024**2))\" >> /etc/security/limits.conf"
    fi
  '
}
+
# Create and chown /home/yang on every host in $hosts.
# Fix: this previously invoked bare `ssh '...'` with no host argument, so ssh
# parsed the command string as a hostname; fan out via parssh like the other
# cluster-wide helpers.
# NOTE(review): the user name is hard-coded — should this be $USER?
mkhomes() {
  parssh 'sudo mkdir -p /home/yang && sudo chown -R yang /home/yang'
}
+
+#
+# Distributed launchers.
+#
+
+# Given a hostname (*not* an ssh profile), figure out how to utilize the
+# machine's disks for BK.
# Print the (remote-side, single-quoted so $USER expands remotely) journal
# directory for the given host. The case statement is a hook for per-host
# disk layouts; currently every host gets the same path.
bk-journal() {
  case "$@" in
    * ) echo '"/d1/$USER/bk/journal"' ;;
  esac
}
# Ditto for the ledger directory.
bk-ledger() {
  case "$@" in
    * ) echo '"/home/$USER/bk/ledger"' ;;
  esac
}
# Both paths, space-separated, as BookieServer expects them.
bk-paths() {
  echo "$(bk-journal "$@") $(bk-ledger "$@")"
}
+
+# Start ZK on the first arg host and BKs on the second argument which is
+# a string list of hosts.
start-zk-bks() {
  # NOTE(review): push ignores its arguments — the trailing "jar" here (and in
  # start-region) has no effect; pushing the jar is controlled by $push_jar.
  hosts="$*" push jar
  local zk="$1" abks=( $2 )
  shift
  # Write a minimal zoo.cfg on the ZK host and launch a standalone ZK server
  # in the background.
  tagssh $zk "
    rm -rf /tmp/$USER/zk/
    mkdir -p /tmp/$USER/zk/
    cat > /tmp/$USER/zoo.cfg << EOF
tickTime=2000
dataDir=/tmp/$USER/zk/
clientPort=$ZOOKEEPER_PORT
EOF
    hedwig/hw.bash eval 'set -x; zk /tmp/$USER/zoo.cfg'
  " &
  wait-connect $zk:$ZOOKEEPER_PORT
  # Create the /ledgers nodes the bookies need before they can register.
  setup-bk $(ssh-hosts $zk) $ZOOKEEPER_PORT
  # After the shift, "$1" is the space-separated bookie host list; every
  # bookie uses the same journal/ledger paths and is started in parallel.
  hosts="$*" parssh "
    rm -rf $(bk-paths "$1")
    mkdir -p $(bk-paths "$1")
    hedwig/hw.bash eval $(quote "set -x; bk $BOOKKEEPER_PORT $( ssh-hosts $zk ):$ZOOKEEPER_PORT $(bk-paths "$1")")
  " &
  for bk in "${abks[@]}" ; do
    wait-connect $bk:$BOOKKEEPER_PORT
  done
}
+
# The first argument is a string list of remote region default Hedwig servers
# in a multi-region setup (if any). The second argument is a string list of
# Hedwig hubs to start for this local region. The third argument is the single
# ZooKeeper server host the hubs should connect to.
# Writes hw.conf on each hub, starts it in the background, and waits for both
# the plain and SSL ports to come up. $region and $extraconf (optional, from
# the environment) are appended to the generated config.
start-hw() {
  local allhws="$1" ahws=( $2 ) zk="$3"
  if [[ $region ]]
  then regionconf="region=$region"
  fi
  # After the single shift, "$@" holds the hub list and the ZK host — both
  # need the script pushed.
  shift
  hosts="$@" push
  for hw in "${ahws[@]}" ; do
    tagssh $hw "
    mkdir -p /tmp/$USER/
    cat > /tmp/$USER/hw.conf << EOF
zk_host=$( ssh-hosts $zk ):$ZOOKEEPER_PORT
regions=$( ssh-hosts $allhws )
server_port=$SERVER_PORT
ssl_server_port=$SSL_SERVER_PORT
ssl_enabled=true
inter_region_ssl_enabled=true
cert_name=/server.p12
password=eUySvp2phM2Wk
$regionconf
$extraconf
EOF
    hedwig/hw.bash eval 'set -x; hw /tmp/$USER/hw.conf'
  " &
    wait-connect $hw:$SERVER_PORT
    wait-connect $hw:$SSL_SERVER_PORT
  done
}

# The arguments are similar to those for start-hw() above.
# The additional 4th argument is a string list of BookKeeper servers to start up.
# Brings up one full region: ZK + bookies first, then the hubs.
start-region() {
  local allhws="$1" hws="$2" zk="$3" bks="$4"
  shift
  hosts="$*" push jar
  start-zk-bks "$zk" "$bks"
  start-hw "$allhws" "$hws" "$zk"
}
+
+# Start multiple regions from a file configuration. The format that is expected
+# is to have each region on a separate line with the following format:
+# region=<Region name>, hub=<list of hub servers>, default=<single hub server>, zk=<single ZK server>, bk=<list of BK servers>
+# This will create all of the regions with an all-to-all topology. Each region 
+# is connected to the default hub server of every other region.
start-regions() {
  local cfg="$1"
  # Field delimiters used with substr to parse each config line.
  local regionPref="region=" hubPref=", hub=" defaultPref=", default=" zkPref=", zk=" bkPref=", bk="
  while read line ; do
    local otherhws=
    # Collect the default hub of every *other* region (all-to-all topology).
    while read subline ; do
      local profile="$(substr "$subline" "$defaultPref" "$zkPref")"
      if [[ $profile != "$(substr "$line" "$defaultPref" "$zkPref")" ]]
      then otherhws="$otherhws $profile:$SERVER_PORT:$SSL_SERVER_PORT"
      fi
    done < "$cfg"    
    local curRegion="$(substr "$line" "$regionPref" "$hubPref")" 
    local curHub="$(substr "$line" "$hubPref" "$defaultPref")"
    local curZk="$(substr "$line" "$zkPref" "$bkPref")" 
    local curBk="$(substr "$line" "$bkPref" "")"
    hosts="$curHub $curZk $curBk" push jar
    # Regions are brought up concurrently; wait for all below.
    region="$curRegion" start-region "$otherhws" "$curHub" "$curZk" "$curBk" &
  done < "$cfg"
  wait
}
+
# Run the HedwigBenchmark client on every host in $hosts, pointed at the
# given hub server; benchmark mode/parameters come in via $JAVAFLAGS
# (see sub-exp and pub-exp).
app() {
  local hw="$1" # the server to connect to
  push
  parssh "
    set -o errexit -x
    mkdir -p /tmp/\$USER/
    echo $(quote "default_server_host=$(ssh-hosts "$hw"):$SERVER_PORT:$SSL_SERVER_PORT") > /tmp/\$USER/hwc.conf
    JAVAFLAGS=$(quote "$JAVAFLAGS") hedwig/hw.bash hwc /tmp/\$USER/hwc.conf
  "
}
+
+#
+# Experiments.
+#
+
# Subscription-throughput experiment: sweep synchronicity, message count and
# subscription parallelism, run each combination three times via app(), and
# capture each run's output under sub/ (or $outfile when set).
# Tunables (env): syncs, counts, npars, start, outfile.
sub-exp() {
  : ${start:=0}
  push
  for sync in ${syncs:-true false} ; do
    for count in ${counts:-1000} ; do
      for npar in ${npars:-1 10 20 30 40 50} ; do
        if (( $npar <= $count )) ; then
          for rep in {1..3} ; do
            echo JAVAFLAGS="-Dmode=sub -Dsynchronous=$sync -Dcount=$count -Dnpar=$npar -Dstart=$start $JAVAFLAGS" app "$@"
            # Fix: the executed flags previously omitted -Dsynchronous, so the
            # $sync loop variable only affected the output file name (and
            # disagreed with the command line echoed above).
            JAVAFLAGS="-Dmode=sub -Dsynchronous=$sync -Dcount=$count -Dnpar=$npar -Dstart=$start $JAVAFLAGS" app "$@" >& ${outfile:-sub/sync-$sync-count-$count-npar-$npar-rep-$rep.out}
            # Advance the topic offset so reps use fresh topics.
            let start+=$count
          done
        fi
      done
    done
  done
}
+
# Poll (1s interval) until the given subscriber's node exists under the
# topic's subscribers path in ZK for the given region — i.e. until `ls` of
# that node succeeds and prints an empty child list ([]). Set dbgfile to
# capture the raw zkc output for debugging.
wait-sub() {
  local zk="$1" topic="$2" subid="$3" region="$4"
  while ! echo "ls /hedwig/$region/topics/$topic/subscribers/$subid" | \
          ssh-zkc "$zk" $ZOOKEEPER_PORT 2>&1 | \
          tee "${dbgfile:-/dev/null}" | \
          grep '^\[\]$' > /dev/null
  do sleep 1
  done
}
+
+# Note: this code waits for subscribers to show up in ZK, so when running this
+# multiple times on the same servers, adjust `start` to use a different topic
+# each time; otherwise, you'll immediately see the subscribers from last time,
+# thus causing the script to not wait for the current session's subscribers.
+# Alternatively, adjust recvid.
+#
+# Params:
+#
+# recvs: the list of local receivers
+# pubs: the list of publishers
+# hw: the local hedwig node
+# zk: the local zookeeper node (used to wait for receivers to join)
+#
+# Optional group of params:
+#
+# rrecv: the remote receiver
+# rhw: the remote hedwig node
+# rzk: the remote zookeeper node
# Publish-throughput experiment; see the parameter comment block above.
pub-exp() {
  # Assert the four mandatory args (errexit aborts the script otherwise).
  (( $# >= 4 ))
  local recvs="$1" pubs="$2" hw="$3" zk="$4"
  # Optional remote args.
  if (( $# > 4 ))
  then local remote=true rrecv="$5" rhw="$6" rzk="$7"
  else local remote=false
  fi
  : ${start:=0} ${count:=100000} ${recvid:=0} ${dir="pub"}
  hosts="$recvs $pubs $rrecv" push
  # Convert to arrays.
  local arecvs=( $recvs ) apubs=( $pubs )
  mkdir -p "$dir"

  #rregion="$(ssh $rhw cat /tmp/$USER/hw.conf)"
  # Region names equal the hub hostnames (see start-regions usage).
  region=$hw rregion=$rhw

  # Default to only using all recvs (rather than iterating over subsets).
  for nrecvs in ${nrecvss:-${#arecvs[*]}} ; do
    # Ditto for publishers.
    for npubs in ${npubss:-${#apubs[*]}} ; do
      # Default to only using a single value of npar.
      for npar in ${npars:-100} ; do
        # Default to repeating each trial thrice.
        for rep in $(seq ${nreps:-3}) ; do

          echo -n "nrecvs=$nrecvs npubs=$npubs npar=$npar rep=$rep"

          local outbase="$dir/nrecvs-$nrecvs-npubs-$npubs-npar-$npar-rep-$rep"

          # Skip if already done. (Fix: pathname expansion does not happen
          # inside [[ ]], so the old `[[ -f "$outbase"* ]]` tested for a file
          # literally named "...*" and never skipped completed runs.)
          if compgen -G "$outbase*" > /dev/null
          then echo '...skipped' ; continue
          else echo
          fi

          if $remote ; then
            # Start remote receiver.
            hosts=$rrecv JAVAFLAGS="-Dmode=recv -DsubId=recv-$recvid -Dstart=$start -Dcount=$((count/npubs*npubs)) $JAVAFLAGS" app "$rhw" >& "${outfile:-$outbase-rrecv.out}" &
          fi

          # Start all receivers.
          for ((irecv = 0; irecv < nrecvs; irecv++)) ; do
            hosts="${arecvs[$irecv]}" JAVAFLAGS="-Dmode=recv -DsubId=recv-$((recvid+irecv)) -Dstart=$start -Dcount=$((count/npubs*npubs)) $JAVAFLAGS" app "$hw" >& "${outfile:-$outbase-recv-$irecv.out}" &
          done

          # Wait till subscribed.
          sleep 1
          for ((irecv = 0; irecv < nrecvs; irecv++))
          do wait-sub $zk topic-$start recv-$((recvid+irecv)) $region
          done
          if $remote ; then
            # Wait till remote subscribed.
            wait-sub $rzk topic-$start recv-$recvid $rregion
            # Wait till cross-region subscribed, since default is async subs.
            # This should only happen once.
            wait-sub $zk topic-$start hub-$rregion $region
          fi

          # Launch all publishers.
          for ((ipub = 0; ipub < npubs; ipub++)) ; do
            hosts="${apubs[$ipub]}" JAVAFLAGS="-Dmode=pub -Dnpar=$npar -Dstart=$start -Dcount=$((count/npubs)) $JAVAFLAGS" app "$hw" >& "${outfile:-$outbase-pub-$ipub.out}" &
          done

          # Wait for everyone to terminate.
          wait

          # To avoid reusing the same subscriber ID.
          let recvid+=$nrecvs

        done
      done
    done
  done
}
+
# Run the standard battery of pub experiments over a pool of hosts: (1) a
# 1-recv/1-pub npar sweep, (2) fan-out with one publisher and a growing
# receiver set, (3) fan-in with one receiver and a growing publisher set.
# Each experiment is preceded by a `quote ...` line that logs the exact
# command being run. Distinct $start offsets keep topics fresh (see the note
# above pub-exp).
pub-exps() {
  local pool="$1" hw="$2" zk="$3" rrecv="$4" rhw="$5" rzk="$6"
  local apool=( $pool ) npool=${#apool[*]}
  echo $apool $npool
  : ${start:=0}
  hosts="$pool $rrecv $rhw" push

  quote start=$start npars="${npars:-20 40 60 80 100}" pub-exp ${apool[0]} ${apool[1]} "$hw" "$zk" $rrecv $rhw $rzk
  start=$start npars="${npars:-20 40 60 80 100}" pub-exp ${apool[0]} ${apool[1]} "$hw" "$zk" $rrecv $rhw $rzk

  pubs=${apool[0]}
  for ((i = 1; i < npool; i++))
  do recvs="${recvs:-} ${apool[$i]}"
  done
  quote start=$((start+1)) nrecvss="$( seq -s' ' $(( npool - 1 )) )" pub-exp "$recvs" "$pubs" "$hw" "$zk" $rrecv $rhw $rzk
  start=$((start+1)) nrecvss="$( seq -s' ' $(( npool - 1 )) )" pub-exp "$recvs" "$pubs" "$hw" "$zk" $rrecv $rhw $rzk

  recvs=${apool[0]}
  for ((i = 1; i < npool; i++))
  do pubs="${pubs:-} ${apool[$i]}"
  done
  quote start=$((start+2)) npubss="$( seq -s' ' $(( npool - 1 )) )" pub-exp "$recvs" "$pubs" "$hw" "$zk" $rrecv $rhw $rzk
  start=$((start+2)) npubss="$( seq -s' ' $(( npool - 1 )) )" pub-exp "$recvs" "$pubs" "$hw" "$zk" $rrecv $rhw $rzk
}
+
+#
+# Post-processing
+#
+
# Consolidate sync0/ and sync1/ experiment outputs into one directory,
# prefixing each copied file name with its sync flag.
sub-agg() {
  local dest="$1" flag f
  mkdir -p "$dest"
  for flag in 0 1 ; do
    for f in "sync$flag"/*.out ; do
      cp "$f" "$dest/sync-$flag-$(basename "$f")"
    done
  done
}
+
+if [[ "$(type -t "$cmd")" == function ]]
+then "$cmd" "$@"
+else "$@"
+fi
+
+# vim: et sw=2 ts=2

Propchange: hadoop/zookeeper/trunk/src/contrib/hedwig/scripts/hw.bash
------------------------------------------------------------------------------
    svn:executable = *

Added: hadoop/zookeeper/trunk/src/contrib/hedwig/scripts/hwServer.sh
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/hedwig/scripts/hwServer.sh?rev=987314&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/hedwig/scripts/hwServer.sh (added)
+++ hadoop/zookeeper/trunk/src/contrib/hedwig/scripts/hwServer.sh Thu Aug 19 21:25:13 2010
@@ -0,0 +1,101 @@
+#!/bin/sh
+
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+#
+# If this scripted is run out of /usr/bin or some other system bin directory
+# it should be linked to and not copied. Things like java jar files are found
+# relative to the canonical path of this script.
+#
+
+# Only follow symlinks if readlink supports it. Find the directory path
+# for where this script is being executed from.
# Resolve this script's real location (following symlinks where readlink -f
# is available, e.g. GNU; BSD readlink lacks -f).
if readlink -f "$0" > /dev/null 2>&1
then
  HWBIN=`readlink -f "$0"`
else
  HWBIN="$0"
fi
HWBINDIR=`dirname "$HWBIN"`

# We use HWCFGDIR if defined, otherwise we use /etc/hedwig
# or the conf directory that is a sibling of this script's directory.
# This is to find out where the Hedwig server config file resides.
if [ "x$HWCFGDIR" = "x" ]
then
    if [ -d "/etc/hedwig" ]
    then
        HWCFGDIR="/etc/hedwig"
    else
        HWCFGDIR="$HWBINDIR/../conf"
    fi
fi
+
+# We use HWCFG as the name of the Hedwig server config file if defined,
+# otherwise use the default file name "hw_server.conf".
+if [ "x$HWCFG" = "x" ]
+then
+    HWCFG="hw_server.conf"
+fi
+HWCFG="$HWCFGDIR/$HWCFG"
+
+# If a config file is passed in directly when invoking the script,
+# use that instead.
+if [ "x$2" != "x" ]
+then
+    HWCFG="$HWCFGDIR/$2"
+fi
+
+# Find the Hedwig server jar and setup the CLASSPATH. We assume it to be
+# located in a standard place relative to the location of where this script
+# is executed from.
+HWJAR="$HWBINDIR/../server/target/server-1.0-SNAPSHOT-jar-with-dependencies.jar"
+CLASSPATH="$HWJAR:$CLASSPATH"
+
+# Store the Hedwig server's PID (java process) in the same $HWBINDIR. 
+# This is used for us to stop the server later on via this same script.
# Default the PID file to live next to this script so `stop` can find it.
# Fix: $HWPIDFILE is now quoted — unquoted, a value containing whitespace
# made `[ -z ... ]` fail with "too many arguments".
if [ -z "$HWPIDFILE" ]
    then HWPIDFILE=$HWBINDIR/hedwig_server.pid
fi
+
# Command dispatch: start/stop/restart the PubSub server, tracking it via
# the PID file written on start.
case $1 in
start)
    echo  "Starting Hedwig server ... "
    java  -cp "$CLASSPATH" org.apache.hedwig.server.netty.PubSubServer "$HWCFG" &
    # /bin/echo -n: portable no-newline echo for the bare PID.
    /bin/echo -n $! > "$HWPIDFILE"
    echo STARTED
    ;;
stop)
    echo "Stopping Hedwig server ... "
    if [ ! -f "$HWPIDFILE" ]
    then
    echo "error: could not find file $HWPIDFILE"
    exit 1
    else
    # NOTE(review): kill -9 gives the JVM no chance to shut down cleanly —
    # consider sending TERM first.
    kill -9 $(cat "$HWPIDFILE")
    rm "$HWPIDFILE"
    echo STOPPED
    fi
    ;;
restart)
    # Forward any extra args (e.g. an alternate config name) to both phases.
    shift
    "$0" stop ${@}
    sleep 3
    "$0" start ${@}
    ;;
*)
    echo "Usage: $0 {start|stop|restart}" >&2
esac

Propchange: hadoop/zookeeper/trunk/src/contrib/hedwig/scripts/hwServer.sh
------------------------------------------------------------------------------
    svn:executable = *

Added: hadoop/zookeeper/trunk/src/contrib/hedwig/scripts/quote
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/hedwig/scripts/quote?rev=987314&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/hedwig/scripts/quote (added)
+++ hadoop/zookeeper/trunk/src/contrib/hedwig/scripts/quote Thu Aug 19 21:25:13 2010
@@ -0,0 +1,23 @@
+#!/usr/bin/env python
+# vim:et:sw=2:ts=2
+#  Licensed to the Apache Software Foundation (ASF) under one or more
+#  contributor license agreements.  See the NOTICE file distributed with
+#  this work for additional information regarding copyright ownership.
+#  The ASF licenses this file to You under the Apache License, Version 2.0
+#  (the "License"); you may not use this file except in compliance with
+#  the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+
+import sys
+
def quote(s):
  # Wrap s in double quotes, backslash-escaping the three characters that
  # remain special inside a double-quoted shell string: \, " and $.
  escaped = s.replace('\\', r'\\')
  escaped = escaped.replace('"', r'\"')
  escaped = escaped.replace('$', r'\$')
  return '"%s"' % escaped
+
+print ' '.join( map( quote, sys.argv[1:] ) )

Propchange: hadoop/zookeeper/trunk/src/contrib/hedwig/scripts/quote
------------------------------------------------------------------------------
    svn:executable = *

Added: hadoop/zookeeper/trunk/src/contrib/hedwig/server/lib/README
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/hedwig/server/lib/README?rev=987314&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/hedwig/server/lib/README (added)
+++ hadoop/zookeeper/trunk/src/contrib/hedwig/server/lib/README Thu Aug 19 21:25:13 2010
@@ -0,0 +1,4 @@
+The ZooKeeper and BookKeeper jars included in this server/lib directory were
+created from the ZooKeeper Apache trunk at revision 947756. They contain new
+features that have not yet been included in an official release, so they are
+checked in here temporarily.

Added: hadoop/zookeeper/trunk/src/contrib/hedwig/server/pom.xml
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/hedwig/server/pom.xml?rev=987314&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/hedwig/server/pom.xml (added)
+++ hadoop/zookeeper/trunk/src/contrib/hedwig/server/pom.xml Thu Aug 19 21:25:13 2010
@@ -0,0 +1,153 @@
+<!--
+   Licensed to the Apache Software Foundation (ASF) under one or more
+   contributor license agreements.  See the NOTICE file distributed with
+   this work for additional information regarding copyright ownership.
+   The ASF licenses this file to You under the Apache License, Version 2.0
+   (the "License"); you may not use this file except in compliance with
+   the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+-->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <!-- Maven module for the Hedwig server (hub); child of the top-level
+       hedwig aggregator POM. -->
+  <parent>
+    <groupId>org.apache.hedwig</groupId>
+    <artifactId>hedwig</artifactId>
+    <version>1.0-SNAPSHOT</version>
+  </parent>
+  <!-- Entry point written into the manifest of the jar-with-dependencies
+       built by the assembly plugin below. -->
+  <properties>
+      <mainclass>org.apache.hedwig.server.netty.PubSubServer</mainclass>
+  </properties>
+  <groupId>org.apache.hedwig</groupId>
+  <artifactId>server</artifactId>
+  <packaging>jar</packaging>
+  <version>1.0-SNAPSHOT</version>
+  <name>server</name>
+  <url>http://maven.apache.org</url>
+  <dependencies>
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+      <version>4.8.1</version>
+      <scope>compile</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hedwig</groupId>
+      <artifactId>client</artifactId>
+      <version>1.0-SNAPSHOT</version>
+      <scope>compile</scope>
+    </dependency>
+    <!-- Derby is needed only at runtime (embedded persistence backend). -->
+    <dependency>
+    	<groupId>org.apache.derby</groupId>
+    	<artifactId>derby</artifactId>
+    	<version>10.4.2.0</version>
+    	<scope>runtime</scope>
+    </dependency>
+    <!-- ZooKeeper/BookKeeper jars come from server/lib (pre-release trunk
+         builds — see server/lib/README). -->
+    <dependency>
+      <groupId>org.apache.zookeeper</groupId>
+      <artifactId>zookeeper</artifactId>
+      <version>3.4.0</version>
+      <scope>compile</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.zookeeper</groupId>
+      <artifactId>zookeeper-test</artifactId>
+      <version>3.4.0</version>
+      <scope>compile</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.bookkeeper</groupId>
+      <artifactId>bookkeeper</artifactId>
+      <version>3.4.0</version>
+      <scope>compile</scope>
+    </dependency>
+  </dependencies>
+  <build>
+    <plugins>
+      <!-- Builds an executable jar-with-dependencies whose Main-Class is
+           ${mainclass}, attached during the package phase. -->
+      <plugin>
+        <artifactId>maven-assembly-plugin</artifactId>
+        <configuration>
+          <descriptorRefs>
+            <descriptorRef>jar-with-dependencies</descriptorRef>
+          </descriptorRefs>
+          <archive>
+            <manifest>
+              <mainClass>${mainclass}</mainClass>
+            </manifest>
+          </archive>
+        </configuration>
+        <executions>
+          <execution>
+            <id>make-assembly</id>
+            <phase>package</phase>
+            <goals>
+              <goal>attached</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
+      <!-- Shade-plugin alternative kept for reference; the assembly plugin
+           above is used instead. -->
+<!--
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-shade-plugin</artifactId>
+        <version>1.2.1</version>
+        <executions>
+          <execution>
+            <phase>package</phase>
+            <goals>
+              <goal>shade</goal>
+            </goals>
+            <configuration>
+              <artifactSet>
+                <excludes>
+                  <exclude>classworlds:classworlds</exclude>
+                  <exclude>junit:junit</exclude>
+                  <exclude>jmock:jmock</exclude>
+                  <exclude>xml-apis:xml-apis</exclude>
+                </excludes>
+              </artifactSet>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+      -->
+      <!-- Manages a scratch "build" directory: removed on clean, recreated
+           before tests run. -->
+      <plugin>
+        <artifactId>maven-antrun-plugin</artifactId>
+        <executions>
+          <execution>
+            <id>removebuilddir</id>
+            <phase>clean</phase>
+            <configuration>
+              <tasks>
+                <delete dir="build" />
+              </tasks>
+            </configuration>
+            <goals>
+              <goal>run</goal>
+            </goals>
+          </execution>
+          <execution>
+            <id>createbuilddir</id>
+            <phase>generate-test-resources</phase>
+            <configuration>
+              <tasks>
+                <mkdir dir="build" />
+              </tasks>
+            </configuration>
+            <goals>
+              <goal>run</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
+    </plugins>
+  </build>
+</project>

Added: hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/benchmark/AbstractBenchmark.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/benchmark/AbstractBenchmark.java?rev=987314&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/benchmark/AbstractBenchmark.java (added)
+++ hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/benchmark/AbstractBenchmark.java Thu Aug 19 21:25:13 2010
@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hedwig.server.benchmark;
+
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.log4j.Logger;
+
+import org.apache.hedwig.util.ConcurrencyUtils;
+
+/**
+ * Skeleton for simple asynchronous throughput/latency benchmarks. Subclasses
+ * implement doOps() to issue numOps operations whose completions are routed
+ * through an AbstractCallback, which accumulates latency and signals phase
+ * completion (or first failure) via doneSignalQueue. run() executes a warmup
+ * phase, sleeps, runs the measured phase, then calls tearDown().
+ *
+ * Tunables via system properties: nWarmup (default 50000), nOps (default
+ * 400000), progress (boolean: log every 10000 completions).
+ */
+public abstract class AbstractBenchmark {
+
+	static final Logger logger = Logger.getLogger(AbstractBenchmark.class);
+    
+	// Sum of per-operation latencies (ms) for the current phase.
+	AtomicLong totalLatency = new AtomicLong();
+    // Receives true when all ops of a phase have completed, false on the
+    // first failed op; runPhase() blocks on this.
+    LinkedBlockingQueue<Boolean> doneSignalQueue = new LinkedBlockingQueue<Boolean>();
+
+    // Issue numOps asynchronous operations; completions must be reported
+    // through an AbstractCallback so phase accounting works.
+    abstract void doOps(int numOps) throws Exception;
+	// Release resources (clients, thread pools) when the benchmark is done.
+	abstract void tearDown() throws Exception;
+    
+	/**
+	 * Shared completion handler: releases the outstanding-ops semaphore,
+	 * records the op's latency (ctx must be the send-time millis as a Long),
+	 * and signals phase failure or completion through doneSignalQueue.
+	 */
+	protected class AbstractCallback{
+		AtomicInteger numDone = new AtomicInteger(0);
+		Semaphore outstanding;
+		int numOps;
+		boolean logging;
+		
+		public AbstractCallback(Semaphore outstanding, int numOps) {
+			this.outstanding = outstanding;
+			this.numOps = numOps;
+			// -Dprogress=true enables periodic progress logging.
+			logging = Boolean.getBoolean("progress");
+		}
+    	
+		public void handle(boolean success, Object ctx){
+            outstanding.release();
+            
+            if (!success){
+                // First failure aborts the phase; runPhase() will throw.
+                ConcurrencyUtils.put(doneSignalQueue, false);
+                return;
+            }
+            
+            totalLatency.addAndGet(System.currentTimeMillis() - (Long)ctx);
+            int numDoneInt = numDone.incrementAndGet();
+            
+            if (logging && numDoneInt % 10000 == 0){
+                logger.info("Finished " + numDoneInt + " ops");
+            }
+            
+            if (numOps == numDoneInt){
+                ConcurrencyUtils.put(doneSignalQueue, true);
+            }   
+        }
+	}
+	
+	/**
+	 * Runs one phase: issues numOps operations and blocks until all have
+	 * completed (logging average latency and throughput) or any one failed
+	 * (throws RuntimeException).
+	 */
+	public void runPhase(String phase, int numOps) throws Exception{
+        long startTime = System.currentTimeMillis();
+        
+        doOps(numOps);
+        
+        // Blocks until a callback signals completion (true) or failure (false).
+        if (!doneSignalQueue.take()){
+        	logger.error("One or more operations failed in phase: " + phase)\u003b
+        	throw new RuntimeException();
+        }else{
+            logger.info("Phase: " + phase + " Avg latency : " + totalLatency.get() / numOps + ", tput = " + (numOps * 1000/ (System.currentTimeMillis() - startTime)));
+        }
+	}
+	
+	
+	
+	
+
+	// Warmup phase, 10 s pause, measured phase, then tear down.
+	public void run() throws Exception{
+        
+        int numWarmup = Integer.getInteger("nWarmup", 50000);
+    	runPhase("warmup", numWarmup);
+        
+    	logger.info("Sleeping for 10 seconds");
+    	Thread.sleep(10000);
+        //reset latency
+        totalLatency.set(0);
+        
+        int numOps = Integer.getInteger("nOps", 400000);
+        runPhase("real", numOps);
+
+        tearDown();
+	}
+}

Added: hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/benchmark/BookieBenchmark.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/benchmark/BookieBenchmark.java?rev=987314&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/benchmark/BookieBenchmark.java (added)
+++ hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/benchmark/BookieBenchmark.java Thu Aug 19 21:25:13 2010
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hedwig.server.benchmark;
+
+import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Semaphore;
+import org.apache.bookkeeper.client.BKException;
+import org.apache.bookkeeper.proto.BookieClient;
+import org.apache.log4j.Logger;
+import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback;
+import org.apache.bookkeeper.util.OrderedSafeExecutor;
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.buffer.ChannelBuffers;
+import org.jboss.netty.channel.socket.ClientSocketChannelFactory;
+import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
+
+/**
+ * Benchmarks raw add-entry throughput against a single bookie by talking to
+ * BookieClient directly, bypassing the BookKeeper ledger layer.
+ *
+ * Tunables via system properties: nPars (max outstanding adds, default 1000),
+ * size (entry payload bytes, default 1024), plus those of AbstractBenchmark.
+ */
+public class BookieBenchmark extends AbstractBenchmark{
+    
+    // Fixed: previously Logger.getLogger(BookkeeperBenchmark.class), which
+    // made this class log under the wrong logger name.
+    static final Logger logger = Logger.getLogger(BookieBenchmark.class);
+    
+    BookieClient bkc;
+    InetSocketAddress addr;
+    ClientSocketChannelFactory channelFactory;
+    OrderedSafeExecutor executor = new OrderedSafeExecutor(1);
+    
+    /**
+     * @param bookieHostPort bookie address in "host:port" form
+     */
+    public BookieBenchmark(String bookieHostPort)  throws Exception{
+        channelFactory = new NioClientSocketChannelFactory(Executors.newCachedThreadPool(), Executors.newCachedThreadPool());
+        bkc = new BookieClient(channelFactory, executor);
+        String[] hostPort = bookieHostPort.split(":");
+        addr = new InetSocketAddress(hostPort[0], Integer.parseInt(hostPort[1]));
+    }
+    
+    @Override
+    void doOps(final int numOps) throws Exception{
+        int numOutstanding = Integer.getInteger("nPars",1000);
+        final Semaphore outstanding = new Semaphore(numOutstanding);
+        
+        WriteCallback callback = new WriteCallback() {
+            AbstractCallback handler = new AbstractCallback(outstanding, numOps);
+            
+            @Override
+            public void writeComplete(int rc, long ledgerId, long entryId,
+                    InetSocketAddress addr, Object ctx) {
+                handler.handle(rc == BKException.Code.OK, ctx);   
+            }
+        };
+        
+        byte[] passwd = new byte[20]; // all-zero master key
+        int size = Integer.getInteger("size", 1024);
+        byte[] data = new byte[size];
+        
+        for (int i=0; i<numOps; i++){
+            outstanding.acquire(); // bound the number of in-flight adds
+            
+            // Entry header: ledgerId(8) + entryId(8) + lastConfirmed(8)
+            // + masterKey(20) = 44 bytes, followed by the payload.
+            ByteBuffer buffer = ByteBuffer.allocate(44);
+            long ledgerId = 1000;
+            buffer.putLong(ledgerId);
+            buffer.putLong(i);
+            buffer.putLong(0);
+            buffer.put(passwd);
+            buffer.rewind();
+            ChannelBuffer toSend = ChannelBuffers.wrappedBuffer(ChannelBuffers.wrappedBuffer(buffer.slice()), ChannelBuffers.wrappedBuffer(data));
+            // ctx carries the send timestamp so the callback can compute latency.
+            bkc.addEntry(addr, ledgerId, passwd, i, toSend, callback, System.currentTimeMillis());            
+        }
+    }
+    
+    @Override
+    public void tearDown(){        
+        bkc.close();
+        channelFactory.releaseExternalResources();
+        executor.shutdown();
+    }
+    
+    public static void main(String[] args) throws Exception{
+        BookieBenchmark benchmark = new BookieBenchmark(args[0]);
+        benchmark.run();
+    }
+}

Added: hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/benchmark/BookkeeperBenchmark.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/benchmark/BookkeeperBenchmark.java?rev=987314&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/benchmark/BookkeeperBenchmark.java (added)
+++ hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/benchmark/BookkeeperBenchmark.java Thu Aug 19 21:25:13 2010
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hedwig.server.benchmark;
+
+import java.util.Random;
+import java.util.concurrent.Semaphore;
+import org.apache.bookkeeper.client.BKException;
+import org.apache.bookkeeper.client.BookKeeper;
+import org.apache.bookkeeper.client.LedgerHandle;
+import org.apache.bookkeeper.client.AsyncCallback.AddCallback;
+import org.apache.bookkeeper.client.BookKeeper.DigestType;
+import org.apache.log4j.Logger;
+
+/**
+ * Benchmarks asynchronous add-entry throughput through the full BookKeeper
+ * client, spreading writes randomly across a set of pre-created ledgers.
+ *
+ * Tunables via system properties: nLedgers (default 5), quorum (default 2),
+ * ensemble (default 4), digestType (default CRC32), size (payload bytes,
+ * default 1024), nPars (max outstanding adds, default 1000).
+ */
+public class BookkeeperBenchmark extends AbstractBenchmark{
+    
+    static final Logger logger = Logger.getLogger(BookkeeperBenchmark.class);
+    
+    BookKeeper bk;
+    LedgerHandle[] lh;
+    
+    /**
+     * @param zkHostPort ZooKeeper connect string for the BookKeeper client
+     */
+    public BookkeeperBenchmark(String zkHostPort) throws Exception{
+        bk = new BookKeeper(zkHostPort);
+        int numLedgers = Integer.getInteger("nLedgers",5);
+        lh = new LedgerHandle[numLedgers];
+        int quorumSize = Integer.getInteger("quorum", 2);
+        int ensembleSize = Integer.getInteger("ensemble", 4);
+        DigestType digestType = DigestType.valueOf(System.getProperty("digestType", "CRC32"));
+        for (int i=0; i< numLedgers; i++){
+            // All ledgers share the hard-coded password "blah".
+            lh[i] = bk.createLedger(ensembleSize, quorumSize, digestType, "blah".getBytes());
+        }
+        
+    }
+    
+    
+    @Override
+    void doOps(final int numOps) throws Exception {
+    	int size = Integer.getInteger("size", 1024);
+    	byte[] msg = new byte[size];
+        
+    	int numOutstanding = Integer.getInteger("nPars",1000);
+        final Semaphore outstanding = new Semaphore(numOutstanding);
+
+        AddCallback callback = new AddCallback() {
+            	AbstractCallback handler = new AbstractCallback(outstanding, numOps);
+            	
+
+        	@Override
+            public void addComplete(int rc, LedgerHandle lh, long entryId, Object ctx) {
+                handler.handle(rc == BKException.Code.OK, ctx);   
+        	}
+        				
+		};
+
+        
+        
+        Random rand = new Random();
+    	
+    	for (int i=0; i<numOps; i++){
+            // Bound in-flight adds; ctx carries send time for latency stats.
+            outstanding.acquire();
+            lh[rand.nextInt(lh.length)].asyncAddEntry(msg, callback, System.currentTimeMillis());
+        }
+        
+    	
+    }
+ 
+    @Override
+	public void tearDown() throws Exception{
+        bk.halt();
+    }
+    
+    
+    public static void main(String[] args) throws Exception{
+        BookkeeperBenchmark benchmark = new BookkeeperBenchmark(args[0]);
+        benchmark.run();
+    }
+}

Added: hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/benchmark/FakeBookie.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/benchmark/FakeBookie.java?rev=987314&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/benchmark/FakeBookie.java (added)
+++ hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/benchmark/FakeBookie.java Thu Aug 19 21:25:13 2010
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hedwig.server.benchmark;
+
+import java.net.InetSocketAddress;
+import java.util.concurrent.Executors;
+
+import org.apache.log4j.Logger;
+import org.jboss.netty.bootstrap.ServerBootstrap;
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.channel.ChannelHandlerContext;
+import org.jboss.netty.channel.ChannelPipeline;
+import org.jboss.netty.channel.ChannelPipelineCoverage;
+import org.jboss.netty.channel.ChannelPipelineFactory;
+import org.jboss.netty.channel.Channels;
+import org.jboss.netty.channel.MessageEvent;
+import org.jboss.netty.channel.SimpleChannelHandler;
+import org.jboss.netty.channel.socket.ServerSocketChannelFactory;
+import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
+import org.jboss.netty.handler.codec.frame.LengthFieldBasedFrameDecoder;
+import org.jboss.netty.handler.codec.frame.LengthFieldPrepender;
+import org.jboss.netty.logging.InternalLoggerFactory;
+import org.jboss.netty.logging.Log4JLoggerFactory;
+
+/**
+ * Minimal Netty server that speaks just enough of the bookie wire protocol to
+ * acknowledge every request immediately (rc = 0) without doing any real work.
+ * Used by benchmarks to measure pure client/network overhead.
+ */
+@ChannelPipelineCoverage("all")
+public class FakeBookie extends SimpleChannelHandler implements
+		ChannelPipelineFactory {
+	static final Logger logger = Logger.getLogger(FakeBookie.class);
+	ServerSocketChannelFactory serverChannelFactory = new NioServerSocketChannelFactory(
+			Executors.newCachedThreadPool(), Executors.newCachedThreadPool());
+
+	// Starts listening on the given port immediately upon construction.
+	public FakeBookie(int port) {
+		InternalLoggerFactory.setDefaultFactory(new Log4JLoggerFactory());
+		ServerBootstrap bootstrap = new ServerBootstrap(serverChannelFactory);
+
+		bootstrap.setPipelineFactory(this);
+		bootstrap.setOption("child.tcpNoDelay", true);
+		bootstrap.setOption("child.keepAlive", true);
+		bootstrap.setOption("reuseAddress", true);
+
+		logger.info("Going into receive loop");
+		// Bind and start to accept incoming connections.
+		bootstrap.bind(new InetSocketAddress(port));
+	}
+
+	// Frames messages with a 4-byte length prefix (stripped on decode,
+	// prepended on encode) and routes each frame to this handler.
+	@Override
+	public ChannelPipeline getPipeline() throws Exception {
+		ChannelPipeline pipeline = Channels.pipeline();
+		pipeline.addLast("lengthbaseddecoder",
+				new LengthFieldBasedFrameDecoder(1024 * 1024, 0, 4, 0, 4));
+		pipeline.addLast("lengthprepender", new LengthFieldPrepender(4));
+		pipeline.addLast("main", this);
+		return pipeline;
+	}
+
+	// Parses only the fields needed for the reply and acks with rc = 0.
+	@Override
+	public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
+			throws Exception {
+		if (!(e.getMessage() instanceof ChannelBuffer)) {
+			// Not a decoded frame; let the next handler deal with it.
+			ctx.sendUpstream(e);
+			return;
+		}
+
+		ChannelBuffer buffer = (ChannelBuffer) e.getMessage();
+
+		int type = buffer.readInt();
+		// Jump to offset 24 — presumably past the 20-byte master key that
+		// follows the 4-byte type (matches BookieBenchmark's request layout);
+		// TODO confirm against the bookie protocol definition.
+		buffer.readerIndex(24);
+		long ledgerId = buffer.readLong();
+		long entryId = buffer.readLong();
+
+		// Response: type(4) + rc(4) + ledgerId(8) + entryId(8) = 24 bytes.
+		ChannelBuffer outBuf = ctx.getChannel().getConfig().getBufferFactory()
+				.getBuffer(24);
+		outBuf.writeInt(type);
+		outBuf.writeInt(0); // rc
+		outBuf.writeLong(ledgerId);
+		outBuf.writeLong(entryId);
+		e.getChannel().write(outBuf);
+
+	}
+
+	
+	public static void main(String args[]){
+		new FakeBookie(Integer.parseInt(args[0]));
+	}
+}

Added: hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/common/ByteStringInterner.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/common/ByteStringInterner.java?rev=987314&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/common/ByteStringInterner.java (added)
+++ hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/common/ByteStringInterner.java Thu Aug 19 21:25:13 2010
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hedwig.server.common;
+
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+import com.google.protobuf.ByteString;
+
+/**
+ * Canonicalizes ByteString instances so that equal byte sequences share one
+ * object, enabling cheap reference comparison and reduced memory footprint.
+ */
+public class ByteStringInterner {
+    // TODO: entries are never released; consider weak references so that
+    // strings no longer in use can be reclaimed.
+
+    private static ConcurrentMap<ByteString, ByteString> map = new ConcurrentHashMap<ByteString, ByteString>();
+
+    /**
+     * Returns the canonical instance equal to {@code in}; if none is
+     * registered yet, {@code in} itself becomes the canonical instance.
+     */
+    public static ByteString intern(ByteString in) {
+        // Fast path: an equal ByteString was interned earlier.
+        ByteString canonical = map.get(in);
+        if (canonical == null) {
+            // Slow path, race-safe: if another thread wins the putIfAbsent,
+            // it returns that thread's (non-null) value, which we adopt.
+            canonical = map.putIfAbsent(in, in);
+        }
+        return canonical == null ? in : canonical;
+    }
+}

Added: hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/common/ServerConfiguration.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/common/ServerConfiguration.java?rev=987314&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/common/ServerConfiguration.java (added)
+++ hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/common/ServerConfiguration.java Thu Aug 19 21:25:13 2010
@@ -0,0 +1,268 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hedwig.server.common;
+
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.InputStream;
+import java.net.InetAddress;
+import java.net.URL;
+import java.net.UnknownHostException;
+import java.util.Arrays;
+import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.commons.configuration.ConfigurationException;
+
+import com.google.protobuf.ByteString;
+import org.apache.hedwig.conf.AbstractConfiguration;
+import org.apache.hedwig.util.HedwigSocketAddress;
+
+/**
+ * Typed accessors (with defaults) over the Hedwig hub server's configuration,
+ * backed by the inherited Commons Configuration 'conf'. Also maintains a few
+ * attributes derived from the raw settings — region byte string, server
+ * address, region list — which are refreshed on loadConf().
+ */
+public class ServerConfiguration extends AbstractConfiguration {
+    // Configuration property keys.
+    protected final static String REGION = "region";
+    protected final static String MAX_MESSAGE_SIZE = "max_message_size";
+    protected final static String READAHEAD_COUNT = "readahead_count";
+    protected final static String READAHEAD_SIZE = "readahead_size";
+    protected final static String CACHE_SIZE = "cache_size";
+    protected final static String SCAN_BACKOFF_MSEC = "scan_backoff_ms";
+    protected final static String SERVER_PORT = "server_port";
+    protected final static String SSL_SERVER_PORT = "ssl_server_port";
+    protected final static String ZK_PREFIX = "zk_prefix";
+    protected final static String ZK_HOST = "zk_host";
+    protected final static String ZK_TIMEOUT = "zk_timeout";
+    // NOTE(review): the key is spelled "readhead_enabled" (missing 'a');
+    // deployed config files may rely on this spelling, so do not "fix" the
+    // string without a migration plan.
+    protected final static String READAHEAD_ENABLED = "readhead_enabled";
+    protected final static String STANDALONE = "standalone";
+    protected final static String REGIONS = "regions";
+    protected final static String CERT_NAME = "cert_name";
+    protected final static String CERT_PATH = "cert_path";
+    protected final static String PASSWORD = "password";
+    protected final static String SSL_ENABLED = "ssl_enabled";
+    protected final static String CONSUME_INTERVAL = "consume_interval";
+    protected final static String RETENTION_SECS = "retention_secs";
+    protected final static String INTER_REGION_SSL_ENABLED = "inter_region_ssl_enabled";
+    protected final static String MESSAGES_CONSUMED_THREAD_RUN_INTERVAL = "messages_consumed_thread_run_interval";
+
+    // these are the derived attributes
+    protected ByteString myRegionByteString = null;
+    protected HedwigSocketAddress myServerAddress = null;
+    protected List<String> regionList = null;
+
+    // Although this method is not currently used, currently maintaining it like
+    // this so that we can support on-the-fly changes in configuration
+    protected void refreshDerivedAttributes() {
+        refreshMyRegionByteString();
+        refreshMyServerAddress();
+        refreshRegionList();
+    }
+
+    // Loads the configuration and recomputes all derived attributes.
+    @Override
+    public void loadConf(URL confURL) throws ConfigurationException {
+        super.loadConf(confURL);
+        refreshDerivedAttributes();
+    }
+
+    public int getMaximumMessageSize() {
+        return conf.getInt(MAX_MESSAGE_SIZE, 1258291); /* 1.2M */
+    }
+
+    public String getMyRegion() {
+        return conf.getString(REGION, "standalone");
+    }
+
+    protected void refreshMyRegionByteString() {
+        myRegionByteString = ByteString.copyFromUtf8(getMyRegion());
+    }
+
+    protected void refreshMyServerAddress() {
+        try {
+            // Use the raw IP address as the hostname
+            myServerAddress = new HedwigSocketAddress(InetAddress.getLocalHost().getHostAddress(), getServerPort(),
+                    getSSLServerPort());
+        } catch (UnknownHostException e) {
+            // Without a resolvable local address the server cannot identify
+            // itself, so fail fast.
+            throw new RuntimeException(e);
+        }
+    }
+
+    // The expected format for the regions parameter is Hostname:Port:SSLPort
+    // with spaces in between each of the regions.
+    protected void refreshRegionList() {
+        String regions = conf.getString(REGIONS, "");
+        if (regions.isEmpty()) {
+            regionList = new LinkedList<String>();
+        } else {
+            regionList = Arrays.asList(regions.split(" "));
+        }
+    }
+
+    // Lazily computed; cached after the first call.
+    public ByteString getMyRegionByteString() {
+        if (myRegionByteString == null) {
+            refreshMyRegionByteString();
+        }
+        return myRegionByteString;
+    }
+
+    public int getReadAheadCount() {
+        return conf.getInt(READAHEAD_COUNT, 10);
+    }
+
+    public long getReadAheadSizeBytes() {
+        return conf.getLong(READAHEAD_SIZE, 4 * 1024 * 1024); // 4M
+    }
+
+    public long getMaximumCacheSize() {
+        // 2G or half of the maximum amount of memory the JVM uses
+        return conf.getLong(CACHE_SIZE, Math.min(2 * 1024L * 1024L * 1024L, Runtime.getRuntime().maxMemory() / 2));
+    }
+
+    // After a scan of a log fails, how long before we retry (in msec)
+    public long getScanBackoffPeriodMs() {
+        return conf.getLong(SCAN_BACKOFF_MSEC, 1000);
+    }
+
+    public int getServerPort() {
+        return conf.getInt(SERVER_PORT, 4080);
+    }
+
+    public int getSSLServerPort() {
+        return conf.getInt(SSL_SERVER_PORT, 9876);
+    }
+
+    public String getZkPrefix() {
+        return conf.getString(ZK_PREFIX, "/hedwig");
+    }
+
+    // ZK path helpers: each appends onto the caller-supplied StringBuilder
+    // and returns it, so calls can be chained.
+    public StringBuilder getZkRegionPrefix(StringBuilder sb) {
+        return sb.append(getZkPrefix()).append("/").append(getMyRegion());
+    }
+
+    public StringBuilder getZkTopicsPrefix(StringBuilder sb) {
+        return getZkRegionPrefix(sb).append("/topics");
+    }
+
+    public StringBuilder getZkTopicPath(StringBuilder sb, ByteString topic) {
+        return getZkTopicsPrefix(sb).append("/").append(topic.toStringUtf8());
+    }
+
+    public StringBuilder getZkHostsPrefix(StringBuilder sb) {
+        return getZkRegionPrefix(sb).append("/hosts");
+    }
+
+    // Lazily computed; cached after the first call.
+    public HedwigSocketAddress getServerAddr() {
+        if (myServerAddress == null) {
+            refreshMyServerAddress();
+        }
+        return myServerAddress;
+    }
+
+    public String getZkHost() {
+        return conf.getString(ZK_HOST, "localhost");
+    }
+
+    public int getZkTimeout() {
+        return conf.getInt(ZK_TIMEOUT, 2000);
+    }
+
+    public boolean getReadAheadEnabled() {
+        return conf.getBoolean(READAHEAD_ENABLED, true);
+    }
+
+    public boolean isStandalone() {
+        return conf.getBoolean(STANDALONE, false);
+    }
+
+    // Lazily computed; cached after the first call.
+    public List<String> getRegions() {
+        if (regionList == null) {
+            refreshRegionList();
+        }
+        return regionList;
+    }
+
+    // This is the name of the SSL certificate if available as a resource.
+    public String getCertName() {
+        return conf.getString(CERT_NAME, "");
+    }
+
+    // This is the path to the SSL certificate if it is available as a file.
+    public String getCertPath() {
+        return conf.getString(CERT_PATH, "");
+    }
+
+    // This method return the SSL certificate as an InputStream based on if it
+    // is configured to be available as a resource or as a file. If nothing is
+    // configured correctly, then a ConfigurationException will be thrown as
+    // we do not know how to obtain the SSL certificate stream.
+    public InputStream getCertStream() throws FileNotFoundException, ConfigurationException {
+        String certName = getCertName();
+        String certPath = getCertPath();
+        if (certName != null && !certName.isEmpty()) {
+            // A resource name takes precedence over a file path.
+            return getClass().getResourceAsStream(certName);
+        } else if (certPath != null && !certPath.isEmpty()) {
+            return new FileInputStream(certPath);
+        } else
+            throw new ConfigurationException("SSL Certificate configuration does not have resource name or path set!");
+    }
+
+    public String getPassword() {
+        return conf.getString(PASSWORD, "");
+    }
+
+    public boolean isSSLEnabled() {
+        return conf.getBoolean(SSL_ENABLED, false);
+    }
+
+    public int getConsumeInterval() {
+        return conf.getInt(CONSUME_INTERVAL, 50);
+    }
+
+    public int getRetentionSecs() {
+        return conf.getInt(RETENTION_SECS, 0);
+    }
+
+    public boolean isInterRegionSSLEnabled() {
+        return conf.getBoolean(INTER_REGION_SSL_ENABLED, false);
+    }
+
+    // This parameter is used to determine how often we run the
+    // SubscriptionManager's Messages Consumed timer task thread (in
+    // milliseconds).
+    public int getMessagesConsumedThreadRunInterval() {
+        return conf.getInt(MESSAGES_CONSUMED_THREAD_RUN_INTERVAL, 60000);
+    }
+
+    /*
+     * Is this a valid configuration that we can run with? This code might grow
+     * over time.
+     */
+    public void validate() throws ConfigurationException {
+        if (!getZkPrefix().startsWith("/")) {
+            throw new ConfigurationException(ZK_PREFIX + " must start with a /");
+        }
+        // Validate that if Regions exist and inter-region communication is SSL
+        // enabled, that the Regions correspond to valid HedwigSocketAddresses,
+        // namely that SSL ports are present.
+        if (isInterRegionSSLEnabled() && getRegions().size() > 0) {
+            for (String hubString : getRegions()) {
+                HedwigSocketAddress hub = new HedwigSocketAddress(hubString);
+                if (hub.getSSLSocketAddress() == null)
+                    throw new ConfigurationException("Region defined does not have required SSL port: " + hubString);
+            }
+        }
+
+        // add other checks here
+    }
+}

Added: hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/common/TerminateJVMExceptionHandler.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/common/TerminateJVMExceptionHandler.java?rev=987314&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/common/TerminateJVMExceptionHandler.java (added)
+++ hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/common/TerminateJVMExceptionHandler.java Thu Aug 19 21:25:13 2010
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hedwig.server.common;
+
+import org.apache.log4j.Logger;
+
+public class TerminateJVMExceptionHandler implements Thread.UncaughtExceptionHandler {
+    static Logger logger = Logger.getLogger(TerminateJVMExceptionHandler.class);
+
+    @Override
+    public void uncaughtException(Thread t, Throwable e) {
+        logger.fatal("Uncaught exception in thread " + t.getName(), e);
+        System.exit(1);
+    }
+
+}

Added: hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/common/TopicOpQueuer.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/common/TopicOpQueuer.java?rev=987314&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/common/TopicOpQueuer.java (added)
+++ hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/common/TopicOpQueuer.java Thu Aug 19 21:25:13 2010
@@ -0,0 +1,111 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hedwig.server.common;
+
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.Queue;
+import java.util.concurrent.ScheduledExecutorService;
+
+import com.google.protobuf.ByteString;
+import org.apache.hedwig.exceptions.PubSubException;
+import org.apache.hedwig.util.Callback;
+
+public class TopicOpQueuer {
+    /**
+     * Map from topic to the queue of operations for that topic.
+     */
+    protected HashMap<ByteString, Queue<Runnable>> topic2ops = new HashMap<ByteString, Queue<Runnable>>();
+
+    protected final ScheduledExecutorService scheduler;
+
+    public TopicOpQueuer(ScheduledExecutorService scheduler) {
+        this.scheduler = scheduler;
+    }
+
+    public interface Op extends Runnable {
+    }
+
+    public abstract class AsynchronousOp<T> implements Op {
+        final public ByteString topic;
+        final public Callback<T> cb;
+        final public Object ctx;
+
+        public AsynchronousOp(final ByteString topic, final Callback<T> cb, Object ctx) {
+            this.topic = topic;
+            this.cb = new Callback<T>() {
+                @Override
+                public void operationFailed(Object ctx, PubSubException exception) {
+                    cb.operationFailed(ctx, exception);
+                    popAndRunNext(topic);
+                }
+
+                @Override
+                public void operationFinished(Object ctx, T resultOfOperation) {
+                    cb.operationFinished(ctx, resultOfOperation);
+                    popAndRunNext(topic);
+                }
+            };
+            this.ctx = ctx;
+        }
+    }
+
+    public abstract class SynchronousOp implements Op {
+        final public ByteString topic;
+
+        public SynchronousOp(ByteString topic) {
+            this.topic = topic;
+        }
+
+        @Override
+        public final void run() {
+            runInternal();
+            popAndRunNext(topic);
+        }
+
+        protected abstract void runInternal();
+
+    }
+
+    protected synchronized void popAndRunNext(ByteString topic) {
+        Queue<Runnable> ops = topic2ops.get(topic);
+        if (!ops.isEmpty())
+            ops.remove();
+        if (!ops.isEmpty())
+            scheduler.submit(ops.peek());
+    }
+
+    public void pushAndMaybeRun(ByteString topic, Op op) {
+        int size;
+        synchronized (this) {
+            Queue<Runnable> ops = topic2ops.get(topic);
+            if (ops == null) {
+                ops = new LinkedList<Runnable>();
+                topic2ops.put(topic, ops);
+            }
+            ops.add(op);
+            size = ops.size();
+        }
+        if (size == 1)
+            op.run();
+    }
+
+    public Runnable peek(ByteString topic) {
+        return topic2ops.get(topic).peek();
+    }
+}

Added: hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/common/UnexpectedError.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/common/UnexpectedError.java?rev=987314&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/common/UnexpectedError.java (added)
+++ hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/common/UnexpectedError.java Thu Aug 19 21:25:13 2010
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hedwig.server.common;
+
/**
 * Thrown when an internal invariant is violated — i.e. a bug rather than a
 * recoverable condition. Extends Error so it escapes generic
 * catch (Exception) handlers.
 */
public class UnexpectedError extends Error {

    private static final long serialVersionUID = 1L;

    public UnexpectedError(String msg) {
        super(msg);
    }

    public UnexpectedError(Throwable cause) {
        super(cause);
    }

    /**
     * Generalization: the standard Throwable-style constructor combining a
     * message with its underlying cause, so callers need not choose
     * between the two.
     */
    public UnexpectedError(String msg, Throwable cause) {
        super(msg, cause);
    }

}

Added: hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/delivery/ChannelEndPoint.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/delivery/ChannelEndPoint.java?rev=987314&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/delivery/ChannelEndPoint.java (added)
+++ hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/delivery/ChannelEndPoint.java Thu Aug 19 21:25:13 2010
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hedwig.server.delivery;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.ChannelFuture;
+import org.jboss.netty.channel.ChannelFutureListener;
+
+import org.apache.hedwig.protocol.PubSubProtocol.PubSubResponse;
+import org.apache.hedwig.server.common.UnexpectedError;
+
+public class ChannelEndPoint implements DeliveryEndPoint, ChannelFutureListener {
+
+    Channel channel;
+
+    public Channel getChannel() {
+        return channel;
+    }
+
+    Map<ChannelFuture, DeliveryCallback> callbacks = new HashMap<ChannelFuture, DeliveryCallback>();
+
+    public ChannelEndPoint(Channel channel) {
+        this.channel = channel;
+    }
+
+    public void close() {
+        channel.close();
+    }
+
+    public void send(PubSubResponse response, DeliveryCallback callback) {
+        ChannelFuture future = channel.write(response);
+        callbacks.put(future, callback);
+        future.addListener(this);
+    }
+
+    public void operationComplete(ChannelFuture future) throws Exception {
+        DeliveryCallback callback = callbacks.get(future);
+        callbacks.remove(future);
+
+        if (callback == null) {
+            throw new UnexpectedError("Could not locate callback for channel future");
+        }
+
+        if (future.isSuccess()) {
+            callback.sendingFinished();
+        } else {
+            // treat all channel errors as permanent
+            callback.permanentErrorOnSend();
+        }
+
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        if (obj instanceof ChannelEndPoint) {
+            ChannelEndPoint channelEndPoint = (ChannelEndPoint) obj;
+            return channel.equals(channelEndPoint.channel);
+        } else {
+            return false;
+        }
+    }
+
+}

Added: hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/delivery/DeliveryCallback.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/delivery/DeliveryCallback.java?rev=987314&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/delivery/DeliveryCallback.java (added)
+++ hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/delivery/DeliveryCallback.java Thu Aug 19 21:25:13 2010
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hedwig.server.delivery;
+
+/**
+ * Completion callback handed to a DeliveryEndPoint for each send attempt;
+ * exactly one of the three methods is invoked per attempt.
+ */
+public interface DeliveryCallback {
+
+    /** The response was written successfully. */
+    public void sendingFinished();
+
+    /** Sending failed; name suggests a retry may succeed — confirm with implementations. */
+    public void transientErrorOnSend();
+
+    /** Sending failed for good (e.g. ChannelEndPoint treats all channel errors as permanent). */
+    public void permanentErrorOnSend();
+}

Added: hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/delivery/DeliveryEndPoint.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/delivery/DeliveryEndPoint.java?rev=987314&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/delivery/DeliveryEndPoint.java (added)
+++ hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/delivery/DeliveryEndPoint.java Thu Aug 19 21:25:13 2010
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hedwig.server.delivery;
+
+import org.apache.hedwig.protocol.PubSubProtocol.PubSubResponse;
+
+/**
+ * Sink that the delivery layer pushes PubSubResponses into, e.g. a netty
+ * channel to a subscriber (see ChannelEndPoint).
+ */
+public interface DeliveryEndPoint {
+
+    /** Asynchronously sends the response; the callback is invoked once the attempt completes. */
+    public void send(PubSubResponse response, DeliveryCallback callback);
+
+    /** Closes the underlying transport; no further sends should be issued afterwards. */
+    public void close();
+
+}

Added: hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/delivery/DeliveryManager.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/delivery/DeliveryManager.java?rev=987314&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/delivery/DeliveryManager.java (added)
+++ hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/delivery/DeliveryManager.java Thu Aug 19 21:25:13 2010
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hedwig.server.delivery;
+
+import com.google.protobuf.ByteString;
+import org.apache.hedwig.protocol.PubSubProtocol.MessageSeqId;
+import org.apache.hedwig.server.subscriptions.MessageFilter;
+
+/**
+ * Drives message delivery for subscriptions: starts and stops streaming a
+ * topic's messages to a subscriber's endpoint.
+ */
+public interface DeliveryManager {
+    /**
+     * Begins delivering messages for (topic, subscriberId) to endPoint,
+     * starting from seqIdToStartFrom, passing each message through filter.
+     * isHubSubscriber presumably distinguishes remote-hub subscriptions
+     * from client ones — confirm with implementations.
+     */
+    public void startServingSubscription(ByteString topic, ByteString subscriberId, MessageSeqId seqIdToStartFrom,
+            DeliveryEndPoint endPoint, MessageFilter filter, boolean isHubSubscriber);
+
+    /** Stops delivery previously started for (topic, subscriberId). */
+    public void stopServingSubscriber(ByteString topic, ByteString subscriberId);
+}



Mime
View raw message