From: thw@apache.org
To: commits@apex.apache.org
Reply-To: dev@apex.apache.org
Date: Sun, 02 Jul 2017 19:04:54 -0000
Subject: [1/5] apex-malhar git commit: APEXMALHAR-2459 1)Refactor the existing Kafka Input Operator. 2)Added the support of KafkaInputOperator using 0.10 consumer API

Repository: apex-malhar Updated Branches: refs/heads/master 4df68580e -> b42d8e741 http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/b42d8e74/kafka/src/test/java/org/apache/apex/malhar/kafka/EmbeddedKafka.java ---------------------------------------------------------------------- diff --git a/kafka/src/test/java/org/apache/apex/malhar/kafka/EmbeddedKafka.java b/kafka/src/test/java/org/apache/apex/malhar/kafka/EmbeddedKafka.java deleted file mode 100644 index e9fcc36..0000000 --- a/kafka/src/test/java/org/apache/apex/malhar/kafka/EmbeddedKafka.java +++ /dev/null @@ -1,166 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package org.apache.apex.malhar.kafka; - -import java.io.File; -import java.io.IOException; -import java.net.ServerSocket; -import java.nio.charset.StandardCharsets; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; -import java.util.Properties; - -import org.I0Itec.zkclient.ZkClient; - -import org.apache.commons.io.FileUtils; -import org.apache.kafka.clients.consumer.ConsumerRecord; -import org.apache.kafka.clients.consumer.ConsumerRecords; -import org.apache.kafka.clients.consumer.KafkaConsumer; -import org.apache.kafka.clients.producer.KafkaProducer; -import org.apache.kafka.clients.producer.ProducerRecord; - -import com.google.common.base.Throwables; - -import kafka.admin.AdminUtils; -import kafka.server.KafkaConfig; -import kafka.server.KafkaServer; -import kafka.utils.MockTime; -import kafka.utils.TestUtils; -import kafka.utils.Time; -import kafka.utils.ZKStringSerializer$; -import kafka.utils.ZkUtils; -import kafka.zk.EmbeddedZookeeper; - -public class EmbeddedKafka -{ - private static final String KAFKA_PATH = "/tmp/kafka-test"; - - private ZkClient zkClient; - private ZkUtils zkUtils; - private String BROKERHOST = "127.0.0.1"; - private String BROKERPORT = "9092"; - private EmbeddedZookeeper zkServer; - private KafkaServer kafkaServer; - - public String getBroker() - { - return BROKERHOST + ":" + BROKERPORT; - } - - public void start() throws IOException - { - // Find port - try { - ServerSocket serverSocket = new ServerSocket(0); - BROKERPORT = Integer.toString(serverSocket.getLocalPort()); - serverSocket.close(); - } catch (IOException e) { - throw Throwables.propagate(e); - } - - // Setup Zookeeper - zkServer = new EmbeddedZookeeper(); - String zkConnect = BROKERHOST + ":" + zkServer.port(); - zkClient = new ZkClient(zkConnect, 30000, 30000, ZKStringSerializer$.MODULE$); - zkUtils = ZkUtils.apply(zkClient, false); - - // Setup brokers - cleanupDir(); - Properties props = new Properties(); - props.setProperty("zookeeper.connect", zkConnect); - props.setProperty("broker.id", "0"); - props.setProperty("log.dirs", KAFKA_PATH); - props.setProperty("listeners", "PLAINTEXT://" + BROKERHOST + ":" + BROKERPORT); - KafkaConfig config = new KafkaConfig(props); - Time mock = new MockTime(); - kafkaServer = TestUtils.createServer(config, mock); - } - - public void stop() throws IOException - { - kafkaServer.shutdown(); - zkClient.close(); - zkServer.shutdown(); - cleanupDir(); - } - - private void cleanupDir() throws IOException - { - FileUtils.deleteDirectory(new File(KAFKA_PATH)); - } - - public void createTopic(String topic) - { - AdminUtils.createTopic(zkUtils, topic, 1, 1, new Properties()); - List servers = new ArrayList(); - servers.add(kafkaServer); - TestUtils.waitUntilMetadataIsPropagated(scala.collection.JavaConversions.asScalaBuffer(servers), topic, 0, 30000); - } - - public void publish(String topic, List messages) - { - Properties producerProps = new Properties(); - producerProps.setProperty("bootstrap.servers", BROKERHOST + ":" + BROKERPORT); - producerProps.setProperty("key.serializer", "org.apache.kafka.common.serialization.IntegerSerializer"); - producerProps.setProperty("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer"); - - try (KafkaProducer producer = new KafkaProducer<>(producerProps)) { - for (String message : messages) { - ProducerRecord data = new ProducerRecord<>(topic, message.getBytes(StandardCharsets.UTF_8)); - producer.send(data); - } - } - - List servers = new ArrayList(); - 
servers.add(kafkaServer); - TestUtils.waitUntilMetadataIsPropagated(scala.collection.JavaConversions.asScalaBuffer(servers), topic, 0, 30000); - } - - public List consume(String topic, int timeout) - { - return consume(topic, timeout, true); - } - - public List consume(String topic, int timeout, boolean earliest) - { - Properties consumerProps = new Properties(); - consumerProps.setProperty("bootstrap.servers", BROKERHOST + ":" + BROKERPORT); - consumerProps.setProperty("group.id", "group0"); - consumerProps.setProperty("client.id", "consumer0"); - consumerProps.setProperty("key.deserializer", "org.apache.kafka.common.serialization.IntegerDeserializer"); - consumerProps.setProperty("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer"); - // to make sure the consumer starts from the beginning of the topic - consumerProps.put("auto.offset.reset", earliest ? "earliest" : "latest"); - KafkaConsumer consumer = new KafkaConsumer<>(consumerProps); - consumer.subscribe(Arrays.asList(topic)); - - List messages = new ArrayList<>(); - - ConsumerRecords records = consumer.poll(timeout); - for (ConsumerRecord record : records) { - messages.add(new String(record.value())); - } - - consumer.close(); - - return messages; - } - -} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/b42d8e74/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaConsumerPropertiesTest.java ---------------------------------------------------------------------- diff --git a/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaConsumerPropertiesTest.java b/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaConsumerPropertiesTest.java deleted file mode 100644 index 81d7ade..0000000 --- a/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaConsumerPropertiesTest.java +++ /dev/null @@ -1,81 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
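
[Editor's note] The EmbeddedKafka helper deleted above boots an in-process ZooKeeper plus a single Kafka broker on a dynamically chosen free port, so the tests need no external cluster; consume() without the earliest flag reads from the beginning of the topic, and stop() also deletes the /tmp/kafka-test log directory. A minimal usage sketch, assuming the helper's API exactly as shown in the removed file (the test class name here is illustrative):

import java.util.Arrays;
import java.util.List;

import org.junit.Assert;
import org.junit.Test;

public class EmbeddedKafkaSketchTest
{
  @Test
  public void roundTrip() throws Exception
  {
    EmbeddedKafka kafka = new EmbeddedKafka();
    kafka.start();                                      // free port + embedded ZK + one broker
    try {
      kafka.createTopic("sketch-topic");                // 1 partition, replication factor 1
      kafka.publish("sketch-topic", Arrays.asList("m1", "m2"));
      List<String> received = kafka.consume("sketch-topic", 5000); // poll up to 5s from earliest
      Assert.assertEquals(Arrays.asList("m1", "m2"), received);
    } finally {
      kafka.stop();                                     // shut down broker/ZK, clean log dir
    }
  }
}
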
- */ -package org.apache.apex.malhar.kafka; - -import java.text.ParseException; -import java.util.Properties; - -import org.junit.Assert; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.TestWatcher; -import org.junit.runner.Description; - -import org.apache.kafka.common.KafkaException; - -import com.datatorrent.api.Context; - -public class KafkaConsumerPropertiesTest -{ - - KafkaSinglePortInputOperator kafkaInput = new KafkaSinglePortInputOperator(); - @Rule - public Watcher watcher = new Watcher(); - - public class Watcher extends TestWatcher - { - Context.OperatorContext context; - - @Override - protected void starting(Description description) - { - super.starting(description); - kafkaInput.setClusters("localhost:8087"); - kafkaInput.setInitialPartitionCount(1); - kafkaInput.setTopics("apexTest"); - kafkaInput.setInitialOffset(AbstractKafkaInputOperator.InitialOffset.EARLIEST.name()); - Properties prop = new Properties(); - prop.setProperty("security.protocol", "SASL_PLAINTEXT"); - prop.setProperty("sasl.kerberos.service.name", "kafka"); - kafkaInput.setConsumerProps(prop); - } - - @Override - protected void finished(Description description) - { - super.finished(description); - } - } - - @Test - public void TestConsumerProperties() throws ParseException - { - //Added test on this check to ensure consumer properties are set and not reset between. - if (null != kafkaInput.getConsumerProps().get("security.protocol")) { - try { - kafkaInput.definePartitions(null, null); - } catch (KafkaException e) { - //Ensures the properties of the consumer are set/not reset. - Assert.assertEquals( - "java.lang.IllegalArgumentException: You must pass java.security.auth.login.config in " - + "secure mode.", e.getCause().getMessage()); - } - } - } -} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/b42d8e74/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaHelper.java ---------------------------------------------------------------------- diff --git a/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaHelper.java b/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaHelper.java deleted file mode 100644 index abf3a5b..0000000 --- a/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaHelper.java +++ /dev/null @@ -1,66 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
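
[Editor's note] KafkaConsumerPropertiesTest, removed above, verifies that user-supplied consumer properties survive definePartitions(): with security.protocol=SASL_PLAINTEXT set but no JAAS login configuration, the Kafka client must fail with exactly the "You must pass java.security.auth.login.config in secure mode" error, proving the properties were not reset. By contrast, a run actually meant to reach a Kerberized broker would also point the JVM at a JAAS file before the consumer is created; a sketch, where the file path is illustrative and not part of the test:

// Illustrative secured-consumer setup; the JAAS file path is an assumption.
System.setProperty("java.security.auth.login.config", "/etc/kafka/kafka_client_jaas.conf");

Properties prop = new Properties();
prop.setProperty("security.protocol", "SASL_PLAINTEXT");
prop.setProperty("sasl.kerberos.service.name", "kafka");
kafkaInput.setConsumerProps(prop);
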
- */ - -package org.apache.apex.malhar.kafka; - -import java.nio.ByteBuffer; -import java.util.Map; - -import org.apache.kafka.common.serialization.Deserializer; -import org.apache.kafka.common.serialization.Serializer; - -public class KafkaHelper implements Serializer, - Deserializer -{ - @Override - public KafkaOutputOperatorTest.Person deserialize(String s, byte[] bytes) - { - ByteBuffer byteBuffer = ByteBuffer.wrap(bytes); - int nameLength = byteBuffer.getInt(); - byte[] name = new byte[nameLength]; - - byteBuffer.get(name, 0, nameLength); - - return new KafkaOutputOperatorTest.Person(new String(name), byteBuffer.getInt()); - } - - @Override - public byte[] serialize(String s, KafkaOutputOperatorTest.Person person) - { - byte[] name = person.name.getBytes(); - - ByteBuffer byteBuffer = ByteBuffer.allocate(name.length + 4 + 4); - - byteBuffer.putInt(name.length); - byteBuffer.put(name); - byteBuffer.putInt(person.age); - - return byteBuffer.array(); - } - - @Override - public void configure(Map map, boolean b) - { - } - - @Override - public void close() - { - } -} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/b42d8e74/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaInputOperatorTest.java ---------------------------------------------------------------------- diff --git a/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaInputOperatorTest.java b/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaInputOperatorTest.java deleted file mode 100644 index f16c8f4..0000000 --- a/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaInputOperatorTest.java +++ /dev/null @@ -1,399 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
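
[Editor's note] KafkaHelper, removed above, is a combined Kafka Serializer/Deserializer for the test's Person type, using a length-prefixed layout: a 4-byte name length, the name bytes (platform default charset), then a 4-byte age. A round-trip sketch as a fragment, assuming the removed classes as shown:

KafkaHelper helper = new KafkaHelper();
KafkaOutputOperatorTest.Person in = new KafkaOutputOperatorTest.Person("Ada", 36);

byte[] bytes = helper.serialize("any-topic", in);    // layout: [nameLength:int][name bytes][age:int]
KafkaOutputOperatorTest.Person out = helper.deserialize("any-topic", bytes);

Assert.assertEquals(in, out);                        // Person.equals() compares name and age
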
- */ -package org.apache.apex.malhar.kafka; - -import java.io.File; -import java.util.Arrays; -import java.util.Collection; -import java.util.HashMap; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; - -import org.junit.Assert; -import org.junit.Before; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.TestWatcher; -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; -import org.slf4j.LoggerFactory; - -import org.apache.apex.malhar.lib.wal.FSWindowDataManager; - -import com.google.common.collect.Lists; - -import com.datatorrent.api.Context; -import com.datatorrent.api.DAG; -import com.datatorrent.api.DAG.Locality; -import com.datatorrent.api.DefaultInputPort; -import com.datatorrent.api.LocalMode; -import com.datatorrent.common.util.BaseOperator; -import com.datatorrent.stram.StramLocalCluster; - -/** - * A bunch of test to verify the input operator will be automatically partitioned - * per kafka partition This test is launching its - * own Kafka cluster. - */ -@RunWith(Parameterized.class) -public class KafkaInputOperatorTest extends KafkaOperatorTestBase -{ - - private int totalBrokers = 0; - - private String partition = null; - - private String testName = ""; - - public static String APPLICATION_PATH = baseDir + File.separator + StramLocalCluster.class.getName() + File.separator; - - public class KafkaTestInfo extends TestWatcher - { - public org.junit.runner.Description desc; - - public String getDir() - { - String methodName = desc.getMethodName(); - String className = desc.getClassName(); - return "target/" + className + "/" + methodName + "/" + testName; - } - - @Override - protected void starting(org.junit.runner.Description description) - { - this.desc = description; - } - } - - @Rule - public final KafkaTestInfo testInfo = new KafkaTestInfo(); - - @Parameterized.Parameters(name = "multi-cluster: {0}, multi-partition: {1}, partition: {2}") - public static Collection testScenario() - { - return Arrays.asList(new Object[][]{ - {true, false, "one_to_one"},// multi cluster with single partition - {true, false, "one_to_many"}, - {true, true, "one_to_one"},// multi cluster with multi partitions - {true, true, "one_to_many"}, - {false, true, "one_to_one"}, // single cluster with multi partitions - {false, true, "one_to_many"}, - {false, false, "one_to_one"}, // single cluster with single partitions - {false, false, "one_to_many"} - }); - } - - @Before - public void before() - { - testName = TEST_TOPIC + testCounter++; - logger.info("before() test case: {}", testName); - tupleCollection.clear(); - //reset count for next new test case - k = 0; - - createTopic(0, testName); - if (hasMultiCluster) { - createTopic(1, testName); - } - - } - - public KafkaInputOperatorTest(boolean hasMultiCluster, boolean hasMultiPartition, String partition) - { - // This class want to initialize several kafka brokers for multiple partitions - this.hasMultiCluster = hasMultiCluster; - this.hasMultiPartition = hasMultiPartition; - int cluster = 1 + (hasMultiCluster ? 1 : 0); - totalBrokers = (1 + (hasMultiPartition ? 
1 : 0)) * cluster; - this.partition = partition; - } - - private static final org.slf4j.Logger logger = LoggerFactory.getLogger(KafkaInputOperatorTest.class); - private static List tupleCollection = new LinkedList<>(); - - /** - * whether countDown latch count all tuples or just END_TUPLE - */ - private static final boolean countDownAll = false; - private static final int scale = 2; - private static final int totalCount = 10 * scale; - private static final int failureTrigger = 3 * scale; - private static final int tuplesPerWindow = 5 * scale; - private static final int waitTime = 60000 + 300 * scale; - - //This latch was used to count the END_TUPLE, but the order of tuple can't be guaranteed, - //so, count valid tuple instead. - private static CountDownLatch latch; - private static boolean hasFailure = false; - private static int k = 0; - private static Thread monitorThread; - - /** - * Test Operator to collect tuples from KafkaSingleInputStringOperator. - * - * @param - */ - public static class CollectorModule extends BaseOperator - { - public final transient DefaultInputPort inputPort = new DefaultInputPort() - { - @Override - public void process(byte[] bt) - { - processTuple(bt); - } - }; - - long currentWindowId; - - long operatorId; - - boolean isIdempotentTest = false; - - transient List windowTupleCollector = Lists.newArrayList(); - private transient Map> tupleCollectedInWindow = new HashMap<>(); - private int endTuples = 0; - - @Override - public void setup(Context.OperatorContext context) - { - super.setup(context); - operatorId = context.getId(); - } - - @Override - public void beginWindow(long windowId) - { - super.beginWindow(windowId); - currentWindowId = windowId; - windowTupleCollector.clear(); - endTuples = 0; - } - - public void processTuple(byte[] bt) - { - String tuple = new String(bt); - if (hasFailure && k++ == failureTrigger) { - //you can only kill yourself once - hasFailure = false; - throw new RuntimeException(); - } - if (tuple.startsWith(KafkaOperatorTestBase.END_TUPLE)) { - endTuples++; - } - - windowTupleCollector.add(tuple); - } - - @Override - public void endWindow() - { - super.endWindow(); - if (isIdempotentTest) { - String key = operatorId + "," + currentWindowId; - List msgsInWin = tupleCollectedInWindow.get(key); - if (msgsInWin != null) { - Assert.assertEquals( - "replay messages should be exactly same as previous window", msgsInWin, windowTupleCollector); - } else { - List newList = Lists.newArrayList(); - newList.addAll(windowTupleCollector); - tupleCollectedInWindow.put(key, newList); - } - } - - //discard the tuples of this window if except happened - int tupleSize = windowTupleCollector.size(); - tupleCollection.addAll(windowTupleCollector); - - int countDownTupleSize = countDownAll ? tupleSize : endTuples; - - if (latch != null) { - Assert.assertTrue( - "received END_TUPLES more than expected.", latch.getCount() >= countDownTupleSize); - while (countDownTupleSize > 0) { - latch.countDown(); - --countDownTupleSize; - } - if (latch.getCount() == 0) { - /** - * The time before countDown() and the shutdown() of the application - * will cause fatal error: - * "Catastrophic Error: Invalid State - the operator blocked forever!" - * as the activeQueues could be cleared but alive haven't changed yet. 
- * throw the ShutdownException to let the engine shutdown; - */ - try { - throw new ShutdownException(); - //lc.shutdown(); - } finally { - /** - * interrupt the engine thread, let it wake from sleep and handle - * the shutdown at this time, all payload should be handled. so it - * should be ok to interrupt - */ - monitorThread.interrupt(); - } - } - } - } - - } - - /** - * Test AbstractKafkaSinglePortInputOperator (i.e. an input adapter for Kafka, aka consumer). This module receives - * data from an outside test generator through Kafka message bus and feed that data into Malhar streaming platform. - * - * [Generate message and send that to Kafka message bus] ==> [Receive that message through Kafka input adapter(i.e. - * consumer) and send using emitTuples() interface on output port] - * - * - * @throws Exception - */ - @Test - public void testInputOperator() throws Exception - { - hasFailure = false; - testInputOperator(false, false); - } - - @Test - public void testInputOperatorWithFailure() throws Exception - { - hasFailure = true; - testInputOperator(true, false); - } - - @Test - public void testIdempotentInputOperatorWithFailure() throws Exception - { - hasFailure = true; - testInputOperator(true, true); - } - - public void testInputOperator(boolean hasFailure, boolean idempotent) throws Exception - { - // each broker should get a END_TUPLE message - latch = new CountDownLatch(countDownAll ? totalCount + totalBrokers : totalBrokers); - - logger.info( - "Test Case: name: {}; totalBrokers: {}; hasFailure: {}; hasMultiCluster: {};" + - " hasMultiPartition: {}, partition: {}", - testName, totalBrokers, hasFailure, hasMultiCluster, hasMultiPartition, partition); - - // Start producer - KafkaTestProducer p = new KafkaTestProducer(testName, hasMultiPartition, hasMultiCluster); - p.setSendCount(totalCount); - Thread t = new Thread(p); - t.start(); - - int expectedReceiveCount = totalCount + totalBrokers; - - // Create DAG for testing. - LocalMode lma = LocalMode.newInstance(); - DAG dag = lma.getDAG(); - - // Create KafkaSinglePortStringInputOperator - KafkaSinglePortInputOperator node = dag.addOperator( - "Kafka input" + testName, KafkaSinglePortInputOperator.class); - node.setInitialPartitionCount(1); - // set topic - node.setTopics(testName); - node.setInitialOffset(AbstractKafkaInputOperator.InitialOffset.EARLIEST.name()); - node.setClusters(getClusterConfig()); - node.setStrategy(partition); - if (idempotent) { - node.setWindowDataManager(new FSWindowDataManager()); - } - - // Create Test tuple collector - CollectorModule collector = dag.addOperator("TestMessageCollector", CollectorModule.class); - collector.isIdempotentTest = idempotent; - - // Connect ports - dag.addStream("Kafka message" + testName, node.outputPort, collector.inputPort) - .setLocality(Locality.CONTAINER_LOCAL); - - if (hasFailure) { - setupHasFailureTest(node, dag); - } - - // Create local cluster - LocalMode.Controller lc = lma.getController(); - lc.setHeartbeatMonitoringEnabled(false); - - //let the Controller to run the inside another thread. It is almost same as call Controller.runAsync(), - //but Controller.runAsync() don't expose the thread which run it, - //so we don't know when the thread will be terminated. - //create this thread and then call join() to make sure the Controller shutdown completely. 
- monitorThread = new Thread((StramLocalCluster)lc, "master"); - monitorThread.start(); - - boolean notTimeout = true; - try { - // Wait 60s for consumer finish consuming all the messages - notTimeout = latch.await(waitTime, TimeUnit.MILLISECONDS); - lc.shutdown(); - - //wait until control thread finished. - monitorThread.join(); - } catch (Exception e) { - logger.warn(e.getMessage()); - } - - t.join(); - - if (!notTimeout || expectedReceiveCount != tupleCollection.size()) { - logger.info("Number of received/expected tuples: {}/{}, testName: {}, tuples: \n{}", tupleCollection.size(), - expectedReceiveCount, testName, tupleCollection); - } - Assert.assertTrue("TIMEOUT. testName: " + this.testName + "; Collected data: " - + tupleCollection, notTimeout); - - // Check results - Assert.assertTrue("testName: " + testName + "; Collected tuple size: " + tupleCollection.size() - + "; Expected tuple size: " + expectedReceiveCount + "; data: \n" + tupleCollection, - expectedReceiveCount == tupleCollection.size()); - - logger.info("End of test case: {}", testName); - } - - private void setupHasFailureTest(KafkaSinglePortInputOperator operator, DAG dag) - { - operator.setHoldingBufferSize(5000); - dag.setAttribute(Context.DAGContext.CHECKPOINT_WINDOW_COUNT, 1); - //dag.setAttribute(Context.OperatorContext.STORAGE_AGENT, new FSStorageAgent( - // APPLICATION_PATH + "failureck", new Configuration())); - operator.setMaxTuplesPerWindow(tuplesPerWindow); - } - - private String getClusterConfig() - { - String l = "localhost:"; - return l + TEST_KAFKA_BROKER_PORT[0][0] + - (hasMultiPartition ? "," + l + TEST_KAFKA_BROKER_PORT[0][1] : "") + - (hasMultiCluster ? ";" + l + TEST_KAFKA_BROKER_PORT[1][0] : "") + - (hasMultiCluster && hasMultiPartition ? "," + l + TEST_KAFKA_BROKER_PORT[1][1] : ""); - } - -} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/b42d8e74/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaOperatorTestBase.java ---------------------------------------------------------------------- diff --git a/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaOperatorTestBase.java b/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaOperatorTestBase.java deleted file mode 100644 index 3910546..0000000 --- a/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaOperatorTestBase.java +++ /dev/null @@ -1,292 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
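
[Editor's note] KafkaInputOperatorTest, removed above, runs every combination of single/multi cluster and single/multi partition against both partition strategies, with and without operator failure. The operator wiring it relies on, condensed into one sketch (broker addresses and names are placeholders; the setters are those used in the removed test):

KafkaSinglePortInputOperator in = dag.addOperator("kafkaInput", KafkaSinglePortInputOperator.class);
in.setClusters("localhost:9092;localhost:9093");     // ';' separates clusters, ',' separates brokers within one
in.setTopics("events");
in.setInitialOffset(AbstractKafkaInputOperator.InitialOffset.EARLIEST.name());
in.setInitialPartitionCount(1);
in.setStrategy("one_to_one");                        // one operator partition per Kafka partition, or "one_to_many"
in.setWindowDataManager(new FSWindowDataManager()); // enables the idempotent-replay test path
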
- */ -package org.apache.apex.malhar.kafka; - -import java.io.File; -import java.io.IOException; -import java.net.InetSocketAddress; -import java.net.ServerSocket; -import java.util.Properties; - -import org.junit.AfterClass; -import org.junit.BeforeClass; -import org.slf4j.LoggerFactory; - -import org.apache.commons.io.FileUtils; -import org.apache.zookeeper.server.NIOServerCnxnFactory; -import org.apache.zookeeper.server.ServerCnxnFactory; -import org.apache.zookeeper.server.ZKDatabase; -import org.apache.zookeeper.server.ZooKeeperServer; -import org.apache.zookeeper.server.persistence.FileTxnSnapLog; - -import kafka.admin.TopicCommand; -import kafka.server.KafkaConfig; -import kafka.server.KafkaServerStartable; -import kafka.utils.ZkUtils; - -/** - * This is a base class setup/clean Kafka testing environment for all the input/output test If it's a multipartition - * test, this class creates 2 kafka partitions - */ -public class KafkaOperatorTestBase -{ - - public static final String END_TUPLE = "END_TUPLE"; - public static final int[] TEST_ZOOKEEPER_PORT; - public static final int[][] TEST_KAFKA_BROKER_PORT; - public static final String TEST_TOPIC = "testtopic"; - public static int testCounter = 0; - - // get available ports - static { - ServerSocket[] listeners = new ServerSocket[6]; - int[] p = new int[6]; - - try { - for (int i = 0; i < 6; i++) { - listeners[i] = new ServerSocket(0); - p[i] = listeners[i].getLocalPort(); - } - - for (int i = 0; i < 6; i++) { - listeners[i].close(); - } - } catch (IOException e) { - throw new RuntimeException(e); - } - - TEST_ZOOKEEPER_PORT = new int[]{p[0], p[1]}; - TEST_KAFKA_BROKER_PORT = new int[][]{ - new int[]{p[2], p[3]}, - new int[]{p[4], p[5]} - }; - } - - static final org.slf4j.Logger logger = LoggerFactory.getLogger(KafkaOperatorTestBase.class); - // since Kafka 0.8 use KafkaServerStatble instead of KafkaServer - - // multiple brokers in multiple cluster - private static KafkaServerStartable[][] broker = new KafkaServerStartable[2][2]; - - // multiple cluster - private static ServerCnxnFactory[] zkFactory = new ServerCnxnFactory[2]; - - private static ZooKeeperServer[] zkServer = new ZooKeeperServer[2]; - - public static String baseDir = "target"; - - private static final String zkBaseDir = "zookeeper-server-data"; - private static final String kafkaBaseDir = "kafka-server-data"; - private static final String[] zkdir = new String[]{"zookeeper-server-data/1", "zookeeper-server-data/2"}; - private static final String[][] kafkadir = new String[][]{ - new String[]{"kafka-server-data/1/1", "kafka-server-data/1/2"}, - new String[]{"kafka-server-data/2/1", "kafka-server-data/2/2"}}; - protected boolean hasMultiPartition = false; - protected boolean hasMultiCluster = false; - - public static void startZookeeper(final int clusterId) - { - try { - - int numConnections = 100; - int tickTime = 2000; - File dir = new File(baseDir, zkdir[clusterId]); - - zkServer[clusterId] = new TestZookeeperServer(dir, dir, tickTime); - zkFactory[clusterId] = new NIOServerCnxnFactory(); - zkFactory[clusterId].configure(new InetSocketAddress(TEST_ZOOKEEPER_PORT[clusterId]), numConnections); - - zkFactory[clusterId].startup(zkServer[clusterId]); // start the zookeeper server. 
- Thread.sleep(2000); - //kserver.startup(); - } catch (Exception ex) { - logger.error(ex.getLocalizedMessage()); - } - } - - public static void stopZookeeper() - { - for (ZooKeeperServer zs : zkServer) { - if (zs != null) { - zs.shutdown(); - } - } - - for (ServerCnxnFactory zkf : zkFactory) { - if (zkf != null) { - zkf.closeAll(); - zkf.shutdown(); - } - } - zkServer = new ZooKeeperServer[2]; - zkFactory = new ServerCnxnFactory[2]; - } - - public static void startKafkaServer(int clusterid, int brokerid) - { - Properties props = new Properties(); - props.setProperty("broker.id", "" + clusterid * 10 + brokerid); - props.setProperty("log.dirs", new File(baseDir, kafkadir[clusterid][brokerid]).toString()); - props.setProperty("zookeeper.connect", "localhost:" + TEST_ZOOKEEPER_PORT[clusterid]); - props.setProperty("port", "" + TEST_KAFKA_BROKER_PORT[clusterid][brokerid]); - props.setProperty("default.replication.factor", "1"); - // set this to 50000 to boost the performance so most test data are in memory before flush to disk - props.setProperty("log.flush.interval.messages", "50000"); - - broker[clusterid][brokerid] = new KafkaServerStartable(new KafkaConfig(props)); - broker[clusterid][brokerid].startup(); - - } - - public static void startKafkaServer() - { - - FileUtils.deleteQuietly(new File(baseDir, kafkaBaseDir)); - //boolean[][] startable = new boolean[][] { new boolean[] { true, hasMultiPartition }, - // new boolean[] { hasMultiCluster, hasMultiCluster && hasMultiPartition } }; - startKafkaServer(0, 0); - startKafkaServer(0, 1); - startKafkaServer(1, 0); - startKafkaServer(1, 1); - - // startup is asynch operation. wait 2 sec for server to startup - - } - - public static void stopKafkaServer() - { - for (int i = 0; i < broker.length; i++) { - for (int j = 0; j < broker[i].length; j++) { - if (broker[i][j] != null) { - broker[i][j].shutdown(); - broker[i][j].awaitShutdown(); - broker[i][j] = null; - } - } - } - } - - @BeforeClass - public static void beforeTest() - { - try { - startZookeeper(); - startKafkaServer(); - } catch (java.nio.channels.CancelledKeyException ex) { - logger.debug("LSHIL {}", ex.getLocalizedMessage()); - } - } - - public static void startZookeeper() - { - FileUtils.deleteQuietly(new File(baseDir, zkBaseDir)); - startZookeeper(0); - startZookeeper(1); - } - - public void createTopic(int clusterid, String topicName) - { - String[] args = new String[9]; - args[0] = "--zookeeper"; - args[1] = "localhost:" + TEST_ZOOKEEPER_PORT[clusterid]; - args[2] = "--replication-factor"; - args[3] = "1"; - args[4] = "--partitions"; - if (hasMultiPartition) { - args[5] = "2"; - } else { - args[5] = "1"; - } - args[6] = "--topic"; - args[7] = topicName; - args[8] = "--create"; - - ZkUtils zu = ZkUtils.apply("localhost:" + TEST_ZOOKEEPER_PORT[clusterid], 30000, 30000, false); - TopicCommand.createTopic(zu, new TopicCommand.TopicCommandOptions(args)); - - } - - @AfterClass - public static void afterTest() - { - try { - stopKafkaServer(); - stopZookeeper(); - } catch (Exception ex) { - logger.debug("LSHIL {}", ex.getLocalizedMessage()); - } - } - - public void setHasMultiPartition(boolean hasMultiPartition) - { - this.hasMultiPartition = hasMultiPartition; - } - - public void setHasMultiCluster(boolean hasMultiCluster) - { - this.hasMultiCluster = hasMultiCluster; - } - - public static class TestZookeeperServer extends ZooKeeperServer - { - - public TestZookeeperServer() - { - super(); - // TODO Auto-generated constructor stub - } - - public TestZookeeperServer(File snapDir, File 
logDir, int tickTime) throws IOException - { - super(snapDir, logDir, tickTime); - // TODO Auto-generated constructor stub - } - - public TestZookeeperServer(FileTxnSnapLog txnLogFactory, DataTreeBuilder treeBuilder) throws IOException - { - super(txnLogFactory, treeBuilder); - // TODO Auto-generated constructor stub - } - - public TestZookeeperServer(FileTxnSnapLog txnLogFactory, int tickTime, DataTreeBuilder treeBuilder) - throws IOException - { - super(txnLogFactory, tickTime, treeBuilder); - // TODO Auto-generated constructor stub - } - - public TestZookeeperServer(FileTxnSnapLog txnLogFactory, int tickTime, int minSessionTimeout, - int maxSessionTimeout, DataTreeBuilder treeBuilder, ZKDatabase zkDb) - { - super(txnLogFactory, tickTime, minSessionTimeout, maxSessionTimeout, treeBuilder, zkDb); - // TODO Auto-generated constructor stub - } - - @Override - protected void registerJMX() - { - } - - @Override - protected void unregisterJMX() - { - } - - } -} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/b42d8e74/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaOutputOperatorTest.java ---------------------------------------------------------------------- diff --git a/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaOutputOperatorTest.java b/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaOutputOperatorTest.java deleted file mode 100644 index 9a67024..0000000 --- a/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaOutputOperatorTest.java +++ /dev/null @@ -1,429 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
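
[Editor's note] KafkaOperatorTestBase, removed above, reserves six ephemeral ports up front (two for ZooKeeper, four for brokers) by binding a ServerSocket to port 0, recording the assigned port, and closing the socket. The idiom extracted as a helper; note the small race window between close() and the broker's own bind, which the test base tolerates:

static int findFreePort() throws IOException
{
  try (ServerSocket socket = new ServerSocket(0)) {
    return socket.getLocalPort(); // OS-assigned free port; availability is not guaranteed after close()
  }
}
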
- */ -package org.apache.apex.malhar.kafka; - -import java.io.File; -import java.io.IOException; -import java.util.ArrayList; -import java.util.LinkedList; -import java.util.List; -import java.util.Properties; - -import org.junit.After; -import org.junit.Assert; -import org.junit.Before; -import org.junit.Test; - -import org.apache.apex.malhar.lib.wal.FSWindowDataManager; -import org.apache.commons.io.FileUtils; -import org.apache.kafka.clients.consumer.ConsumerConfig; -import org.apache.kafka.clients.producer.ProducerConfig; - -import com.datatorrent.api.Attribute; -import com.datatorrent.api.Context; -import com.datatorrent.api.Context.OperatorContext; -import com.datatorrent.api.DAG; -import com.datatorrent.api.DefaultInputPort; -import com.datatorrent.api.LocalMode; -import com.datatorrent.api.Operator; -import com.datatorrent.common.util.BaseOperator; - -import static com.datatorrent.lib.helper.OperatorContextTestHelper.mockOperatorContext; -import static org.apache.kafka.clients.consumer.ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG; -import static org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_ID_CONFIG; -import static org.apache.kafka.clients.consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG; -import static org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG; - -public class KafkaOutputOperatorTest extends KafkaOperatorTestBase -{ - String testName; - private static List tupleCollection = new LinkedList<>(); - private final String VALUE_DESERIALIZER = "org.apache.apex.malhar.kafka.KafkaHelper"; - private final String VALUE_SERIALIZER = "org.apache.apex.malhar.kafka.KafkaHelper"; - - public static String APPLICATION_PATH = baseDir + File.separator + "MyKafkaApp" + File.separator; - - @Before - public void before() - { - FileUtils.deleteQuietly(new File(APPLICATION_PATH)); - testName = TEST_TOPIC + testCounter++; - createTopic(0, testName); - if (hasMultiCluster) { - createTopic(1, testName); - } - } - - @After - public void after() - { - FileUtils.deleteQuietly(new File(APPLICATION_PATH)); - } - - @Test - public void testExactlyOnceWithFailure() throws Exception - { - List toKafka = GenerateList(); - - sendDataToKafka(true, toKafka, true, false); - - List fromKafka = ReadFromKafka(); - - Assert.assertTrue("With Failure", compare(fromKafka, toKafka)); - } - - @Test - public void testExactlyOnceWithNoFailure() throws Exception - { - List toKafka = GenerateList(); - - sendDataToKafka(true, toKafka, false, false); - - List fromKafka = ReadFromKafka(); - - Assert.assertTrue("With No Failure", compare(fromKafka, toKafka)); - } - - @Test - public void testExactlyOnceWithDifferentTuplesAfterRecovery() throws Exception - { - List toKafka = GenerateList(); - - try { - sendDataToKafka(true, toKafka, true, true); - } catch (RuntimeException ex) { - - boolean expectedException = false; - if (ex.getMessage().contains("Violates")) { - expectedException = true; - } - - Assert.assertTrue("Different tuples after recovery", expectedException); - return; - } - - Assert.assertTrue("Wrong tuples during replay, should throw exception", false); - } - - @Test - public void testKafkaOutput() throws Exception - { - List toKafka = GenerateList(); - - sendDataToKafka(false, toKafka, false, false); - - List fromKafka = ReadFromKafka(); - - Assert.assertTrue("No failure", compare(fromKafka, toKafka)); - } - - @Test - public void testKafkaOutputWithFailure() throws Exception - { - List toKafka = GenerateList(); - - sendDataToKafka(false, toKafka, true, true); - - List fromKafka = 
ReadFromKafka(); - - Assert.assertTrue("No failure", fromKafka.size() > toKafka.size()); - } - - private void sendDataToKafka(boolean exactlyOnce, List toKafka, boolean hasFailure, - boolean differentTuplesAfterRecovery) throws InterruptedException - { - Properties props = new Properties(); - props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, VALUE_SERIALIZER); - if (!exactlyOnce) { - props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, KafkaSinglePortExactlyOnceOutputOperator.KEY_SERIALIZER); - } - props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, getClusterConfig()); - props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, VALUE_DESERIALIZER); - - Attribute.AttributeMap attributeMap = new Attribute.AttributeMap.DefaultAttributeMap(); - attributeMap.put(Context.DAGContext.APPLICATION_NAME, "MyKafkaApp"); - attributeMap.put(DAG.APPLICATION_PATH, APPLICATION_PATH); - - OperatorContext operatorContext = mockOperatorContext(2, attributeMap); - - cleanUp(operatorContext); - - Operator kafkaOutput; - DefaultInputPort inputPort; - - if (exactlyOnce) { - KafkaSinglePortExactlyOnceOutputOperator kafkaOutputTemp = - ResetKafkaOutput(testName, props, operatorContext); - inputPort = kafkaOutputTemp.inputPort; - kafkaOutput = kafkaOutputTemp; - } else { - KafkaSinglePortOutputOperator kafkaOutputTemp = - ResetKafkaSimpleOutput(testName, props, operatorContext); - inputPort = kafkaOutputTemp.inputPort; - kafkaOutput = kafkaOutputTemp; - } - - kafkaOutput.beginWindow(1); - inputPort.getSink().put(toKafka.get(0)); - inputPort.getSink().put(toKafka.get(1)); - inputPort.getSink().put(toKafka.get(2)); - kafkaOutput.endWindow(); - kafkaOutput.beginWindow(2); - inputPort.getSink().put(toKafka.get(3)); - inputPort.getSink().put(toKafka.get(4)); - inputPort.getSink().put(toKafka.get(5)); - kafkaOutput.endWindow(); - kafkaOutput.beginWindow(3); - inputPort.getSink().put(toKafka.get(6)); - inputPort.getSink().put(toKafka.get(7)); - - if (hasFailure) { - if (exactlyOnce) { - KafkaSinglePortExactlyOnceOutputOperator kafkaOutputTemp = - ResetKafkaOutput(testName, props, operatorContext); - inputPort = kafkaOutputTemp.inputPort; - kafkaOutput = kafkaOutputTemp; - } else { - KafkaSinglePortOutputOperator kafkaOutputTemp = - ResetKafkaSimpleOutput(testName, props, operatorContext); - inputPort = kafkaOutputTemp.inputPort; - kafkaOutput = kafkaOutputTemp; - } - - kafkaOutput.beginWindow(2); - inputPort.getSink().put(toKafka.get(3)); - inputPort.getSink().put(toKafka.get(4)); - inputPort.getSink().put(toKafka.get(5)); - kafkaOutput.endWindow(); - kafkaOutput.beginWindow(3); - inputPort.getSink().put(toKafka.get(6)); - - if (!differentTuplesAfterRecovery) { - inputPort.getSink().put(toKafka.get(7)); - } - } - - inputPort.getSink().put(toKafka.get(8)); - inputPort.getSink().put(toKafka.get(9)); - kafkaOutput.endWindow(); - kafkaOutput.beginWindow(4); - inputPort.getSink().put(toKafka.get(10)); - inputPort.getSink().put(toKafka.get(11)); - kafkaOutput.endWindow(); - - cleanUp(operatorContext); - } - - private KafkaSinglePortExactlyOnceOutputOperator ResetKafkaOutput( - String testName, Properties props, Context.OperatorContext operatorContext) - { - KafkaSinglePortExactlyOnceOutputOperator kafkaOutput = new KafkaSinglePortExactlyOnceOutputOperator<>(); - kafkaOutput.setTopic(testName); - kafkaOutput.setProperties(props); - kafkaOutput.setup(operatorContext); - - return kafkaOutput; - } - - private KafkaSinglePortOutputOperator ResetKafkaSimpleOutput( - String testName, Properties props, Context.OperatorContext 
operatorContext) - { - KafkaSinglePortOutputOperator kafkaOutput = new KafkaSinglePortOutputOperator<>(); - kafkaOutput.setTopic(testName); - kafkaOutput.setProperties(props); - kafkaOutput.setup(operatorContext); - - return kafkaOutput; - } - - private void cleanUp(Context.OperatorContext operatorContext) - { - FSWindowDataManager windowDataManager = new FSWindowDataManager(); - windowDataManager.setup(operatorContext); - try { - windowDataManager.committed(windowDataManager.getLargestCompletedWindow()); - } catch (IOException e) { - e.printStackTrace(); - } - } - - private boolean compare(List fromKafka, List toKafka) - { - if (fromKafka.size() != toKafka.size()) { - return false; - } - - for (int i = 0; i < fromKafka.size(); ++i) { - if (!fromKafka.get(i).equals(toKafka.get(i))) { - return false; - } - } - - return true; - } - - private String getClusterConfig() - { - String l = "localhost:"; - return l + TEST_KAFKA_BROKER_PORT[0][0] + - (hasMultiPartition ? "," + l + TEST_KAFKA_BROKER_PORT[0][1] : "") + - (hasMultiCluster ? ";" + l + TEST_KAFKA_BROKER_PORT[1][0] : "") + - (hasMultiCluster && hasMultiPartition ? "," + l + TEST_KAFKA_BROKER_PORT[1][1] : ""); - } - - private List GenerateList() - { - List people = new ArrayList<>(); - - for (Integer i = 0; i < 12; ++i) { - people.add(new Person(i.toString(), i)); - } - - return people; - } - - private List ReadFromKafka() - { - tupleCollection.clear(); - - // Create KafkaSinglePortStringInputOperator - Properties props = new Properties(); - props.put(BOOTSTRAP_SERVERS_CONFIG, getClusterConfig()); - props.put(BOOTSTRAP_SERVERS_CONFIG, getClusterConfig()); - props.put(KEY_DESERIALIZER_CLASS_CONFIG, KafkaSinglePortExactlyOnceOutputOperator.KEY_DESERIALIZER); - props.put(VALUE_DESERIALIZER_CLASS_CONFIG, VALUE_DESERIALIZER); - props.put(GROUP_ID_CONFIG, "KafkaTest"); - - LocalMode lma = LocalMode.newInstance(); - DAG dag = lma.getDAG(); - - // Create KafkaSinglePortStringInputOperator - KafkaSinglePortInputOperator node = dag.addOperator("Kafka input", KafkaSinglePortInputOperator.class); - node.setConsumerProps(props); - node.setInitialPartitionCount(1); - // set topic - node.setTopics(testName); - node.setInitialOffset(AbstractKafkaInputOperator.InitialOffset.EARLIEST.name()); - node.setClusters(getClusterConfig()); - node.setStrategy("one_to_one"); - - // Create Test tuple collector - CollectorModule collector1 = dag.addOperator("collector", new CollectorModule()); - - // Connect ports - dag.addStream("Kafka message", node.outputPort, collector1.inputPort); - - // Create local cluster - final LocalMode.Controller lc = lma.getController(); - lc.setHeartbeatMonitoringEnabled(false); - - lc.run(30000); - - return tupleCollection; - } - - public static class CollectorModule extends BaseOperator - { - public final transient CollectorInputPort inputPort = new CollectorInputPort(this); - - long currentWindowId; - long operatorId; - - @Override - public void setup(Context.OperatorContext context) - { - super.setup(context); - operatorId = context.getId(); - } - - @Override - public void beginWindow(long windowId) - { - super.beginWindow(windowId); - currentWindowId = windowId; - } - - @Override - public void endWindow() - { - super.endWindow(); - } - } - - public static class CollectorInputPort extends DefaultInputPort - { - CollectorModule ownerNode; - - CollectorInputPort(CollectorModule node) - { - this.ownerNode = node; - } - - @Override - public void process(byte[] bt) - { - tupleCollection.add(new KafkaHelper().deserialize("r", bt)); - } - } 
- - public static class Person - { - public String name; - public Integer age; - - public Person(String name, Integer age) - { - this.name = name; - this.age = age; - } - - @Override - public boolean equals(Object o) - { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - - Person person = (Person)o; - - if (name != null ? !name.equals(person.name) : person.name != null) { - return false; - } - - return age != null ? age.equals(person.age) : person.age == null; - } - - @Override - public int hashCode() - { - int result = name != null ? name.hashCode() : 0; - result = 31 * result + (age != null ? age.hashCode() : 0); - return result; - } - - @Override - public String toString() - { - return name + age.toString(); - } - } -} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/b42d8e74/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaTestPartitioner.java ---------------------------------------------------------------------- diff --git a/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaTestPartitioner.java b/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaTestPartitioner.java deleted file mode 100644 index 6098bde..0000000 --- a/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaTestPartitioner.java +++ /dev/null @@ -1,64 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
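
[Editor's note] KafkaOutputOperatorTest, removed above, drives both the plain and the exactly-once output operators through explicit window begin/end cycles, then simulates failure by rebuilding the operator and replaying window 2; the exactly-once variant must emit no duplicates, and replaying different tuples must fail with the "Violates..." exception. Its exactly-once wiring as a sketch (broker address is a placeholder; the key serializer is presumably supplied by the operator itself via its KEY_SERIALIZER constant, since the test sets it only on the non-exactly-once path):

KafkaSinglePortExactlyOnceOutputOperator<Person> out = new KafkaSinglePortExactlyOnceOutputOperator<>();
out.setTopic("people");

Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaHelper.class.getName());
out.setProperties(props);
out.setup(operatorContext); // the engine calls setup() in a real DAG; the test mocks the context
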
- */ -package org.apache.apex.malhar.kafka; - -import java.util.Map; - -import org.apache.kafka.clients.producer.Partitioner; -import org.apache.kafka.common.Cluster; - -import kafka.utils.VerifiableProperties; - -/** - * A simple partitioner class for test purpose - * Key is a int string - * Messages are distributed to all partitions - * One for even number, the other for odd - */ -public class KafkaTestPartitioner implements Partitioner -{ - public KafkaTestPartitioner(VerifiableProperties props) - { - - } - - public KafkaTestPartitioner() - { - - } - - @Override - public int partition(String topic, Object key, byte[] bytes, Object o1, byte[] bytes1, Cluster cluster) - { - int num_partitions = cluster.partitionsForTopic(topic).size(); - return Integer.parseInt((String)key) % num_partitions; - } - - @Override - public void close() - { - - } - - @Override - public void configure(Map map) - { - - } -} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/b42d8e74/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaTestProducer.java ---------------------------------------------------------------------- diff --git a/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaTestProducer.java b/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaTestProducer.java deleted file mode 100644 index 0f18666..0000000 --- a/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaTestProducer.java +++ /dev/null @@ -1,179 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
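
[Editor's note] KafkaTestPartitioner, removed above, routes records by parsing the key as an integer and taking it modulo the partition count, making the distribution of test messages deterministic. Registration and effect, assuming a two-partition topic:

props.setProperty(ProducerConfig.PARTITIONER_CLASS_CONFIG, KafkaTestPartitioner.class.getName());
// With 2 partitions: key "4" -> 4 % 2 = partition 0, key "7" -> 7 % 2 = partition 1.
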
- */ -package org.apache.apex.malhar.kafka; - -import java.util.List; -import java.util.Properties; -import java.util.Random; -import java.util.concurrent.Future; -import java.util.concurrent.TimeUnit; - -import org.apache.kafka.clients.producer.KafkaProducer; -import org.apache.kafka.clients.producer.Producer; -import org.apache.kafka.clients.producer.ProducerConfig; -import org.apache.kafka.clients.producer.ProducerRecord; -import org.apache.kafka.clients.producer.RecordMetadata; -import org.apache.kafka.common.serialization.StringSerializer; - -import com.google.common.collect.Lists; - -/** - * A kafka producer for testing - */ -public class KafkaTestProducer implements Runnable -{ - // private static final Logger logger = LoggerFactory.getLogger(KafkaTestProducer.class); - private final Producer producer; - private final Producer producer1; - private final String topic; - private int sendCount = 20; - // to generate a random int as a key for partition - private final Random rand = new Random(); - private boolean hasPartition = false; - private boolean hasMultiCluster = false; - private List messages; - - // http://kafka.apache.org/documentation.html#producerconfigs - private String ackType = "1"; - - public int getSendCount() - { - return sendCount; - } - - public void setSendCount(int sendCount) - { - this.sendCount = sendCount; - } - - public void setMessages(List messages) - { - this.messages = messages; - } - - private Properties createProducerConfig(int cid) - { - Properties props = new Properties(); - props.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); - props.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); - props.setProperty(ProducerConfig.PARTITIONER_CLASS_CONFIG, KafkaTestPartitioner.class.getName()); - String brokerList = "localhost:" + KafkaOperatorTestBase.TEST_KAFKA_BROKER_PORT[cid][0]; - brokerList += hasPartition ? (",localhost:" + KafkaOperatorTestBase.TEST_KAFKA_BROKER_PORT[cid][1]) : ""; - props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList); - props.setProperty(ProducerConfig.METADATA_MAX_AGE_CONFIG, "20000"); - props.setProperty(ProducerConfig.ACKS_CONFIG, getAckType()); - - return props; - } - - public KafkaTestProducer(String topic) - { - this(topic, false); - } - - public KafkaTestProducer(String topic, boolean hasPartition, boolean hasMultiCluster) - { - // Use random partitioner. Don't need the key type. Just set it to Integer. - // The message is of type String. 
- this.topic = topic; - this.hasPartition = hasPartition; - this.hasMultiCluster = hasMultiCluster; - producer = new KafkaProducer<>(createProducerConfig(0)); - if (hasMultiCluster) { - producer1 = new KafkaProducer<>(createProducerConfig(1)); - } else { - producer1 = null; - } - } - - public KafkaTestProducer(String topic, boolean hasPartition) - { - this(topic, hasPartition, false); - } - - private transient List> sendTasks = Lists.newArrayList(); - - private void generateMessages() - { - // Create dummy message - int messageNo = 1; - while (messageNo <= sendCount) { - String messageStr = "_" + messageNo++; - int k = rand.nextInt(100); - sendTasks.add(producer.send(new ProducerRecord<>(topic, "" + k, "c1" + messageStr))); - if (hasMultiCluster && messageNo <= sendCount) { - messageStr = "_" + messageNo++; - sendTasks.add(producer1.send(new ProducerRecord<>(topic, "" + k, "c2" + messageStr))); - } - // logger.debug(String.format("Producing %s", messageStr)); - } - // produce the end tuple to let the test input operator know it's done produce messages - for (int i = 0; i < (hasPartition ? 2 : 1); ++i) { - sendTasks.add(producer.send(new ProducerRecord<>(topic, "" + i, KafkaOperatorTestBase.END_TUPLE))); - if (hasMultiCluster) { - sendTasks.add(producer1.send(new ProducerRecord<>(topic, "" + i, KafkaOperatorTestBase.END_TUPLE))); - } - } - } - - @Override - public void run() - { - if (messages == null) { - generateMessages(); - } else { - for (String msg : messages) { - sendTasks.add(producer.send(new ProducerRecord<>(topic, "", msg))); - } - } - - producer.flush(); - if (producer1 != null) { - producer1.flush(); - } - - try { - for (Future task : sendTasks) { - task.get(30, TimeUnit.SECONDS); - } - } catch (Exception e) { - throw new RuntimeException(e); - } - - close(); - } - - public void close() - { - producer.close(); - if (producer1 != null) { - producer1.close(); - } - } - - public String getAckType() - { - return ackType; - } - - public void setAckType(String ackType) - { - this.ackType = ackType; - } -} // End of KafkaTestProducer http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/b42d8e74/kafka/src/test/resources/log4j.properties ---------------------------------------------------------------------- diff --git a/kafka/src/test/resources/log4j.properties b/kafka/src/test/resources/log4j.properties deleted file mode 100644 index 910e44a..0000000 --- a/kafka/src/test/resources/log4j.properties +++ /dev/null @@ -1,50 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. 
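
[Editor's note] KafkaTestProducer, removed above, finishes every run by sending one END_TUPLE sentinel per partition, which is what lets the consuming test's countdown latch detect end-of-stream without assuming message order. The pattern in isolation (producer is a KafkaProducer<String, String> and numPartitions an assumed local; keys "0".."n-1" map one-to-one onto partitions under the modulo partitioner above):

for (int p = 0; p < numPartitions; p++) {
  producer.send(new ProducerRecord<>(topic, Integer.toString(p), KafkaOperatorTestBase.END_TUPLE));
}
producer.flush(); // ensure the sentinels are on the wire before the producer closes
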
-# - -log4j.rootLogger=DEBUG,CONSOLE - -log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender -log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout -log4j.appender.CONSOLE.layout.ConversionPattern=%d{ISO8601} [%t] %-5p %c{2} %M - %m%n -log4j.appender.CONSOLE.threshold=INFO -#log4j.appender.CONSOLE.threshold=${test.log.console.threshold} -test.log.console.threshold=WARN - -log4j.appender.RFA=org.apache.log4j.RollingFileAppender -log4j.appender.RFA.layout=org.apache.log4j.PatternLayout -log4j.appender.RFA.layout.ConversionPattern=%d{ISO8601} [%t] %-5p %c{2} %M - %m%n -log4j.appender.RFA.File=/tmp/app.log - -# to enable, add SYSLOG to rootLogger -log4j.appender.SYSLOG=org.apache.log4j.net.SyslogAppender -log4j.appender.SYSLOG.syslogHost=127.0.0.1 -log4j.appender.SYSLOG.layout=org.apache.log4j.PatternLayout -log4j.appender.SYSLOG.layout.conversionPattern=${dt.cid} %-5p [%t] %c{2} %x - %m%n -log4j.appender.SYSLOG.Facility=LOCAL1 - -#log4j.logger.org=INFO - -#log4j.logger.org.apache.commons.beanutils=warn -log4j.logger.com.datatorrent=INFO -log4j.logger.org.apache.apex=INFO - -log4j.logger.org.apache.kafka=WARN -log4j.logger.kafka.consumer=WARN -log4j.logger.kafka=WARN -log4j.logger.org.apache.zookeeper=WARN http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/b42d8e74/sql/pom.xml ---------------------------------------------------------------------- diff --git a/sql/pom.xml b/sql/pom.xml index 113e563..96ac178 100644 --- a/sql/pom.xml +++ b/sql/pom.xml @@ -84,12 +84,6 @@ org.apache.apex malhar-kafka ${project.parent.version} - - - * - * - - org.apache.calcite
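
[Editor's note] The deletions above are only part of this change; per the subject, the rest of this 5-part commit series adds the refactored input operator built on the Kafka 0.10 consumer API. For orientation, a self-contained sketch of the consumer-API shape such an operator wraps — not the actual AbstractKafkaInputOperator code; addresses, group id, topic, and the handle() callback are illustrative:

import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

public class ConsumerLoopSketch implements Runnable
{
  private volatile boolean running = true;

  @Override
  public void run()
  {
    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");
    props.put("group.id", "apex-sketch");
    props.put("key.deserializer", ByteArrayDeserializer.class.getName());
    props.put("value.deserializer", ByteArrayDeserializer.class.getName());

    try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
      consumer.subscribe(Collections.singletonList("events"));
      while (running) {
        for (ConsumerRecord<byte[], byte[]> record : consumer.poll(100)) {
          handle(record.value()); // stand-in for emitting on an operator output port
        }
      }
    }
  }

  private void handle(byte[] value)
  {
    // process one message
  }

  public void shutdown()
  {
    running = false; // lets the poll loop exit and the consumer close cleanly
  }
}
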