samza-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Jordi Blasi Uribarri <jbl...@nextel.es>
Subject Missing a change log offset for SystemStreamPartition
Date Wed, 05 Aug 2015 07:51:42 GMT
Hi,

I am trying to use the Keystore to manage some state information. Basically this is the code
I am using. As long as I have tested, the rest is working correctly.

private KeyValueStore<String, String> storestp;

public void init(Config config, TaskContext context) {
                 this.storestp = (KeyValueStore<String, String>) context.getStore("stepdb");
               }

       public void process(IncomingMessageEnvelope envelope,
                    MessageCollector collector,
                    TaskCoordinator coordinator)
                    {
                           …
String str = storestp.get(code)
…
}

When I load it, it goes to running but, whe I send the messages through Kafka stream It goes
to Failed state. I have found this Exception:
Exception in thread "main" org.apache.samza.SamzaException: Missing a change log offset for
SystemStreamPartition [kafka, stepdb-changelog, 2].
        at org.apache.samza.storage.TaskStorageManager$$anonfun$startConsumers$3$$anonfun$1.apply(TaskStorageManager.scala:87)
        at org.apache.samza.storage.TaskStorageManager$$anonfun$startConsumers$3$$anonfun$1.apply(TaskStorageManager.scala:87)
        at scala.collection.MapLike$class.getOrElse(MapLike.scala:128)
        at scala.collection.AbstractMap.getOrElse(Map.scala:58)
        at org.apache.samza.storage.TaskStorageManager$$anonfun$startConsumers$3.apply(TaskStorageManager.scala:87)
        at org.apache.samza.storage.TaskStorageManager$$anonfun$startConsumers$3.apply(TaskStorageManager.scala:84)
        at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
        at scala.collection.MapLike$MappedValues$$anonfun$foreach$3.apply(MapLike.scala:245)
        at scala.collection.MapLike$MappedValues$$anonfun$foreach$3.apply(MapLike.scala:245)
        at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
        at scala.collection.immutable.Map$Map2.foreach(Map.scala:130)
        at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
        at scala.collection.MapLike$MappedValues.foreach(MapLike.scala:245)
        at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
        at org.apache.samza.storage.TaskStorageManager.startConsumers(TaskStorageManager.scala:84)
        at org.apache.samza.storage.TaskStorageManager.init(TaskStorageManager.scala:63)
        at org.apache.samza.container.TaskInstance.startStores(TaskInstance.scala:88)
        at org.apache.samza.container.SamzaContainer$$anonfun$startStores$2.apply(SamzaContainer.scala:607)
        at org.apache.samza.container.SamzaContainer$$anonfun$startStores$2.apply(SamzaContainer.scala:607)
        at scala.collection.Iterator$class.foreach(Iterator.scala:727)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
        at scala.collection.MapLike$DefaultValuesIterable.foreach(MapLike.scala:206)
        at org.apache.samza.container.SamzaContainer.startStores(SamzaContainer.scala:607)
        at org.apache.samza.container.SamzaContainer.run(SamzaContainer.scala:550)
        at org.apache.samza.container.SamzaContainer$.safeMain(SamzaContainer.scala:108)
        at org.apache.samza.container.SamzaContainer$.main(SamzaContainer.scala:87)
        at org.apache.samza.container.SamzaContainer.main(SamzaContainer.scala)

I have seen that the stepdb-changelog stream exists in Kafka. As a try to regenerate the missing
offset and tes it I have connected through the command line and send a message to the stream.
It was received correctly. Now I am seeing the following Exception:

Exception in thread "main" java.lang.NullPointerException
        at scala.collection.mutable.ArrayOps$ofByte$.length$extension(ArrayOps.scala:126)
        at scala.collection.mutable.ArrayOps$ofByte.length(ArrayOps.scala:126)
        at scala.collection.SeqLike$class.size(SeqLike.scala:106)
        at scala.collection.mutable.ArrayOps$ofByte.size(ArrayOps.scala:120)
        at org.apache.samza.storage.kv.KeyValueStorageEngine$$anonfun$restore$1.apply(KeyValueStorageEngine.scala:94)
        at org.apache.samza.storage.kv.KeyValueStorageEngine$$anonfun$restore$1.apply(KeyValueStorageEngine.scala:79)
        at scala.collection.Iterator$class.foreach(Iterator.scala:727)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
        at org.apache.samza.storage.kv.KeyValueStorageEngine.restore(KeyValueStorageEngine.scala:79)
        at org.apache.samza.storage.TaskStorageManager$$anonfun$restoreStores$3.apply(TaskStorageManager.scala:112)
        at org.apache.samza.storage.TaskStorageManager$$anonfun$restoreStores$3.apply(TaskStorageManager.scala:106)
        at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
        at scala.collection.immutable.Map$Map1.foreach(Map.scala:109)
        at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
        at org.apache.samza.storage.TaskStorageManager.restoreStores(TaskStorageManager.scala:106)
        at org.apache.samza.storage.TaskStorageManager.init(TaskStorageManager.scala:64)
        at org.apache.samza.container.TaskInstance.startStores(TaskInstance.scala:88)
        at org.apache.samza.container.SamzaContainer$$anonfun$startStores$2.apply(SamzaContainer.scala:607)
        at org.apache.samza.container.SamzaContainer$$anonfun$startStores$2.apply(SamzaContainer.scala:607)
        at scala.collection.Iterator$class.foreach(Iterator.scala:727)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
        at scala.collection.MapLike$DefaultValuesIterable.foreach(MapLike.scala:206)
        at org.apache.samza.container.SamzaContainer.startStores(SamzaContainer.scala:607)
        at org.apache.samza.container.SamzaContainer.run(SamzaContainer.scala:550)
        at org.apache.samza.container.SamzaContainer$.safeMain(SamzaContainer.scala:108)
        at org.apache.samza.container.SamzaContainer$.main(SamzaContainer.scala:87)
        at org.apache.samza.container.SamzaContainer.main(SamzaContainer.scala)

Is there something wrong?

Thanks,

    Jordi
________________________________
Jordi Blasi Uribarri
Área I+D+i

jblasi@nextel.es
Oficina Bilbao

[http://www.nextel.es/wp-content/uploads/Firma_Nextel_2015.png]
Mime
  • Unnamed multipart/alternative (inline, None, 0 bytes)
View raw message