apex-dev mailing list archives

From Tushar Gosavi <tus...@datatorrent.com>
Subject Re: Writing Custom Partitioner
Date Wed, 17 Feb 2016 19:18:36 GMT
Hi Bhupesh,

I also did some experiments and found that a stream codec extending
DefaultStatefulStreamCodec does not work, while a custom stream codec
extending DefaultKryoStreamCodec does.

Can anyone explain the difference between DefaultStatefulStreamCodec and
DefaultKryoStreamCodec? Does the platform allow writing a custom
StreamCodec that extends DefaultStatefulStreamCodec? In the above case it
did not work. If it is expected to work, I will open a JIRA for the issue.

Regards,
-Tushar.
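A getPartition() result only affects the distribution once the platform maps it onto a downstream partition. The sketch below simulates that step in plain Java, assuming mask-based routing in the style of Apex's Partitioner.PartitionKeys: each partition owns a set of keys, and a tuple goes to the partition whose set contains getPartition(tuple) & mask. The class MaskRoutingSketch, its methods, and the round-robin key assignment are hypothetical illustrations, not StatelessPartitioner's actual code.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical simulation (not Apex code) of mask-based tuple routing.
// Assumption: each downstream partition owns a set of partition keys, and a
// tuple goes to the partition whose set contains (getPartition(tuple) & mask).
public class MaskRoutingSketch {

  // Smallest all-ones bit mask (0, 1, 3, 7, ...) giving at least n distinct keys.
  static int maskFor(int partitions) {
    int mask = 0;
    while (mask + 1 < partitions) {
      mask = (mask << 1) | 1;
    }
    return mask;
  }

  // Masks the codec's hash, then maps key k to partition k % n
  // (a round-robin key assignment assumed for this sketch).
  static int route(int partitionHash, int partitions) {
    if (partitions < 1) {
      throw new IllegalArgumentException("need at least one partition");
    }
    int key = partitionHash & maskFor(partitions);
    return key % partitions;
  }

  public static void main(String[] args) {
    int partitions = 2;
    List<String> keys = Arrays.asList("apple", "banana", "apple", "cherry", "banana");
    for (String key : keys) {
      // Key-based routing: the hash depends only on the key, so repeated
      // keys always land on the same partition.
      System.out.println(key + " -> partition " + route(key.hashCode(), partitions));
    }
  }
}
```

Because the masked hash depends only on the key, equal keys always map to the same partition under this model, whatever the partition count; masking also keeps negative hash values in the valid key range.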



On Wed, Feb 17, 2016 at 6:32 PM, Bhupesh Chawda <bhupesh@datatorrent.com>
wrote:

> Hi,
>
> The above code fragment is not working in my case as well.
>
> Basically I have a Tuple class which has a "key" component.
>
> Here is my dag:
> A -> B
>
> I need key-based stream multiplexing (tuples with the same key go to the
> same downstream partition) when the downstream operator B is partitioned.
> I tried implementing this by defining a custom stream codec extending
> DefaultStatefulStreamCodec, with a getPartition() function that returns
> tuple.key().hashCode(). However, I get the same distribution irrespective
> of what I return from getPartition(). I am using the StatelessPartitioner
> to partition the downstream operator.
>
> Any ideas on what I am doing wrong?
>
> Thanks.
>
> -Bhupesh
>
> On Wed, Feb 10, 2016 at 6:38 PM, Shubham Pathak <shubham@datatorrent.com>
> wrote:
>
> > Thanks Tushar.
> >
> > I implemented the codec as per your suggestion.
> >
> > To test it, I did the following:
> >
> > 1. Partitioned the Counter operator into 2 instances using:
> >
> >    <configuration>
> >      <property>
> >        <name>dt.application.APPNAME.operator.counter.attr.PARTITIONER</name>
> >        <value>com.datatorrent.common.partitioner.StatelessPartitioner:2</value>
> >      </property>
> >    </configuration>
> >
> > 2. The tokenizer emits <word,1>. So if I group the stream by the second
> > field (index 1), all words should go to the same instance of Counter.
> >
> >   dag.setInputPortAttribute(counter.input, PortContext.STREAM_CODEC,
> >       new TupleStreamCodec(new int[] { 1 }));
> >
> > 3. Overrode the getStreamCodec() method in the Counter operator's input
> > port to return new TupleStreamCodec(new int[] { 1 }).
> >
> > But I couldn't observe the desired results. I also added a few debug
> > statements in the TupleStreamCodec class, but surprisingly, I couldn't
> > see them in the logs.
> >
> > Am I missing something?
> >
> > Is com.datatorrent.common.partitioner.StatelessPartitioner:2 overriding
> > the behavior of TupleStreamCodec?
> > But if I remove it, how would I specify how many partitions I need?
> >
> > I did take a look at FraudDetectionDemo and the YahooFinance demo. I
> > noticed that StreamCodec is used by some operators the same way I am
> > using it, but nowhere could I see the number of partitions being defined
> > for them (in application.java or properties.xml).
> >
> > Could you guide me further on this?
> >
> > Thanks,
> > Shubham
> >
> >
> >
> > On Mon, Feb 8, 2016 at 4:36 PM, Tushar Gosavi <tushar@datatorrent.com>
> > wrote:
> >
> > > Hi Shubham,
> > >
> > > You can implement a custom stream codec to define your own hashCode
> > > logic. A simple StatefulStreamCodec is shown below. You specify the
> > > indexes on which you want to compute the hashCode in the constructor of
> > > the stream codec, and while computing the hashCode you use only the
> > > elements at those indexes.
> > >
> > >
> > >   public static class TupleStreamCodec extends DefaultStatefulStreamCodec<Tuple>
> > >   {
> > >     // Positions in the tuple's list that contribute to the partition hash
> > >     private int[] indexes;
> > >
> > >     public TupleStreamCodec(int[] indexes)
> > >     {
> > >       this.indexes = indexes;
> > >     }
> > >
> > >     @Override
> > >     public int getPartition(Tuple tuple)
> > >     {
> > >       // Combine only the selected elements (31 * h + x, as in List.hashCode())
> > >       int hashCode = 1;
> > >       for (int idx : indexes) {
> > >         hashCode = 31 * hashCode + tuple.get(idx).hashCode();
> > >       }
> > >       return hashCode;
> > >     }
> > >   }
> > >
> > > You can then set this stream codec on the input port of the Counter
> > > operator:
> > >
> > >     // partition based on the first element
> > >     dag.setInputPortAttribute(counter.inport,
> > >         Context.PortContext.STREAM_CODEC, new TupleStreamCodec(new int[] { 0 }));
> > >
> > > Note: this is a sample implementation; it does not handle out-of-range
> > > indexes or other errors :)
> > >
> > >   - Tushar.
> > >
> > >
> > > On Mon, Feb 8, 2016 at 3:42 PM, Shubham Pathak <shubham@datatorrent.com>
> > > wrote:
> > >
> > > > Hi,
> > > >
> > > > I need some suggestions / pointers related to defining a custom
> > > > partitioner.
> > > >
> > > > The operators in my application process a custom tuple class (let's
> > > > call it TUPLE). This data type has a single ArrayList field, so each
> > > > tuple represents a list of values.
> > > >
> > > > For a typical word count problem, my DAG would be
> > > >
> > > > WordGenerator -> <STRING> -> Tokenizer -> <TUPLE> -> Counter -> <TUPLE> -> Console
> > > >
> > > > and if I use TUPLE, the tokenizer will emit a TUPLE whose ArrayList
> > > > contains <word,count>.
> > > >
> > > > Now I wish to partition Counter so that each instance receives all
> > > > tuples containing the same word.
> > > >
> > > > I know that by default the hashCode() method of the custom tuple class
> > > > would be used, but in my case the custom tuple class is an ArrayList
> > > > and I wish to specify that the hashCode must be computed on just the
> > > > first field of the ArrayList. In a generic case, it could also be
> > > > computed on multiple fields of the ArrayList.
> > > >
> > > > Do we have any examples that I could refer to?
> > > >
> > > > Also, can this be done at the application level by setting an attribute?
> > > >
> > > > Thanks,
> > > > Shubham
> > > >
> > >
> >
>
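The index-based hash from the TupleStreamCodec snippet can be tried outside Apex. This is a minimal, dependency-free sketch that models a tuple as a List<Object> (an assumption standing in for the TUPLE class described in the thread); the class name SelectiveIndexHash is hypothetical. It shows both groupings discussed in the thread: hashing on index 0 groups by word, while hashing on index 1 makes every <word,1> tuple hash identically.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Objects;

// Hypothetical standalone version of the index-based hash (no Apex dependency).
// A "tuple" is modelled as a List<Object>, standing in for the TUPLE class.
public class SelectiveIndexHash {

  private final int[] indexes;

  public SelectiveIndexHash(int... indexes) {
    this.indexes = indexes.clone();
  }

  // Combines only the elements at the configured indexes with the same
  // 31 * h + x scheme as List.hashCode(). An out-of-range index throws
  // IndexOutOfBoundsException; a null element contributes 0.
  public int hash(List<?> tuple) {
    int hashCode = 1;
    for (int idx : indexes) {
      hashCode = 31 * hashCode + Objects.hashCode(tuple.get(idx));
    }
    return hashCode;
  }

  public static void main(String[] args) {
    SelectiveIndexHash byWord = new SelectiveIndexHash(0);  // group by the word
    SelectiveIndexHash byCount = new SelectiveIndexHash(1); // group by the count

    List<Object> apple1 = Arrays.asList("apple", 1);
    List<Object> apple5 = Arrays.asList("apple", 5);
    List<Object> banana1 = Arrays.asList("banana", 1);

    // Same word, different counts: equal hashes, hence the same partition.
    System.out.println(byWord.hash(apple1) == byWord.hash(apple5));   // true
    // Different words, same count 1: hashing on index 1 collapses them together.
    System.out.println(byCount.hash(apple1) == byCount.hash(banana1)); // true
  }
}
```

Selecting every index of a list reproduces that list's own hashCode(), so the default behaviour described in the original question is just the special case where all fields are used.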
