tajo-dev mailing list archives

From Jihoon Son <ghoon...@gmail.com>
Subject Re: Repartitioner.scheduleFragmentsForJoinQuery() questions
Date Wed, 19 Feb 2014 09:13:38 GMT
Hi, Min.

The following is an example query plan of the repartition join.

* second phase

      join
     /    \
fetch      fetch

* first phase

store      store
  |          |
scan       scan

As you can see in this example, the first phase consists of just scans.
(The store operators materialize the output that the second phase
fetches.) The join operation is performed only in the second phase. Thus,
the query units of the first phase are created just as in simple select
queries (e.g., select * from lineitem).
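
To make the two phases concrete, here is a minimal, self-contained sketch
of the repartition idea in plain Java (illustration only, not Tajo code):
the first phase hash-partitions each relation's rows by the join key, and
the second phase joins the rows that landed in the same partition.

import java.util.*;

public class RepartitionJoinSketch {
  static final int NUM_PARTITIONS = 4;

  // first phase: hash-partition rows by the join key column; in Tajo, the
  // store operators materialize these partitions for the second phase
  static Map<Integer, List<String[]>> partition(List<String[]> rows, int keyIdx) {
    Map<Integer, List<String[]>> parts = new HashMap<Integer, List<String[]>>();
    for (String[] row : rows) {
      // mask the sign bit so the partition id is never negative
      int partId = (row[keyIdx].hashCode() & Integer.MAX_VALUE) % NUM_PARTITIONS;
      List<String[]> bucket = parts.get(partId);
      if (bucket == null) {
        bucket = new ArrayList<String[]>();
        parts.put(partId, bucket);
      }
      bucket.add(row);
    }
    return parts;
  }

  public static void main(String[] args) {
    List<String[]> lineitem = Arrays.asList(
        new String[]{"1", "a"}, new String[]{"2", "b"});
    List<String[]> orders = Arrays.asList(
        new String[]{"1", "x"}, new String[]{"3", "y"});

    Map<Integer, List<String[]>> leftParts = partition(lineitem, 0);
    Map<Integer, List<String[]>> rightParts = partition(orders, 0);

    // second phase: each worker fetches one partition from both sides and
    // joins only the rows of that partition
    for (int p = 0; p < NUM_PARTITIONS; p++) {
      List<String[]> ls = leftParts.get(p);
      List<String[]> rs = rightParts.get(p);
      if (ls == null || rs == null) continue;
      for (String[] l : ls) {
        for (String[] r : rs) {
          if (l[0].equals(r[0])) {
            System.out.println(Arrays.toString(l) + " joins " + Arrays.toString(r));
          }
        }
      }
    }
  }
}

The store operators in the plan above play the role of the materialized
partitions, and the fetches of the second phase pull the matching
partitions from the first-phase workers.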

Regarding your second point about the broadcast join, I cannot check it
for now because the JIRA site seems to have some problems. I'll check the
link later.

Thanks,
Jihoon


2014-02-19 17:55 GMT+09:00 Min Zhou <coderplay@gmail.com>:

> Hi Jihoon,
>
> Thank you for the reply. I'm still confused about the repartition join:
> which part of the code is the first phase, and which part is the second?
> Where is the code that creates query units for scanning the two tables of
> a repartition join?
>
>
> The second thing I should mention is that the current broadcast join may
> fail if the larger table is large enough. See
> https://issues.apache.org/jira/browse/HIVE-1641
>
> Min
>
>
> On Wed, Feb 19, 2014 at 12:17 AM, Jihoon Son <ghoonson@gmail.com> wrote:
>
> > Hi, Min.
> >
> > Sorry for the lack of comments. This module includes some legacy code
> > that is hard to understand directly.
> >
> > Tajo currently supports only 2-way joins. This is why we use an array of
> > size 2. In the future, we should improve it to support n-way joins.
> >
> > Tajo provides three join algorithms: the immediate join, the broadcast
> > join, and the repartition join. The query optimizer chooses an algorithm
> > according to the sizes of the input relations. You probably already know
> > these algorithms, but I'll give brief descriptions to answer your
> > questions.
> >
> > The immediate join is used when both input relations are small enough to
> > be processed on one node. Thus, only one node performs the join, in one
> > phase.
> >
> > If only one relation is small, the broadcast join is used. The small
> > relation is replicated to all join workers. After that, the join workers
> > perform the join with the whole small relation and a portion of the large
> > relation. This algorithm is also performed in one phase.
> >
> > Finally, the repartition join is used when both input relations are
> > large. The repartition join is performed in two phases: both relations
> > are partitioned by the join key in the first phase and shuffled to the
> > workers of the second phase.
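> >
> > To make the choice concrete, here is a tiny sketch of the selection
> > logic (the class, enum, and threshold are assumptions for illustration;
> > this is not Tajo's actual optimizer code):
> >
> > public class JoinAlgorithmChooser {
> >   enum JoinAlgorithm { IMMEDIATE, BROADCAST, REPARTITION }
> >
> >   static JoinAlgorithm choose(long leftBytes, long rightBytes,
> >                               long broadcastThreshold) {
> >     boolean leftSmall = leftBytes <= broadcastThreshold;
> >     boolean rightSmall = rightBytes <= broadcastThreshold;
> >     if (leftSmall && rightSmall) {
> >       return JoinAlgorithm.IMMEDIATE;   // both fit on one node: one phase
> >     } else if (leftSmall ^ rightSmall) {
> >       return JoinAlgorithm.BROADCAST;   // replicate the small side: one phase
> >     } else {
> >       return JoinAlgorithm.REPARTITION; // both large: two phases + shuffle
> >     }
> >   }
> > }
> >
> > You can see the same leftSmall/rightSmall structure in the if/else chain
> > of scheduleFragmentsForJoinQuery() below.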
> >
> > The if condition you asked about checks whether the join query is
> > performed in two phases or not. If it is, the condition is satisfied. (I
> > think the comment "// if it is a real table stored on storage" is wrong
> > and should be fixed to mean the opposite.)
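> >
> > Concretely, that check is the tableDesc lookup in the code you posted. An
> > illustrative helper (not actual Tajo code) would read:
> >
> > static boolean readsIntermediateData(
> >     QueryMasterTask.QueryMasterTaskContext context, ScanNode scan) {
> >   // a null TableDesc means the scan's input is intermediate data produced
> >   // by a child execution block, not a table stored on HDFS
> >   return context.getTableDescMap().get(scan.getCanonicalName()) == null;
> > }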
> >
> > The fragment is used in different ways by each join algorithm. In the
> > immediate join, a fragment represents an HDFS block. Since each input
> > relation is sufficiently small, it can be stored in a single HDFS block.
> >
> > In the broadcast join, the fragment of the smaller relation represents
> > the whole relation, while each fragment of the larger relation is a
> > portion of it.
> >
> > In the repartition join, the fragments are fake fragments that are not
> > actually used. In a two-phase join, the workers of the second phase fetch
> > the intermediate data from the workers of the first phase, so the
> > second-phase workers require only the fetch URLs. However, in current
> > Tajo, each worker requires a fake fragment that virtually represents the
> > fetches of a relation. This is a legacy implementation that we should
> > simplify in the future. Actually, we plan to represent both file
> > fragments and fetch URLs with a unified interface.
> >
> > This is why only two fragments are scheduled for joins.
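> >
> > As a rough sketch, the unified interface might look like the following
> > (the names are hypothetical; nothing like this exists in Tajo yet):
> >
> > interface TaskInput {
> >   String getRelationName();
> > }
> >
> > class FileFragmentInput implements TaskInput {
> >   private final String relation;
> >   private final String path;        // HDFS path of the stored table
> >   private final long start, length; // byte range of this fragment
> >   FileFragmentInput(String relation, String path, long start, long length) {
> >     this.relation = relation; this.path = path;
> >     this.start = start; this.length = length;
> >   }
> >   public String getRelationName() { return relation; }
> > }
> >
> > class FetchInput implements TaskInput {
> >   private final String relation;
> >   private final java.net.URI fetchUri; // where the intermediate data lives
> >   FetchInput(String relation, java.net.URI fetchUri) {
> >     this.relation = relation; this.fetchUri = fetchUri;
> >   }
> >   public String getRelationName() { return relation; }
> > }
> >
> > With something like this, a second-phase join task could be scheduled
> > with FetchInput values instead of fake FileFragments.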
> >
> > I hope this answers your question. If you are still confused, please
> > feel free to ask.
> >
> > Thanks,
> > Jihoon
> >
> >
> > 2014-02-19 15:40 GMT+09:00 Min Zhou <coderplay@gmail.com>:
> >
> > > Hi all,
> > >
> > > I spent more than half a day trying to understand the logic of
> > > Repartitioner.scheduleFragmentsForJoinQuery(), and I'm still confused.
> > > Can anyone help me?
> > >
> > >
> > >   public static void scheduleFragmentsForJoinQuery(SubQuery subQuery)
> > >       throws IOException {
> > >     MasterPlan masterPlan = subQuery.getMasterPlan();
> > >     ExecutionBlock execBlock = subQuery.getBlock();
> > >     QueryMasterTask.QueryMasterTaskContext masterContext = subQuery.getContext();
> > >     AbstractStorageManager storageManager = subQuery.getStorageManager();
> > >
> > >     ScanNode[] scans = execBlock.getScanNodes();
> > >
> > >     Path tablePath;
> > >     // DOES TAJO ONLY SUPPORT 2-WAY JOINS?
> > >     // WHY IS THIS ARRAY SIZE ONLY 2?
> > >     FileFragment[] fragments = new FileFragment[2];
> > >     long[] stats = new long[2];
> > >
> > >     // initialize variables from the child operators
> > >     for (int i = 0; i < 2; i++) {
> > >       TableDesc tableDesc = masterContext.getTableDescMap().get(scans[i].getCanonicalName());
> > >       if (tableDesc == null) { // if it is a real table stored on storage
> > >         // WHAT DOES THIS COMMENT MEAN?
> > >         // WHICH KIND OF SQL CAN RUN INTO HERE?
> > >         // TODO - to be fixed (wrong directory)
> > >         ExecutionBlock[] childBlocks = new ExecutionBlock[2];
> > >         childBlocks[0] = masterPlan.getChild(execBlock.getId(), 0);
> > >         childBlocks[1] = masterPlan.getChild(execBlock.getId(), 1);
> > >
> > >         tablePath = storageManager.getTablePath(scans[i].getTableName());
> > >         stats[i] = masterContext.getSubQuery(childBlocks[i].getId())
> > >             .getTableStat().getNumBytes();
> > >         fragments[i] = new FileFragment(scans[i].getCanonicalName(),
> > >             tablePath, 0, 0, new String[]{UNKNOWN_HOST});
> > >       } else {
> > >         tablePath = tableDesc.getPath();
> > >         try {
> > >           stats[i] = GlobalPlanner.computeDescendentVolume(scans[i]);
> > >         } catch (PlanningException e) {
> > >           throw new IOException(e);
> > >         }
> > >         fragments[i] = storageManager.getSplits(scans[i].getCanonicalName(),
> > >             tableDesc.getMeta(), tableDesc.getSchema(), tablePath)
> > >             .get(0);  // WHY JUST RETURN THE FIRST MEMBER OF THE SPLITS ARRAY?
> > >       }
> > >     }
> > >
> > >     LOG.info(String.format("Left Volume: %d, Right Volume: %d", stats[0], stats[1]));
> > >
> > >     // Assign either fragments or fetch urls to query units
> > >     boolean leftSmall = execBlock.isBroadcastTable(scans[0].getCanonicalName());
> > >     boolean rightSmall = execBlock.isBroadcastTable(scans[1].getCanonicalName());
> > >
> > >     if (leftSmall && rightSmall) {
> > >       LOG.info("[Distributed Join Strategy] : Immediate Two Way Join on Single Machine");
> > >       SubQuery.scheduleFragment(subQuery, fragments[0], fragments[1]);
> > >     } else if (leftSmall ^ rightSmall) {
> > >       int broadcastIdx = leftSmall ? 0 : 1;
> > >       int baseScanIdx = leftSmall ? 1 : 0;
> > >       LOG.info(String.format("[BRDCAST JOIN] base_table=%s, base_volume=%d",
> > >           scans[baseScanIdx].getCanonicalName(), stats[baseScanIdx]));
> > >       scheduleLeafTasksWithBroadcastTable(subQuery, baseScanIdx, fragments[broadcastIdx]);
> > >     } else {
> > >       LOG.info("[Distributed Join Strategy] : Symmetric Repartition Join");
> > >       // The hash map is modeled as follows:
> > >       // <Part Id, <Table Name, Intermediate Data>>
> > >       Map<Integer, Map<String, List<IntermediateEntry>>> hashEntries =
> > >           new HashMap<Integer, Map<String, List<IntermediateEntry>>>();
> > >
> > >       // Group IntermediateData by a partition key and a table name
> > >       for (ScanNode scan : scans) {
> > >         SubQuery childSubQuery = masterContext.getSubQuery(
> > >             TajoIdUtils.createExecutionBlockId(scan.getCanonicalName()));
> > >         for (QueryUnit task : childSubQuery.getQueryUnits()) {
> > >           if (task.getIntermediateData() != null) {
> > >             for (IntermediateEntry intermEntry : task.getIntermediateData()) {
> > >               if (hashEntries.containsKey(intermEntry.getPartId())) {
> > >                 Map<String, List<IntermediateEntry>> tbNameToInterm =
> > >                     hashEntries.get(intermEntry.getPartId());
> > >
> > >                 if (tbNameToInterm.containsKey(scan.getCanonicalName())) {
> > >                   tbNameToInterm.get(scan.getCanonicalName()).add(intermEntry);
> > >                 } else {
> > >                   tbNameToInterm.put(scan.getCanonicalName(), TUtil.newList(intermEntry));
> > >                 }
> > >               } else {
> > >                 Map<String, List<IntermediateEntry>> tbNameToInterm =
> > >                     new HashMap<String, List<IntermediateEntry>>();
> > >                 tbNameToInterm.put(scan.getCanonicalName(), TUtil.newList(intermEntry));
> > >                 hashEntries.put(intermEntry.getPartId(), tbNameToInterm);
> > >               }
> > >             }
> > >           }
> > >         }
> > >       }
> > >
> > >       // hashEntries can be empty if there is no input data. In that case,
> > >       // dividing by its size would cause a divide-by-zero exception, so
> > >       // the empty case is handled explicitly.
> > >       int[] avgSize = new int[2];
> > >       avgSize[0] = hashEntries.size() == 0 ? 0 : (int) (stats[0] / hashEntries.size());
> > >       avgSize[1] = hashEntries.size() == 0 ? 0 : (int) (stats[1] / hashEntries.size());
> > >       int bothFetchSize = avgSize[0] + avgSize[1];
> > >
> > >       // Get the desired number of join tasks according to the volume
> > >       // of the larger table
> > >       int largerIdx = stats[0] >= stats[1] ? 0 : 1;
> > >       int desireJoinTaskVolumn = subQuery.getContext().getConf().
> > >           getIntVar(ConfVars.DIST_QUERY_JOIN_TASK_VOLUME);
> > >
> > >       // calculate the number of tasks according to the data size
> > >       int mb = (int) Math.ceil((double) stats[largerIdx] / 1048576);
> > >       LOG.info("Larger intermediate data is approximately " + mb + " MB");
> > >       // determine the number of tasks, one per desireJoinTaskVolumn MB
> > >       // (64MB by default)
> > >       int maxTaskNum = (int) Math.ceil((double) mb / desireJoinTaskVolumn);
> > >       LOG.info("The calculated number of tasks is " + maxTaskNum);
> > >       LOG.info("The number of total shuffle keys is " + hashEntries.size());
> > >       // the number of join tasks cannot be larger than the number of
> > >       // distinct partition ids.
> > >       int joinTaskNum = Math.min(maxTaskNum, hashEntries.size());
> > >       LOG.info("The determined number of join tasks is " + joinTaskNum);
> > >
> > >       // WHY ONLY 2 FRAGMENTS FOR THIS JOIN? JUST 2 HDFS BLOCKS?
> > >       SubQuery.scheduleFragment(subQuery, fragments[0], fragments[1]);
> > >
> > >       // Assign partitions to tasks in a round robin manner.
> > >       for (Entry<Integer, Map<String, List<IntermediateEntry>>> entry
> > >           : hashEntries.entrySet()) {
> > >         addJoinShuffle(subQuery, entry.getKey(), entry.getValue());
> > >       }
> > >     }
> > >   }
> > >
> > >
> > > Thanks,
> > > Min
> > >
> > >
> >
> >
> >
> >
>
>
>
> --
> My research interests are distributed systems, parallel computing and
> bytecode based virtual machine.
>
> My profile:
> http://www.linkedin.com/in/coderplay
> My blog:
> http://coderplay.javaeye.com
>



-- 
Jihoon Son

Database & Information Systems Group,
Prof. Yon Dohn Chung Lab.
Dept. of Computer Science & Engineering,
Korea University
1, 5-ga, Anam-dong, Seongbuk-gu,
Seoul, 136-713, Republic of Korea

Tel : +82-2-3290-3580
E-mail : jihoonson@korea.ac.kr
