apex-users mailing list archives

From: Sunil Parmar <sunilosu...@gmail.com>
Subject: Re: How to consume from two different topics in one apex application
Date: Thu, 03 Aug 2017 20:26:29 GMT
One other way is to set up Kafka MirrorMaker to mirror data from the 0.8
cluster into the 0.9 cluster. It's possible that you'll hit some conflict
there as well, but it's easy to set up, so it's quick to test. Once you do
so, you'll have all the data in the same cluster. You can use a shorter
retention on the 0.9 cluster to manage disk space.
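
A rough sketch of what that setup looks like (hostnames, topic names, and
the retention value below are placeholders, and I haven't tested this
against your clusters):

  # consumer.properties -- old (0.8) consumer pointing at the source cluster
  zookeeper.connect=zk-08-host:2181
  group.id=apex-mirror

  # producer.properties -- producer pointing at the 0.9 target cluster
  bootstrap.servers=broker-09-host:9092

  bin/kafka-mirror-maker.sh --consumer.config consumer.properties \
      --producer.config producer.properties --whitelist 'your-topic'

And to keep retention short on the 0.9 side, something like:

  bin/kafka-configs.sh --zookeeper zk-09-host:2181 --alter --entity-type topics \
      --entity-name your-topic --add-config retention.ms=86400000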

Sunil Parmar

On Thu, Aug 3, 2017 at 8:39 AM, Thomas Weise <thw@apache.org> wrote:

> I don't think you can use both the Kafka 0.8.x and 0.9.x clients within the
> same application. The dependencies overlap and will conflict. You can use
> the 0.8 client to talk to a 0.9 server, but since you want to use SSL that's
> not possible (security was only added in 0.9).
>
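> To see exactly where the two client versions collide, the standard Maven
> dependency report helps, e.g.:
>
>     mvn dependency:tree -Dincludes=org.apache.kafka
>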
> I have not tried that, but you might be able to use the Maven shade plugin
> to shade the Apex Kafka operator along with the Kafka client so that both
> versions can coexist within one application.
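>
> Something along these lines, in a separate module that bundles the 0.8
> operator together with its client (untested sketch; the relocated package
> name is arbitrary):
>
>     <plugin>
>       <groupId>org.apache.maven.plugins</groupId>
>       <artifactId>maven-shade-plugin</artifactId>
>       <executions>
>         <execution>
>           <phase>package</phase>
>           <goals><goal>shade</goal></goals>
>         </execution>
>       </executions>
>       <configuration>
>         <relocations>
>           <relocation>
>             <pattern>kafka</pattern>
>             <shadedPattern>shaded.kafka08</shadedPattern>
>           </relocation>
>         </relocations>
>       </configuration>
>     </plugin>
>
> The 0.8 client classes live under the kafka.* package, so relocating that
> prefix keeps them out of the way of the 0.9 client (which lives under
> org.apache.kafka).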
>
> Thomas
>
>
> On Tue, Aug 1, 2017 at 5:42 AM, rishi <rishi.mishra@target.com> wrote:
>
>> Hi, thanks for the reply.
>>
>> I tried to consume from two different topics in the same app and I am
>> getting this error:
>> *java.lang.NoSuchMethodError: kafka.utils.ZkUtils.getAllBrokersInCluster(Lorg/I0Itec/zkclient/ZkClient;)Lscala/collection/Seq;*
>>
>> When I tried consuming from Kafka 0.9 using the
>> KafkaSinglePortInputOperator, I was able to do it successfully, but when I
>> add one more operator (KafkaSinglePortByteArrayInputOperator) to consume
>> from 0.8 in the same DAG, I get the error.
>>
>> For testing I am not merging the Kafka output into any operator; each
>> pipeline writes to a different location in HDFS.
>>
>> Looks like there is some version issue coming up which I am not able to
>> identify. Any help is highly appreciated.
>>
>> My pom.xml looks like this:
>>
>>     <properties>
>>         <apex.version>3.4.0</apex.version>
>>         <malhar.version>3.6.0</malhar.version>
>>         <apex.apppackage.classpath>lib/*.jar</apex.apppackage.classpath>
>>         <hadoop.version>2.7.1.2.3.4.0-3485</hadoop.version>
>>         <hbase.version>1.1.2.2.3.4.0-3485</hbase.version>
>>         <kafka.version>0.9.0.1</kafka.version>
>>         <confluent.kafka.version>0.9.0.1-cp1</confluent.kafka.version>
>>         <kafka.avro.srlzr.version>2.0.1</kafka.avro.srlzr.version>
>>         <avro.version>1.7.7</avro.version>
>>         <json.version>1.1</json.version>
>>         <jodatime.version>2.9.1</jodatime.version>
>>         <kyroserializer.version>0.38</kyroserializer.version>
>>         <junit.version>4.10</junit.version>
>>     </properties>
>>
>>     <repositories>
>>         <repository>
>>             <id>HDPReleases</id>
>>             <name>HDP Releases</name>
>>             <url>http://repo.hortonworks.com/content/repositories/releases/</url>
>>             <layout>default</layout>
>>         </repository>
>>         <repository>
>>             <id>HDP Jetty Hadoop</id>
>>             <name>HDP Jetty Hadoop</name>
>>             <url>http://repo.hortonworks.com/content/repositories/jetty-hadoop/</url>
>>             <layout>default</layout>
>>         </repository>
>>         <repository>
>>             <id>confluent</id>
>>             <url>http://packages.confluent.io/maven</url>
>>         </repository>
>>     </repositories>
>>     <dependencies>
>>         <dependency>
>>             <groupId>org.apache.apex</groupId>
>>             <artifactId>malhar-library</artifactId>
>>             <version>${malhar.version}</version>
>>         </dependency>
>>         <dependency>
>>             <groupId>org.apache.apex</groupId>
>>             <artifactId>apex-common</artifactId>
>>             <version>${apex.version}</version>
>>             <scope>provided</scope>
>>         </dependency>
>>         <dependency>
>>             <groupId>junit</groupId>
>>             <artifactId>junit</artifactId>
>>             <version>${junit.version}</version>
>>             <scope>test</scope>
>>         </dependency>
>>         <dependency>
>>             <groupId>org.apache.apex</groupId>
>>             <artifactId>apex-engine</artifactId>
>>             <version>${apex.version}</version>
>>             <scope>test</scope>
>>         </dependency>
>>
>>         <dependency>
>>             <groupId>org.apache.apex</groupId>
>>             <artifactId>malhar-contrib</artifactId>
>>             <version>${malhar.version}</version>
>>         </dependency>
>>
>>         <dependency>
>>             <groupId>org.apache.apex</groupId>
>>             <artifactId>malhar-kafka</artifactId>
>>             <version>${malhar.version}</version>
>>         </dependency>
>>
>>         <dependency>
>>             <groupId>org.apache.avro</groupId>
>>             <artifactId>avro</artifactId>
>>             <version>${avro.version}</version>
>>         </dependency>
>>
>>         <dependency>
>>             <groupId>org.apache.kafka</groupId>
>>             <artifactId>kafka_2.11</artifactId>
>>             <version>${confluent.kafka.version}</version>
>>         </dependency>
>>         <dependency>
>>             <groupId>org.apache.kafka</groupId>
>>             <artifactId>kafka_2.10</artifactId>
>>             <version>0.8.1.1</version>
>>         </dependency>
>>
>>         <dependency>
>>             <groupId>io.confluent</groupId>
>>             <artifactId>kafka-avro-serializer</artifactId>
>>             <version>${kafka.avro.srlzr.version}</version>
>>             <exclusions>
>>                 <exclusion>
>>                     <groupId>log4j</groupId>
>>                     <artifactId>log4j</artifactId>
>>                 </exclusion>
>>                 <exclusion>
>>                     <groupId>org.slf4j</groupId>
>>                     <artifactId>slf4j-log4j12</artifactId>
>>                 </exclusion>
>>             </exclusions>
>>         </dependency>
>>
>>         <dependency>
>>             <groupId>com.googlecode.json-simple</groupId>
>>             <artifactId>json-simple</artifactId>
>>             <version>${json.version}</version>
>>         </dependency>
>>
>>         <dependency>
>>             <groupId>org.apache.hbase</groupId>
>>             <artifactId>hbase-client</artifactId>
>>             <version>${hbase.version}</version>
>>             <scope>provided</scope>
>>         </dependency>
>>
>>         <dependency>
>>             <groupId>joda-time</groupId>
>>             <artifactId>joda-time</artifactId>
>>             <version>${jodatime.version}</version>
>>         </dependency>
>>
>>         <dependency>
>>             <groupId>de.javakaffee</groupId>
>>             <artifactId>kryo-serializers</artifactId>
>>             <version>${kyroserializer.version}</version>
>>         </dependency>
>>     </dependencies>
>>
>>
>> My DAG looks like this:
>>
>> public void populateDAG(DAG dag, Configuration conf)
>> {
>>   // First pipeline: consume from the 0.9 cluster over SSL
>>   KafkaSinglePortInputOperator kafkaInTtce =
>>       dag.addOperator("Kafka_Input_SSL", new KafkaSinglePortInputOperator());
>>   kafkaInTtce.setInitialPartitionCount(Integer.parseInt(conf.get("kafka.partitioncount")));
>>   kafkaInTtce.setTopics(conf.get("kafka.ssl.topic"));
>>   kafkaInTtce.setInitialOffset(conf.get("kafka.offset"));
>>   kafkaInTtce.setClusters(conf.get("kafka.cluster"));
>>   kafkaInTtce.setConsumerProps(getKafkaProperties(conf.get("kafka.cluster"), conf));
>>   kafkaInTtce.setStrategy(conf.get("kafka.strategy"));
>>
>>   AvroBytesConversionOperator avroConversion =
>>       dag.addOperator("Avro_Convert", new AvroBytesConversionOperator(conf.get("kafka.schema.registry")));
>>   ColumnsExtractOperator fieldExtract =
>>       dag.addOperator("Field_Extract", new ColumnsExtractOperator());
>>
>>   WriteToHdfs hdfs = dag.addOperator("To_HDFS", new WriteToHdfs(conf.get("hdfs.filename")));
>>   hdfs.setMaxLength(268435456); // new file rotates after every 256 MB
>>
>>   dag.addStream("Kafka_Avro_Msg_Byte_Stream", kafkaInTtce.outputPort, avroConversion.input);
>>   dag.addStream("jsonstring_stream", avroConversion.output, fieldExtract.input);
>>   dag.addStream("valid_recs_into_hdfs_stream", fieldExtract.output, hdfs.input);
>>
>>   // Second pipeline: consume from the 0.8 cluster (non-SSL)
>>   KafkaSinglePortByteArrayInputOperator kafkaInput =
>>       dag.addOperator("Kafka_Input_NonSSL", new KafkaSinglePortByteArrayInputOperator());
>>   CopyofAvroBytesConversionOperator avroConversionEstore =
>>       dag.addOperator("Avro_Convert_estore", new CopyofAvroBytesConversionOperator("http://--------"));
>>   CopyOfColumnsExtractOperator fieldExtractEstore =
>>       dag.addOperator("Field_Extract_Estore", new CopyOfColumnsExtractOperator());
>>   WriteToHdfs2 hdfs2 = dag.addOperator("To_HDFS2", new WriteToHdfs2("DeviceTypeEstore"));
>>   hdfs2.setMaxLength(268435456); // 256 MB
>>
>>   dag.addStream("Kafka_Avro_estore_Stream", kafkaInput.outputPort, avroConversionEstore.input);
>>   dag.addStream("jsonstring_stream_estore", avroConversionEstore.output, fieldExtractEstore.input);
>>   dag.addStream("valid_recs_into_hdfs_estorestream", fieldExtractEstore.output, hdfs2.input);
>> }
>>
>>
>> The error I am getting (.dt log):
>>
>> 2017-08-01 04:57:38,281 INFO  stram.StreamingAppMaster (StreamingAppMaster.java:main(99)) - Initializing Application Master.
>> 2017-08-01 04:57:38,388 INFO  stram.StreamingAppMasterService (StreamingAppMasterService.java:serviceInit(537)) - Application master, appId=507386, clustertimestamp=1500406884031, attemptId=2
>> 2017-08-01 04:57:38,622 WARN  util.NativeCodeLoader (NativeCodeLoader.java:<clinit>(62)) - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
>> 2017-08-01 04:57:39,441 WARN  shortcircuit.DomainSocketFactory (DomainSocketFactory.java:<init>(117)) - The short-circuit local reads feature cannot be used because libhadoop cannot be loaded.
>> 2017-08-01 04:57:40,088 INFO  kafka.AbstractKafkaInputOperator (AbstractKafkaInputOperator.java:initPartitioner(327)) - Initialize Partitioner
>> 2017-08-01 04:57:40,089 INFO  kafka.AbstractKafkaInputOperator (AbstractKafkaInputOperator.java:initPartitioner(340)) - Actual Partitioner is class org.apache.apex.malhar.kafka.OneToManyPartitioner
>> 2017-08-01 04:57:40,121 INFO  kafka.AbstractKafkaPartitioner (AbstractKafkaPartitioner.java:initMetadataClients(234)) - Consumer Properties :  #
>> #Tue Aug 01 04:57:40 CDT 2017
>> security.protocol=SSL
>> enable.auto.commit=false
>> value.deserializer=org.apache.kafka.common.serialization.ByteArrayDeserializer
>> group.id=org.apache.apex.malhar.kafka.AbstractKafkaInputOperatorMETA_GROUP
>> ssl.keystore.password=
>> ssl.truststore.location=/home_dir/client.truststore.jks
>> bootstrap.servers=
>> ssl.truststore.password=
>> ssl.keystore.location=/home_dir/server.keystore.jks
>> key.deserializer=org.apache.kafka.common.serialization.ByteArrayDeserializer
>>
>> 2017-08-01 04:57:40,290 INFO  utils.AppInfoParser (AppInfoParser.java:<init>(82)) - Kafka version : 0.9.0.0
>> 2017-08-01 04:57:40,291 INFO  utils.AppInfoParser (AppInfoParser.java:<init>(83)) - Kafka commitId : fc7243c2af4b2b4a
>> 2017-08-01 04:57:41,306 INFO  kafka.AbstractKafkaPartitioner (AbstractKafkaPartitioner.java:definePartitions(151)) - Partition change detected:
>> 2017-08-01 04:57:41,307 INFO  kafka.AbstractKafkaPartitioner (AbstractKafkaPartitioner.java:definePartitions(170)) - [New] Partition 0 with assignment
>> PartitionMeta{cluster='10.66.137.93:9093', topicPartition=firefly-apps-superapp-21};
>> PartitionMeta{cluster='10.66.137.93:9093', topicPartition=firefly-apps-superapp-22};
>> PartitionMeta{cluster='10.66.137.93:9093', topicPartition=firefly-apps-superapp-23};
>> PartitionMeta{cluster='10.66.137.93:9093', topicPartition=firefly-apps-superapp-24};
>> PartitionMeta{cluster='10.66.137.93:9093', topicPartition=firefly-apps-superapp-18};
>> PartitionMeta{cluster='10.66.137.93:9093', topicPartition=firefly-apps-superapp-19};
>> PartitionMeta{cluster='10.66.137.93:9093', topicPartition=firefly-apps-superapp-20};
>> PartitionMeta{cluster='10.66.137.93:9093', topicPartition=firefly-apps-superapp-14};
>> PartitionMeta{cluster='10.66.137.93:9093', topicPartition=firefly-apps-superapp-25};
>> PartitionMeta{cluster='10.66.137.93:9093', topicPartition=firefly-apps-superapp-26}
>> 2017-08-01 04:57:41,318 INFO  kafka.AbstractKafkaPartitioner (AbstractKafkaPartitioner.java:definePartitions(170)) - [New] Partition 1 with assignment
>> PartitionMeta{cluster='10.66.137.93:9093', topicPartition=firefly-apps-superapp-6};
>> PartitionMeta{cluster='10.66.137.93:9093', topicPartition=firefly-apps-superapp-8};
>> PartitionMeta{cluster='10.66.137.93:9093', topicPartition=firefly-apps-superapp-2};
>> PartitionMeta{cluster='10.66.137.93:9093', topicPartition=firefly-apps-superapp-3};
>> PartitionMeta{cluster='10.66.137.93:9093', topicPartition=firefly-apps-superapp-13};
>> PartitionMeta{cluster='10.66.137.93:9093', topicPartition=firefly-apps-superapp-16};
>> PartitionMeta{cluster='10.66.137.93:9093', topicPartition=firefly-apps-superapp-9};
>> PartitionMeta{cluster='10.66.137.93:9093', topicPartition=firefly-apps-superapp-10};
>> PartitionMeta{cluster='10.66.137.93:9093', topicPartition=firefly-apps-superapp-11};
>> PartitionMeta{cluster='10.66.137.93:9093', topicPartition=firefly-apps-superapp-28}
>> 2017-08-01 04:57:41,322 INFO  kafka.AbstractKafkaPartitioner (AbstractKafkaPartitioner.java:definePartitions(170)) - [New] Partition 2 with assignment
>> PartitionMeta{cluster='10.66.137.93:9093', topicPartition=firefly-apps-superapp-5};
>> PartitionMeta{cluster='10.66.137.93:9093', topicPartition=firefly-apps-superapp-7};
>> PartitionMeta{cluster='10.66.137.93:9093', topicPartition=firefly-apps-superapp-17};
>> PartitionMeta{cluster='10.66.137.93:9093', topicPartition=firefly-apps-superapp-1};
>> PartitionMeta{cluster='10.66.137.93:9093', topicPartition=firefly-apps-superapp-4};
>> PartitionMeta{cluster='10.66.137.93:9093', topicPartition=firefly-apps-superapp-29};
>> PartitionMeta{cluster='10.66.137.93:9093', topicPartition=firefly-apps-superapp-15};
>> PartitionMeta{cluster='10.66.137.93:9093', topicPartition=firefly-apps-superapp-0};
>> PartitionMeta{cluster='10.66.137.93:9093', topicPartition=firefly-apps-superapp-27};
>> PartitionMeta{cluster='10.66.137.93:9093', topicPartition=firefly-apps-superapp-12}
>> 2017-08-01 04:57:41,365 INFO  zkclient.ZkEventThread (ZkEventThread.java:run(64)) - Starting ZkClient event thread.
>> 2017-08-01 04:57:41,372 INFO  zookeeper.ZooKeeper (Environment.java:logEnv(100)) - Client environment:zookeeper.version=3.4.6-1569965, built on 02/20/2014 09:09 GMT
>> 2017-08-01 04:57:41,372 INFO  zookeeper.ZooKeeper (Environment.java:logEnv(100)) - Client environment:host.name=brdn1351.target.com
>> 2017-08-01 04:57:41,373 INFO  zookeeper.ZooKeeper (Environment.java:logEnv(100)) - Client environment:java.version=1.8.0_73
>> 2017-08-01 04:57:41,373 INFO  zookeeper.ZooKeeper (Environment.java:logEnv(100)) - Client environment:java.vendor=Oracle Corporation
>> 2017-08-01 04:57:41,373 INFO  zookeeper.ZooKeeper (Environment.java:logEnv(100)) - Client environment:java.home=/usr/java/jdk1.8.0_73/jre
>> connectString=10.66.137.94:2181,10.66.137.95:2181,10.66.137.96:2181,10.66.137.97:2181,10.66.137.98:2181 sessionTimeout=30000 watcher=org.I0Itec.zkclient.ZkClient@6d64b553
>> 2017-08-01 04:57:41,388 INFO  zkclient.ZkClient (ZkClient.java:waitForKeeperState(934)) - Waiting for keeper state SyncConnected
>> 2017-08-01 04:57:41,392 INFO  zookeeper.ClientCnxn (ClientCnxn.java:logStartConnect(975)) - Opening socket connection to server 10.66.137.97/10.66.137.97:2181. Will not attempt to authenticate using SASL (unknown error)
>> 2017-08-01 04:57:41,393 INFO  zookeeper.ClientCnxn (ClientCnxn.java:primeConnection(852)) - Socket connection established to 10.66.137.97/10.66.137.97:2181, initiating session
>> 2017-08-01 04:57:41,445 INFO  zookeeper.ClientCnxn (ClientCnxn.java:onConnected(1235)) - Session establishment complete on server 10.66.137.97/10.66.137.97:2181, sessionid = 0x350dafbffc66af1, negotiated timeout = 30000
>> 2017-08-01 04:57:41,447 INFO  zkclient.ZkClient (ZkClient.java:processStateChanged(711)) - zookeeper state changed (SyncConnected)
>> 2017-08-01 04:57:41,450 ERROR stram.StreamingAppMaster (StreamingAppMaster.java:main(106)) - Exiting Application Master
>> java.lang.NoSuchMethodError: kafka.utils.ZkUtils.getAllBrokersInCluster(Lorg/I0Itec/zkclient/ZkClient;)Lscala/collection/Seq;
>>         at com.datatorrent.contrib.kafka.KafkaMetadataUtil.getBrokers(KafkaMetadataUtil.java:117)
>>         at com.datatorrent.contrib.kafka.KafkaConsumer.initBrokers(KafkaConsumer.java:139)
>>         at com.datatorrent.contrib.kafka.AbstractKafkaInputOperator.definePartitions(AbstractKafkaInputOperator.java:506)
>>         at com.datatorrent.stram.plan.physical.PhysicalPlan.initPartitioning(PhysicalPlan.java:752)
>>         at com.datatorrent.stram.plan.physical.PhysicalPlan.addLogicalOperator(PhysicalPlan.java:1676)
>>         at com.datatorrent.stram.plan.physical.PhysicalPlan.<init>(PhysicalPlan.java:378)
>>         at com.datatorrent.stram.StreamingContainerManager.<init>(StreamingContainerManager.java:418)
>>         at com.datatorrent.stram.StreamingContainerManager.getInstance(StreamingContainerManager.java:3023)
>>         at com.datatorrent.stram.StreamingAppMasterService.serviceInit(StreamingAppMasterService.java:551)
>>         at org.apache.hadoop.service.AbstractService.init(AbstractService.java:163)
>>         at com.datatorrent.stram.StreamingAppMaster.main(StreamingAppMaster.java:102)
>>
>> --
>> View this message in context: http://apache-apex-users-list.78494.x6.nabble.com/How-to-consume-from-two-different-topics-in-one-apex-application-tp1797p1801.html
>> Sent from the Apache Apex Users list mailing list archive at Nabble.com.
>>
>
>
