From: rmetzger
To: issues@flink.incubator.apache.org
Reply-To: issues@flink.incubator.apache.org
Subject: [GitHub] flink pull request: [FLINK-1753] Extend KafkaITCase, reworked Pers...
Message-Id: <20150416073324.BBB97E09FB@git1-us-west.apache.org>
Date: Thu, 16 Apr 2015 07:33:24 +0000 (UTC)

Github user rmetzger commented on a diff in the pull request:

    https://github.com/apache/flink/pull/603#discussion_r28489103

    --- Diff: flink-staging/flink-streaming/flink-streaming-connectors/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java ---
    @@ -709,4 +834,72 @@ private static KafkaServer getKafkaServer(int brokerId, File tmpFolder) throws U
     		private static final long serialVersionUID = 1L;
     	}
     
    +/*	@Test
    +	public void testKafkaWithoutFlink() {
    +
    +		// start consumer:
    +		new Thread(new Runnable() {
    +			@Override
    +			public void run() {
    +				LOG.info("Starting consumer");
    +				// consume from "testtopic"
    +
    +				Properties cprops = new Properties();
    +				cprops.put("zookeeper.connect", zookeeperConnectionString);
    +				cprops.put("group.id", "mygroupid");
    +				cprops.put("zookeeper.session.timeout.ms", "400");
    +				cprops.put("zookeeper.sync.time.ms", "200");
    +				cprops.put("auto.commit.interval.ms", "1000");
    +				ConsumerConnector jcc = Consumer.createJavaConsumerConnector(new ConsumerConfig(cprops));
    +				Map<String, Integer> tcm = new HashMap<String, Integer>(1);
    +				tcm.put("testtopic", 1);
    +				Map<String, List<KafkaStream<byte[], byte[]>>> ms = jcc.createMessageStreams(tcm);
    +				LOG.info("message streams "+ms);
    +				Assert.assertEquals(1, ms.size());
    +
    +				List<KafkaStream<byte[], byte[]>> strList = ms.get("testtopic");
    +				Assert.assertEquals(1, strList.size());
    +
    +				KafkaStream<byte[], byte[]> str = strList.get(0);
    +				LOG.info("str = "+str);
    +
    +				ConsumerIterator<byte[], byte[]> it = str.iterator();
    +				while(it.hasNext()) {
    +					MessageAndMetadata<byte[], byte[]> msg = it.next();
    +					LOG.info("msg = "+msg);
    +				}
    +				LOG.info("Finished consumer");
    +			}
    +		}).start();
    +
    +		// start producer:
    +		Properties props = new Properties();
    +		String brks = "";
    +		for(KafkaServer broker : brokers) {
    +			SocketServer srv = broker.socketServer();
    +			String host = "localhost";
    +			if(srv.host() != null) {
    +				host = srv.host();
    +			}
    +			brks += host+":"+broker.socketServer().port()+",";
    +		}
    +		LOG.info("Using broker list "+brks);
    +		props.setProperty("metadata.broker.list", brks);
    +		props.put("serializer.class", StringEncoder.class.getCanonicalName());
    +
    +		ProducerConfig config = new ProducerConfig(props);
    +		Producer<String, String> p = new Producer<String, String>(config);
    +
    +		for(int i = 0; i < 10; i++) {
    +			p.send(new KeyedMessage<String, String>("testtopic", "What a message"));
    +		}
    +
    +		LOG.info("+++++++++++ FINISHED PRODUCING ++++++++++++++++ ");
    +
    +		try {
    +			Thread.sleep(1000000);
    +		} catch (InterruptedException e) {
    +			e.printStackTrace();
    +		}
    --- End diff --
    
    I was so desperate that I wanted to test the Kafka stuff without any Flink code. But in the end I figured it out. I can remove the code.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---