Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id A25B6200C16 for ; Wed, 25 Jan 2017 18:24:24 +0100 (CET) Received: by cust-asf.ponee.io (Postfix) id A0D16160B4E; Wed, 25 Jan 2017 17:24:24 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 5EC1D160B5D for ; Wed, 25 Jan 2017 18:24:22 +0100 (CET) Received: (qmail 98030 invoked by uid 500); 25 Jan 2017 17:24:21 -0000 Mailing-List: contact commits-help@accumulo.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@accumulo.apache.org Delivered-To: mailing list commits@accumulo.apache.org Received: (qmail 97980 invoked by uid 99); 25 Jan 2017 17:24:21 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 25 Jan 2017 17:24:21 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 0F934DFB0E; Wed, 25 Jan 2017 17:24:20 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: mwalch@apache.org To: commits@accumulo.apache.org Date: Wed, 25 Jan 2017 17:24:23 -0000 Message-Id: In-Reply-To: <8f02418ade7b48338326154cfd8b8dee@git.apache.org> References: <8f02418ade7b48338326154cfd8b8dee@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [4/5] accumulo-testing git commit: ACCUMULO-4510 Refactored Continous Ingest tests archived-at: Wed, 25 Jan 2017 17:24:24 -0000 http://git-wip-us.apache.org/repos/asf/accumulo-testing/blob/fc3ddfc4/continuous/start-stats.sh ---------------------------------------------------------------------- diff --git a/continuous/start-stats.sh b/continuous/start-stats.sh deleted file mode 100755 index 0a90364..0000000 --- a/continuous/start-stats.sh +++ /dev/null @@ -1,49 +0,0 @@ -#! /usr/bin/env bash - -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -# Start: Resolve Script Directory -SOURCE="${BASH_SOURCE[0]}" -while [[ -h "${SOURCE}" ]]; do # resolve $SOURCE until the file is no longer a symlink - bin=$( cd -P "$( dirname "${SOURCE}" )" && pwd ) - SOURCE=$(readlink "${SOURCE}") - [[ "${SOURCE}" != /* ]] && SOURCE="${bin}/${SOURCE}" # if $SOURCE was a relative symlink, we need to resolve it relative to the path where the symlink file was located -done -bin=$( cd -P "$( dirname "${SOURCE}" )" && pwd ) -script=$( basename "${SOURCE}" ) -# Stop: Resolve Script Directory - -CONTINUOUS_CONF_DIR=${CONTINUOUS_CONF_DIR:-${bin}} -. "$CONTINUOUS_CONF_DIR/continuous-env.sh" - -mkdir -p "$CONTINUOUS_LOG_DIR" - -CONFIG_OUT=$CONTINUOUS_LOG_DIR/$(date +%Y%m%d%H%M%S)_$(hostname)_config.out - -cat "$ACCUMULO_CONF_DIR/accumulo-env.sh" > "$CONFIG_OUT" -echo >> "$CONFIG_OUT" -echo -e "config -np\nconfig -t $TABLE -np\nquit" | "$ACCUMULO_HOME/bin/accumulo" shell -u "$USER" -p "$PASS" >> "$CONFIG_OUT" -echo >> "$CONFIG_OUT" -cat "$CONTINUOUS_CONF_DIR/continuous-env.sh" >> "$CONFIG_OUT" -echo >> "$CONFIG_OUT" -wc -l "$CONTINUOUS_CONF_DIR/walkers.txt" >> "$CONFIG_OUT" -wc -l "$CONTINUOUS_CONF_DIR/ingesters.txt" >> "$CONFIG_OUT" -wc -l "$CONTINUOUS_CONF_DIR/scanners.txt" >> "$CONFIG_OUT" -wc -l "$CONTINUOUS_CONF_DIR/batch_walkers.txt" >> "$CONFIG_OUT" - - -nohup "$ACCUMULO_HOME/bin/accumulo" org.apache.accumulo.test.continuous.ContinuousStatsCollector --table "$TABLE" -i "$INSTANCE_NAME" -z "$ZOO_KEEPERS" -u "$USER" -p "$PASS" >"$CONTINUOUS_LOG_DIR/$(date +%Y%m%d%H%M%S)_$(hostname)_stats.out" 2>"$CONTINUOUS_LOG_DIR/$(date +%Y%m%d%H%M%S)_$(hostname)_stats.err" & - http://git-wip-us.apache.org/repos/asf/accumulo-testing/blob/fc3ddfc4/continuous/start-walkers.sh ---------------------------------------------------------------------- diff --git a/continuous/start-walkers.sh b/continuous/start-walkers.sh deleted file mode 100755 index d9bbff4..0000000 --- a/continuous/start-walkers.sh +++ /dev/null @@ -1,41 +0,0 @@ -#! /usr/bin/env bash - -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -# Start: Resolve Script Directory -SOURCE="${BASH_SOURCE[0]}" -while [[ -h "${SOURCE}" ]]; do # resolve $SOURCE until the file is no longer a symlink - bin=$( cd -P "$( dirname "${SOURCE}" )" && pwd ) - SOURCE=$(readlink "${SOURCE}") - [[ "${SOURCE}" != /* ]] && SOURCE="${bin}/${SOURCE}" # if $SOURCE was a relative symlink, we need to resolve it relative to the path where the symlink file was located -done -bin=$( cd -P "$( dirname "${SOURCE}" )" && pwd ) -script=$( basename "${SOURCE}" ) -# Stop: Resolve Script Directory - -CONTINUOUS_CONF_DIR=${CONTINUOUS_CONF_DIR:-${bin}} -. "$CONTINUOUS_CONF_DIR/continuous-env.sh" - -DEBUG_OPT='' -if [[ "$DEBUG_WALKER" == "on" ]] ; then - DEBUG_OPT="--debug $CONTINUOUS_LOG_DIR/\`date +%Y%m%d%H%M%S\`_\`hostname\`_walk.log"; -fi - -AUTH_OPT='' -[[ -n "$AUTHS" ]] && AUTH_OPT="--auths \"$AUTHS\"" - -pssh -h "$CONTINUOUS_CONF_DIR/walkers.txt" "mkdir -p $CONTINUOUS_LOG_DIR; nohup $ACCUMULO_HOME/bin/accumulo org.apache.accumulo.test.continuous.ContinuousWalk $DEBUG_OPT $AUTH_OPT -i $INSTANCE_NAME -z $ZOO_KEEPERS -u $USER -p $PASS --table $TABLE --min $MIN --max $MAX --sleep $SLEEP_TIME >$CONTINUOUS_LOG_DIR/\`date +%Y%m%d%H%M%S\`_\`hostname\`_walk.out 2>$CONTINUOUS_LOG_DIR/\`date +%Y%m%d%H%M%S\`_\`hostname\`_walk.err &" < /dev/null - http://git-wip-us.apache.org/repos/asf/accumulo-testing/blob/fc3ddfc4/continuous/stop-agitator.sh ---------------------------------------------------------------------- diff --git a/continuous/stop-agitator.sh b/continuous/stop-agitator.sh deleted file mode 100755 index d8f30e4..0000000 --- a/continuous/stop-agitator.sh +++ /dev/null @@ -1,51 +0,0 @@ -#! /usr/bin/env bash -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -# Start: Resolve Script Directory -SOURCE="${BASH_SOURCE[0]}" -while [[ -h "${SOURCE}" ]]; do # resolve $SOURCE until the file is no longer a symlink - bin=$( cd -P "$( dirname "${SOURCE}" )" && pwd ) - SOURCE=$(readlink "${SOURCE}") - [[ "${SOURCE}" != /* ]] && SOURCE="${bin}/${SOURCE}" # if $SOURCE was a relative symlink, we need to resolve it relative to the path where the symlink file was located -done -bin=$( cd -P "$( dirname "${SOURCE}" )" && pwd ) -script=$( basename "${SOURCE}" ) -# Stop: Resolve Script Directory - -CONTINUOUS_CONF_DIR=${CONTINUOUS_CONF_DIR:-${bin}} -. "$CONTINUOUS_CONF_DIR/continuous-env.sh" - -# Try to use sudo when we wouldn't normally be able to kill the processes -[[ -n $AGITATOR_USER ]] || AGITATOR_USER=$(whoami) -if [[ $AGITATOR_USER == root ]]; then - echo "Stopping all processes matching 'agitator.pl' as root" - pkill -f agitator.pl 2>/dev/null -elif [[ $AGITATOR_USER == "$ACCUMULO_USER" ]]; then - echo "Stopping all processes matching 'datanode-agitator.pl' as $HDFS_USER" - sudo -u "$HDFS_USER" pkill -f datanode-agitator.pl 2>/dev/null - echo "Stopping all processes matching 'hdfs-agitator.pl' as $HDFS_USER" - sudo -u "$HDFS_USER" pkill -f hdfs-agitator.pl 2>/dev/null - echo "Stopping all processes matching 'agitator.pl' as $AGITATOR_USER" - pkill -f agitator.pl 2>/dev/null 2>/dev/null -else - echo "Stopping all processes matching 'datanode-agitator.pl' as $HDFS_USER" - sudo -u "$HDFS_USER" pkill -f datanode-agitator.pl 2>/dev/null - echo "Stopping all processes matching 'hdfs-agitator.pl' as $HDFS_USER" - sudo -u "$HDFS_USER" pkill -f hdfs-agitator.pl 2>/dev/null - echo "Stopping all processes matching 'agitator.pl' as $ACCUMULO_USER" - sudo -u "$ACCUMULO_USER" pkill -f agitator.pl 2>/dev/null -fi - http://git-wip-us.apache.org/repos/asf/accumulo-testing/blob/fc3ddfc4/continuous/stop-batchwalkers.sh ---------------------------------------------------------------------- diff --git a/continuous/stop-batchwalkers.sh b/continuous/stop-batchwalkers.sh deleted file mode 100755 index 4696387..0000000 --- a/continuous/stop-batchwalkers.sh +++ /dev/null @@ -1,33 +0,0 @@ -#! /usr/bin/env bash - -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -# Start: Resolve Script Directory -SOURCE="${BASH_SOURCE[0]}" -while [[ -h "${SOURCE}" ]]; do # resolve $SOURCE until the file is no longer a symlink - bin=$( cd -P "$( dirname "${SOURCE}" )" && pwd ) - SOURCE=$(readlink "${SOURCE}") - [[ "${SOURCE}" != /* ]] && SOURCE="${bin}/${SOURCE}" # if $SOURCE was a relative symlink, we need to resolve it relative to the path where the symlink file was located -done -bin=$( cd -P "$( dirname "${SOURCE}" )" && pwd ) -script=$( basename "${SOURCE}" ) -# Stop: Resolve Script Directory - -CONTINUOUS_CONF_DIR=${CONTINUOUS_CONF_DIR:-${bin}} -. "$CONTINUOUS_CONF_DIR/continuous-env.sh" - -pssh -h "$CONTINUOUS_CONF_DIR/batch_walkers.txt" "pkill -f '[o]rg.apache.accumulo.test.continuous.ContinuousBatchWalker'" < /dev/null - http://git-wip-us.apache.org/repos/asf/accumulo-testing/blob/fc3ddfc4/continuous/stop-ingest.sh ---------------------------------------------------------------------- diff --git a/continuous/stop-ingest.sh b/continuous/stop-ingest.sh deleted file mode 100755 index d159bf7..0000000 --- a/continuous/stop-ingest.sh +++ /dev/null @@ -1,33 +0,0 @@ -#! /usr/bin/env bash - -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -# Start: Resolve Script Directory -SOURCE="${BASH_SOURCE[0]}" -while [[ -h "${SOURCE}" ]]; do # resolve $SOURCE until the file is no longer a symlink - bin=$( cd -P "$( dirname "${SOURCE}" )" && pwd ) - SOURCE=$(readlink "${SOURCE}") - [[ "${SOURCE}" != /* ]] && SOURCE="${bin}/${SOURCE}" # if $SOURCE was a relative symlink, we need to resolve it relative to the path where the symlink file was located -done -bin=$( cd -P "$( dirname "${SOURCE}" )" && pwd ) -script=$( basename "${SOURCE}" ) -# Stop: Resolve Script Directory - -CONTINUOUS_CONF_DIR=${CONTINUOUS_CONF_DIR:-${bin}} -. "$CONTINUOUS_CONF_DIR/continuous-env.sh" - -pssh -h "$CONTINUOUS_CONF_DIR/ingesters.txt" "pkill -f '[o]rg.apache.accumulo.test.continuous.ContinuousIngest'" < /dev/null - http://git-wip-us.apache.org/repos/asf/accumulo-testing/blob/fc3ddfc4/continuous/stop-scanners.sh ---------------------------------------------------------------------- diff --git a/continuous/stop-scanners.sh b/continuous/stop-scanners.sh deleted file mode 100755 index cf927b0..0000000 --- a/continuous/stop-scanners.sh +++ /dev/null @@ -1,33 +0,0 @@ -#! /usr/bin/env bash - -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -# Start: Resolve Script Directory -SOURCE="${BASH_SOURCE[0]}" -while [[ -h "${SOURCE}" ]]; do # resolve $SOURCE until the file is no longer a symlink - bin=$( cd -P "$( dirname "${SOURCE}" )" && pwd ) - SOURCE=$(readlink "${SOURCE}") - [[ "${SOURCE}" != /* ]] && SOURCE="${bin}/${SOURCE}" # if $SOURCE was a relative symlink, we need to resolve it relative to the path where the symlink file was located -done -bin=$( cd -P "$( dirname "${SOURCE}" )" && pwd ) -script=$( basename "${SOURCE}" ) -# Stop: Resolve Script Directory - -CONTINUOUS_CONF_DIR=${CONTINUOUS_CONF_DIR:-${bin}} -. "$CONTINUOUS_CONF_DIR/continuous-env.sh" - -pssh -h "$CONTINUOUS_CONF_DIR/scanners.txt" "pkill -f '[o]rg.apache.accumulo.test.continuous.ContinuousScanner'" < /dev/null - http://git-wip-us.apache.org/repos/asf/accumulo-testing/blob/fc3ddfc4/continuous/stop-stats.sh ---------------------------------------------------------------------- diff --git a/continuous/stop-stats.sh b/continuous/stop-stats.sh deleted file mode 100755 index 9886eec..0000000 --- a/continuous/stop-stats.sh +++ /dev/null @@ -1,33 +0,0 @@ -#! /usr/bin/env bash - -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -# Start: Resolve Script Directory -SOURCE="${BASH_SOURCE[0]}" -while [[ -h "${SOURCE}" ]]; do # resolve $SOURCE until the file is no longer a symlink - bin=$( cd -P "$( dirname "${SOURCE}" )" && pwd ) - SOURCE=$(readlink "${SOURCE}") - [[ "${SOURCE}" != /* ]] && SOURCE="${bin}/${SOURCE}" # if $SOURCE was a relative symlink, we need to resolve it relative to the path where the symlink file was located -done -bin=$( cd -P "$( dirname "${SOURCE}" )" && pwd ) -script=$( basename "${SOURCE}" ) -# Stop: Resolve Script Directory - -CONTINUOUS_CONF_DIR=${CONTINUOUS_CONF_DIR:-${bin}} -. "$CONTINUOUS_CONF_DIR/continuous-env.sh" - -pkill -f org.apache.accumulo.test.continuous.ContinuousStatsCollector - http://git-wip-us.apache.org/repos/asf/accumulo-testing/blob/fc3ddfc4/continuous/stop-walkers.sh ---------------------------------------------------------------------- diff --git a/continuous/stop-walkers.sh b/continuous/stop-walkers.sh deleted file mode 100755 index 2c22cfa..0000000 --- a/continuous/stop-walkers.sh +++ /dev/null @@ -1,33 +0,0 @@ -#! /usr/bin/env bash - -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -# Start: Resolve Script Directory -SOURCE="${BASH_SOURCE[0]}" -while [[ -h "${SOURCE}" ]]; do # resolve $SOURCE until the file is no longer a symlink - bin=$( cd -P "$( dirname "${SOURCE}" )" && pwd ) - SOURCE=$(readlink "${SOURCE}") - [[ "${SOURCE}" != /* ]] && SOURCE="${bin}/${SOURCE}" # if $SOURCE was a relative symlink, we need to resolve it relative to the path where the symlink file was located -done -bin=$( cd -P "$( dirname "${SOURCE}" )" && pwd ) -script=$( basename "${SOURCE}" ) -# Stop: Resolve Script Directory - -CONTINUOUS_CONF_DIR=${CONTINUOUS_CONF_DIR:-${bin}} -. "$CONTINUOUS_CONF_DIR/continuous-env.sh" - -pssh -h "$CONTINUOUS_CONF_DIR/walkers.txt" "pkill -f '[o]rg.apache.accumulo.test.continuous.ContinuousWalk'" < /dev/null - http://git-wip-us.apache.org/repos/asf/accumulo-testing/blob/fc3ddfc4/continuous/tserver-agitator.pl ---------------------------------------------------------------------- diff --git a/continuous/tserver-agitator.pl b/continuous/tserver-agitator.pl deleted file mode 100755 index 0e65a50..0000000 --- a/continuous/tserver-agitator.pl +++ /dev/null @@ -1,134 +0,0 @@ -#! /usr/bin/env perl - -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - - -use POSIX qw(strftime); -use Cwd qw(); - -if(scalar(@ARGV) != 4 && scalar(@ARGV) != 2){ - print "Usage : tserver-agitator.pl [:max sleep before kill in minutes] [:max sleep before tup in minutes] [ ]\n"; - exit(1); -} - -my $ACCUMULO_HOME; -if( defined $ENV{'ACCUMULO_HOME'} ){ - $ACCUMULO_HOME = $ENV{'ACCUMULO_HOME'}; -} else { - $cwd=Cwd::cwd(); - $ACCUMULO_HOME=$cwd . '/../../..'; -} - -print "ACCUMULO_HOME=$ACCUMULO_HOME\n"; - -@sleeprange1 = split(/:/, $ARGV[0]); -$sleep1 = $sleeprange1[0]; - -@sleeprange2 = split(/:/, $ARGV[1]); -$sleep2 = $sleeprange2[0]; - -if (scalar(@sleeprange1) > 1) { - $sleep1max = $sleeprange1[1] + 1; -} else { - $sleep1max = $sleep1; -} - -if ($sleep1 > $sleep1max) { - die("sleep1 > sleep1max $sleep1 > $sleep1max"); -} - -if (scalar(@sleeprange2) > 1) { - $sleep2max = $sleeprange2[1] + 1; -} else { - $sleep2max = $sleep2; -} - -if($sleep2 > $sleep2max){ - die("sleep2 > sleep2max $sleep2 > $sleep2max"); -} - -if(defined $ENV{'ACCUMULO_CONF_DIR'}){ - $ACCUMULO_CONF_DIR = $ENV{'ACCUMULO_CONF_DIR'}; -}else{ - $ACCUMULO_CONF_DIR = $ACCUMULO_HOME . '/conf'; -} - -if(scalar(@ARGV) == 4){ - $minKill = $ARGV[2]; - $maxKill = $ARGV[3]; -}else{ - $minKill = 1; - $maxKill = 1; -} - -if($minKill > $maxKill){ - die("minKill > maxKill $minKill > $maxKill"); -} - -@tserversRaw = `cat $ACCUMULO_CONF_DIR/tservers`; -chomp(@tserversRaw); - -for $tserver (@tserversRaw){ - if($tserver eq "" || substr($tserver,0,1) eq "#"){ - next; - } - - push(@tservers, $tserver); -} - - -if(scalar(@tservers) < $maxKill){ - print STDERR "WARN setting maxKill to ".scalar(@tservers)."\n"; - $maxKill = scalar(@tservers); -} - -if ($minKill > $maxKill){ - print STDERR "WARN setting minKill to equal maxKill\n"; - $minKill = $maxKill; -} - -while(1){ - - $numToKill = int(rand($maxKill - $minKill + 1)) + $minKill; - %killed = {}; - $server = ""; - - for($i = 0; $i < $numToKill; $i++){ - while($server eq "" || $killed{$server} != undef){ - $index = int(rand(scalar(@tservers))); - $server = $tservers[$index]; - } - - $killed{$server} = 1; - - $t = strftime "%Y%m%d %H:%M:%S", localtime; - - print STDERR "$t Killing tserver on $server\n"; - # We're the accumulo user, just run the commandj - system("$ACCUMULO_HOME/bin/stop-server.sh $server 'accumulo-start.jar' tserver KILL"); - } - - $nextsleep2 = int(rand($sleep2max - $sleep2)) + $sleep2; - sleep($nextsleep2 * 60); - $t = strftime "%Y%m%d %H:%M:%S", localtime; - print STDERR "$t Running tup\n"; - # restart the as them as the accumulo user - system("$ACCUMULO_HOME/bin/tup.sh"); - - $nextsleep1 = int(rand($sleep1max - $sleep1)) + $sleep1; - sleep($nextsleep1 * 60); -} - http://git-wip-us.apache.org/repos/asf/accumulo-testing/blob/fc3ddfc4/continuous/walkers.txt.example ---------------------------------------------------------------------- diff --git a/continuous/walkers.txt.example b/continuous/walkers.txt.example deleted file mode 100644 index b59052d..0000000 --- a/continuous/walkers.txt.example +++ /dev/null @@ -1,17 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -host3 -host4 http://git-wip-us.apache.org/repos/asf/accumulo-testing/blob/fc3ddfc4/core/src/main/java/org/apache/accumulo/testing/core/TestEnv.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/testing/core/TestEnv.java b/core/src/main/java/org/apache/accumulo/testing/core/TestEnv.java new file mode 100644 index 0000000..55fecb7 --- /dev/null +++ b/core/src/main/java/org/apache/accumulo/testing/core/TestEnv.java @@ -0,0 +1,179 @@ +package org.apache.accumulo.testing.core; + +import static java.util.Objects.requireNonNull; + +import java.io.File; +import java.io.IOException; +import java.lang.management.ManagementFactory; +import java.util.Properties; +import java.util.concurrent.TimeUnit; + +import org.apache.accumulo.core.client.AccumuloException; +import org.apache.accumulo.core.client.AccumuloSecurityException; +import org.apache.accumulo.core.client.BatchWriterConfig; +import org.apache.accumulo.core.client.ClientConfiguration; +import org.apache.accumulo.core.client.Connector; +import org.apache.accumulo.core.client.Instance; +import org.apache.accumulo.core.client.ZooKeeperInstance; +import org.apache.accumulo.core.client.security.tokens.AuthenticationToken; +import org.apache.accumulo.core.client.security.tokens.KerberosToken; +import org.apache.accumulo.core.client.security.tokens.PasswordToken; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.mapreduce.tools.CLI; +import org.apache.hadoop.security.UserGroupInformation; + +public class TestEnv { + + protected final Properties p; + private Instance instance = null; + private Connector connector = null; + + /** + * Creates new test environment using provided properties + * + * @param p Properties + */ + public TestEnv(Properties p) { + requireNonNull(p); + this.p = p; + } + + /** + * Gets a copy of the configuration properties. + * + * @return a copy of the configuration properties + */ + public Properties copyConfigProperties() { + return new Properties(p); + } + + /** + * Gets a configuration property. + * + * @param key key + * @return property value + */ + public String getConfigProperty(String key) { + return p.getProperty(key); + } + + /** + * Gets the configured username. + * + * @return username + */ + public String getAccumuloUserName() { + return p.getProperty(TestProps.ACCUMULO_USERNAME); + } + + /** + * Gets the configured password. + * + * @return password + */ + public String getAccumuloPassword() { + return p.getProperty(TestProps.ACCUMULO_PASSWORD); + } + + /** + * Gets the configured keytab. + * + * @return path to keytab + */ + public String getAccumuloKeytab() { + return p.getProperty(TestProps.ACCUMULO_KEYTAB); + } + + /** + * Gets this process's ID. + * + * @return pid + */ + public String getPid() { + return ManagementFactory.getRuntimeMXBean().getName().split("@")[0]; + } + + + public Configuration getHadoopConfiguration() { + Configuration config = new Configuration(); + config.set("mapreduce.framework.name", "yarn"); + // Setting below are required due to bundled jar breaking default config. + // See http://stackoverflow.com/questions/17265002/hadoop-no-filesystem-for-scheme-file + config.set("fs.hdfs.impl", org.apache.hadoop.hdfs.DistributedFileSystem.class.getName()); + config.set("fs.file.impl", org.apache.hadoop.fs.LocalFileSystem.class.getName()); + return config; + } + + /** + * Gets an authentication token based on the configured password. + */ + public AuthenticationToken getToken() { + String password = getAccumuloPassword(); + if (null != password) { + return new PasswordToken(getAccumuloPassword()); + } + String keytab = getAccumuloKeytab(); + if (null != keytab) { + File keytabFile = new File(keytab); + if (!keytabFile.exists() || !keytabFile.isFile()) { + throw new IllegalArgumentException("Provided keytab is not a normal file: " + keytab); + } + try { + UserGroupInformation.loginUserFromKeytab(getAccumuloUserName(), keytabFile.getAbsolutePath()); + return new KerberosToken(); + } catch (IOException e) { + throw new RuntimeException("Failed to login", e); + } + } + throw new IllegalArgumentException("Must provide password or keytab in configuration"); + } + + public String getAccumuloInstanceName() { + return p.getProperty(TestProps.ACCUMULO_INSTANCE); + } + + public String getZookeepers() { + return p.getProperty(TestProps.ZOOKEEPERS); + } + + public ClientConfiguration getClientConfiguration() { + return ClientConfiguration.loadDefault().withInstance(getAccumuloInstanceName()) + .withZkHosts(getZookeepers()); + } + + /** + * Gets an Accumulo instance object. The same instance is reused after the first call. + */ + public Instance getAccumuloInstance() { + if (instance == null) { + this.instance = new ZooKeeperInstance(getClientConfiguration()); + } + return instance; + } + + /** + * Gets an Accumulo connector. The same connector is reused after the first call. + */ + public Connector getAccumuloConnector() throws AccumuloException, AccumuloSecurityException { + if (connector == null) { + connector = getAccumuloInstance().getConnector(getAccumuloUserName(), getToken()); + } + return connector; + } + + public BatchWriterConfig getBatchWriterConfig() { + int numThreads = Integer.parseInt(p.getProperty(TestProps.BW_NUM_THREADS)); + long maxLatency = Long.parseLong(p.getProperty(TestProps.BW_MAX_LATENCY_MS)); + long maxMemory = Long.parseLong(p.getProperty(TestProps.BW_MAX_MEM_BYTES)); + + BatchWriterConfig config = new BatchWriterConfig(); + config.setMaxWriteThreads(numThreads); + config.setMaxLatency(maxLatency, TimeUnit.MILLISECONDS); + config.setMaxMemory(maxMemory); + return config; + } + + public int getScannerBatchSize() { + return Integer.parseInt(p.getProperty(TestProps.SCANNER_BATCH_SIZE)); + } +} http://git-wip-us.apache.org/repos/asf/accumulo-testing/blob/fc3ddfc4/core/src/main/java/org/apache/accumulo/testing/core/TestProps.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/testing/core/TestProps.java b/core/src/main/java/org/apache/accumulo/testing/core/TestProps.java index f8ce9ca..e134c7f 100644 --- a/core/src/main/java/org/apache/accumulo/testing/core/TestProps.java +++ b/core/src/main/java/org/apache/accumulo/testing/core/TestProps.java @@ -17,11 +17,21 @@ package org.apache.accumulo.testing.core; +import java.io.FileInputStream; +import java.io.IOException; +import java.util.Properties; + public class TestProps { private static final String PREFIX = "test."; - private static final String RANDOMWALK = PREFIX + "randomwalk."; private static final String COMMON = PREFIX + "common."; + private static final String CI = PREFIX + "ci."; + private static final String CI_COMMON = CI + "common."; + private static final String CI_INGEST = CI + "ingest."; + private static final String CI_WALKER = CI + "walker."; + private static final String CI_BW = CI + "batch.walker."; + private static final String CI_SCANNER = CI + "scanner."; + private static final String CI_VERIFY = CI + "verify."; /** Common properties **/ // Zookeeper connection string @@ -38,16 +48,80 @@ public class TestProps { public static final String YARN_CONTAINER_MEMORY_MB = COMMON + "yarn.container.memory.mb"; // Number of cores given to each YARN container public static final String YARN_CONTAINER_CORES = COMMON + "yarn.container.cores"; + // Max memory (in bytes) each batch writer will use to buffer writes + public static final String BW_MAX_MEM_BYTES = COMMON + "bw.max.memory.bytes"; + // Max the maximum time (in ms) each batch writer will buffer data + public static final String BW_MAX_LATENCY_MS = COMMON + "bw.max.latency.ms"; + // Number of threads each batch writer will use to write data + public static final String BW_NUM_THREADS = COMMON + "bw.num.threads"; + // Number of thread for each batch scanner + public static final String BS_NUM_THREADS = COMMON + "bw.num.threads"; + // Number of key/value entries to pull during scan + public static final String SCANNER_BATCH_SIZE = COMMON + "scanner.batch.size"; + + /** Continuous ingest test properties **/ + /** Common **/ + // Accumulo table used by continuous ingest tests + public static final String CI_COMMON_ACCUMULO_TABLE = CI_COMMON + "accumulo.table"; + // Number of tablets that should exist in Accumulo table when created + public static final String CI_COMMON_ACCUMULO_NUM_TABLETS = CI_COMMON + "accumulo.num.tablets"; + // Optional authorizations (in CSV format) that if specified will be randomly selected by scanners + // and walkers + public static final String CI_COMMON_AUTHS = CI_COMMON + "auths"; + + /** Ingest **/ + // Number of entries each ingest client should write + public static final String CI_INGEST_CLIENT_ENTRIES = CI_INGEST + "client.entries"; + // Minimum random row to generate + public static final String CI_INGEST_ROW_MIN = CI_INGEST + "row.min"; + // Maximum random row to generate + public static final String CI_INGEST_ROW_MAX = CI_INGEST + "row.max"; + // Maximum number of random column families to generate + public static final String CI_INGEST_MAX_CF = CI_INGEST + "max.cf"; + // Maximum number of random column qualifiers to generate + public static final String CI_INGEST_MAX_CQ = CI_INGEST + "max.cq"; + // Optional visibilities (in CSV format) that if specified will be randomly selected by ingesters for + // each linked list + public static final String CI_INGEST_VISIBILITIES = CI_INGEST + "visibilities"; + // Checksums will be generated during ingest if set to true + public static final String CI_INGEST_CHECKSUM = CI_INGEST + "checksum"; + + /** Batch Walker **/ + // Sleep time between batch scans (in ms) + public static final String CI_BW_SLEEP_MS = CI_BW + "sleep.ms"; + // Scan batch size + public static final String CI_BW_BATCH_SIZE = CI_BW + "batch.size"; + + /** Walker **/ + // Sleep time between scans (in ms) + public static final String CI_WALKER_SLEEP_MS = CI_WALKER + "sleep.ms"; + + /** Scanner **/ + // Sleep time between scans (in ms) + public static final String CI_SCANNER_SLEEP_MS = CI_SCANNER + "sleep.ms"; + // Scanner entries + public static final String CI_SCANNER_ENTRIES = CI_SCANNER + "entries"; + /** Verify **/ + // Maximum number of mapreduce mappers + public static final String CI_VERIFY_MAX_MAPS = CI_VERIFY + "max.maps"; + // Number of mapreduce reducers + public static final String CI_VERIFY_REDUCERS = CI_VERIFY + "reducers"; + // Perform the verification directly on the files while the table is offline" + public static final String CI_VERIFY_SCAN_OFFLINE = CI_VERIFY + "scan.offline"; + // Comma separated list of auths to use for verify + public static final String CI_VERIFY_AUTHS = CI_VERIFY + "auths"; + // Location in HDFS to store output + public static final String CI_VERIFY_OUTPUT_DIR = CI_VERIFY + "output.dir"; - /** Random walk properties **/ - // Number of random walker (if running in YARN) - public static final String RW_NUM_WALKERS = RANDOMWALK + "num.walkers"; - // Max memory for multi-table batch writer - public static final String RW_BW_MAX_MEM = RANDOMWALK + "bw.max.mem"; - // Max latency in milliseconds for multi-table batch writer - public static final String RW_BW_MAX_LATENCY = RANDOMWALK + "bw.max.latency"; - // Number of write thread for multi-table batch writer - public static final String RW_BW_NUM_THREADS = RANDOMWALK + "bw.num.threads"; + public static Properties loadFromFile(String propsFilePath) throws IOException { + return loadFromStream(new FileInputStream(propsFilePath)); + } + public static Properties loadFromStream(FileInputStream fis) throws IOException { + Properties props = new Properties(); + props.load(fis); + fis.close(); + return props; + } } http://git-wip-us.apache.org/repos/asf/accumulo-testing/blob/fc3ddfc4/core/src/main/java/org/apache/accumulo/testing/core/continuous/ContinuousBatchWalker.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/testing/core/continuous/ContinuousBatchWalker.java b/core/src/main/java/org/apache/accumulo/testing/core/continuous/ContinuousBatchWalker.java index e89f2eb..0282c2b 100644 --- a/core/src/main/java/org/apache/accumulo/testing/core/continuous/ContinuousBatchWalker.java +++ b/core/src/main/java/org/apache/accumulo/testing/core/continuous/ContinuousBatchWalker.java @@ -21,13 +21,11 @@ import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Map.Entry; +import java.util.Properties; import java.util.Random; import java.util.Set; import java.util.concurrent.TimeUnit; -import org.apache.accumulo.core.cli.BatchScannerOpts; -import org.apache.accumulo.core.cli.ClientOnDefaultTable; -import org.apache.accumulo.core.cli.ScannerOpts; import org.apache.accumulo.core.client.BatchScanner; import org.apache.accumulo.core.client.Connector; import org.apache.accumulo.core.client.Scanner; @@ -35,50 +33,44 @@ import org.apache.accumulo.core.data.Key; import org.apache.accumulo.core.data.Range; import org.apache.accumulo.core.data.Value; import org.apache.accumulo.core.security.Authorizations; +import org.apache.accumulo.testing.core.TestProps; import org.apache.hadoop.io.Text; -import com.beust.jcommander.Parameter; -import com.beust.jcommander.validators.PositiveInteger; import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly; public class ContinuousBatchWalker { - static class Opts extends ContinuousWalk.Opts { - @Parameter(names = "--numToScan", description = "Number rows to scan between sleeps", required = true, validateWith = PositiveInteger.class) - long numToScan = 0; - } - public static void main(String[] args) throws Exception { - Opts opts = new Opts(); - ScannerOpts scanOpts = new ScannerOpts(); - BatchScannerOpts bsOpts = new BatchScannerOpts(); - ClientOnDefaultTable clientOpts = new ClientOnDefaultTable("ci"); - clientOpts.parseArgs(ContinuousBatchWalker.class.getName(), args, scanOpts, bsOpts, opts); + Properties props = TestProps.loadFromFile(args[0]); + + ContinuousEnv env = new ContinuousEnv(props); + + Authorizations auths = env.getRandomAuthorizations(); + Connector conn = env.getAccumuloConnector(); + Scanner scanner = ContinuousUtil.createScanner(conn, env.getAccumuloTableName(), auths); + int scanBatchSize = Integer.parseInt(props.getProperty(TestProps.CI_BW_BATCH_SIZE)); + scanner.setBatchSize(scanBatchSize); Random r = new Random(); - Authorizations auths = opts.randomAuths.getAuths(r); - Connector conn = clientOpts.getConnector(); - Scanner scanner = ContinuousUtil.createScanner(conn, clientOpts.getTableName(), auths); - scanner.setBatchSize(scanOpts.scanBatchSize); + int scanThreads = Integer.parseInt(props.getProperty(TestProps.BS_NUM_THREADS)); while (true) { - BatchScanner bs = conn.createBatchScanner(clientOpts.getTableName(), auths, bsOpts.scanThreads); - bs.setTimeout(bsOpts.scanTimeout, TimeUnit.MILLISECONDS); + BatchScanner bs = conn.createBatchScanner(env.getAccumuloTableName(), auths, scanThreads); - Set batch = getBatch(scanner, opts.min, opts.max, scanOpts.scanBatchSize, r); + Set batch = getBatch(scanner, env.getRowMin(), env.getRowMax(), scanBatchSize, r); List ranges = new ArrayList<>(batch.size()); for (Text row : batch) { ranges.add(new Range(row)); } - runBatchScan(scanOpts.scanBatchSize, bs, batch, ranges); + runBatchScan(scanBatchSize, bs, batch, ranges); - sleepUninterruptibly(opts.sleepTime, TimeUnit.MILLISECONDS); + int bwSleepMs = Integer.parseInt(props.getProperty(TestProps.CI_BW_SLEEP_MS)); + sleepUninterruptibly(bwSleepMs, TimeUnit.MILLISECONDS); } - } private static void runBatchScan(int batchSize, BatchScanner bs, Set batch, List ranges) { @@ -117,7 +109,6 @@ public class ContinuousBatchWalker { } else { System.out.printf("BRQ %d %d %d %d %d%n", t1, (t2 - t1), rowsSeen.size(), count, (int) (rowsSeen.size() / ((t2 - t1) / 1000.0))); } - } private static void addRow(int batchSize, Value v) { @@ -171,5 +162,4 @@ public class ContinuousBatchWalker { return ret; } - } http://git-wip-us.apache.org/repos/asf/accumulo-testing/blob/fc3ddfc4/core/src/main/java/org/apache/accumulo/testing/core/continuous/ContinuousEnv.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/testing/core/continuous/ContinuousEnv.java b/core/src/main/java/org/apache/accumulo/testing/core/continuous/ContinuousEnv.java new file mode 100644 index 0000000..7907ffd --- /dev/null +++ b/core/src/main/java/org/apache/accumulo/testing/core/continuous/ContinuousEnv.java @@ -0,0 +1,66 @@ +package org.apache.accumulo.testing.core.continuous; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Properties; +import java.util.Random; + +import org.apache.accumulo.core.security.Authorizations; +import org.apache.accumulo.testing.core.TestEnv; +import org.apache.accumulo.testing.core.TestProps; + +class ContinuousEnv extends TestEnv { + + private List authList; + + ContinuousEnv(Properties props) { + super(props); + } + + /** + * @return Accumulo authorizations list + */ + private List getAuthList() { + if (authList == null) { + String authValue = p.getProperty(TestProps.CI_COMMON_AUTHS); + if (authValue == null || authValue.trim().isEmpty()) { + authList = Collections.singletonList(Authorizations.EMPTY); + } else { + authList = new ArrayList<>(); + for (String a : authValue.split("|")) { + authList.add(new Authorizations(a.split(","))); + } + } + } + return authList; + } + + /** + * @return random authorization + */ + Authorizations getRandomAuthorizations() { + Random r = new Random(); + return getAuthList().get(r.nextInt(getAuthList().size())); + } + + long getRowMin() { + return Long.parseLong(p.getProperty(TestProps.CI_INGEST_ROW_MIN)); + } + + long getRowMax() { + return Long.parseLong(p.getProperty(TestProps.CI_INGEST_ROW_MAX)); + } + + int getMaxColF() { + return Integer.parseInt(p.getProperty(TestProps.CI_INGEST_MAX_CF)); + } + + int getMaxColQ() { + return Integer.parseInt(p.getProperty(TestProps.CI_INGEST_MAX_CQ)); + } + + String getAccumuloTableName() { + return p.getProperty(TestProps.CI_COMMON_ACCUMULO_TABLE); + } +} http://git-wip-us.apache.org/repos/asf/accumulo-testing/blob/fc3ddfc4/core/src/main/java/org/apache/accumulo/testing/core/continuous/ContinuousIngest.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/testing/core/continuous/ContinuousIngest.java b/core/src/main/java/org/apache/accumulo/testing/core/continuous/ContinuousIngest.java index 4681cb8..f260e78 100644 --- a/core/src/main/java/org/apache/accumulo/testing/core/continuous/ContinuousIngest.java +++ b/core/src/main/java/org/apache/accumulo/testing/core/continuous/ContinuousIngest.java @@ -18,18 +18,15 @@ package org.apache.accumulo.testing.core.continuous; import static java.nio.charset.StandardCharsets.UTF_8; -import java.io.BufferedReader; -import java.io.InputStreamReader; import java.util.ArrayList; import java.util.Collections; import java.util.List; +import java.util.Properties; import java.util.Random; import java.util.UUID; import java.util.zip.CRC32; import java.util.zip.Checksum; -import org.apache.accumulo.core.cli.BatchWriterOpts; -import org.apache.accumulo.core.cli.ClientOnDefaultTable; import org.apache.accumulo.core.client.BatchWriter; import org.apache.accumulo.core.client.Connector; import org.apache.accumulo.core.client.MutationsRejectedException; @@ -40,9 +37,7 @@ import org.apache.accumulo.core.security.ColumnVisibility; import org.apache.accumulo.core.trace.CountSampler; import org.apache.accumulo.core.trace.Trace; import org.apache.accumulo.core.util.FastFormat; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; +import org.apache.accumulo.testing.core.TestProps; import org.apache.hadoop.io.Text; public class ContinuousIngest { @@ -51,49 +46,44 @@ public class ContinuousIngest { private static List visibilities; - private static void initVisibilities(ContinuousOpts opts) throws Exception { - if (opts.visFile == null) { - visibilities = Collections.singletonList(new ColumnVisibility()); - return; - } - - visibilities = new ArrayList<>(); - - FileSystem fs = FileSystem.get(new Configuration()); - BufferedReader in = new BufferedReader(new InputStreamReader(fs.open(new Path(opts.visFile)), UTF_8)); - - String line; - - while ((line = in.readLine()) != null) { - visibilities.add(new ColumnVisibility(line)); - } - - in.close(); - } - private static ColumnVisibility getVisibility(Random rand) { return visibilities.get(rand.nextInt(visibilities.size())); } public static void main(String[] args) throws Exception { - ContinuousOpts opts = new ContinuousOpts(); - BatchWriterOpts bwOpts = new BatchWriterOpts(); - ClientOnDefaultTable clientOpts = new ClientOnDefaultTable("ci"); - clientOpts.parseArgs(ContinuousIngest.class.getName(), args, bwOpts, opts); + if (args.length != 1) { + System.err.println("Usage: ContinuousIngest "); + System.exit(-1); + } + + Properties props = TestProps.loadFromFile(args[0]); + + String vis = props.getProperty(TestProps.CI_INGEST_VISIBILITIES); + if (vis == null) { + visibilities = Collections.singletonList(new ColumnVisibility()); + } else { + visibilities = new ArrayList<>(); + for (String v : vis.split(",")) { + visibilities.add(new ColumnVisibility(v.trim())); + } + } - initVisibilities(opts); + ContinuousEnv env = new ContinuousEnv(props); - if (opts.min < 0 || opts.max < 0 || opts.max <= opts.min) { + long rowMin = env.getRowMin(); + long rowMax = env.getRowMax(); + if (rowMin < 0 || rowMax < 0 || rowMax <= rowMin) { throw new IllegalArgumentException("bad min and max"); } - Connector conn = clientOpts.getConnector(); - if (!conn.tableOperations().exists(clientOpts.getTableName())) { - throw new TableNotFoundException(null, clientOpts.getTableName(), "Consult the README and create the table before starting ingest."); + Connector conn = env.getAccumuloConnector(); + String tableName = env.getAccumuloTableName(); + if (!conn.tableOperations().exists(tableName)) { + throw new TableNotFoundException(null, tableName, "Consult the README and create the table before starting ingest."); } - BatchWriter bw = conn.createBatchWriter(clientOpts.getTableName(), bwOpts.getBatchWriterConfig()); + BatchWriter bw = conn.createBatchWriter(tableName, env.getBatchWriterConfig()); bw = Trace.wrapAll(bw, new CountSampler(1024)); Random r = new Random(); @@ -117,61 +107,65 @@ public class ContinuousIngest { long lastFlushTime = System.currentTimeMillis(); + int maxColF = env.getMaxColF(); + int maxColQ = env.getMaxColQ(); + boolean checksum = Boolean.parseBoolean(props.getProperty(TestProps.CI_INGEST_CHECKSUM)); + long numEntries = Long.parseLong(props.getProperty(TestProps.CI_INGEST_CLIENT_ENTRIES)); + out: while (true) { // generate first set of nodes ColumnVisibility cv = getVisibility(r); for (int index = 0; index < flushInterval; index++) { - long rowLong = genLong(opts.min, opts.max, r); + long rowLong = genLong(rowMin, rowMax, r); prevRows[index] = rowLong; firstRows[index] = rowLong; - int cf = r.nextInt(opts.maxColF); - int cq = r.nextInt(opts.maxColQ); + int cf = r.nextInt(maxColF); + int cq = r.nextInt(maxColQ); firstColFams[index] = cf; firstColQuals[index] = cq; - Mutation m = genMutation(rowLong, cf, cq, cv, ingestInstanceId, count, null, r, opts.checksum); + Mutation m = genMutation(rowLong, cf, cq, cv, ingestInstanceId, count, null, + checksum); count++; bw.addMutation(m); } lastFlushTime = flush(bw, count, flushInterval, lastFlushTime); - if (count >= opts.num) + if (count >= numEntries) break out; // generate subsequent sets of nodes that link to previous set of nodes for (int depth = 1; depth < maxDepth; depth++) { for (int index = 0; index < flushInterval; index++) { - long rowLong = genLong(opts.min, opts.max, r); + long rowLong = genLong(rowMin, rowMax, r); byte[] prevRow = genRow(prevRows[index]); prevRows[index] = rowLong; - Mutation m = genMutation(rowLong, r.nextInt(opts.maxColF), r.nextInt(opts.maxColQ), cv, ingestInstanceId, count, prevRow, r, opts.checksum); + Mutation m = genMutation(rowLong, r.nextInt(maxColF), r.nextInt(maxColQ), cv, ingestInstanceId, count, prevRow, checksum); count++; bw.addMutation(m); } lastFlushTime = flush(bw, count, flushInterval, lastFlushTime); - if (count >= opts.num) + if (count >= numEntries) break out; } // create one big linked list, this makes all of the first inserts // point to something for (int index = 0; index < flushInterval - 1; index++) { - Mutation m = genMutation(firstRows[index], firstColFams[index], firstColQuals[index], cv, ingestInstanceId, count, genRow(prevRows[index + 1]), r, - opts.checksum); + Mutation m = genMutation(firstRows[index], firstColFams[index], firstColQuals[index], cv, ingestInstanceId, count, genRow(prevRows[index + 1]), checksum); count++; bw.addMutation(m); } lastFlushTime = flush(bw, count, flushInterval, lastFlushTime); - if (count >= opts.num) + if (count >= numEntries) break out; } bw.close(); - clientOpts.stopTracing(); } private static long flush(BatchWriter bw, long count, final int flushInterval, long lastFlushTime) throws MutationsRejectedException { @@ -183,8 +177,9 @@ public class ContinuousIngest { return lastFlushTime; } - public static Mutation genMutation(long rowLong, int cfInt, int cqInt, ColumnVisibility cv, byte[] ingestInstanceId, long count, byte[] prevRow, Random r, - boolean checksum) { + static Mutation genMutation(long rowLong, int cfInt, int cqInt, ColumnVisibility cv, + byte[] ingestInstanceId, long count, byte[] prevRow, + boolean checksum) { // Adler32 is supposed to be faster, but according to wikipedia is not good for small data.... so used CRC32 instead CRC32 cksum = null; @@ -207,15 +202,15 @@ public class ContinuousIngest { return m; } - public static final long genLong(long min, long max, Random r) { - return ((r.nextLong() & 0x7fffffffffffffffl) % (max - min)) + min; + static long genLong(long min, long max, Random r) { + return ((r.nextLong() & 0x7fffffffffffffffL) % (max - min)) + min; } - static final byte[] genRow(long min, long max, Random r) { + static byte[] genRow(long min, long max, Random r) { return genRow(genLong(min, max, r)); } - static final byte[] genRow(long rowLong) { + static byte[] genRow(long rowLong) { return FastFormat.toZeroPaddedString(rowLong, 16, 16, EMPTY_BYTES); } http://git-wip-us.apache.org/repos/asf/accumulo-testing/blob/fc3ddfc4/core/src/main/java/org/apache/accumulo/testing/core/continuous/ContinuousMoru.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/testing/core/continuous/ContinuousMoru.java b/core/src/main/java/org/apache/accumulo/testing/core/continuous/ContinuousMoru.java index c2902ee..560e2ff 100644 --- a/core/src/main/java/org/apache/accumulo/testing/core/continuous/ContinuousMoru.java +++ b/core/src/main/java/org/apache/accumulo/testing/core/continuous/ContinuousMoru.java @@ -19,12 +19,11 @@ package org.apache.accumulo.testing.core.continuous; import static java.nio.charset.StandardCharsets.UTF_8; import java.io.IOException; +import java.util.Properties; import java.util.Random; import java.util.Set; import java.util.UUID; -import org.apache.accumulo.core.cli.BatchWriterOpts; -import org.apache.accumulo.core.cli.MapReduceClientOnDefaultTable; import org.apache.accumulo.core.client.AccumuloSecurityException; import org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat; import org.apache.accumulo.core.client.mapreduce.AccumuloOutputFormat; @@ -34,6 +33,7 @@ import org.apache.accumulo.core.data.Range; import org.apache.accumulo.core.data.Value; import org.apache.accumulo.core.security.ColumnVisibility; import org.apache.accumulo.core.util.CachedConfiguration; +import org.apache.accumulo.testing.core.TestProps; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.io.Text; @@ -43,13 +43,11 @@ import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; -import com.beust.jcommander.Parameter; -import com.beust.jcommander.validators.PositiveInteger; - /** - * A map only job that reads a table created by continuous ingest and creates doubly linked list. This map reduce job tests the ability of a map only job to - * read and write to accumulo at the same time. This map reduce job mutates the table in such a way that it should not create any undefined nodes. - * + * A map only job that reads a table created by continuous ingest and creates doubly linked list. + * This map reduce job tests the ability of a map only job to read and write to accumulo at the + * same time. This map reduce job mutates the table in such a way that it should not create any + * undefined nodes. */ public class ContinuousMoru extends Configured implements Tool { private static final String PREFIX = ContinuousMoru.class.getSimpleName() + "."; @@ -59,8 +57,8 @@ public class ContinuousMoru extends Configured implements Tool { private static final String MIN = PREFIX + "MIN"; private static final String CI_ID = PREFIX + "CI_ID"; - static enum Counts { - SELF_READ; + enum Counts { + SELF_READ } public static class CMapper extends Mapper { @@ -105,43 +103,36 @@ public class ContinuousMoru extends Configured implements Tool { if (offset > 0) { long rowLong = Long.parseLong(new String(val, offset, 16, UTF_8), 16); Mutation m = ContinuousIngest.genMutation(rowLong, random.nextInt(max_cf), random.nextInt(max_cq), EMPTY_VIS, iiId, count++, key.getRowData() - .toArray(), random, true); + .toArray(), true); context.write(null, m); } } else { - context.getCounter(Counts.SELF_READ).increment(1l); + context.getCounter(Counts.SELF_READ).increment(1L); } } } - static class Opts extends ContinuousOpts { - @Parameter(names = "--maxColF", description = "maximum column family value to use", converter = ShortConverter.class) - short maxColF = Short.MAX_VALUE; - - @Parameter(names = "--maxColQ", description = "maximum column qualifier value to use", converter = ShortConverter.class) - short maxColQ = Short.MAX_VALUE; - - @Parameter(names = "--maxMappers", description = "the maximum number of mappers to use", required = true, validateWith = PositiveInteger.class) - int maxMaps = 0; - } - @Override public int run(String[] args) throws IOException, InterruptedException, ClassNotFoundException, AccumuloSecurityException { - Opts opts = new Opts(); - BatchWriterOpts bwOpts = new BatchWriterOpts(); - MapReduceClientOnDefaultTable clientOpts = new MapReduceClientOnDefaultTable("ci"); - clientOpts.parseArgs(ContinuousMoru.class.getName(), args, bwOpts, opts); + + Properties props = TestProps.loadFromFile(args[0]); + ContinuousEnv env = new ContinuousEnv(props); Job job = Job.getInstance(getConf(), this.getClass().getSimpleName() + "_" + System.currentTimeMillis()); job.setJarByClass(this.getClass()); job.setInputFormatClass(AccumuloInputFormat.class); - clientOpts.setAccumuloConfigs(job); + + AccumuloInputFormat.setConnectorInfo(job, env.getAccumuloUserName(), env.getToken()); + AccumuloInputFormat.setInputTableName(job, env.getAccumuloTableName()); + AccumuloInputFormat.setZooKeeperInstance(job, env.getClientConfiguration()); + + int maxMaps = Integer.parseInt(props.getProperty(TestProps.CI_VERIFY_MAX_MAPS)); // set up ranges try { - Set ranges = clientOpts.getConnector().tableOperations().splitRangeByTablets(clientOpts.getTableName(), new Range(), opts.maxMaps); + Set ranges = env.getAccumuloConnector().tableOperations().splitRangeByTablets(env.getAccumuloTableName(), new Range(), maxMaps); AccumuloInputFormat.setRanges(job, ranges); AccumuloInputFormat.setAutoAdjustRanges(job, false); } catch (Exception e) { @@ -149,31 +140,28 @@ public class ContinuousMoru extends Configured implements Tool { } job.setMapperClass(CMapper.class); - job.setNumReduceTasks(0); - job.setOutputFormatClass(AccumuloOutputFormat.class); - AccumuloOutputFormat.setBatchWriterOptions(job, bwOpts.getBatchWriterConfig()); + AccumuloOutputFormat.setBatchWriterOptions(job, env.getBatchWriterConfig()); + AccumuloOutputFormat.setConnectorInfo(job, env.getAccumuloUserName(), env.getToken()); + AccumuloOutputFormat.setCreateTables(job, true); + AccumuloOutputFormat.setDefaultTableName(job, env.getAccumuloTableName()); + AccumuloOutputFormat.setZooKeeperInstance(job, env.getClientConfiguration()); Configuration conf = job.getConfiguration(); - conf.setLong(MIN, opts.min); - conf.setLong(MAX, opts.max); - conf.setInt(MAX_CF, opts.maxColF); - conf.setInt(MAX_CQ, opts.maxColQ); + conf.setLong(MIN, env.getRowMin()); + conf.setLong(MAX, env.getRowMax()); + conf.setInt(MAX_CF, env.getMaxColF()); + conf.setInt(MAX_CQ, env.getMaxColQ()); conf.set(CI_ID, UUID.randomUUID().toString()); job.waitForCompletion(true); - clientOpts.stopTracing(); return job.isSuccessful() ? 0 : 1; } - /** - * - * @param args - * instanceName zookeepers username password table columns outputpath - */ public static void main(String[] args) throws Exception { - int res = ToolRunner.run(CachedConfiguration.getInstance(), new ContinuousMoru(), args); + ContinuousEnv env = new ContinuousEnv(TestProps.loadFromFile(args[0])); + int res = ToolRunner.run(env.getHadoopConfiguration(), new ContinuousMoru(), args); if (res != 0) System.exit(res); } http://git-wip-us.apache.org/repos/asf/accumulo-testing/blob/fc3ddfc4/core/src/main/java/org/apache/accumulo/testing/core/continuous/ContinuousQuery.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/testing/core/continuous/ContinuousQuery.java b/core/src/main/java/org/apache/accumulo/testing/core/continuous/ContinuousQuery.java deleted file mode 100644 index 8180383..0000000 --- a/core/src/main/java/org/apache/accumulo/testing/core/continuous/ContinuousQuery.java +++ /dev/null @@ -1,74 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.accumulo.testing.core.continuous; - -import static java.nio.charset.StandardCharsets.UTF_8; - -import java.util.Map.Entry; -import java.util.Random; - -import org.apache.accumulo.core.cli.ClientOnDefaultTable; -import org.apache.accumulo.core.cli.ClientOpts.TimeConverter; -import org.apache.accumulo.core.cli.ScannerOpts; -import org.apache.accumulo.core.client.Connector; -import org.apache.accumulo.core.client.Scanner; -import org.apache.accumulo.core.data.Key; -import org.apache.accumulo.core.data.Range; -import org.apache.accumulo.core.data.Value; -import org.apache.hadoop.io.Text; - -import com.beust.jcommander.Parameter; - -public class ContinuousQuery { - - public static class Opts extends ContinuousOpts { - @Parameter(names = "--sleep", description = "the time to wait between queries", converter = TimeConverter.class) - long sleepTime = 100; - } - - public static void main(String[] args) throws Exception { - Opts opts = new Opts(); - ScannerOpts scanOpts = new ScannerOpts(); - ClientOnDefaultTable clientOpts = new ClientOnDefaultTable("ci"); - clientOpts.parseArgs(ContinuousQuery.class.getName(), args, scanOpts, opts); - - Connector conn = clientOpts.getConnector(); - Scanner scanner = ContinuousUtil.createScanner(conn, clientOpts.getTableName(), clientOpts.auths); - scanner.setBatchSize(scanOpts.scanBatchSize); - - Random r = new Random(); - - while (true) { - byte[] row = ContinuousIngest.genRow(opts.min, opts.max, r); - - int count = 0; - - long t1 = System.currentTimeMillis(); - scanner.setRange(new Range(new Text(row))); - for (Entry entry : scanner) { - ContinuousWalk.validate(entry.getKey(), entry.getValue()); - count++; - } - long t2 = System.currentTimeMillis(); - - System.out.printf("SRQ %d %s %d %d%n", t1, new String(row, UTF_8), (t2 - t1), count); - - if (opts.sleepTime > 0) - Thread.sleep(opts.sleepTime); - } - } -} http://git-wip-us.apache.org/repos/asf/accumulo-testing/blob/fc3ddfc4/core/src/main/java/org/apache/accumulo/testing/core/continuous/ContinuousScanner.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/testing/core/continuous/ContinuousScanner.java b/core/src/main/java/org/apache/accumulo/testing/core/continuous/ContinuousScanner.java index 42e0ea8..162e64d 100644 --- a/core/src/main/java/org/apache/accumulo/testing/core/continuous/ContinuousScanner.java +++ b/core/src/main/java/org/apache/accumulo/testing/core/continuous/ContinuousScanner.java @@ -20,49 +20,44 @@ import static java.nio.charset.StandardCharsets.UTF_8; import java.util.Iterator; import java.util.Map.Entry; +import java.util.Properties; import java.util.Random; import java.util.concurrent.TimeUnit; -import org.apache.accumulo.core.cli.ClientOnDefaultTable; -import org.apache.accumulo.core.cli.ScannerOpts; import org.apache.accumulo.core.client.Connector; import org.apache.accumulo.core.client.Scanner; import org.apache.accumulo.core.data.Key; import org.apache.accumulo.core.data.Range; import org.apache.accumulo.core.data.Value; import org.apache.accumulo.core.security.Authorizations; +import org.apache.accumulo.testing.core.TestProps; import org.apache.hadoop.io.Text; -import com.beust.jcommander.Parameter; -import com.beust.jcommander.validators.PositiveInteger; import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly; public class ContinuousScanner { - static class Opts extends ContinuousWalk.Opts { - @Parameter(names = "--numToScan", description = "Number rows to scan between sleeps", required = true, validateWith = PositiveInteger.class) - long numToScan = 0; - } - public static void main(String[] args) throws Exception { - Opts opts = new Opts(); - ScannerOpts scanOpts = new ScannerOpts(); - ClientOnDefaultTable clientOpts = new ClientOnDefaultTable("ci"); - clientOpts.parseArgs(ContinuousScanner.class.getName(), args, scanOpts, opts); + + Properties props = TestProps.loadFromFile(args[0]); + ContinuousEnv env = new ContinuousEnv(props); Random r = new Random(); long distance = 1000000000000l; - Connector conn = clientOpts.getConnector(); - Authorizations auths = opts.randomAuths.getAuths(r); - Scanner scanner = ContinuousUtil.createScanner(conn, clientOpts.getTableName(), auths); - scanner.setBatchSize(scanOpts.scanBatchSize); + Connector conn = env.getAccumuloConnector(); + Authorizations auths = env.getRandomAuthorizations(); + Scanner scanner = ContinuousUtil.createScanner(conn, env.getAccumuloTableName(), auths); + scanner.setBatchSize(env.getScannerBatchSize()); - double delta = Math.min(.05, .05 / (opts.numToScan / 1000.0)); + int numToScan = Integer.parseInt(props.getProperty(TestProps.CI_SCANNER_ENTRIES)); + int scannerSleepMs = Integer.parseInt(props.getProperty(TestProps.CI_SCANNER_SLEEP_MS)); + + double delta = Math.min(.05, .05 / (numToScan / 1000.0)); while (true) { - long startRow = ContinuousIngest.genLong(opts.min, opts.max - distance, r); + long startRow = ContinuousIngest.genLong(env.getRowMin(), env.getRowMax() - distance, r); byte[] scanStart = ContinuousIngest.genRow(startRow); byte[] scanStop = ContinuousIngest.genRow(startRow + distance); @@ -83,13 +78,13 @@ public class ContinuousScanner { // System.out.println("P1 " +count +" "+((1-delta) * numToScan)+" "+((1+delta) * numToScan)+" "+numToScan); - if (count < (1 - delta) * opts.numToScan || count > (1 + delta) * opts.numToScan) { + if (count < (1 - delta) * numToScan || count > (1 + delta) * numToScan) { if (count == 0) { distance = distance * 10; if (distance < 0) distance = 1000000000000l; } else { - double ratio = (double) opts.numToScan / count; + double ratio = (double) numToScan / count; // move ratio closer to 1 to make change slower ratio = ratio - (ratio - 1.0) * (2.0 / 3.0); distance = (long) (ratio * distance); @@ -100,8 +95,9 @@ public class ContinuousScanner { System.out.printf("SCN %d %s %d %d%n", t1, new String(scanStart, UTF_8), (t2 - t1), count); - if (opts.sleepTime > 0) - sleepUninterruptibly(opts.sleepTime, TimeUnit.MILLISECONDS); + if (scannerSleepMs > 0) { + sleepUninterruptibly(scannerSleepMs, TimeUnit.MILLISECONDS); + } } } http://git-wip-us.apache.org/repos/asf/accumulo-testing/blob/fc3ddfc4/core/src/main/java/org/apache/accumulo/testing/core/continuous/ContinuousStatsCollector.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/testing/core/continuous/ContinuousStatsCollector.java b/core/src/main/java/org/apache/accumulo/testing/core/continuous/ContinuousStatsCollector.java deleted file mode 100644 index 818e387..0000000 --- a/core/src/main/java/org/apache/accumulo/testing/core/continuous/ContinuousStatsCollector.java +++ /dev/null @@ -1,206 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.accumulo.testing.core.continuous; - -import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly; - -import java.util.HashMap; -import java.util.Map; -import java.util.Map.Entry; -import java.util.concurrent.TimeUnit; -import java.util.Timer; -import java.util.TimerTask; - -import org.apache.accumulo.core.cli.ScannerOpts; -import org.apache.accumulo.core.client.Connector; -import org.apache.accumulo.core.client.IteratorSetting; -import org.apache.accumulo.core.client.Scanner; -import org.apache.accumulo.core.client.impl.ClientContext; -import org.apache.accumulo.core.client.impl.Credentials; -import org.apache.accumulo.core.client.impl.MasterClient; -import org.apache.accumulo.core.client.impl.Tables; -import org.apache.accumulo.core.client.impl.thrift.ThriftNotActiveServiceException; -import org.apache.accumulo.core.data.Key; -import org.apache.accumulo.core.data.Value; -import org.apache.accumulo.core.data.impl.KeyExtent; -import org.apache.accumulo.core.iterators.ColumnFamilyCounter; -import org.apache.accumulo.core.master.thrift.MasterClientService; -import org.apache.accumulo.core.master.thrift.MasterMonitorInfo; -import org.apache.accumulo.core.master.thrift.TableInfo; -import org.apache.accumulo.core.master.thrift.TabletServerStatus; -import org.apache.accumulo.core.metadata.MetadataTable; -import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.DataFileColumnFamily; -import org.apache.accumulo.core.trace.Tracer; -import org.apache.accumulo.core.util.CachedConfiguration; -import org.apache.accumulo.core.util.Stat; -import org.apache.accumulo.server.ServerConstants; -import org.apache.accumulo.server.cli.ClientOnRequiredTable; -import org.apache.accumulo.server.conf.ServerConfigurationFactory; -import org.apache.accumulo.server.fs.VolumeManager; -import org.apache.accumulo.server.fs.VolumeManagerImpl; -import org.apache.accumulo.server.util.TableInfoUtil; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.ContentSummary; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.mapred.ClusterStatus; -import org.apache.hadoop.mapred.JobClient; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class ContinuousStatsCollector { - - private static final Logger log = LoggerFactory.getLogger(ContinuousStatsCollector.class); - - static class StatsCollectionTask extends TimerTask { - - private final String tableId; - private final Opts opts; - private final int scanBatchSize; - - public StatsCollectionTask(Opts opts, int scanBatchSize) { - this.opts = opts; - this.scanBatchSize = scanBatchSize; - this.tableId = Tables.getNameToIdMap(opts.getInstance()).get(opts.getTableName()); - System.out - .println("TIME TABLET_SERVERS TOTAL_ENTRIES TOTAL_INGEST TOTAL_QUERY TABLE_RECS TABLE_RECS_IN_MEM TABLE_INGEST TABLE_QUERY TABLE_TABLETS TABLE_TABLETS_ONLINE" - + " ACCUMULO_DU ACCUMULO_DIRS ACCUMULO_FILES TABLE_DU TABLE_DIRS TABLE_FILES" - + " MAP_TASK MAX_MAP_TASK REDUCE_TASK MAX_REDUCE_TASK TASK_TRACKERS BLACK_LISTED MIN_FILES/TABLET MAX_FILES/TABLET AVG_FILES/TABLET STDDEV_FILES/TABLET"); - } - - @Override - public void run() { - try { - String acuStats = getACUStats(); - String fsStats = getFSStats(); - String mrStats = getMRStats(); - String tabletStats = getTabletStats(); - - System.out.println(System.currentTimeMillis() + " " + acuStats + " " + fsStats + " " + mrStats + " " + tabletStats); - } catch (Exception e) { - log.error(System.currentTimeMillis() + " - Failed to collect stats", e); - } - } - - private String getTabletStats() throws Exception { - - Connector conn = opts.getConnector(); - Scanner scanner = conn.createScanner(MetadataTable.NAME, opts.auths); - scanner.setBatchSize(scanBatchSize); - scanner.fetchColumnFamily(DataFileColumnFamily.NAME); - scanner.addScanIterator(new IteratorSetting(1000, "cfc", ColumnFamilyCounter.class.getName())); - scanner.setRange(new KeyExtent(tableId, null, null).toMetadataRange()); - - Stat s = new Stat(); - - int count = 0; - for (Entry entry : scanner) { - count++; - s.addStat(Long.parseLong(entry.getValue().toString())); - } - - if (count > 0) - return String.format("%d %d %.3f %.3f", s.getMin(), s.getMax(), s.getAverage(), s.getStdDev()); - else - return "0 0 0 0"; - - } - - private String getFSStats() throws Exception { - VolumeManager fs = VolumeManagerImpl.get(); - long length1 = 0, dcount1 = 0, fcount1 = 0; - long length2 = 0, dcount2 = 0, fcount2 = 0; - for (String dir : ServerConstants.getTablesDirs()) { - ContentSummary contentSummary = fs.getContentSummary(new Path(dir)); - length1 += contentSummary.getLength(); - dcount1 += contentSummary.getDirectoryCount(); - fcount1 += contentSummary.getFileCount(); - contentSummary = fs.getContentSummary(new Path(dir, tableId)); - length2 += contentSummary.getLength(); - dcount2 += contentSummary.getDirectoryCount(); - fcount2 += contentSummary.getFileCount(); - } - - return "" + length1 + " " + dcount1 + " " + fcount1 + " " + length2 + " " + dcount2 + " " + fcount2; - } - - private String getACUStats() throws Exception { - - MasterClientService.Iface client = null; - while (true) { - try { - ClientContext context = new ClientContext(opts.getInstance(), new Credentials(opts.getPrincipal(), opts.getToken()), new ServerConfigurationFactory( - opts.getInstance()).getConfiguration()); - client = MasterClient.getConnectionWithRetry(context); - MasterMonitorInfo stats = client.getMasterStats(Tracer.traceInfo(), context.rpcCreds()); - - TableInfo all = new TableInfo(); - Map tableSummaries = new HashMap<>(); - - for (TabletServerStatus server : stats.tServerInfo) { - for (Entry info : server.tableMap.entrySet()) { - TableInfo tableSummary = tableSummaries.get(info.getKey()); - if (tableSummary == null) { - tableSummary = new TableInfo(); - tableSummaries.put(info.getKey(), tableSummary); - } - TableInfoUtil.add(tableSummary, info.getValue()); - TableInfoUtil.add(all, info.getValue()); - } - } - - TableInfo ti = tableSummaries.get(tableId); - - return "" + stats.tServerInfo.size() + " " + all.recs + " " + (long) all.ingestRate + " " + (long) all.queryRate + " " + ti.recs + " " - + ti.recsInMemory + " " + (long) ti.ingestRate + " " + (long) ti.queryRate + " " + ti.tablets + " " + ti.onlineTablets; - - } catch (ThriftNotActiveServiceException e) { - // Let it loop, fetching a new location - log.debug("Contacted a Master which is no longer active, retrying"); - sleepUninterruptibly(100, TimeUnit.MILLISECONDS); - } finally { - if (client != null) - MasterClient.close(client); - } - } - } - - } - - private static String getMRStats() throws Exception { - Configuration conf = CachedConfiguration.getInstance(); - // No alternatives for hadoop 20 - JobClient jc = new JobClient(new org.apache.hadoop.mapred.JobConf(conf)); - - ClusterStatus cs = jc.getClusterStatus(false); - - return "" + cs.getMapTasks() + " " + cs.getMaxMapTasks() + " " + cs.getReduceTasks() + " " + cs.getMaxReduceTasks() + " " + cs.getTaskTrackers() + " " - + cs.getBlacklistedTrackers(); - - } - - static class Opts extends ClientOnRequiredTable {} - - public static void main(String[] args) { - Opts opts = new Opts(); - ScannerOpts scanOpts = new ScannerOpts(); - opts.parseArgs(ContinuousStatsCollector.class.getName(), args, scanOpts); - Timer jtimer = new Timer(); - - jtimer.schedule(new StatsCollectionTask(opts, scanOpts.scanBatchSize), 0, 30000); - } - -} http://git-wip-us.apache.org/repos/asf/accumulo-testing/blob/fc3ddfc4/core/src/main/java/org/apache/accumulo/testing/core/continuous/ContinuousVerify.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/testing/core/continuous/ContinuousVerify.java b/core/src/main/java/org/apache/accumulo/testing/core/continuous/ContinuousVerify.java index 64f8a35..430bf3b 100644 --- a/core/src/main/java/org/apache/accumulo/testing/core/continuous/ContinuousVerify.java +++ b/core/src/main/java/org/apache/accumulo/testing/core/continuous/ContinuousVerify.java @@ -22,16 +22,16 @@ import java.io.IOException; import java.util.ArrayList; import java.util.HashMap; import java.util.HashSet; +import java.util.Properties; import java.util.Random; import java.util.Set; -import org.apache.accumulo.core.cli.MapReduceClientOnDefaultTable; import org.apache.accumulo.core.client.Connector; import org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat; import org.apache.accumulo.core.data.Key; import org.apache.accumulo.core.data.Range; import org.apache.accumulo.core.data.Value; -import org.apache.accumulo.core.util.CachedConfiguration; +import org.apache.accumulo.testing.core.TestProps; import org.apache.accumulo.testing.core.continuous.ContinuousWalk.BadChecksumException; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.Path; @@ -47,13 +47,9 @@ import org.apache.hadoop.util.ToolRunner; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.beust.jcommander.Parameter; -import com.beust.jcommander.validators.PositiveInteger; - /** * A map reduce job that verifies a table created by continuous ingest. It verifies that all referenced nodes are defined. */ - public class ContinuousVerify extends Configured implements Tool { public static final VLongWritable DEF = new VLongWritable(-1); @@ -76,7 +72,7 @@ public class ContinuousVerify extends Configured implements Tool { try { ContinuousWalk.validate(key, data); } catch (BadChecksumException bce) { - context.getCounter(Counts.CORRUPT).increment(1l); + context.getCounter(Counts.CORRUPT).increment(1L); if (corrupt < 1000) { log.error("Bad checksum : " + key); } else if (corrupt == 1000) { @@ -100,7 +96,7 @@ public class ContinuousVerify extends Configured implements Tool { } } - public static enum Counts { + public enum Counts { UNREFERENCED, UNDEFINED, REFERENCED, CORRUPT } @@ -131,95 +127,82 @@ public class ContinuousVerify extends Configured implements Tool { } context.write(new Text(ContinuousIngest.genRow(key.get())), new Text(sb.toString())); - context.getCounter(Counts.UNDEFINED).increment(1l); + context.getCounter(Counts.UNDEFINED).increment(1L); } else if (defCount > 0 && refs.size() == 0) { - context.getCounter(Counts.UNREFERENCED).increment(1l); + context.getCounter(Counts.UNREFERENCED).increment(1L); } else { - context.getCounter(Counts.REFERENCED).increment(1l); + context.getCounter(Counts.REFERENCED).increment(1L); } } } - static class Opts extends MapReduceClientOnDefaultTable { - @Parameter(names = "--output", description = "location in HDFS to store the results; must not exist") - String outputDir = "/tmp/continuousVerify"; - - @Parameter(names = "--maxMappers", description = "the maximum number of mappers to use", validateWith = PositiveInteger.class) - int maxMaps = 1; - - @Parameter(names = "--reducers", description = "the number of reducers to use", validateWith = PositiveInteger.class) - int reducers = 1; - - @Parameter(names = "--offline", description = "perform the verification directly on the files while the table is offline") - boolean scanOffline = false; - - public Opts() { - super("ci"); - } - } - @Override public int run(String[] args) throws Exception { - Opts opts = new Opts(); - opts.parseArgs(this.getClass().getName(), args); + + Properties props = TestProps.loadFromFile(args[0]); + ContinuousEnv env = new ContinuousEnv(props); Job job = Job.getInstance(getConf(), this.getClass().getSimpleName() + "_" + System.currentTimeMillis()); job.setJarByClass(this.getClass()); job.setInputFormatClass(AccumuloInputFormat.class); - opts.setAccumuloConfigs(job); - Set ranges = null; - String clone = opts.getTableName(); - Connector conn = null; + boolean scanOffline = Boolean.parseBoolean(props.getProperty(TestProps.CI_VERIFY_SCAN_OFFLINE)); + String tableName = env.getAccumuloTableName(); + int maxMaps = Integer.parseInt(props.getProperty(TestProps.CI_VERIFY_MAX_MAPS)); + int reducers = Integer.parseInt(props.getProperty(TestProps.CI_VERIFY_REDUCERS)); + String outputDir = props.getProperty(TestProps.CI_VERIFY_OUTPUT_DIR); + + Set ranges; + String clone = ""; + Connector conn = env.getAccumuloConnector(); - if (opts.scanOffline) { + if (scanOffline) { Random random = new Random(); - clone = opts.getTableName() + "_" + String.format("%016x", (random.nextLong() & 0x7fffffffffffffffl)); - conn = opts.getConnector(); - conn.tableOperations().clone(opts.getTableName(), clone, true, new HashMap(), new HashSet()); - ranges = conn.tableOperations().splitRangeByTablets(opts.getTableName(), new Range(), opts.maxMaps); + clone = tableName + "_" + String.format("%016x", (random.nextLong() & 0x7fffffffffffffffL)); + conn.tableOperations().clone(tableName, clone, true, new HashMap<>(), new HashSet<>()); + ranges = conn.tableOperations().splitRangeByTablets(tableName, new Range(), maxMaps); conn.tableOperations().offline(clone); AccumuloInputFormat.setInputTableName(job, clone); AccumuloInputFormat.setOfflineTableScan(job, true); } else { - ranges = opts.getConnector().tableOperations().splitRangeByTablets(opts.getTableName(), new Range(), opts.maxMaps); + ranges = conn.tableOperations().splitRangeByTablets(tableName, new Range(), maxMaps); + AccumuloInputFormat.setInputTableName(job, tableName); } - + AccumuloInputFormat.setRanges(job, ranges); AccumuloInputFormat.setAutoAdjustRanges(job, false); + AccumuloInputFormat.setConnectorInfo(job, env.getAccumuloUserName(), env.getToken()); + AccumuloInputFormat.setZooKeeperInstance(job, env.getClientConfiguration()); job.setMapperClass(CMapper.class); job.setMapOutputKeyClass(LongWritable.class); job.setMapOutputValueClass(VLongWritable.class); job.setReducerClass(CReducer.class); - job.setNumReduceTasks(opts.reducers); + job.setNumReduceTasks(reducers); job.setOutputFormatClass(TextOutputFormat.class); - job.getConfiguration().setBoolean("mapred.map.tasks.speculative.execution", opts.scanOffline); + job.getConfiguration().setBoolean("mapred.map.tasks.speculative.execution", scanOffline); - TextOutputFormat.setOutputPath(job, new Path(opts.outputDir)); + TextOutputFormat.setOutputPath(job, new Path(outputDir)); job.waitForCompletion(true); - if (opts.scanOffline) { + if (scanOffline) { conn.tableOperations().delete(clone); } - opts.stopTracing(); return job.isSuccessful() ? 0 : 1; } - /** - * - * @param args - * instanceName zookeepers username password table columns outputpath - */ public static void main(String[] args) throws Exception { - int res = ToolRunner.run(CachedConfiguration.getInstance(), new ContinuousVerify(), args); + + ContinuousEnv env = new ContinuousEnv(TestProps.loadFromFile(args[0])); + + int res = ToolRunner.run(env.getHadoopConfiguration(), new ContinuousVerify(), args); if (res != 0) System.exit(res); } http://git-wip-us.apache.org/repos/asf/accumulo-testing/blob/fc3ddfc4/core/src/main/java/org/apache/accumulo/testing/core/continuous/ContinuousWalk.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/testing/core/continuous/ContinuousWalk.java b/core/src/main/java/org/apache/accumulo/testing/core/continuous/ContinuousWalk.java index 2335fd4..49c10c9 100644 --- a/core/src/main/java/org/apache/accumulo/testing/core/continuous/ContinuousWalk.java +++ b/core/src/main/java/org/apache/accumulo/testing/core/continuous/ContinuousWalk.java @@ -18,106 +18,49 @@ package org.apache.accumulo.testing.core.continuous; import static java.nio.charset.StandardCharsets.UTF_8; -import java.io.BufferedReader; -import java.io.IOException; -import java.io.InputStreamReader; import java.util.ArrayList; -import java.util.Collections; -import java.util.List; import java.util.Map.Entry; +import java.util.Properties; import java.util.Random; import java.util.zip.CRC32; -import org.apache.accumulo.core.cli.ClientOnDefaultTable; import org.apache.accumulo.core.client.Connector; import org.apache.accumulo.core.client.Scanner; import org.apache.accumulo.core.data.Key; import org.apache.accumulo.core.data.Range; import org.apache.accumulo.core.data.Value; -import org.apache.accumulo.core.security.Authorizations; import org.apache.accumulo.core.trace.Span; import org.apache.accumulo.core.trace.Trace; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; +import org.apache.accumulo.testing.core.TestProps; import org.apache.hadoop.io.Text; -import com.beust.jcommander.IStringConverter; -import com.beust.jcommander.Parameter; - public class ContinuousWalk { - static public class Opts extends ContinuousQuery.Opts { - class RandomAuthsConverter implements IStringConverter { - @Override - public RandomAuths convert(String value) { - try { - return new RandomAuths(value); - } catch (IOException e) { - throw new RuntimeException(e); - } - } - } - - @Parameter(names = "--authsFile", description = "read the authorities to use from a file") - RandomAuths randomAuths = new RandomAuths(); - } - static class BadChecksumException extends RuntimeException { private static final long serialVersionUID = 1L; - public BadChecksumException(String msg) { + BadChecksumException(String msg) { super(msg); } } - static class RandomAuths { - private List auths; - - RandomAuths() { - auths = Collections.singletonList(Authorizations.EMPTY); - } - - RandomAuths(String file) throws IOException { - if (file == null) { - auths = Collections.singletonList(Authorizations.EMPTY); - return; - } - - auths = new ArrayList<>(); - - FileSystem fs = FileSystem.get(new Configuration()); - BufferedReader in = new BufferedReader(new InputStreamReader(fs.open(new Path(file)), UTF_8)); - try { - String line; - while ((line = in.readLine()) != null) { - auths.add(new Authorizations(line.split(","))); - } - } finally { - in.close(); - } - } - - Authorizations getAuths(Random r) { - return auths.get(r.nextInt(auths.size())); - } - } - public static void main(String[] args) throws Exception { - Opts opts = new Opts(); - ClientOnDefaultTable clientOpts = new ClientOnDefaultTable("ci"); - clientOpts.parseArgs(ContinuousWalk.class.getName(), args, opts); - Connector conn = clientOpts.getConnector(); + Properties props = TestProps.loadFromFile(args[0]); + ContinuousEnv env = new ContinuousEnv(props); + + Connector conn = env.getAccumuloConnector(); Random r = new Random(); ArrayList values = new ArrayList<>(); + int sleepTime = Integer.parseInt(props.getProperty(TestProps.CI_WALKER_SLEEP_MS)); + while (true) { - Scanner scanner = ContinuousUtil.createScanner(conn, clientOpts.getTableName(), opts.randomAuths.getAuths(r)); - String row = findAStartRow(opts.min, opts.max, scanner, r); + Scanner scanner = ContinuousUtil.createScanner(conn, env.getAccumuloTableName(), env.getRandomAuthorizations()); + String row = findAStartRow(env.getRowMin(), env.getRowMax(), scanner, r); while (row != null) { @@ -146,12 +89,12 @@ public class ContinuousWalk { row = null; } - if (opts.sleepTime > 0) - Thread.sleep(opts.sleepTime); + if (sleepTime > 0) + Thread.sleep(sleepTime); } - if (opts.sleepTime > 0) - Thread.sleep(opts.sleepTime); + if (sleepTime > 0) + Thread.sleep(sleepTime); } } @@ -197,7 +140,7 @@ public class ContinuousWalk { return -1; } - static String getPrevRow(Value value) { + private static String getPrevRow(Value value) { byte[] val = value.get(); int offset = getPrevRowOffset(val); @@ -208,7 +151,7 @@ public class ContinuousWalk { return null; } - static int getChecksumOffset(byte val[]) { + private static int getChecksumOffset(byte val[]) { if (val[val.length - 1] != ':') { if (val[val.length - 9] != ':') throw new IllegalArgumentException(new String(val, UTF_8));