tajo-dev mailing list archives

From Hyunsik Choi <hyun...@apache.org>
Subject Re: Repartitioner.scheduleFragmentsForJoinQuery() questions
Date Wed, 19 Feb 2014 13:48:29 GMT
Jihoon,

Thank you for your detailed explanation.

Min,

Recently, we've spent time adding comments and refactoring the code. Many
parts have been rewritten more clearly, but some of the code is still messy,
as I also think, and lacks sufficient comments. Please understand this as an
evolving step of Tajo =)

Best regards,
Hyunsik


On Wed, Feb 19, 2014 at 7:11 PM, Jihoon Son <ghoonson@gmail.com> wrote:

> Sorry, Min. I finally got your point.
> The answer is that SubQuery.scheduleFragmentsForLeafQuery() is used for the
> scans of the first phase. This is possible because the two scan operations of
> the first phase are executed in different sub queries. Here is an example of
> the execution plan.
>
> tajo> select count(*) from lineitem, orders where l_orderkey = o_orderkey;
>
> =======================================================
> Block Id: eb_1392803817010_0001_000001 [LEAF]
> =======================================================
>
> [Outgoing]
> [q_1392803817010_0001] 1 => 3 (type=HASH_SHUFFLE)
>
> "Scan" : {"table":"orders", "target list": ,
>   "out schema": {(0) }
>   "in schema": {(9) orders.o_orderkey (INT8), orders.o_custkey (INT8),
> orders.o_orderstatus (TEXT), orders.o_totalprice (FLOAT8),
> orders.o_orderdate (TEXT), orders.o_orderpriority (TEXT), orders.o_clerk
> (TEXT), orders.o_shippriority (INT4), orders.o_comment (TEXT)}
> =======================================================
> Block Id: eb_1392803817010_0001_000002 [LEAF]
> =======================================================
>
> [Outgoing]
> [q_1392803817010_0001] 2 => 3 (type=HASH_SHUFFLE)
>
> "Scan" : {"table":"lineitem", "target list": ,
>   "out schema": {(0) }
>   "in schema": {(16) lineitem.l_orderkey (INT8), lineitem.l_partkey (INT8),
> lineitem.l_suppkey (INT8), lineitem.l_linenumber (INT8),
> lineitem.l_quantity (FLOAT8), lineitem.l_extendedprice (FLOAT8),
> lineitem.l_discount (FLOAT8), lineitem.l_tax (FLOAT8),
> lineitem.l_returnflag (TEXT), lineitem.l_linestatus (TEXT),
> lineitem.l_shipdate (TEXT), lineitem.l_commitdate (TEXT),
> lineitem.l_receiptdate (TEXT), lineitem.l_shipinstruct (TEXT),
> lineitem.l_shipmode (TEXT), lineitem.l_comment (TEXT)}
> =======================================================
> Block Id: eb_1392803817010_0001_000003 [INTERMEDIATE]
> =======================================================
>
> [Incoming]
> [q_1392803817010_0001] 1 => 3 (type=HASH_SHUFFLE)
> [q_1392803817010_0001] 2 => 3 (type=HASH_SHUFFLE)
>
> [Outgoing]
> [q_1392803817010_0001] 3 => 4 (type=HASH_SHUFFLE, key=, num=1)
>
> "GroupBy": {"grouping fields":[, "target": ["?count_1 (INT8)"],
>   "expr": count(),
>   "out schema": {(1) ?count_1 (INT8)},
>   "in schema": {(0) }}
> "Join": "joinType": " CROSS", "target list":
> "out schema: {(0) }
> "in schema: {(0) }
> "Scan" : {"table":"eb_1392803817010_0001_000001",
>   "out schema": {(0) }
>   "in schema": {(0) } and "Scan" : {"table":"eb_1392803817010_0001_000002",
>   "out schema": {(0) }
>   "in schema": {(0) }
> =======================================================
> ....
>
> As you can see, the scan operations are performed separately in different
> sub queries.
>
> Here is an additional comment. Currently, Tajo can execute only one sub
> query at a time. That is, the two sub queries of the first phase should be
> executed simultaneously, but they aren't. This should also be improved.
>
> Thanks,
> Jihoon
>
>
> 2014-02-19 18:47 GMT+09:00 Min Zhou <coderplay@gmail.com>:
>
> > I totally understand the theory of those three kinds of join
> > implementations. I just want to know exactly which file and which method
> > implement the 1st phase, and which implement the 2nd. This is confusing to
> > me because the code is somewhat messy. From my understanding, a single scan
> > will run into SubQuery.scheduleFragmentsForLeafQuery(), which checks the
> > length of the input scan nodes to make sure it is 1. I don't think the
> > first phase of a repartition join shares the same path for creating query
> > units.
> >
> > Preconditions.checkArgument(scans.length == 1, "Must be Scan Query");
> >
> >
> > Thanks,
> > Min
> >
> >
> > On Wed, Feb 19, 2014 at 1:13 AM, Jihoon Son <ghoonson@gmail.com> wrote:
> >
> > > Hi, Min.
> > >
> > > The following is an example query plan of the repartition join.
> > >
> > > * second phase
> > >
> > >      join
> > >      /    \
> > > fetch  fetch
> > >
> > > * first phase
> > >
> > > store  store
> > >    |        |
> > > scan   scan
> > >
> > > As you can see in this example, the first phase consists of just scans.
> > > (The store operators are for fetching in the second phase.) The join
> > > operation is performed only in the second phase. Thus, the query units of
> > > the first phase are created as in simple select queries (e.g., select *
> > > from lineitem).
> > >
> > > For the second thing about the broadcast join, I cannot check it for now
> > > because the JIRA seems to have some problems. I'll check the link later.
> > >
> > > Thanks,
> > > Jihoon
> > >
> > >
> > > 2014-02-19 17:55 GMT+09:00 Min Zhou <coderplay@gmail.com>:
> > >
> > > > Hi Jihoon,
> > > >
> > > > Thank you for the reply. I'm still confused about the repartition
> > > > join: which part of the code is the first phase and which part is the
> > > > second? Where is the code that creates query units for scanning the two
> > > > tables of a repartition join?
> > > >
> > > >
> > > > The second thing I should mention is that the current broadcast join
> > > > may fail if the larger table is large enough. See
> > > > https://issues.apache.org/jira/browse/HIVE-1641
> > > >
> > > > Min
> > > >
> > > >
> > > > On Wed, Feb 19, 2014 at 12:17 AM, Jihoon Son <ghoonson@gmail.com> wrote:
> > > >
> > > > > Hi, Min.
> > > > >
> > > > > Sorry for the lack of comments. This module has some legacy code
> > > > > which is hard to understand directly.
> > > > >
> > > > > Tajo currently supports only the 2-way join. This is why we use an
> > > > > array of size 2. In the future, we should improve it to support the
> > > > > n-way join.
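> > > > >
> > > > > For example (my own illustration), a 3-way join such as
> > > > >
> > > > >   select * from a, b, c where a.k = b.k and b.k = c.k
> > > > >
> > > > > currently has to be planned as cascaded 2-way joins:
> > > > >
> > > > >        join
> > > > >       /    \
> > > > >    join     c
> > > > >    /    \
> > > > >   a      b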
> > > > >
> > > > > Tajo provides three join algorithms, i.e., the immediate join, the
> > > > > broadcast join, and the repartition join. The query optimizer chooses
> > > > > a proper algorithm according to the size of the input relations. I
> > > > > think that you already know about these algorithms, but I'll present
> > > > > simple descriptions to answer your questions.
> > > > >
> > > > > The immediate join is used when both of the input relations are small
> > > > > enough to be processed on one node. Thus, only one node performs the
> > > > > join in one phase.
> > > > > If only one relation is small, the broadcast join is used. The small
> > > > > relation is replicated to all join workers. After that, the join
> > > > > workers perform the join with the whole small relation and a portion
> > > > > of the large relation. This algorithm is also performed in one phase.
> > > > > Finally, the repartition join is used when all the input relations are
> > > > > large. The repartition join is performed in two phases. Both relations
> > > > > are partitioned by the join key in the first phase and shuffled to the
> > > > > workers of the second phase.
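> > > > >
> > > > > As a minimal sketch of the first-phase partitioning (my own
> > > > > illustration; the names are hypothetical, not the actual Tajo
> > > > > classes):
> > > > >
> > > > > public final class HashRepartitionSketch {
> > > > >   // Tuples with equal join keys from both relations map to the same
> > > > >   // part id, so one second-phase worker sees every tuple it needs
> > > > >   // to join for that key.
> > > > >   static int partId(Object joinKey, int numPartitions) {
> > > > >     return (joinKey.hashCode() & Integer.MAX_VALUE) % numPartitions;
> > > > >   }
> > > > >
> > > > >   public static void main(String[] args) {
> > > > >     // The same key yields the same part id on both sides of the join.
> > > > >     System.out.println(partId("o_orderkey=42", 32));
> > > > >   }
> > > > > }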
> > > > >
> > > > > The if condition which you asked about checks whether the join query
> > > > > is performed in two phases or not. If it is, the if condition is
> > > > > satisfied. (I think that the comment "// if it is a real table stored
> > > > > on storage" is wrong and should be fixed to have the opposite meaning.)
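> > > > >
> > > > > That is, my reading is that the comment should say something like:
> > > > >
> > > > >   if (tableDesc == null) { // if it is an intermediate result, not a real table stored on storage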
> > > > >
> > > > > The fragment is used in different ways by each join algorithm. In the
> > > > > immediate join, a fragment represents an HDFS block. Since each input
> > > > > relation is sufficiently small, it can be stored as only one HDFS
> > > > > block.
> > > > >
> > > > > In the broadcast join, the fragment of the smaller relation represents
> > > > > the whole smaller relation, while each fragment of the larger relation
> > > > > is a portion of the larger relation.
> > > > >
> > > > > In the repartition join, the fragments are fake fragments which are
> > > > > not actually used. In the two-phase join, the workers of the second
> > > > > phase fetch the intermediate data from the workers of the first phase.
> > > > > Thus, the workers of the second phase require only the fetch urls.
> > > > > However, in the current Tajo, workers require a fake fragment which
> > > > > virtually represents the fetches of a relation. This is a legacy
> > > > > implementation and we should simplify it in the future. Actually, we
> > > > > have a plan to represent both file fragments and fetch urls with a
> > > > > unified interface.
> > > > >
> > > > > This is why only two fragments are scheduled for joins.
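> > > > >
> > > > > As a rough sketch of what such a unified interface might look like
> > > > > (my own hypothetical names, not a committed design):
> > > > >
> > > > > interface Fragment {
> > > > >   String getTableName();
> > > > > }
> > > > >
> > > > > // Today's case: a split of a file stored on HDFS.
> > > > > final class FileInput implements Fragment {
> > > > >   private final String tableName;
> > > > >   FileInput(String tableName) { this.tableName = tableName; }
> > > > >   public String getTableName() { return tableName; }
> > > > > }
> > > > >
> > > > > // The repartition case: the fetch urls of intermediate data,
> > > > > // replacing the fake FileFragment used today.
> > > > > final class FetchInput implements Fragment {
> > > > >   private final String tableName;
> > > > >   private final java.util.List<java.net.URI> fetchUris;
> > > > >   FetchInput(String tableName, java.util.List<java.net.URI> fetchUris) {
> > > > >     this.tableName = tableName;
> > > > >     this.fetchUris = fetchUris;
> > > > >   }
> > > > >   public String getTableName() { return tableName; }
> > > > >   java.util.List<java.net.URI> getFetchUris() { return fetchUris; }
> > > > > }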
> > > > >
> > > > > I hope that this answers your question. If you are still confused,
> > > > > please feel free to ask.
> > > > >
> > > > > Thanks,
> > > > > Jihoon
> > > > >
> > > > >
> > > > > 2014-02-19 15:40 GMT+09:00 Min Zhou <coderplay@gmail.com>:
> > > > >
> > > > > > Hi all,
> > > > > >
> > > > > > I spent more than half a day trying to understand the logic of
> > > > > > Repartitioner.scheduleFragmentsForJoinQuery(), and I'm still
> > > > > > confused. Can anyone help me?
> > > > > >
> > > > > >
> > > > > > public static void scheduleFragmentsForJoinQuery(SubQuery subQuery)
> > > > > >     throws IOException {
> > > > > >   MasterPlan masterPlan = subQuery.getMasterPlan();
> > > > > >   ExecutionBlock execBlock = subQuery.getBlock();
> > > > > >   QueryMasterTask.QueryMasterTaskContext masterContext = subQuery.getContext();
> > > > > >   AbstractStorageManager storageManager = subQuery.getStorageManager();
> > > > > >
> > > > > >   ScanNode[] scans = execBlock.getScanNodes();
> > > > > >
> > > > > >   Path tablePath;
> > > > > >   // DOES TAJO ONLY SUPPORT 2 WAY JOINS?
> > > > > >   // WHY IS THIS ARRAY SIZE ONLY 2?
> > > > > >   FileFragment[] fragments = new FileFragment[2];
> > > > > >   long[] stats = new long[2];
> > > > > >
> > > > > >   // initialize variables from the child operators
> > > > > >   for (int i = 0; i < 2; i++) {
> > > > > >     TableDesc tableDesc =
> > > > > >         masterContext.getTableDescMap().get(scans[i].getCanonicalName());
> > > > > >     if (tableDesc == null) { // if it is a real table stored on storage
> > > > > >       // WHAT DOES THE COMMENT MEAN?
> > > > > >       // WHICH KIND OF SQL CAN RUN INTO HERE?
> > > > > >       // TODO - to be fixed (wrong directory)
> > > > > >       ExecutionBlock[] childBlocks = new ExecutionBlock[2];
> > > > > >       childBlocks[0] = masterPlan.getChild(execBlock.getId(), 0);
> > > > > >       childBlocks[1] = masterPlan.getChild(execBlock.getId(), 1);
> > > > > >
> > > > > >       tablePath = storageManager.getTablePath(scans[i].getTableName());
> > > > > >       stats[i] = masterContext.getSubQuery(childBlocks[i].getId())
> > > > > >           .getTableStat().getNumBytes();
> > > > > >       fragments[i] = new FileFragment(scans[i].getCanonicalName(),
> > > > > >           tablePath, 0, 0, new String[]{UNKNOWN_HOST});
> > > > > >     } else {
> > > > > >       tablePath = tableDesc.getPath();
> > > > > >       try {
> > > > > >         stats[i] = GlobalPlanner.computeDescendentVolume(scans[i]);
> > > > > >       } catch (PlanningException e) {
> > > > > >         throw new IOException(e);
> > > > > >       }
> > > > > >       fragments[i] = storageManager.getSplits(scans[i].getCanonicalName(),
> > > > > >           tableDesc.getMeta(), tableDesc.getSchema(), tablePath)
> > > > > >           .get(0);  // WHY JUST RETURN THE FIRST MEMBER OF THE SPLITS ARRAY?
> > > > > >     }
> > > > > >   }
> > > > > >
> > > > > >   LOG.info(String.format("Left Volume: %d, Right Volume: %d", stats[0],
> > > > > >       stats[1]));
> > > > > >
> > > > > >   // Assigning either fragments or fetch urls to query units
> > > > > >   boolean leftSmall = execBlock.isBroadcastTable(scans[0].getCanonicalName());
> > > > > >   boolean rightSmall = execBlock.isBroadcastTable(scans[1].getCanonicalName());
> > > > > >
> > > > > >   if (leftSmall && rightSmall) {
> > > > > >     LOG.info("[Distributed Join Strategy] : Immediate Two Way Join on Single Machine");
> > > > > >     SubQuery.scheduleFragment(subQuery, fragments[0], fragments[1]);
> > > > > >   } else if (leftSmall ^ rightSmall) {
> > > > > >     int broadcastIdx = leftSmall ? 0 : 1;
> > > > > >     int baseScanIdx = leftSmall ? 1 : 0;
> > > > > >     LOG.info(String.format("[BRDCAST JOIN] base_table=%s, base_volume=%d",
> > > > > >         scans[baseScanIdx].getCanonicalName(), stats[baseScanIdx]));
> > > > > >     scheduleLeafTasksWithBroadcastTable(subQuery, baseScanIdx,
> > > > > >         fragments[broadcastIdx]);
> > > > > >   } else {
> > > > > >     LOG.info("[Distributed Join Strategy] : Symmetric Repartition Join");
> > > > > >     // The hash map is modeled as follows:
> > > > > >     // <Part Id, <Table Name, Intermediate Data>>
> > > > > >     Map<Integer, Map<String, List<IntermediateEntry>>> hashEntries =
> > > > > >         new HashMap<Integer, Map<String, List<IntermediateEntry>>>();
> > > > > >
> > > > > >     // Grouping IntermediateData by a partition key and a table name
> > > > > >     for (ScanNode scan : scans) {
> > > > > >       SubQuery childSubQuery = masterContext.getSubQuery(
> > > > > >           TajoIdUtils.createExecutionBlockId(scan.getCanonicalName()));
> > > > > >       for (QueryUnit task : childSubQuery.getQueryUnits()) {
> > > > > >         if (task.getIntermediateData() != null) {
> > > > > >           for (IntermediateEntry intermEntry : task.getIntermediateData()) {
> > > > > >             if (hashEntries.containsKey(intermEntry.getPartId())) {
> > > > > >               Map<String, List<IntermediateEntry>> tbNameToInterm =
> > > > > >                   hashEntries.get(intermEntry.getPartId());
> > > > > >
> > > > > >               if (tbNameToInterm.containsKey(scan.getCanonicalName())) {
> > > > > >                 tbNameToInterm.get(scan.getCanonicalName()).add(intermEntry);
> > > > > >               } else {
> > > > > >                 tbNameToInterm.put(scan.getCanonicalName(),
> > > > > >                     TUtil.newList(intermEntry));
> > > > > >               }
> > > > > >             } else {
> > > > > >               Map<String, List<IntermediateEntry>> tbNameToInterm =
> > > > > >                   new HashMap<String, List<IntermediateEntry>>();
> > > > > >               tbNameToInterm.put(scan.getCanonicalName(),
> > > > > >                   TUtil.newList(intermEntry));
> > > > > >               hashEntries.put(intermEntry.getPartId(), tbNameToInterm);
> > > > > >             }
> > > > > >           }
> > > > > >         }
> > > > > >       }
> > > > > >     }
> > > > > >
> > > > > >     // hashEntries can be empty if there is no input data.
> > > > > >     // In that case, it would cause a divide-by-zero exception.
> > > > > >     // The following avoids this problem.
> > > > > >     int[] avgSize = new int[2];
> > > > > >     avgSize[0] = hashEntries.size() == 0 ? 0 : (int) (stats[0] / hashEntries.size());
> > > > > >     avgSize[1] = hashEntries.size() == 0 ? 0 : (int) (stats[1] / hashEntries.size());
> > > > > >     int bothFetchSize = avgSize[0] + avgSize[1];
> > > > > >
> > > > > >     // Getting the desired number of join tasks according to the volume
> > > > > >     // of the larger table
> > > > > >     int largerIdx = stats[0] >= stats[1] ? 0 : 1;
> > > > > >     int desireJoinTaskVolumn = subQuery.getContext().getConf()
> > > > > >         .getIntVar(ConfVars.DIST_QUERY_JOIN_TASK_VOLUME);
> > > > > >
> > > > > >     // calculate the number of tasks according to the data size
> > > > > >     int mb = (int) Math.ceil((double) stats[largerIdx] / 1048576);
> > > > > >     LOG.info("Larger intermediate data is approximately " + mb + " MB");
> > > > > >     // determine the number of tasks per 64MB
> > > > > >     int maxTaskNum = (int) Math.ceil((double) mb / desireJoinTaskVolumn);
> > > > > >     LOG.info("The calculated number of tasks is " + maxTaskNum);
> > > > > >     LOG.info("The number of total shuffle keys is " + hashEntries.size());
> > > > > >     // the number of join tasks cannot be larger than the number of
> > > > > >     // distinct partition ids.
> > > > > >     int joinTaskNum = Math.min(maxTaskNum, hashEntries.size());
> > > > > >     LOG.info("The determined number of join tasks is " + joinTaskNum);
> > > > > >
> > > > > >     // WHY ONLY 2 FRAGMENTS FOR THIS JOIN? JUST 2 HDFS BLOCKS?
> > > > > >     SubQuery.scheduleFragment(subQuery, fragments[0], fragments[1]);
> > > > > >
> > > > > >     // Assign partitions to tasks in a round robin manner.
> > > > > >     for (Entry<Integer, Map<String, List<IntermediateEntry>>> entry
> > > > > >         : hashEntries.entrySet()) {
> > > > > >       addJoinShuffle(subQuery, entry.getKey(), entry.getValue());
> > > > > >     }
> > > > > >   }
> > > > > > }
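> > > > > >
> > > > > > (To check my reading of the task-count math with made-up numbers: if
> > > > > > the larger side is 1,280 MB and DIST_QUERY_JOIN_TASK_VOLUME is 64,
> > > > > > then maxTaskNum = ceil(1280 / 64) = 20; with 32 distinct partition
> > > > > > ids, joinTaskNum = min(20, 32) = 20.)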
> > > > > >
> > > > > >
> > > > > > Thanks,
> > > > > > Min
> > > > > >
> > > > > > --
> > > > > > My research interests are distributed systems, parallel computing
> > and
> > > > > > bytecode based virtual machine.
> > > > > >
> > > > > > My profile:
> > > > > > http://www.linkedin.com/in/coderplay
> > > > > > My blog:
> > > > > > http://coderplay.javaeye.com
> > > > > >
> > > > >
> > > > >
> > > > >
> > > > > --
> > > > > Jihoon Son
> > > > >
> > > > > Database & Information Systems Group,
> > > > > Prof. Yon Dohn Chung Lab.
> > > > > Dept. of Computer Science & Engineering,
> > > > > Korea University
> > > > > 1, 5-ga, Anam-dong, Seongbuk-gu,
> > > > > Seoul, 136-713, Republic of Korea
> > > > >
> > > > > Tel : +82-2-3290-3580
> > > > > E-mail : jihoonson@korea.ac.kr
> > > > >
> > > >
> > > >
> > > >
> > > > --
> > > > My research interests are distributed systems, parallel computing and
> > > > bytecode based virtual machine.
> > > >
> > > > My profile:
> > > > http://www.linkedin.com/in/coderplay
> > > > My blog:
> > > > http://coderplay.javaeye.com
> > > >
> > >
> > >
> > >
> > > --
> > > Jihoon Son
> > >
> > > Database & Information Systems Group,
> > > Prof. Yon Dohn Chung Lab.
> > > Dept. of Computer Science & Engineering,
> > > Korea University
> > > 1, 5-ga, Anam-dong, Seongbuk-gu,
> > > Seoul, 136-713, Republic of Korea
> > >
> > > Tel : +82-2-3290-3580
> > > E-mail : jihoonson@korea.ac.kr
> > >
> >
> >
> >
> > --
> > My research interests are distributed systems, parallel computing and
> > bytecode based virtual machine.
> >
> > My profile:
> > http://www.linkedin.com/in/coderplay
> > My blog:
> > http://coderplay.javaeye.com
> >
>
>
>
> --
> Jihoon Son
>
> Database & Information Systems Group,
> Prof. Yon Dohn Chung Lab.
> Dept. of Computer Science & Engineering,
> Korea University
> 1, 5-ga, Anam-dong, Seongbuk-gu,
> Seoul, 136-713, Republic of Korea
>
> Tel : +82-2-3290-3580
> E-mail : jihoonson@korea.ac.kr
>
