From commits-return-16700-archive-asf-public=cust-asf.ponee.io@flink.apache.org Tue Apr 3 09:39:50 2018 Return-Path: X-Original-To: archive-asf-public@cust-asf.ponee.io Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx-eu-01.ponee.io (Postfix) with SMTP id 6672D18064D for ; Tue, 3 Apr 2018 09:39:49 +0200 (CEST) Received: (qmail 95842 invoked by uid 500); 3 Apr 2018 07:39:48 -0000 Mailing-List: contact commits-help@flink.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@flink.apache.org Delivered-To: mailing list commits@flink.apache.org Received: (qmail 95824 invoked by uid 99); 3 Apr 2018 07:39:48 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 03 Apr 2018 07:39:48 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 244BDE0627; Tue, 3 Apr 2018 07:39:48 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: tzulitai@apache.org To: commits@flink.apache.org Date: Tue, 03 Apr 2018 07:39:48 -0000 Message-Id: <9549563e0d294ddda58d12a4af0becee@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [1/2] flink git commit: [FLINK-8979] [test] Refactor Kafka-related common end-to-end test scripts Repository: flink Updated Branches: refs/heads/release-1.5 44506326f -> 6ec1aab0a [FLINK-8979] [test] Refactor Kafka-related common end-to-end test scripts Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/2bfc8bec Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/2bfc8bec Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/2bfc8bec Branch: refs/heads/release-1.5 Commit: 2bfc8bec91ea8ec71945d8d6f1440430d8d9ab07 Parents: 4450632 Author: Tzu-Li (Gordon) Tai Authored: Tue Mar 27 18:06:19 2018 +0800 Committer: Tzu-Li (Gordon) Tai Committed: Tue Apr 3 15:39:28 2018 +0800 ---------------------------------------------------------------------- .../test-scripts/kafka-common.sh | 74 ++++++++++++++++++++ .../test-scripts/test_resume_savepoint.sh | 34 ++------- .../test-scripts/test_streaming_kafka010.sh | 51 ++++---------- 3 files changed, 91 insertions(+), 68 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/2bfc8bec/flink-end-to-end-tests/test-scripts/kafka-common.sh ---------------------------------------------------------------------- diff --git a/flink-end-to-end-tests/test-scripts/kafka-common.sh b/flink-end-to-end-tests/test-scripts/kafka-common.sh new file mode 100644 index 0000000..7f05357 --- /dev/null +++ b/flink-end-to-end-tests/test-scripts/kafka-common.sh @@ -0,0 +1,74 @@ +#!/usr/bin/env bash +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ + +set -o pipefail + +if [[ -z $TEST_DATA_DIR ]]; then + echo "Must run common.sh before kafka-common.sh." + exit 1 +fi + +KAFKA_DIR=$TEST_DATA_DIR/kafka_2.11-0.10.2.0 + +function setup_kafka_dist { + # download Kafka + mkdir -p $TEST_DATA_DIR + KAFKA_URL="https://archive.apache.org/dist/kafka/0.10.2.0/kafka_2.11-0.10.2.0.tgz" + echo "Downloading Kafka from $KAFKA_URL" + curl "$KAFKA_URL" > $TEST_DATA_DIR/kafka.tgz + + tar xzf $TEST_DATA_DIR/kafka.tgz -C $TEST_DATA_DIR/ + + # fix kafka config + sed -i -e "s+^\(dataDir\s*=\s*\).*$+\1$TEST_DATA_DIR/zookeeper+" $KAFKA_DIR/config/zookeeper.properties + sed -i -e "s+^\(log\.dirs\s*=\s*\).*$+\1$TEST_DATA_DIR/kafka+" $KAFKA_DIR/config/server.properties +} + +function start_kafka_cluster { + if [[ -z $KAFKA_DIR ]]; then + echo "Must run 'setup_kafka_dist' before attempting to start Kafka cluster" + exit 1 + fi + + $KAFKA_DIR/bin/zookeeper-server-start.sh -daemon $KAFKA_DIR/config/zookeeper.properties + $KAFKA_DIR/bin/kafka-server-start.sh -daemon $KAFKA_DIR/config/server.properties + + # zookeeper outputs the "Node does not exist" bit to stderr + while [[ $($KAFKA_DIR/bin/zookeeper-shell.sh localhost:2181 get /brokers/ids/0 2>&1) =~ .*Node\ does\ not\ exist.* ]]; do + echo "Waiting for broker..." + sleep 1 + done +} + +function stop_kafka_cluster { + $KAFKA_DIR/bin/kafka-server-stop.sh + $KAFKA_DIR/bin/zookeeper-server-stop.sh +} + +function create_kafka_topic { + $KAFKA_DIR/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor $1 --partitions $2 --topic $3 +} + +function send_messages_to_kafka { + echo -e $1 | $KAFKA_DIR/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic $2 +} + +function read_messages_from_kafka { + $KAFKA_DIR/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic $2 --from-beginning --max-messages $1 2> /dev/null +} http://git-wip-us.apache.org/repos/asf/flink/blob/2bfc8bec/flink-end-to-end-tests/test-scripts/test_resume_savepoint.sh ---------------------------------------------------------------------- diff --git a/flink-end-to-end-tests/test-scripts/test_resume_savepoint.sh b/flink-end-to-end-tests/test-scripts/test_resume_savepoint.sh index 83e0e5a..6642ad5 100755 --- a/flink-end-to-end-tests/test-scripts/test_resume_savepoint.sh +++ b/flink-end-to-end-tests/test-scripts/test_resume_savepoint.sh @@ -23,27 +23,10 @@ if [ -z $1 ] || [ -z $2 ]; then fi source "$(dirname "$0")"/common.sh +source "$(dirname "$0")"/kafka-common.sh -# get Kafka 0.10.0 -mkdir -p $TEST_DATA_DIR -if [ -z "$3" ]; then - # need to download Kafka because no Kafka was specified on the invocation - KAFKA_URL="https://archive.apache.org/dist/kafka/0.10.2.0/kafka_2.11-0.10.2.0.tgz" - echo "Downloading Kafka from $KAFKA_URL" - curl "$KAFKA_URL" > $TEST_DATA_DIR/kafka.tgz -else - echo "Using specified Kafka from $3" - cp $3 $TEST_DATA_DIR/kafka.tgz -fi - -tar xzf $TEST_DATA_DIR/kafka.tgz -C $TEST_DATA_DIR/ -KAFKA_DIR=$TEST_DATA_DIR/kafka_2.11-0.10.2.0 - -# fix kafka config -sed -i -e "s+^\(dataDir\s*=\s*\).*$+\1$TEST_DATA_DIR/zookeeper+" $KAFKA_DIR/config/zookeeper.properties -sed -i -e "s+^\(log\.dirs\s*=\s*\).*$+\1$TEST_DATA_DIR/kafka+" $KAFKA_DIR/config/server.properties -$KAFKA_DIR/bin/zookeeper-server-start.sh -daemon $KAFKA_DIR/config/zookeeper.properties -$KAFKA_DIR/bin/kafka-server-start.sh -daemon $KAFKA_DIR/config/server.properties +setup_kafka_dist +start_kafka_cluster ORIGINAL_DOP=$1 NEW_DOP=$2 @@ -67,8 +50,7 @@ start_cluster # make sure to stop Kafka and ZooKeeper at the end, as well as cleaning up the Flink cluster and our moodifications function test_cleanup { - $KAFKA_DIR/bin/kafka-server-stop.sh - $KAFKA_DIR/bin/zookeeper-server-stop.sh + stop_kafka_cluster # revert our modifications to the Flink distribution rm $FLINK_DIR/conf/flink-conf.yaml @@ -81,14 +63,8 @@ function test_cleanup { trap test_cleanup INT trap test_cleanup EXIT -# zookeeper outputs the "Node does not exist" bit to stderr -while [[ $($KAFKA_DIR/bin/zookeeper-shell.sh localhost:2181 get /brokers/ids/0 2>&1) =~ .*Node\ does\ not\ exist.* ]]; do - echo "Waiting for broker..." - sleep 1 -done - # create the required topic -$KAFKA_DIR/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test-input +create_kafka_topic 1 1 test-input # run the state machine example job STATE_MACHINE_JOB=$($FLINK_DIR/bin/flink run -d -p $ORIGINAL_DOP $FLINK_DIR/examples/streaming/StateMachineExample.jar \ http://git-wip-us.apache.org/repos/asf/flink/blob/2bfc8bec/flink-end-to-end-tests/test-scripts/test_streaming_kafka010.sh ---------------------------------------------------------------------- diff --git a/flink-end-to-end-tests/test-scripts/test_streaming_kafka010.sh b/flink-end-to-end-tests/test-scripts/test_streaming_kafka010.sh index a6a9a8e..e09be35 100755 --- a/flink-end-to-end-tests/test-scripts/test_streaming_kafka010.sh +++ b/flink-end-to-end-tests/test-scripts/test_streaming_kafka010.sh @@ -18,51 +18,25 @@ ################################################################################ source "$(dirname "$0")"/common.sh +source "$(dirname "$0")"/kafka-common.sh -start_cluster - -# get Kafka 0.10.0 -mkdir -p $TEST_DATA_DIR -if [ -z "$3" ]; then - # need to download Kafka because no Kafka was specified on the invocation - KAFKA_URL="https://archive.apache.org/dist/kafka/0.10.2.0/kafka_2.11-0.10.2.0.tgz" - echo "Downloading Kafka from $KAFKA_URL" - curl "$KAFKA_URL" > $TEST_DATA_DIR/kafka.tgz -else - echo "Using specified Kafka from $3" - cp $3 $TEST_DATA_DIR/kafka.tgz -fi - -tar xzf $TEST_DATA_DIR/kafka.tgz -C $TEST_DATA_DIR/ -KAFKA_DIR=$TEST_DATA_DIR/kafka_2.11-0.10.2.0 +setup_kafka_dist +start_kafka_cluster -# fix kafka config -sed -i -e "s+^\(dataDir\s*=\s*\).*$+\1$TEST_DATA_DIR/zookeeper+" $KAFKA_DIR/config/zookeeper.properties -sed -i -e "s+^\(log\.dirs\s*=\s*\).*$+\1$TEST_DATA_DIR/kafka+" $KAFKA_DIR/config/server.properties -$KAFKA_DIR/bin/zookeeper-server-start.sh -daemon $KAFKA_DIR/config/zookeeper.properties -$KAFKA_DIR/bin/kafka-server-start.sh -daemon $KAFKA_DIR/config/server.properties - -# make sure to stop Kafka and ZooKeeper at the end +start_cluster -function kafka_cleanup { - $KAFKA_DIR/bin/kafka-server-stop.sh - $KAFKA_DIR/bin/zookeeper-server-stop.sh +function test_cleanup { + stop_kafka_cluster # make sure to run regular cleanup as well cleanup } -trap kafka_cleanup INT -trap kafka_cleanup EXIT - -# zookeeper outputs the "Node does not exist" bit to stderr -while [[ $($KAFKA_DIR/bin/zookeeper-shell.sh localhost:2181 get /brokers/ids/0 2>&1) =~ .*Node\ does\ not\ exist.* ]]; do - echo "Waiting for broker..." - sleep 1 -done +trap test_cleanup INT +trap test_cleanup EXIT # create the required topics -$KAFKA_DIR/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test-input -$KAFKA_DIR/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test-output +create_kafka_topic 1 1 test-input +create_kafka_topic 1 1 test-output # run the Flink job (detached mode) $FLINK_DIR/bin/flink run -d $FLINK_DIR/examples/streaming/Kafka010Example.jar \ @@ -71,9 +45,8 @@ $FLINK_DIR/bin/flink run -d $FLINK_DIR/examples/streaming/Kafka010Example.jar \ --bootstrap.servers localhost:9092 --zookeeper.connect localhost:2181 --group.id myconsumer --auto.offset.reset earliest # send some data to Kafka -echo -e "hello,45218\nwhats,46213\nup,51348" | $KAFKA_DIR/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test-input - -DATA_FROM_KAFKA=$($KAFKA_DIR/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test-output --from-beginning --max-messages 3 2> /dev/null) +send_messages_to_kafka "hello,45218\nwhats,46213\nup,51348" test-input +DATA_FROM_KAFKA=$(read_messages_from_kafka 3 test-output) # make sure we have actual newlines in the string, not "\n" EXPECTED=$(printf "PREFIX:hello,45218\nPREFIX:whats,46213\nPREFIX:up,51348")