tajo-dev mailing list archives

From Jihoon Son <ghoon...@gmail.com>
Subject Re: Repartitioner.scheduleFragmentsForJoinQuery() questions
Date Wed, 19 Feb 2014 10:11:35 GMT
Sorry, Min. I finally got your point.
The answer is that SubQuery.scheduleFragmentsForLeafQuery() is used for the
scans of the first phase. This is possible because the two scan operations of
the first phase are executed in different sub queries. Here is an example of
the execution plan.

tajo> select count(*) from lineitem, orders where l_orderkey = o_orderkey;

=======================================================
Block Id: eb_1392803817010_0001_000001 [LEAF]
=======================================================

[Outgoing]
[q_1392803817010_0001] 1 => 3 (type=HASH_SHUFFLE)

"Scan" : {"table":"orders", "target list": ,
  "out schema": {(0) }
  "in schema": {(9) orders.o_orderkey (INT8), orders.o_custkey (INT8),
orders.o_orderstatus (TEXT), orders.o_totalprice (FLOAT8),
orders.o_orderdate (TEXT), orders.o_orderpriority (TEXT), orders.o_clerk
(TEXT), orders.o_shippriority (INT4), orders.o_comment (TEXT)}
=======================================================
Block Id: eb_1392803817010_0001_000002 [LEAF]
=======================================================

[Outgoing]
[q_1392803817010_0001] 2 => 3 (type=HASH_SHUFFLE)

"Scan" : {"table":"lineitem", "target list": ,
  "out schema": {(0) }
  "in schema": {(16) lineitem.l_orderkey (INT8), lineitem.l_partkey (INT8),
lineitem.l_suppkey (INT8), lineitem.l_linenumber (INT8),
lineitem.l_quantity (FLOAT8), lineitem.l_extendedprice (FLOAT8),
lineitem.l_discount (FLOAT8), lineitem.l_tax (FLOAT8),
lineitem.l_returnflag (TEXT), lineitem.l_linestatus (TEXT),
lineitem.l_shipdate (TEXT), lineitem.l_commitdate (TEXT),
lineitem.l_receiptdate (TEXT), lineitem.l_shipinstruct (TEXT),
lineitem.l_shipmode (TEXT), lineitem.l_comment (TEXT)}
=======================================================
Block Id: eb_1392803817010_0001_000003 [INTERMEDIATE]
=======================================================

[Incoming]
[q_1392803817010_0001] 1 => 3 (type=HASH_SHUFFLE)
[q_1392803817010_0001] 2 => 3 (type=HASH_SHUFFLE)

[Outgoing]
[q_1392803817010_0001] 3 => 4 (type=HASH_SHUFFLE, key=, num=1)

"GroupBy": {"grouping fields":[, "target": ["?count_1 (INT8)"],
  "expr": count(),
  "out schema": {(1) ?count_1 (INT8)},
  "in schema": {(0) }}
"Join": "joinType": " CROSS", "target list":
"out schema: {(0) }
"in schema: {(0) }
"Scan" : {"table":"eb_1392803817010_0001_000001",
  "out schema": {(0) }
  "in schema": {(0) } and "Scan" : {"table":"eb_1392803817010_0001_000002",
  "out schema": {(0) }
  "in schema": {(0) }
=======================================================
....

As you can see, the scan operations are performed separately in different
sub queries.
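
To make the point concrete, here is a minimal, self-contained sketch of the
idea (the class and methods below are invented just for illustration and are
not Tajo's API; only the block ids, the table names, and the
"Must be Scan Query" precondition are taken from this thread):

import java.util.ArrayList;
import java.util.List;

// Illustrative only: each first-phase sub query contains exactly one scan
// (eb_1392803817010_0001_000001 reads orders, eb_1392803817010_0001_000002
// reads lineitem), so the leaf path that requires scans.length == 1 is taken
// once per sub query, and only the intermediate block with two scans goes
// through Repartitioner.scheduleFragmentsForJoinQuery().
public class LeafScanSchedulingSketch {

  // Hypothetical stand-in for a leaf execution block: a single table scan.
  record LeafBlock(String blockId, String table) {}

  // Hypothetical stand-in for leaf scheduling: one query unit per input split.
  static List<String> scheduleLeafScan(LeafBlock block, int numSplits) {
    List<String> queryUnits = new ArrayList<>();
    for (int i = 0; i < numSplits; i++) {
      queryUnits.add(block.blockId() + " scans split " + i + " of " + block.table());
    }
    return queryUnits;
  }

  public static void main(String[] args) {
    // The two scans of the first phase are scheduled independently of each other.
    System.out.println(scheduleLeafScan(
        new LeafBlock("eb_1392803817010_0001_000001", "orders"), 2));
    System.out.println(scheduleLeafScan(
        new LeafBlock("eb_1392803817010_0001_000002", "lineitem"), 3));
  }
}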

One additional comment: currently, Tajo can execute only one sub query at a
time. That is, the two sub queries of the first phase should be executed
simultaneously, but they are not. This should also be improved.

Thanks,
Jihoon


2014-02-19 18:47 GMT+09:00 Min Zhou <coderplay@gmail.com>:

> I totally understand the theory of those 3 kinds of join implementations. I
> just want to know exactly which file and which method implement the 1st phase,
> and which file and which method implement the 2nd one. This is confusing to me
> because the code is somewhat messy. From my understanding, a single scan runs
> into SubQuery.scheduleFragmentsForLeafQuery(), which checks the length of the
> input scan nodes and makes sure it is 1. I don't think the first phase of a
> repartition join shares the same path for creating query units.
>
> Preconditions.checkArgument(scans.length == 1, "Must be Scan Query");
>
>
> Thanks,
> Min
>
>
> On Wed, Feb 19, 2014 at 1:13 AM, Jihoon Son <ghoonson@gmail.com> wrote:
>
> > Hi, Min.
> >
> > The following is an example query plan of the repartition join.
> >
> > * second phase
> >
> >      join
> >      /    \
> > fetch  fetch
> >
> > * first phase
> >
> > store  store
> >    |        |
> > scan   scan
> >
> > As you can see in this example, the first phase consists of just scans.
> > (The store operators are for fetching in the second phase.) The join
> > operation is performed only in the second phase. Thus, query units of the
> > first phase are created in the same way as for simple select queries (e.g.,
> > select * from lineitem).
> >
> > As for your second point about the broadcast join, I cannot check it for now
> > because JIRA seems to have some problems. I'll check the link later.
> >
> > Thanks,
> > Jihoon
> >
> >
> > 2014-02-19 17:55 GMT+09:00 Min Zhou <coderplay@gmail.com>:
> >
> > > Hi Jihoon,
> > >
> > > Thank you for the reply. I'm still confused about the repartition join:
> > > which part of the code is the first phase and which part is the second?
> > > Where is the code that creates query units for scanning the two tables of
> > > a repartition join?
> > >
> > >
> > > The second thing I should mention is that the current broadcast join may
> > > fail if the larger table is large enough. See
> > > https://issues.apache.org/jira/browse/HIVE-1641
> > >
> > > Min
> > >
> > >
> > > On Wed, Feb 19, 2014 at 12:17 AM, Jihoon Son <ghoonson@gmail.com> wrote:
> > >
> > > > Hi, Min.
> > > >
> > > > Sorry for the lack of comments. This module includes some legacy code
> > > > that is hard to understand directly.
> > > >
> > > > Tajo currently supports only 2-way joins. This is why we use an array of
> > > > size 2. In the future, we should improve it to support n-way joins.
> > > >
> > > > Tajo provides three join algorithms, i.e., immediate join, broadcast join,
> > > > and repartition join. The query optimizer chooses a proper algorithm
> > > > according to the size of input relations. I think that you already know
> > > > about these algorithms, but I'll present simple descriptions to answer your
> > > > questions.
> > > >
> > > > The immediate join is used when both of the input relations are small
> > > > enough to be processed on one node. Thus, only one node performs the join
> > > > in one phase.
> > > > If only one relation is small, the broadcast join is used. The small
> > > > relation is replicated to all join workers. After that, the join workers
> > > > perform the join with the whole small relation and a portion of the large
> > > > relation. This algorithm is also performed in one phase.
> > > > Finally, the repartition join is used when all the input relations are
> > > > large. The repartition join is performed in two phases. Both relations are
> > > > partitioned by the join key in the first phase and shuffled to workers of
> > > > the second phase.
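> > > >
> > > > For illustration, here is a tiny sketch of the idea behind the first-phase
> > > > hash shuffle (this class is invented for the example and is not Tajo code;
> > > > it only shows why rows with the same join key, e.g. l_orderkey = o_orderkey,
> > > > end up in the same partition):
> > > >
> > > > public class HashShuffleSketch {
> > > >   // Both relations apply the same function to the join key, so matching
> > > >   // lineitem and orders rows land in the same partition id and can be
> > > >   // joined locally by one second-phase worker.
> > > >   static int partitionOf(long joinKey, int numPartitions) {
> > > >     return (int) ((joinKey & Long.MAX_VALUE) % numPartitions);
> > > >   }
> > > >
> > > >   public static void main(String[] args) {
> > > >     int partitions = 3;
> > > >     System.out.println(partitionOf(1001L, partitions)); // a lineitem row with l_orderkey = 1001
> > > >     System.out.println(partitionOf(1001L, partitions)); // an orders row with o_orderkey = 1001, same partition
> > > >   }
> > > > }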
> > > >
> > > > The if condition that you asked about checks whether the join query is
> > > > performed in two phases or not. If it is, the if condition is satisfied.
> > > > (I think that the comment "// if it is a real table stored on storage" is
> > > > wrong and should be fixed to have the opposite meaning.)
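> > > >
> > > > In code terms (the condition below is copied from the method you pasted;
> > > > the annotations are mine, just to spell out which branch is which):
> > > >
> > > >   if (tableDesc == null) {
> > > >     // the scan reads the output of a child execution block (intermediate
> > > >     // data), i.e., the two-phase repartition path: a fake fragment is
> > > >     // created and the stats come from the child sub query
> > > >   } else {
> > > >     // the scan reads a real table stored on storage: real splits are taken
> > > >     // from the storage manager (immediate or broadcast join path)
> > > >   }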
> > > >
> > > > The fragment is used in different ways for each join algorithm. In the
> > > > immediate join, a fragment represents an HDFS block. Since each input
> > > > relation is sufficiently small, it can be stored as only one HDFS block.
> > > >
> > > > In the broadcast join, the fragment of the smaller relation represents
> > > > the whole smaller relation while each fragment of the larger relation is
> > > > a portion of the larger relation.
> > > >
> > > > In the repartition join, the fragments are fake fragments which are not
> > > > actually used. In the two-phase join, workers of the second phase fetch the
> > > > intermediate data from the workers of the first phase. Thus, the workers of
> > > > the second phase require only the fetch URLs. However, in the current Tajo,
> > > > workers require a fake fragment which virtually represents the fetches of a
> > > > relation. This is a legacy implementation and we should simplify it in the
> > > > future. Actually, we have a plan to represent both file fragments and fetch
> > > > URLs with a unified interface.
> > > >
> > > > This is why only two fragments are scheduled for joins.
> > > >
> > > > I hope that this answers your question.
> > > > If you are still confused, please feel free to ask.
> > > >
> > > > Thanks,
> > > > Jihoon
> > > >
> > > >
> > > > 2014-02-19 15:40 GMT+09:00 Min Zhou <coderplay@gmail.com>:
> > > >
> > > > > Hi all,
> > > > >
> > > > > I spent more than half a day trying to understand the logic of
> > > > > Repartitioner.scheduleFragmentsForJoinQuery(), and I am still confused.
> > > > > Can anyone help me?
> > > > >
> > > > >
> > > > >   public static void scheduleFragmentsForJoinQuery(SubQuery subQuery)
> > > > >       throws IOException {
> > > > >     MasterPlan masterPlan = subQuery.getMasterPlan();
> > > > >     ExecutionBlock execBlock = subQuery.getBlock();
> > > > >     QueryMasterTask.QueryMasterTaskContext masterContext = subQuery.getContext();
> > > > >     AbstractStorageManager storageManager = subQuery.getStorageManager();
> > > > >
> > > > >     ScanNode[] scans = execBlock.getScanNodes();
> > > > >
> > > > >     Path tablePath;
> > > > >     // DOES TAJO ONLY SUPPORT 2 WAY JOINS?
> > > > >     // WHY THIS ARRAY SIZE IS ONLY 2?
> > > > >     FileFragment[] fragments = new FileFragment[2];
> > > > >     long[] stats = new long[2];
> > > > >
> > > > >     // initialize variables from the child operators
> > > > >     for (int i = 0; i < 2; i++) {
> > > > >       TableDesc tableDesc = masterContext.getTableDescMap().get(scans[i].getCanonicalName());
> > > > >       if (tableDesc == null) { // if it is a real table stored on storage
> > > > >         // WHAT'S THE COMMENT MEAN?
> > > > >         // WHICH KIND OF SQL CAN RUN INTO HERE?
> > > > >         // TODO - to be fixed (wrong directory)
> > > > >         ExecutionBlock [] childBlocks = new ExecutionBlock[2];
> > > > >         childBlocks[0] = masterPlan.getChild(execBlock.getId(), 0);
> > > > >         childBlocks[1] = masterPlan.getChild(execBlock.getId(), 1);
> > > > >
> > > > >         tablePath = storageManager.getTablePath(scans[i].getTableName());
> > > > >         stats[i] = masterContext.getSubQuery(childBlocks[i].getId()).getTableStat().getNumBytes();
> > > > >         fragments[i] = new FileFragment(scans[i].getCanonicalName(), tablePath, 0, 0,
> > > > >             new String[]{UNKNOWN_HOST});
> > > > >       } else {
> > > > >         tablePath = tableDesc.getPath();
> > > > >         try {
> > > > >           stats[i] = GlobalPlanner.computeDescendentVolume(scans[i]);
> > > > >         } catch (PlanningException e) {
> > > > >           throw new IOException(e);
> > > > >         }
> > > > >         fragments[i] = storageManager.getSplits(scans[i].getCanonicalName(),
> > > > >             tableDesc.getMeta(), tableDesc.getSchema(),
> > > > >             tablePath).get(0);  // WHY JUST RETURN THE FIRST MEMBER OF THE SPLITS ARRAY?
> > > > >       }
> > > > >     }
> > > > >
> > > > >     LOG.info(String.format("Left Volume: %d, Right Volume: %d", stats[0], stats[1]));
> > > > >
> > > > >     // Assigning either fragments or fetch urls to query units
> > > > >     boolean leftSmall = execBlock.isBroadcastTable(scans[0].getCanonicalName());
> > > > >     boolean rightSmall = execBlock.isBroadcastTable(scans[1].getCanonicalName());
> > > > >
> > > > >     if (leftSmall && rightSmall) {
> > > > >       LOG.info("[Distributed Join Strategy] : Immediate Two Way Join on Single Machine");
> > > > >       SubQuery.scheduleFragment(subQuery, fragments[0], fragments[1]);
> > > > >     } else if (leftSmall ^ rightSmall) {
> > > > >       int broadcastIdx = leftSmall ? 0 : 1;
> > > > >       int baseScanIdx = leftSmall ? 1 : 0;
> > > > >       LOG.info(String.format("[BRDCAST JOIN] base_table=%s, base_volume=%d",
> > > > >           scans[baseScanIdx].getCanonicalName(), stats[baseScanIdx]));
> > > > >       scheduleLeafTasksWithBroadcastTable(subQuery, baseScanIdx, fragments[broadcastIdx]);
> > > > >     } else {
> > > > >       LOG.info("[Distributed Join Strategy] : Symmetric Repartition Join");
> > > > >       // The hash map is modeling as follows:
> > > > >       // <Part Id, <Table Name, Intermediate Data>>
> > > > >       Map<Integer, Map<String, List<IntermediateEntry>>> hashEntries =
> > > > >           new HashMap<Integer, Map<String, List<IntermediateEntry>>>();
> > > > >
> > > > >       // Grouping IntermediateData by a partition key and a table name
> > > > >       for (ScanNode scan : scans) {
> > > > >         SubQuery childSubQuery = masterContext.getSubQuery(
> > > > >             TajoIdUtils.createExecutionBlockId(scan.getCanonicalName()));
> > > > >         for (QueryUnit task : childSubQuery.getQueryUnits()) {
> > > > >           if (task.getIntermediateData() != null) {
> > > > >             for (IntermediateEntry intermEntry : task.getIntermediateData()) {
> > > > >               if (hashEntries.containsKey(intermEntry.getPartId())) {
> > > > >                 Map<String, List<IntermediateEntry>> tbNameToInterm =
> > > > >                     hashEntries.get(intermEntry.getPartId());
> > > > >
> > > > >                 if (tbNameToInterm.containsKey(scan.getCanonicalName())) {
> > > > >                   tbNameToInterm.get(scan.getCanonicalName()).add(intermEntry);
> > > > >                 } else {
> > > > >                   tbNameToInterm.put(scan.getCanonicalName(), TUtil.newList(intermEntry));
> > > > >                 }
> > > > >               } else {
> > > > >                 Map<String, List<IntermediateEntry>> tbNameToInterm =
> > > > >                     new HashMap<String, List<IntermediateEntry>>();
> > > > >                 tbNameToInterm.put(scan.getCanonicalName(), TUtil.newList(intermEntry));
> > > > >                 hashEntries.put(intermEntry.getPartId(), tbNameToInterm);
> > > > >               }
> > > > >             }
> > > > >           }
> > > > >         }
> > > > >       }
> > > > >
> > > > >       // hashEntries can be zero if there are no input data.
> > > > >       // In the case, it will cause the zero divided exception.
> > > > >       // it avoids this problem.
> > > > >       int [] avgSize = new int[2];
> > > > >       avgSize[0] = hashEntries.size() == 0 ? 0 : (int) (stats[0] / hashEntries.size());
> > > > >       avgSize[1] = hashEntries.size() == 0 ? 0 : (int) (stats[1] / hashEntries.size());
> > > > >       int bothFetchSize = avgSize[0] + avgSize[1];
> > > > >
> > > > >       // Getting the desire number of join tasks according to the volumn
> > > > >       // of a larger table
> > > > >       int largerIdx = stats[0] >= stats[1] ? 0 : 1;
> > > > >       int desireJoinTaskVolumn = subQuery.getContext().getConf().
> > > > >           getIntVar(ConfVars.DIST_QUERY_JOIN_TASK_VOLUME);
> > > > >
> > > > >       // calculate the number of tasks according to the data size
> > > > >       int mb = (int) Math.ceil((double)stats[largerIdx] / 1048576);
> > > > >       LOG.info("Larger intermediate data is approximately " + mb + " MB");
> > > > >       // determine the number of task per 64MB
> > > > >       int maxTaskNum = (int) Math.ceil((double)mb / desireJoinTaskVolumn);
> > > > >       LOG.info("The calculated number of tasks is " + maxTaskNum);
> > > > >       LOG.info("The number of total shuffle keys is " + hashEntries.size());
> > > > >       // the number of join tasks cannot be larger than the number of
> > > > >       // distinct partition ids.
> > > > >       int joinTaskNum = Math.min(maxTaskNum, hashEntries.size());
> > > > >       LOG.info("The determined number of join tasks is " + joinTaskNum);
> > > > >
> > > > >       SubQuery.scheduleFragment(subQuery, fragments[0], fragments[1]);
> > > > >       // WHY JUST ONLY 2 FRAGMENT FOR THIS JOIN? JUST 2 HDFS BLOCKS?
> > > > >
> > > > >       // Assign partitions to tasks in a round robin manner.
> > > > >       for (Entry<Integer, Map<String, List<IntermediateEntry>>> entry
> > > > >           : hashEntries.entrySet()) {
> > > > >         addJoinShuffle(subQuery, entry.getKey(), entry.getValue());
> > > > >       }
> > > > >
> > > > >     }
> > > > >   }
> > > > >
> > > > >
> > > > > Thanks,
> > > > > Min
> > > > >
> > > > > --
> > > > > My research interests are distributed systems, parallel computing and
> > > > > bytecode based virtual machine.
> > > > >
> > > > > My profile:
> > > > > http://www.linkedin.com/in/coderplay
> > > > > My blog:
> > > > > http://coderplay.javaeye.com
> > > > >
> > > >
> > > >
> > > >
> > > > --
> > > > Jihoon Son
> > > >
> > > > Database & Information Systems Group,
> > > > Prof. Yon Dohn Chung Lab.
> > > > Dept. of Computer Science & Engineering,
> > > > Korea University
> > > > 1, 5-ga, Anam-dong, Seongbuk-gu,
> > > > Seoul, 136-713, Republic of Korea
> > > >
> > > > Tel : +82-2-3290-3580
> > > > E-mail : jihoonson@korea.ac.kr
> > > >
> > >
> > >
> > >
> > > --
> > > My research interests are distributed systems, parallel computing and
> > > bytecode based virtual machine.
> > >
> > > My profile:
> > > http://www.linkedin.com/in/coderplay
> > > My blog:
> > > http://coderplay.javaeye.com
> > >
> >
> >
> >
> > --
> > Jihoon Son
> >
> > Database & Information Systems Group,
> > Prof. Yon Dohn Chung Lab.
> > Dept. of Computer Science & Engineering,
> > Korea University
> > 1, 5-ga, Anam-dong, Seongbuk-gu,
> > Seoul, 136-713, Republic of Korea
> >
> > Tel : +82-2-3290-3580
> > E-mail : jihoonson@korea.ac.kr
> >
>
>
>
> --
> My research interests are distributed systems, parallel computing and
> bytecode based virtual machine.
>
> My profile:
> http://www.linkedin.com/in/coderplay
> My blog:
> http://coderplay.javaeye.com
>



-- 
Jihoon Son

Database & Information Systems Group,
Prof. Yon Dohn Chung Lab.
Dept. of Computer Science & Engineering,
Korea University
1, 5-ga, Anam-dong, Seongbuk-gu,
Seoul, 136-713, Republic of Korea

Tel : +82-2-3290-3580
E-mail : jihoonson@korea.ac.kr
