samza-dev mailing list archives

From Yi Pan <nickpa...@gmail.com>
Subject Re: Missing a change log offset for SystemStreamPartition
Date Tue, 11 Aug 2015 01:27:07 GMT
Hi, Jordi,

I agree with Yan. More specifically, your class definition should be
something like:
{code}
public class testStore implements StreamTask, InitableTask {
...
}
{code}
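
To illustrate why the interface matters, here is a minimal, self-contained sketch. It does not use the Samza API: the `StreamTask` and `InitableTask` interfaces and the `run` method below are simplified stand-ins invented for this example. It assumes that the container only calls `init` on tasks that declare `InitableTask`, so a class that merely has an `init` method never gets its store assigned.

```java
import java.util.HashMap;
import java.util.Map;

public class InitableTaskSketch {
    // Simplified stand-ins for illustration only; NOT the real Samza interfaces.
    interface StreamTask { void process(String message); }
    interface InitableTask { void init(Map<String, String> store); }

    // Mirrors the original job: it has an init method but does not declare InitableTask.
    static class WithoutInitable implements StreamTask {
        private Map<String, String> store;
        public void init(Map<String, String> store) { this.store = store; }
        public void process(String message) { store.put("test1", message); }
    }

    // Same task, but it declares both interfaces, as suggested above.
    static class WithInitable implements StreamTask, InitableTask {
        private Map<String, String> store;
        public void init(Map<String, String> store) { this.store = store; }
        public void process(String message) { store.put("test1", message); }
    }

    // Hypothetical container loop (an assumption): init is called only when
    // the task declares the InitableTask interface.
    static String run(StreamTask task, String message) {
        if (task instanceof InitableTask) {
            ((InitableTask) task).init(new HashMap<String, String>());
        }
        try {
            task.process(message);
            return "ok";
        } catch (NullPointerException e) {
            // The store field was never assigned because init was skipped.
            return "store was never initialized";
        }
    }

    public static void main(String[] args) {
        System.out.println(run(new WithoutInitable(), "hello"));
        System.out.println(run(new WithInitable(), "hello"));
    }
}
```

In the real job, the equivalent change is to import `org.apache.samza.task.InitableTask` and add it to the `implements` clause of the task class.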

On Mon, Aug 10, 2015 at 6:08 PM, Yan Fang <yanfang724@gmail.com> wrote:

> Hi Jordi,
>
> I think you need to implement the *InitableTask* interface. Otherwise, the
> code in the init method will never be run.
>
> Thanks,
>
> Fang, Yan
> yanfang724@gmail.com
>
> On Mon, Aug 10, 2015 at 3:24 AM, Jordi Blasi Uribarri <jblasi@nextel.es>
> wrote:
>
> > To make the problem easier to reproduce, I reduced the code of the job
> > to the minimum:
> >
> > package test;
> >
> > import org.apache.samza.config.Config;
> > import org.apache.samza.storage.kv.KeyValueStore;
> > import org.apache.samza.system.IncomingMessageEnvelope;
> > import org.apache.samza.task.MessageCollector;
> > import org.apache.samza.task.StreamTask;
> > import org.apache.samza.task.TaskContext;
> > import org.apache.samza.task.TaskCoordinator;
> >
> > public class testStore implements StreamTask {
> >         private KeyValueStore<String, String> storestp;
> >
> >          public void init(Config config, TaskContext context) {
> >                     this.storestp = (KeyValueStore<String, String>)
> > context.getStore("test11db");
> >                   }
> >
> >         public void process (IncomingMessageEnvelope envelope,
> >                        MessageCollector collector,
> >                        TaskCoordinator coordinator)
> >         {
> >                 String msgin = (String) envelope.getMessage();
> >                 storestp.put("test1",msgin);
> >         }
> > }
> >
> > The properties file contains this:
> >
> > task.class=test.testStore
> > job.name=test.testStore
> > job.factory.class=org.apache.samza.job.yarn.YarnJobFactory
> > yarn.package.path=http://192.168.15.92/jobs/nxtBroker-0.0.1-bin.tar.gz
> >
> > systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
> > systems.kafka.consumer.zookeeper.connect=kfk-kafka01:2181,kfk-kafka02:2181
> > systems.kafka.producer.bootstrap.servers=kfk-kafka01:9092,kfk-kafka01:9093,kfk-kafka02:9092,kfk-kafka02:9093
> > systems.kafka.producer.metadata.broker.list=kfk-kafka01:9092,kfk-kafka01:9093,kfk-kafka02:9092,kfk-kafka02:909
> >
> > # Declare that we want our job's checkpoints to be written to Kafka
> > task.checkpoint.factory=org.apache.samza.checkpoint.kafka.KafkaCheckpointManagerFactory
> > task.checkpoint.system=kafka
> >
> > # The job consumes a topic called "configtpc" from the "kafka" system
> > task.inputs=kafka.configtpc
> >
> > # Define a serializer/deserializer called "json" which parses JSON messages
> > serializers.registry.json.class=org.apache.samza.serializers.JsonSerdeFactory
> > serializers.registry.string.class=org.apache.samza.serializers.StringSerdeFactory
> >
> > # Serializer for the system
> > systems.kafka.samza.msg.serde=string
> > systems.kafka.streams.tracetpc.samza.msg.serde=json
> >
> > # Use the key-value store implementation for a store called "test11db"
> > stores.test11db.factory=org.apache.samza.storage.kv.RocksDbKeyValueStorageEngineFactory
> >
> > # Use the Kafka topic "test11db-changelog" as the changelog stream for
> > # this store. This enables automatic recovery of the store after a
> > # failure. If you don't configure this, no changelog stream will be
> > # generated.
> > stores.test11db.changelog=kafka.test11db-changelog
> >
> > # Encode keys and values in the store as UTF-8 strings.
> > stores.test11db.key.serde=string
> > stores.test11db.msg.serde=string
> >
> > # Commit checkpoints every second
> > task.commit.ms=1000
> >
> > With this, I am getting just the same error:
> >
> > java version "1.7.0_79"
> > OpenJDK Runtime Environment (IcedTea 2.5.6) (7u79-2.5.6-1~deb7u1)
> > OpenJDK 64-Bit Server VM (build 24.79-b02, mixed mode)
> > log4j:WARN No appenders could be found for logger (org.apache.samza.metrics.JmxServer).
> > log4j:WARN Please initialize the log4j system properly.
> > log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
> > Exception in thread "main" org.apache.samza.SamzaException: Missing a change log offset for SystemStreamPartition [kafka, test11db-changelog, 2].
> >         at org.apache.samza.storage.TaskStorageManager$$anonfun$startConsumers$3$$anonfun$1.apply(TaskStorageManager.scala:87)
> >         at org.apache.samza.storage.TaskStorageManager$$anonfun$startConsumers$3$$anonfun$1.apply(TaskStorageManager.scala:87)
> >         at scala.collection.MapLike$class.getOrElse(MapLike.scala:128)
> >         at scala.collection.AbstractMap.getOrElse(Map.scala:58)
> >         at org.apache.samza.storage.TaskStorageManager$$anonfun$startConsumers$3.apply(TaskStorageManager.scala:87)
> >         at org.apache.samza.storage.TaskStorageManager$$anonfun$startConsumers$3.apply(TaskStorageManager.scala:84)
> >         at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
> >         at scala.collection.MapLike$MappedValues$$anonfun$foreach$3.apply(MapLike.scala:245)
> >         at scala.collection.MapLike$MappedValues$$anonfun$foreach$3.apply(MapLike.scala:245)
> >         at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
> >         at scala.collection.immutable.Map$Map1.foreach(Map.scala:109)
> >         at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
> >         at scala.collection.MapLike$MappedValues.foreach(MapLike.scala:245)
> >         at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
> >         at org.apache.samza.storage.TaskStorageManager.startConsumers(TaskStorageManager.scala:84)
> >         at org.apache.samza.storage.TaskStorageManager.init(TaskStorageManager.scala:63)
> >         at org.apache.samza.container.TaskInstance.startStores(TaskInstance.scala:88)
> >         at org.apache.samza.container.SamzaContainer$$anonfun$startStores$2.apply(SamzaContainer.scala:607)
> >         at org.apache.samza.container.SamzaContainer$$anonfun$startStores$2.apply(SamzaContainer.scala:607)
> >         at scala.collection.Iterator$class.foreach(Iterator.scala:727)
> >         at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
> >         at scala.collection.MapLike$DefaultValuesIterable.foreach(MapLike.scala:206)
> >         at org.apache.samza.container.SamzaContainer.startStores(SamzaContainer.scala:607)
> >         at org.apache.samza.container.SamzaContainer.run(SamzaContainer.scala:550)
> >         at org.apache.samza.container.SamzaContainer$.safeMain(SamzaContainer.scala:108)
> >         at org.apache.samza.container.SamzaContainer$.main(SamzaContainer.scala:87)
> >         at org.apache.samza.container.SamzaContainer.main(SamzaContainer.scala)
> >
> >
> > The job fails even when there is no message sent to the input topic.
> >
> > Samza is version 0.9.1 and Kafka is version 0.8.2.
> >
> > Thanks,
> >
> >   Jordi
> >
> > -----Original Message-----
> > From: Jordi Blasi Uribarri [mailto:jblasi@nextel.es]
> > Sent: Monday, August 10, 2015 10:26
> > To: dev@samza.apache.org
> > Subject: RE: Missing a change log offset for SystemStreamPartition
> >
> > Hi,
> >
> > I have migrated Samza to the latest version and recreated the job with a
> > new store name so that the streams were created clean. I am getting the
> > same error:
> >
> > java version "1.7.0_79"
> > OpenJDK Runtime Environment (IcedTea 2.5.6) (7u79-2.5.6-1~deb7u1)
> > OpenJDK 64-Bit Server VM (build 24.79-b02, mixed mode)
> > log4j:WARN No appenders could be found for logger (org.apache.samza.metrics.JmxServer).
> > log4j:WARN Please initialize the log4j system properly.
> > log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
> > Exception in thread "main" org.apache.samza.SamzaException: Missing a change log offset for SystemStreamPartition [kafka, commdb-changelog, 2].
> >         at org.apache.samza.storage.TaskStorageManager$$anonfun$startConsumers$3$$anonfun$1.apply(TaskStorageManager.scala:87)
> >         at org.apache.samza.storage.TaskStorageManager$$anonfun$startConsumers$3$$anonfun$1.apply(TaskStorageManager.scala:87)
> >         at scala.collection.MapLike$class.getOrElse(MapLike.scala:128)
> >         at scala.collection.AbstractMap.getOrElse(Map.scala:58)
> >         at org.apache.samza.storage.TaskStorageManager$$anonfun$startConsumers$3.apply(TaskStorageManager.scala:87)
> >         at org.apache.samza.storage.TaskStorageManager$$anonfun$startConsumers$3.apply(TaskStorageManager.scala:84)
> >         at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
> >         at scala.collection.MapLike$MappedValues$$anonfun$foreach$3.apply(MapLike.scala:245)
> >         at scala.collection.MapLike$MappedValues$$anonfun$foreach$3.apply(MapLike.scala:245)
> >         at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
> >         at scala.collection.immutable.Map$Map1.foreach(Map.scala:109)
> >         at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
> >         at scala.collection.MapLike$MappedValues.foreach(MapLike.scala:245)
> >         at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
> >         at org.apache.samza.storage.TaskStorageManager.startConsumers(TaskStorageManager.scala:84)
> >         at org.apache.samza.storage.TaskStorageManager.init(TaskStorageManager.scala:63)
> >         at org.apache.samza.container.TaskInstance.startStores(TaskInstance.scala:88)
> >         at org.apache.samza.container.SamzaContainer$$anonfun$startStores$2.apply(SamzaContainer.scala:607)
> >         at org.apache.samza.container.SamzaContainer$$anonfun$startStores$2.apply(SamzaContainer.scala:607)
> >         at scala.collection.Iterator$class.foreach(Iterator.scala:727)
> >         at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
> >         at scala.collection.MapLike$DefaultValuesIterable.foreach(MapLike.scala:206)
> >         at org.apache.samza.container.SamzaContainer.startStores(SamzaContainer.scala:607)
> >         at org.apache.samza.container.SamzaContainer.run(SamzaContainer.scala:550)
> >         at org.apache.samza.container.SamzaContainer$.safeMain(SamzaContainer.scala:108)
> >         at org.apache.samza.container.SamzaContainer$.main(SamzaContainer.scala:87)
> >         at org.apache.samza.container.SamzaContainer.main(SamzaContainer.scala)
> >
> > Is there any other info I can attach to help find the problem?
> >
> > Thanks,
> >
> >   Jordi
> >
> > -----Original Message-----
> > From: Yan Fang [mailto:yanfang724@gmail.com]
> > Sent: Friday, August 7, 2015 23:21
> > To: dev@samza.apache.org
> > Subject: Re: Missing a change log offset for SystemStreamPartition
> >
> > Hi Jordi,
> >
> > Sorry for getting back to you late. I was quite busy yesterday.
> >
> > I think the reason for your error is a mismatch between your Samza version
> > and your Kafka version.
> >
> > It seems that you are using Samza 0.8.1 with Kafka 0.8.2, which is not
> > supported.
> >
> > So my suggestion is to upgrade to *Samza 0.9.1* and then use *Kafka 0.8.2*.
> > This combination is proven to work.
> >
> > Hope this helps you.
> >
> > Thanks,
> >
> >
> > Fang, Yan
> > yanfang724@gmail.com
> >
> > On Thu, Aug 6, 2015 at 1:16 AM, Jordi Blasi Uribarri <jblasi@nextel.es>
> > wrote:
> >
> > > I changed the job name and the store name. I was defining two
> > > different stores and, in case that was the problem, I also eliminated
> > > the second one. I am getting the same exception.
> > >
> > > Exception in thread "main" org.apache.samza.SamzaException: Missing a change log offset for SystemStreamPartition [kafka, testdb-changelog, 2].
> > >         at org.apache.samza.storage.TaskStorageManager$$anonfun$startConsumers$3$$anonfun$1.apply(TaskStorageManager.scala:87)
> > >         at org.apache.samza.storage.TaskStorageManager$$anonfun$startConsumers$3$$anonfun$1.apply(TaskStorageManager.scala:87)
> > >         at scala.collection.MapLike$class.getOrElse(MapLike.scala:128)
> > >         at scala.collection.AbstractMap.getOrElse(Map.scala:58)
> > >         at org.apache.samza.storage.TaskStorageManager$$anonfun$startConsumers$3.apply(TaskStorageManager.scala:87)
> > >         at org.apache.samza.storage.TaskStorageManager$$anonfun$startConsumers$3.apply(TaskStorageManager.scala:84)
> > >         at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
> > >         at scala.collection.MapLike$MappedValues$$anonfun$foreach$3.apply(MapLike.scala:245)
> > >         at scala.collection.MapLike$MappedValues$$anonfun$foreach$3.apply(MapLike.scala:245)
> > >         at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
> > >         at scala.collection.immutable.Map$Map1.foreach(Map.scala:109)
> > >         at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
> > >         at scala.collection.MapLike$MappedValues.foreach(MapLike.scala:245)
> > >         at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
> > >         at org.apache.samza.storage.TaskStorageManager.startConsumers(TaskStorageManager.scala:84)
> > >         at org.apache.samza.storage.TaskStorageManager.init(TaskStorageManager.scala:63)
> > >         at org.apache.samza.container.TaskInstance.startStores(TaskInstance.scala:88)
> > >         at org.apache.samza.container.SamzaContainer$$anonfun$startStores$2.apply(SamzaContainer.scala:607)
> > >         at org.apache.samza.container.SamzaContainer$$anonfun$startStores$2.apply(SamzaContainer.scala:607)
> > >         at scala.collection.Iterator$class.foreach(Iterator.scala:727)
> > >         at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
> > >         at scala.collection.MapLike$DefaultValuesIterable.foreach(MapLike.scala:206)
> > >         at org.apache.samza.container.SamzaContainer.startStores(SamzaContainer.scala:607)
> > >         at org.apache.samza.container.SamzaContainer.run(SamzaContainer.scala:550)
> > >         at org.apache.samza.container.SamzaContainer$.safeMain(SamzaContainer.scala:108)
> > >         at org.apache.samza.container.SamzaContainer$.main(SamzaContainer.scala:87)
> > >         at org.apache.samza.container.SamzaContainer.main(SamzaContainer.scala)
> > >
> > > Since I have topic auto-creation enabled in Kafka, I am not creating
> > > anything for the store manually. Is that OK?
> > >
> > > By the way, is there any problem with having two different stores?
> > >
> > > Thanks,
> > >
> > >     Jordi
> > >
> > > -----Original Message-----
> > > From: Yan Fang [mailto:yanfang724@gmail.com]
> > > Sent: Wednesday, August 5, 2015 20:23
> > > To: dev@samza.apache.org
> > > Subject: Re: Missing a change log offset for SystemStreamPartition
> > >
> > > Hi Jordi,
> > >
> > > I wonder whether the reason for your first exception is that you changed
> > > the number of tasks (the partition count of your input stream) but were
> > > still using the same changelog stream. Is it trying to send to partition
> > > 2, which does not exist?
> > >
> > > Can you reproduce this exception in a new job (new store name, new job
> > > name)?
> > >
> > > The second exception is caused by the wrong offset format, I believe.
> > >
> > > Let me know how the new job goes.
> > >
> > > Thanks,
> > >
> > > Fang, Yan
> > > yanfang724@gmail.com
> > >
> > > On Wed, Aug 5, 2015 at 12:51 AM, Jordi Blasi Uribarri
> > > <jblasi@nextel.es>
> > > wrote:
> > >
> > > > Hi,
> > > >
> > > > I am trying to use the KeyValueStore to manage some state information.
> > > > Basically this is the code I am using. As far as I have tested, the
> > > > rest is working correctly.
> > > >
> > > > private KeyValueStore<String, String> storestp;
> > > >
> > > > public void init(Config config, TaskContext context) {
> > > >                  this.storestp = (KeyValueStore<String, String>)
> > > > context.getStore("stepdb");
> > > >                }
> > > >
> > > >        public void process(IncomingMessageEnvelope envelope,
> > > >                     MessageCollector collector,
> > > >                     TaskCoordinator coordinator)
> > > >                     {
> > > >                            …
> > > > String str = storestp.get(code)
> > > > …
> > > > }
> > > >
> > > > When I load it, it goes to the running state, but when I send messages
> > > > through the Kafka stream it goes to the Failed state. I have found this
> > > > exception:
> > > > Exception in thread "main" org.apache.samza.SamzaException: Missing a change log offset for SystemStreamPartition [kafka, stepdb-changelog, 2].
> > > >         at org.apache.samza.storage.TaskStorageManager$$anonfun$startConsumers$3$$anonfun$1.apply(TaskStorageManager.scala:87)
> > > >         at org.apache.samza.storage.TaskStorageManager$$anonfun$startConsumers$3$$anonfun$1.apply(TaskStorageManager.scala:87)
> > > >         at scala.collection.MapLike$class.getOrElse(MapLike.scala:128)
> > > >         at scala.collection.AbstractMap.getOrElse(Map.scala:58)
> > > >         at org.apache.samza.storage.TaskStorageManager$$anonfun$startConsumers$3.apply(TaskStorageManager.scala:87)
> > > >         at org.apache.samza.storage.TaskStorageManager$$anonfun$startConsumers$3.apply(TaskStorageManager.scala:84)
> > > >         at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
> > > >         at scala.collection.MapLike$MappedValues$$anonfun$foreach$3.apply(MapLike.scala:245)
> > > >         at scala.collection.MapLike$MappedValues$$anonfun$foreach$3.apply(MapLike.scala:245)
> > > >         at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
> > > >         at scala.collection.immutable.Map$Map2.foreach(Map.scala:130)
> > > >         at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
> > > >         at scala.collection.MapLike$MappedValues.foreach(MapLike.scala:245)
> > > >         at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
> > > >         at org.apache.samza.storage.TaskStorageManager.startConsumers(TaskStorageManager.scala:84)
> > > >         at org.apache.samza.storage.TaskStorageManager.init(TaskStorageManager.scala:63)
> > > >         at org.apache.samza.container.TaskInstance.startStores(TaskInstance.scala:88)
> > > >         at org.apache.samza.container.SamzaContainer$$anonfun$startStores$2.apply(SamzaContainer.scala:607)
> > > >         at org.apache.samza.container.SamzaContainer$$anonfun$startStores$2.apply(SamzaContainer.scala:607)
> > > >         at scala.collection.Iterator$class.foreach(Iterator.scala:727)
> > > >         at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
> > > >         at scala.collection.MapLike$DefaultValuesIterable.foreach(MapLike.scala:206)
> > > >         at org.apache.samza.container.SamzaContainer.startStores(SamzaContainer.scala:607)
> > > >         at org.apache.samza.container.SamzaContainer.run(SamzaContainer.scala:550)
> > > >         at org.apache.samza.container.SamzaContainer$.safeMain(SamzaContainer.scala:108)
> > > >         at org.apache.samza.container.SamzaContainer$.main(SamzaContainer.scala:87)
> > > >         at org.apache.samza.container.SamzaContainer.main(SamzaContainer.scala)
> > > >
> > > > I have seen that the stepdb-changelog stream exists in Kafka. In an
> > > > attempt to regenerate the missing offset and test it, I connected
> > > > through the command line and sent a message to the stream. It was
> > > > received correctly. Now I am seeing the following exception:
> > > >
> > > > Exception in thread "main" java.lang.NullPointerException
> > > >         at scala.collection.mutable.ArrayOps$ofByte$.length$extension(ArrayOps.scala:126)
> > > >         at scala.collection.mutable.ArrayOps$ofByte.length(ArrayOps.scala:126)
> > > >         at scala.collection.SeqLike$class.size(SeqLike.scala:106)
> > > >         at scala.collection.mutable.ArrayOps$ofByte.size(ArrayOps.scala:120)
> > > >         at org.apache.samza.storage.kv.KeyValueStorageEngine$$anonfun$restore$1.apply(KeyValueStorageEngine.scala:94)
> > > >         at org.apache.samza.storage.kv.KeyValueStorageEngine$$anonfun$restore$1.apply(KeyValueStorageEngine.scala:79)
> > > >         at scala.collection.Iterator$class.foreach(Iterator.scala:727)
> > > >         at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
> > > >         at org.apache.samza.storage.kv.KeyValueStorageEngine.restore(KeyValueStorageEngine.scala:79)
> > > >         at org.apache.samza.storage.TaskStorageManager$$anonfun$restoreStores$3.apply(TaskStorageManager.scala:112)
> > > >         at org.apache.samza.storage.TaskStorageManager$$anonfun$restoreStores$3.apply(TaskStorageManager.scala:106)
> > > >         at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
> > > >         at scala.collection.immutable.Map$Map1.foreach(Map.scala:109)
> > > >         at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
> > > >         at org.apache.samza.storage.TaskStorageManager.restoreStores(TaskStorageManager.scala:106)
> > > >         at org.apache.samza.storage.TaskStorageManager.init(TaskStorageManager.scala:64)
> > > >         at org.apache.samza.container.TaskInstance.startStores(TaskInstance.scala:88)
> > > >         at org.apache.samza.container.SamzaContainer$$anonfun$startStores$2.apply(SamzaContainer.scala:607)
> > > >         at org.apache.samza.container.SamzaContainer$$anonfun$startStores$2.apply(SamzaContainer.scala:607)
> > > >         at scala.collection.Iterator$class.foreach(Iterator.scala:727)
> > > >         at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
> > > >         at scala.collection.MapLike$DefaultValuesIterable.foreach(MapLike.scala:206)
> > > >         at org.apache.samza.container.SamzaContainer.startStores(SamzaContainer.scala:607)
> > > >         at org.apache.samza.container.SamzaContainer.run(SamzaContainer.scala:550)
> > > >         at org.apache.samza.container.SamzaContainer$.safeMain(SamzaContainer.scala:108)
> > > >         at org.apache.samza.container.SamzaContainer$.main(SamzaContainer.scala:87)
> > > >         at org.apache.samza.container.SamzaContainer.main(SamzaContainer.scala)
> > > >
> > > > Is there something wrong?
> > > >
> > > > Thanks,
> > > >
> > > >     Jordi
> > > > ________________________________
> > > > Jordi Blasi Uribarri
> > > > R&D&I Department
> > > >
> > > > jblasi@nextel.es
> > > > Bilbao Office
> > > >
> > > >
> > >
> >
>
