flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tzuli...@apache.org
Subject [1/2] flink git commit: [FLINK-8979] [test] Refactor Kafka-related common end-to-end test scripts
Date Tue, 03 Apr 2018 07:39:48 GMT
Repository: flink
Updated Branches:
  refs/heads/release-1.5 44506326f -> 6ec1aab0a


[FLINK-8979] [test] Refactor Kafka-related common end-to-end test scripts


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/2bfc8bec
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/2bfc8bec
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/2bfc8bec

Branch: refs/heads/release-1.5
Commit: 2bfc8bec91ea8ec71945d8d6f1440430d8d9ab07
Parents: 4450632
Author: Tzu-Li (Gordon) Tai <tzulitai@apache.org>
Authored: Tue Mar 27 18:06:19 2018 +0800
Committer: Tzu-Li (Gordon) Tai <tzulitai@apache.org>
Committed: Tue Apr 3 15:39:28 2018 +0800

----------------------------------------------------------------------
 .../test-scripts/kafka-common.sh                | 74 ++++++++++++++++++++
 .../test-scripts/test_resume_savepoint.sh       | 34 ++-------
 .../test-scripts/test_streaming_kafka010.sh     | 51 ++++----------
 3 files changed, 91 insertions(+), 68 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/2bfc8bec/flink-end-to-end-tests/test-scripts/kafka-common.sh
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/test-scripts/kafka-common.sh b/flink-end-to-end-tests/test-scripts/kafka-common.sh
new file mode 100644
index 0000000..7f05357
--- /dev/null
+++ b/flink-end-to-end-tests/test-scripts/kafka-common.sh
@@ -0,0 +1,74 @@
+#!/usr/bin/env bash
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+set -o pipefail
+
+if [[ -z $TEST_DATA_DIR ]]; then
+  echo "Must run common.sh before kafka-common.sh."
+  exit 1
+fi
+
+KAFKA_DIR=$TEST_DATA_DIR/kafka_2.11-0.10.2.0
+
+function setup_kafka_dist {
+  # download Kafka
+  mkdir -p $TEST_DATA_DIR
+  KAFKA_URL="https://archive.apache.org/dist/kafka/0.10.2.0/kafka_2.11-0.10.2.0.tgz"
+  echo "Downloading Kafka from $KAFKA_URL"
+  curl "$KAFKA_URL" > $TEST_DATA_DIR/kafka.tgz
+
+  tar xzf $TEST_DATA_DIR/kafka.tgz -C $TEST_DATA_DIR/
+
+  # fix kafka config
+  sed -i -e "s+^\(dataDir\s*=\s*\).*$+\1$TEST_DATA_DIR/zookeeper+" $KAFKA_DIR/config/zookeeper.properties
+  sed -i -e "s+^\(log\.dirs\s*=\s*\).*$+\1$TEST_DATA_DIR/kafka+" $KAFKA_DIR/config/server.properties
+}
+
+function start_kafka_cluster {
+  if [[ -z $KAFKA_DIR ]]; then
+    echo "Must run 'setup_kafka_dist' before attempting to start Kafka cluster"
+    exit 1
+  fi
+
+  $KAFKA_DIR/bin/zookeeper-server-start.sh -daemon $KAFKA_DIR/config/zookeeper.properties
+  $KAFKA_DIR/bin/kafka-server-start.sh -daemon $KAFKA_DIR/config/server.properties
+
+  # zookeeper outputs the "Node does not exist" bit to stderr
+  while [[ $($KAFKA_DIR/bin/zookeeper-shell.sh localhost:2181 get /brokers/ids/0 2>&1)
=~ .*Node\ does\ not\ exist.* ]]; do
+    echo "Waiting for broker..."
+    sleep 1
+  done
+}
+
+function stop_kafka_cluster {
+  $KAFKA_DIR/bin/kafka-server-stop.sh
+  $KAFKA_DIR/bin/zookeeper-server-stop.sh
+}
+
+function create_kafka_topic {
+  $KAFKA_DIR/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor
$1 --partitions $2 --topic $3
+}
+
+function send_messages_to_kafka {
+  echo -e $1 | $KAFKA_DIR/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic
$2
+}
+
+function read_messages_from_kafka {
+  $KAFKA_DIR/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic $2 --from-beginning
--max-messages $1 2> /dev/null
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2bfc8bec/flink-end-to-end-tests/test-scripts/test_resume_savepoint.sh
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/test-scripts/test_resume_savepoint.sh b/flink-end-to-end-tests/test-scripts/test_resume_savepoint.sh
index 83e0e5a..6642ad5 100755
--- a/flink-end-to-end-tests/test-scripts/test_resume_savepoint.sh
+++ b/flink-end-to-end-tests/test-scripts/test_resume_savepoint.sh
@@ -23,27 +23,10 @@ if [ -z $1 ] || [ -z $2 ]; then
 fi
 
 source "$(dirname "$0")"/common.sh
+source "$(dirname "$0")"/kafka-common.sh
 
-# get Kafka 0.10.0
-mkdir -p $TEST_DATA_DIR
-if [ -z "$3" ]; then
-  # need to download Kafka because no Kafka was specified on the invocation
-  KAFKA_URL="https://archive.apache.org/dist/kafka/0.10.2.0/kafka_2.11-0.10.2.0.tgz"
-  echo "Downloading Kafka from $KAFKA_URL"
-  curl "$KAFKA_URL" > $TEST_DATA_DIR/kafka.tgz
-else
-  echo "Using specified Kafka from $3"
-  cp $3 $TEST_DATA_DIR/kafka.tgz
-fi
-
-tar xzf $TEST_DATA_DIR/kafka.tgz -C $TEST_DATA_DIR/
-KAFKA_DIR=$TEST_DATA_DIR/kafka_2.11-0.10.2.0
-
-# fix kafka config
-sed -i -e "s+^\(dataDir\s*=\s*\).*$+\1$TEST_DATA_DIR/zookeeper+" $KAFKA_DIR/config/zookeeper.properties
-sed -i -e "s+^\(log\.dirs\s*=\s*\).*$+\1$TEST_DATA_DIR/kafka+" $KAFKA_DIR/config/server.properties
-$KAFKA_DIR/bin/zookeeper-server-start.sh -daemon $KAFKA_DIR/config/zookeeper.properties
-$KAFKA_DIR/bin/kafka-server-start.sh -daemon $KAFKA_DIR/config/server.properties
+setup_kafka_dist
+start_kafka_cluster
 
 ORIGINAL_DOP=$1
 NEW_DOP=$2
@@ -67,8 +50,7 @@ start_cluster
 
 # make sure to stop Kafka and ZooKeeper at the end, as well as cleaning up the Flink cluster
and our moodifications
 function test_cleanup {
-  $KAFKA_DIR/bin/kafka-server-stop.sh
-  $KAFKA_DIR/bin/zookeeper-server-stop.sh
+  stop_kafka_cluster
 
   # revert our modifications to the Flink distribution
   rm $FLINK_DIR/conf/flink-conf.yaml
@@ -81,14 +63,8 @@ function test_cleanup {
 trap test_cleanup INT
 trap test_cleanup EXIT
 
-# zookeeper outputs the "Node does not exist" bit to stderr
-while [[ $($KAFKA_DIR/bin/zookeeper-shell.sh localhost:2181 get /brokers/ids/0 2>&1)
=~ .*Node\ does\ not\ exist.* ]]; do
-  echo "Waiting for broker..."
-  sleep 1
-done
-
 # create the required topic
-$KAFKA_DIR/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1
--partitions 1 --topic test-input
+create_kafka_topic 1 1 test-input
 
 # run the state machine example job
 STATE_MACHINE_JOB=$($FLINK_DIR/bin/flink run -d -p $ORIGINAL_DOP $FLINK_DIR/examples/streaming/StateMachineExample.jar
\

http://git-wip-us.apache.org/repos/asf/flink/blob/2bfc8bec/flink-end-to-end-tests/test-scripts/test_streaming_kafka010.sh
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/test-scripts/test_streaming_kafka010.sh b/flink-end-to-end-tests/test-scripts/test_streaming_kafka010.sh
index a6a9a8e..e09be35 100755
--- a/flink-end-to-end-tests/test-scripts/test_streaming_kafka010.sh
+++ b/flink-end-to-end-tests/test-scripts/test_streaming_kafka010.sh
@@ -18,51 +18,25 @@
 ################################################################################
 
 source "$(dirname "$0")"/common.sh
+source "$(dirname "$0")"/kafka-common.sh
 
-start_cluster
-
-# get Kafka 0.10.0
-mkdir -p $TEST_DATA_DIR
-if [ -z "$3" ]; then
-  # need to download Kafka because no Kafka was specified on the invocation
-  KAFKA_URL="https://archive.apache.org/dist/kafka/0.10.2.0/kafka_2.11-0.10.2.0.tgz"
-  echo "Downloading Kafka from $KAFKA_URL"
-  curl "$KAFKA_URL" > $TEST_DATA_DIR/kafka.tgz
-else
-  echo "Using specified Kafka from $3"
-  cp $3 $TEST_DATA_DIR/kafka.tgz
-fi
-
-tar xzf $TEST_DATA_DIR/kafka.tgz -C $TEST_DATA_DIR/
-KAFKA_DIR=$TEST_DATA_DIR/kafka_2.11-0.10.2.0
+setup_kafka_dist
+start_kafka_cluster
 
-# fix kafka config
-sed -i -e "s+^\(dataDir\s*=\s*\).*$+\1$TEST_DATA_DIR/zookeeper+" $KAFKA_DIR/config/zookeeper.properties
-sed -i -e "s+^\(log\.dirs\s*=\s*\).*$+\1$TEST_DATA_DIR/kafka+" $KAFKA_DIR/config/server.properties
-$KAFKA_DIR/bin/zookeeper-server-start.sh -daemon $KAFKA_DIR/config/zookeeper.properties
-$KAFKA_DIR/bin/kafka-server-start.sh -daemon $KAFKA_DIR/config/server.properties
-
-# make sure to stop Kafka and ZooKeeper at the end
+start_cluster
 
-function kafka_cleanup {
-  $KAFKA_DIR/bin/kafka-server-stop.sh
-  $KAFKA_DIR/bin/zookeeper-server-stop.sh
+function test_cleanup {
+  stop_kafka_cluster
 
   # make sure to run regular cleanup as well
   cleanup
 }
-trap kafka_cleanup INT
-trap kafka_cleanup EXIT
-
-# zookeeper outputs the "Node does not exist" bit to stderr
-while [[ $($KAFKA_DIR/bin/zookeeper-shell.sh localhost:2181 get /brokers/ids/0 2>&1)
=~ .*Node\ does\ not\ exist.* ]]; do
-  echo "Waiting for broker..."
-  sleep 1
-done
+trap test_cleanup INT
+trap test_cleanup EXIT
 
 # create the required topics
-$KAFKA_DIR/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1
--partitions 1 --topic test-input
-$KAFKA_DIR/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1
--partitions 1 --topic test-output
+create_kafka_topic 1 1 test-input
+create_kafka_topic 1 1 test-output
 
 # run the Flink job (detached mode)
 $FLINK_DIR/bin/flink run -d $FLINK_DIR/examples/streaming/Kafka010Example.jar \
@@ -71,9 +45,8 @@ $FLINK_DIR/bin/flink run -d $FLINK_DIR/examples/streaming/Kafka010Example.jar
\
   --bootstrap.servers localhost:9092 --zookeeper.connect localhost:2181 --group.id myconsumer
--auto.offset.reset earliest
 
 # send some data to Kafka
-echo -e "hello,45218\nwhats,46213\nup,51348" | $KAFKA_DIR/bin/kafka-console-producer.sh --broker-list
localhost:9092 --topic test-input
-
-DATA_FROM_KAFKA=$($KAFKA_DIR/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092
--topic test-output --from-beginning --max-messages 3 2> /dev/null)
+send_messages_to_kafka "hello,45218\nwhats,46213\nup,51348" test-input
+DATA_FROM_KAFKA=$(read_messages_from_kafka 3 test-output)
 
 # make sure we have actual newlines in the string, not "\n"
 EXPECTED=$(printf "PREFIX:hello,45218\nPREFIX:whats,46213\nPREFIX:up,51348")


Mime
View raw message