brooklyn-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From aleds...@apache.org
Subject [1/2] incubator-brooklyn git commit: Kafka - fix escaping
Date Fri, 24 Jul 2015 04:49:43 GMT
Repository: incubator-brooklyn
Updated Branches:
  refs/heads/master a1efa470d -> be9ca632c


Kafka - fix escaping


Project: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/commit/1e8899f0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/tree/1e8899f0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/diff/1e8899f0

Branch: refs/heads/master
Commit: 1e8899f0a6427da6e11a180b69aa620ed52d2dfa
Parents: efaceed
Author: Valentin Aitken <valentin.aitken@cloudsoftcorp.com>
Authored: Fri Jul 17 16:26:28 2015 +0100
Committer: Valentin Aitken <valentin.aitken@cloudsoftcorp.com>
Committed: Thu Jul 23 11:23:24 2015 +0300

----------------------------------------------------------------------
 .../messaging/kafka/KafkaZooKeeperSshDriver.java    |  8 ++++++--
 .../messaging/kafka/KafkaIntegrationTest.java       | 13 +++++++++----
 .../entity/messaging/kafka/KafkaSupport.java        | 16 +++++-----------
 3 files changed, 20 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/1e8899f0/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaZooKeeperSshDriver.java
----------------------------------------------------------------------
diff --git a/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaZooKeeperSshDriver.java
b/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaZooKeeperSshDriver.java
index fd57d95..d5b47e3 100644
--- a/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaZooKeeperSshDriver.java
+++ b/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaZooKeeperSshDriver.java
@@ -22,10 +22,11 @@ import java.util.Map;
 
 import brooklyn.config.ConfigKey;
 import brooklyn.entity.basic.Attributes;
-import brooklyn.entity.zookeeper.ZooKeeperNode;
 import brooklyn.location.basic.SshMachineLocation;
 import brooklyn.util.collections.MutableMap;
 
+import static brooklyn.util.text.StringEscapes.BashStringEscapes.escapeLiteralForDoubleQuotedBash;
+
 public class KafkaZooKeeperSshDriver extends AbstractfKafkaSshDriver implements KafkaZooKeeperDriver
{
 
     public KafkaZooKeeperSshDriver(KafkaZooKeeperImpl entity, SshMachineLocation machine)
{
@@ -72,7 +73,10 @@ public class KafkaZooKeeperSshDriver extends AbstractfKafkaSshDriver implements
         String zookeeperUrl = getEntity().getAttribute(Attributes.HOSTNAME) + ":" + getZookeeperPort();
         newScript(CUSTOMIZING)
                 .failOnNonZeroResultCode()
-                .body.append(String.format("./bin/%s  --create --zookeeper %s --replication-factor
1 --partitions 1 --topic %s", getTopicsScriptName(), zookeeperUrl, topic))
+                .body.append(String.format("./bin/%s  --create --zookeeper \"%s\" --replication-factor
1 --partitions 1 --topic \"%s\"",
+                                           getTopicsScriptName(),
+                                           escapeLiteralForDoubleQuotedBash(zookeeperUrl),
+                                           escapeLiteralForDoubleQuotedBash(topic)))
                 .execute();
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/1e8899f0/software/messaging/src/test/java/brooklyn/entity/messaging/kafka/KafkaIntegrationTest.java
----------------------------------------------------------------------
diff --git a/software/messaging/src/test/java/brooklyn/entity/messaging/kafka/KafkaIntegrationTest.java
b/software/messaging/src/test/java/brooklyn/entity/messaging/kafka/KafkaIntegrationTest.java
index 9f490d9..c6487e5 100644
--- a/software/messaging/src/test/java/brooklyn/entity/messaging/kafka/KafkaIntegrationTest.java
+++ b/software/messaging/src/test/java/brooklyn/entity/messaging/kafka/KafkaIntegrationTest.java
@@ -125,11 +125,16 @@ public class KafkaIntegrationTest {
 
         Entities.dumpInfo(cluster);
 
-        KafkaSupport support = new KafkaSupport(cluster);
+        final KafkaSupport support = new KafkaSupport(cluster);
 
         support.sendMessage("brooklyn", "TEST_MESSAGE");
-        Thread.sleep(Duration.seconds(5).toMilliseconds());
-        String message = support.getMessage("brooklyn");
-        assertEquals(message, "TEST_MESSAGE");
+
+        Asserts.succeedsEventually(MutableMap.of("timeout", Duration.FIVE_SECONDS), new Runnable()
{
+            @Override
+            public void run() {
+                String message = support.getMessage("brooklyn");
+                assertEquals(message, "TEST_MESSAGE");
+            }
+        });
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/1e8899f0/software/messaging/src/test/java/brooklyn/entity/messaging/kafka/KafkaSupport.java
----------------------------------------------------------------------
diff --git a/software/messaging/src/test/java/brooklyn/entity/messaging/kafka/KafkaSupport.java
b/software/messaging/src/test/java/brooklyn/entity/messaging/kafka/KafkaSupport.java
index c80befa..e035784 100644
--- a/software/messaging/src/test/java/brooklyn/entity/messaging/kafka/KafkaSupport.java
+++ b/software/messaging/src/test/java/brooklyn/entity/messaging/kafka/KafkaSupport.java
@@ -22,7 +22,6 @@ import brooklyn.entity.Entity;
 import brooklyn.entity.basic.EntityPredicates;
 import brooklyn.entity.zookeeper.ZooKeeperNode;
 
-import brooklyn.util.time.Duration;
 import com.google.common.base.Optional;
 import com.google.common.base.Predicates;
 import com.google.common.collect.Iterables;
@@ -66,16 +65,11 @@ public class KafkaSupport {
             props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
 
             Producer<String, String> producer = new KafkaProducer<>(props);
-            try {
-                ((KafkaZooKeeper)cluster.getZooKeeper()).createTopic(topic);
-                Thread.sleep(Duration.seconds(1).toMilliseconds());
-
-                ProducerRecord<String, String> data = new ProducerRecord<>(topic,
message);
-                producer.send(data);
-                producer.close();
-            } catch (InterruptedException e) {
-                e.printStackTrace();
-            }
+            ((KafkaZooKeeper)cluster.getZooKeeper()).createTopic(topic);
+
+            ProducerRecord<String, String> data = new ProducerRecord<>(topic,
message);
+            producer.send(data);
+            producer.close();
         } else {
             throw new InvalidParameterException("No kafka broker node found");
         }


Mime
View raw message