apex-users mailing list archives

From Munagala Ramanath <...@datatorrent.com>
Subject Re: Reading Multiple directories in parallel
Date Tue, 28 Jun 2016 20:12:07 GMT
The collection you return should match the method's declared return type:

List<Partition<AbstractFileInputOperator<String>>> newPartitions = new ArrayList<>(nPartitions);

And when adding the operator to the collection, assign it to a variable of the matching type first:

AbstractFileInputOperator<String> op = oper;
newPartitions.add(new DefaultPartition<>(op));

Ram
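A minimal, self-contained sketch of the type fix described in the reply above. The Apex classes are not available here, so `Partition`, `DefaultPartition`, `BaseReader` and `MyReader` are hypothetical, simplified stand-ins for `Partition`, `DefaultPartition`, `AbstractFileInputOperator<String>` and the poster's `AbstractFileReader`; they are not the real com.datatorrent types. What it illustrates is Java generics invariance: a `List<Partition<MyReader>>` is not a `Collection<Partition<BaseReader<String>>>`, even though `MyReader` extends `BaseReader<String>`, so the list has to be declared with the base element type.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

final class PartitionTypeDemo {
  // Hypothetical, simplified stand-ins for the Apex types in the thread;
  // these are NOT the real com.datatorrent classes.
  interface Partition<T> {
    T getPartitionedInstance();
  }

  static class DefaultPartition<T> implements Partition<T> {
    private final T instance;

    DefaultPartition(T instance) {
      this.instance = instance;
    }

    @Override
    public T getPartitionedInstance() {
      return instance;
    }
  }

  // Plays the role of AbstractFileInputOperator<T>.
  static class BaseReader<T> {
    String directory;
  }

  // Plays the role of the poster's AbstractFileReader subclass.
  static class MyReader extends BaseReader<String> {
    String sourceId;
  }

  static Collection<Partition<BaseReader<String>>> definePartitions(int n) {
    // Declared with the same element type as the return type. Declaring it as
    // List<Partition<MyReader>> would fail to compile at the return statement,
    // because generic types are invariant.
    List<Partition<BaseReader<String>>> newPartitions = new ArrayList<>(n);
    for (int i = 0; i < n; i++) {
      MyReader oper = new MyReader();
      oper.directory = "dir-" + i;
      oper.sourceId = "src-" + i;
      // Widen to the base type before wrapping, as the reply suggests.
      BaseReader<String> op = oper;
      newPartitions.add(new DefaultPartition<>(op));
    }
    return newPartitions;
  }

  public static void main(String[] args) {
    System.out.println(definePartitions(3).size()); // prints 3
  }
}
```

The explicit `op` variable makes the intended type argument obvious; with Java 8 or later the diamond in `add(new DefaultPartition<>(oper))` should also pick it up from the `add()` parameter type, but the explicit widening works either way.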

On Tue, Jun 28, 2016 at 12:12 PM, Mukkamula, Suryavamshivardhan (CWM-NR) <
suryavamshivardhan.mukkamula@rbc.com> wrote:

> Hi Ram,
>
>
>
> I tried it that way, but the problem I face is that the return type of my
> definePartitions() method does not match the superclass method's return
> type. So I temporarily changed AbstractFileInputOperator, but I am looking
> for a better way to do this.
>
>
>
> ##### definePartitions() method #####
>
> @Override
> public Collection<Partition<AbstractFileInputOperator<String>>> definePartitions(
>     Collection<Partition<AbstractFileInputOperator<String>>> partitions,
>     PartitioningContext context) {
>
>   final int prevCount = partitions.size();
>   if (1 < prevCount) {
>     throw new RuntimeException("Error: Dynamic repartition not supported");
>   }
>
>   // compute first and last indices of partitions for each directory
>   //final int numDirs = directories.length, numParCounts = partitionCounts.length;
>   final int numDirs = inputDirectories.size(), numParCounts = partCounts.size();
>   //final int[] sliceFirstIndex = new int[numDirs];
>   Map<String, Integer> sliceFirstIndex = new HashMap<String, Integer>(numDirs);
>
>   LOG.info("definePartitions: prevCount = {}, directories.size = {}, "
>       + "partitionCounts.size = {}", prevCount, numDirs, numParCounts);
>
>   int nPartitions = 0; // desired number of partitions
>   for (String sourceId : inputDirectories.keySet()) {
>     sliceFirstIndex.put(sourceId, nPartitions);
>     final int nP = Integer.parseInt(partCounts.get(sourceId));
>     LOG.info("definePartitions: sourceId = {}, no of partitions = {}, dir = {}",
>         sourceId, nP, inputDirectories.get(sourceId));
>     nPartitions += nP;
>   }
>
>   /*if (1 == nPartitions) {
>     LOG.info("definePartitions: Nothing to do in definePartitions");
>     return partitions; // nothing to do
>   }*/
>
>   if (nPartitions <= 0) { // error
>     final String msg = String.format("Error: Bad number of partitions %d%n", nPartitions);
>     LOG.error(msg);
>     throw new RuntimeException(msg);
>   }
>   this.partitionCount = nPartitions;
>
>   LOG.debug("definePartitions: Creating {} partitions", nPartitions);
>
>   /*
>    * Create partitions of scanners, scanner's partition method will do
>    * state transfer for DirectoryScanner objects.
>    */
>   Kryo kryo = new Kryo();
>
>   SlicedDirectoryScanner sds = (SlicedDirectoryScanner) scanner;
>   List<SlicedDirectoryScanner> scanners = sds.partition(nPartitions, inputDirectories, partCounts);
>
>   // return value: new list of partitions (includes old list)
>   // NOTE: Partition<AbstractFileReader> does not match the element type of the
>   // declared return type, Collection<Partition<AbstractFileInputOperator<String>>>
>   List<Partition<AbstractFileReader>> newPartitions = new ArrayList<>(nPartitions);
>
>   // parallel list of storage managers
>   Collection<IdempotentStorageManager> newManagers = new ArrayList<>(nPartitions);
>
>   // setup new partitions
>   LOG.info("definePartitions: setting up {} new partitions with {} monitored directories",
>       nPartitions, numDirs);
>
>   final IdempotentStorageManager ism = getIdempotentStorageManager();
>
>   for (String sourceId : inputDirectories.keySet()) {
>     int first = sliceFirstIndex.get(sourceId);
>     int last = first + Integer.parseInt(partCounts.get(sourceId));
>     String dir = Helper.changeInputDirectory(sourceId,
>         inputDirectories.get(sourceId), snapDates.get(sourceId));
>     //String dir = inputDirectories.get(sourceId);
>     String inConfig = inputConfigFiles.get(sourceId);
>     String outConfig = outputConfigFiles.get(sourceId);
>     String loadDate = snapDates.get(sourceId);
>     LOG.info("definePartitions: first = {}, last = {}, dir = {}", first, last, dir);
>     LOG.info("definePartitions: directory = {}, inputConfigFile = {}, outputConfigFile = {}, loadDate = {}",
>         dir, inConfig, outConfig, loadDate);
>
>     for (int i = first; i < last; ++i) {
>       AbstractFileReader oper = (AbstractFileReader) cloneObject(kryo, this);
>       oper.setDirectory(dir);
>       oper.setInputConfFile(inConfig);
>       oper.setOutputConfFile(outConfig);
>       oper.setSourceId(sourceId);
>       oper.setLoadDate(loadDate);
>       oper.setOldInputDir(inputDirectories.get(sourceId));
>       SlicedDirectoryScanner scn = (SlicedDirectoryScanner) scanners.get(i);
>       scn.setStartIndex(first);
>       scn.setEndIndex(last);
>       scn.setDirectory(dir);
>
>       oper.setScanner(scn);
>       newPartitions.add(new DefaultPartition<>(oper));
>       newManagers.add(oper.getIdempotentStorageManager());
>     }
>   }
>
>   ism.partitioned(newManagers, null);
>   LOG.info("definePartition: returning {} partitions", newPartitions.size());
>   // does not compile: incompatible with the declared return type (see NOTE above)
>   return newPartitions;
> }
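The index bookkeeping in the method above (a first index per source ID, then a [first, last) range of partitions per directory) can be sketched on its own. This is a simplified illustration, not the poster's code: `computeSlices`, `srcA` and `srcB` are hypothetical names, counts are passed as `Integer` rather than parsed from strings, and each count is validated individually, which is stricter than the original check on the total. The original iterates `inputDirectories.keySet()` twice, which yields the same order both times only because the map is not modified in between; computing first and last in one pass over a `LinkedHashMap` avoids relying on that.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class PartitionSlices {
  /**
   * Assigns each source ID a contiguous, non-overlapping index range
   * [first, last) in the global partition list, in the map's iteration order.
   * Returns sourceId -> {first, last}.
   */
  static Map<String, int[]> computeSlices(Map<String, Integer> partCounts) {
    Map<String, int[]> slices = new LinkedHashMap<>();
    int next = 0; // running total, i.e. the first index of the next slice
    for (Map.Entry<String, Integer> e : partCounts.entrySet()) {
      int count = e.getValue();
      if (count <= 0) {
        throw new IllegalArgumentException(
            "Bad partition count for " + e.getKey() + ": " + count);
      }
      slices.put(e.getKey(), new int[] {next, next + count});
      next += count;
    }
    return slices;
  }

  public static void main(String[] args) {
    Map<String, Integer> counts = new LinkedHashMap<>();
    counts.put("srcA", 2);
    counts.put("srcB", 3);
    for (Map.Entry<String, int[]> e : computeSlices(counts).entrySet()) {
      int[] r = e.getValue();
      System.out.println(e.getKey() + " -> [" + r[0] + ", " + r[1] + ")");
    }
    // prints:
    // srcA -> [0, 2)
    // srcB -> [2, 5)
  }
}
```

The total number of partitions is simply the upper bound of the last slice (5 in this example), which corresponds to `nPartitions` in the original method.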
>
>
>
> Regards,
>
> Surya Vamshi
>
>
>
> *From:* Munagala Ramanath [mailto:ram@datatorrent.com]
> *Sent:* 2016, June, 28 2:35 PM
> *To:* users@apex.apache.org
> *Subject:* Re: Reading Multiple directories in parallel
>
>
>
> You can add those properties to your own class (the one that extends
> AbstractFileInputOperator) and simply cast the clone to that class, so
> change:
>
>
>
> AbstractFileInputOperator<String> oper = cloneObject(kryo, this);
>
>
>
> to something like:
>
>
>
> MyFileInputOperator<String> oper = (MyFileInputOperator<String>) cloneObject(kryo, this);
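A sketch of the approach suggested in that message: keep the extra per-partition properties in your own subclass and cast the clone to it, leaving the base class untouched. The operator in the thread clones itself with Kryo via `cloneObject(kryo, this)`; to stay self-contained, this sketch substitutes a hypothetical `cloneObject` that deep-copies with Java serialization instead. `BaseFileOperator` and `MyFileInputOperator` are hypothetical stand-ins, not Malhar classes, and the paths and file names are made up.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

final class CloneAndCastDemo {
  // Hypothetical stand-in for AbstractFileInputOperator<T>: it knows nothing
  // about source IDs or config files.
  static class BaseFileOperator<T> implements Serializable {
    private static final long serialVersionUID = 1L;
    String directory;
  }

  // Hypothetical stand-in for the user's subclass: the extra per-partition
  // properties live here, so the base class does not need to change.
  static class MyFileInputOperator<T> extends BaseFileOperator<T> {
    private static final long serialVersionUID = 1L;
    String sourceId;
    String configFile;
  }

  // Stand-in for the Kryo-based cloneObject(kryo, this) used in the thread;
  // this version deep-copies through Java serialization instead.
  @SuppressWarnings("unchecked")
  static <T extends Serializable> T cloneObject(T src) {
    try {
      ByteArrayOutputStream bytes = new ByteArrayOutputStream();
      try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
        out.writeObject(src);
      }
      try (ObjectInputStream in =
          new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
        return (T) in.readObject();
      }
    } catch (IOException | ClassNotFoundException e) {
      throw new IllegalStateException("deep copy failed", e);
    }
  }

  public static void main(String[] args) {
    // The template is held through the base type, so the clone's static type
    // is the base type as well.
    BaseFileOperator<String> template = new MyFileInputOperator<>();
    template.directory = "/data/in";
    // Cast the clone to the subclass to reach the extra per-partition fields.
    MyFileInputOperator<String> oper = (MyFileInputOperator<String>) cloneObject(template);
    oper.sourceId = "srcA";
    oper.configFile = "srcA.conf";
    System.out.println(oper != template); // prints true
    System.out.println(oper.directory);   // prints /data/in
  }
}
```

Because the clone is a deep copy, setting `sourceId` or `configFile` on one partition's operator leaves the template and the other partitions unchanged.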
>
>
>
> On Tue, Jun 28, 2016 at 11:17 AM, Mukkamula, Suryavamshivardhan (CWM-NR) <
> suryavamshivardhan.mukkamula@rbc.com> wrote:
>
> Hi Ram,
>
>
>
> I would like to set parameters for each partition as shown below: each
> operator should be given its own configuration file and source identifier.
> If there is another way, please let me know.
>
> In my current definePartitions() method I am doing something similar to the
> code below, but I have to add setter and getter methods to the
> AbstractFileInputOperator class.
>
>
>
> for (int j = 0; j < numDirs; ++j) {
>   int first = sliceFirstIndex[j];
>   int last = first + partitionCounts[j];
>   String dir = directories[j];
>   LOG.info("definePartitions: first = {}, last = {}, dir = {}", first, last, dir);
>   for (int i = first; i < last; ++i) {
>     AbstractFileInputOperator<String> oper = cloneObject(kryo, this);
>     oper.setDirectory(dir);
>     oper.setSourceId(<sourceId>);
>     oper.setConfigFile(<fileName>);
>     //oper.setpIndex(i);
>     SlicedDirectoryScanner scn = (SlicedDirectoryScanner) scanners.get(i);
>     scn.setStartIndex(first);
>     scn.setEndIndex(last);
>     scn.setDirectory(dir);
>
>     oper.setScanner(scn);
>     newPartitions.add(new DefaultPartition<>(oper));
>     newManagers.add(oper.getIdempotentStorageManager());
>   }
> }
>
>
>
>
>
> Regards,
>
> Surya Vamshi
>
>
>
> *From:* Munagala Ramanath [mailto:ram@datatorrent.com]
> *Sent:* 2016, June, 28 2:03 PM
> *To:* users@apex.apache.org
> *Subject:* Re: Reading Multiple directories in parallel
>
>
>
> Not sure I fully understand the question, but you can add whatever fields
> you need to your class that extends *AbstractFileInputOperator*. For
> example,
>
>
> https://github.com/DataTorrent/examples/blob/master/tutorials/fileIO-multiDir/src/main/java/com/example/fileIO/FileReaderMultiDir.java
>
> defines fields *directories* and *partitionCounts*.
>
>
>
> You can then set these fields as needed in *definePartitions*.
>
>
>
> Ram
>
>
>
> On Tue, Jun 28, 2016 at 10:31 AM, Mukkamula, Suryavamshivardhan (CWM-NR) <
> suryavamshivardhan.mukkamula@rbc.com> wrote:
>
> Hi Ram,
>
>
>
> Can you please suggest how I could add another variable (like
> ‘directory’) when creating multiple partitions of
> AbstractFileInputOperator in the definePartitions method?
>
>
>
> For now I have added the variables to AbstractFileInputOperator itself,
> which I suspect is not the best approach.
>
>
>
> These variables are used to scan the different directories in parallel.
>
>
>
> Regards,
>
> Surya Vamshi
>
>
>
> _______________________________________________________________________
>
> If you received this email in error, please advise the sender (by return
> email or otherwise) immediately. You have consented to receive the attached
> electronically at the above-noted email address; please retain a copy of
> this confirmation for future reference.
>
> If you received this email in error, please notify the sender immediately,
> by return email or by other means. You have agreed to receive the attached
> document(s) electronically at the email address indicated above; please
> keep a copy of this confirmation for future reference.
>
>
