incubator-s4-user mailing list archives

From Shailendra Mishra <shailend...@gmail.com>
Subject Re: How to use S4 to implement Join operator
Date Thu, 04 Oct 2012 05:20:30 GMT
I get all that; my point is that you need to take the data from the adapter and
create keyed data out of it, roughly as follows:

This is the partitioner App code:

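    // Fragment of the partitioner App. Needs java.util.Collections,
    // java.util.List and the S4 types Event, KeyFinder and RemoteStream;
    // MktData (with getRic()) and PartitionPE are application classes.
    protected void onInit() {
        try {
            // Key quote events by their RIC (instrument code).
            KeyFinder<Event> quoteKeyFinder = new KeyFinder<Event>() {
                @Override
                public List<String> get(final Event event) {
                    return Collections.singletonList(((MktData) event).getRic());
                }
            };

            // Key trade events the same way, so quotes and trades for one
            // RIC reach the same downstream PE instance.
            KeyFinder<Event> tradeKeyFinder = new KeyFinder<Event>() {
                @Override
                public List<String> get(final Event event) {
                    return Collections.singletonList(((MktData) event).getRic());
                }
            };

            // Keyed output streams consumed by the downstream (join) app.
            RemoteStream quoteStream = createOutputStream("quotedata", quoteKeyFinder);
            RemoteStream tradeStream = createOutputStream("tradedata", tradeKeyFinder);

            // The partitioner receives the un-keyed adapter data and
            // re-emits it on the keyed streams above.
            PartitionPE partitionPE = createPE(PartitionPE.class);
            // Assumes PartitionPE.setDownstream(...) records each stream
            // rather than replacing the previously set one.
            partitionPE.setDownstream(quoteStream);
            partitionPE.setDownstream(tradeStream);
            partitionPE.setSingleton(true);
            createInputStream("mktdata", partitionPE);
        } catch (Exception e) {
            // Keep the original cause instead of discarding it.
            throw new RuntimeException(e);
        }
    }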

Now the downstream PE will get data keyed by the key defined in the
partitioner App. Pictorially, your topology will look like this:

Adapter -> Partitioner -> JoinPE -> PrintPE

- Shailendra

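To make the JoinPE stage of that topology concrete: once the partitioner has keyed the data, every row reaching one JoinPE instance shares the same join key, so that instance only needs one buffer per input. Below is a minimal plain-Java sketch of such a per-key symmetric hash join. It does not use the S4 API; KeyedJoinState, onLineitem and onOrder are hypothetical names, and joined rows are represented as plain strings.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the state of ONE keyed JoinPE instance:
// every row it receives has the same join key, so two buffers suffice.
public class KeyedJoinState {
    private final List<String> lineitems = new ArrayList<>();
    private final List<String> orders = new ArrayList<>();

    // Buffers a lineitem row and returns the joined rows it completes.
    public List<String> onLineitem(String lineitem) {
        lineitems.add(lineitem);
        List<String> out = new ArrayList<>();
        for (String order : orders) {
            out.add(order + " + " + lineitem);
        }
        return out;
    }

    // Buffers an order row and returns the joined rows it completes.
    public List<String> onOrder(String order) {
        orders.add(order);
        List<String> out = new ArrayList<>();
        for (String lineitem : lineitems) {
            out.add(order + " + " + lineitem);
        }
        return out;
    }

    public static void main(String[] args) {
        KeyedJoinState join = new KeyedJoinState();
        System.out.println(join.onLineitem("L1")); // [] (no order seen yet)
        System.out.println(join.onOrder("O1"));    // [O1 + L1]
        System.out.println(join.onLineitem("L2")); // [O1 + L2]
    }
}
```

Because each side is buffered, the join result does not depend on whether the order or its lineitems arrive first.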
On Wed, Oct 3, 2012 at 9:58 PM, 杨定裕 <yangdingyu@gmail.com> wrote:
> Yes, the partitions are separated by the key value.
> If I didn't have partitions, the count value would go up to 20.
> So ...
>
> This is the code of the App:
>         OLAJoinPE joinPE = createPE(OLAJoinPE.class);
>         joinPE.setTimerInterval(10000, TimeUnit.MILLISECONDS);
>
>         // Key each event by its "lineitem" or "order" value; events with
>         // the same key go to the same OLAJoinPE instance.
>         Stream<Event> joinStream = createStream("JoinStream", new KeyFinder<Event>() {
>             @Override
>             public List<String> get(final Event event) {
>                 String value = "";
>                 if (event.containsKey("lineitem")) {
>                     value = event.get("lineitem");
>                 } else if (event.containsKey("order")) {
>                     value = event.get("order");
>                 }
>                 // Partition key: a constant (static) key would mean one PE.
>                 return ImmutableList.of(value);
>             }
>         }, joinPE);
>
>         OLALineitemPE lineitemPE = createPE(OLALineitemPE.class);
>         lineitemPE.setDownStream(joinStream);
>         lineitemPE.setSingleton(true); // what does this mean?
>         createInputStream("lineitem", lineitemPE);
>
>         OLAOrderPE orderPE = createPE(OLAOrderPE.class);
>         orderPE.setDownStream(joinStream);
>         orderPE.setSingleton(true); // what does this mean?
>         orderPE.setTimerInterval(1000, TimeUnit.MILLISECONDS);
>         createInputStream("order", orderPE);
>
>
>
>
>
>
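A note on the KeyFinder above: it keys on the whole "lineitem" or "order" value. If the adapter puts an entire '|'-delimited row into that field, a lineitem and its order would generally get different keys and land on different PEs, which defeats the join. A minimal sketch, assuming (as the onEvent code later in this thread does) that the join key is the first '|'-separated field; JoinKeys.joinKey is a hypothetical helper, not part of S4.

```java
// Hypothetical helper: extracts the join key of a '|'-delimited row,
// assumed to be the first field (as in this thread's onEvent code).
public final class JoinKeys {
    private JoinKeys() {}

    public static String joinKey(String row) {
        if (row == null || row.isEmpty()) {
            return ""; // same fallback key as the KeyFinder above
        }
        // String.split takes a regex, so the pipe must be escaped;
        // limit 2 stops after the first separator.
        return row.split("\\|", 2)[0];
    }

    public static void main(String[] args) {
        // Made-up rows: a lineitem and an order sharing key 42.
        System.out.println(joinKey("42|a|b")); // 42
        System.out.println(joinKey("42|x"));   // 42
    }
}
```

Inside the KeyFinder, the return statement would then become `return ImmutableList.of(JoinKeys.joinKey(value));`, so that a lineitem and its order share a key.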
> 2012/10/4 Shailendra Mishra <shailendrah@gmail.com>
>>
>> OK, I get it. By default, both PEs in each partition get all the data.
>> The reason is that the data coming in from the adapter has no notion of
>> a key, so it is sent to all the PEs. If you want to partition the data,
>> put a partitioner PE in front: it receives the data from the adapter and
>> sends it downstream on a keyed stream. Because that data has a key, each
>> partition will no longer receive all the data.
>> - Shailendra
>>
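The effect of fronting the adapter with a keyed stream can be sketched in plain Java. The class below is a hypothetical stand-in, not S4 code, and the hash-and-modulo placement is an assumption chosen for illustration: each key maps deterministically to one partition, so each partition receives only the events for its keys instead of everything.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Sketch of key-based partitioning; NOT S4's actual routing code.
// The hash-and-modulo scheme is an illustrative assumption.
public class KeyPartitioner {
    private final int numPartitions;

    public KeyPartitioner(int numPartitions) {
        if (numPartitions <= 0) {
            throw new IllegalArgumentException("numPartitions must be positive");
        }
        this.numPartitions = numPartitions;
    }

    // Deterministic: the same key always maps to the same partition.
    public int partitionFor(String key) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    // Routes each keyed event (represented here by its key) to exactly
    // one partition.
    public List<List<String>> route(List<String> keys) {
        List<List<String>> partitions = new ArrayList<List<String>>();
        for (int i = 0; i < numPartitions; i++) {
            partitions.add(new ArrayList<String>());
        }
        for (String key : keys) {
            partitions.get(partitionFor(key)).add(key);
        }
        return partitions;
    }

    public static void main(String[] args) {
        KeyPartitioner partitioner = new KeyPartitioner(2);
        // "1" and "3" land in partition 1, "2" in partition 0
        System.out.println(partitioner.route(Arrays.asList("1", "2", "1", "3")));
        // prints [[2], [1, 1, 3]]
    }
}
```

Without a key there is nothing to hash, which is why, as described above, the adapter's events reach every PE.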
>> On Wed, Oct 3, 2012 at 9:42 PM, 杨定裕 <yangdingyu@gmail.com> wrote:
>> > Hi, I am running on the testCluster with two adapters.
>> > The problem is that the list stores all of the input data, whereas
>> > each PE should store only its own part of the data. The list in each
>> > PE should be independent. How can I implement this?
>> >
>> > This is the result:
>> >
>> > OLAJoinPE------:Table:!||| T_lineitem.size(): 1||| PEID:1||| count:1
>> > OLAJoinPE------:Table:!||| T_lineitem.size(): 2||| PEID:1||| count:2
>> > OLAJoinPE------:Table:!||| T_lineitem.size(): 3||| PEID:1||| count:3
>> > OLAJoinPE------:Table:!||| T_lineitem.size(): 4||| PEID:1||| count:4
>> > OLAJoinPE------:Table:!||| T_lineitem.size(): 5||| PEID:1||| count:5
>> > OLAJoinPE------:Table:!||| T_lineitem.size(): 6||| PEID:1||| count:6
>> > OLAJoinPE------:Table:!||| T_lineitem.size(): 7||| PEID:2||| count:1
>> > OLAJoinPE------:Table:!||| T_lineitem.size(): 8||| PEID:3||| count:1
>> > OLAJoinPE------:Table:!||| T_lineitem.size(): 9||| PEID:3||| count:2
>> > OLAJoinPE------:Table:!||| T_lineitem.size(): 10||| PEID:3||| count:3
>> > OLAJoinPE------:Table:!||| T_lineitem.size(): 11||| PEID:3||| count:4
>> > OLAJoinPE------:Table:!||| T_lineitem.size(): 12||| PEID:3||| count:5
>> > OLAJoinPE------:Table:!||| T_lineitem.size(): 13||| PEID:3||| count:6
>> > OLAJoinPE------:Table:!||| T_lineitem.size(): 14||| PEID:4||| count:1
>> > OLAJoinPE------:Table:!||| T_lineitem.size(): 15||| PEID:5||| count:1
>> > OLAJoinPE------:Table:!||| T_lineitem.size(): 16||| PEID:5||| count:2
>> > OLAJoinPE------:Table:!||| T_lineitem.size(): 17||| PEID:5||| count:3
>> > OLAJoinPE------:Table:!||| T_lineitem.size(): 18||| PEID:6||| count:1
>> > OLAJoinPE------:Table:!||| T_lineitem.size(): 19||| PEID:7||| count:1
>> > OLAJoinPE------:Table:!||| T_lineitem.size(): 20||| PEID:7||| count:2
>> >
>> >
>> >
>> >
>> > 2012/10/4 Shailendra Mishra <shailendrah@gmail.com>
>> >>
>> >> OK, this looks correct, so what is the question? Do you have this
>> >> app running in more than one partition? - Shailendra
>> >>
>> >> On Wed, Oct 3, 2012 at 9:18 PM, 杨定裕 <yangdingyu@gmail.com> wrote:
>> >> > Hi Shailendra,
>> >> > Yes, I have a stream with two event types, lineitem and order.
>> >> > Events with the same key will be sent to the same PE.
>> >> >
>> >> > This is my code:
>> >> > ---------------------------
>> >> >
>> >> >     private List<Event> T_lineitem = new ArrayList<Event>();
>> >> >     private List<Event> T_order = new ArrayList<Event>();
>> >> >
>> >> >     private long count = 0;
>> >> >
>> >> >     public void onEvent(Event event) {
>> >> >         // In this example we use the default generic Event type, but
>> >> >         // you can also define your own type.
>> >> >         String value = "";
>> >> >         count = count + 1;
>> >> >         if (event.containsKey("lineitem")) {
>> >> >             // Join key: first '|'-separated field of the row.
>> >> >             value = event.get("lineitem").split("\\|")[0];
>> >> >             T_lineitem.add(event);
>> >> >             System.out.println("JoinPE------:Table:!||| T_lineitem.size(): "
>> >> >                     + T_lineitem.size() + "||| PEID:" + getId() + "||| count:" + count);
>> >> >         }
>> >> >         if (event.containsKey("order")) {
>> >> >             value = event.get("order").split("\\|")[0];
>> >> >             T_order.add(event);
>> >> >             System.out.println("JoinPE------:Table:!||| T_order.size(): "
>> >> >                     + T_order.size() + "||| PEID:" + getId() + "||| count:" + count);
>> >> >         }
>> >> >     }
>> >> > -----------------------------
>> >> >
>> >> > The output is like this :
>> >> > JoinPE------:Table:!||| T_lineitem.size(): 19||| PEID:7||| count:1
>> >> > JoinPE------:Table:!||| T_lineitem.size(): 20||| PEID:7||| count:2
>> >> >
>> >> > That is the problem: count should never be less than the size of the list.
>> >> >
>> >> > Dingyu
>> >> >
>> >
>> >
>
>
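On the outputs in this thread: T_lineitem.size() keeps increasing across different PEIDs, while count restarts at 1 for each new PEID. That pattern is consistent with the list object being shared between PE instances while count is not. Assuming keyed PE instances are created as shallow copies (Object.clone()) of a prototype PE, a primitive field such as count is copied per instance, but a List initialized in its field declaration is the same object in every copy. The plain-Java sketch below, with a hypothetical ProtoPE class and no S4 classes, shows the effect and the remedy of creating collections per instance; in S4 that initialization would presumably belong in the PE's onCreate() hook, but check the documentation for your version.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a PE prototype. Assumption: keyed instances
// are produced with a shallow Object.clone() of this prototype.
public class ProtoPE implements Cloneable {
    // Initialized once, in the prototype: every shallow clone shares it.
    List<String> shared = new ArrayList<>();
    // Created fresh for each instance (the job of an onCreate()-style hook).
    List<String> perInstance;
    // Primitives are copied by value, so each clone has its own counter.
    long count = 0;

    ProtoPE newInstance() {
        try {
            ProtoPE pe = (ProtoPE) clone();
            pe.perInstance = new ArrayList<>();
            return pe;
        } catch (CloneNotSupportedException e) {
            throw new AssertionError(e); // cannot happen: we implement Cloneable
        }
    }

    void onEvent(String event) {
        count++;
        shared.add(event);
        perInstance.add(event);
    }

    public static void main(String[] args) {
        ProtoPE prototype = new ProtoPE();
        ProtoPE a = prototype.newInstance();
        ProtoPE b = prototype.newInstance();
        a.onEvent("x");
        a.onEvent("y");
        b.onEvent("z");
        // b saw one event, but the shared list holds all three
        System.out.println(b.count + " " + b.shared.size() + " " + b.perInstance.size());
        // prints 1 3 1
    }
}
```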
