From commits-return-4275-archive-asf-public=cust-asf.ponee.io@bigtop.apache.org Sat Apr 20 18:31:31 2019 Return-Path: X-Original-To: archive-asf-public@cust-asf.ponee.io Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [207.244.88.153]) by mx-eu-01.ponee.io (Postfix) with SMTP id DF5AE180629 for ; Sat, 20 Apr 2019 20:31:30 +0200 (CEST) Received: (qmail 7015 invoked by uid 500); 20 Apr 2019 18:31:30 -0000 Mailing-List: contact commits-help@bigtop.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: bigtop-dev@bigtop.apache.org Delivered-To: mailing list commits@bigtop.apache.org Received: (qmail 7006 invoked by uid 99); 20 Apr 2019 18:31:30 -0000 Received: from ec2-52-202-80-70.compute-1.amazonaws.com (HELO gitbox.apache.org) (52.202.80.70) by apache.org (qpsmtpd/0.29) with ESMTP; Sat, 20 Apr 2019 18:31:30 +0000 Received: by gitbox.apache.org (ASF Mail Server at gitbox.apache.org, from userid 33) id B104987A1C; Sat, 20 Apr 2019 18:31:26 +0000 (UTC) Date: Sat, 20 Apr 2019 18:31:26 +0000 To: "commits@bigtop.apache.org" Subject: [bigtop] branch branch-1.4 updated: BIGTOP-3209. Revert Kafka to 0.10.2.2 and Flume to 1.8.0 (#521) MIME-Version: 1.0 Content-Type: text/plain; charset=utf-8 Content-Transfer-Encoding: 8bit Message-ID: <155578508657.6822.10580491779810483028@gitbox.apache.org> From: evansye@apache.org X-Git-Host: gitbox.apache.org X-Git-Repo: bigtop X-Git-Refname: refs/heads/branch-1.4 X-Git-Reftype: branch X-Git-Oldrev: 25cd096f6bb4af2e9f8b6c63adda9fd2763c3c8b X-Git-Newrev: 46fa32c84dcd94c57b4a6ed2339d6b12791666a1 X-Git-Rev: 46fa32c84dcd94c57b4a6ed2339d6b12791666a1 X-Git-NotificationType: ref_changed_plus_diff X-Git-Multimail-Version: 1.5.dev Auto-Submitted: auto-generated This is an automated email from the ASF dual-hosted git repository. evansye pushed a commit to branch branch-1.4 in repository https://gitbox.apache.org/repos/asf/bigtop.git The following commit(s) were added to refs/heads/branch-1.4 by this push: new 46fa32c BIGTOP-3209. Revert Kafka to 0.10.2.2 and Flume to 1.8.0 (#521) 46fa32c is described below commit 46fa32c84dcd94c57b4a6ed2339d6b12791666a1 Author: Evans Ye AuthorDate: Sun Apr 21 02:31:22 2019 +0800 BIGTOP-3209. Revert Kafka to 0.10.2.2 and Flume to 1.8.0 (#521) * Revert "BIGTOP-3185. Bump Kafka to 2.1.1 (#493)" This reverts commit ca52568591010884687ac7616d09d97c63a89ebf. * Revert "BIGTOP-3187. Bump Flume to 1.9.0" This reverts commit 173d80e381cf8b527812e321f1908d90aeba9823. * Revert "BIGTOP-3171. Update Kafka Puppet module for version 1.1.1 (#477)" This reverts commit 6d5705e3e023caae39410888713f7bbf77578e09. * Revert "BIGTOP-3164. Bump Kafka to 1.1.1 (#476)" This reverts commit 12cb0ad24edb29e8cdac8d571a0472814629a40f. --- bigtop-deploy/puppet/hieradata/bigtop/cluster.yaml | 2 - .../puppet/modules/kafka/manifests/init.pp | 2 - .../modules/kafka/templates/server.properties | 4 - .../src/common/flume/patch0-FLUME-2662.diff | 13 +++ .../common/flume/patch1-FLUME-3026_rebased.diff | 99 ++++++++++++++++++++++ .../src/common/flume/patch2-scala-symbol.diff | 96 +++++++++++++++++++++ bigtop.bom | 4 +- 7 files changed, 210 insertions(+), 10 deletions(-) diff --git a/bigtop-deploy/puppet/hieradata/bigtop/cluster.yaml b/bigtop-deploy/puppet/hieradata/bigtop/cluster.yaml index c2ed6db..9e7ce80 100644 --- a/bigtop-deploy/puppet/hieradata/bigtop/cluster.yaml +++ b/bigtop-deploy/puppet/hieradata/bigtop/cluster.yaml @@ -191,8 +191,6 @@ hadoop::common::tez_jars: "/usr/lib/tez" #kafka kafka::server::port: "9092" kafka::server::zookeeper_connection_string: "%{hiera('bigtop::hadoop_head_node')}:2181" -# Set to 3 for production deployment -# kafka::server::offsets_topic_replication_factor: 3 zeppelin::server::spark_master_url: "yarn-client" zeppelin::server::hiveserver2_url: "jdbc:hive2://%{hiera('hadoop-hive::common::hiveserver2_host')}:%{hiera('hadoop-hive::common::hiveserver2_port')}" diff --git a/bigtop-deploy/puppet/modules/kafka/manifests/init.pp b/bigtop-deploy/puppet/modules/kafka/manifests/init.pp index 0c26fef..f13dec1 100644 --- a/bigtop-deploy/puppet/modules/kafka/manifests/init.pp +++ b/bigtop-deploy/puppet/modules/kafka/manifests/init.pp @@ -27,8 +27,6 @@ class kafka { $bind_addr = undef, $port = "9092", $zookeeper_connection_string = "localhost:2181", - # Default to 1 for less than 3 nodes deployment to work. - $offsets_topic_replication_factor = 1, ) { package { 'kafka': diff --git a/bigtop-deploy/puppet/modules/kafka/templates/server.properties b/bigtop-deploy/puppet/modules/kafka/templates/server.properties index 30f9efb..a58c7b3 100644 --- a/bigtop-deploy/puppet/modules/kafka/templates/server.properties +++ b/bigtop-deploy/puppet/modules/kafka/templates/server.properties @@ -125,10 +125,6 @@ log.retention.check.interval.ms=60000 # If log.cleaner.enable=true is set the cleaner will be enabled and individual logs can then be marked for log compaction. log.cleaner.enable=false -# The replication factor for the offsets topic (set higher to ensure availability). -# Internal topic creation will fail until the cluster size meets this replication factor requirement. -offsets.topic.replication.factor=<%= @offsets_topic_replication_factor %> - ############################# Zookeeper ############################# # Zookeeper connection string (see zookeeper docs for details). diff --git a/bigtop-packages/src/common/flume/patch0-FLUME-2662.diff b/bigtop-packages/src/common/flume/patch0-FLUME-2662.diff new file mode 100644 index 0000000..61497b9 --- /dev/null +++ b/bigtop-packages/src/common/flume/patch0-FLUME-2662.diff @@ -0,0 +1,13 @@ +diff --git a/pom.xml b/pom.xml +index 3c82a47..bdd998d 100644 +--- a/pom.xml ++++ b/pom.xml +@@ -58,7 +58,7 @@ limitations under the License. + 3.2.2 + 1.4.1 + 1.4 +- 2.1 ++ 2.4 + 2.5 + 2.6.0 + 10.11.1.1 diff --git a/bigtop-packages/src/common/flume/patch1-FLUME-3026_rebased.diff b/bigtop-packages/src/common/flume/patch1-FLUME-3026_rebased.diff new file mode 100644 index 0000000..20a2ddd --- /dev/null +++ b/bigtop-packages/src/common/flume/patch1-FLUME-3026_rebased.diff @@ -0,0 +1,99 @@ +diff --git a/flume-ng-channels/flume-kafka-channel/src/test/java/org/apache/flume/channel/kafka/TestKafkaChannel.java b/flume-ng-channels/flume-kafka-channel/src/test/java/org/apache/flume/channel/kafka/TestKafkaChannel.java +index 5e5f2d0..63607f7 100644 +--- a/flume-ng-channels/flume-kafka-channel/src/test/java/org/apache/flume/channel/kafka/TestKafkaChannel.java ++++ b/flume-ng-channels/flume-kafka-channel/src/test/java/org/apache/flume/channel/kafka/TestKafkaChannel.java +@@ -20,6 +20,7 @@ package org.apache.flume.channel.kafka; + + import com.google.common.collect.Lists; + import kafka.admin.AdminUtils; ++import kafka.admin.RackAwareMode; + import kafka.utils.ZKGroupTopicDirs; + import kafka.utils.ZkUtils; + import org.apache.commons.lang.RandomStringUtils; +@@ -883,7 +884,8 @@ public class TestKafkaChannel { + ZkUtils.apply(testUtil.getZkUrl(), sessionTimeoutMs, connectionTimeoutMs, false); + int replicationFactor = 1; + Properties topicConfig = new Properties(); +- AdminUtils.createTopic(zkUtils, topicName, numPartitions, replicationFactor, topicConfig); ++ AdminUtils.createTopic(zkUtils, topicName, numPartitions, replicationFactor, topicConfig, ++ RackAwareMode.Disabled$.MODULE$); + } + + public static void deleteTopic(String topicName) { +diff --git a/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/TestKafkaSink.java b/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/TestKafkaSink.java +index d92c71f..66c6fe3 100644 +--- a/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/TestKafkaSink.java ++++ b/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/TestKafkaSink.java +@@ -21,6 +21,7 @@ package org.apache.flume.sink.kafka; + import com.google.common.base.Charsets; + + import kafka.admin.AdminUtils; ++import kafka.admin.RackAwareMode; + import kafka.message.MessageAndMetadata; + import kafka.utils.ZkUtils; + +@@ -674,7 +675,8 @@ public class TestKafkaSink { + ZkUtils.apply(testUtil.getZkUrl(), sessionTimeoutMs, connectionTimeoutMs, false); + int replicationFactor = 1; + Properties topicConfig = new Properties(); +- AdminUtils.createTopic(zkUtils, topicName, numPartitions, replicationFactor, topicConfig); ++ AdminUtils.createTopic(zkUtils, topicName, numPartitions, replicationFactor, topicConfig, ++ RackAwareMode.Disabled$.MODULE$); + } + + public static void deleteTopic(String topicName) { +@@ -698,4 +700,4 @@ public class TestKafkaSink { + return newTopic; + } + +-} +\ No newline at end of file ++} +diff --git a/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/KafkaSourceEmbeddedKafka.java b/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/KafkaSourceEmbeddedKafka.java +index 53bd65c..ba75623 100644 +--- a/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/KafkaSourceEmbeddedKafka.java ++++ b/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/KafkaSourceEmbeddedKafka.java +@@ -17,6 +17,7 @@ + package org.apache.flume.source.kafka; + + import kafka.admin.AdminUtils; ++import kafka.admin.RackAwareMode; + import kafka.server.KafkaConfig; + import kafka.server.KafkaServerStartable; + import kafka.utils.ZkUtils; +@@ -131,7 +132,8 @@ public class KafkaSourceEmbeddedKafka { + ZkUtils zkUtils = ZkUtils.apply(zkClient, false); + int replicationFactor = 1; + Properties topicConfig = new Properties(); +- AdminUtils.createTopic(zkUtils, topicName, numPartitions, replicationFactor, topicConfig); ++ AdminUtils.createTopic(zkUtils, topicName, numPartitions, replicationFactor, topicConfig, ++ RackAwareMode.Disabled$.MODULE$); + } + + } +diff --git a/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/TestKafkaSource.java b/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/TestKafkaSource.java +index 7804fa2..2d5bbf8 100644 +--- a/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/TestKafkaSource.java ++++ b/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/TestKafkaSource.java +@@ -20,7 +20,7 @@ package org.apache.flume.source.kafka; + import com.google.common.base.Charsets; + import com.google.common.collect.Lists; + import junit.framework.Assert; +-import kafka.common.TopicExistsException; ++import org.apache.kafka.common.errors.TopicExistsException; + import kafka.utils.ZKGroupTopicDirs; + import kafka.utils.ZkUtils; + import org.apache.avro.io.BinaryEncoder; +diff --git a/pom.xml b/pom.xml +index 3c82a47..2276355 100644 +--- a/pom.xml ++++ b/pom.xml +@@ -77,7 +77,7 @@ limitations under the License. + 6.1.26 + 2.9.9 + 4.10 +- 0.9.0.1 ++ 0.10.2.2 + 1.0.0 + 1.0.0 + 1.0.0 diff --git a/bigtop-packages/src/common/flume/patch2-scala-symbol.diff b/bigtop-packages/src/common/flume/patch2-scala-symbol.diff new file mode 100644 index 0000000..17833d4 --- /dev/null +++ b/bigtop-packages/src/common/flume/patch2-scala-symbol.diff @@ -0,0 +1,96 @@ +From f809342685fcf1e1a2dc0fc227de84ccb26dad10 Mon Sep 17 00:00:00 2001 +From: Anton Chevychalov +Date: Wed, 25 Oct 2017 15:47:48 +0300 +Subject: [PATCH] Fix kafka and Scala 2.11 trouble + +--- + .../org/apache/flume/channel/kafka/KafkaChannel.java | 4 ++-- + .../org/apache/flume/source/kafka/KafkaSource.java | 18 ++++++++++++++---- + 2 files changed, 16 insertions(+), 6 deletions(-) + +diff --git a/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannel.java b/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannel.java +index 5bd9be0..46494fd 100644 +--- a/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannel.java ++++ b/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannel.java +@@ -77,7 +77,7 @@ import java.util.concurrent.atomic.AtomicBoolean; + import java.util.concurrent.atomic.AtomicReference; + + import static org.apache.flume.channel.kafka.KafkaChannelConfiguration.*; +-import static scala.collection.JavaConverters.asJavaListConverter; ++import scala.collection.JavaConverters; + + public class KafkaChannel extends BasicChannelSemantics { + +@@ -357,7 +357,7 @@ public class KafkaChannel extends BasicChannelSemantics { + private Map getZookeeperOffsets(ZkUtils client) { + Map offsets = new HashMap<>(); + ZKGroupTopicDirs topicDirs = new ZKGroupTopicDirs(groupId, topicStr); +- List partitions = asJavaListConverter( ++ List partitions = JavaConverters.seqAsJavaListConverter( + client.getChildrenParentMayNotExist(topicDirs.consumerOffsetDir())).asJava(); + for (String partition : partitions) { + TopicPartition key = new TopicPartition(topicStr, Integer.valueOf(partition)); +diff --git a/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java b/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java +index ffdc96e..960e9e8 100644 +--- a/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java ++++ b/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java +@@ -28,8 +28,10 @@ import java.util.Properties; + import java.util.UUID; + import java.util.concurrent.atomic.AtomicBoolean; + import java.util.regex.Pattern; ++import java.util.stream.Collectors; + + import com.google.common.annotations.VisibleForTesting; ++import kafka.cluster.Broker; + import kafka.cluster.BrokerEndPoint; + import kafka.utils.ZKGroupTopicDirs; + import kafka.utils.ZkUtils; +@@ -57,6 +59,7 @@ import org.apache.kafka.clients.consumer.KafkaConsumer; + import org.apache.kafka.clients.consumer.OffsetAndMetadata; + import org.apache.kafka.common.PartitionInfo; + import org.apache.kafka.common.TopicPartition; ++import org.apache.kafka.common.network.ListenerName; + import org.apache.kafka.common.protocol.SecurityProtocol; + import org.apache.kafka.common.security.JaasUtils; + import org.slf4j.Logger; +@@ -64,9 +67,10 @@ import org.slf4j.LoggerFactory; + + import com.google.common.base.Optional; + import scala.Option; ++import scala.collection.Seq; + + import static org.apache.flume.source.kafka.KafkaSourceConstants.*; +-import static scala.collection.JavaConverters.asJavaListConverter; ++import scala.collection.JavaConverters; + + /** + * A Source for Kafka which reads messages from kafka topics. +@@ -464,8 +468,14 @@ public class KafkaSource extends AbstractPollableSource + ZkUtils zkUtils = ZkUtils.apply(zookeeperConnect, ZK_SESSION_TIMEOUT, ZK_CONNECTION_TIMEOUT, + JaasUtils.isZkSecurityEnabled()); + try { +- List endPoints = +- asJavaListConverter(zkUtils.getAllBrokerEndPointsForChannel(securityProtocol)).asJava(); ++ Seq allBrokersInCluster = zkUtils.getAllBrokersInCluster(); ++ List brokerList = JavaConverters.seqAsJavaListConverter( ++ zkUtils.getAllBrokersInCluster()).asJava(); ++ List endPoints = brokerList.stream() ++ .map(broker -> broker.getBrokerEndPoint( ++ ListenerName.forSecurityProtocol(securityProtocol)) ++ ) ++ .collect(Collectors.toList()); + List connections = new ArrayList<>(); + for (BrokerEndPoint endPoint : endPoints) { + connections.add(endPoint.connectionString()); +@@ -597,7 +607,7 @@ public class KafkaSource extends AbstractPollableSource + String topicStr) { + Map offsets = new HashMap<>(); + ZKGroupTopicDirs topicDirs = new ZKGroupTopicDirs(groupId, topicStr); +- List partitions = asJavaListConverter( ++ List partitions = JavaConverters.seqAsJavaListConverter( + client.getChildrenParentMayNotExist(topicDirs.consumerOffsetDir())).asJava(); + for (String partition : partitions) { + TopicPartition key = new TopicPartition(topicStr, Integer.valueOf(partition)); +-- +1.9.1 + diff --git a/bigtop.bom b/bigtop.bom index b4d3e02..0339284 100644 --- a/bigtop.bom +++ b/bigtop.bom @@ -235,7 +235,7 @@ bigtop { 'flume' { name = 'flume' relNotes = 'Apache Flume' - version { base = '1.9.0'; pkg = base; release = 1 } + version { base = '1.8.0'; pkg = base; release = 1 } tarball { destination = "apache-$name-${version.base}-src.tar.gz" source = destination } url { download_path = "/$name/${version.base}/" @@ -351,7 +351,7 @@ bigtop { 'kafka' { name = 'kafka' relNotes = 'Apache Kafka' - version { base = '2.1.1'; pkg = base; release = 1 } + version { base = '0.10.2.2'; pkg = base; release = 1 } tarball { destination = "$name-${version.base}.tar.gz" source = "$name-${version.base}-src.tgz" } url { download_path = "/$name/${version.base}/"