samza-dev mailing list archives

From Chris Riccomini <criccom...@linkedin.com.INVALID>
Subject Re: How to create partitions from input
Date Wed, 05 Nov 2014 21:53:53 GMT
Hey Renato,

It seems to me that your consumer is not reading any messages. This seems
concerning:

2014-11-05 22:28:08 OrderFeed [ERROR] Communications link failure

The last packet sent successfully to the server was 0 milliseconds
ago. The driver has not received any packets from the server.
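The OrderFeed error above reads like a JDBC connection failure (the wording matches MySQL Connector/J's message). After it, the container entered its run loop and kept polling empty envelopes. One option, not one proposed in this thread, is to make the consumer's start() fail loudly when the connection cannot be opened. The class and method names below are hypothetical; only `java.sql.DriverManager` is a real API.

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;

// Hypothetical helper for a SystemConsumer's start(): open the JDBC connection up front
// and throw if that fails, instead of logging an ERROR and polling an empty buffer.
final class FailFastJdbc {
    private FailFastJdbc() {}

    static Connection openOrFail(String jdbcUrl, String user, String password) {
        try {
            return DriverManager.getConnection(jdbcUrl, user, password);
        } catch (SQLException e) {
            // Rethrow unchecked so the container stops with the original cause attached.
            throw new IllegalStateException(
                "Could not connect to " + jdbcUrl + ": " + e.getMessage(), e);
        }
    }
}
```

If start() called something like `FailFastJdbc.openOrFail(url, user, password)`, the connection problem would show up as a container failure instead of an idle run loop.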


Regarding creating a new partition, you shouldn't need to do this, but I
don't think it will cause any problems. Since your SystemFactory is using
a single partition SystemAdmin, Samza should only register partition 0
anyway.

Cheers,
Chris
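Renato asked whether creating partition 0 on every register() call could cause trouble, and Chris expects it not to. Here is a minimal sketch of why that can be harmless. It assumes the consumer keys its buffers by a value-equal (system, stream, partition) triple, so re-creating partition 0 yields an equal key and nothing new is added. `Ssp` and `PartitionBuffers` are hypothetical stand-ins, not Samza's SystemStreamPartition or BlockingEnvelopeMap. This thread does not say whether Samza's own buffer map reuses or replaces a queue on re-registration.

```java
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical stand-in for a (system, stream, partition) triple. Records compare by
// value, so two instances built from the same fields are equal map keys.
record Ssp(String system, String stream, int partition) {
    Ssp {
        Objects.requireNonNull(system, "system");
        Objects.requireNonNull(stream, "stream");
    }
}

// Toy per-partition buffer map (not Samza's BlockingEnvelopeMap).
final class PartitionBuffers {
    private final Map<Ssp, BlockingQueue<String>> buffers = new ConcurrentHashMap<>();

    // Registering an equal partition again keeps the existing queue and its messages.
    void register(Ssp ssp) {
        buffers.computeIfAbsent(ssp, key -> new LinkedBlockingQueue<>());
    }

    void put(Ssp ssp, String message) {
        BlockingQueue<String> queue = buffers.get(ssp);
        if (queue == null) {
            throw new IllegalStateException("Partition was never registered: " + ssp);
        }
        queue.add(message);
    }

    int registeredPartitions() {
        return buffers.size();
    }

    int bufferedCount(Ssp ssp) {
        BlockingQueue<String> queue = buffers.get(ssp);
        return queue == null ? 0 : queue.size();
    }
}
```

The sketch uses `computeIfAbsent` rather than `put` so that a repeated registration cannot silently discard rows that were already buffered.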

On 11/5/14 1:48 PM, "Renato Marroquín Mogrovejo"
<renatoj.marroquin@gmail.com> wrote:

>Hi Chris,
>
>It turned out to be a silly mistake, but as soon as I saw the logs, it was
>easy to track it down.
>Now my consumer logs the following and does not make any progress. I guess
>the database records are not getting into the partition. Could this be
>because I am creating my partition every time the register method is called?
>Thanks again for the help! It is very much appreciated.
>
>
>Renato M.
>
>2014-11-05 22:28:07 MetricsRegistryMap [DEBUG] Creating new counter org.apache.samza.system.SystemConsumersMetrics order-messages-per-poll.
>2014-11-05 22:28:07 MetricsRegistryMap [DEBUG] Add new counter org.apache.samza.system.SystemConsumersMetrics order-messages-per-poll 0.
>2014-11-05 22:28:08 OrderFeed [ERROR] Communications link failure
>
>The last packet sent successfully to the server was 0 milliseconds ago. The driver has not received any packets from the server.
>2014-11-05 22:28:08 MetricsRegistryMap [DEBUG] Adding new gauge org.apache.samza.system.chooser.RoundRobinChooserMetrics buffered-messages 0.
>2014-11-05 22:28:08 SamzaContainer [INFO] Entering run loop.
>2014-11-05 22:28:08 SystemConsumers [DEBUG] Chooser returned null.
>2014-11-05 22:28:08 SystemConsumers [DEBUG] Refreshing chooser with new messages.
>2014-11-05 22:28:08 SystemConsumers [DEBUG] Polling system consumer: order
>2014-11-05 22:28:08 SystemConsumers [DEBUG] Getting fetch map for system: order
>2014-11-05 22:28:08 SystemConsumers [DEBUG] Fetching: Map(SystemStreamPartition [partition=Partition [partition=0], system=order, stream=order] -> 10000)
>2014-11-05 22:28:08 SystemConsumers [DEBUG] Got incoming message envelopes: []
>2014-11-05 22:28:08 TaskStorageManager [DEBUG] Flushing stores.
>2014-11-05 22:28:08 SystemProducers [DEBUG] Flushing source: Partition-0
>2014-11-05 22:28:08 KafkaSystemProducer [DEBUG] Flushing buffer with size: 0.
>2014-11-05 22:28:08 KafkaSystemProducer [INFO] Creating a new producer for system kafka.
>2014-11-05 22:28:08 KafkaSystemProducer [DEBUG] Created a new producer for system kafka.
>2014-11-05 22:28:08 DefaultEventHandler [DEBUG] Handling 0 events
>2014-11-05 22:28:08 KafkaSystemProducer [DEBUG] Flushed buffer.
>2014-11-05 22:28:08 OffsetManager [DEBUG] Skipping checkpointing for partition Partition [partition=0] because no checkpoint manager is defined.
>2014-11-05 22:28:08 SystemConsumers [DEBUG] Chooser returned null.
>2014-11-05 22:28:08 SystemConsumers [DEBUG] Chooser returned null.
>
>
>2014-11-05 18:17 GMT+01:00 Chris Riccomini <criccomini@linkedin.com.invalid>:
>
>> Hey Renato,
>>
>> Here's your problem:
>>
>> 2014-11-05 15:10:14 SamzaContainer$ [INFO] Failed to create a consumer for order, so skipping.
>> 2014-11-05 15:10:14 SamzaContainer$ [INFO] Got system consumers: Set()
>>
>> Samza is unable to instantiate your SystemConsumer. Set the log level to
>> debug so you can see the full stack trace.
>>
>> Cheers,
>> Chris
>>
>> From: Renato Marroquín Mogrovejo <renatoj.marroquin@gmail.com>
>> Reply-To: "dev@samza.incubator.apache.org" <dev@samza.incubator.apache.org>
>> Date: Wednesday, November 5, 2014 9:14 AM
>> To: "dev@samza.incubator.apache.org" <dev@samza.incubator.apache.org>
>> Subject: Re: How to create partitions from input
>>
>> Hi Chris,
>>
>> I am attaching them. Please let me know if they go through; if not, I can
>> put them somewhere else.
>> Many thanks again!
>>
>>
>> Renato M.
>>
>> 2014-11-05 17:42 GMT+01:00 Chris Riccomini <criccomini@linkedin.com.invalid>:
>> Hey Renato,
>>
>> It looks like your config is good. I'm wondering if your consumer is
>> failing to get created. I'm looking for this line in the logs:
>>
>>   "Failed to create a consumer for order, so skipping"
>>
>>
>> If consumer creation fails, Samza will continue on and assume that you
>> won't use the consumer. If you do use it, though, that can lead to an
>> exception like the one you have.
>>
>> Can you attach the logs from the job?
>>
>>
>> Cheers,
>> Chris
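Chris's explanation can be modeled in a few lines: a consumer that fails to construct is skipped, and any later attempt to use it fails. This is a toy model written for illustration, not Samza's SamzaContainer or SystemConsumers code (which is Scala). The class and method names are hypothetical; the exception messages are copied from the logs in this thread.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.function.Supplier;

// Toy model of the failure mode described above (not Samza's actual code):
// consumers whose construction throws are logged and skipped, so a later
// registration for that system finds no consumer.
final class ConsumerRegistryModel {
    private final Map<String, Object> consumers = new LinkedHashMap<>();

    ConsumerRegistryModel(Map<String, Supplier<Object>> factories) {
        for (Map.Entry<String, Supplier<Object>> entry : factories.entrySet()) {
            try {
                consumers.put(entry.getKey(), entry.getValue().get());
            } catch (RuntimeException e) {
                // Same idea as the log line "Failed to create a consumer for order, so skipping."
                System.err.println("Failed to create a consumer for " + entry.getKey()
                    + ", so skipping. Cause: " + e);
            }
        }
    }

    // Fails the way the thread's stack trace does: "can't register order's consumer",
    // caused by "key not found: order".
    Object register(String system) {
        Object consumer = consumers.get(system);
        if (consumer == null) {
            throw new IllegalStateException("can't register " + system + "'s consumer.",
                new NoSuchElementException("key not found: " + system));
        }
        return consumer;
    }
}
```

The point of the model is the distance between cause and symptom. The real error is printed at construction time, while the exception only surfaces later, during registration.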
>>
>> On 11/5/14 8:15 AM, "Renato Marroquín Mogrovejo" <renatoj.marroquin@gmail.com> wrote:
>>
>> >Hi Chris,
>> >
>> >This is what my config file looks like:
>> >
>> ># Job
>> >job.factory.class=org.apache.samza.job.yarn.YarnJobFactory
>> >job.name=order-feed
>> ># YARN
>> >yarn.package.path=file://${basedir}/target/${project.artifactId}-${pom.version}-dist.tar.gz
>> ># Task
>> >task.class=samza.examples.order.task.OrderFeedStreamTask
>> >task.inputs=order.order
>> ># Serializers
>> >serializers.registry.json.class=org.apache.samza.serializers.JsonSerdeFactory
>> ># Wikipedia System
>> >systems.order.samza.factory=samza.examples.order.system.OrderSystemFactory
>> >
>> >The complete file is here:
>> >https://github.com/renato2099/hello-samza/blob/master/samza-job-package/src/main/config/order-feed.properties
>> >Thanks in advance for the help.
>> >
>> >
>> >Renato M.
>> >
>> >2014-11-05 17:06 GMT+01:00 Chris Riccomini <criccomini@linkedin.com.invalid>:
>> >>
>> >> Hey Renato,
>> >>
>> >> Could you send the config for your job?
>> >>
>> >> The stack trace indicates that you are using a system in "task.inputs"
>> >> that isn't defined in the configuration. For example, if you have:
>> >>
>> >>   task.inputs=kafka.my-topic
>> >>
>> >> but you haven't defined a "kafka" system (systems.kafka.*), then you will
>> >> see this exception.
>> >>
>> >> Cheers,
>> >> Chris
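Chris's rule is that every system named in task.inputs needs a systems.<name>.* definition, and Yan states the same rule for systems.<name>.samza.factory further down in this thread. That rule can be checked before submitting a job. The sketch below is a standalone check, not a Samza utility. It assumes properties-file config and that each task.inputs entry is <system>.<stream>, split at the first dot. The class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

// Hypothetical pre-flight check, not a Samza utility: report every system named in
// task.inputs that has no systems.<system>.samza.factory entry.
final class InputSystemCheck {
    private InputSystemCheck() {}

    static List<String> undefinedInputSystems(Properties config) {
        List<String> missing = new ArrayList<>();
        for (String raw : config.getProperty("task.inputs", "").split(",")) {
            String input = raw.trim();
            if (input.isEmpty()) {
                continue;
            }
            // Assumption: an input is <system>.<stream>, split at the first '.'.
            int dot = input.indexOf('.');
            if (dot <= 0 || dot == input.length() - 1) {
                throw new IllegalArgumentException("Expected <system>.<stream>, got: " + input);
            }
            String system = input.substring(0, dot);
            boolean defined = config.getProperty("systems." + system + ".samza.factory") != null;
            if (!defined && !missing.contains(system)) {
                missing.add(system);
            }
        }
        return missing;
    }
}
```

Applied to the order-feed.properties lines quoted in this thread (task.inputs=order.order with systems.order.samza.factory set), the check reports nothing missing. Changing the input to ordersystem.order would report `ordersystem`.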
>> >>
>> >> On 11/5/14 6:14 AM, "Renato Marroquín Mogrovejo" <renatoj.marroquin@gmail.com> wrote:
>> >>
>> >> >Hi Yan,
>> >> >
>> >> >I tried setting task.inputs to use the same key as the system's name,
>> >> >but I keep getting an error when I try to run it:
>> >> >
>> >> >2014-11-05 09:42:07 OffsetManager [INFO] Successfully loaded starting offsets: Map(SystemStreamPartition [partition=Partition [partition=0], system=order, stream=order] -> null)
>> >> >2014-11-05 09:42:07 SamzaContainer [INFO] Starting task instance stores.
>> >> >2014-11-05 09:42:07 SamzaContainer [INFO] Initializing stream tasks.
>> >> >2014-11-05 09:42:07 SamzaContainer [INFO] Registering task instances with producers.
>> >> >2014-11-05 09:42:07 SamzaContainer [INFO] Starting producer multiplexer.
>> >> >2014-11-05 09:42:07 SamzaContainer [INFO] Registering task instances with consumers.
>> >> >2014-11-05 09:42:07 SamzaContainer [ERROR] Caught exception in process loop.
>> >> >org.apache.samza.system.SystemConsumersException: can't register order's consumer.
>> >> >       at org.apache.samza.system.SystemConsumers.register(SystemConsumers.scala:164)
>> >> >       at org.apache.samza.container.TaskInstance$anonfun$registerConsumers$2.apply(TaskInstance.scala:128)
>> >> >       at org.apache.samza.container.TaskInstance$anonfun$registerConsumers$2.apply(TaskInstance.scala:124)
>> >> >       at scala.collection.immutable.Set$Set1.foreach(Set.scala:74)
>> >> >       at org.apache.samza.container.TaskInstance.registerConsumers(TaskInstance.scala:124)
>> >> >       at org.apache.samza.container.SamzaContainer$anonfun$startConsumers$2.apply(SamzaContainer.scala:577)
>> >> >       at org.apache.samza.container.SamzaContainer$anonfun$startConsumers$2.apply(SamzaContainer.scala:577)
>> >> >       at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>> >> >       at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>> >> >       at scala.collection.MapLike$DefaultValuesIterable.foreach(MapLike.scala:206)
>> >> >       at org.apache.samza.container.SamzaContainer.startConsumers(SamzaContainer.scala:577)
>> >> >       at org.apache.samza.container.SamzaContainer.run(SamzaContainer.scala:501)
>> >> >       at org.apache.samza.container.SamzaContainer$.main(SamzaContainer.scala:81)
>> >> >       at org.apache.samza.container.SamzaContainer.main(SamzaContainer.scala)
>> >> >Caused by: java.util.NoSuchElementException: key not found: order
>> >> >       at scala.collection.MapLike$class.default(MapLike.scala:228)
>> >> >       at scala.collection.AbstractMap.default(Map.scala:58)
>> >> >       at scala.collection.MapLike$class.apply(MapLike.scala:141)
>> >> >       at scala.collection.AbstractMap.apply(Map.scala:58)
>> >> >       at org.apache.samza.system.SystemConsumers.register(SystemConsumers.scala:165)
>> >> >       ... 13 more
>> >> >
>> >> >
>> >> >
>> >> >Maybe it has to do with my creating the partitions in the start method?
>> >> >I guess the register method gets called first, then start, and finally
>> >> >stop?
>> >> >Any suggestions? Thank you very much for your help!
>> >> >
>> >> >
>> >> >Renato M.
>> >> >
>> >> >2014-11-03 18:57 GMT+01:00 Yan Fang <yanfang724@gmail.com>:
>> >> >
>> >> >> Hi Renato,
>> >> >>
>> >> >> 1) In the task.inputs option you told me that I should use something
>> >> >> like "ordersystem.order". I guess I have to use ordersystem because of
>> >> >> the factory name "OrderSystemFactory"? I tried using "order.order" but
>> >> >> I got <o.a.s.s.SystemConsumersException: can't register order's
>> >> >> consumer>, so I imagine there is a naming convention for classes,
>> >> >> inputs, and factories?
>> >> >>
>> >> >>   1) There is no restriction on the system name; I just used that as
>> >> >> an example. The only requirement is that the system name in
>> >> >> task.inputs is the same as the system name in
>> >> >> systems.%systemname.samza.factory. For example, in the following
>> >> >> properties, the two bold names must be the same:
>> >> >>      task.inputs=*foosystem*.tableName
>> >> >>      systems.*foosystem*.samza.factory=samza.examples.wikipedia.system.OrderSystemFactory
>> >> >>
>> >> >> In your case, task.inputs=*order*.order, because
>> >> >> systems.*order*.samza.factory=samza.examples.order.system.OrderSystemFactory
>> >> >>
>> >> >> 2) I tried using the "ordersystem.order" name but I kept on getting
>> >> >> the NoPartitions exception
>> >> >>
>> >> >>   2) The code in your previous register method should be OK.
>> >> >>
>> >> >> 3) The BlockingEnvelopeMap has a put method to put the incoming
>> >> >> messages into the partition, and I am putting the table values in the
>> >> >> start method. Is this a bad practice?
>> >> >>
>> >> >>   3) It depends. Since the start method is only called once, the
>> >> >> system will only read the table once and put all the records into the
>> >> >> BlockingEnvelopeMap. For testing purposes, I think it is fine. You can
>> >> >> have a look at our file reader system consumer
>> >> >> <https://github.com/apache/incubator-samza/blob/master/samza-core/src/main/scala/org/apache/samza/system/filereader/FileReaderSystemConsumer.scala>,
>> >> >> which uses a thread to monitor the file.
>> >> >>
>> >> >> 4) WikipediaFeedStreamTask takes an envelope object and puts it into
>> >> >> Kafka, but where does it get the envelope from? The consumer, right?
>> >> >>
>> >> >>   4) Yes, all the messages come from the put method in the consumer.
>> >> >>
>> >> >> Thanks,
>> >> >>
>> >> >> Fang, Yan
>> >> >> yanfang724@gmail.com
>> >> >> +1 (206) 849-4108
>> >> >>
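Yan's third answer notes that loading the table in start() reads it only once. He points to FileReaderSystemConsumer, which uses a thread to monitor the file. Below is a minimal sketch of the same idea for a table. It assumes the table has a monotonically increasing id column. `fetchAfter` is a hypothetical stand-in for a query such as SELECT ... WHERE id > ? ORDER BY id, and the queue stands in for the per-partition buffer that BlockingEnvelopeMap manages. None of these names are Samza's API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.LongFunction;

// Sketch (hypothetical names): keep reading new rows in a background thread
// instead of loading the whole table once in start().
final class PollingTableReader {
    record Row(long id, String payload) {}

    private final BlockingQueue<Row> buffer = new LinkedBlockingQueue<>();
    // Stand-in for "SELECT id, payload FROM orders WHERE id > ? ORDER BY id".
    private final LongFunction<List<Row>> fetchAfter;
    private final long pollIntervalMs;
    private volatile boolean running;
    private Thread worker;

    PollingTableReader(LongFunction<List<Row>> fetchAfter, long pollIntervalMs) {
        this.fetchAfter = fetchAfter;
        this.pollIntervalMs = pollIntervalMs;
    }

    void start() {
        running = true;
        worker = new Thread(() -> {
            long lastSeenId = Long.MIN_VALUE; // assumes ids only ever increase
            while (running) {
                for (Row row : fetchAfter.apply(lastSeenId)) {
                    buffer.add(row);
                    lastSeenId = Math.max(lastSeenId, row.id());
                }
                try {
                    Thread.sleep(pollIntervalMs);
                } catch (InterruptedException e) {
                    return; // stop() interrupts the sleep
                }
            }
        }, "table-reader");
        worker.setDaemon(true);
        worker.start();
    }

    void stop() {
        running = false;
        worker.interrupt();
        try {
            worker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    // Waits up to timeoutMs for the first row, then drains up to maxRows in total.
    List<Row> poll(int maxRows, long timeoutMs) {
        List<Row> out = new ArrayList<>();
        try {
            Row first = buffer.poll(timeoutMs, TimeUnit.MILLISECONDS);
            if (first != null) {
                out.add(first);
                buffer.drainTo(out, maxRows - 1);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return out;
    }
}
```

Tracking the last seen id means rows inserted after start() are still picked up on a later pass, without re-reading rows already delivered.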
>> >> >> On Mon, Nov 3, 2014 at 1:38 AM, Renato Marroquín Mogrovejo <renatoj.marroquin@gmail.com> wrote:
>> >> >>
>> >> >> > Thanks for the help, Yan!
>> >> >> > I think I understand more now, but I also have more questions :)
>> >> >> >
>> >> >> > 1) In the task.inputs option you told me that I should use something
>> >> >> > like "ordersystem.order". I guess I have to use ordersystem because of
>> >> >> > the factory name "OrderSystemFactory"? I tried using "order.order" but
>> >> >> > I got <o.a.s.s.SystemConsumersException: can't register order's
>> >> >> > consumer>, so I imagine there is a naming convention for classes,
>> >> >> > inputs, and factories?
>> >> >> > 2) I tried using the "ordersystem.order" name but I kept on getting
>> >> >> > the NoPartitions exception
>> >> >> > 3) The BlockingEnvelopeMap has a put method to put the incoming
>> >> >> > messages into the partition, and I am putting the table values in the
>> >> >> > start method. Is this a bad practice?
>> >> >> > <https://github.com/renato2099/hello-samza/blob/master/samza-wikipedia/src/main/java/samza/examples/order/system/OrderConsumer.java>
>> >> >> > 4) WikipediaFeedStreamTask takes an envelope object and puts it into
>> >> >> > Kafka, but where does it get the envelope from? The consumer, right?
>> >> >> >
>> >> >> > Thanks again, Yan!
>> >> >> >
>> >> >> >
>> >> >> > Renato M.
>> >> >> >
>> >> >> > 2014-11-03 3:18 GMT+01:00 Yan Fang <yanfang724@gmail.com>:
>> >> >> >
>> >> >> > > Hi Renato,
>> >> >> > >
>> >> >> > > In the hello-samza example, for each new incoming message that
>> >> >> > > belongs to a channel, a new partition is created, right?
>> >> >> > >
>> >> >> > > -- In the WikipediaFeed job of hello-samza, each channel is actually
>> >> >> > > treated as one stream of the Wikipedia system. Each stream has only
>> >> >> > > partition 0. This is the code:
>> >> >> > > SystemStreamPartition systemStreamPartition = new SystemStreamPartition(systemName,
>> >> >> > >     event.getChannel(), new Partition(0));
>> >> >> > >
>> >> >> > > So in your code, I think each table could be thought of as one
>> >> >> > > stream, with only partition 0. So it will be like
>> >> >> > > task.inputs=ordersystem.tableName
>> >> >> > >
>> >> >> > > -- Then you may be confused by what happens in Kafka. All the
>> >> >> > > messages are sent to the wikipedia-raw topic by WikipediaFeedStreamTask
>> >> >> > > <https://github.com/renato2099/hello-samza/blob/master/samza-wikipedia/src/main/java/samza/examples/wikipedia/task/WikipediaFeedStreamTask.java>.
>> >> >> > >
>> >> >> > > Exception in thread "main" org.apache.samza.SamzaException: No partitions for this task. Can't run a task without partition assignments. It's likely that the partition manager for this system doesn't know about the stream you're trying to read.
>> >> >> > > at org.apache.samza.container.SamzaContainer$.main(SamzaContainer.scala:77)
>> >> >> > > at org.apache.samza.container.SamzaContainer.main(SamzaContainer.scala)
>> >> >> > >
>> >> >> > > -- Is the stream registered with the system?
>> >> >> > > SystemStreamPartition systemStreamPartition = new SystemStreamPartition(systemName,
>> >> >> > >     "order", new Partition(0));
>> >> >> > > Is "order" registered, for example with task.inputs=ordersystem.order?
>> >> >> > >
>> >> >> > > Let me know if you have other questions.
>> >> >> > >
>> >> >> > > Thanks,
>> >> >> > >
>> >> >> > > Fang, Yan
>> >> >> > > yanfang724@gmail.com
>> >> >> > > +1 (206) 849-4108
>> >> >> > >
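Yan describes a pattern in which each channel (or, in Renato's case, each table) is one stream with only partition 0, and every message the task sees comes from the consumer's put. That pattern can be sketched as a toy consumer. The class, record, and method names are hypothetical, and this is not Samza's SystemConsumer interface. Dropping rows for unregistered streams, and being single-threaded, are simplifications made for the sketch.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Toy model of the pattern described above: every source (a Wikipedia channel, or
// here a table) is its own stream with a single partition, partition 0.
// Hypothetical names; this is not Samza's SystemConsumer interface.
final class TableStreamConsumer {
    record StreamPartition(String system, String stream, int partition) {}
    record Envelope(StreamPartition ssp, String key, String message) {}

    private final String systemName;
    private final Map<StreamPartition, List<Envelope>> buffered = new HashMap<>();

    TableStreamConsumer(String systemName) {
        this.systemName = systemName;
    }

    // One call per input stream, e.g. "order" for task.inputs=order.order.
    void register(String stream) {
        buffered.putIfAbsent(new StreamPartition(systemName, stream, 0), new ArrayList<>());
    }

    // Wraps a table row as an envelope on (system, table, 0).
    // Returns false, dropping the row, when no task registered that stream.
    boolean put(String table, String key, String row) {
        StreamPartition ssp = new StreamPartition(systemName, table, 0);
        List<Envelope> queue = buffered.get(ssp);
        if (queue == null) {
            return false;
        }
        queue.add(new Envelope(ssp, key, row));
        return true;
    }

    // Returns and clears the envelopes buffered for the requested partitions.
    List<Envelope> poll(Set<StreamPartition> requested) {
        List<Envelope> out = new ArrayList<>();
        for (StreamPartition ssp : requested) {
            List<Envelope> queue = buffered.get(ssp);
            if (queue != null) {
                out.addAll(queue);
                queue.clear();
            }
        }
        return out;
    }
}
```

The `put` that returns false mirrors Yan's question "Is the stream registered with the system?": a row for a stream nobody registered has no partition to land in.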
>> >> >> > > On Sun, Nov 2, 2014 at 4:56 AM, Renato Marroquín Mogrovejo <renatoj.marroquin@gmail.com> wrote:
>> >> >> > >
>> >> >> > > > Hello Samza team,
>> >> >> > > >
>> >> >> > > >
>> >> >> > > > I am trying to modify the hello-samza example application to replay
>> >> >> > > > events that are stored in a table, but I am having some trouble.
>> >> >> > > > In the hello-samza example, for each new incoming message that
>> >> >> > > > belongs to a channel, a new partition is created, right?
>> >> >> > > > Now in my case, how (and where) do I create these partitions? I
>> >> >> > > > create them in [1], but I am almost sure that is wrong because I keep
>> >> >> > > > getting the exception saying that there are no partitions for this
>> >> >> > > > task. Ideally, I would like to create partitions based on the keys I
>> >> >> > > > am reading from the table.
>> >> >> > > > Could anybody help me with this, please? Many thanks in advance!
>> >> >> > > >
>> >> >> > > >
>> >> >> > > > Renato M.
>> >> >> > > >
>> >> >> > > >
>> >> >> > > > Exception in thread "main" org.apache.samza.SamzaException: No partitions for this task. Can't run a task without partition assignments. It's likely that the partition manager for this system doesn't know about the stream you're trying to read.
>> >> >> > > > at org.apache.samza.container.SamzaContainer$.main(SamzaContainer.scala:77)
>> >> >> > > > at org.apache.samza.container.SamzaContainer.main(SamzaContainer.scala)
>> >> >> > > >
>> >> >> > > >
>> >> >> > > > [1] https://github.com/renato2099/hello-samza/blob/master/samza-wikipedia/src/main/java/samza/examples/order/system/OrderConsumer.java#L47
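Renato's original goal was to partition by the keys read from the table. If a system did expose N partitions per stream, hash-modulo routing is a common way to pick a partition for each key. The helper below is hypothetical and not part of Samza. Per Chris's note, the system's SystemAdmin decides which partitions Samza registers, so it would also have to report those N partitions (not shown). With the single-partition admin used in this thread, N is 1 and every key maps to partition 0.

```java
import java.util.Objects;

// Hypothetical helper (not part of Samza): choose a partition for a row key by
// hashing it modulo the number of partitions the stream exposes.
final class KeyPartitioner {
    private KeyPartitioner() {}

    static int partitionFor(String key, int numPartitions) {
        if (numPartitions <= 0) {
            throw new IllegalArgumentException("numPartitions must be positive: " + numPartitions);
        }
        // floorMod keeps the result in [0, numPartitions) even for negative hash codes.
        return Math.floorMod(Objects.requireNonNull(key, "key").hashCode(), numPartitions);
    }
}
```

Because the mapping is deterministic, all rows with the same key end up on the same partition, which is usually the point of partitioning by key.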