tajo-dev mailing list archives

From Jihoon Son <ghoon...@gmail.com>
Subject Re: Repartitioner.scheduleFragmentsForJoinQuery() questions
Date Wed, 19 Feb 2014 08:17:26 GMT
Hi, Min.

Sorry for the lack of comments. This module includes some legacy code which
is hard to understand directly.

Tajo currently supports only 2-way joins. This is why we use an array of
size 2. In the future, we should improve it to support n-way joins.

Tajo provides three join algorithms: the immediate join, the broadcast join,
and the repartition join. The query optimizer chooses a proper algorithm
according to the sizes of the input relations. I think you already know these
algorithms, but I'll give short descriptions to answer your questions.

The immediate join is used when both input relations are small enough to be
processed on one node. Thus, only one node performs the join, in a single
phase.

If only one relation is small, the broadcast join is used. The small relation
is replicated to all join workers. After that, the join workers perform the
join between the whole small relation and a portion of the large relation.
This algorithm is also performed in a single phase.

Finally, the repartition join is used when both input relations are large.
The repartition join is performed in two phases: both relations are
partitioned by the join key in the first phase and shuffled to the workers of
the second phase.
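
Just to make that decision concrete, it could be sketched roughly as below.
This is only an illustration of the choice described above, not the actual
optimizer code; the threshold name and value are made up for the example.

public class JoinAlgorithmSketch {
  enum Algorithm { IMMEDIATE, BROADCAST, REPARTITION }

  // hypothetical threshold: a relation below this size counts as "small"
  static final long SMALL_RELATION_BYTES = 5L * 1024 * 1024;

  static Algorithm choose(long leftBytes, long rightBytes) {
    boolean leftSmall = leftBytes <= SMALL_RELATION_BYTES;
    boolean rightSmall = rightBytes <= SMALL_RELATION_BYTES;
    if (leftSmall && rightSmall) {
      return Algorithm.IMMEDIATE;    // one node, one phase
    } else if (leftSmall ^ rightSmall) {
      return Algorithm.BROADCAST;    // replicate the small side, one phase
    } else {
      return Algorithm.REPARTITION;  // hash-partition both sides, two phases
    }
  }

  public static void main(String[] args) {
    System.out.println(choose(1L << 20, 10L << 30)); // prints BROADCAST
  }
}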

The if condition you asked about checks whether the join query is performed
in two phases or not. If it is, the condition is satisfied. (I think the
comment "// if it is a real table stored on storage" is wrong and should be
fixed to mean the opposite, e.g. "// if the input is the intermediate output
of a child execution block".)
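
If it helps, the check can be restated with toy types. These are not Tajo's
classes, and the execution block id below is just an invented example: the
table descriptor map only knows real tables on storage, so a null lookup
means the scan reads the intermediate output of a child execution block.

import java.util.HashMap;
import java.util.Map;

public class TwoPhaseCheckSketch {
  public static void main(String[] args) {
    // pretend catalog: only real tables stored on HDFS have a descriptor
    Map<String, String> tableDescMap = new HashMap<String, String>();
    tableDescMap.put("default.lineitem", "hdfs:///warehouse/lineitem");

    // one scan of a real table, one scan of a child block's output (made-up id)
    String[] scanNames = { "default.lineitem", "eb_0000000000001_0002" };
    for (String name : scanNames) {
      boolean readsChildOutput = (tableDescMap.get(name) == null);
      System.out.println(name + " -> two-phase input? " + readsChildOutput);
    }
  }
}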

Fragments are used in different ways by each join algorithm. In the
immediate join, a fragment represents an HDFS block. Since each input
relation is sufficiently small, it can be stored in only one HDFS block.

In the broadcast join, the fragment of the smaller relation represents the
whole smaller relation, while each fragment of the larger relation is a
portion of the larger relation.
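
To make that a bit more concrete, here is a toy model (a stand-in class and
made-up table names, not Tajo's FileFragment) of how a relation's file is
covered by fragments: a relation that fits in a single split is covered by
one fragment, while a large relation gets one fragment per split.

import java.util.ArrayList;
import java.util.List;

public class FragmentSketch {
  // a fragment is just a byte range [start, start + length) of the table's file
  static class Fragment {
    final String table; final long start; final long length;
    Fragment(String table, long start, long length) {
      this.table = table; this.start = start; this.length = length;
    }
  }

  // one fragment per splitSize bytes, like splits over HDFS blocks
  static List<Fragment> split(String table, long fileLength, long splitSize) {
    List<Fragment> fragments = new ArrayList<Fragment>();
    for (long off = 0; off < fileLength; off += splitSize) {
      fragments.add(new Fragment(table, off, Math.min(splitSize, fileLength - off)));
    }
    return fragments;
  }

  public static void main(String[] args) {
    long splitSize = 64L * 1024 * 1024;
    // a small relation: its first (and only) fragment covers the whole relation
    System.out.println(split("dept", 2L * 1024 * 1024, splitSize).size());              // 1
    // a large relation: many fragments, one per 64 MB split
    System.out.println(split("lineitem", 10L * 1024 * 1024 * 1024, splitSize).size());  // 160
  }
}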

In the repartition join, the fragments are fake fragments which are not
actually used. In the two-phase join, the workers of the second phase fetch
the intermediate data from the workers of the first phase. Thus, the workers
of the second phase require only the fetch URLs. However, in the current
Tajo, workers still require a fake fragment which virtually represents the
fetches of a relation. This is a legacy implementation that we should
simplify in the future. Actually, we have a plan to represent both file
fragments and fetch URLs with a unified interface.

This is why only two fragments are scheduled for joins.
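
For what it's worth, the grouping step in the repartition branch of your
quoted code boils down to the sketch below: the child blocks' intermediate
outputs are grouped first by partition id and then by table name, so each
join task knows exactly which partitions to fetch. (IntermediateEntry here is
only a stand-in for Tajo's class of the same name, and the worker URLs are
invented.)

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ShuffleGroupingSketch {
  static class IntermediateEntry {
    final int partId; final String fetchUrl;
    IntermediateEntry(int partId, String fetchUrl) {
      this.partId = partId; this.fetchUrl = fetchUrl;
    }
  }

  // group one table's entries into <partition id, <table name, entries>>
  static void group(String tableName, List<IntermediateEntry> entries,
      Map<Integer, Map<String, List<IntermediateEntry>>> hashEntries) {
    for (IntermediateEntry entry : entries) {
      Map<String, List<IntermediateEntry>> byTable = hashEntries.get(entry.partId);
      if (byTable == null) {
        byTable = new HashMap<String, List<IntermediateEntry>>();
        hashEntries.put(entry.partId, byTable);
      }
      List<IntermediateEntry> list = byTable.get(tableName);
      if (list == null) {
        list = new ArrayList<IntermediateEntry>();
        byTable.put(tableName, list);
      }
      list.add(entry);
    }
  }

  public static void main(String[] args) {
    Map<Integer, Map<String, List<IntermediateEntry>>> hashEntries =
        new HashMap<Integer, Map<String, List<IntermediateEntry>>>();
    group("lineitem", Arrays.asList(new IntermediateEntry(0, "http://worker1/p0"),
        new IntermediateEntry(1, "http://worker1/p1")), hashEntries);
    group("orders", Arrays.asList(new IntermediateEntry(0, "http://worker2/p0")), hashEntries);
    // at most one join task per distinct partition id
    System.out.println(hashEntries.keySet());
  }
}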

I hope this answers your questions.
If you are still confused, please feel free to ask.

Thanks,
Jihoon


2014-02-19 15:40 GMT+09:00 Min Zhou <coderplay@gmail.com>:

> Hi all,
>
> I spent more than half a day try to understand the logic
> of Repartitioner.scheduleFragmentsForJoinQuery(), still be confused.
> Can anyone help me ?
>
>
>  public static void scheduleFragmentsForJoinQuery(SubQuery subQuery)
>       throws IOException {
>     MasterPlan masterPlan = subQuery.getMasterPlan();
>     ExecutionBlock execBlock = subQuery.getBlock();
>     QueryMasterTask.QueryMasterTaskContext masterContext =
> subQuery.getContext();
>     AbstractStorageManager storageManager = subQuery.getStorageManager();
>
>     ScanNode[] scans = execBlock.getScanNodes();
>
>     Path tablePath;
>     // DOES TAJO ONLY SUPPORT 2 WAY JOINS?
>     // WHY THIS ARRAY SIZE IS ONLY 2?
>     FileFragment[] fragments = new FileFragment[2];
>     long[] stats = new long[2];
>
>     // initialize variables from the child operators
>     for (int i = 0; i < 2; i++) {
>       TableDesc tableDesc =
> masterContext.getTableDescMap().get(scans[i].getCanonicalName());
>       if (tableDesc == null) { // if it is a real table stored on storage
> // WHAT'S THE COMMENT MEAN?
>         // WHICH KIND OF SQL CAN RUN INTO HERE?
>         // TODO - to be fixed (wrong directory)
>         ExecutionBlock [] childBlocks = new ExecutionBlock[2];
>         childBlocks[0] = masterPlan.getChild(execBlock.getId(), 0);
>         childBlocks[1] = masterPlan.getChild(execBlock.getId(), 1);
>
>         tablePath = storageManager.getTablePath(scans[i].getTableName());
>         stats[i] =
>
> masterContext.getSubQuery(childBlocks[i].getId()).getTableStat().getNumBytes();
>         fragments[i] = new FileFragment(scans[i].getCanonicalName(),
> tablePath, 0, 0, new String[]{UNKNOWN_HOST});
>       } else {
>         tablePath = tableDesc.getPath();
>         try {
>           stats[i] = GlobalPlanner.computeDescendentVolume(scans[i]);
>         } catch (PlanningException e) {
>           throw new IOException(e);
>         }
>         fragments[i] =
> storageManager.getSplits(scans[i].getCanonicalName(), tableDesc.getMeta(),
> tableDesc.getSchema(),
>             tablePath).get(0);  // WHY JUST RETURN THE FIRST MEMBER OF THE
> SPLITS ARRAY?
>       }
>     }
>
>     LOG.info(String.format("Left Volume: %d, Right Volume: %d", stats[0],
> stats[1]));
>
>     // Assigning either fragments or fetch urls to query units
>     boolean leftSmall =
> execBlock.isBroadcastTable(scans[0].getCanonicalName());
>     boolean rightSmall =
> execBlock.isBroadcastTable(scans[1].getCanonicalName());
>
>     if (leftSmall && rightSmall) {
>       LOG.info("[Distributed Join Strategy] : Immediate Two Way Join on
> Single Machine");
>       SubQuery.scheduleFragment(subQuery, fragments[0], fragments[1]);
>     } else if (leftSmall ^ rightSmall) {
>       int broadcastIdx = leftSmall ? 0 : 1;
>       int baseScanIdx = leftSmall ? 1 : 0;
>       LOG.info(String.format("[BRDCAST JOIN] base_table=%s,
> base_volume=%d",
>           scans[baseScanIdx].getCanonicalName(), stats[baseScanIdx]));
>       scheduleLeafTasksWithBroadcastTable(subQuery, baseScanIdx,
> fragments[broadcastIdx]);
>     } else {
>       LOG.info("[Distributed Join Strategy] : Symmetric Repartition Join");
>       // The hash map is modeling as follows:
>       // <Part Id, <Table Name, Intermediate Data>>
>       Map<Integer, Map<String, List<IntermediateEntry>>> hashEntries
> = new
> HashMap<Integer, Map<String, List<IntermediateEntry>>>();
>
>       // Grouping IntermediateData by a partition key and a table name
>       for (ScanNode scan : scans) {
>         SubQuery childSubQuery =
>
> masterContext.getSubQuery(TajoIdUtils.createExecutionBlockId(scan.getCanonicalName()));
>         for (QueryUnit task : childSubQuery.getQueryUnits()) {
>           if (task.getIntermediateData() != null) {
>             for (IntermediateEntry intermEntry :
> task.getIntermediateData()) {
>               if (hashEntries.containsKey(intermEntry.getPartId())) {
>                 Map<String, List<IntermediateEntry>> tbNameToInterm =
>                     hashEntries.get(intermEntry.getPartId());
>
>                 if (tbNameToInterm.containsKey(scan.getCanonicalName())) {
>
> tbNameToInterm.get(scan.getCanonicalName()).add(intermEntry);
>                 } else {
>                   tbNameToInterm.put(scan.getCanonicalName(),
> TUtil.newList(intermEntry));
>                 }
>               } else {
>                 Map<String, List<IntermediateEntry>> tbNameToInterm =
>                     new HashMap<String, List<IntermediateEntry>>();
>                 tbNameToInterm.put(scan.getCanonicalName(),
> TUtil.newList(intermEntry));
>                 hashEntries.put(intermEntry.getPartId(), tbNameToInterm);
>               }
>             }
>           }
>         }
>       }
>
>       // hashEntries can be zero if there are no input data.
>       // In the case, it will cause the zero divided exception.
>       // it avoids this problem.
>       int [] avgSize = new int[2];
>       avgSize[0] = hashEntries.size() == 0 ? 0 : (int) (stats[0] /
> hashEntries.size());
>       avgSize[1] = hashEntries.size() == 0 ? 0 : (int) (stats[1] /
> hashEntries.size());
>       int bothFetchSize = avgSize[0] + avgSize[1];
>
>       // Getting the desire number of join tasks according to the volumn
>       // of a larger table
>       int largerIdx = stats[0] >= stats[1] ? 0 : 1;
>       int desireJoinTaskVolumn = subQuery.getContext().getConf().
>           getIntVar(ConfVars.DIST_QUERY_JOIN_TASK_VOLUME);
>
>       // calculate the number of tasks according to the data size
>       int mb = (int) Math.ceil((double)stats[largerIdx] / 1048576);
>       LOG.info("Larger intermediate data is approximately " + mb + " MB");
>       // determine the number of task per 64MB
>       int maxTaskNum = (int) Math.ceil((double)mb / desireJoinTaskVolumn);
>       LOG.info("The calculated number of tasks is " + maxTaskNum);
>       LOG.info("The number of total shuffle keys is " +
> hashEntries.size());
>       // the number of join tasks cannot be larger than the number of
>       // distinct partition ids.
>       int joinTaskNum = Math.min(maxTaskNum, hashEntries.size());
>       LOG.info("The determined number of join tasks is " + joinTaskNum);
>
>       SubQuery.scheduleFragment(subQuery, fragments[0], fragments[1]); //
> WHY JUST ONLY 2 FRAGMENT FOR THIS JOIN? JUST 2 HDFS BLOCKS?
>
>       // Assign partitions to tasks in a round robin manner.
>       for (Entry<Integer, Map<String, List<IntermediateEntry>>> entry
>           : hashEntries.entrySet()) {
>         addJoinShuffle(subQuery, entry.getKey(), entry.getValue());
>       }
>
>     }
>   }
>
>
> Thanks,
> Min
>
> --
> My research interests are distributed systems, parallel computing and
> bytecode based virtual machine.
>
> My profile:
> http://www.linkedin.com/in/coderplay
> My blog:
> http://coderplay.javaeye.com
>



-- 
Jihoon Son

Database & Information Systems Group,
Prof. Yon Dohn Chung Lab.
Dept. of Computer Science & Engineering,
Korea University
1, 5-ga, Anam-dong, Seongbuk-gu,
Seoul, 136-713, Republic of Korea

Tel : +82-2-3290-3580
E-mail : jihoonson@korea.ac.kr
