apex-dev mailing list archives

From Bhupesh Chawda <bhup...@datatorrent.com>
Subject Re: Writing Custom Partitioner
Date Wed, 17 Feb 2016 13:02:13 GMT
Hi,

The above code fragment is not working in my case either.

Basically I have a Tuple class which has a "key" component.

Here is my dag:
A -> B

I need key-based stream multiplexing (tuples with the same key go to the
same downstream partition) when the downstream operator B is partitioned.
I tried implementing this with a custom stream codec extending
DefaultStatefulStreamCodec, in which getPartition() returns
tuple.key().hashCode(). However, I get the same distribution
irrespective of what getPartition() returns. I am using
the StatelessPartitioner to partition the downstream operator.
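
The intended behaviour can be sketched in plain Java as below. This is a
self-contained illustration and not Apex code: KeyedTuple, partitionHash()
and partitionFor() are hypothetical names, and masking the low bits of the
hash to pick a partition is an assumption made only so that the example runs
on its own. It also assumes the partition count is a power of two.

```java
// Illustrative sketch only; none of these names come from the Apex API.
final class KeyPartitionSketch {

    // Hypothetical tuple type with a "key" component.
    static final class KeyedTuple {
        final String key;
        final int value;

        KeyedTuple(String key, int value) {
            this.key = key;
            this.value = value;
        }
    }

    // The value a codec's getPartition() is meant to return: the key's hash.
    static int partitionHash(KeyedTuple tuple) {
        return tuple.key.hashCode();
    }

    // Assumed routing rule: keep the low bits of the hash.
    // Requires partitionCount to be a power of two.
    static int partitionFor(KeyedTuple tuple, int partitionCount) {
        if (partitionCount <= 0 || Integer.bitCount(partitionCount) != 1) {
            throw new IllegalArgumentException("partitionCount must be a power of two");
        }
        int mask = partitionCount - 1;
        return partitionHash(tuple) & mask;
    }

    public static void main(String[] args) {
        String[] keys = {"a", "b", "a", "c", "b"};
        for (String key : keys) {
            KeyedTuple tuple = new KeyedTuple(key, 1);
            // Tuples with equal keys always print the same partition index.
            System.out.println(key + " -> partition " + partitionFor(tuple, 2));
        }
    }
}
```

With this rule, two tuples carrying the same key always land on the same
partition index, which is the distribution I expect to see on operator B.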

Any ideas on what I am doing wrong?

Thanks.

-Bhupesh

On Wed, Feb 10, 2016 at 6:38 PM, Shubham Pathak <shubham@datatorrent.com>
wrote:

> Thanks Tushar.
>
> I implemented the codec as per your suggestion.
>
> To test it I did the following:
>
> 1. Partitioned the Counter operator into 2 instances using
>
> <configuration>
>   <property>
>     <name>dt.application.APPNAME.operator.counter.attr.PARTITIONER</name>
>     <value>com.datatorrent.common.partitioner.StatelessPartitioner:2</value>
>   </property>
> </configuration>
>
> 2. The tokenizer emits <word,1>. So if I group the stream by the
> second field (index 1), all words should go to the same instance of counter.
>
>   dag.setInputPortAttribute(counter.input, PortContext.STREAM_CODEC,
>       new TupleStreamCodec(new int[] { 1 }));
>
> 3. Overrode the getStreamCodec() method in the counter operator's input
> port to return new TupleStreamCodec(new int[] { 1 });
>
> But I couldn't observe the desired results. I also added a few debug
> statements in the TupleStreamCodec class but, surprisingly, I couldn't see
> them in the logs.
>
> Am I missing something?
>
> Is com.datatorrent.common.partitioner.StatelessPartitioner:2 overriding
> the behavior of TupleStreamCodec?
> But if I remove that, how would I specify how many partitions I need?
>
> I did take a look at the FraudDetectionDemo and YahooFinance demos. I noticed
> that StreamCodec is used by some operators the same way I am using it,
> but nowhere could I see the number of partitions being defined for them (in
> application.java or properties.xml).
>
> Could you guide me more on this?
>
> Thanks,
> Shubham
>
>
>
> On Mon, Feb 8, 2016 at 4:36 PM, Tushar Gosavi <tushar@datatorrent.com>
> wrote:
>
> > Hi Shubham,
> >
> > You can implement a custom stream codec to define your own hashCode logic.
> > A simple StatefulStreamCodec is shown below. You specify the indexes on
> > which you want to compute the hashCode in the constructor of the stream
> > codec, and while computing the hashCode you use only the elements at those
> > indexes.
> >
> >
> >   public static class TupleStreamCodec extends DefaultStatefulStreamCodec<Tuple>
> >   {
> >     // Indexes of the tuple fields that determine the partition.
> >     private int[] indexes;
> >
> >     public TupleStreamCodec(int[] indexes) {
> >       this.indexes = indexes;
> >     }
> >
> >     @Override
> >     public int getPartition(Tuple tuple)
> >     {
> >       int hashCode = 1;
> >       for (int idx : indexes) {
> >         // Combine the hash of each selected field.
> >         hashCode = 31 * hashCode + tuple.get(idx).hashCode();
> >       }
> >       return hashCode;
> >     }
> >
> >   }
> >
> > You can then set this stream codec on the input port of the Counter operator:
> >     dag.setInputPortAttribute(counter.inport,
> >         Context.PortContext.STREAM_CODEC, new TupleStreamCodec(new int[] {0}));
> >     // partition based on the first element
> >
> >   Note: This is a sample implementation; it does not handle out-of-range
> >   indexes or other errors :)
> >
> >   - Tushar.
> >
> >
> > On Mon, Feb 8, 2016 at 3:42 PM, Shubham Pathak <shubham@datatorrent.com>
> > wrote:
> >
> > > Hi,
> > >
> > > I need some suggestions / pointers related to defining a custom
> > > partitioner.
> > >
> > > The operators in my application process a custom tuple class (let's call
> > > it TUPLE). This data type has a single field, an ArrayList, so each tuple
> > > represents a list of values.
> > >
> > > For a typical word count problem, my dag would be
> > >
> > > WordGenerator -> <STRING> -> Tokenizer -> <TUPLE> -> Counter -> <TUPLE> -> Console
> > >
> > > and if I were to use TUPLE, the tokenizer would emit a TUPLE that contains
> > > an array list with contents <word,count>.
> > >
> > > Now I wish to partition Counter so that each instance receives all
> > > tuples containing the same word.
> > >
> > > I know that by default the hashCode() method of the custom tuple class
> > > would be used, but in my case the custom tuple class is an ArrayList and
> > > I wish to specify that the hash must be computed on just the first field
> > > in the ArrayList. In a generic case it could also be on multiple fields
> > > in the array list.
> > >
> > > Do we have any examples that i could refer to ?
> > >
> > > Also can this be done at application level by setting an attribute ?
> > >
> > > Thanks,
> > > Shubham
> > >
> >
>
