apex-users mailing list archives

From "Mukkamula, Suryavamshivardhan (CWM-NR)" <suryavamshivardhan.mukkam...@rbc.com>
Subject RE: Reading Multiple directories in parallel
Date Tue, 28 Jun 2016 20:48:20 GMT
Hi,

Thank you so much, Ram, it worked!!

Regards,
Surya Vamshi

From: Munagala Ramanath [mailto:ram@datatorrent.com]
Sent: 2016, June, 28 4:12 PM
To: users@apex.apache.org
Subject: Re: Reading Multiple directories in parallel

The collection you return must match the method's declared return type:

List<Partition<AbstractFileInputOperator<String>>> newPartitions = new ArrayList<>(nPartitions);

But when adding your operator to that collection, first assign it to a variable of the matching type:

AbstractFileInputOperator<String> op = oper;
newPartitions.add(new DefaultPartition<>(op));
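Why the declared element type matters: Java generics are invariant, so a List<Partition<AbstractFileReader>> cannot be returned where a Collection<Partition<AbstractFileInputOperator<String>>> is expected, even assuming AbstractFileReader extends AbstractFileInputOperator<String>. The self-contained sketch below uses hypothetical stand-ins (Partition, DefaultPartition, BaseOperator, MyReader), not the real Apex classes, to show the pattern that compiles:

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

// Hypothetical stand-ins for the Apex types; this is NOT the real Apex API.
class PartitionTypesDemo {
    interface Partition<T> { T getPartitionedInstance(); }

    static class DefaultPartition<T> implements Partition<T> {
        private final T instance;
        DefaultPartition(T instance) { this.instance = instance; }
        public T getPartitionedInstance() { return instance; }
    }

    static class BaseOperator<T> { }                       // plays AbstractFileInputOperator<T>
    static class MyReader extends BaseOperator<String> { } // plays AbstractFileReader

    static Collection<Partition<BaseOperator<String>>> definePartitions(int n) {
        // Declare the list with the SAME element type as the return type.
        // A List<Partition<MyReader>> would not compile here: generics are invariant.
        List<Partition<BaseOperator<String>>> newPartitions = new ArrayList<>(n);
        for (int i = 0; i < n; i++) {
            MyReader oper = new MyReader();
            BaseOperator<String> op = oper;                // widen to the declared element type
            newPartitions.add(new DefaultPartition<>(op)); // infers DefaultPartition<BaseOperator<String>>
        }
        return newPartitions;
    }

    public static void main(String[] args) {
        System.out.println(definePartitions(3).size()); // prints 3
    }
}
```

The explicit widening (op = oper) keeps the inferred type of new DefaultPartition<>(...) unambiguous. As far as I know it is also what makes the call compile on Java 7, where a diamond in argument position is inferred from the constructor argument alone.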

Ram

On Tue, Jun 28, 2016 at 12:12 PM, Mukkamula, Suryavamshivardhan (CWM-NR) <suryavamshivardhan.mukkamula@rbc.com> wrote:
Hi Ram,

I tried it the same way, but the problem I face is that the return type of my definePartitions()
method does not match the superclass's. So for now I have changed AbstractFileInputOperator itself,
but I am looking for a better way to do this.

#############################definePartition() method #######################################

@Override
       public Collection<Partition<AbstractFileInputOperator<String>>> definePartitions(
                     Collection<Partition<AbstractFileInputOperator<String>>> partitions, PartitioningContext context) {

              final int prevCount = partitions.size();
              if (1 < prevCount) {
                     throw new RuntimeException("Error: Dynamic repartition not supported");
              }

              //compute first and last indices of partitions for each directory
              //final int numDirs = directories.length, numParCounts = partitionCounts.length;
              final int numDirs = inputDirectories.size(), numParCounts = partCounts.size();
              //final int[] sliceFirstIndex = new int[numDirs];
              Map<String,Integer> sliceFirstIndex = new HashMap<String,Integer>(numDirs);

              LOG.info("definePartitions: prevCount = {}, directories.size = {}, " + "partitionCounts.size = {}",
                           prevCount, numDirs, numParCounts);

              int nPartitions = 0; // desired number of partitions

              for(String sourceId : inputDirectories.keySet()){
                     sliceFirstIndex.put(sourceId, nPartitions);
                     final int nP = Integer.parseInt(partCounts.get(sourceId));
                     LOG.info("definePartitions: sourceId = {}, no of partitions = {}, dir = {}",
                            sourceId, nP, inputDirectories.get(sourceId));
                     nPartitions += nP;
              }

              /*if (1 == nPartitions) {
                     LOG.info("definePartitions: Nothing to do in definePartitions");
                     return partitions; // nothing to do
              }*/

              if (nPartitions <= 0) { // error
                     final String msg = String.format("Error: Bad number of partitions %d%n", nPartitions);
                     LOG.error(msg);
                     throw new RuntimeException(msg);
              }
              this.partitionCount = nPartitions;

              LOG.debug("definePartitions: Creating {} partitions", nPartitions);

              /*
              * Create partitions of scanners, scanner's partition method will do
              * state transfer for DirectoryScanner objects.
              */
              Kryo kryo = new Kryo();

              SlicedDirectoryScanner sds = (SlicedDirectoryScanner) scanner;
              List<SlicedDirectoryScanner> scanners = sds.partition(nPartitions, inputDirectories, partCounts);

              // return value: new list of partitions (includes old list)
              List<Partition<AbstractFileReader>> newPartitions = new ArrayList(nPartitions);

              // parallel list of storage managers
              Collection<IdempotentStorageManager> newManagers = new ArrayList(nPartitions);

              // setup new partitions
              LOG.info("definePartitions: setting up {} new partitions with {} monitored directories",
                           nPartitions, numDirs);

              final IdempotentStorageManager ism = getIdempotentStorageManager();

              for (String sourceId : inputDirectories.keySet()) {
                     int first = sliceFirstIndex.get(sourceId);
                     int last = first + Integer.parseInt(partCounts.get(sourceId));
                     String dir = Helper.changeInputDirectory(sourceId, inputDirectories.get(sourceId),snapDates.get(sourceId));
                     //String dir = inputDirectories.get(sourceId);
                     String inConfig = inputConfigFiles.get(sourceId);
                     String outConfig = outputConfigFiles.get(sourceId);
                     String loadDate = snapDates.get(sourceId);
                     LOG.info("definePartitions: first = {}, last = {}, dir = {}", first, last, dir);
                     LOG.info("definePartitions: directory = {}, inputConfigFile = {}, outputConfigFile = {}, loadDate = {}",
                            dir, inConfig, outConfig, loadDate);
                     for (int i = first; i < last; ++i) {
                           AbstractFileReader oper = (AbstractFileReader) cloneObject(kryo, this);
                           oper.setDirectory(dir);
                           oper.setInputConfFile(inConfig);
                           oper.setOutputConfFile(outConfig);
                           oper.setSourceId(sourceId);
                           oper.setLoadDate(loadDate);
                           oper.setOldInputDir(inputDirectories.get(sourceId));
                           SlicedDirectoryScanner scn = (SlicedDirectoryScanner) scanners.get(i);
                           scn.setStartIndex(first);
                           scn.setEndIndex(last);
                           scn.setDirectory(dir);

                           oper.setScanner(scn);
                           newPartitions.add(new DefaultPartition<>(oper));
                           newManagers.add(oper.getIdempotentStorageManager());
                     }
              }

              ism.partitioned(newManagers, null);
              LOG.info("definePartition: returning {} partitions", newPartitions.size());
              return newPartitions;
       }
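The index bookkeeping at the top of this method can be isolated and checked on its own. A minimal sketch, assuming partCounts maps each sourceId to a partition count stored as a string (as the Integer.parseInt calls suggest); SliceRanges and compute are hypothetical names. Unlike the method above, it also rejects a zero or negative count for an individual source, and it uses a LinkedHashMap so the ranges follow insertion order:

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper: each source gets a contiguous, half-open range
// [first, last) of partition indices, mirroring sliceFirstIndex/first/last.
class SliceRanges {
    /** Returns sourceId -> {first, last}; counts are strings, as in partCounts. */
    static Map<String, int[]> compute(Map<String, String> partCounts) {
        Map<String, int[]> ranges = new LinkedHashMap<>();
        int nPartitions = 0; // running total = first index of the next source
        for (Map.Entry<String, String> e : partCounts.entrySet()) {
            int nP = Integer.parseInt(e.getValue().trim());
            if (nP <= 0) {
                throw new IllegalArgumentException("Bad partition count " + nP + " for " + e.getKey());
            }
            ranges.put(e.getKey(), new int[] {nPartitions, nPartitions + nP});
            nPartitions += nP;
        }
        if (nPartitions <= 0) { // e.g. an empty map
            throw new IllegalArgumentException("Error: Bad number of partitions " + nPartitions);
        }
        return ranges;
    }

    public static void main(String[] args) {
        Map<String, String> counts = new LinkedHashMap<>();
        counts.put("srcA", "2");
        counts.put("srcB", "3");
        // prints "srcA -> [0, 2)" then "srcB -> [2, 5)"
        for (Map.Entry<String, int[]> e : compute(counts).entrySet()) {
            System.out.println(e.getKey() + " -> [" + e.getValue()[0] + ", " + e.getValue()[1] + ")");
        }
    }
}
```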

Regards,
Surya Vamshi

From: Munagala Ramanath [mailto:ram@datatorrent.com]
Sent: 2016, June, 28 2:35 PM
To: users@apex.apache.org
Subject: Re: Reading Multiple directories in parallel

You can add those properties to your own superclass (instead of to AbstractFileInputOperator)
and simply cast the clone to that class, so change:

AbstractFileInputOperator<String> oper = cloneObject(kryo, this);

to something like:

MyFileInputOperator<String> oper = (MyFileInputOperator<String>) cloneObject(kryo, this);
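The clone-then-cast pattern in isolation. This sketch swaps Kryo for plain Java serialization so it runs with the JDK alone; BaseReader, MyFileInputOperator, setSourceId, setConfigFile and deepCopy are hypothetical stand-ins, not the Apex API. Because the template is held through its base type, the copy comes back with that static type, so a cast is needed to reach setters that exist only on your own class, and setting them on the clone leaves the template untouched:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

class CloneAndCastDemo {
    // Hypothetical base class standing in for AbstractFileInputOperator<T>.
    static class BaseReader<T> implements Serializable {
        String directory;
        void setDirectory(String d) { directory = d; }
    }

    // Your own class carries the extra per-partition properties.
    static class MyFileInputOperator<T> extends BaseReader<T> {
        String sourceId;
        String configFile;
        void setSourceId(String s) { sourceId = s; }
        void setConfigFile(String f) { configFile = f; }
    }

    // Stand-in for cloneObject(kryo, src): a deep copy via Java serialization.
    @SuppressWarnings("unchecked")
    static <T extends Serializable> T deepCopy(T src) {
        try {
            ByteArrayOutputStream bos = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bos)) {
                out.writeObject(src);
            }
            try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bos.toByteArray()))) {
                return (T) in.readObject();
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        BaseReader<String> template = new MyFileInputOperator<>(); // static type is the base class
        // The copy has the base static type, so cast to reach the subclass setters.
        MyFileInputOperator<String> oper = (MyFileInputOperator<String>) deepCopy(template);
        oper.setDirectory("/data/in/srcA");
        oper.setSourceId("srcA");
        oper.setConfigFile("srcA.conf");
        System.out.println(oper.sourceId + " " + template.directory); // prints "srcA null"
    }
}
```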

On Tue, Jun 28, 2016 at 11:17 AM, Mukkamula, Suryavamshivardhan (CWM-NR) <suryavamshivardhan.mukkamula@rbc.com> wrote:
Hi Ram,

I would like to set parameters for each partition, as shown below: each operator should be given
its own configuration file and source identifier. If there is another way to do this, please let
me know.
In my current definePartitions() method I am doing something similar, but I have to add
setter and getter methods to the AbstractFileInputOperator class.

for (int j = 0; j < numDirs; ++j) {
      int first = sliceFirstIndex[j];
      int last = first + partitionCounts[j];
      String dir = directories[j];
      LOG.info("definePartitions: first = {}, last = {}, dir = {}", first, last, dir);
      for (int i = first; i < last; ++i) {
        AbstractFileInputOperator<String> oper = cloneObject(kryo, this);
        oper.setDirectory(dir);
        oper.setSourceId(<sourceId>);
        oper.setConfigFile(<fileName>);
        //oper.setpIndex(i);
        SlicedDirectoryScanner scn = (SlicedDirectoryScanner) scanners.get(i);
        scn.setStartIndex(first);
        scn.setEndIndex(last);
        scn.setDirectory(dir);

        oper.setScanner(scn);
        newPartitions.add(new DefaultPartition<>(oper));
        newManagers.add(oper.getIdempotentStorageManager());
      }
    }
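In this loop every clone for a directory gets the same startIndex and endIndex, so the scanner itself must decide which files each clone reads. The SlicedDirectoryScanner source is not shown in this thread; one plausible rule, and only an assumption here, is to hash the file path into the slice [first, last) and accept the file only when the result equals the partition's own index. SliceAssignment and acceptFile below are hypothetical names:

```java
import java.util.ArrayList;
import java.util.List;

class SliceAssignment {
    /**
     * Hypothetical acceptance rule: a file belongs to partition partitionIndex
     * if its path hashes to that index within the half-open slice [startIndex, endIndex).
     */
    static boolean acceptFile(String path, int partitionIndex, int startIndex, int endIndex) {
        int sliceSize = endIndex - startIndex;
        int slot = Math.floorMod(path.hashCode(), sliceSize); // never negative, unlike %
        return startIndex + slot == partitionIndex;
    }

    public static void main(String[] args) {
        String[] files = {"/in/a.csv", "/in/b.csv", "/in/c.csv", "/in/d.csv"};
        int first = 2, last = 5; // this directory owns partitions 2, 3 and 4
        for (int p = first; p < last; p++) {
            List<String> mine = new ArrayList<>();
            for (String f : files) {
                if (acceptFile(f, p, first, last)) {
                    mine.add(f);
                }
            }
            System.out.println("partition " + p + " reads " + mine);
        }
    }
}
```

Because each path maps to exactly one slot, every file in the directory is read by exactly one partition of its slice and by none outside it.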


Regards,
Surya Vamshi

From: Munagala Ramanath [mailto:ram@datatorrent.com]
Sent: 2016, June, 28 2:03 PM
To: users@apex.apache.org
Subject: Re: Reading Multiple directories in parallel

Not sure I fully understand the question, but you can add whatever fields you need
to your class that extends AbstractFileInputOperator. For example,
  https://github.com/DataTorrent/examples/blob/master/tutorials/fileIO-multiDir/src/main/java/com/example/fileIO/FileReaderMultiDir.java
defines the fields directories and partitionCounts.

You can then set these fields as needed in definePartitions.
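Following that suggestion, the extra state lives in your own class and AbstractFileInputOperator stays untouched. The linked example is not reproduced here; this sketch only borrows its field names (directories, partitionCounts) and, with a hypothetical helper directoryPerPartition, shows how those two parallel arrays expand into one directory per partition index:

```java
import java.util.ArrayList;
import java.util.List;

class MultiDirFields {
    // Hypothetical state that would sit on your own operator class (the one
    // extending AbstractFileInputOperator), exposed through setters so it can
    // be configured as operator properties.
    private String[] directories = new String[0];
    private int[] partitionCounts = new int[0];

    public void setDirectories(String[] directories) { this.directories = directories.clone(); }
    public void setPartitionCounts(int[] partitionCounts) { this.partitionCounts = partitionCounts.clone(); }

    /** Directory for each partition index 0..n-1, e.g. {a, b} with {2, 1} gives [a, a, b]. */
    public List<String> directoryPerPartition() {
        if (directories.length != partitionCounts.length) {
            throw new IllegalStateException("directories and partitionCounts differ in length");
        }
        List<String> result = new ArrayList<>();
        for (int j = 0; j < directories.length; j++) {
            for (int k = 0; k < partitionCounts[j]; k++) {
                result.add(directories[j]);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        MultiDirFields op = new MultiDirFields();
        op.setDirectories(new String[] {"/in/a", "/in/b"});
        op.setPartitionCounts(new int[] {2, 1});
        System.out.println(op.directoryPerPartition()); // prints [/in/a, /in/a, /in/b]
    }
}
```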

Ram

On Tue, Jun 28, 2016 at 10:31 AM, Mukkamula, Suryavamshivardhan (CWM-NR) <suryavamshivardhan.mukkamula@rbc.com> wrote:
Hi Ram,

Can you please suggest how I could add another variable (like 'directory') when creating
multiple partitions of AbstractFileInputOperator in the definePartitions() method?

I have currently added these variables to AbstractFileInputOperator itself, which I suspect is not
the best way.

These variables are used so that each partition scans a different directory in parallel.

Regards,
Surya Vamshi


_______________________________________________________________________

If you received this email in error, please advise the sender (by return email or otherwise)
immediately. You have consented to receive the attached electronically at the above-noted
email address; please retain a copy of this confirmation for future reference.


