apex-dev mailing list archives

From Shubham Pathak <shub...@datatorrent.com>
Subject Re: Writing Custom Partitioner
Date Wed, 10 Feb 2016 13:08:16 GMT
Thanks Tushar.

I implemented the codec as per your suggestion.

To test it, I did the following:

1. Partitioned the Counter operator into 2 instances using

  <configuration>
    <property>
      <name>dt.application.APPNAME.operator.counter.attr.PARTITIONER</name>
      <value>com.datatorrent.common.partitioner.StatelessPartitioner:2</value>
    </property>
  </configuration>

2. The tokenizer emits <word,1>, so if I group the stream by the second
field (index 1), all words should go to the same instance of Counter.

  dag.setInputPortAttribute(counter.input, PortContext.STREAM_CODEC,
      new TupleStreamCodec(new int[] { 1 }));

3. Overrode the getStreamCodec() method in the Counter operator's input port
to return new TupleStreamCodec(new int[] { 1 }).
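
For illustration, here is a minimal standalone sketch of why grouping by index 1 should send every <word,1> tuple to the same instance. It reuses the hashing loop from the TupleStreamCodec quoted further down in this thread, applied to a plain List. The class name PartitionSketch, the helper instanceFor, and the rule that an instance is chosen by masking the low bits of the partition hash are assumptions made for this example, not confirmed Apex behavior.

```java
import java.util.Arrays;
import java.util.List;

public class PartitionSketch {

    // Same hashing loop as the TupleStreamCodec quoted in this thread,
    // applied to a plain List instead of the custom Tuple class.
    static int partitionHash(List<?> fields, int[] indexes) {
        int hashCode = 1;
        for (int idx : indexes) {
            hashCode = 31 * hashCode + fields.get(idx).hashCode();
        }
        return hashCode;
    }

    // ASSUMPTION: the platform picks the instance by masking the low bits
    // of the partition hash (mask 0x1 for two instances).
    static int instanceFor(int partitionHash, int mask) {
        return partitionHash & mask;
    }

    public static void main(String[] args) {
        List<List<Object>> tuples = Arrays.asList(
            Arrays.<Object>asList("apple", 1),
            Arrays.<Object>asList("banana", 1),
            Arrays.<Object>asList("apple", 1));
        int mask = 0x1; // two instances
        for (List<Object> t : tuples) {
            int byWord = instanceFor(partitionHash(t, new int[] {0}), mask);
            int byCount = instanceFor(partitionHash(t, new int[] {1}), mask);
            System.out.println(t + " -> by word: " + byWord + ", by count: " + byCount);
        }
    }
}
```

With index 1 the only hashed value is the count, which is always 1, so the hash is 31 * 1 + 1 = 32 for every tuple and, under the masking assumption, every tuple lands on instance 0. With index 0, tuples carrying the same word always agree on the instance.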

But I couldn't observe the desired results. I also added a few debug
statements in the TupleStreamCodec class but, surprisingly, I couldn't see
them in the logs.

Am I missing something?

Is com.datatorrent.common.partitioner.StatelessPartitioner:2 overriding
the behavior of TupleStreamCodec?
But if I remove it, how would I specify how many partitions I need?

I did take a look at the FraudDetectionDemo and YahooFinance demos. I noticed
that StreamCodec is used by some operators the same way I am using it, but
nowhere could I see the number of partitions being defined for them (in
application.java or properties.xml).

Could you guide me more on this?

Thanks,
Shubham



On Mon, Feb 8, 2016 at 4:36 PM, Tushar Gosavi <tushar@datatorrent.com>
wrote:

> Hi Shubham,
>
> You can implement a custom stream codec to define your own hashCode logic.
> A simple stateful stream codec is shown below. You specify the indexes on
> which you want to compute the hashCode in the constructor of the stream
> codec, and while computing the hashCode you use only the elements at those
> indexes.
>
>
>   public static class TupleStreamCodec extends DefaultStatefulStreamCodec<Tuple>
>   {
>     // Positions of the tuple fields that determine the partition.
>     private int[] indexes;
>
>     public TupleStreamCodec(int[] indexes) {
>       this.indexes = indexes;
>     }
>
>     @Override
>     public int getPartition(Tuple tuple)
>     {
>       // Combine the hash codes of the selected fields only.
>       int hashCode = 1;
>       for (int idx : indexes) {
>         hashCode = 31 * hashCode + tuple.get(idx).hashCode();
>       }
>       return hashCode;
>     }
>
>   }
>
> You can set this stream codec on the input port of the Counter operator:
>     dag.setInputPortAttribute(counter.inport, Context.PortContext.STREAM_CODEC,
>         new TupleStreamCodec(new int[] {0})); // partition based on first element
>
>   Note: This is a sample implementation; it does not handle out-of-range
> indexes and other errors :)
>
>   - Tushar.
>
>
> On Mon, Feb 8, 2016 at 3:42 PM, Shubham Pathak <shubham@datatorrent.com>
> wrote:
>
> > Hi,
> >
> > I need some suggestions / pointers related to defining a custom
> > partitioner.
> >
> > The operators in my application process a custom tuple class (let's call
> > it TUPLE). This data type has a single field, an ArrayList, so each tuple
> > represents a list of values.
> >
> > For a typical word count problem, my dag would be
> >
> > WordGenerator -> <STRING> -> Tokenizer -> <TUPLE> -> Counter -> <TUPLE> -> Console
> >
> > and if I were to use TUPLE, the tokenizer will emit a TUPLE that contains
> > an array list with contents <word,count>.
> >
> > Now I wish to partition Counter so that each instance receives all tuples
> > containing the same word.
> >
> > I know that by default the hashCode() method of the custom tuple class
> > would be used, but in my case the custom tuple class is an ArrayList and I
> > wish to specify that the hashCode must be computed on just the first field
> > in the ArrayList. In a generic case it could also be on multiple fields in
> > the array list.
> >
> > Do we have any examples that I could refer to?
> >
> > Also, can this be done at the application level by setting an attribute?
> >
> > Thanks,
> > Shubham
> >
>
