samza-dev mailing list archives

From Garrett Barton <garrett.bar...@gmail.com>
Subject Re: Nothing occurs from collector.send()
Date Wed, 03 Jun 2015 16:06:35 GMT
It must have been junk data. I started using a new topic for everything
(metrics/source/dest) and data is flowing fine now.  When I switch it back
to the other topics I get the hung behavior with nothing erroring out in
the logs.
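
A side observation on the config quoted below, offered as something to rule out rather than a diagnosis: `systems.kafka.streams.metrics-samza.samza.msg.serde=metrics` names a serde called `metrics`, but the `# Serializers` section registers only `json`, `string`, `long`, and `integer`. A minimal Python sketch of a check for serde names that are referenced but never registered is shown here. The helper names `parse_properties` and `unregistered_serdes` are hypothetical, the sample is a trimmed copy of the quoted config, and the sketch assumes plain `key=value` lines with `#` comments.

```python
import re

# Keys of the form "serializers.registry.<name>.class" register a serde name;
# keys ending in ".serde" (e.g. "systems.kafka.samza.msg.serde") refer to one.
REGISTRY_KEY = re.compile(r"^serializers\.registry\.([^.]+)\.class$")


def parse_properties(text):
    """Parse simple key=value lines, skipping blank lines and '#' comments."""
    props = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#") or "=" not in line:
            continue
        key, value = line.split("=", 1)
        props[key.strip()] = value.strip()
    return props


def unregistered_serdes(props):
    """Return {key: serde_name} for serde references with no registry entry."""
    registered = set()
    for key in props:
        match = REGISTRY_KEY.match(key)
        if match:
            registered.add(match.group(1))
    return {
        key: name
        for key, name in props.items()
        if key.endswith(".serde") and name not in registered
    }


# Trimmed copy of the config from this thread (assumption: one key per line).
SAMPLE = """
serializers.registry.string.class=org.apache.samza.serializers.StringSerdeFactory
serializers.registry.long.class=org.apache.samza.serializers.LongSerdeFactory
systems.kafka.samza.key.serde=string
systems.kafka.samza.msg.serde=string
systems.kafka.streams.metrics-samza.samza.msg.serde=metrics
stores.validate-records.msg.serde=long
"""

missing = unregistered_serdes(parse_properties(SAMPLE))
for key, name in sorted(missing.items()):
    print(f"{key} -> '{name}' is not registered")
```

Run against the sample, the check flags only the `metrics-samza` stream's serde. Whether that has anything to do with the empty metrics topic is not established in this thread.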

On Wed, Jun 3, 2015 at 10:10 AM, Garrett Barton <garrett.barton@gmail.com>
wrote:

> Thanks all for the responses.
> As you can probably tell from the job name, I am doing a validation process
> which takes in JSON and tests for a ton of things.  This is why I am
> using String serdes instead of Samza's built-in JSON support: the JSON
> could be malformed and I still need to operate on it.  My serde issues came
> from the malformed JSON being sent around; switching to StringSerde solved
> this.  I have not seen the behavior described in SAMZA-608 yet.
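
The approach described above (carry each record as a raw string, then attempt to parse it inside the task) can be sketched in Python as follows. This is only an illustration of the idea, not the code of `samza.RealtimeValidator`; the function name `classify` and the sample records are hypothetical.

```python
import json


def classify(raw):
    """Return ("valid", parsed) or ("malformed", reason) for a raw string."""
    try:
        return "valid", json.loads(raw)
    except json.JSONDecodeError as exc:
        # The raw string is still available to the caller, so a malformed
        # record can be reported or forwarded instead of failing in a serde.
        return "malformed", f"{exc.msg} at position {exc.pos}"


records = ['{"id": 1, "name": "ok"}', '{"id": 2, "name": ', "not json at all"]
results = [(raw, *classify(raw)) for raw in records]
for raw, status, detail in results:
    print(status, repr(raw))
```

Because parsing happens after deserialization, a bad record becomes an ordinary value to handle rather than an exception thrown while the message is being read.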
>
> Here is my current config; I started from the hello-samza wiki stat config:
>
> # Job
> job.factory.class=org.apache.samza.job.yarn.YarnJobFactory
> job.name=validate-records
>
> # YARN
> yarn.package.path=hdfs://p934/user/bartong/realtime-validator.tar.gz
>
> # Task
> task.class=samza.RealtimeValidator
> task.inputs=kafka.sample-raw-stream
> task.window.ms=60000
> task.checkpoint.factory=org.apache.samza.checkpoint.kafka.KafkaCheckpointManagerFactory
> task.checkpoint.system=kafka
> # Normally, this would be 3, but we have only one broker.
> task.checkpoint.replication.factor=1
>
> # Serializers
> serializers.registry.json.class=org.apache.samza.serializers.JsonSerdeFactory
> serializers.registry.string.class=org.apache.samza.serializers.StringSerdeFactory
> serializers.registry.long.class=org.apache.samza.serializers.LongSerdeFactory
> serializers.registry.integer.class=org.apache.samza.serializers.IntegerSerdeFactory
>
> # Systems
> systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
> systems.kafka.samza.key.serde=string
> systems.kafka.samza.msg.serde=string
> systems.kafka.streams.metrics-samza.samza.msg.serde=metrics
> systems.kafka.consumer.zookeeper.connect=zk1:2181/
> systems.kafka.consumer.auto.offset.reset=smallest
> systems.kafka.producer.bootstrap.servers=ka1:6667
>
> # Key-value storage
> stores.validate-records.factory=org.apache.samza.storage.kv.RocksDbKeyValueStorageEngineFactory
> stores.validate-records.changelog=kafka.validate-records-changelog
> stores.validate-records.key.serde=string
> stores.validate-records.msg.serde=long
>
> # Normally, we'd leave this alone, but we have only one broker.
> stores.validate-records.changelog.replication.factor=1
>
> # Normally, we'd set this much higher, but we want things to look snappy in the demo.
> stores.validate-records.write.batch.size=0
> stores.validate-records.object.cache.size=0
>
>
>
> Some things I've noticed while testing:
> Setting auto.offset.reset=smallest is not taking me back to the start of
> the streams.  It's like it's being ignored.  I also don't see consumer
> properties being output to the logs; I do see producer props though.
>
> On Tue, Jun 2, 2015 at 8:24 PM, Guozhang Wang <wangguoz@gmail.com> wrote:
>
>> Hi Garrett,
>>
>> Regarding the serde issues that Yan mentioned, you can check
>> https://issues.apache.org/jira/browse/SAMZA-608 and see if its
>> description matches what you saw.
>>
>> Guozhang
>>
>> On Tue, Jun 2, 2015 at 4:56 PM, Yan Fang <yanfang724@gmail.com> wrote:
>>
>> > Hi Garrett,
>> >
>> > I guess you ran into the serde issues you mentioned. If you can check
>> > the Samza log and show us, we can be more helpful. Also, if you paste
>> > the config here (if you don't mind), we can help see if you missed
>> > something.
>> >
>> > Thanks,
>> >
>> > Fang, Yan
>> > yanfang724@gmail.com
>> >
>> > On Tue, Jun 2, 2015 at 3:01 PM, Garrett Barton <garrett.barton@gmail.com>
>> > wrote:
>> >
>> > > Greetings all,
>> > >
>> > >
>> > >  I am trying to translate an existing workflow from MR into Samza.
>> > > Thus far everything is coded and the kinks with deploying have been
>> > > worked out.  My task deploys into YARN (2.6.0) and consumes records
>> > > from Kafka (0.8.2.1) fine, but no data from metrics or my output
>> > > streams is showing up in Kafka.  I see the metrics topic created in
>> > > Kafka, but it's empty (I have a counter counting records seen).
>> > >
>> > >  I have debug prints showing me that I am calling collector.send(),
>> > > which is also wrapped in a catch for Throwable.  Nothing at all shows
>> > > in the logs.
>> > >
>> > >  I do see the checkpoint topic being used and incremented
>> > > appropriately.  So between that and consuming records in the first
>> > > place, I think the system.kafka is configured correctly.  I ran into
>> > > serde issues with consumption and sending and fixed those too.
>> > >
>> > > Has anyone run into this kind of behavior?  I am hoping it's a dumb
>> > > config issue.
>> > >
>> > > V/R,
>> > > ~Garrett
>> > >
>> >
>>
>>
>>
>> --
>> -- Guozhang
>>
>
>
