Added: hadoop/zookeeper/trunk/src/contrib/hedwig/scripts/hw.bash
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/hedwig/scripts/hw.bash?rev=987314&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/hedwig/scripts/hw.bash (added)
+++ hadoop/zookeeper/trunk/src/contrib/hedwig/scripts/hw.bash Thu Aug 19 21:25:13 2010
@@ -0,0 +1,734 @@
+#!/usr/bin/env bash
+
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# This file is a collection of script functions. To use, just run this file
+# with the function you want to execute as the first argument and arguments to
+# the function following that.
+#
+# For instance, to install the prereq jars:
+#
+# ./hw.bash install-zk
+#
+# Or to run BookKeeper:
+#
+# ./hw.bash bk 3181 /tmp/bk/{journal,ledgers}
+
+# Note: unbuffer can cause all sorts of funky problems, especially when dealing
+# with high volumes of output from multiple sources! Problems aren't just
+# about causing your terminal to get garbled; they may be as severe as killing
+# this script itself.
+
+set -o errexit # -o nounset
+
+dark_blue="\e[0;34m"; bright_blue="\e[1;34m"
+dark_green="\e[0;32m"; bright_green="\e[1;32m"
+dark_cyan="\e[0;36m"; bright_cyan="\e[1;36m"
+dark_red="\e[0;31m"; bright_red="\e[1;31m"
+dark_magenta="\e[0;35m"; bright_magenta="\e[1;35m"
+brown="\e[0;33m"; yellow="\e[1;33m"
+black="\e[0;30m"; dark_gray="\e[1;30m";
+bold_white="\e[1;37m" white="\e[0;37m"
+normal_color="\e[0m"
+
+# Default server ports used
+PROFILER_PORT=9874
+SERVER_PORT=9875
+SSL_SERVER_PORT=9876
+ZOOKEEPER_PORT=9877
+BOOKKEEPER_PORT=9878
+
+: ${script:=$0}
+serverdir="$(readlink -f "$(dirname "$script")/../server/")" || serverdir=
+cmd="$(basename "$script")"
+jarbase=server-1.0-SNAPSHOT-jar-with-dependencies.jar
+if [[ -f "$(dirname "$script")/$jarbase" ]]
+then jar="$(readlink -f "$(dirname "$script")/$jarbase")"
+else jar="$serverdir/target/$jarbase"
+fi
+already_pushed=false
+: ${push_jar:=false} ${push_script:=true} ${use_yjp=false} ${unbuffer:=false}
+: ${loglevel:=} ${dbg:=false} ${attach:=false} ${logconf:=} ${asserts:=false}
+if $unbuffer
+then unbuffercmd='unbuffer -p'
+else unbuffercmd=
+fi
+if $dbg
+then set -x
+fi
+if $asserts
+then JAVAFLAGS=-ea
+fi
+
+#
+# General utilities.
+#
+
+trace() { echo "$@" ; eval "$@" ; }
+
+# Add the given prefix (first arg) to all subsequent words (latter args).
+prefix-words() {
+ local prefix="$1"
+ shift
+ for i in "$@"
+ do echo "$prefix" "$i"
+ done
+}
+
+quote(){
+ "$(dirname "$script")/quote" "$@"
+}
+
+# Retrieve the substring of a string with a given prefix and suffix.
+# For example, substr "Hello World!" "ell" "rld" returns "o Wo".
+# Everything before the prefix and after the suffix (inclusive)
+# is stripped off and the remaining substring is returned.
+substr() {
+ if [ $# == 3 ]
+ then
+ nopref="${1#${1%${2}*}${2}}"
+ echo "${nopref%${3}*}"
+ else
+ echo "Usage: substr string prefix suffix"
+ fi
+}
+
+# Must test connectivity via ssh because we may be firewalled.
+wait-connect() {
+ local prof="${1%:*}" port="${1#*:}"
+ local host="$(ssh-hosts $prof)"
+ while ! echo | ssh "$prof" nc -w 1 localhost "$port"
+ do sleep 1
+ done
+}
+
+#
+# Java runners.
+#
+
+# Launch with yourkit profiler.
+java-yjp() {
+ if $use_yjp
+ then LD_LIBRARY_PATH="${YJP:-$HOME/yjp-8.0.15}/bin/linux-x86-32/" \
+ java -agentlib:yjpagent $JAVAFLAGS "$@"
+ else java $JAVAFLAGS "$@"
+ fi
+}
+
+with-attach() {
+ if $attach
+ then JAVAFLAGS="-agentlib:jdwp=transport=dt_socket,server=y,address=$PROFILER_PORT $JAVAFLAGS" "$@"
+ else "$@"
+ fi
+}
+
+with-logging() {
+ if [[ $loglevel ]] ; then
+ logconf="
+log4j.rootLogger=$loglevel, A1
+log4j.logger.org.apache.zookeeper = ERROR
+log4j.logger.org.apache.bookkeeper.client.QuorumOpMonitor = ERROR
+log4j.logger.org.apache.bookkeeper.proto.BookieClient = ERROR
+
+# A1 is set to be a ConsoleAppender.
+log4j.appender.A1=org.apache.log4j.ConsoleAppender
+
+# A1 uses PatternLayout.
+log4j.appender.A1.layout=org.apache.log4j.PatternLayout
+log4j.appender.A1.layout.ConversionPattern=%d %-4r [%t] %-5p %c %x - %m%n
+"
+ fi
+ if [[ "$logconf" ]] ; then
+ mkdir -p /tmp/$USER/logging/
+ local logdir="$(mktemp -d -p /tmp/$USER/logging/)"
+ echo "$logconf" > "$logdir/log4j.properties"
+ CLASSPATH="$logdir:$CLASSPATH" "$@"
+ else
+ "$@"
+ fi
+}
+
+try-ulimit() { ulimit -n $((1024**2)) || true ; }
+j() { CLASSPATH="$jar" with-logging with-attach java-yjp "$@" ; }
+bk() { j org.apache.bookkeeper.proto.BookieServer "$@" ; }
+#bk() { j com.yahoo.pubsub.client.benchmark.FakeBookie "$@" ; }
+zk() { j org.apache.zookeeper.server.quorum.QuorumPeerMain "$@" ; }
+zkc() { j org.apache.zookeeper.ZooKeeperMain -server "$@" ; }
+hw() { try-ulimit; j org.apache.hedwig.server.netty.PubSubServer "$@" ; }
+hwc() { try-ulimit; hwc-soft "$@" ; }
+hwc-soft() { j org.apache.hedwig.client.benchmark.HedwigBenchmark "$@" ; }
+
+ssh-zkc() {
+ local server="$1" port="$2"
+ ssh "$server" "hedwig/hw.bash zkc localhost:$port"
+}
+
+#
+# Setup
+#
+
+# Get/build the ZK dependencies that must be manually installed and place those
+# jars in the current directory.
+get-zk() {
+ local stagedir="$(pwd)" dstdir="$(pwd)" ver=3.2.0
+ local unpacked="$stagedir/zookeeper-$ver/"
+ local url="http://archive.apache.org/dist/hadoop/zookeeper/zookeeper-$ver/zookeeper-$ver.tar.gz"
+
+ if [[ ! -d "$unpacked" ]]
+ then
+ echo $url
+ wget -q -O - "$url" | tar xzf - -C "$stagedir"
+ fi
+ ant -q -buildfile "$unpacked/build.xml" compile-test
+ cp -r "$unpacked/src/java/test/"{config,data}/ "$unpacked/build/testclasses/"
+ jar cf zookeeper-test-$ver.jar -C "$(readlink -f "$unpacked/build/testclasses/")" .
+ cp "$unpacked/zookeeper-$ver.jar" .
+ cp "$unpacked/contrib/bookkeeper/zookeeper-$ver-bookkeeper.jar" bookkeeper-$ver.jar
+ jar cf zookeeper-test-$ver-sources.jar -C "$(readlink -f "$unpacked/src/java/test/")" .
+ jar cf zookeeper-$ver-sources.jar -C "$(readlink -f "$unpacked/src/java/main/")" .
+ jar cf bookkeeper-$ver-sources.jar -C "$(readlink -f "$unpacked/src/contrib/bookkeeper/src/java/")" .
+}
+
+get-bk() {
+ local svn="$serverdir/../Zookeeper/" svnver=3.3.0-SNAPSHOT
+ ant -q -buildfile "$svn/build.xml" compile-test
+ ant -q -buildfile "$svn/src/contrib/bookkeeper/build.xml"
+ jar cf bookkeeper-$svnver-sources.jar -C "$(readlink -f "$svn/src/contrib/bookkeeper/src/java/")" .
+ cp "$svn/build/contrib/bookkeeper/zookeeper-dev-bookkeeper.jar" bookkeeper-$svnver.jar
+}
+
+# Install the jars from the current directory, as obtained by get-zk.
+# For now, we will use the checked in ZK/BK jars in the server/lib directory.
+# When an official ZK/BK release for those changes is done, then we can
+# modify the get-zk function to get the latest code.
+install-zk-bk() {
+ for pkg in zookeeper zookeeper-test bookkeeper ; do
+ local grp="${pkg%-*}" ver=SNAPSHOT
+ for srcs in '' -sources
+ do trace mvn -q install:install-file -Dfile="$pkg-$ver$srcs.jar" \
+ -DgroupId=org.apache.$grp -DartifactId=$pkg -Dpackaging=jar \
+ -Dversion=$ver ${srcs:+-Dclassifier=sources}
+ done
+ done
+}
+
+setup-java() {
+ # wget 'http://cds.sun.com/is-bin/INTERSHOP.enfinity/WFS/CDS-CDS_Developer-Site/en_US/-/USD/VerifyItem-Start/jdk-7-ea-linux-i586.bin?BundledLineItemUUID=dXBIBe.m0.UAAAEiZUUKrYfz&OrderID=BnZIBe.mencAAAEiU0UKrYfz&ProductID=O29IBe.py.oAAAEhK1kP50GU&FileName=/jdk-7-ea-linux-i586.bin'
+ local jdk="$1"
+ parscp "$jdk" ^:/tmp/jdk6
+ parssh "
+ echo yes | /tmp/jdk6 > /tmp/java-install-log &&
+ if ! fgrep jdk1.6.0 ~/.bashrc > /dev/null
+ then echo 'export PATH=~/jdk1.6.0_14/bin/:\$PATH' >> ~/.bashrc
+ fi
+ "
+}
+
+setup-yjp() {
+ local pkg="$1"
+ parscp "$pkg" ^:/tmp/yjp.zip
+ parssh "
+ yes A | unzip -q /tmp/yjp.zip
+ if ! fgrep YJP= ~/.bashrc > /dev/null
+ then echo 'export YJP=~/yjp-8.0.15/' >> ~/.bashrc
+ fi
+ "
+}
+
+# Usage: setup-bk ZKSERVER ZKSERVER_PORT
+#
+# Create the /ledgers and /ledgers/available ZK nodes on the given ZK server.
+# The bookie servers will register themselves once they are up on ZK but they
+# need these nodes to exist first.
+setup-bk() {
+ local server="$1" port="$2"
+ shift 2
+ ssh-zkc "$server" "$port" << EOF || true
+create /ledgers 0
+create /ledgers/available 0
+EOF
+}
+
+# Get rid of duplicate files in a jar.
+strip-jar() {
+ local jar="${1:-$jar}" tmpdir=/tmp/$USER/jar
+ rm -rf "$tmpdir"
+ mkdir -p "$tmpdir"
+ (
+ cd "$tmpdir"
+ jar xf "$jar"
+ jar cf "$jar" .
+ )
+}
+
+# Inspect the current logging level.
+get-logging() {
+ local jar="${1:-$jar}" tmpdir=/tmp/$USER/jar
+ mkdir -p "$tmpdir"
+ (
+ cd "$tmpdir"
+ jar xf "$jar" log4j.properties
+ grep rootLogger= log4j.properties
+ )
+}
+
+# Adjust the log level but without modifying the original source tree or going
+# through the full rebuild process.
+set-logging() {
+ local level="$1" tmpdir=/tmp/$USER/jarlog
+ mkdir -p "$tmpdir"
+ (
+ cd "$tmpdir"
+ jar xf "$jar" log4j.properties
+ sed -i "s/\(rootLogger\)=[[:alpha:]]\+/\1=$level/" log4j.properties
+ jar uf "$jar" log4j.properties
+ )
+}
+
+#
+# General testbed tools.
+#
+
+hosts() {
+ if [[ ! "$hosts" ]]
+ then echo '$hosts not set' 1>&2 ; return 1
+ fi
+ echo $hosts | sed 's/[[:space:]]\+/\n/g' | sort -u
+}
+
+hostargs() { "$@" $hosts ; }
+tagssh() {
+ local prof="$1"
+ shift
+ {
+ ssh "$prof" "export use_yjp=$use_yjp loglevel=$loglevel asserts=$asserts logconf=$(quote "$logconf"); attach=$attach; $@" 2>&1 &&
+ echo -e "$bright_green[SUCCESS]$normal_color" ||
+ echo -e "$bright_red[FAILURE: $?]$normal_color" && false
+ } | $unbuffercmd sed "s/^/$prof: /g"
+}
+parssh() { hosts | xargs ${xargs--P0} -i^ "$script" tagssh ^ "$@" ; }
+parscp() { hosts | xargs ${xargs--P0} -i^ scp "$@" ; }
+
+# Resolves profile names to actual hostnames according to the user's .ssh/config.
+ssh-hosts() {
+ python -c "
+from __future__ import with_statement
+import sys, os
+hosts = {}
+with file(os.environ['HOME'] + '/.ssh/config') as f:
+ for line in f:
+ words = line.split('#')[0].split()
+ if len(words) == 2:
+ if words[0] == 'Host': key = words[1]
+ if words[0] == 'HostName': hosts[key] = words[1]
+for profile in sys.argv[1:]:
+ parts = profile.split(':', 1)
+ key, rest = parts[0], ('' if len(parts) == 1 else ':' + parts[1])
+ print hosts.get(key, key) + rest,
+" "$@"
+}
+
+#
+# Hedwig testbed tools.
+#
+
+# Usage: hosts='wilbur2 wilbur3 wilbur4' ./hw.bash push
+push() {
+ if ! $already_pushed ; then
+ if $push_jar || $push_script
+ then parssh 'mkdir -p hedwig/'
+ fi
+ if $push_jar && $push_script
+ then parscp -q "$script" "$jar" ^:hedwig/
+ elif $push_jar && ! $push_script
+ then parscp -q "$jar" ^:hedwig/
+ elif ! $push_jar && $push_script
+ then parscp -q "$script" ^:hedwig/
+ else :
+ fi
+ already_pushed=true
+ fi
+}
+
+# Kill processes and garbage-collect the log4j temp directories.
+dkill() { parssh 'pkill java; rm -rf /tmp/$USER/logging/' ; }
+
+# Pass in any argument to get a long listing.
+lstatus() {
+ for port in 2181 3181 4080 {9874..9878} ; do
+ if /usr/sbin/lsof -i tcp:$port 2>&1 | fgrep 'LISTEN' > /dev/null ; then
+ if (( $# > 0 )) ; then
+ echo "$port:"
+ ps u $(/usr/sbin/lsof -t -i tcp:$port) | cat
+ else
+ echo -n "$port "
+ fi
+ fi
+ done
+ (( $# > 0 )) || echo
+}
+
+# Pass in any argument to get a long listing.
+dstatus() {
+ push
+ xargs= parssh "hedwig/hw.bash lstatus $@"
+}
+
+# See if anything is running on each machine.
+tops() {
+ xargs= parssh '
+ echo
+ hostname
+ echo =====
+ top -b -n 1 | fgrep -A3 COMMAND
+ '
+}
+
+# Familiarize this machine with the given hosts' keys.
+warmup() {
+ hosts | xargs -P0 -i^ ssh -o StrictHostKeyChecking=no ^ hostname
+}
+
+# Add yourself to nopasswd sudoers for all hosts in $hosts.
+setup-sudo() {
+ local cmd='sudo su - -c "
+ if ! fgrep \"$USER ALL=(yahoo) NOPASSWD:ALL\" /etc/sudoers >& /dev/null
+ then echo -e \"$USER ALL=(ALL) ALL\n$USER ALL=(yahoo) NOPASSWD:ALL\" >> /etc/sudoers
+ fi"'
+ python -c '
+import getpass, os, pexpect, sys
+pw = getpass.getpass()
+for host in os.environ["hosts"].split():
+ c = pexpect.spawn(sys.argv[1], [host] + sys.argv[2:])
+ i = c.expect(["Password:", pexpect.EOF])
+ if i == 0: c.sendline(pw); c.read()
+ filtered = filter(lambda x: pw not in x, c.before.split("\n"))
+ sys.stdout.write("\n".join(filtered).lstrip("\n"))
+ ' ssh "$cmd"
+}
+
+setup-limits() {
+ ssh '
+ if ! sudo fgrep "* hard nofile $((1024**2))" /etc/security/limits.conf >& /dev/null
+ then sudo su - -c "echo \"* hard nofile $((1024**2))\" >> /etc/security/limits.conf"
+ fi
+ '
+}
+
+mkhomes() {
+ ssh 'sudo mkdir -p /home/yang && sudo chown -R yang /home/yang'
+}
+
+#
+# Distributed launchers.
+#
+
+# Given a hostname (*not* an ssh profile), figure out how to utilize the
+# machine's disks for BK.
+bk-journal() {
+ case "$@" in
+ * ) echo '"/d1/$USER/bk/journal"' ;;
+ esac
+}
+bk-ledger() {
+ case "$@" in
+ * ) echo '"/home/$USER/bk/ledger"' ;;
+ esac
+}
+bk-paths() {
+ echo "$(bk-journal "$@") $(bk-ledger "$@")"
+}
+
+# Start ZK on the first arg host and BKs on the second argument which is
+# a string list of hosts.
+start-zk-bks() {
+ hosts="$*" push jar
+ local zk="$1" abks=( $2 )
+ shift
+ tagssh $zk "
+ rm -rf /tmp/$USER/zk/
+ mkdir -p /tmp/$USER/zk/
+ cat > /tmp/$USER/zoo.cfg << EOF
+tickTime=2000
+dataDir=/tmp/$USER/zk/
+clientPort=$ZOOKEEPER_PORT
+EOF
+ hedwig/hw.bash eval 'set -x; zk /tmp/$USER/zoo.cfg'
+ " &
+ wait-connect $zk:$ZOOKEEPER_PORT
+ setup-bk $(ssh-hosts $zk) $ZOOKEEPER_PORT
+ hosts="$*" parssh "
+ rm -rf $(bk-paths "$1")
+ mkdir -p $(bk-paths "$1")
+ hedwig/hw.bash eval $(quote "set -x; bk $BOOKKEEPER_PORT $( ssh-hosts $zk ):$ZOOKEEPER_PORT $(bk-paths "$1")")
+ " &
+ for bk in "${abks[@]}" ; do
+ wait-connect $bk:$BOOKKEEPER_PORT
+ done
+}
+
+# The first argument is a string list of remote region default Hedwig servers
+# in a multi-region setup (if any). The second argument is a string list of
+# Hedwig hubs to start for this local region. The third argument is the single
+# ZooKeeper server host the hubs should connect to.
+start-hw() {
+ local allhws="$1" ahws=( $2 ) zk="$3"
+ if [[ $region ]]
+ then regionconf="region=$region"
+ fi
+ shift
+ hosts="$@" push
+ for hw in "${ahws[@]}" ; do
+ tagssh $hw "
+ mkdir -p /tmp/$USER/
+ cat > /tmp/$USER/hw.conf << EOF
+zk_host=$( ssh-hosts $zk ):$ZOOKEEPER_PORT
+regions=$( ssh-hosts $allhws )
+server_port=$SERVER_PORT
+ssl_server_port=$SSL_SERVER_PORT
+ssl_enabled=true
+inter_region_ssl_enabled=true
+cert_name=/server.p12
+password=eUySvp2phM2Wk
+$regionconf
+$extraconf
+EOF
+ hedwig/hw.bash eval 'set -x; hw /tmp/$USER/hw.conf'
+ " &
+ wait-connect $hw:$SERVER_PORT
+ wait-connect $hw:$SSL_SERVER_PORT
+ done
+}
+
+# The arguments are similar to those for start-hw() above.
+# The additional 4th argument is a string list of BookKeeper servers to start up.
+start-region() {
+ local allhws="$1" hws="$2" zk="$3" bks="$4"
+ shift
+ hosts="$*" push jar
+ start-zk-bks "$zk" "$bks"
+ start-hw "$allhws" "$hws" "$zk"
+}
+
+# Start multiple regions from a file configuration. The format that is expected
+# is to have each region on a separate line with the following format:
+# region=<Region name>, hub=<list of hub servers>, default=<single hub server>, zk=<single ZK server>, bk=<list of BK servers>
+# This will create all of the regions with an all-to-all topology. Each region
+# is connected to the default hub server of every other region.
+start-regions() {
+ local cfg="$1"
+ local regionPref="region=" hubPref=", hub=" defaultPref=", default=" zkPref=", zk=" bkPref=", bk="
+ while read line ; do
+ local otherhws=
+ while read subline ; do
+ local profile="$(substr "$subline" "$defaultPref" "$zkPref")"
+ if [[ $profile != "$(substr "$line" "$defaultPref" "$zkPref")" ]]
+ then otherhws="$otherhws $profile:$SERVER_PORT:$SSL_SERVER_PORT"
+ fi
+ done < "$cfg"
+ local curRegion="$(substr "$line" "$regionPref" "$hubPref")"
+ local curHub="$(substr "$line" "$hubPref" "$defaultPref")"
+ local curZk="$(substr "$line" "$zkPref" "$bkPref")"
+ local curBk="$(substr "$line" "$bkPref" "")"
+ hosts="$curHub $curZk $curBk" push jar
+ region="$curRegion" start-region "$otherhws" "$curHub" "$curZk" "$curBk" &
+ done < "$cfg"
+ wait
+}
+
+app() {
+ local hw="$1" # the server to connect to
+ push
+ parssh "
+ set -o errexit -x
+ mkdir -p /tmp/\$USER/
+ echo $(quote "default_server_host=$(ssh-hosts "$hw"):$SERVER_PORT:$SSL_SERVER_PORT") > /tmp/\$USER/hwc.conf
+ JAVAFLAGS=$(quote "$JAVAFLAGS") hedwig/hw.bash hwc /tmp/\$USER/hwc.conf
+ "
+}
+
+#
+# Experiments.
+#
+
+sub-exp() {
+ : ${start:=0}
+ push
+ for sync in ${syncs:-true false} ; do
+ for count in ${counts:-1000} ; do
+ for npar in ${npars:-1 10 20 30 40 50} ; do
+ if (( $npar <= $count )) ; then
+ for rep in {1..3} ; do
+ echo JAVAFLAGS="-Dmode=sub -Dsynchronous=$sync -Dcount=$count -Dnpar=$npar -Dstart=$start $JAVAFLAGS" app "$@"
+ JAVAFLAGS="-Dmode=sub -Dcount=$count -Dnpar=$npar -Dstart=$start $JAVAFLAGS" app "$@" >& ${outfile:-sub/sync-$sync-count-$count-npar-$npar-rep-$rep.out}
+ let start+=$count
+ done
+ fi
+ done
+ done
+ done
+}
+
+wait-sub() {
+ local zk="$1" topic="$2" subid="$3" region="$4"
+ while ! echo "ls /hedwig/$region/topics/$topic/subscribers/$subid" | \
+ ssh-zkc "$zk" $ZOOKEEPER_PORT 2>&1 | \
+ tee "${dbgfile:-/dev/null}" | \
+ grep '^\[\]$' > /dev/null
+ do sleep 1
+ done
+}
+
+# Note: this code waits for subscribers to show up in ZK, so when running this
+# multiple times on the same servers, adjust `start` to use a different topic
+# each time; otherwise, you'll immediately see the subscribers from last time,
+# thus causing the script to not wait for the current session's subscribers.
+# Alternatively, adjust recvid.
+#
+# Params:
+#
+# recvs: the list of local receivers
+# pubs: the list of publishers
+# hw: the local hedwig node
+# zk: the local zookeeper node (used to wait for receivers to join)
+#
+# Optional group of params:
+#
+# rrecv: the remote receiver
+# rhw: the remote hedwig node
+# rzk: the remote zookeeper node
+pub-exp() {
+ (( $# >= 4 ))
+ local recvs="$1" pubs="$2" hw="$3" zk="$4"
+ # Optional remote args.
+ if (( $# > 4 ))
+ then local remote=true rrecv="$5" rhw="$6" rzk="$7"
+ else local remote=false
+ fi
+ : ${start:=0} ${count:=100000} ${recvid:=0} ${dir="pub"}
+ hosts="$recvs $pubs $rrecv" push
+ # Convert to arrays.
+ local arecvs=( $recvs ) apubs=( $pubs )
+ mkdir -p "$dir"
+
+ #rregion="$(ssh $rhw cat /tmp/$USER/hw.conf)"
+ region=$hw rregion=$rhw
+
+ # Default to only using all recvs (rather than iterating over subsets).
+ for nrecvs in ${nrecvss:-${#arecvs[*]}} ; do
+ # Ditto for publishers.
+ for npubs in ${npubss:-${#apubs[*]}} ; do
+ # Default to only using a single value of npar.
+ for npar in ${npars:-100} ; do
+ # Default to repeating each trial thrice.
+ for rep in $(seq ${nreps:-3}) ; do
+
+ echo -n "nrecvs=$nrecvs npubs=$npubs npar=$npar rep=$rep"
+
+ local outbase="$dir/nrecvs-$nrecvs-npubs-$npubs-npar-$npar-rep-$rep"
+
+ # Skip if already done.
+ if [[ -f "$outbase"* ]]
+ then echo '...skipped' ; continue
+ else echo
+ fi
+
+ if $remote ; then
+ # Start remote receiver.
+ hosts=$rrecv JAVAFLAGS="-Dmode=recv -DsubId=recv-$recvid -Dstart=$start -Dcount=$((count/npubs*npubs)) $JAVAFLAGS" app "$rhw" >& "${outfile:-$outbase-rrecv.out}" &
+ fi
+
+ # Start all receivers.
+ for ((irecv = 0; irecv < nrecvs; irecv++)) ; do
+ hosts="${arecvs[$irecv]}" JAVAFLAGS="-Dmode=recv -DsubId=recv-$((recvid+irecv)) -Dstart=$start -Dcount=$((count/npubs*npubs)) $JAVAFLAGS" app "$hw" >& "${outfile:-$outbase-recv-$irecv.out}" &
+ done
+
+ # Wait till subscribed.
+ sleep 1
+ for ((irecv = 0; irecv < nrecvs; irecv++))
+ do wait-sub $zk topic-$start recv-$((recvid+irecv)) $region
+ done
+ if $remote ; then
+ # Wait till remote subscribed.
+ wait-sub $rzk topic-$start recv-$recvid $rregion
+ # Wait till cross-region subscribed, since default is async subs.
+ # This should only happen once.
+ wait-sub $zk topic-$start hub-$rregion $region
+ fi
+
+ # Launch all publishers.
+ for ((ipub = 0; ipub < npubs; ipub++)) ; do
+ hosts="${apubs[$ipub]}" JAVAFLAGS="-Dmode=pub -Dnpar=$npar -Dstart=$start -Dcount=$((count/npubs)) $JAVAFLAGS" app "$hw" >& "${outfile:-$outbase-pub-$ipub.out}" &
+ done
+
+ # Wait for everyone to terminate.
+ wait
+
+ # To avoid reusing the same subscriber ID.
+ let recvid+=$nrecvs
+
+ done
+ done
+ done
+ done
+}
+
+pub-exps() {
+ local pool="$1" hw="$2" zk="$3" rrecv="$4" rhw="$5" rzk="$6"
+ local apool=( $pool ) npool=${#apool[*]}
+ echo $apool $npool
+ : ${start:=0}
+ hosts="$pool $rrecv $rhw" push
+
+ quote start=$start npars="${npars:-20 40 60 80 100}" pub-exp ${apool[0]} ${apool[1]} "$hw" "$zk" $rrecv $rhw $rzk
+ start=$start npars="${npars:-20 40 60 80 100}" pub-exp ${apool[0]} ${apool[1]} "$hw" "$zk" $rrecv $rhw $rzk
+
+ pubs=${apool[0]}
+ for ((i = 1; i < npool; i++))
+ do recvs="${recvs:-} ${apool[$i]}"
+ done
+ quote start=$((start+1)) nrecvss="$( seq -s' ' $(( npool - 1 )) )" pub-exp "$recvs" "$pubs" "$hw" "$zk" $rrecv $rhw $rzk
+ start=$((start+1)) nrecvss="$( seq -s' ' $(( npool - 1 )) )" pub-exp "$recvs" "$pubs" "$hw" "$zk" $rrecv $rhw $rzk
+
+ recvs=${apool[0]}
+ for ((i = 1; i < npool; i++))
+ do pubs="${pubs:-} ${apool[$i]}"
+ done
+ quote start=$((start+2)) npubss="$( seq -s' ' $(( npool - 1 )) )" pub-exp "$recvs" "$pubs" "$hw" "$zk" $rrecv $rhw $rzk
+ start=$((start+2)) npubss="$( seq -s' ' $(( npool - 1 )) )" pub-exp "$recvs" "$pubs" "$hw" "$zk" $rrecv $rhw $rzk
+}
+
+#
+# Post-processing
+#
+
+# Consolidate to a directory.
+sub-agg() {
+ local dst="$1"
+ mkdir -p "$dst"
+ for s in 0 1 ; do
+ for i in sync$s/*.out
+ do cp "$i" "$dst/sync-$s-$(basename "$i")"
+ done
+ done
+}
+
+if [[ "$(type -t "$cmd")" == function ]]
+then "$cmd" "$@"
+else "$@"
+fi
+
+# vim: et sw=2 ts=2
Propchange: hadoop/zookeeper/trunk/src/contrib/hedwig/scripts/hw.bash
------------------------------------------------------------------------------
svn:executable = *
Added: hadoop/zookeeper/trunk/src/contrib/hedwig/scripts/hwServer.sh
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/hedwig/scripts/hwServer.sh?rev=987314&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/hedwig/scripts/hwServer.sh (added)
+++ hadoop/zookeeper/trunk/src/contrib/hedwig/scripts/hwServer.sh Thu Aug 19 21:25:13 2010
@@ -0,0 +1,101 @@
+#!/bin/sh
+
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+#
+# If this scripted is run out of /usr/bin or some other system bin directory
+# it should be linked to and not copied. Things like java jar files are found
+# relative to the canonical path of this script.
+#
+
+# Only follow symlinks if readlink supports it. Find the directory path
+# for where this script is being executed from.
+if readlink -f "$0" > /dev/null 2>&1
+then
+ HWBIN=`readlink -f "$0"`
+else
+ HWBIN="$0"
+fi
+HWBINDIR=`dirname "$HWBIN"`
+
+# We use HWCFGDIR if defined, otherwise we use /etc/hedwig
+# or the conf directory that is a sibling of this script's directory.
+# This is to find out where the Hedwig server config file resides.
+if [ "x$HWCFGDIR" = "x" ]
+then
+ if [ -d "/etc/hedwig" ]
+ then
+ HWCFGDIR="/etc/hedwig"
+ else
+ HWCFGDIR="$HWBINDIR/../conf"
+ fi
+fi
+
+# We use HWCFG as the name of the Hedwig server config file if defined,
+# otherwise use the default file name "hw_server.conf".
+if [ "x$HWCFG" = "x" ]
+then
+ HWCFG="hw_server.conf"
+fi
+HWCFG="$HWCFGDIR/$HWCFG"
+
+# If a config file is passed in directly when invoking the script,
+# use that instead.
+if [ "x$2" != "x" ]
+then
+ HWCFG="$HWCFGDIR/$2"
+fi
+
+# Find the Hedwig server jar and setup the CLASSPATH. We assume it to be
+# located in a standard place relative to the location of where this script
+# is executed from.
+HWJAR="$HWBINDIR/../server/target/server-1.0-SNAPSHOT-jar-with-dependencies.jar"
+CLASSPATH="$HWJAR:$CLASSPATH"
+
+# Store the Hedwig server's PID (java process) in the same $HWBINDIR.
+# This is used for us to stop the server later on via this same script.
+if [ -z $HWPIDFILE ]
+ then HWPIDFILE=$HWBINDIR/hedwig_server.pid
+fi
+
+case $1 in
+start)
+ echo "Starting Hedwig server ... "
+ java -cp "$CLASSPATH" org.apache.hedwig.server.netty.PubSubServer "$HWCFG" &
+ /bin/echo -n $! > "$HWPIDFILE"
+ echo STARTED
+ ;;
+stop)
+ echo "Stopping Hedwig server ... "
+ if [ ! -f "$HWPIDFILE" ]
+ then
+ echo "error: could not find file $HWPIDFILE"
+ exit 1
+ else
+ kill -9 $(cat "$HWPIDFILE")
+ rm "$HWPIDFILE"
+ echo STOPPED
+ fi
+ ;;
+restart)
+ shift
+ "$0" stop ${@}
+ sleep 3
+ "$0" start ${@}
+ ;;
+*)
+ echo "Usage: $0 {start|stop|restart}" >&2
+esac
Propchange: hadoop/zookeeper/trunk/src/contrib/hedwig/scripts/hwServer.sh
------------------------------------------------------------------------------
svn:executable = *
Added: hadoop/zookeeper/trunk/src/contrib/hedwig/scripts/quote
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/hedwig/scripts/quote?rev=987314&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/hedwig/scripts/quote (added)
+++ hadoop/zookeeper/trunk/src/contrib/hedwig/scripts/quote Thu Aug 19 21:25:13 2010
@@ -0,0 +1,23 @@
+#!/usr/bin/env python
+# vim:et:sw=2:ts=2
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import sys
+
+def quote(s):
+ return r'"%s"' % s.replace('\\', r'\\').replace('"', r'\"').replace('$', r'\$')
+
+print ' '.join( map( quote, sys.argv[1:] ) )
Propchange: hadoop/zookeeper/trunk/src/contrib/hedwig/scripts/quote
------------------------------------------------------------------------------
svn:executable = *
Added: hadoop/zookeeper/trunk/src/contrib/hedwig/server/lib/README
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/hedwig/server/lib/README?rev=987314&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/hedwig/server/lib/README (added)
+++ hadoop/zookeeper/trunk/src/contrib/hedwig/server/lib/README Thu Aug 19 21:25:13 2010
@@ -0,0 +1,4 @@
+The Zookeeper and BookKeeper jars included in this server/lib directory were
+created off the zookeeper Apache trunk at revision 947756. They have new
+features that haven't been put into a release version yet so just putting it
+here temporarily.
Added: hadoop/zookeeper/trunk/src/contrib/hedwig/server/pom.xml
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/hedwig/server/pom.xml?rev=987314&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/hedwig/server/pom.xml (added)
+++ hadoop/zookeeper/trunk/src/contrib/hedwig/server/pom.xml Thu Aug 19 21:25:13 2010
@@ -0,0 +1,153 @@
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.apache.hedwig</groupId>
+ <artifactId>hedwig</artifactId>
+ <version>1.0-SNAPSHOT</version>
+ </parent>
+ <properties>
+ <mainclass>org.apache.hedwig.server.netty.PubSubServer</mainclass>
+ </properties>
+ <groupId>org.apache.hedwig</groupId>
+ <artifactId>server</artifactId>
+ <packaging>jar</packaging>
+ <version>1.0-SNAPSHOT</version>
+ <name>server</name>
+ <url>http://maven.apache.org</url>
+ <dependencies>
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <version>4.8.1</version>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hedwig</groupId>
+ <artifactId>client</artifactId>
+ <version>1.0-SNAPSHOT</version>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.derby</groupId>
+ <artifactId>derby</artifactId>
+ <version>10.4.2.0</version>
+ <scope>runtime</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.zookeeper</groupId>
+ <artifactId>zookeeper</artifactId>
+ <version>3.4.0</version>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.zookeeper</groupId>
+ <artifactId>zookeeper-test</artifactId>
+ <version>3.4.0</version>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.bookkeeper</groupId>
+ <artifactId>bookkeeper</artifactId>
+ <version>3.4.0</version>
+ <scope>compile</scope>
+ </dependency>
+ </dependencies>
+ <build>
+ <plugins>
+ <plugin>
+ <artifactId>maven-assembly-plugin</artifactId>
+ <configuration>
+ <descriptorRefs>
+ <descriptorRef>jar-with-dependencies</descriptorRef>
+ </descriptorRefs>
+ <archive>
+ <manifest>
+ <mainClass>${mainclass}</mainClass>
+ </manifest>
+ </archive>
+ </configuration>
+ <executions>
+ <execution>
+ <id>make-assembly</id>
+ <phase>package</phase>
+ <goals>
+ <goal>attached</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+<!--
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-shade-plugin</artifactId>
+ <version>1.2.1</version>
+ <executions>
+ <execution>
+ <phase>package</phase>
+ <goals>
+ <goal>shade</goal>
+ </goals>
+ <configuration>
+ <artifactSet>
+ <excludes>
+ <exclude>classworlds:classworlds</exclude>
+ <exclude>junit:junit</exclude>
+ <exclude>jmock:jmock</exclude>
+ <exclude>xml-apis:xml-apis</exclude>
+ </excludes>
+ </artifactSet>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ -->
+ <plugin>
+ <artifactId>maven-antrun-plugin</artifactId>
+ <executions>
+ <execution>
+ <id>removebuilddir</id>
+ <phase>clean</phase>
+ <configuration>
+ <tasks>
+ <delete dir="build" />
+ </tasks>
+ </configuration>
+ <goals>
+ <goal>run</goal>
+ </goals>
+ </execution>
+ <execution>
+ <id>createbuilddir</id>
+ <phase>generate-test-resources</phase>
+ <configuration>
+ <tasks>
+ <mkdir dir="build" />
+ </tasks>
+ </configuration>
+ <goals>
+ <goal>run</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+</project>
Added: hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/benchmark/AbstractBenchmark.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/benchmark/AbstractBenchmark.java?rev=987314&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/benchmark/AbstractBenchmark.java (added)
+++ hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/benchmark/AbstractBenchmark.java Thu Aug 19 21:25:13 2010
@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hedwig.server.benchmark;
+
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.log4j.Logger;
+
+import org.apache.hedwig.util.ConcurrencyUtils;
+
+public abstract class AbstractBenchmark {
+
+ static final Logger logger = Logger.getLogger(AbstractBenchmark.class);
+
+ AtomicLong totalLatency = new AtomicLong();
+ LinkedBlockingQueue<Boolean> doneSignalQueue = new LinkedBlockingQueue<Boolean>();
+
+ abstract void doOps(int numOps) throws Exception;
+ abstract void tearDown() throws Exception;
+
+ protected class AbstractCallback{
+ AtomicInteger numDone = new AtomicInteger(0);
+ Semaphore outstanding;
+ int numOps;
+ boolean logging;
+
+ public AbstractCallback(Semaphore outstanding, int numOps) {
+ this.outstanding = outstanding;
+ this.numOps = numOps;
+ logging = Boolean.getBoolean("progress");
+ }
+
+ public void handle(boolean success, Object ctx){
+ outstanding.release();
+
+ if (!success){
+ ConcurrencyUtils.put(doneSignalQueue, false);
+ return;
+ }
+
+ totalLatency.addAndGet(System.currentTimeMillis() - (Long)ctx);
+ int numDoneInt = numDone.incrementAndGet();
+
+ if (logging && numDoneInt % 10000 == 0){
+ logger.info("Finished " + numDoneInt + " ops");
+ }
+
+ if (numOps == numDoneInt){
+ ConcurrencyUtils.put(doneSignalQueue, true);
+ }
+ }
+ }
+
+ public void runPhase(String phase, int numOps) throws Exception{
+ long startTime = System.currentTimeMillis();
+
+ doOps(numOps);
+
+ if (!doneSignalQueue.take()){
+ logger.error("One or more operations failed in phase: " + phase);
+ throw new RuntimeException();
+ }else{
+ logger.info("Phase: " + phase + " Avg latency : " + totalLatency.get() / numOps + ", tput = " + (numOps * 1000/ (System.currentTimeMillis() - startTime)));
+ }
+ }
+
+
+
+
+
+ public void run() throws Exception{
+
+ int numWarmup = Integer.getInteger("nWarmup", 50000);
+ runPhase("warmup", numWarmup);
+
+ logger.info("Sleeping for 10 seconds");
+ Thread.sleep(10000);
+ //reset latency
+ totalLatency.set(0);
+
+ int numOps = Integer.getInteger("nOps", 400000);
+ runPhase("real", numOps);
+
+ tearDown();
+ }
+}
Added: hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/benchmark/BookieBenchmark.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/benchmark/BookieBenchmark.java?rev=987314&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/benchmark/BookieBenchmark.java (added)
+++ hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/benchmark/BookieBenchmark.java Thu Aug 19 21:25:13 2010
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hedwig.server.benchmark;
+
+import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Semaphore;
+import org.apache.bookkeeper.client.BKException;
+import org.apache.bookkeeper.proto.BookieClient;
+import org.apache.log4j.Logger;
+import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback;
+import org.apache.bookkeeper.util.OrderedSafeExecutor;
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.buffer.ChannelBuffers;
+import org.jboss.netty.channel.socket.ClientSocketChannelFactory;
+import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
+
+public class BookieBenchmark extends AbstractBenchmark{
+
+ static final Logger logger = Logger.getLogger(BookkeeperBenchmark.class);
+
+ BookieClient bkc;
+ InetSocketAddress addr;
+ ClientSocketChannelFactory channelFactory;
+ OrderedSafeExecutor executor = new OrderedSafeExecutor(1);
+
+
+ public BookieBenchmark(String bookieHostPort) throws Exception{
+ channelFactory = new NioClientSocketChannelFactory(Executors.newCachedThreadPool(), Executors.newCachedThreadPool());
+ bkc = new BookieClient(channelFactory, executor);
+ String[] hostPort = bookieHostPort.split(":");
+ addr = new InetSocketAddress(hostPort[0], Integer.parseInt(hostPort[1]));
+
+ }
+
+
+ @Override
+ void doOps(final int numOps) throws Exception{
+ int numOutstanding = Integer.getInteger("nPars",1000);
+ final Semaphore outstanding = new Semaphore(numOutstanding);
+
+
+ WriteCallback callback = new WriteCallback() {
+ AbstractCallback handler = new AbstractCallback(outstanding, numOps);
+
+ @Override
+ public void writeComplete(int rc, long ledgerId, long entryId,
+ InetSocketAddress addr, Object ctx) {
+ handler.handle(rc == BKException.Code.OK, ctx);
+ }
+ };
+
+ byte[] passwd = new byte[20];
+ int size = Integer.getInteger("size", 1024);
+ byte[] data = new byte[size];
+
+ for (int i=0; i<numOps; i++){
+ outstanding.acquire();
+
+ ByteBuffer buffer = ByteBuffer.allocate(44);
+ long ledgerId = 1000;
+ buffer.putLong(ledgerId);
+ buffer.putLong(i);
+ buffer.putLong(0);
+ buffer.put(passwd);
+ buffer.rewind();
+ ChannelBuffer toSend = ChannelBuffers.wrappedBuffer(ChannelBuffers.wrappedBuffer(buffer.slice()), ChannelBuffers.wrappedBuffer(data));
+ bkc.addEntry(addr, ledgerId, passwd, i, toSend, callback, System.currentTimeMillis());
+ }
+
+ }
+
+ @Override
+ public void tearDown(){
+ bkc.close();
+ channelFactory.releaseExternalResources();
+ executor.shutdown();
+ }
+
+
+ public static void main(String[] args) throws Exception{
+ BookieBenchmark benchmark = new BookieBenchmark(args[0]);
+ benchmark.run();
+ }
+
+
+}
Added: hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/benchmark/BookkeeperBenchmark.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/benchmark/BookkeeperBenchmark.java?rev=987314&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/benchmark/BookkeeperBenchmark.java (added)
+++ hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/benchmark/BookkeeperBenchmark.java Thu Aug 19 21:25:13 2010
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hedwig.server.benchmark;
+
+import java.util.Random;
+import java.util.concurrent.Semaphore;
+import org.apache.bookkeeper.client.BKException;
+import org.apache.bookkeeper.client.BookKeeper;
+import org.apache.bookkeeper.client.LedgerHandle;
+import org.apache.bookkeeper.client.AsyncCallback.AddCallback;
+import org.apache.bookkeeper.client.BookKeeper.DigestType;
+import org.apache.log4j.Logger;
+
+public class BookkeeperBenchmark extends AbstractBenchmark{
+
+ static final Logger logger = Logger.getLogger(BookkeeperBenchmark.class);
+
+ BookKeeper bk;
+ LedgerHandle[] lh;
+
+ public BookkeeperBenchmark(String zkHostPort) throws Exception{
+ bk = new BookKeeper(zkHostPort);
+ int numLedgers = Integer.getInteger("nLedgers",5);
+ lh = new LedgerHandle[numLedgers];
+ int quorumSize = Integer.getInteger("quorum", 2);
+ int ensembleSize = Integer.getInteger("ensemble", 4);
+ DigestType digestType = DigestType.valueOf(System.getProperty("digestType", "CRC32"));
+ for (int i=0; i< numLedgers; i++){
+ lh[i] = bk.createLedger(ensembleSize, quorumSize, digestType, "blah".getBytes());
+ }
+
+ }
+
+
+ @Override
+ void doOps(final int numOps) throws Exception {
+ int size = Integer.getInteger("size", 1024);
+ byte[] msg = new byte[size];
+
+ int numOutstanding = Integer.getInteger("nPars",1000);
+ final Semaphore outstanding = new Semaphore(numOutstanding);
+
+ AddCallback callback = new AddCallback() {
+ AbstractCallback handler = new AbstractCallback(outstanding, numOps);
+
+
+ @Override
+ public void addComplete(int rc, LedgerHandle lh, long entryId, Object ctx) {
+ handler.handle(rc == BKException.Code.OK, ctx);
+ }
+
+ };
+
+
+
+ Random rand = new Random();
+
+ for (int i=0; i<numOps; i++){
+ outstanding.acquire();
+ lh[rand.nextInt(lh.length)].asyncAddEntry(msg, callback, System.currentTimeMillis());
+ }
+
+
+ }
+
+ @Override
+ public void tearDown() throws Exception{
+ bk.halt();
+ }
+
+
+ public static void main(String[] args) throws Exception{
+ BookkeeperBenchmark benchmark = new BookkeeperBenchmark(args[0]);
+ benchmark.run();
+ }
+}
Added: hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/benchmark/FakeBookie.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/benchmark/FakeBookie.java?rev=987314&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/benchmark/FakeBookie.java (added)
+++ hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/benchmark/FakeBookie.java Thu Aug 19 21:25:13 2010
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hedwig.server.benchmark;
+
+import java.net.InetSocketAddress;
+import java.util.concurrent.Executors;
+
+import org.apache.log4j.Logger;
+import org.jboss.netty.bootstrap.ServerBootstrap;
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.channel.ChannelHandlerContext;
+import org.jboss.netty.channel.ChannelPipeline;
+import org.jboss.netty.channel.ChannelPipelineCoverage;
+import org.jboss.netty.channel.ChannelPipelineFactory;
+import org.jboss.netty.channel.Channels;
+import org.jboss.netty.channel.MessageEvent;
+import org.jboss.netty.channel.SimpleChannelHandler;
+import org.jboss.netty.channel.socket.ServerSocketChannelFactory;
+import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
+import org.jboss.netty.handler.codec.frame.LengthFieldBasedFrameDecoder;
+import org.jboss.netty.handler.codec.frame.LengthFieldPrepender;
+import org.jboss.netty.logging.InternalLoggerFactory;
+import org.jboss.netty.logging.Log4JLoggerFactory;
+
+@ChannelPipelineCoverage("all")
+public class FakeBookie extends SimpleChannelHandler implements
+ ChannelPipelineFactory {
+ static final Logger logger = Logger.getLogger(FakeBookie.class);
+ ServerSocketChannelFactory serverChannelFactory = new NioServerSocketChannelFactory(
+ Executors.newCachedThreadPool(), Executors.newCachedThreadPool());
+
+ public FakeBookie(int port) {
+ InternalLoggerFactory.setDefaultFactory(new Log4JLoggerFactory());
+ ServerBootstrap bootstrap = new ServerBootstrap(serverChannelFactory);
+
+ bootstrap.setPipelineFactory(this);
+ bootstrap.setOption("child.tcpNoDelay", true);
+ bootstrap.setOption("child.keepAlive", true);
+ bootstrap.setOption("reuseAddress", true);
+
+ logger.info("Going into receive loop");
+ // Bind and start to accept incoming connections.
+ bootstrap.bind(new InetSocketAddress(port));
+ }
+
+ @Override
+ public ChannelPipeline getPipeline() throws Exception {
+ ChannelPipeline pipeline = Channels.pipeline();
+ pipeline.addLast("lengthbaseddecoder",
+ new LengthFieldBasedFrameDecoder(1024 * 1024, 0, 4, 0, 4));
+ pipeline.addLast("lengthprepender", new LengthFieldPrepender(4));
+ pipeline.addLast("main", this);
+ return pipeline;
+ }
+
+ @Override
+ public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
+ throws Exception {
+ if (!(e.getMessage() instanceof ChannelBuffer)) {
+ ctx.sendUpstream(e);
+ return;
+ }
+
+ ChannelBuffer buffer = (ChannelBuffer) e.getMessage();
+
+ int type = buffer.readInt();
+ buffer.readerIndex(24);
+ long ledgerId = buffer.readLong();
+ long entryId = buffer.readLong();
+
+ ChannelBuffer outBuf = ctx.getChannel().getConfig().getBufferFactory()
+ .getBuffer(24);
+ outBuf.writeInt(type);
+ outBuf.writeInt(0); // rc
+ outBuf.writeLong(ledgerId);
+ outBuf.writeLong(entryId);
+ e.getChannel().write(outBuf);
+
+ }
+
+
+ public static void main(String args[]){
+ new FakeBookie(Integer.parseInt(args[0]));
+ }
+}
Added: hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/common/ByteStringInterner.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/common/ByteStringInterner.java?rev=987314&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/common/ByteStringInterner.java (added)
+++ hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/common/ByteStringInterner.java Thu Aug 19 21:25:13 2010
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hedwig.server.common;
+
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+import com.google.protobuf.ByteString;
+
+public class ByteStringInterner {
+ // TODO: how to release references when strings are no longer used. weak
+ // references?
+
+ private static ConcurrentMap<ByteString, ByteString> map = new ConcurrentHashMap<ByteString, ByteString>();
+
+ public static ByteString intern(ByteString in) {
+ ByteString presentValueInMap = map.get(in);
+ if (presentValueInMap != null) {
+ return presentValueInMap;
+ }
+
+ presentValueInMap = map.putIfAbsent(in, in);
+ if (presentValueInMap != null) {
+ return presentValueInMap;
+ }
+
+ return in;
+
+ }
+}
Added: hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/common/ServerConfiguration.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/common/ServerConfiguration.java?rev=987314&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/common/ServerConfiguration.java (added)
+++ hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/common/ServerConfiguration.java Thu Aug 19 21:25:13 2010
@@ -0,0 +1,268 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hedwig.server.common;
+
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.InputStream;
+import java.net.InetAddress;
+import java.net.URL;
+import java.net.UnknownHostException;
+import java.util.Arrays;
+import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.commons.configuration.ConfigurationException;
+
+import com.google.protobuf.ByteString;
+import org.apache.hedwig.conf.AbstractConfiguration;
+import org.apache.hedwig.util.HedwigSocketAddress;
+
+public class ServerConfiguration extends AbstractConfiguration {
+ protected final static String REGION = "region";
+ protected final static String MAX_MESSAGE_SIZE = "max_message_size";
+ protected final static String READAHEAD_COUNT = "readahead_count";
+ protected final static String READAHEAD_SIZE = "readahead_size";
+ protected final static String CACHE_SIZE = "cache_size";
+ protected final static String SCAN_BACKOFF_MSEC = "scan_backoff_ms";
+ protected final static String SERVER_PORT = "server_port";
+ protected final static String SSL_SERVER_PORT = "ssl_server_port";
+ protected final static String ZK_PREFIX = "zk_prefix";
+ protected final static String ZK_HOST = "zk_host";
+ protected final static String ZK_TIMEOUT = "zk_timeout";
+ protected final static String READAHEAD_ENABLED = "readhead_enabled";
+ protected final static String STANDALONE = "standalone";
+ protected final static String REGIONS = "regions";
+ protected final static String CERT_NAME = "cert_name";
+ protected final static String CERT_PATH = "cert_path";
+ protected final static String PASSWORD = "password";
+ protected final static String SSL_ENABLED = "ssl_enabled";
+ protected final static String CONSUME_INTERVAL = "consume_interval";
+ protected final static String RETENTION_SECS = "retention_secs";
+ protected final static String INTER_REGION_SSL_ENABLED = "inter_region_ssl_enabled";
+ protected final static String MESSAGES_CONSUMED_THREAD_RUN_INTERVAL = "messages_consumed_thread_run_interval";
+
+ // these are the derived attributes
+ protected ByteString myRegionByteString = null;
+ protected HedwigSocketAddress myServerAddress = null;
+ protected List<String> regionList = null;
+
+ // Although this method is not currently used, currently maintaining it like
+ // this so that we can support on-the-fly changes in configuration
+ protected void refreshDerivedAttributes() {
+ refreshMyRegionByteString();
+ refreshMyServerAddress();
+ refreshRegionList();
+ }
+
+ @Override
+ public void loadConf(URL confURL) throws ConfigurationException {
+ super.loadConf(confURL);
+ refreshDerivedAttributes();
+ }
+
+ public int getMaximumMessageSize() {
+ return conf.getInt(MAX_MESSAGE_SIZE, 1258291); /* 1.2M */
+ }
+
+ public String getMyRegion() {
+ return conf.getString(REGION, "standalone");
+ }
+
+ protected void refreshMyRegionByteString() {
+ myRegionByteString = ByteString.copyFromUtf8(getMyRegion());
+ }
+
+ protected void refreshMyServerAddress() {
+ try {
+ // Use the raw IP address as the hostname
+ myServerAddress = new HedwigSocketAddress(InetAddress.getLocalHost().getHostAddress(), getServerPort(),
+ getSSLServerPort());
+ } catch (UnknownHostException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ // The expected format for the regions parameter is Hostname:Port:SSLPort
+ // with spaces in between each of the regions.
+ protected void refreshRegionList() {
+ String regions = conf.getString(REGIONS, "");
+ if (regions.isEmpty()) {
+ regionList = new LinkedList<String>();
+ } else {
+ regionList = Arrays.asList(regions.split(" "));
+ }
+ }
+
+ public ByteString getMyRegionByteString() {
+ if (myRegionByteString == null) {
+ refreshMyRegionByteString();
+ }
+ return myRegionByteString;
+ }
+
+ public int getReadAheadCount() {
+ return conf.getInt(READAHEAD_COUNT, 10);
+ }
+
+ public long getReadAheadSizeBytes() {
+ return conf.getLong(READAHEAD_SIZE, 4 * 1024 * 1024); // 4M
+ }
+
+ public long getMaximumCacheSize() {
+ // 2G or half of the maximum amount of memory the JVM uses
+ return conf.getLong(CACHE_SIZE, Math.min(2 * 1024L * 1024L * 1024L, Runtime.getRuntime().maxMemory() / 2));
+ }
+
+ // After a scan of a log fails, how long before we retry (in msec)
+ public long getScanBackoffPeriodMs() {
+ return conf.getLong(SCAN_BACKOFF_MSEC, 1000);
+ }
+
+ public int getServerPort() {
+ return conf.getInt(SERVER_PORT, 4080);
+ }
+
+ public int getSSLServerPort() {
+ return conf.getInt(SSL_SERVER_PORT, 9876);
+ }
+
+ public String getZkPrefix() {
+ return conf.getString(ZK_PREFIX, "/hedwig");
+ }
+
+ public StringBuilder getZkRegionPrefix(StringBuilder sb) {
+ return sb.append(getZkPrefix()).append("/").append(getMyRegion());
+ }
+
+ public StringBuilder getZkTopicsPrefix(StringBuilder sb) {
+ return getZkRegionPrefix(sb).append("/topics");
+ }
+
+ public StringBuilder getZkTopicPath(StringBuilder sb, ByteString topic) {
+ return getZkTopicsPrefix(sb).append("/").append(topic.toStringUtf8());
+ }
+
+ public StringBuilder getZkHostsPrefix(StringBuilder sb) {
+ return getZkRegionPrefix(sb).append("/hosts");
+ }
+
+ public HedwigSocketAddress getServerAddr() {
+ if (myServerAddress == null) {
+ refreshMyServerAddress();
+ }
+ return myServerAddress;
+ }
+
+ public String getZkHost() {
+ return conf.getString(ZK_HOST, "localhost");
+ }
+
+ public int getZkTimeout() {
+ return conf.getInt(ZK_TIMEOUT, 2000);
+ }
+
+ public boolean getReadAheadEnabled() {
+ return conf.getBoolean(READAHEAD_ENABLED, true);
+ }
+
+ public boolean isStandalone() {
+ return conf.getBoolean(STANDALONE, false);
+ }
+
+ public List<String> getRegions() {
+ if (regionList == null) {
+ refreshRegionList();
+ }
+ return regionList;
+ }
+
+ // This is the name of the SSL certificate if available as a resource.
+ public String getCertName() {
+ return conf.getString(CERT_NAME, "");
+ }
+
+ // This is the path to the SSL certificate if it is available as a file.
+ public String getCertPath() {
+ return conf.getString(CERT_PATH, "");
+ }
+
+ // This method return the SSL certificate as an InputStream based on if it
+ // is configured to be available as a resource or as a file. If nothing is
+ // configured correctly, then a ConfigurationException will be thrown as
+ // we do not know how to obtain the SSL certificate stream.
+ public InputStream getCertStream() throws FileNotFoundException, ConfigurationException {
+ String certName = getCertName();
+ String certPath = getCertPath();
+ if (certName != null && !certName.isEmpty()) {
+ return getClass().getResourceAsStream(certName);
+ } else if (certPath != null && !certPath.isEmpty()) {
+ return new FileInputStream(certPath);
+ } else
+ throw new ConfigurationException("SSL Certificate configuration does not have resource name or path set!");
+ }
+
+ public String getPassword() {
+ return conf.getString(PASSWORD, "");
+ }
+
+ public boolean isSSLEnabled() {
+ return conf.getBoolean(SSL_ENABLED, false);
+ }
+
+ public int getConsumeInterval() {
+ return conf.getInt(CONSUME_INTERVAL, 50);
+ }
+
+ public int getRetentionSecs() {
+ return conf.getInt(RETENTION_SECS, 0);
+ }
+
+ public boolean isInterRegionSSLEnabled() {
+ return conf.getBoolean(INTER_REGION_SSL_ENABLED, false);
+ }
+
+ // This parameter is used to determine how often we run the
+ // SubscriptionManager's Messages Consumed timer task thread (in
+ // milliseconds).
+ public int getMessagesConsumedThreadRunInterval() {
+ return conf.getInt(MESSAGES_CONSUMED_THREAD_RUN_INTERVAL, 60000);
+ }
+
+ /*
+ * Is this a valid configuration that we can run with? This code might grow
+ * over time.
+ */
+ public void validate() throws ConfigurationException {
+ if (!getZkPrefix().startsWith("/")) {
+ throw new ConfigurationException(ZK_PREFIX + " must start with a /");
+ }
+ // Validate that if Regions exist and inter-region communication is SSL
+ // enabled, that the Regions correspond to valid HedwigSocketAddresses,
+ // namely that SSL ports are present.
+ if (isInterRegionSSLEnabled() && getRegions().size() > 0) {
+ for (String hubString : getRegions()) {
+ HedwigSocketAddress hub = new HedwigSocketAddress(hubString);
+ if (hub.getSSLSocketAddress() == null)
+ throw new ConfigurationException("Region defined does not have required SSL port: " + hubString);
+ }
+ }
+
+ // add other checks here
+ }
+}
Added: hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/common/TerminateJVMExceptionHandler.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/common/TerminateJVMExceptionHandler.java?rev=987314&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/common/TerminateJVMExceptionHandler.java (added)
+++ hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/common/TerminateJVMExceptionHandler.java Thu Aug 19 21:25:13 2010
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hedwig.server.common;
+
+import org.apache.log4j.Logger;
+
+public class TerminateJVMExceptionHandler implements Thread.UncaughtExceptionHandler {
+ static Logger logger = Logger.getLogger(TerminateJVMExceptionHandler.class);
+
+ @Override
+ public void uncaughtException(Thread t, Throwable e) {
+ logger.fatal("Uncaught exception in thread " + t.getName(), e);
+ System.exit(1);
+ }
+
+}
Added: hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/common/TopicOpQueuer.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/common/TopicOpQueuer.java?rev=987314&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/common/TopicOpQueuer.java (added)
+++ hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/common/TopicOpQueuer.java Thu Aug 19 21:25:13 2010
@@ -0,0 +1,111 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hedwig.server.common;
+
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.Queue;
+import java.util.concurrent.ScheduledExecutorService;
+
+import com.google.protobuf.ByteString;
+import org.apache.hedwig.exceptions.PubSubException;
+import org.apache.hedwig.util.Callback;
+
+public class TopicOpQueuer {
+ /**
+ * Map from topic to the queue of operations for that topic.
+ */
+ protected HashMap<ByteString, Queue<Runnable>> topic2ops = new HashMap<ByteString, Queue<Runnable>>();
+
+ protected final ScheduledExecutorService scheduler;
+
+ public TopicOpQueuer(ScheduledExecutorService scheduler) {
+ this.scheduler = scheduler;
+ }
+
+ public interface Op extends Runnable {
+ }
+
+ public abstract class AsynchronousOp<T> implements Op {
+ final public ByteString topic;
+ final public Callback<T> cb;
+ final public Object ctx;
+
+ public AsynchronousOp(final ByteString topic, final Callback<T> cb, Object ctx) {
+ this.topic = topic;
+ this.cb = new Callback<T>() {
+ @Override
+ public void operationFailed(Object ctx, PubSubException exception) {
+ cb.operationFailed(ctx, exception);
+ popAndRunNext(topic);
+ }
+
+ @Override
+ public void operationFinished(Object ctx, T resultOfOperation) {
+ cb.operationFinished(ctx, resultOfOperation);
+ popAndRunNext(topic);
+ }
+ };
+ this.ctx = ctx;
+ }
+ }
+
+ public abstract class SynchronousOp implements Op {
+ final public ByteString topic;
+
+ public SynchronousOp(ByteString topic) {
+ this.topic = topic;
+ }
+
+ @Override
+ public final void run() {
+ runInternal();
+ popAndRunNext(topic);
+ }
+
+ protected abstract void runInternal();
+
+ }
+
+ protected synchronized void popAndRunNext(ByteString topic) {
+ Queue<Runnable> ops = topic2ops.get(topic);
+ if (!ops.isEmpty())
+ ops.remove();
+ if (!ops.isEmpty())
+ scheduler.submit(ops.peek());
+ }
+
+ public void pushAndMaybeRun(ByteString topic, Op op) {
+ int size;
+ synchronized (this) {
+ Queue<Runnable> ops = topic2ops.get(topic);
+ if (ops == null) {
+ ops = new LinkedList<Runnable>();
+ topic2ops.put(topic, ops);
+ }
+ ops.add(op);
+ size = ops.size();
+ }
+ if (size == 1)
+ op.run();
+ }
+
+ public Runnable peek(ByteString topic) {
+ return topic2ops.get(topic).peek();
+ }
+}
Added: hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/common/UnexpectedError.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/common/UnexpectedError.java?rev=987314&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/common/UnexpectedError.java (added)
+++ hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/common/UnexpectedError.java Thu Aug 19 21:25:13 2010
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hedwig.server.common;
+
+public class UnexpectedError extends Error {
+
+ /**
+ *
+ */
+ private static final long serialVersionUID = 1L;
+
+ public UnexpectedError(String msg) {
+ super(msg);
+ }
+
+ public UnexpectedError(Throwable cause) {
+ super(cause);
+ }
+
+}
Added: hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/delivery/ChannelEndPoint.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/delivery/ChannelEndPoint.java?rev=987314&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/delivery/ChannelEndPoint.java (added)
+++ hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/delivery/ChannelEndPoint.java Thu Aug 19 21:25:13 2010
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hedwig.server.delivery;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.ChannelFuture;
+import org.jboss.netty.channel.ChannelFutureListener;
+
+import org.apache.hedwig.protocol.PubSubProtocol.PubSubResponse;
+import org.apache.hedwig.server.common.UnexpectedError;
+
+public class ChannelEndPoint implements DeliveryEndPoint, ChannelFutureListener {
+
+ Channel channel;
+
+ public Channel getChannel() {
+ return channel;
+ }
+
+ Map<ChannelFuture, DeliveryCallback> callbacks = new HashMap<ChannelFuture, DeliveryCallback>();
+
+ public ChannelEndPoint(Channel channel) {
+ this.channel = channel;
+ }
+
+ public void close() {
+ channel.close();
+ }
+
+ public void send(PubSubResponse response, DeliveryCallback callback) {
+ ChannelFuture future = channel.write(response);
+ callbacks.put(future, callback);
+ future.addListener(this);
+ }
+
+ public void operationComplete(ChannelFuture future) throws Exception {
+ DeliveryCallback callback = callbacks.get(future);
+ callbacks.remove(future);
+
+ if (callback == null) {
+ throw new UnexpectedError("Could not locate callback for channel future");
+ }
+
+ if (future.isSuccess()) {
+ callback.sendingFinished();
+ } else {
+ // treat all channel errors as permanent
+ callback.permanentErrorOnSend();
+ }
+
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (obj instanceof ChannelEndPoint) {
+ ChannelEndPoint channelEndPoint = (ChannelEndPoint) obj;
+ return channel.equals(channelEndPoint.channel);
+ } else {
+ return false;
+ }
+ }
+
+}
Added: hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/delivery/DeliveryCallback.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/delivery/DeliveryCallback.java?rev=987314&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/delivery/DeliveryCallback.java (added)
+++ hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/delivery/DeliveryCallback.java Thu Aug 19 21:25:13 2010
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hedwig.server.delivery;
+
+public interface DeliveryCallback {
+
+ public void sendingFinished();
+
+ public void transientErrorOnSend();
+
+ public void permanentErrorOnSend();
+}
Added: hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/delivery/DeliveryEndPoint.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/delivery/DeliveryEndPoint.java?rev=987314&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/delivery/DeliveryEndPoint.java (added)
+++ hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/delivery/DeliveryEndPoint.java Thu Aug 19 21:25:13 2010
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hedwig.server.delivery;
+
+import org.apache.hedwig.protocol.PubSubProtocol.PubSubResponse;
+
+public interface DeliveryEndPoint {
+
+ public void send(PubSubResponse response, DeliveryCallback callback);
+
+ public void close();
+
+}
Added: hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/delivery/DeliveryManager.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/delivery/DeliveryManager.java?rev=987314&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/delivery/DeliveryManager.java (added)
+++ hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/delivery/DeliveryManager.java Thu Aug 19 21:25:13 2010
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hedwig.server.delivery;
+
+import com.google.protobuf.ByteString;
+import org.apache.hedwig.protocol.PubSubProtocol.MessageSeqId;
+import org.apache.hedwig.server.subscriptions.MessageFilter;
+
+public interface DeliveryManager {
+ public void startServingSubscription(ByteString topic, ByteString subscriberId, MessageSeqId seqIdToStartFrom,
+ DeliveryEndPoint endPoint, MessageFilter filter, boolean isHubSubscriber);
+
+ public void stopServingSubscriber(ByteString topic, ByteString subscriberId);
+}
|