ignite-user mailing list archives

From Cong Guo <cong.g...@huawei.com>
Subject RE: ClassCastException When Using CacheEntryProcessor in StreamVisitor
Date Tue, 05 Jun 2018 15:35:23 GMT
Hello,

Can anyone see this email?

From: Cong Guo
Sent: June 1, 2018 13:11
To: 'user@ignite.apache.org' <user@ignite.apache.org>
Subject: ClassCastException When Using CacheEntryProcessor in StreamVisitor

Hi,

I want to use IgniteDataStreamer to handle data updates. Is it possible to use a CacheEntryProcessor
inside a StreamVisitor? I wrote the simple program below. It works on a single node, but throws
a ClassCastException on two nodes. The two nodes run on two physical machines, and I have set
peerClassLoadingEnabled to true on both of them. How do I use CacheEntryProcessor in StreamVisitor?

The function is like:

private static void streamUpdate(Ignite ignite, IgniteCache<Long, Person> personCache) {
    CacheConfiguration<Long, Double> updateCfg = new CacheConfiguration<>("updateCache");
    try (IgniteCache<Long, Double> updateCache = ignite.getOrCreateCache(updateCfg)) {
        try (IgniteDataStreamer<Long, Double> updateStmr = ignite.dataStreamer(updateCache.getName())) {

            updateStmr.receiver(StreamVisitor.from((cache, e) -> {
                Long id = e.getKey();
                Double newVal = e.getValue();
                personCache.<Long, BinaryObject>withKeepBinary().invoke(id,
                    new CacheEntryProcessor<Long, BinaryObject, Object>() {
                        @Override
                        public Object process(MutableEntry<Long, BinaryObject> entry, Object... objects)
                            throws EntryProcessorException {
                            BinaryObjectBuilder bldr = entry.getValue().toBuilder();
                            double salary = bldr.getField("salary");
                            bldr.setField("salary", salary + newVal);
                            entry.setValue(bldr.build());
                            return null;
                        }
                    });
            }));

            Random generator = new Random();
            for (long i = 1; i <= EXP_SIZE; i++) {
                long rankey = 1 + generator.nextInt(EXP_SIZE);
                updateStmr.addData(rankey, 10.0);
            }
        } // end second try
    } // end first try
}

The Person class is taken from the Ignite examples. There is no exception on a single node.
On two nodes, the exception is:

javax.cache.processor.EntryProcessorException: java.lang.ClassCastException: com.huawei.clusterexperiment.model.Person cannot be cast to org.apache.ignite.binary.BinaryObject
        at org.apache.ignite.internal.processors.cache.CacheInvokeResult.get(CacheInvokeResult.java:102)
        at org.apache.ignite.internal.processors.cache.IgniteCacheProxyImpl.invoke(IgniteCacheProxyImpl.java:1361)
        at org.apache.ignite.internal.processors.cache.IgniteCacheProxyImpl.invoke(IgniteCacheProxyImpl.java:1405)
        at org.apache.ignite.internal.processors.cache.GatewayProtectedCacheProxy.invoke(GatewayProtectedCacheProxy.java:1362)
        at com.huawei.clusterexperiment.Client.lambda$streamUpdate$a02be2b7$1(Client.java:310)
        at org.apache.ignite.stream.StreamVisitor$1.apply(StreamVisitor.java:50)
        at org.apache.ignite.stream.StreamVisitor$1.apply(StreamVisitor.java:48)
        at org.apache.ignite.stream.StreamVisitor.receive(StreamVisitor.java:38)
        at org.apache.ignite.internal.processors.datastreamer.DataStreamerUpdateJob.call(DataStreamerUpdateJob.java:137)
        at org.apache.ignite.internal.processors.datastreamer.DataStreamProcessor.localUpdate(DataStreamProcessor.java:397)
        at org.apache.ignite.internal.processors.datastreamer.DataStreamProcessor.processRequest(DataStreamProcessor.java:302)
        at org.apache.ignite.internal.processors.datastreamer.DataStreamProcessor.access$000(DataStreamProcessor.java:59)
        at org.apache.ignite.internal.processors.datastreamer.DataStreamProcessor$1.onMessage(DataStreamProcessor.java:89)
        at org.apache.ignite.internal.managers.communication.GridIoManager.invokeListener(GridIoManager.java:1555)
        at org.apache.ignite.internal.managers.communication.GridIoManager.processRegularMessage0(GridIoManager.java:1183)
        at org.apache.ignite.internal.managers.communication.GridIoManager.access$4200(GridIoManager.java:126)
        at org.apache.ignite.internal.managers.communication.GridIoManager$9.run(GridIoManager.java:1090)
        at org.apache.ignite.internal.util.StripedExecutor$Stripe.run(StripedExecutor.java:505)
        at java.lang.Thread.run(Thread.java:745)
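
As a side note, the cast failure itself can be reproduced in plain Java, without Ignite. The sketch below is only an illustration of the language mechanism, not Ignite code: `ErasureCastDemo`, `BinaryLike`, `Person`, and `Entry` are hypothetical stand-ins invented for this example. It assumes that the entry handed to the processor holds a deserialized `Person` even though its declared value type is the binary form. Because generics are erased, the mismatch is not detected when the entry is created; it surfaces only at the first typed read, which would be consistent with the exception being raised inside `process()` at `entry.getValue()`.

```java
public class ErasureCastDemo {
    /** Hypothetical stand-in for a binary (serialized-form) value. */
    interface BinaryLike {
        double field(String name);
    }

    /** Hypothetical stand-in for the deserialized domain class. */
    static final class Person {
        final double salary;

        Person(double salary) {
            this.salary = salary;
        }
    }

    /** Minimal entry type, loosely modeled on a cache entry; not an Ignite class. */
    static final class Entry<K, V> {
        private final K key;
        private final V value;

        Entry(K key, V value) {
            this.key = key;
            this.value = value;
        }

        K getKey() {
            return key;
        }

        V getValue() {
            return value;
        }
    }

    /** Reads the stored value through an entry whose declared value type is BinaryLike. */
    @SuppressWarnings("unchecked")
    static String process(Object storedValue) {
        // The unchecked cast succeeds at runtime because type arguments are erased.
        Entry<Long, BinaryLike> entry =
            (Entry<Long, BinaryLike>) (Entry<?, ?>) new Entry<Long, Object>(1L, storedValue);
        try {
            // The compiler inserts a checked cast to BinaryLike at this read.
            BinaryLike value = entry.getValue();
            return "salary=" + value.field("salary");
        } catch (ClassCastException e) {
            return "ClassCastException";
        }
    }

    public static void main(String[] args) {
        // Value really is in the declared form: the read succeeds.
        System.out.println(process((BinaryLike) name -> 100.0));
        // Value is a plain Person: the read fails with a ClassCastException.
        System.out.println(process(new Person(100.0)));
    }
}
```

This only shows why the declared type `BinaryObject` alone does not guarantee what the entry actually contains at runtime. Why the remote node would supply a `Person` in the program above is a separate question that the sketch does not answer.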

