From: Matt <yayjsir@gmail.com>
To: user@storm.incubator.apache.org
Subject: Re: PartitionedTridentSpoutExecutor might cause lastPartitionMeta always becomes null
Date: Mon, 13 Jan 2014 16:16:03 +0800

Hi Edison,

Yes, I have already resolved these problems. And thanks a lot for your help.

-------------
Matt


On January 13, 2014, at 4:08 PM, Edison <xeseo2005@gmail.com> wrote:

Hi Matt,

On the serialization problem, I agree with you. They could use Kryo for better code reuse. I guess this function "just works", and since the contributors haven't heard any strong complaints, they have left it as is at a lower priority?

BTW, in Trident, the Emitter interface requires an implementation of "void refreshPartitions(List<Partition> partitionResponsibilities);", and the comments say this is used to manage things like connections to brokers.
In the TridentKafkaEmitter implementation in the project I shared with you, the connection to the Kafka broker is closed every time the refreshPartitions method is called. As a result, the Kafka server log keeps filling up with junk messages about closed sockets.

In the OpaquePartitionedTridentSpoutExecutor.emitBatch method, Storm clearly tries to "refresh" the partitions every time it updates the coordinator meta, but why does it have to do this? To avoid lost connections, or something else?
Do you have any clue?


2014/1/13 Matt <yayjsir@gmail.com>
Hi Edison,

Thanks for your reply. It does help, because it works. But I am still curious about why, because _partitionStates is also flushed when using storm-kafka-0.8-plus.

In the end, I realized that the root cause was not the flushing of _partitionStates. The real reason is that I used my own custom class to represent lastPartitionMeta. My spout implements IOpaquePartitionedTridentSpout<Partitions, Partition, Batch>, while storm-kafka-0.8-plus implements IOpaquePartitionedTridentSpout<Partitions, Partition, Map>.

The last type argument (Batch/Map) is serialized and stored in ZooKeeper. Storm reads the last batch information from ZooKeeper if it's not stored locally. But Storm serializes this instance with json-simple, which is hardcoded (https://github.com/nathanmarz/storm/blob/0.9.0.1/storm-core/src/jvm/storm/trident/topology/state/TransactionalState.java#L52), and json-simple doesn't support reflection, which means custom classes are not supported.

In other words, Storm implicitly requires that batch information be a Map, a String, or another type supported by json-simple. But no documentation mentions that.
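
A minimal sketch of one way to stay within that constraint, assuming the batch metadata is flattened into a Map of plain values before it is handed to Storm. The Batch class and its offset/nextOffset fields are hypothetical, chosen only for illustration; they are not taken from Storm or storm-kafka-0.8-plus.

```java
import java.util.HashMap;
import java.util.Map;

// Sketch only: Batch and its fields are hypothetical, not part of Storm.
class BatchMetaSketch {

    /** Hypothetical custom batch type that could not be stored directly as metadata. */
    static final class Batch {
        final long offset;      // first offset of the batch (assumed field)
        final long nextOffset;  // offset where the next batch starts (assumed field)

        Batch(long offset, long nextOffset) {
            this.offset = offset;
            this.nextOffset = nextOffset;
        }
    }

    /** Flatten the batch into a Map of plain values to use as partition metadata. */
    static Map<String, Object> toMeta(Batch batch) {
        Map<String, Object> meta = new HashMap<>();
        meta.put("offset", batch.offset);
        meta.put("nextOffset", batch.nextOffset);
        return meta;
    }

    /** Rebuild the batch from metadata; null means there is no previous batch. */
    static Batch fromMeta(Map<String, Object> meta) {
        if (meta == null) {
            return null;
        }
        // Read through Number: a JSON round trip may return a different numeric type than was stored.
        long offset = ((Number) meta.get("offset")).longValue();
        long nextOffset = ((Number) meta.get("nextOffset")).longValue();
        return new Batch(offset, nextOffset);
    }

    public static void main(String[] args) {
        Batch restored = fromMeta(toMeta(new Batch(10L, 25L)));
        System.out.println(restored.offset + " -> " + restored.nextOffset);
    }
}
```

With this approach the spout's last type argument would be Map rather than the custom class, and fromMeta has to tolerate a null lastPartitionMeta, since there may be no previous batch.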

And why not use Kryo, which is already widely used in Storm, to serialize/deserialize batch information?

-------------
Matt


On January 6, 2014, at 4:42 PM, Edison <xeseo2005@gmail.com> wrote:

check this out: https://github.com/wurstmeister/storm-kafka-0.8-plus



2014/1/1 yayj <yayjsir@gmail.com>
Hi there,

I was recently going to implement a partitioned Trident spout for Kafka based on Storm 0.9.0.1, because storm-contrib/storm-kafka no longer seems to be developed and has not been updated for 0.9. But I found that my custom Emitter, which implements storm.trident.spout.IPartitionedTridentSpout.Emitter, didn't work.

The reason I found was that the lastPartitionMeta parameter of the emitPartitionBatchNew method was always null in ANY transaction. That's not acceptable, since the JavaDoc of Emitter.emitPartitionBatchNew says it "returns the metadata that can be used to reconstruct this partition/batch in the future." (https://github.com/nathanmarz/storm/blob/moved-to-apache/storm-core/src/jvm/storm/trident/spout/IPartitionedTridentSpout.java#L52) Furthermore, my implementation is similar to what storm-kafka does (https://github.com/nathanmarz/storm-contrib/blob/master/storm-kafka/src/jvm/storm/kafka/trident/KafkaUtils.java#L55).

In the end, I found that the root cause is in commit d6c2736 (revamp trident spout and partitioned trident spouts to support spouts where the source can change). In this commit, PartitionedTridentSpoutExecutor.Emitter.emitBatch() uses _savedCoordinatorMeta, which is returned by Coordinator.getPartitionsForBatch(), to check whether the partitions have changed, as described in the commit log. If they have changed, it FIRST FLUSHES _partitionStates (line 107: _partitionStates.clear()), which stores ALL partition states, and then invokes Emitter.getOrderedPartitions() to regenerate the partition information. (https://github.com/nathanmarz/storm/blob/moved-to-apache/storm-core/src/jvm/storm/trident/spout/PartitionedTridentSpoutExecutor.java#L105)

Frankly speaking, I think it's too arbitrary. The semantics of the operation on _partitionStates become REPLACEMENT, but I think it should be an update: adding new partitions, removing lost ones, and KEEPING STABLE ONES. I don't think it's logical that when one partition changes, all partitions in the same txid must reconstruct their states.
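
To make the distinction concrete, here is a minimal sketch of the two semantics. It is not Storm's code: String partition ids, Object states, and the newState factory are hypothetical stand-ins for the real types.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Sketch only: not Storm's implementation. Ids and states are simplified stand-ins.
class PartitionStateSketch {

    /** Replacement semantics: every state is dropped and rebuilt. */
    static void replaceAll(Map<String, Object> states, List<String> partitions) {
        states.clear();
        for (String id : partitions) {
            states.put(id, newState(id));
        }
    }

    /** Update semantics: drop lost partitions, add new ones, keep stable ones untouched. */
    static void update(Map<String, Object> states, List<String> partitions) {
        // Remove states of partitions that are no longer present.
        states.keySet().retainAll(partitions);
        // Create a state only for partitions that have none yet.
        for (String id : partitions) {
            states.computeIfAbsent(id, PartitionStateSketch::newState);
        }
    }

    /** Hypothetical factory for a fresh per-partition state. */
    static Object newState(String id) {
        return new Object();
    }

    public static void main(String[] args) {
        Map<String, Object> states = new HashMap<>();
        update(states, Arrays.asList("p0", "p1"));
        Object p1 = states.get("p1");
        update(states, Arrays.asList("p1", "p2"));
        System.out.println("p1 kept: " + (states.get("p1") == p1));
    }
}
```

Under update, the state object for a partition that appears in both lists is the same instance before and after the change; under replaceAll it is always a new one.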

So, I'm wondering whether my understanding is correct. And is it feasible to send a pull request to https://github.com/apache/incubator-storm ?


-------------
Matt





