Return-Path: X-Original-To: apmail-storm-user-archive@minotaur.apache.org Delivered-To: apmail-storm-user-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 74FAC17284 for ; Thu, 5 Feb 2015 15:46:59 +0000 (UTC) Received: (qmail 13193 invoked by uid 500); 5 Feb 2015 15:46:58 -0000 Delivered-To: apmail-storm-user-archive@storm.apache.org Received: (qmail 13147 invoked by uid 500); 5 Feb 2015 15:46:58 -0000 Mailing-List: contact user-help@storm.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: user@storm.apache.org Delivered-To: mailing list user@storm.apache.org Received: (qmail 13130 invoked by uid 99); 5 Feb 2015 15:46:58 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 05 Feb 2015 15:46:58 +0000 X-ASF-Spam-Status: No, hits=1.5 required=5.0 tests=HTML_MESSAGE,RCVD_IN_DNSWL_LOW X-Spam-Check-By: apache.org Received-SPF: error (athena.apache.org: local policy) Received: from [209.85.213.176] (HELO mail-ig0-f176.google.com) (209.85.213.176) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 05 Feb 2015 15:46:52 +0000 Received: by mail-ig0-f176.google.com with SMTP id hl2so44290746igb.3 for ; Thu, 05 Feb 2015 07:45:26 -0800 (PST) X-Google-DKIM-Signature: v=1; a=rsa-sha256; c=relaxed/relaxed; d=1e100.net; s=20130820; h=x-gm-message-state:mime-version:in-reply-to:references:date :message-id:subject:from:to:content-type; bh=x8mqmdbu12WelB1u8+9nwBBBYGdZ5AhpCzfGeRlu+v4=; b=OMVyCzpyVKevu2j5fp+a5zrjCS/a320jQyr/cmxCeywO6DsgeeuldqdjDMldX+/8uA hiXuJ831zQivig2NIX3lYBoE+Skv3j0CKdVTocbRexsQ2tUAs2TpM0d2ivs1KIOfDf+K XKBLnLO6hgiyP2szBprnuZ3GbIsLyd2ppHpyCcNren8NNeCLtqbMnNWT9FfZOx/xhh/I /GNabPASTAL9gP/lWHQJgA7dEHsFSHjdSdy2NNGpmgUJ5hVkZCyWDAGuJeqCug/m+sOU rdIpL0tDkQ+l3sPmguIE3iiXGkQB6V7VVaOpzBN7KYLayvQDImkDbeh+cntUw+D9tSK/ SkzQ== X-Gm-Message-State: ALoCoQkOgH21+n6Js46PZH1juZMClc20kefhZ2BboB/nWTHisbs9SvHvTvVv5bAOY1iXeiPEKQ0+ MIME-Version: 1.0 X-Received: by 10.50.32.7 with SMTP id e7mr9377934igi.21.1423151119896; Thu, 05 Feb 2015 07:45:19 -0800 (PST) Received: by 10.64.166.138 with HTTP; Thu, 5 Feb 2015 07:45:19 -0800 (PST) X-Originating-IP: [50.75.206.122] In-Reply-To: <49382c5c.9c21.14b586447ed.Coremail.mailforlearn@163.com> References: <49382c5c.9c21.14b586447ed.Coremail.mailforlearn@163.com> Date: Thu, 5 Feb 2015 10:45:19 -0500 Message-ID: Subject: Re: problems with kafkaspout From: Prajwal Tuladhar To: user@storm.apache.org Content-Type: multipart/alternative; boundary=047d7b10cbf1f6c84b050e593056 X-Virus-Checked: Checked by ClamAV on apache.org --047d7b10cbf1f6c84b050e593056 Content-Type: text/plain; charset=UTF-8 Seems like jar dependency issue. How are you configuring your app's jar dependencies (maven, gradle, sbt)? On Thu, Feb 5, 2015 at 1:19 AM, JunLuo wrote: > hi everyone, > In my topology, my kafkaspout has many problems and I don't know how > it happens > 1. the latency is very high , I have 10 kafkaspout and the average > latency is up to 10k ms > 2. Failed number is very high, the number of failed is about 50% of > emitted tuple. > 3. An error often occurs, the error info is : > > java.lang.NoClassDefFoundError: Could not initialize class > com.yammer.metrics.Metrics at > kafka.metrics.KafkaMetricsGroup$class.newTimer(KafkaMetricsGroup.scala:52) > at > kafka.consumer.FetchRequestAndResponseMetrics.newTimer(FetchRequestAndResponseStats.scala:25) > at > kafka.consumer.FetchRequestAndResponseMetrics.(FetchRequestAndResponseStats.scala:26) > at > kafka.consumer.FetchRequestAndResponseStats.(FetchRequestAndResponseStats.scala:37) > at > kafka.consumer.FetchRequestAndResponseStatsRegistry$$anonfun$2.apply(FetchRequestAndResponseStats.scala:50) > at > kafka.consumer.FetchRequestAndResponseStatsRegistry$$anonfun$2.apply(FetchRequestAndResponseStats.scala:50) > at kafka.utils.Pool.getAndMaybePut(Pool.scala:61) at > kafka.consumer.FetchRequestAndResponseStatsRegistry$.getFetchRequestAndResponseStats(FetchRequestAndResponseStats.scala:54) > at kafka.consumer.SimpleConsumer.(SimpleConsumer.scala:39) at > kafka.javaapi.consumer.SimpleConsumer.(SimpleConsumer.scala:34) at > storm.kafka.DynamicPartitionConnections.register(DynamicPartitionConnections.java:60) > at storm.kafka.PartitionManager.(PartitionManager.java:64) at > storm.kafka.ZkCoordinator.refresh(ZkCoordinator.java:98) at > storm.kafka.ZkCoordinator.getMyManagedPartitions(ZkCoordinator.java:69) at > storm.kafka.KafkaSpout.nextTuple(KafkaSpout.java:135) at > backtype.storm.daemon.executor$fn__3373$fn__3388$fn__3417.invoke(executor.clj:565) > at backtype.storm.util$async_loop$fn__464.invoke(util.clj:463) at > clojure.lang.AFn.run(AFn.java:24) at java.lang.Thread.run(Thread.java:745) > > java.lang.ExceptionInInitializerError at > kafka.metrics.KafkaMetricsGroup$class.newTimer(KafkaMetricsGroup.scala:52) > at > kafka.consumer.FetchRequestAndResponseMetrics.newTimer(FetchRequestAndResponseStats.scala:25) > at > kafka.consumer.FetchRequestAndResponseMetrics.(FetchRequestAndResponseStats.scala:26) > at > kafka.consumer.FetchRequestAndResponseStats.(FetchRequestAndResponseStats.scala:37) > at > kafka.consumer.FetchRequestAndResponseStatsRegistry$$anonfun$2.apply(FetchRequestAndResponseStats.scala:50) > at > kafka.consumer.FetchRequestAndResponseStatsRegistry$$anonfun$2.apply(FetchRequestAndResponseStats.scala:50) > at kafka.utils.Pool.getAndMaybePut(Pool.scala:61) at > kafka.consumer.FetchRequestAndResponseStatsRegistry$.getFetchRequestAndResponseStats(FetchRequestAndResponseStats.scala:54) > at kafka.consumer.SimpleConsumer.(SimpleConsumer.scala:39) at > kafka.javaapi.consumer.SimpleConsumer.(SimpleConsumer.scala:34) at > storm.kafka.DynamicPartitionConnections.register(DynamicPartitionConnections.java:60) > at storm.kafka.PartitionManager.(PartitionManager.java:64) at > storm.kafka.ZkCoordinator.refresh(ZkCoordinator.java:98) at > storm.kafka.ZkCoordinator.getMyManagedPartitions(ZkCoordinator.java:69) at > storm.kafka.KafkaSpout.nextTuple(KafkaSpout.java:135) at > backtype.storm.daemon.executor$fn__3373$fn__3388$fn__3417.invoke(executor.clj:565) > at backtype.storm.util$async_loop$fn__464.invoke(util.clj:463) at > clojure.lang.AFn.run(AFn.java:24) at java.lang.Thread.run(Thread.java:745) > Caused by: java.lang.IllegalStateException: Shutdown in progress at > java.lang.ApplicationShutdownHooks.add(ApplicationShutdownHooks.java:66) at > java.lang.Runtime.addShutdownHook(Runtime.java:211) at > com.yammer.metrics.Metrics.(Metrics.java:21) ... > > > My storm version is 0.9.3 and kafka version is 2.9.2-0.8.1.1,and > kafkaspout is the external of storm 0.9.3. and my kafka topic have 20 > partitions. can anyone tell me how to fix it? > > thanks for any response. > > Jun. > > > > -- -- Praj --047d7b10cbf1f6c84b050e593056 Content-Type: text/html; charset=UTF-8 Content-Transfer-Encoding: quoted-printable
Seems like jar dependency issue.

How ar= e you configuring your app's jar dependencies (maven, gradle, sbt)?
=

On Thu, Feb= 5, 2015 at 1:19 AM, JunLuo <mailforlearn@163.com> wrote:=
hi everyone,
=C2=A0= =C2=A0=C2=A0 In my topology, my kafkaspout has many problems and I don'= t know how it happens
=C2=A0 1. the latency is very high , I have 10 kaf= kaspout and the average latency is up to 10k ms
=C2=A0 2. Failed number = is very high, the number of failed is about 50% of emitted tuple.
=C2=A0= 3. An error often occurs, the error info is :

=C2=A0 =C2=A0=C2=A0 java.lang.NoClassDefFoundError: Could not initialize class com.yammer.me= trics.Metrics at kafka.metrics.KafkaMetricsGroup$class.newTimer(KafkaMetric= sGroup.scala:52) at kafka.consumer.FetchRequestAndResponseMetrics.newTimer(= FetchRequestAndResponseStats.scala:25) at kafka.consumer.FetchRequestAndRes= ponseMetrics.<init>(FetchRequestAndResponseStats.scala:26) at kafka.c= onsumer.FetchRequestAndResponseStats.<init>(FetchRequestAndResponseSt= ats.scala:37) at kafka.consumer.FetchRequestAndResponseStatsRegistry$$anonf= un$2.apply(FetchRequestAndResponseStats.scala:50) at kafka.consumer.FetchRe= questAndResponseStatsRegistry$$anonfun$2.apply(FetchRequestAndResponseStats= .scala:50) at kafka.utils.Pool.getAndMaybePut(Pool.scala:61) at kafka.consu= mer.FetchRequestAndResponseStatsRegistry$.getFetchRequestAndResponseStats(F= etchRequestAndResponseStats.scala:54) at kafka.consumer.SimpleConsumer.<= init>(SimpleConsumer.scala:39) at kafka.javaapi.consumer.SimpleConsumer.= <init>(SimpleConsumer.scala:34) at storm.kafka.DynamicPartitionConnec= tions.register(DynamicPartitionConnections.java:60) at storm.kafka.Partitio= nManager.<init>(PartitionManager.java:64) at storm.kafka.ZkCoordinato= r.refresh(ZkCoordinator.java:98) at storm.kafka.ZkCoordinator.getMyManagedP= artitions(ZkCoordinator.java:69) at storm.kafka.KafkaSpout.nextTuple(KafkaS= pout.java:135) at backtype.storm.daemon.executor$fn__3373$fn__3388$fn__3417= .invoke(executor.clj:565) at backtype.storm.util$async_loop$fn__464.invoke(= util.clj:463) at clojure.lang.AFn.run(AFn.java:24) at java.lang.Thread.run(= Thread.java:745)

=C2=A0=C2=A0=C2=A0=C2=A0 java.lang.ExceptionInIniti= alizerError at kafka.metrics.KafkaMetricsGroup$class.newTimer(KafkaMetricsG= roup.scala:52) at kafka.consumer.FetchRequestAndResponseMetrics.newTimer(Fe= tchRequestAndResponseStats.scala:25) at kafka.consumer.FetchRequestAndRespo= nseMetrics.<init>(FetchRequestAndResponseStats.scala:26) at kafka.con= sumer.FetchRequestAndResponseStats.<init>(FetchRequestAndResponseStat= s.scala:37) at kafka.consumer.FetchRequestAndResponseStatsRegistry$$anonfun= $2.apply(FetchRequestAndResponseStats.scala:50) at kafka.consumer.FetchRequ= estAndResponseStatsRegistry$$anonfun$2.apply(FetchRequestAndResponseStats.s= cala:50) at kafka.utils.Pool.getAndMaybePut(Pool.scala:61) at kafka.consume= r.FetchRequestAndResponseStatsRegistry$.getFetchRequestAndResponseStats(Fet= chRequestAndResponseStats.scala:54) at kafka.consumer.SimpleConsumer.<in= it>(SimpleConsumer.scala:39) at kafka.javaapi.consumer.SimpleConsumer.&l= t;init>(SimpleConsumer.scala:34) at storm.kafka.DynamicPartitionConnecti= ons.register(DynamicPartitionConnections.java:60) at storm.kafka.PartitionM= anager.<init>(PartitionManager.java:64) at storm.kafka.ZkCoordinator.= refresh(ZkCoordinator.java:98) at storm.kafka.ZkCoordinator.getMyManagedPar= titions(ZkCoordinator.java:69) at storm.kafka.KafkaSpout.nextTuple(KafkaSpo= ut.java:135) at backtype.storm.daemon.executor$fn__3373$fn__3388$fn__3417.i= nvoke(executor.clj:565) at backtype.storm.util$async_loop$fn__464.invoke(ut= il.clj:463) at clojure.lang.AFn.run(AFn.java:24) at java.lang.Thread.run(Th= read.java:745) Caused by: java.lang.IllegalStateException: Shutdown in prog= ress at java.lang.ApplicationShutdownHooks.add(ApplicationShutdownHooks.jav= a:66) at java.lang.Runtime.addShutdownHook(Runtime.java:211) at com.yammer.= metrics.Metrics.<clinit>(Metrics.java:21) ...

=C2=A0=C2=A0=C2= =A0
=C2=A0=C2=A0 My storm version is 0.9.3 and kafka version is 2.9.2-0= .8.1.1,and kafkaspout is the external of storm 0.9.3. and my kafka topic ha= ve 20 partitions. can anyone tell me how to fix it?
    thanks for =
any response.

Jun.

=C2=A0=C2=A0



--
-- Praj
--047d7b10cbf1f6c84b050e593056--