ignite-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From svonn <sveng...@posteo.de>
Subject Re: How to do 'stream processing' and more questions of a Ignite newbie
Date Mon, 11 Dec 2017 16:33:38 GMT
Hi,

Thanks alot for the insight! 
I got a few questions left:

1. Currently I'm developing in my local IDEA IDE, starting ignite in client
mode - How would is deployment supposed to work for live environment? Do you
still start a client node with 'instructions' for the cluster? 
2. I've been reading a bit about the Web Console
https://apacheignite-tools.readme.io/docs/getting-started
which allows you to configure the .xml for Ignite. How important is that?
Currently my xml is pretty generic, can everything the XML does also be done
with Java? In that context I'm also unsure at what point you're supposed to
tell the custer about AffinityKeys - defining them with the client node as
described in question 1. would be 'too late', wouldnt it?

3. Probably caused by my lack of knowledge linked to the above questions,
here's a more specific issue:
(Here is my class, peer class loading is disabled)

public class Interpolation {

    static Ignite ignite = Ignition.start("local-config.xml");
    static IgniteCache<BinaryObject, BinaryObject> gpsCache =
Ignition.ignite().cache("GpsPoint").withKeepBinary();
    static IgniteCache<BinaryObject, BinaryObject> apCache =
Ignition.ignite().cache("AccelerationPoint").withKeepBinary();

    public static Cache.Entry<BinaryObject, BinaryObject>
getPreviousGpsPoint(BinaryObject gpsPointKey) {

        IgniteCache<BinaryObject, BinaryObject> gpsCache =
Ignition.ignite().cache("GpsPoint").withKeepBinary();

        ScanQuery<BinaryObject, BinaryObject> scan = new ScanQuery<>(
                new IgniteBiPredicate<BinaryObject, BinaryObject>() {
                    @Override
                    public boolean apply(BinaryObject key, BinaryObject
value) {
                        return
(key.<String>field("deviceId").equals(gpsPointKey.<String>field("deviceId"))
                                &&
key.<Long>field("measurementId").equals(gpsPointKey.<Long>field("measurementId"))
                                && key.<Long>field("timestamp") -
gpsPointKey.<Long>field("timestamp") < 0
                                && key.<Long>field("timestamp") -
gpsPointKey.<Long>field("timestamp") > -10000);
                    }
                }
        );

        Iterable<?> col = gpsCache.query(scan);
        for (Object next : col) {
            System.out.println(next);
        }

        return null;
    }

    public static void main(String[] args) throws InterruptedException {

        ContinuousQuery<BinaryObject, BinaryObject> continuousQuery = new
ContinuousQuery<>();

        continuousQuery.setLocalListener(new
CacheEntryUpdatedListener<BinaryObject, BinaryObject>() {
            @Override
            public void onUpdated(Iterable<CacheEntryEvent&lt;? extends
BinaryObject, ? extends BinaryObject>> evts) {
                for (CacheEntryEvent<? extends BinaryObject, ? extends
BinaryObject> e : evts) {
                    final Cache.Entry<BinaryObject, BinaryObject>
previousGpsPoint = getPreviousGpsPoint(e.getKey());
                }
            }
        });
        continuousQuery.setInitialQuery(new ScanQuery<BinaryObject,
BinaryObject>((k, v) -> true));

        continuousQuery.setRemoteFilter(e -> true);

        try (QueryCursor<Cache.Entry&lt;BinaryObject, BinaryObject>>
initialCursor = gpsCache.query(continuousQuery)) {
            try {
                for (Cache.Entry<BinaryObject, BinaryObject> e :
initialCursor) {
                    final Cache.Entry<BinaryObject, BinaryObject>
previousGpsPoint = getPreviousGpsPoint(e.getKey());
                }
                while (true) {
                    Thread.sleep(500);
                }
            } catch (Exception e) {
                System.out.println("Error executing initial query");
                e.printStackTrace();
            }
        }
    }
}
 

Currently I'm trying my first steps with the continuous queries - without
having peer class loading enabled I'm always running into the error:

Failed to find class with given class loader for unmarshalling (make sure
same versions of all classes are available on all nodes or enable
peer-class-loading) [clsLdr=sun.misc.Launcher$AppClassLoader@266474c2,
cls=de.tudresden.inf.streambench.ignite.tasks.Interpolation]
	at
org.apache.ignite.marshaller.jdk.JdkMarshaller.unmarshal0(JdkMarshaller.java:124)
	at
org.apache.ignite.marshaller.AbstractNodeNameAwareMarshaller.unmarshal(AbstractNodeNameAwareMarshaller.java:94)
	at
org.apache.ignite.marshaller.jdk.JdkMarshaller.unmarshal0(JdkMarshaller.java:143)
	at
org.apache.ignite.marshaller.AbstractNodeNameAwareMarshaller.unmarshal(AbstractNodeNameAwareMarshaller.java:82)
	at
org.apache.ignite.internal.util.IgniteUtils.unmarshal(IgniteUtils.java:9795)
	at
org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryCustomEventMessage.message(TcpDiscoveryCustomEventMessage.java:81)
	at
org.apache.ignite.spi.discovery.tcp.ServerImpl$RingMessageWorker.notifyDiscoveryListener(ServerImpl.java:5460)
	at
org.apache.ignite.spi.discovery.tcp.ServerImpl$RingMessageWorker.processCustomMessage(ServerImpl.java:5346)
	at
org.apache.ignite.spi.discovery.tcp.ServerImpl$RingMessageWorker.processMessage(ServerImpl.java:2656)
	at
org.apache.ignite.spi.discovery.tcp.ServerImpl$RingMessageWorker.processMessage(ServerImpl.java:2447)
	at
org.apache.ignite.spi.discovery.tcp.ServerImpl$MessageWorkerAdapter.body(ServerImpl.java:6648)
	at
org.apache.ignite.spi.discovery.tcp.ServerImpl$RingMessageWorker.body(ServerImpl.java:2533)
	at org.apache.ignite.spi.IgniteSpiThread.run(IgniteSpiThread.java:62)
Caused by: java.lang.ClassNotFoundException:
de.tudresden.inf.streambench.ignite.tasks.Interpolation
	at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
	at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:335)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
	at java.lang.Class.forName0(Native Method)
	at java.lang.Class.forName(Class.java:348)
	at
org.apache.ignite.internal.util.IgniteUtils.forName(IgniteUtils.java:8497)
	at
org.apache.ignite.marshaller.jdk.JdkMarshallerObjectInputStream.resolveClass(JdkMarshallerObjectInputStream.java:54)
	at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1826)
	at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1713)
	at java.io.ObjectInputStream.readClass(ObjectInputStream.java:1678)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1518)
	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2245)
	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2169)
	at
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2027)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
	at java.io.ObjectInputStream.readObject(ObjectInputStream.java:422)
	at
org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryHandler.readExternal(CacheContinuousQueryHandler.java:1163)
	at java.io.ObjectInputStream.readExternalData(ObjectInputStream.java:2076)
	at
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2025)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
	at java.io.ObjectInputStream.readObject(ObjectInputStream.java:422)
	at
org.apache.ignite.internal.processors.continuous.StartRequestData.readExternal(StartRequestData.java:260)
	at java.io.ObjectInputStream.readExternalData(ObjectInputStream.java:2076)
	at
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2025)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2245)
	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2169)
	at
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2027)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2245)
	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2169)
	at
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2027)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
	at java.io.ObjectInputStream.readObject(ObjectInputStream.java:422)
	at
org.apache.ignite.marshaller.jdk.JdkMarshaller.unmarshal0(JdkMarshaller.java:121)
	... 12 more

3. - This question might already by answered by answering 1 and 2 - I don't
understand how deployment in Docker is supposed to work yet.

When enabling peer class loading, I was able to run the above code when
running the logic from 'getPreviousGpsPoint' directly within the listener /
initial query cursor.
I encountered two issues here:
4.1. Before refactoring the logic to a method, the scan query would find too
many results - From what I've seen it didn't matter if I scanned for
timestamp differences between 0 and 1 or 0 and 10000. 
4.2. After refactoring it to the method it was treated differently - it
looked like it tried to run directly on the nodes, where it failed becaused
it couldn't cast my class "Interpolation" to IgniteBiPredicate - since I had
no clue what the hell it was trying to do I turned off peer class loading,
spooky stuff.
4.3. Closely linked to all of that: How would you run the continuous query
on the server nodes instead of the client node - currently it would try to
process anything directly on the client node, wouldn't it?


Best regards
Svonn





--
Sent from: http://apache-ignite-users.70518.x6.nabble.com/

Mime
View raw message