samza-dev mailing list archives

From Renato Marroquín Mogrovejo <renatoj.marroq...@gmail.com>
Subject Re: How to create partitions from input
Date Wed, 05 Nov 2014 16:15:32 GMT
Hi Chris,

This is what my config file looks like:

# Job
job.factory.class=org.apache.samza.job.yarn.YarnJobFactory
job.name=order-feed
# YARN
yarn.package.path=file://${basedir}/target/${project.artifactId}-${pom.version}-dist.tar.gz
# Task
task.class=samza.examples.order.task.OrderFeedStreamTask
task.inputs=order.order
# Serializers
serializers.registry.json.class=org.apache.samza.serializers.JsonSerdeFactory
# Order System
systems.order.samza.factory=samza.examples.order.system.OrderSystemFactory

The complete file is here:
https://github.com/renato2099/hello-samza/blob/master/samza-job-package/src/main/config/order-feed.properties
Thanks in advance for the help.


Renato M.

2014-11-05 17:06 GMT+01:00 Chris Riccomini <criccomini@linkedin.com.invalid>:
>
> Hey Renato,
>
> Could you send the config for your job?
>
> The stack trace indicates that "task.inputs" refers to a system that
> isn't defined in the configuration. For example, if you have:
>
>   task.inputs=kafka.my-topic
>
> but you haven't defined a "kafka" system (systems.kafka.*), then you will
> see this exception.
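
A minimal sketch of the check described above, in plain Java with no Samza dependency. It reads task.inputs, treats the text before the first dot of each entry as the system name, and reports every system that has no systems.<name>.samza.factory key. The class and method names are hypothetical, and looking only at the samza.factory key is an assumption made for illustration; this is not how Samza itself validates a job config.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

// Hypothetical helper, not part of Samza: finds systems that are named in
// task.inputs but have no systems.<name>.samza.factory entry.
class TaskInputsCheck {

    static List<String> undefinedSystems(Properties config) {
        List<String> missing = new ArrayList<>();
        String inputs = config.getProperty("task.inputs", "");
        for (String input : inputs.split(",")) {
            String name = input.trim();
            if (name.isEmpty()) {
                continue;
            }
            // Assumption: the system name is everything before the first dot,
            // e.g. "kafka" in "kafka.my-topic".
            int dot = name.indexOf('.');
            String system = dot < 0 ? name : name.substring(0, dot);
            String factoryKey = "systems." + system + ".samza.factory";
            if (config.getProperty(factoryKey) == null && !missing.contains(system)) {
                missing.add(system);
            }
        }
        return missing;
    }

    public static void main(String[] args) {
        Properties config = new Properties();
        config.setProperty("task.inputs", "kafka.my-topic");
        // No systems.kafka.* key is set, so "kafka" is reported as undefined.
        System.out.println(undefinedSystems(config));
    }
}
```

Run against a config that sets both task.inputs=order.order and systems.order.samza.factory, the helper returns an empty list, so a failure in that case would have to come from somewhere other than a missing factory key.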
>
> Cheers,
> Chris
>
> On 11/5/14 6:14 AM, "Renato Marroquín Mogrovejo"
> <renatoj.marroquin@gmail.com> wrote:
>
> >Hi Yan,
> >
> >I set task.inputs to use the same name as the system, but I keep
> >getting an error when I try to run the job:
> >
> >2014-11-05 09:42:07 OffsetManager [INFO] Successfully loaded starting offsets: Map(SystemStreamPartition [partition=Partition [partition=0], system=order, stream=order] -> null)
> >2014-11-05 09:42:07 SamzaContainer [INFO] Starting task instance stores.
> >2014-11-05 09:42:07 SamzaContainer [INFO] Initializing stream tasks.
> >2014-11-05 09:42:07 SamzaContainer [INFO] Registering task instances with producers.
> >2014-11-05 09:42:07 SamzaContainer [INFO] Starting producer multiplexer.
> >2014-11-05 09:42:07 SamzaContainer [INFO] Registering task instances with consumers.
> >2014-11-05 09:42:07 SamzaContainer [ERROR] Caught exception in process loop.
> >org.apache.samza.system.SystemConsumersException: can't register order's consumer.
> >       at org.apache.samza.system.SystemConsumers.register(SystemConsumers.scala:164)
> >       at org.apache.samza.container.TaskInstance$anonfun$registerConsumers$2.apply(TaskInstance.scala:128)
> >       at org.apache.samza.container.TaskInstance$anonfun$registerConsumers$2.apply(TaskInstance.scala:124)
> >       at scala.collection.immutable.Set$Set1.foreach(Set.scala:74)
> >       at org.apache.samza.container.TaskInstance.registerConsumers(TaskInstance.scala:124)
> >       at org.apache.samza.container.SamzaContainer$anonfun$startConsumers$2.apply(SamzaContainer.scala:577)
> >       at org.apache.samza.container.SamzaContainer$anonfun$startConsumers$2.apply(SamzaContainer.scala:577)
> >       at scala.collection.Iterator$class.foreach(Iterator.scala:727)
> >       at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
> >       at scala.collection.MapLike$DefaultValuesIterable.foreach(MapLike.scala:206)
> >       at org.apache.samza.container.SamzaContainer.startConsumers(SamzaContainer.scala:577)
> >       at org.apache.samza.container.SamzaContainer.run(SamzaContainer.scala:501)
> >       at org.apache.samza.container.SamzaContainer$.main(SamzaContainer.scala:81)
> >       at org.apache.samza.container.SamzaContainer.main(SamzaContainer.scala)
> >Caused by: java.util.NoSuchElementException: key not found: order
> >       at scala.collection.MapLike$class.default(MapLike.scala:228)
> >       at scala.collection.AbstractMap.default(Map.scala:58)
> >       at scala.collection.MapLike$class.apply(MapLike.scala:141)
> >       at scala.collection.AbstractMap.apply(Map.scala:58)
> >       at org.apache.samza.system.SystemConsumers.register(SystemConsumers.scala:165)
> >       ... 13 more
> >
> >
> >
> >Maybe it has to do with me creating the partitions in the start method? I
> >guess the register method gets called first, then start, and finally stop?
> >Any suggestions? Thank you very much for your help!
> >
> >
> >Renato M.
> >
> >2014-11-03 18:57 GMT+01:00 Yan Fang <yanfang724@gmail.com>:
> >
> >> Hi Renato,
> >>
> >> 1) In the task.inputs option you told me that I should use something like
> >> "ordersystem.order", I guess I have to use ordersystem because of the
> >> factory name "OrderSystemFactory"? I tried using "order.order" but I got an
> >> <o.a.s.s.SystemConsumersException: can't register order's consumer> So I
> >> imagine there is a naming convention for classes, inputs, and factories?
> >>
> >>   1) There is no restriction on the system name; I just used that as an
> >> example. The only requirement is that the system name in task.inputs
> >> matches the system name in systems.%systemname.samza.factory. For example,
> >> in the following properties, the two bold names (between asterisks) must
> >> be the same:
> >>      task.inputs=*foosystem*.tableName
> >>      systems.*foosystem*.samza.factory=samza.examples.wikipedia.system.OrderSystemFactory
> >>
> >> In your case, use task.inputs=*order*.order, because you have
> >> systems.*order*.samza.factory=samza.examples.order.system.OrderSystemFactory
> >>
> >> 2) I tried using the "ordersystem.order" name but I kept on getting the
> >> NoPartitions exception
> >>
> >>   2) The code in your previous register method should be ok.
> >>
> >> 3) The BlockingEnvelopeMap has a put method to put the incoming messages
> >> into the partition, and I am putting the table values on the start method,
> >> is this a bad practice?
> >>
> >>   3) It depends. Since the start method is only called once, the system
> >> will read the table only once and put all the records into the
> >> BlockingEnvelopeMap. For testing purposes, I think that is fine. You can
> >> have a look at our file reader system consumer
> >> <https://github.com/apache/incubator-samza/blob/master/samza-core/src/main/scala/org/apache/samza/system/filereader/FileReaderSystemConsumer.scala>,
> >> which uses a thread to monitor the file.
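
The difference between loading once in start and monitoring with a thread can be sketched as follows. This is a stand-in written in plain Java, not Samza code: the class PollingTableReader, the fetchNewRows supplier, and the internal queue are all hypothetical. In a real consumer the rows would go through the put method of BlockingEnvelopeMap rather than into a local queue.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

// Hypothetical stand-in for a consumer that keeps polling its source
// instead of reading it a single time when start() is called.
class PollingTableReader {
    private final Supplier<List<String>> fetchNewRows; // assumed to return only unseen rows
    private final long pollIntervalMs;
    private final BlockingQueue<String> queue = new LinkedBlockingQueue<>();
    private volatile boolean running;
    private Thread worker;

    PollingTableReader(Supplier<List<String>> fetchNewRows, long pollIntervalMs) {
        this.fetchNewRows = fetchNewRows;
        this.pollIntervalMs = pollIntervalMs;
    }

    // start() only launches the background thread; the thread does the reading.
    void start() {
        running = true;
        worker = new Thread(() -> {
            while (running) {
                queue.addAll(fetchNewRows.get());
                try {
                    Thread.sleep(pollIntervalMs);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return; // stop() interrupted the sleep
                }
            }
        }, "table-poller");
        worker.setDaemon(true);
        worker.start();
    }

    // Stops the background thread and waits for it to finish.
    void stop() {
        running = false;
        if (worker != null) {
            worker.interrupt();
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }

    // Returns the next row, or null if none arrives within the timeout.
    String poll(long timeoutMs) {
        try {
            return queue.poll(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }

    public static void main(String[] args) {
        Queue<String> table = new ConcurrentLinkedQueue<>(Arrays.asList("order-1", "order-2"));
        PollingTableReader reader = new PollingTableReader(() -> {
            List<String> rows = new ArrayList<>();
            String row;
            while ((row = table.poll()) != null) {
                rows.add(row);
            }
            return rows;
        }, 100);
        reader.start();
        System.out.println(reader.poll(1000));
        System.out.println(reader.poll(1000));
        reader.stop();
    }
}
```

With this shape, rows added to the source after start has returned are still picked up on the next poll, which a one-time load inside start would miss.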
> >>
> >> 4) WikipediaFeedStreamTask takes an envelope object and puts that to Kafka,
> >> but where does it get the envelope from? the consumer right?
> >>
> >>   4) Yes, all the messages come from the put method in the consumer.
> >>
> >> Thanks,
> >>
> >> Fang, Yan
> >> yanfang724@gmail.com
> >> +1 (206) 849-4108
> >>
> >> On Mon, Nov 3, 2014 at 1:38 AM, Renato Marroquín Mogrovejo <
> >> renatoj.marroquin@gmail.com> wrote:
> >>
> >> > Thanks for the help Yan!
> >> > I think I understand more things now, but I also have more questions :)
> >> >
> >> > 1) In the task.inputs option you told me that I should use something like
> >> > "ordersystem.order", I guess I have to use ordersystem because of the
> >> > factory name "OrderSystemFactory"? I tried using "order.order" but I got an
> >> > <o.a.s.s.SystemConsumersException: can't register order's consumer> So I
> >> > imagine there is a naming convention for classes, inputs, and factories?
> >> > 2) I tried using the "ordersystem.order" name but I kept on getting the
> >> > NoPartitions exception
> >> > 3) The BlockingEnvelopeMap has a put method to put the incoming messages
> >> > into the partition, and I am putting the table values on the start method,
> >> > is this a bad practice?
> >> > <https://github.com/renato2099/hello-samza/blob/master/samza-wikipedia/src/main/java/samza/examples/order/system/OrderConsumer.java>
> >> > 4) WikipediaFeedStreamTask takes an envelope object and puts that to Kafka,
> >> > but where does it get the envelope from? the consumer right?
> >> >
> >> > Thanks again Yan!
> >> >
> >> >
> >> > Renato M.
> >> >
> >> > 2014-11-03 3:18 GMT+01:00 Yan Fang <yanfang724@gmail.com>:
> >> >
> >> > > Hi Renato,
> >> > >
> >> > > on the hello-samza example for each new incoming message which belongs
> >> > > to a channel, a new partition is created, right?
> >> > >
> >> > > -- In the WikipediaFeed job of hello-samza, each channel is actually
> >> > > treated as one stream of the Wikipedia system. Each stream has only
> >> > > partition 0. This is the code:
> >> > > SystemStreamPartition systemStreamPartition = new
> >> > > SystemStreamPartition(systemName,
> >> > > event.getChannel(), new Partition(0));
> >> > >
> >> > > So in your code, I think each table could be thought of as one stream
> >> > > with only partition 0. So it will be like task.inputs=ordersystem.tableName
> >> > >
> >> > > -- Then you may be confused by what happens in Kafka. All the messages
> >> > > are sent to the wikipedia-raw topic by WikipediaFeedStreamTask
> >> > > <https://github.com/renato2099/hello-samza/blob/master/samza-wikipedia/src/main/java/samza/examples/wikipedia/task/WikipediaFeedStreamTask.java>.
> >> > >
> >> > > Exception in thread "main" org.apache.samza.SamzaException: No partitions
> >> > > for this task. Can't run a task without partition assignments. It's likely
> >> > > that the partition manager for this system doesn't know about the stream
> >> > > you're trying to read.
> >> > > at org.apache.samza.container.SamzaContainer$.main(SamzaContainer.scala:77)
> >> > > at org.apache.samza.container.SamzaContainer.main(SamzaContainer.scala)
> >> > >
> >> > > -- Is the stream registered to the system?
> >> > > SystemStreamPartition systemStreamPartition = new
> >> > > SystemStreamPartition(systemName,
> >> > > "order", new Partition(0));
> >> > > Is "order" registered, for example with task.inputs=ordersystem.order?
> >> > >
> >> > > Let me know if you have other questions.
> >> > >
> >> > > Thanks,
> >> > >
> >> > > Fang, Yan
> >> > > yanfang724@gmail.com
> >> > > +1 (206) 849-4108
> >> > >
> >> > > On Sun, Nov 2, 2014 at 4:56 AM, Renato Marroquín Mogrovejo <
> >> > > renatoj.marroquin@gmail.com> wrote:
> >> > >
> >> > > > Hello Samza team,
> >> > > >
> >> > > >
> >> > > > I am trying to modify the hello-samza example application to replay events
> >> > > > which are in a table. But I am having some trouble.
> >> > > > So on the hello-samza example for each new incoming message which belongs
> >> > > > to a channel, a new partition is created, right?
> >> > > > Now in my case, how (where) do I create these partitions? I create them in
> >> > > > [1] but I am almost sure that is wrong because I keep getting the exception
> >> > > > saying that there are no partitions for this task. I mean ideally I would
> >> > > > like to create partitions based on the keys I am reading from the table.
> >> > > > Could anybody help me on this task please? Many thanks in advance!
> >> > > >
> >> > > >
> >> > > > Renato M.
> >> > > >
> >> > > >
> >> > > > Exception in thread "main" org.apache.samza.SamzaException: No partitions
> >> > > > for this task. Can't run a task without partition assignments. It's likely
> >> > > > that the partition manager for this system doesn't know about the stream
> >> > > > you're trying to read.
> >> > > > at org.apache.samza.container.SamzaContainer$.main(SamzaContainer.scala:77)
> >> > > > at org.apache.samza.container.SamzaContainer.main(SamzaContainer.scala)
> >> > > >
> >> > > >
> >> > > > [1] https://github.com/renato2099/hello-samza/blob/master/samza-wikipedia/src/main/java/samza/examples/order/system/OrderConsumer.java#L47
