kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ewe...@apache.org
Subject kafka git commit: MINOR: ensure original use of prop_file in verifiable producer
Date Fri, 08 Apr 2016 01:17:57 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 9c34df151 -> 8b9b07e5d


MINOR: ensure original use of prop_file in verifiable producer

This PR: https://github.com/apache/kafka/pull/958 fixed the use of prop_file in the situation
when we have multiple producers (previously, every producer would append to the config). However,
it assumes that self.prop_file is initially "". This is correct for all existing tests, but
it precludes us from extending the verifiable producer and adding more properties to the producer
config (same as the console consumer).

This is a small PR to change the behavior back to the original, and also to make the verifiable
producer use a prop_file method, to be consistent with the console consumer.

Also, a few more fixes to the verifiable producer came up during the review:
-- fixed each_produced_at_least() method
-- more straightforward use of compression types

granders please review.

Author: Anna Povzner <anna@confluent.io>

Reviewers: Geoff Anderson <geoff@confluent.io>, Ewen Cheslack-Postava <ewen@confluent.io>

Closes #1192 from apovzner/fix_verifiable_producer


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/8b9b07e5
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/8b9b07e5
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/8b9b07e5

Branch: refs/heads/trunk
Commit: 8b9b07e5d6aed2552d1cdfba27b0211af39c691f
Parents: 9c34df1
Author: Anna Povzner <anna@confluent.io>
Authored: Thu Apr 7 18:17:40 2016 -0700
Committer: Ewen Cheslack-Postava <me@ewencp.org>
Committed: Thu Apr 7 18:17:40 2016 -0700

----------------------------------------------------------------------
 tests/kafkatest/services/verifiable_producer.py | 34 +++++++++++---------
 1 file changed, 18 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/8b9b07e5/tests/kafkatest/services/verifiable_producer.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/verifiable_producer.py b/tests/kafkatest/services/verifiable_producer.py
index f2ea421..0096a34 100644
--- a/tests/kafkatest/services/verifiable_producer.py
+++ b/tests/kafkatest/services/verifiable_producer.py
@@ -52,12 +52,8 @@ class VerifiableProducer(BackgroundThreadService):
                num_nodes = 1
                * is_int_with_prefix recommended if num_nodes > 1, because otherwise each
producer
                will produce exactly same messages, and validation may miss missing messages.
-        :param compression_types: If None, all producers will not use compression; or a list
of one or
-        more compression types (including "none"). Each producer will pick a compression
type
-        from the list in round-robin fashion. Example: compression_types = ["none", "snappy"]
and
-        num_nodes = 3, then producer 1 and 2 will not use compression, and producer 3 will
use
-        compression type = snappy. If in this example, num_nodes is 1, then first (and only)
-        producer will not use compression.
+        :param compression_types: If None, all producers will not use compression; or a list of
+        compression types, one per producer (could be "none").
         """
         super(VerifiableProducer, self).__init__(context, num_nodes)
 
@@ -67,30 +63,36 @@ class VerifiableProducer(BackgroundThreadService):
         self.throughput = throughput
         self.message_validator = message_validator
         self.compression_types = compression_types
+        if self.compression_types is not None:
+            assert len(self.compression_types) == num_nodes, "Specify one compression type per node"
+
+        self.security_config = self.kafka.security_config.client_config()
 
         for node in self.nodes:
             node.version = version
         self.acked_values = []
         self.not_acked_values = []
         self.produced_count = {}
-        self.prop_file = ""
+
+    def prop_file(self, node):
+        idx = self.idx(node)
+        prop_file = str(self.security_config)
+        if self.compression_types is not None:
+            compression_index = idx - 1
+            self.logger.info("VerifiableProducer (index = %d) will use compression type = %s", idx,
+                             self.compression_types[compression_index])
+            prop_file += "\ncompression.type=%s\n" % self.compression_types[compression_index]
+        return prop_file
 
     def _worker(self, idx, node):
         node.account.ssh("mkdir -p %s" % VerifiableProducer.PERSISTENT_ROOT, allow_fail=False)
 
         # Create and upload log properties
-        self.security_config = self.kafka.security_config.client_config(self.prop_file)
-        producer_prop_file = str(self.security_config)
         log_config = self.render('tools_log4j.properties', log_file=VerifiableProducer.LOG_FILE)
         node.account.create_file(VerifiableProducer.LOG4J_CONFIG, log_config)
 
         # Create and upload config file
-        if self.compression_types is not None:
-            compression_index = (idx - 1) % len(self.compression_types)
-            self.logger.info("VerifiableProducer (index = %d) will use compression type = %s", idx,
-                             self.compression_types[compression_index])
-            producer_prop_file += "\ncompression.type=%s\n" % self.compression_types[compression_index]
-
+        producer_prop_file = self.prop_file(node)
         self.logger.info("verifiable_producer.properties:")
         self.logger.info(producer_prop_file)
         node.account.create_file(VerifiableProducer.CONFIG_FILE, producer_prop_file)
@@ -197,7 +199,7 @@ class VerifiableProducer(BackgroundThreadService):
 
     def each_produced_at_least(self, count):
         with self.lock:
-            for idx in range(1, self.num_nodes):
+            for idx in range(1, self.num_nodes + 1):
                 if self.produced_count.get(idx) is None or self.produced_count[idx] < count:
                     return False
             return True


Mime
View raw message