samza-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Jordi Blasi Uribarri <jbl...@nextel.es>
Subject RE: Missing a change log offset for SystemStreamPartition
Date Thu, 06 Aug 2015 08:16:23 GMT
I changed the job name and the store name. I was defining two different stores and in case
that was the problem, I also eliminated the second one. I am getting the same exception.

Exception in thread "main" org.apache.samza.SamzaException: Missing a change log offset for
SystemStreamPartition [kafka, testdb-changelog, 2].
        at org.apache.samza.storage.TaskStorageManager$$anonfun$startConsumers$3$$anonfun$1.apply(TaskStorageManager.scala:87)
        at org.apache.samza.storage.TaskStorageManager$$anonfun$startConsumers$3$$anonfun$1.apply(TaskStorageManager.scala:87)
        at scala.collection.MapLike$class.getOrElse(MapLike.scala:128)
        at scala.collection.AbstractMap.getOrElse(Map.scala:58)
        at org.apache.samza.storage.TaskStorageManager$$anonfun$startConsumers$3.apply(TaskStorageManager.scala:87)
        at org.apache.samza.storage.TaskStorageManager$$anonfun$startConsumers$3.apply(TaskStorageManager.scala:84)
        at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
        at scala.collection.MapLike$MappedValues$$anonfun$foreach$3.apply(MapLike.scala:245)
        at scala.collection.MapLike$MappedValues$$anonfun$foreach$3.apply(MapLike.scala:245)
        at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
        at scala.collection.immutable.Map$Map1.foreach(Map.scala:109)
        at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
        at scala.collection.MapLike$MappedValues.foreach(MapLike.scala:245)
        at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
        at org.apache.samza.storage.TaskStorageManager.startConsumers(TaskStorageManager.scala:84)
        at org.apache.samza.storage.TaskStorageManager.init(TaskStorageManager.scala:63)
        at org.apache.samza.container.TaskInstance.startStores(TaskInstance.scala:88)
        at org.apache.samza.container.SamzaContainer$$anonfun$startStores$2.apply(SamzaContainer.scala:607)
        at org.apache.samza.container.SamzaContainer$$anonfun$startStores$2.apply(SamzaContainer.scala:607)
        at scala.collection.Iterator$class.foreach(Iterator.scala:727)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
        at scala.collection.MapLike$DefaultValuesIterable.foreach(MapLike.scala:206)
        at org.apache.samza.container.SamzaContainer.startStores(SamzaContainer.scala:607)
        at org.apache.samza.container.SamzaContainer.run(SamzaContainer.scala:550)
        at org.apache.samza.container.SamzaContainer$.safeMain(SamzaContainer.scala:108)
        at org.apache.samza.container.SamzaContainer$.main(SamzaContainer.scala:87)
        at org.apache.samza.container.SamzaContainer.main(SamzaContainer.scala)

As I have the autocreate configured in Kafka I am not creating anything for the store. Is
that ok?

By the way, is there any problem on having two different stores?

Thanks,

    Jordi

-----Mensaje original-----
De: Yan Fang [mailto:yanfang724@gmail.com] 
Enviado el: miércoles, 05 de agosto de 2015 20:23
Para: dev@samza.apache.org
Asunto: Re: Missing a change log offset for SystemStreamPartition

Hi Jordi,

I wonder, the reason of your first exception is that, you changed the task number (partition
number of your input stream), but still were using the same changelog stream. It is trying
to send to the partition 2, which does not exist?

Can you reproduce this exception in a new job? (new store name, new job
name)

The second exception is caused by the wrong offset format, I believe.

Let me know how the new job goes.

Thanks,

Fang, Yan
yanfang724@gmail.com

On Wed, Aug 5, 2015 at 12:51 AM, Jordi Blasi Uribarri <jblasi@nextel.es>
wrote:

> Hi,
>
> I am trying to use the Keystore to manage some state information.
> Basically this is the code I am using. As long as I have tested, the 
> rest is working correctly.
>
> private KeyValueStore<String, String> storestp;
>
> public void init(Config config, TaskContext context) {
>                  this.storestp = (KeyValueStore<String, String>) 
> context.getStore("stepdb");
>                }
>
>        public void process(IncomingMessageEnvelope envelope,
>                     MessageCollector collector,
>                     TaskCoordinator coordinator)
>                     {
>                            …
> String str = storestp.get(code)
> …
> }
>
> When I load it, it goes to running but, whe I send the messages 
> through Kafka stream It goes to Failed state. I have found this Exception:
> Exception in thread "main" org.apache.samza.SamzaException: Missing a 
> change log offset for SystemStreamPartition [kafka, stepdb-changelog, 2].
>         at
> org.apache.samza.storage.TaskStorageManager$$anonfun$startConsumers$3$$anonfun$1.apply(TaskStorageManager.scala:87)
>         at
> org.apache.samza.storage.TaskStorageManager$$anonfun$startConsumers$3$$anonfun$1.apply(TaskStorageManager.scala:87)
>         at scala.collection.MapLike$class.getOrElse(MapLike.scala:128)
>         at scala.collection.AbstractMap.getOrElse(Map.scala:58)
>         at
> org.apache.samza.storage.TaskStorageManager$$anonfun$startConsumers$3.apply(TaskStorageManager.scala:87)
>         at
> org.apache.samza.storage.TaskStorageManager$$anonfun$startConsumers$3.apply(TaskStorageManager.scala:84)
>         at
> scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
>         at
> scala.collection.MapLike$MappedValues$$anonfun$foreach$3.apply(MapLike.scala:245)
>         at
> scala.collection.MapLike$MappedValues$$anonfun$foreach$3.apply(MapLike.scala:245)
>         at
> scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
>         at scala.collection.immutable.Map$Map2.foreach(Map.scala:130)
>         at
> scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
>         at scala.collection.MapLike$MappedValues.foreach(MapLike.scala:245)
>         at
> scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
>         at
> org.apache.samza.storage.TaskStorageManager.startConsumers(TaskStorageManager.scala:84)
>         at
> org.apache.samza.storage.TaskStorageManager.init(TaskStorageManager.scala:63)
>         at
> org.apache.samza.container.TaskInstance.startStores(TaskInstance.scala:88)
>         at
> org.apache.samza.container.SamzaContainer$$anonfun$startStores$2.apply(SamzaContainer.scala:607)
>         at
> org.apache.samza.container.SamzaContainer$$anonfun$startStores$2.apply(SamzaContainer.scala:607)
>         at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>         at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>         at
> scala.collection.MapLike$DefaultValuesIterable.foreach(MapLike.scala:206)
>         at
> org.apache.samza.container.SamzaContainer.startStores(SamzaContainer.scala:607)
>         at
> org.apache.samza.container.SamzaContainer.run(SamzaContainer.scala:550)
>         at
> org.apache.samza.container.SamzaContainer$.safeMain(SamzaContainer.scala:108)
>         at
> org.apache.samza.container.SamzaContainer$.main(SamzaContainer.scala:87)
>         at
> org.apache.samza.container.SamzaContainer.main(SamzaContainer.scala)
>
> I have seen that the stepdb-changelog stream exists in Kafka. As a try 
> to regenerate the missing offset and tes it I have connected through 
> the command line and send a message to the stream. It was received correctly.
> Now I am seeing the following Exception:
>
> Exception in thread "main" java.lang.NullPointerException
>         at
> scala.collection.mutable.ArrayOps$ofByte$.length$extension(ArrayOps.scala:126)
>         at
> scala.collection.mutable.ArrayOps$ofByte.length(ArrayOps.scala:126)
>         at scala.collection.SeqLike$class.size(SeqLike.scala:106)
>         at
> scala.collection.mutable.ArrayOps$ofByte.size(ArrayOps.scala:120)
>         at
> org.apache.samza.storage.kv.KeyValueStorageEngine$$anonfun$restore$1.apply(KeyValueStorageEngine.scala:94)
>         at
> org.apache.samza.storage.kv.KeyValueStorageEngine$$anonfun$restore$1.apply(KeyValueStorageEngine.scala:79)
>         at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>         at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>         at
> org.apache.samza.storage.kv.KeyValueStorageEngine.restore(KeyValueStorageEngine.scala:79)
>         at
> org.apache.samza.storage.TaskStorageManager$$anonfun$restoreStores$3.apply(TaskStorageManager.scala:112)
>         at
> org.apache.samza.storage.TaskStorageManager$$anonfun$restoreStores$3.apply(TaskStorageManager.scala:106)
>         at
> scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
>         at scala.collection.immutable.Map$Map1.foreach(Map.scala:109)
>         at
> scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
>         at
> org.apache.samza.storage.TaskStorageManager.restoreStores(TaskStorageManager.scala:106)
>         at
> org.apache.samza.storage.TaskStorageManager.init(TaskStorageManager.scala:64)
>         at
> org.apache.samza.container.TaskInstance.startStores(TaskInstance.scala:88)
>         at
> org.apache.samza.container.SamzaContainer$$anonfun$startStores$2.apply(SamzaContainer.scala:607)
>         at
> org.apache.samza.container.SamzaContainer$$anonfun$startStores$2.apply(SamzaContainer.scala:607)
>         at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>         at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>         at
> scala.collection.MapLike$DefaultValuesIterable.foreach(MapLike.scala:206)
>         at
> org.apache.samza.container.SamzaContainer.startStores(SamzaContainer.scala:607)
>         at
> org.apache.samza.container.SamzaContainer.run(SamzaContainer.scala:550)
>         at
> org.apache.samza.container.SamzaContainer$.safeMain(SamzaContainer.scala:108)
>         at
> org.apache.samza.container.SamzaContainer$.main(SamzaContainer.scala:87)
>         at
> org.apache.samza.container.SamzaContainer.main(SamzaContainer.scala)
>
> Is there something wrong?
>
> Thanks,
>
>     Jordi
> ________________________________
> Jordi Blasi Uribarri
> Área I+D+i
>
> jblasi@nextel.es
> Oficina Bilbao
>
> [http://www.nextel.es/wp-content/uploads/Firma_Nextel_2015.png]
>
Mime
View raw message