apex-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Bhupesh Chawda <bhup...@datatorrent.com>
Subject Re: Writing Custom Partitioner
Date Thu, 18 Feb 2016 13:32:33 GMT
Same for me!
Does anyone know if this is expected? Or can be a bug?

Thanks.
-Bhupesh

On Thu, Feb 18, 2016 at 6:38 PM, Shubham Pathak <shubham@datatorrent.com>
wrote:

> Extending custom stream codec from DefaultKryoStreamCodec worked for my
> case !
>
> Thanks,
> Shubham
>
> On Thu, Feb 18, 2016 at 12:48 AM, Tushar Gosavi <tushar@datatorrent.com>
> wrote:
>
> > Hi Bhupesh,
> >
> > I also did some experiments and found out that stream codec extended
> > from DefaultStatefulStreamCodec does not work. Extending custom stream
> > codec from DefaultKryoStreamCodec worked.
> >
> > Can anyone explain, what is the difference between
> > DefaultStatefulStreamCodec and DefaultKryoStreamCodec? and does platform
> > allow writing custom streamcodec extended from
> DefaultStatefulStreamCodec?
> > In above case it did not work. If it is expected to work then I will open
> > an Jira for the issue.
> >
> > Regards,
> > -Tushar.
> >
> >
> >
> > On Wed, Feb 17, 2016 at 6:32 PM, Bhupesh Chawda <bhupesh@datatorrent.com
> >
> > wrote:
> >
> > > Hi,
> > >
> > > The above code fragment is not working in my case as well.
> > >
> > > Basically I have a Tuple class which has a "key" component.
> > >
> > > Here is my dag:
> > > A -> B
> > >
> > > I need to be able to define Key Based stream multiplexing (same key
> goes
> > to
> > > same downstream partition) when the down stream operator B is
> > partitioned.
> > > I tried implementing this by defining a custom stream codec extending
> > > DefaultStatefulStreamCodec where I define a getPartition() function
> which
> > > returns tuple.key().hashcode(). However, I get the same distribution
> > > irrespective of what I return in the getPartition() function. I am
> using
> > > the StatelessPartitioner for partitioning the downstream operator.
> > >
> > > Any ideas on what I am doing wrong?
> > >
> > > Thanks.
> > >
> > > -Bhupesh
> > >
> > > On Wed, Feb 10, 2016 at 6:38 PM, Shubham Pathak <
> shubham@datatorrent.com
> > >
> > > wrote:
> > >
> > > > Thanks Tushar.
> > > >
> > > > I implemented the codec as per your suggestion.
> > > >
> > > > To test it i did the following :
> > > >
> > > > 1. Partitioned Counter operator to have 2 instances  using
> > > >
> > > >
> > >
> >
> <configuration><property><name>dt.application.APPNAME.operator.counter.attr.PARTITIONER</name><value>com.datatorrent.common.partitioner.StatelessPartitioner:2</value></property></configuration>
> > > >
> > > > 2. The tokenizer emits <word,1> . So if i were to group the stream
by
> > > > second field ( index 1 ) , all words would go to same instance of
> > > counter.
> > > >
> > > >   dag.setInputPortAttribute(counter.input, PortContext.STREAM_CODEC,
> > new
> > > > TupleStreamCodec(new int[] { 1 })));
> > > >
> > > > 3. Override getStreamCodec() method in counter operator's input port
> to
> > > > return new TupleStreamCodec(new int[] { 1 })));
> > > >
> > > > But i couldn't observe the desired results. I also added few debug
> > > > statements in TupleStreamCodec class, but surprisingly, i couldn't
> see
> > > them
> > > > in logs.
> > > >
> > > > Am i missing something ?
> > > >
> > > > Is the com.datatorrent.common.partitioner.StatelessPartitioner:2
> > > overriding
> > > > the behavior of TupleStreamCodec ?
> > > > But if i remove that how would i specify how many partitions i need ?
> > > >
> > > > I did take look at FraudDetectionDemo and YahooFinance demo . I
> noticed
> > > > that StreamCodec is being used by some operators the same way i am
> > using,
> > > > but no where i could see number of partitions being defined for them
> (
> > in
> > > > application.java or properties.xml )
> > > >
> > > > Could you guide me more on this.
> > > >
> > > > Thanks,
> > > > Shubham
> > > >
> > > >
> > > >
> > > > On Mon, Feb 8, 2016 at 4:36 PM, Tushar Gosavi <
> tushar@datatorrent.com>
> > > > wrote:
> > > >
> > > > > Hi Shubham,
> > > > >
> > > > > You can implement a custom stream codec to define your own hashCode
> > > > logic,
> > > > > A simple StateFulStreamCodec
> > > > > is as below. You specify indexes on which you want to compute the
> > > > hashCode
> > > > > in constructor of the stream codec,
> > > > > and while computing hashCode you only use elements at those
> indexes.
> > > > >
> > > > >
> > > > >   public static class TupleStreamCodec extends
> > > > > DefaultStatefulStreamCodec<Tuple>
> > > > >   {
> > > > >     private int[] indexes;
> > > > >
> > > > >     public TupleStreamCodec(int[] indexes) {
> > > > >     this.indexes = indexes;
> > > > >     }
> > > > >
> > > > >     @Override
> > > > >     public int getPartition(Tuple tuple)
> > > > >     {
> > > > >       int hashCode = 1;
> > > > >       for (int idx : indexes) {
> > > > >         hashCode = 31 * hashCode + tuple.get(i).hashCode();
> > > > >       }
> > > > >       return hashCode;
> > > > >     }
> > > > >
> > > > >   }
> > > > >
> > > > > and you can set this stream codec at input port of the Counter
> > > operator.
> > > > >     dag.setInputPortAttribute(counter.inport,
> > > > > Context.PortContext.STREAM_CODEC, new TupleStreamCodec(new int[]
> {0}
> > > ));
> > > > >  // partition based on first element
> > > > >
> > > > >   Note : This is sample implementation, does not handle out of
> index
> > > and
> > > > > other errors :)
> > > > >
> > > > >   - Tushar.
> > > > >
> > > > >
> > > > > On Mon, Feb 8, 2016 at 3:42 PM, Shubham Pathak <
> > > shubham@datatorrent.com>
> > > > > wrote:
> > > > >
> > > > > > Hi,
> > > > > >
> > > > > > I need some suggestions / pointers related to defining a custom
> > > > > > partitioner.
> > > > > >
> > > > > > The operators in my application process a custom tuple class
(
> lets
> > > > call
> > > > > it
> > > > > > TUPLE) . This data type has a single field ArrayList.. So each
> > tuple
> > > > > > represents a list of values.
> > > > > >
> > > > > > For a typical word count problem, my dag would be
> > > > > >
> > > > > > WordGenerator -> <STRING> -> Tokenizer -> <TUPLE>
-> Counter ->
> > > > <TUPLE>
> > > > > ->
> > > > > > Console
> > > > > >
> > > > > > and  if i were to use  TUPLE, tokenizer will emit TUPLE that
> > contains
> > > > > array
> > > > > > list with contents <word,count>
> > > > > >
> > > > > > Now i wish to partition Counter and each instance should receive
> > all
> > > > > tuples
> > > > > > containing same word.
> > > > > >
> > > > > > I know that by default , hashCode()  method of custom tuple
class
> > > would
> > > > > be
> > > > > > used , but in my case custom tuple class is an arrayList  and
i
> > wish
> > > to
> > > > > > specify that hashCode must be done on just the first field in
> > > > ArrayList.
> > > > > In
> > > > > > a generic case it could also be on multiple fields in array
list.
> > > > > >
> > > > > > Do we have any examples that i could refer to ?
> > > > > >
> > > > > > Also can this be done at application level by setting an
> attribute
> > ?
> > > > > >
> > > > > > Thanks,
> > > > > > Shubham
> > > > > >
> > > > >
> > > >
> > >
> >
>

Mime
  • Unnamed multipart/alternative (inline, None, 0 bytes)
View raw message