incubator-s4-user mailing list archives

From Shailendra Mishra <shailend...@gmail.com>
Subject Fwd: FW: Few questions.
Date Thu, 07 Jun 2012 17:13:43 GMT
Posting it to the public forum. - Shailendra


From: Mishra, Shailendra (Financial&Risk)
Sent: Thursday, June 07, 2012 10:10 AM
To: Matthieu Morel
Subject: RE: Few questions.

Thanks. I will also post this whole thread on the public forum. I do
have printPE running on cluster 3, here are the scripts that I use:
On cluster 1:
./s4 newCluster -c=cluster1 -nbTasks=1 -flp=12000
sleep 5
./s4 adapter -appClass=univariate.AggrInputAdapter -c=cluster2
-namedStringParameters=adapter.output.stream:rawmktdata

On cluster 2: (all running on the same machine, so there is no need to
deploy the app three times)
./s4 newCluster -c=cluster2 -nbTasks=3 -flp=13000
sleep 5
./s4 node -c=cluster1
on another window:
./s4 node -c=cluster1
./s4 node -c=cluster1
// deploy the univariate stats App to the cluster
./s4 deploy -appName=univariateApp -c=cluster1 -b=`pwd`/build.gradle

On cluster 3:
./s4 newCluster -c=cluster3 -nbTasks=1 -flp=14000
sleep 5
./s4 node -c=cluster3
// deploying printPE app to the cluster
./s4 deploy -appName=printApp -c=cluster3 -b=`pwd`/build.gradle

For the univariateApp: the relevant portion of the code is:
    KeyFinder<Event> aggrStreamKeyFinder = new KeyFinder<Event>() {
      public List<String> get (final Event event) {
        return new ArrayList<String>() {
          {
            add(((AggrData)event).getKey());
          }
        };
      }
    };

    RemoteStream aggrStream = createOutputStream("aggrdata",
aggrStreamKeyFinder);
    AggrPE aggrPE = createPE(AggrPE.class);
    aggrPE.setTimerInterval(60, TimeUnit.SECONDS);
    aggrPE.setDownStream(aggrStream);
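As an aside, the double-brace `ArrayList` initializer used above can be replaced by `Collections.singletonList`, which avoids creating an extra anonymous class. A self-contained sketch, using a stand-in `KeyFinder` interface and `AggrData` class rather than the real S4 ones:

```java
import java.util.Collections;
import java.util.List;

public class KeyFinderSketch {
    // Stand-in for the S4 KeyFinder interface (simplified here).
    interface KeyFinder<T> {
        List<String> get(T event);
    }

    // Stand-in event type mirroring AggrData.getKey().
    static class AggrData {
        private final String key;
        AggrData(String key) { this.key = key; }
        String getKey() { return key; }
    }

    public static void main(String[] args) {
        // Collections.singletonList avoids the anonymous double-brace
        // initializer, which creates an extra class per use site and
        // retains a reference to the enclosing instance.
        KeyFinder<AggrData> finder =
                event -> Collections.singletonList(event.getKey());
        System.out.println(finder.get(new AggrData("IBM")).get(0)); // IBM
    }
}
```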

And just to show that I have mapped it correctly, in printPE the
relevant code is:

    PrintPE printPE = createPE(PrintPE.class);

    createInputStream("aggrdata", new KeyFinder<AggrData>() {
      public List<String> get(final AggrData aggrData) {
        return new ArrayList<String>() {
          {
            add(aggrData.getKey());
          }
        };
      }
    }, printPE);

Yesterday, when I was trying it, I didn't have a KeyFinder in the
createOutputStream call, but that should have been OK. I haven't tried
running this new code yet but will do so soon and let you know.

- Shailendra


ps: I think we should subclass AdapterApp into an InputAdapterApp and an
OutputAdapterApp, so that it is easier to build design patterns such as:

InputAdapter -> PE11... -> PE21... -> OutputAdapter

I have coded this but haven't tested it; I'm pretty sure that if this
case works, it will work too.



From: morelm@gmail.com [morelm@gmail.com] on behalf of Matthieu Morel [matthieu.morel@gmail.com]
Sent: Thursday, June 07, 2012 9:48 AM
To: Mishra, Shailendra (Financial&Risk)
Subject: Re: Few questions.



On Thu, Jun 7, 2012 at 6:09 PM, <Shailendra.Mishra@thomsonreuters.com> wrote:
>
> Thanks. I have got all that working; the only problem I am having now (it could
also be a programming error) is the following:
> CompPE1, CompPE2 and CompPE3 are the same prototype, and I want to emit to PrintPE (in
this case PrintPE is more like an output adaptor, but since we don't have an output adapter
yet, I am using a PE).
> This is how I do it: in each of the CompPEs, I instantiate a remote stream and call
the put method on it.
>
> In the printApp, I create a corresponding input stream. The printPE in its onEvent method
writes the output to a concurrent map. My deployment topology is as follows:
> Adaptor runs on cluster c1
> CompPE1..3 run on cluster c2, which is instantiated with -nbTasks=3
> PrintPE runs on cluster c3.


So it looks like you actually have 3 S4 apps in your system: adaptor
on cluster1, Comp on cluster 2 and Print on cluster 3.

For Comp to communicate with Print under the pub-sub model, you need
Comp to be a producer (which I assume you do through the
App#createOutputStream method) and at least one consumer, i.e. the Print
app must define an input stream with a matching name. Make sure you have
that.
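To make the name-matching requirement concrete, here is a toy, self-contained model of that pub-sub wiring. This is not the real S4 API; it only illustrates that delivery happens when the producer's output-stream name exactly matches a consumer's input-stream name:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model: a remote stream created with something like
// App#createOutputStream is only delivered to apps that declared an
// input stream with exactly the same name.
public class StreamNameMatching {
    private final Map<String, List<List<String>>> consumers = new HashMap<>();

    // Consumer side: like createInputStream("aggrdata", keyFinder, pe).
    List<String> createInputStream(String name) {
        List<String> inbox = new ArrayList<>();
        consumers.computeIfAbsent(name, k -> new ArrayList<>()).add(inbox);
        return inbox;
    }

    // Producer side: like RemoteStream#put. Events go nowhere when no
    // consumer subscribed under that exact name.
    int put(String streamName, String event) {
        List<List<String>> inboxes =
                consumers.getOrDefault(streamName, Collections.emptyList());
        for (List<String> inbox : inboxes) inbox.add(event);
        return inboxes.size(); // number of consumers reached
    }

    public static void main(String[] args) {
        StreamNameMatching app = new StreamNameMatching();
        List<String> printApp = app.createInputStream("aggrdata");
        System.out.println(app.put("aggrdata", "tick")); // 1: names match
        System.out.println(app.put("aggrData", "tick")); // 0: mismatch, dropped
        System.out.println(printApp);                    // [tick]
    }
}
```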

Another issue could be that you have not defined the c3 cluster in
ZooKeeper. Or that you have no node in that cluster. Is the issue only
on Comp1 or do you see that on Comp2 and Comp3 as well?

It's duly noted that we need to improve the diagnostic description in
the error message :)

Let me know how that works out. I plan to implement another set of
improvements very soon, and will include the feedback you already
provided.

Matthieu


>
>
> I am getting the following stack dump from the onTime method in CompPE1 when it tries
to call put on the remote stream:
>
> The routine in CompPE is pretty straightforward: in onEvent, I collect data in
an array on a per-key basis.
> In the onTime method, I calculate the aggregate and put it to the remote stream.
>
> 1) null returned by binding at org.apache.s4.comm.RemoteEmitterFactory.createRemoteEmitter()
>  but parameter 0 of org.apache.s4.comm.tcp.TCPRemoteEmitter.<init>() is not @Nullable
>   while locating org.apache.s4.comm.topology.Cluster annotated with @com.google.inject.assistedinject.Assisted(value=)
>     for parameter 0 at org.apache.s4.comm.tcp.TCPRemoteEmitter.<init>(TCPRemoteEmitter.java:21)
>   while locating org.apache.s4.base.RemoteEmitter annotated with interface com.google.inject.assistedinject.Assisted
>
> 1 error
> at com.google.inject.internal.InjectorImpl$4.get(InjectorImpl.java:987) ~[guice-3.0.jar:na]
> at com.google.inject.assistedinject.FactoryProvider2.invoke(FactoryProvider2.java:632)
~[guice-assistedinject-3.0.jar:na]
> at $Proxy9.createRemoteEmitter(Unknown Source) ~[na:na]
> at org.apache.s4.comm.tcp.RemoteEmitters.getEmitter(RemoteEmitters.java:28) ~[s4-comm-0.5.0-SNAPSHOT.jar:0.5.0-SNAPSHOT]
> at org.apache.s4.core.RemoteSenders.send(RemoteSenders.java:49) ~[s4-core-0.5.0-SNAPSHOT.jar:0.5.0-SNAPSHOT]
> at org.apache.s4.core.RemoteStream.put(RemoteStream.java:56) ~[s4-core-0.5.0-SNAPSHOT.jar:0.5.0-SNAPSHOT]
> at univariate.AggrPE.onTime(AggrPE.java:84) ~[na:na]
> at org.apache.s4.core.ProcessingElement$OnTimeTask.run(ProcessingElement.java:619) ~[s4-core-0.5.0-SNAPSHOT.jar:0.5.0-SNAPSHOT]
> at java.util.TimerThread.mainLoop(Timer.java:512) [na:1.6.0_31]
> at java.util.TimerThread.run(Timer.java:462) [na:1.6.0_31]
>
> - Shailendra
>
>
> ________________________________
> From: morelm@gmail.com [morelm@gmail.com] on behalf of Matthieu Morel [matthieu.morel@gmail.com]
> Sent: Thursday, June 07, 2012 12:34 AM
> To: Mishra, Shailendra (Financial&Risk)
> Subject: Re: Few questions.
>
>
>
> On Wed, Jun 6, 2012 at 5:05 PM, <Shailendra.Mishra@thomsonreuters.com> wrote:
>>
>> Ok, I will post more on the public forum but just while we are on this topic, so
as I understand:
>> the logical topology will look like:
>> Adaptor -> PartitionerPE -> CompPE1, CompPE2, CompPE3 -> PrintPE
>> Right.
>
>
> If CompPE1, CompPE2 and CompPE3 are different prototype definitions (different classes),
then they should probably listen to different streams. If they are different instances of
the same prototype, then you'd have 1 stream and a custom partitioning into 3 partitions.
Note that in S4 the granularity of partitions is arbitrary, and can be very small (e.g. words
in a word count), in which case you have many prototype instances (e.g. 1 per word).
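As a self-contained illustration of that partitioning idea (S4's actual routing function may differ), hashing the key modulo the partition count sends every event for a given key to the same partition, and therefore to the same PE instance's state:

```java
// Sketch of key-based partitioning with one stream and N partitions.
// The KeyFinder extracts a key; the platform routes each key to
// partition hash(key) mod N, so all events for a key land on the same
// node and PE instance.
public class KeyPartitioner {
    static int partition(String key, int numPartitions) {
        // Math.floorMod keeps the result non-negative even when
        // hashCode() is negative.
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    public static void main(String[] args) {
        int n = 3; // e.g. a cluster created with -nbTasks=3
        for (String word : new String[] {"alpha", "beta", "alpha"}) {
            System.out.println(word + " -> partition " + partition(word, n));
        }
        // "alpha" maps to the same partition both times, so the same
        // PE instance accumulates its state.
    }
}
```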
>
> Hope this clarifies things!
>
> Matthieu
>>
>>
>> I will try the windowing stuff and let you know. - Shailendra
>> ________________________________
>> From: morelm@gmail.com [morelm@gmail.com] on behalf of Matthieu Morel [matthieu.morel@gmail.com]
>> Sent: Wednesday, June 06, 2012 7:46 AM
>> To: Mishra, Shailendra (Financial&Risk)
>> Cc: leoneumeyer@gmail.com
>> Subject: Re: Few questions.
>>
>> Hi Shailendra,
>>
>> please don't hesitate to post on the public list, that will be useful for everyone!
>>
>> About partitioning:
>> - you partition data using a KeyFinder. See for example in the twitter example:
>> https://git-wip-us.apache.org/repos/asf?p=incubator-s4.git;a=blob;f=test-apps/twitter-counter/src/main/java/org/apache/s4/example/twitter/TwitterCounterApp.java;h=90c31994e20cc311e333ea8eb6bd1485e8b2e857;hb=S4-22#l46
>> - right now, if you use an adapter application in front of a consumer application,
events are broadcast to all consumer nodes. Maybe that's what is giving you issues. We'll
add a customizable policy, with round-robin probably being the default.
>>
>> About windowing:
>> - the idea is that you fill a circular and rotating buffer with slots (in piper,
you provide your own implementation), upon reception of events
>> - you always have access to the latest slot, and you place data in that slot
>> - you define when new slots are generated
>> - you specify the size of a window, i.e. how many slots per window
>>
>> In parallel, you can use a trigger to output data that you compute from data in the
current window. (that trigger could actually be a multiple of slot duration)
>>
>> We'll add examples and documentation for that.
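In the meantime, the slot mechanics described above can be sketched generically. This is not the actual WindowingPE API, just a self-contained model of a circular, rotating buffer of slots:

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Generic sketch of the slot/window idea: events go into the latest
// slot; rotating opens a new slot and evicts the oldest one once the
// window holds its full number of slots.
public class SlidingSlots {
    private final int slotsPerWindow;
    private final Deque<List<Double>> slots = new ArrayDeque<>();

    SlidingSlots(int slotsPerWindow) {
        this.slotsPerWindow = slotsPerWindow;
        slots.addLast(new ArrayList<>()); // the initial "latest" slot
    }

    // Called on each event: data always lands in the latest slot.
    void add(double value) { slots.peekLast().add(value); }

    // Called when a new slot should start (e.g. on a timer tick).
    void rotate() {
        if (slots.size() == slotsPerWindow) slots.removeFirst(); // evict oldest
        slots.addLast(new ArrayList<>());
    }

    // A trigger could compute an aggregate over the current window.
    double windowSum() {
        double sum = 0;
        for (List<Double> slot : slots) for (double v : slot) sum += v;
        return sum;
    }

    public static void main(String[] args) {
        SlidingSlots w = new SlidingSlots(2);
        w.add(1); w.add(2);                // slot 0: [1, 2]
        w.rotate();
        w.add(3);                          // slot 1: [3]
        System.out.println(w.windowSum()); // 6.0
        w.rotate();                        // window full: slot 0 evicted
        w.add(4);
        System.out.println(w.windowSum()); // 7.0 (3 + 4)
    }
}
```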
>>
>> Hope this helps, and thanks again for the feedback!
>>
>> Matthieu
>>
>>
>>
>>
>> On Wed, Jun 6, 2012 at 3:15 PM, <Shailendra.Mishra@thomsonreuters.com> wrote:
>>>
>>> Hi Leo, Matthieu:
>>>
>>> Sorry I couldn't attend yesterday's hangout session, I was on a plane. I have been
trying to code a few quant use cases using S4 and have a few questions:
>>> - Consider the following topology: Input-Adaptor -> PE1, PE2, PE3 <all three
are running the same application> -> PrintPE <which outputs the data>
>>> Ideally, I would like to think of PE1..3 as processing a specific partition of the
data, but it looks like there is no obvious way to do it. So, I thought I would filter out stuff
at the destination based on a partition-id. Now I can interrogate ZK and get my process partition
(haven't tried that but think it is possible). Short of that, is there a cheaper way of doing
this? Maybe this is not a suitable approach in S4; assuming that to be true, let me ask:
how would you partition data?
>>> - Now for the second question: so far, for my applications I have been using the onTime and
onTrigger methods to implement windowing, the former for wall-clock-time-based windows and the
latter for application-time-based ones. However, I came across the notion of WindowingPE, which
could be used instead. Would you have an example showing the use of WindowingPE to model what I
have been doing with onTime and onTrigger?
>>> I would greatly appreciate any help.
>>>
>>> - Thanks
>>> - Shailendra
>>>
>>> This email was sent to you by Thomson Reuters, the global news and information
company. Any views expressed in this message are those of the individual sender, except where
the sender specifically states them to be the views of Thomson Reuters.
>>
>>
>>
>
>
>



