incubator-s4-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Daniel Gómez Ferro <danie...@yahoo-inc.com>
Subject Re: 'error''
Date Wed, 05 Jun 2013 09:07:25 GMT
Hello,

I assume this is the line on which you are getting the NPE:
     downStream.put(new MyEvent(event.get("name")));

You have to initialize the 'downStream' field in HelloPE, for example 
adding the following to HelloApp.onInit():

	helloPE.setDownStream(dStream);

Also, the .start() method on streams is automatically called by the 
framework, you don't have to call it explicitly. Your HelloApp.onInit() 
should look like this:

     @Override
     protected void onInit() {
         // create a prototype
         HelloPE helloPE = createPE(HelloPE.class);
         MyPE myPE = createPE(MyPE.class);
         // Create a stream that listens to the "names" stream and
         // passes events to the helloPE instance.
         createInputStream("names", new KeyFinder<Event>() {
             @Override
             public List<String> get(Event event) {
                 return Arrays.asList(new String[] {event.get("name")});
             }
         }, helloPE);
         Stream dStream = createStream("my_events",myPE);
         helloPE.setDownStream(dStream);
     }

Hope this helps!

Cheers,

Daniel

On Tue Jun  4 18:51:07 2013, Aboubakr Benabbas wrote:
> hi,
>
> I successfully imported my app to eclipse, extended the myApp that one
> could create with s4, in my app i added another PE that receives
> events from the HelloPE and prints standard output, the build was
> successful but when running the app, after the first input i gave to
> the app, i got the following :
>
> Hello Bob!
> Exception in thread "names" java.lang.NullPointerException
>         at hello.HelloPE.onEvent(HelloPE.java:42)
>         at OverloadDispatcher1793.dispatchEvent(Unknown Source)
>         at
> org.apache.s4.core.ProcessingElement.handleInputEvent(ProcessingElement.java:447)
>         at org.apache.s4.core.Stream.run(Stream.java:309)
>         at java.lang.Thread.run(Thread.java:679)
>
> //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
>
> the code does not show any errors
>
> here are my classes (i did not modify the input adapter):
>
> MyPE.java
>
> //////////////////////////////////////////////////////////////
>
> package hello;
>
> import org.apache.s4.base.Event;
> import org.apache.s4.core.ProcessingElement;
>
> public class MyPE extends ProcessingElement {
>
>
>
>     public void onEvent(MyEvent event) {
>
>         System.out.println("this is my event !!");
>     }
>
>     @Override
>     protected void onCreate() {
>     }
>
>     @Override
>     protected void onRemove() {
>     }
>
> }
> /////////////////////////////////////////////////////////////////////////////
>
> HelloPE.java
>
> package hello;
>
> import org.apache.s4.base.Event;
> import org.apache.s4.core.ProcessingElement;
> import org.apache.s4.core.Stream;
>
> public class HelloPE extends ProcessingElement {
>
>     // you should define downstream streams here and inject them in
> the app definition
>
>     boolean seen = false;
>     Stream <MyEvent> downStream; //this is the last addition
>
>     public void setDownStream(Stream<MyEvent> stream) {
>         this.downStream = stream;
>     }
>     /**
>      * This method is called upon a new Event on an incoming stream
>      */
>     public void onEvent(Event event) {
>         // in this example, we use the default generic Event type, by
> you can also define your own type
>         System.out.println("Hello " + (seen ? "again " : "") +
> event.get("name") + "!");
>         seen = true;
>      downStream.put(new MyEvent(event.get("name")));
>
>     }
>
>     @Override
>     protected void onCreate() {
>     }
>
>     @Override
>     protected void onRemove() {
>     }
>
> }
>
> ///////////////////////////////////////////////////////////////////////////////////
>
> HelloApp.java
>
> package hello;
>
> import java.util.Arrays;
> import java.util.List;
>
> import org.apache.s4.base.Event;
> import org.apache.s4.base.KeyFinder;
> import org.apache.s4.core.App;
> import org.apache.s4.core.Stream;
>
> public class HelloApp extends App {
>
>     @Override
>     protected void onStart() {
>     }
>
>     @Override
>     protected void onInit() {
>         // create a prototype
>         HelloPE helloPE = createPE(HelloPE.class);
>         MyPE myPE = createPE(MyPE.class);
>         // Create a stream that listens to the "names" stream and
> passes events to the helloPE instance.
>         createInputStream("names", new KeyFinder<Event>() {
>
>             @Override
>             public List<String> get(Event event) {
>                 return Arrays.asList(new String[] { event.get("name") });
>             }
>         }, helloPE);
>         Stream dStream = createStream("my_events",myPE);
>         dStream.start();
>     }
>
>
>     @Override
>     protected void onClose() {
>     }
>
> }
>
> package hello;
>
> import java.util.Arrays;
> import java.util.List;
>
> import org.apache.s4.base.Event;
> import org.apache.s4.base.KeyFinder;
> import org.apache.s4.core.App;
> import org.apache.s4.core.Stream;
>
> public class HelloApp extends App {
>
>     @Override
>     protected void onStart() {
>     }
>
>     @Override
>     protected void onInit() {
>         // create a prototype
>         HelloPE helloPE = createPE(HelloPE.class);
>         MyPE myPE = createPE(MyPE.class);
>         // Create a stream that listens to the "names" stream and
> passes events to the helloPE instance.
>         createInputStream("names", new KeyFinder<Event>() {
>
>             @Override
>             public List<String> get(Event event) {
>                 return Arrays.asList(new String[] { event.get("name") });
>             }
>         }, helloPE);
>         Stream dStream = createStream("my_events",myPE);
>         dStream.start();
>     }
>
>
>     @Override
>     protected void onClose() {
>     }
>
> }
>

Mime
View raw message