flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From fhue...@apache.org
Subject [18/50] [abbrv] flink git commit: [FLINK-3341] Make 'auto.offset.reset' compatible with Kafka 0.8 and 0.9
Date Fri, 12 Feb 2016 11:29:43 GMT
[FLINK-3341] Make 'auto.offset.reset' compatible with Kafka 0.8 and 0.9

This closes #1597


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9173825a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9173825a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9173825a

Branch: refs/heads/tableOnCalcite
Commit: 9173825aa6a1525d72a78cda16cb4ae1e9b8a8e4
Parents: 8ccd754
Author: Robert Metzger <rmetzger@apache.org>
Authored: Sat Feb 6 13:27:06 2016 +0100
Committer: Stephan Ewen <sewen@apache.org>
Committed: Wed Feb 10 15:12:34 2016 +0100

----------------------------------------------------------------------
 .../connectors/kafka/FlinkKafkaConsumer08.java  |  2 +-
 .../kafka/internals/LegacyFetcher.java          |  3 ++-
 .../connectors/kafka/Kafka08ITCase.java         | 22 ++++++++++----------
 .../kafka/KafkaTestEnvironmentImpl.java         | 20 +++---------------
 .../kafka/KafkaTestEnvironmentImpl.java         | 22 +++++---------------
 .../connectors/kafka/KafkaConsumerTestBase.java |  5 ++---
 .../connectors/kafka/KafkaTestBase.java         |  2 --
 .../connectors/kafka/KafkaTestEnvironment.java  |  3 ---
 .../flink/yarn/YARNSessionFIFOITCase.java       |  1 +
 9 files changed, 25 insertions(+), 55 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/9173825a/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer08.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer08.java
b/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer08.java
index bdea37f..1cdfffe 100644
--- a/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer08.java
+++ b/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer08.java
@@ -70,7 +70,7 @@ import static com.google.common.base.Preconditions.checkNotNull;
  *             <li>socket.timeout.ms</li>
  *             <li>socket.receive.buffer.bytes</li>
  *             <li>fetch.message.max.bytes</li>
- *             <li>auto.offset.reset with the values "latest", "earliest" (unlike 0.8.2
behavior)</li>
+ *             <li>auto.offset.reset with the values "largest", "smallest"</li>
  *             <li>fetch.wait.max.ms</li>
  *         </ul>
  *     </li>

http://git-wip-us.apache.org/repos/asf/flink/blob/9173825a/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/LegacyFetcher.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/LegacyFetcher.java
b/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/LegacyFetcher.java
index fe7f777..10f4c41 100644
--- a/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/LegacyFetcher.java
+++ b/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/LegacyFetcher.java
@@ -576,7 +576,8 @@ public class LegacyFetcher implements Fetcher {
 
 		private static long getInvalidOffsetBehavior(Properties config) {
 			long timeType;
-			if (config.getProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest").equals("latest"))
{
+			String val = config.getProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "largest");
+			if (val.equals("largest") || val.equals("latest")) { // largest is kafka 0.8, latest is
kafka 0.9
 				timeType = OffsetRequest.LatestTime();
 			} else {
 				timeType = OffsetRequest.EarliestTime();

http://git-wip-us.apache.org/repos/asf/flink/blob/9173825a/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ITCase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ITCase.java
b/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ITCase.java
index 6a2fa27..a3e815e 100644
--- a/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ITCase.java
+++ b/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ITCase.java
@@ -93,13 +93,13 @@ public class Kafka08ITCase extends KafkaConsumerTestBase {
 
 		// set invalid offset:
 		CuratorFramework curatorClient = ((KafkaTestEnvironmentImpl)kafkaServer).createCuratorClient();
-		ZookeeperOffsetHandler.setOffsetInZooKeeper(curatorClient, standardCC.groupId(), topic,
0, 1234);
+		ZookeeperOffsetHandler.setOffsetInZooKeeper(curatorClient, standardProps.getProperty("group.id"),
topic, 0, 1234);
 		curatorClient.close();
 
 		// read from topic
 		final int valuesCount = 20;
 		final int startFrom = 0;
-		readSequence(env, standardCC.props().props(), parallelism, topic, valuesCount, startFrom);
+		readSequence(env, standardProps, parallelism, topic, valuesCount, startFrom);
 
 		deleteTestTopic(topic);
 	}
@@ -188,9 +188,9 @@ public class Kafka08ITCase extends KafkaConsumerTestBase {
 
 		CuratorFramework curatorClient = ((KafkaTestEnvironmentImpl)kafkaServer).createCuratorClient();
 
-		long o1 = ZookeeperOffsetHandler.getOffsetFromZooKeeper(curatorClient, standardCC.groupId(),
topicName, 0);
-		long o2 = ZookeeperOffsetHandler.getOffsetFromZooKeeper(curatorClient, standardCC.groupId(),
topicName, 1);
-		long o3 = ZookeeperOffsetHandler.getOffsetFromZooKeeper(curatorClient, standardCC.groupId(),
topicName, 2);
+		long o1 = ZookeeperOffsetHandler.getOffsetFromZooKeeper(curatorClient, standardProps.getProperty("group.id"),
topicName, 0);
+		long o2 = ZookeeperOffsetHandler.getOffsetFromZooKeeper(curatorClient, standardProps.getProperty("group.id"),
topicName, 1);
+		long o3 = ZookeeperOffsetHandler.getOffsetFromZooKeeper(curatorClient, standardProps.getProperty("group.id"),
topicName, 2);
 
 		LOG.info("Got final offsets from zookeeper o1={}, o2={}, o3={}", o1, o2, o3);
 
@@ -201,9 +201,9 @@ public class Kafka08ITCase extends KafkaConsumerTestBase {
 		LOG.info("Manipulating offsets");
 
 		// set the offset to 50 for the three partitions
-		ZookeeperOffsetHandler.setOffsetInZooKeeper(curatorClient, standardCC.groupId(), topicName,
0, 49);
-		ZookeeperOffsetHandler.setOffsetInZooKeeper(curatorClient, standardCC.groupId(), topicName,
1, 49);
-		ZookeeperOffsetHandler.setOffsetInZooKeeper(curatorClient, standardCC.groupId(), topicName,
2, 49);
+		ZookeeperOffsetHandler.setOffsetInZooKeeper(curatorClient, standardProps.getProperty("group.id"),
topicName, 0, 49);
+		ZookeeperOffsetHandler.setOffsetInZooKeeper(curatorClient, standardProps.getProperty("group.id"),
topicName, 1, 49);
+		ZookeeperOffsetHandler.setOffsetInZooKeeper(curatorClient, standardProps.getProperty("group.id"),
topicName, 2, 49);
 
 		curatorClient.close();
 
@@ -250,9 +250,9 @@ public class Kafka08ITCase extends KafkaConsumerTestBase {
 		// get the offset
 		CuratorFramework curatorFramework = ((KafkaTestEnvironmentImpl)kafkaServer).createCuratorClient();
 
-		long o1 = ZookeeperOffsetHandler.getOffsetFromZooKeeper(curatorFramework, standardCC.groupId(),
topicName, 0);
-		long o2 = ZookeeperOffsetHandler.getOffsetFromZooKeeper(curatorFramework, standardCC.groupId(),
topicName, 1);
-		long o3 = ZookeeperOffsetHandler.getOffsetFromZooKeeper(curatorFramework, standardCC.groupId(),
topicName, 2);
+		long o1 = ZookeeperOffsetHandler.getOffsetFromZooKeeper(curatorFramework, standardProps.getProperty("group.id"),
topicName, 0);
+		long o2 = ZookeeperOffsetHandler.getOffsetFromZooKeeper(curatorFramework, standardProps.getProperty("group.id"),
topicName, 1);
+		long o3 = ZookeeperOffsetHandler.getOffsetFromZooKeeper(curatorFramework, standardProps.getProperty("group.id"),
topicName, 2);
 
 		LOG.info("Got final offsets from zookeeper o1={}, o2={}, o3={}", o1, o2, o3);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/9173825a/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
b/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
index 348b75d..6f56ede 100644
--- a/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
+++ b/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
@@ -21,7 +21,6 @@ package org.apache.flink.streaming.connectors.kafka;
 import kafka.admin.AdminUtils;
 import kafka.api.PartitionMetadata;
 import kafka.common.KafkaException;
-import kafka.consumer.ConsumerConfig;
 import kafka.network.SocketServer;
 import kafka.server.KafkaConfig;
 import kafka.server.KafkaServer;
@@ -68,19 +67,11 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
 	private String zookeeperConnectionString;
 	private String brokerConnectionString = "";
 	private Properties standardProps;
-	private ConsumerConfig standardCC;
-
 
 	public String getBrokerConnectionString() {
 		return brokerConnectionString;
 	}
 
-
-	@Override
-	public ConsumerConfig getStandardConsumerConfig() {
-		return standardCC;
-	}
-
 	@Override
 	public Properties getStandardProperties() {
 		return standardProps;
@@ -187,13 +178,8 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
 		standardProps.setProperty("auto.commit.enable", "false");
 		standardProps.setProperty("zookeeper.session.timeout.ms", "12000"); // 6 seconds is default.
Seems to be too small for travis.
 		standardProps.setProperty("zookeeper.connection.timeout.ms", "20000");
-		standardProps.setProperty("auto.offset.reset", "earliest"); // read from the beginning.
+		standardProps.setProperty("auto.offset.reset", "smallest"); // read from the beginning.
(smallest is kafka 0.8)
 		standardProps.setProperty("fetch.message.max.bytes", "256"); // make a lot of fetches (MESSAGES
MUST BE SMALLER!)
-
-		Properties consumerConfigProps = new Properties();
-		consumerConfigProps.putAll(standardProps);
-		consumerConfigProps.setProperty("auto.offset.reset", "smallest");
-		standardCC = new ConsumerConfig(consumerConfigProps);
 	}
 
 	@Override
@@ -274,8 +260,8 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
 	}
 
 	private ZkClient createZkClient() {
-		return new ZkClient(standardCC.zkConnect(), standardCC.zkSessionTimeoutMs(),
-				standardCC.zkConnectionTimeoutMs(), new ZooKeeperStringSerializer());
+		return new ZkClient(zookeeperConnectionString, Integer.valueOf(standardProps.getProperty("zookeeper.session.timeout.ms")),
+				Integer.valueOf(standardProps.getProperty("zookeeper.connection.timeout.ms")), new ZooKeeperStringSerializer());
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/9173825a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
b/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
index 0855ba6..50dcab8 100644
--- a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
+++ b/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
@@ -65,19 +65,12 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
 	private String zookeeperConnectionString;
 	private String brokerConnectionString = "";
 	private Properties standardProps;
-	private ConsumerConfig standardCC;
-
 
 	public String getBrokerConnectionString() {
 		return brokerConnectionString;
 	}
 
 	@Override
-	public ConsumerConfig getStandardConsumerConfig() {
-		return standardCC;
-	}
-
-	@Override
 	public Properties getStandardProperties() {
 		return standardProps;
 	}
@@ -184,13 +177,8 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
 		standardProps.setProperty("auto.commit.enable", "false");
 		standardProps.setProperty("zookeeper.session.timeout.ms", "12000"); // 6 seconds is default.
Seems to be too small for travis.
 		standardProps.setProperty("zookeeper.connection.timeout.ms", "20000");
-		standardProps.setProperty("auto.offset.reset", "earliest"); // read from the beginning.
+		standardProps.setProperty("auto.offset.reset", "earliest"); // read from the beginning.
(earliest is kafka 0.9 value)
 		standardProps.setProperty("fetch.message.max.bytes", "256"); // make a lot of fetches (MESSAGES
MUST BE SMALLER!)
-
-		Properties consumerConfigProps = new Properties();
-		consumerConfigProps.putAll(standardProps);
-		consumerConfigProps.setProperty("auto.offset.reset", "smallest");
-		standardCC = new ConsumerConfig(consumerConfigProps);
 	}
 
 	@Override
@@ -233,8 +221,8 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
 	}
 
 	public ZkUtils getZkUtils() {
-		ZkClient creator = new ZkClient(standardCC.zkConnect(), standardCC.zkSessionTimeoutMs(),
-				standardCC.zkConnectionTimeoutMs(), new ZooKeeperStringSerializer());
+		ZkClient creator = new ZkClient(zookeeperConnectionString, Integer.valueOf(standardProps.getProperty("zookeeper.session.timeout.ms")),
+				Integer.valueOf(standardProps.getProperty("zookeeper.connection.timeout.ms")), new ZooKeeperStringSerializer());
 		return ZkUtils.apply(creator, false);
 	}
 
@@ -280,8 +268,8 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
 		try {
 			LOG.info("Deleting topic {}", topic);
 
-			ZkClient zk = new ZkClient(standardCC.zkConnect(), standardCC.zkSessionTimeoutMs(),
-					standardCC.zkConnectionTimeoutMs(), new ZooKeeperStringSerializer());
+			ZkClient zk = new ZkClient(zookeeperConnectionString, Integer.valueOf(standardProps.getProperty("zookeeper.session.timeout.ms")),
+				Integer.valueOf(standardProps.getProperty("zookeeper.connection.timeout.ms")), new ZooKeeperStringSerializer());
 
 			AdminUtils.deleteTopic(zkUtils, topic);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/9173825a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
index 2d9f2fc..680e4ec 100644
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
+++ b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
@@ -1284,11 +1284,10 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase
{
 			throws IOException
 	{
 		// write the sequence to log for debugging purposes
-		Properties stdProps = standardCC.props().props();
-		Properties newProps = new Properties(stdProps);
+		Properties newProps = new Properties(standardProps);
 		newProps.setProperty("group.id", "topic-printer"+ UUID.randomUUID().toString());
 		newProps.setProperty("auto.offset.reset", "smallest");
-		newProps.setProperty("zookeeper.connect", standardCC.zkConnect());
+		newProps.setProperty("zookeeper.connect", standardProps.getProperty("zookeeper.connect"));
 
 		ConsumerConfig printerConfig = new ConsumerConfig(newProps);
 		printTopic(topicName, printerConfig, deserializer, elements);

http://git-wip-us.apache.org/repos/asf/flink/blob/9173825a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
index 73cd2f9..ab1d5b6 100644
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
+++ b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
@@ -66,7 +66,6 @@ public abstract class KafkaTestBase extends TestLogger {
 
 	protected static String brokerConnectionStrings;
 
-	protected static ConsumerConfig standardCC;
 	protected static Properties standardProps;
 	
 	protected static ForkableFlinkMiniCluster flink;
@@ -98,7 +97,6 @@ public abstract class KafkaTestBase extends TestLogger {
 		kafkaServer.prepare(NUMBER_OF_KAFKA_SERVERS);
 
 		standardProps = kafkaServer.getStandardProperties();
-		standardCC = kafkaServer.getStandardConsumerConfig();
 		brokerConnectionStrings = kafkaServer.getBrokerConnectionString();
 
 		// start also a re-usable Flink mini cluster

http://git-wip-us.apache.org/repos/asf/flink/blob/9173825a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironment.java
b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironment.java
index 40be8a1..76a284b 100644
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironment.java
+++ b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironment.java
@@ -17,7 +17,6 @@
 
 package org.apache.flink.streaming.connectors.kafka;
 
-import kafka.consumer.ConsumerConfig;
 import kafka.server.KafkaServer;
 import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
 import org.apache.flink.streaming.util.serialization.DeserializationSchema;
@@ -25,7 +24,6 @@ import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
 import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchemaWrapper;
 import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
 
-import java.net.UnknownHostException;
 import java.util.Collections;
 import java.util.List;
 import java.util.Properties;
@@ -45,7 +43,6 @@ public abstract class KafkaTestEnvironment {
 
 	public abstract void createTestTopic(String topic, int numberOfPartitions, int replicationFactor);
 
-	public abstract ConsumerConfig getStandardConsumerConfig();
 
 	public abstract Properties getStandardProperties();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/9173825a/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java
b/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java
index 8c9a9c7..98dc85f 100644
--- a/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java
+++ b/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java
@@ -155,6 +155,7 @@ public class YARNSessionFIFOITCase extends YarnTestBase {
 			List<ApplicationReport> apps = yc.getApplications(EnumSet.of(YarnApplicationState.RUNNING));
 			Assert.assertEquals(1, apps.size()); // Only one running
 			ApplicationReport app = apps.get(0);
+
 			Assert.assertEquals("MyCustomName", app.getName());
 			ApplicationId id = app.getApplicationId();
 			yc.killApplication(id);


Mime
View raw message