storm-user mailing list archives

From Kristopher Kane <kkane.l...@gmail.com>
Subject Re: Increasing worker parallelism decreases throughput and increases tuple timeout
Date Sun, 11 Sep 2016 17:31:20 GMT
This took a while because I could not get INFO logging to come out of the
serializer running in Kryo.  The CachedSchemaRegistry and
EnhancedCachedSchemaRegistry lookups in our raw scheme deserializer (with
serialization into Avro) hit the cache 100% of the time after the initial
load.  As you said, the serializer, with the IdentityHashMap behind
org.apache.storm.hdfs.avro.ConfluentAvroSerializer.getFingerprint(ConfluentAvroSerializer.java:74),
looks to be the problem, although I couldn't get it to log the cache misses
in the serializer itself.
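
To illustrate the suspected failure mode, here is a minimal sketch in plain
Java.  FakeSchema is a hypothetical stand-in for a parsed schema object, not a
class from Storm, Avro or the Confluent client.  It only shows that an
IdentityHashMap misses on an equal-but-distinct instance where a regular
HashMap hits.

```java
import java.util.HashMap;
import java.util.IdentityHashMap;
import java.util.Map;

class IdentityCacheDemo {
    // Hypothetical stand-in for a schema object: equal by value, not by reference.
    static final class FakeSchema {
        final String json;

        FakeSchema(String json) {
            this.json = json;
        }

        @Override
        public boolean equals(Object o) {
            return o instanceof FakeSchema && ((FakeSchema) o).json.equals(json);
        }

        @Override
        public int hashCode() {
            return json.hashCode();
        }
    }

    // True if the given cache already holds an entry for this key.
    static boolean isHit(Map<FakeSchema, Integer> cache, FakeSchema key) {
        return cache.containsKey(key);
    }

    public static void main(String[] args) {
        Map<FakeSchema, Integer> identityCache = new IdentityHashMap<>();
        Map<FakeSchema, Integer> valueCache = new HashMap<>();

        FakeSchema first = new FakeSchema("{\"type\":\"string\"}");
        identityCache.put(first, 1);
        valueCache.put(first, 1);

        // Same content, different object, as if the schema were re-created per record.
        FakeSchema second = new FakeSchema("{\"type\":\"string\"}");

        System.out.println("identity, same instance:  " + isHit(identityCache, first));
        System.out.println("identity, equal instance: " + isHit(identityCache, second));
        System.out.println("value, equal instance:    " + isHit(valueCache, second));
    }
}
```

If the schema objects reaching the serializer are fresh instances for each
tuple, every lookup against an identity-keyed cache would behave like the
second case.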

Here is the evidence:

This thread dump was taken after the topology had been running for a few
minutes.  No new schemas are being introduced, but the worker is still
opening an 'HttpURLConnection'.  The deserializer bolt reports that every
cache request is a hit and that none are going to the SchemaRegistry.

"Thread-3-disruptor-executor[8 8]-send-queue" - Thread t@49
   java.lang.Thread.State: RUNNABLE
	at java.net.SocketInputStream.socketRead0(Native Method)
	at java.net.SocketInputStream.socketRead(SocketInputStream.java:116)
	at java.net.SocketInputStream.read(SocketInputStream.java:170)
	at java.net.SocketInputStream.read(SocketInputStream.java:141)
	at java.io.BufferedInputStream.fill(BufferedInputStream.java:246)
	at java.io.BufferedInputStream.read1(BufferedInputStream.java:286)
	at java.io.BufferedInputStream.read(BufferedInputStream.java:345)
	- locked <670c88ea> (a java.io.BufferedInputStream)
	at sun.net.www.MeteredStream.read(MeteredStream.java:134)
	- locked <2a8ccb96> (a sun.net.www.http.KeepAliveStream)
	at java.io.FilterInputStream.read(FilterInputStream.java:133)
	at sun.net.www.protocol.http.HttpURLConnection$HttpInputStream.read(HttpURLConnection.java:3336)
	at com.fasterxml.jackson.core.json.ByteSourceJsonBootstrapper.ensureLoaded(ByteSourceJsonBootstrapper.java:503)
	at com.fasterxml.jackson.core.json.ByteSourceJsonBootstrapper.detectEncoding(ByteSourceJsonBootstrapper.java:129)
	at com.fasterxml.jackson.core.json.ByteSourceJsonBootstrapper.constructParser(ByteSourceJsonBootstrapper.java:224)
	at com.fasterxml.jackson.core.JsonFactory._createParser(JsonFactory.java:1244)
	at com.fasterxml.jackson.core.JsonFactory.createParser(JsonFactory.java:755)
	at com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:2199)
	at io.confluent.kafka.schemaregistry.client.rest.utils.RestUtils.httpRequest(RestUtils.java:137)
	at io.confluent.kafka.schemaregistry.client.rest.utils.RestUtils.lookUpSubjectVersion(RestUtils.java:164)
	at org.apache.storm.hdfs.avro.EnhancedCachedSchemaRegistry.getVersionFromRegistry(EnhancedCachedSchemaRegistry.java:80)
	at org.apache.storm.hdfs.avro.EnhancedCachedSchemaRegistry.getVersion(EnhancedCachedSchemaRegistry.java:176)
	- locked <6b561b0f> (a org.apache.storm.hdfs.avro.EnhancedCachedSchemaRegistry)
	at org.apache.storm.hdfs.avro.ConfluentAvroSerializer.getFingerprint(ConfluentAvroSerializer.java:74)
	at org.apache.storm.hdfs.avro.AbstractAvroSerializer.write(AbstractAvroSerializer.java:50)
	at org.apache.storm.hdfs.avro.AbstractAvroSerializer.write(AbstractAvroSerializer.java:45)
	at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)
	at com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:75)
	at com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:18)
	at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:486)
	at backtype.storm.serialization.KryoValuesSerializer.serializeInto(KryoValuesSerializer.java:44)
	at backtype.storm.serialization.KryoTupleSerializer.serialize(KryoTupleSerializer.java:44)
	at backtype.storm.daemon.worker$mk_transfer_fn$transfer_fn__5543.invoke(worker.clj:142)
	at backtype.storm.daemon.executor$start_batch_transfer__GT_worker_handler_BANG_$fn__3539.invoke(executor.clj:274)
	at backtype.storm.disruptor$clojure_handler$reify__3196.onEvent(disruptor.clj:58)
	at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:125)
	at backtype.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:99)
	at backtype.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:80)
	at backtype.storm.disruptor$consume_loop_STAR_$fn__3209.invoke(disruptor.clj:94)
	at backtype.storm.util$async_loop$fn__544.invoke(util.clj:475)
	at clojure.lang.AFn.run(AFn.java:22)
	at java.lang.Thread.run(Thread.java:745)
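
The trace shows the registry lookup being reached from
KryoTupleSerializer.serialize on the send-queue thread, with
getFingerprint() falling through to an HTTP request.  One possible way to
make the serializer more defensive is to put a value-keyed cache in front of
the slow lookup.  This is only a sketch under assumptions: ValueKeyedCache
is a hypothetical helper, the key is assumed to be something with value
equality (for example the schema's JSON string), and the loader stands in
for whatever call currently reaches the registry.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Function;

// Hypothetical helper: caches results by key equality rather than object identity.
class ValueKeyedCache<K, V> {
    private final ConcurrentMap<K, V> cache = new ConcurrentHashMap<>();
    private final Function<K, V> loader;

    // loader is the slow lookup, e.g. the call that ends in an HTTP request.
    ValueKeyedCache(Function<K, V> loader) {
        this.loader = loader;
    }

    // Invokes the loader only the first time an equal key is seen.
    V get(K key) {
        return cache.computeIfAbsent(key, loader);
    }

    int size() {
        return cache.size();
    }

    public static void main(String[] args) {
        ValueKeyedCache<String, Integer> fingerprints =
                new ValueKeyedCache<>(schemaJson -> {
                    System.out.println("slow lookup for " + schemaJson);
                    return schemaJson.hashCode();
                });

        // Two distinct String objects with the same content share one cache entry.
        fingerprints.get(new String("{\"type\":\"string\"}"));
        fingerprints.get(new String("{\"type\":\"string\"}"));
        System.out.println("entries: " + fingerprints.size());
    }
}
```

With a cache like this the slow lookup runs once per distinct schema content,
regardless of how many object instances carry that content.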


I've attached two screenshots of the JVisualVM CPU profiler.  The one
titled 'multiple-jvm' represents a topology with two workers and high CPU
usage in RestUtils.  The one titled 'single-jvm' represents a topology with
one worker and no CPU usage in RestUtils.


I think this is pretty good evidence but I would love to know how to log
from the serializer running in Kryo as this would give me 100% proof.  Can
anyone help me understand what is going on there?
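
One way to gather that evidence without depending on logger configuration
inside the worker would be to count and time the lookups directly, along the
lines of the earlier suggestion to add timing to getFingerprint().  A
minimal sketch: TimedLookup is a hypothetical wrapper, and how it would be
wired into ConfluentAvroSerializer, and where the summary would be printed,
is left open.

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;

// Hypothetical wrapper that counts calls and flags the slow ones.
class TimedLookup<K, V> implements Function<K, V> {
    private final Function<K, V> delegate;
    private final long slowNanos;
    private final AtomicLong calls = new AtomicLong();
    private final AtomicLong slowCalls = new AtomicLong();

    // Calls taking at least slowNanos are counted as slow (likely cache misses).
    TimedLookup(Function<K, V> delegate, long slowNanos) {
        this.delegate = delegate;
        this.slowNanos = slowNanos;
    }

    @Override
    public V apply(K key) {
        long start = System.nanoTime();
        try {
            return delegate.apply(key);
        } finally {
            long elapsed = System.nanoTime() - start;
            calls.incrementAndGet();
            if (elapsed >= slowNanos) {
                slowCalls.incrementAndGet();
            }
        }
    }

    long calls() {
        return calls.get();
    }

    long slowCalls() {
        return slowCalls.get();
    }

    String summary() {
        return "calls=" + calls.get() + " slow=" + slowCalls.get();
    }

    public static void main(String[] args) {
        // The 1 ms threshold is an arbitrary value chosen for illustration.
        TimedLookup<String, Integer> lookup =
                new TimedLookup<>(String::length, 1_000_000L);
        lookup.apply("example");
        System.err.println(lookup.summary());
    }
}
```

A cache hit should return in well under a millisecond, while a round trip to
the schema registry should not, so a slow count that keeps growing after
warm-up would point to repeated misses.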

Thanks,

Kris

On Wed, Sep 7, 2016 at 12:11 PM, Aaron Niskodé-Dossett <dossett@gmail.com>
wrote:

> Let us know what you find, especially if the serializer needs to be more
> defensive to ensure proper caching.
>
> On Tue, Sep 6, 2016 at 8:45 AM Kristopher Kane <kkane.list@gmail.com>
> wrote:
>
>> Come to think of it, I did see RestUtils rank somewhat higher in the
>> visualvm CPU profiler but did not give it the attention it deserved.
>>
>> On Tue, Sep 6, 2016 at 9:39 AM, Aaron Niskodé-Dossett <dossett@gmail.com>
>> wrote:
>>
>>> Hi Kris,
>>>
>>> One possibility is that the Serializer isn't actually caching the schema
>>> <-> id mappings and is hitting the schema registry every time.  The call to
>>> register() in getFingerprint() [1] in particular can be finicky since the
>>> cache is ultimately in an IDENTITY hash map, not a regular old hashmap [2].
>>> I'm familiar with the Avro deserializer you're using and thought it
>>> accounted for this, but perhaps not.
>>>
>>> You could add timing information to the getFingerprint() and getSchema()
>>> calls in ConfluentAvroSerializer.  If the results indicate cache misses,
>>> that's probably your culprit.
>>>
>>> Best, Aaron
>>>
>>>
>>> [1] https://github.com/apache/storm/blob/master/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/avro/ConfluentAvroSerializer.java#L66
>>> [2] https://github.com/confluentinc/schema-registry/blob/v1.0/client/src/main/java/io/confluent/kafka/schemaregistry/client/CachedSchemaRegistryClient.java#L79
>>>
>>> On Tue, Sep 6, 2016 at 7:40 AM Kristopher Kane <kkane.list@gmail.com>
>>> wrote:
>>>
>>>> Hi everyone.
>>>>
>>>> I have a simple topology that uses the Avro serializer
>>>> (https://github.com/apache/storm/blob/master/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/avro/ConfluentAvroSerializer.java)
>>>> and writes to Elasticsearch.
>>>>
>>>> The topology is like this:
>>>>
>>>> Kafka (raw scheme) -> Avro deserializer -> Elasticsearch
>>>>
>>>> This topology runs well with one worker. However, once I add one more
>>>> worker (total of two) and change nothing else, the topology throughput
>>>> drops and tuples start timing out.
>>>>
>>>> I've attached visualvm/jstatd to the workers when in multi worker mode
>>>> - and added some jmx configs to the worker opts - but I am unable to see
>>>> anything glaring.
>>>>
>>>> I've never seen Storm act this way but have also never worked with a
>>>> custom serializer so assume that it is the culprit but I cannot explain
>>>> why.
>>>>
>>>> Any pointers would be appreciated.
>>>>
>>>> Kris
