samza-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Garrett Barton <garrett.bar...@gmail.com>
Subject Re: Nothing occurs from collector.send()
Date Wed, 03 Jun 2015 14:10:40 GMT
Thanks all for the responses.
As you can probably tell from the job name I am doing a validation process
which is taking in json and testing for a ton of things.  This is why I am
using String serde's instead of Samza's built in json support, the json
could be malformed and I need to still operate on it.  My serde issues were
from the malformed json being sent around, switching to StringSerde solved
this.  I have not seen that behavior in SAMZA-608 yet.

here is my current config, I started with the hello-samza wiki stat config:

# Job
job.factory.class=org.apache.samza.job.yarn.YarnJobFactoryjob.name=validate-records

# YARN
yarn.package.path=hdfs://p934/user/bartong/realtime-validator.tar.gz

# Task
task.class=samza.RealtimeValidator
task.inputs=kafka.sample-raw-streamtask.window.ms=60000
task.checkpoint.factory=org.apache.samza.checkpoint.kafka.KafkaCheckpointManagerFactory
task.checkpoint.system=kafka
# Normally, this would be 3, but we have only one broker.
task.checkpoint.replication.factor=1

# Serializers
serializers.registry.json.class=org.apache.samza.serializers.JsonSerdeFactory
serializers.registry.string.class=org.apache.samza.serializers.StringSerdeFactory
serializers.registry.long.class=org.apache.samza.serializers.LongSerdeFactory
serializers.registry.integer.class=org.apache.samza.serializers.IntegerSerdeFactory

# Systems
systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
systems.kafka.samza.key.serde=string
systems.kafka.samza.msg.serde=string
systems.kafka.streams.metrics-samza.samza.msg.serde=metrics
systems.kafka.consumer.zookeeper.connect=zk1:2181/
systems.kafka.consumer.auto.offset.reset=smallest
systems.kafka.producer.bootstrap.servers=ka1:6667

# Key-value storage
stores.validate-records.factory=org.apache.samza.storage.kv.RocksDbKeyValueStorageEngineFactory
stores.validate-records.changelog=kafka.validate-records-changelog
stores.validate-records.key.serde=string
stores.validate-records.msg.serde=long

# Normally, we'd leave this alone, but we have only one broker.
stores.validate-records.changelog.replication.factor=1

# Normally, we'd set this much higher, but we want things to look
snappy in the demo.
stores.validate-records.write.batch.size=0
stores.validate-records.object.cache.size=0



Some things I've noticed while testing:
setting auto.offset.reset=smallest is not taking me back to the start of
the streams.  Its like its being ignored.  I also don't see consumer
properties outputting to the logs, I do see producer props though.

On Tue, Jun 2, 2015 at 8:24 PM, Guozhang Wang <wangguoz@gmail.com> wrote:

> Hi Garret,
>
> Regarding the serde issues that Yan mentioned, you can check
> https://issues.apache.org/jira/browse/SAMZA-608 and see if its description
> matches what you saw.
>
> Guozhang
>
> On Tue, Jun 2, 2015 at 4:56 PM, Yan Fang <yanfang724@gmail.com> wrote:
>
> > Hi Garrett,
> >
> > I guess you run into the serde issues as you mentioned. If you can check
> > the Samza log and show us, we will be more helpful. Also, maybe pasting
> the
> > config here (if you dont mind), we can help to see if you miss something.
> >
> > Thanks,
> >
> > Fang, Yan
> > yanfang724@gmail.com
> >
> > On Tue, Jun 2, 2015 at 3:01 PM, Garrett Barton <garrett.barton@gmail.com
> >
> > wrote:
> >
> > > Greetings all,
> > >
> > >
> > >  I am trying to translate an existing workflow from MR into Samza. Thus
> > far
> > > everything is coded and kinks with deploying have been worked out.  My
> > task
> > > deploys into yarn (2.6.0), consumes records from Kafka (0.8.2.1) fine,
> > but
> > > no data from metrics and my output streams are showing up in Kafka.  I
> > see
> > > the metrics topic created in Kafka, but its empty (I have a counter
> > > counting records seen).
> > >
> > >  I have debug prints showing me that I am calling collector.send()
> which
> > is
> > > also wrapped in a catch for Throwable.  Nothing at all shows in the
> logs.
> > >
> > >  I do see the checkpoint topic being used, and incremented
> appropriately.
> > > So between that and consuming records in the first place I think the
> > > system.kafka is configured correctly.  I ran into serde issues with
> > > consumption and sending and fixed those too.
> > >
> > > Has anyone run into this kind of behavior?  Am hoping its a dumb config
> > > issue.
> > >
> > > V/R,
> > > ~Garrett
> > >
> >
>
>
>
> --
> -- Guozhang
>

Mime
  • Unnamed multipart/alternative (inline, None, 0 bytes)
View raw message