trafodion-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From dbirds...@apache.org
Subject [1/4] incubator-trafodion git commit: [TRAFODION-25] Temporary code for UPDATE costing
Date Tue, 04 Apr 2017 15:30:42 GMT
Repository: incubator-trafodion
Updated Branches:
  refs/heads/master 2e1e3ffa7 -> 1926da9c3


[TRAFODION-25] Temporary code for UPDATE costing


Project: http://git-wip-us.apache.org/repos/asf/incubator-trafodion/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-trafodion/commit/55e65d2b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-trafodion/tree/55e65d2b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-trafodion/diff/55e65d2b

Branch: refs/heads/master
Commit: 55e65d2b2a65153c6d5ef58d994980795560f72f
Parents: 1bd6c78
Author: Dave Birdsall <dbirdsall@apache.org>
Authored: Mon Apr 3 20:15:46 2017 +0000
Committer: Dave Birdsall <dbirdsall@apache.org>
Committed: Mon Apr 3 20:15:46 2017 +0000

----------------------------------------------------------------------
 core/sql/optimizer/ScmCostMethod.cpp | 332 +++++++++++++++++++++++++++++-
 1 file changed, 330 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-trafodion/blob/55e65d2b/core/sql/optimizer/ScmCostMethod.cpp
----------------------------------------------------------------------
diff --git a/core/sql/optimizer/ScmCostMethod.cpp b/core/sql/optimizer/ScmCostMethod.cpp
index 16a0d87..c7a82c0 100644
--- a/core/sql/optimizer/ScmCostMethod.cpp
+++ b/core/sql/optimizer/ScmCostMethod.cpp
@@ -3845,8 +3845,336 @@ CostMethodHbaseUpdate::scmComputeOperatorCostInternal(RelExpr* op,
   const PlanWorkSpace* pws,
   Lng32& countOfStreams)
 {
-  // TODO: Write this method; the line below is a stub
-  return CostMethod::scmComputeOperatorCostInternal(op,pws,countOfStreams);
+  // TODO: Write this method; the code below is a copy of the Delete
+  // method which we'll use for the moment. This is better than just
+  // a simple constant stub; we will get parallel Update plans with
+  // this code, for example, that we won't get with constant cost.
+
+  // The theory of operation of Update is somewhat different (since it
+  // might, for example, do a Delete + an Insert, or might do an Hbase
+  // Update -- is that decided before we get here?), so this code will
+  // underestimate the cost in general.
+
+  const Context * myContext = pws->getContext();
+
+  cacheParameters(op,myContext);
+  estimateDegreeOfParallelism();
+
+  const InputPhysicalProperty* ippForMe =
+    myContext->getInputPhysicalProperty();
+
+  // -----------------------------------------
+  // Save off estimated degree of parallelism.
+  // -----------------------------------------
+  countOfStreams = countOfStreams_;
+
+  HbaseDelete* delOp = (HbaseDelete *)op;   // downcast
+
+  CMPASSERT(partFunc_ != NULL);
+
+  //  Later, if and when we start using NodeMaps to track active regions for 
+  //  Trafodion tables in HBase (or native HBase tables), we can use the
+  //  following to get active partitions.
+  //CostScalar activePartitions =
+  // (CostScalar)
+  //   (((NodeMap *)(partFunc_->getNodeMap()))->getNumActivePartitions());
+  //  But for now, we do the following:
+  CostScalar activePartitions = (CostScalar)(partFunc_->getCountOfPartitions());
+
+  const IndexDesc* CIDesc = delOp->getIndexDesc();
+  const CostScalar & recordSizeInKb = CIDesc->getRecordSizeInKb();
+
+  CostScalar tuplesProcessed(csZero);
+  CostScalar tuplesProduced(csZero);
+  CostScalar tuplesSent(csZero);  // we use tuplesSent to model sending rowIDs to Hbase 
+  CostScalar randomIOs(csZero);
+  CostScalar sequentialIOs(csZero);
+
+  CostScalar countOfAsynchronousStreams = activePartitions;
+
+  // figure out if the probes are in order - if they are, then when
+  // scanning, I/O will tend to be sequential
+
+  NABoolean probesInOrder = FALSE;
+  if (ippForMe != NULL)  // input physical properties exist?
+  {
+    // See if the probes are in order.
+
+    // For delete, a partial order is ok.
+    NABoolean partiallyInOrderOK = TRUE;
+    NABoolean probesForceSynchronousAccess = FALSE;
+    ValueIdList targetSortKey = CIDesc->getOrderOfKeyValues();
+    ValueIdSet sourceCharInputs =
+      delOp->getGroupAttr()->getCharacteristicInputs();
+
+    ValueIdSet targetCharInputs;
+    // The char inputs are still in terms of the source. Map them to the target.
+    // Note: The source char outputs in the ipp have already been mapped to
+    // the target. CharOutputs are a set, meaning they do not have duplicates
+    // But we could have cases where two columns of the target are matched to the
+    // same source column, example: Sol: 10-040416-5166, where we have
+    // INSERT INTO b6table1
+    //		  ( SELECT f, h_to_f, f, 8.4
+    //            FROM btre211
+    //            );
+    // Hence we use lists here instead of sets.
+    // Check to see if there are any duplicates in the source Characteristics inputs
+    // if no, we shall perform set operations, as these are faster
+    ValueIdList bottomValues = delOp->updateToSelectMap().getBottomValues();
+    ValueIdSet bottomValuesSet(bottomValues);
+    NABoolean useListInsteadOfSet = FALSE;
+
+    CascadesGroup* group1 = (*CURRSTMT_OPTGLOBALS->memo)[delOp->getGroupId()];
+
+    GenericUpdate* upOperator = (GenericUpdate *) group1->getFirstLogExpr();
+
+    if (((upOperator->getTableName().getSpecialType() == ExtendedQualName::NORMAL_TABLE ) || (upOperator->getTableName().getSpecialType() == ExtendedQualName::GHOST_TABLE )) &&
+     (bottomValuesSet.entries() != bottomValues.entries() ) )
+    {
+
+      ValueIdList targetInputList;
+      // from here get all the bottom values that appear in the sourceCharInputs
+      bottomValues.findCommonElements(sourceCharInputs );
+      bottomValuesSet = bottomValues;
+
+      // we can use the bottomValues only if these contain some duplicate columns of
+      // characteristics inputs, otherwise we shall use the characteristics inputs.
+      if (bottomValuesSet == sourceCharInputs)
+      {
+        useListInsteadOfSet = TRUE;
+	delOp->updateToSelectMap().rewriteValueIdListUpWithIndex(
+	  targetInputList,
+	  bottomValues);
+	targetCharInputs = targetInputList;
+      }
+    }
+
+    if (!useListInsteadOfSet)
+    {
+      delOp->updateToSelectMap().rewriteValueIdSetUp(
+	targetCharInputs,
+	sourceCharInputs);
+    }
+
+    // If a target key column is covered by a constant on the source side,
+    // then we need to remove that column from the target sort key
+    removeConstantsFromTargetSortKey(&targetSortKey,
+                                   &(delOp->updateToSelectMap()));
+    NABoolean orderedNJ = TRUE;
+    // Don't call ordersMatch if njOuterOrder_ is null.
+    if (ippForMe->getAssumeSortedForCosting())
+      orderedNJ = FALSE;
+    else
+      // if leading keys are not same then don't try ordered NJ.
+      orderedNJ =
+        isOrderedNJFeasible(*(ippForMe->getNjOuterOrder()), targetSortKey);
+
+    if (orderedNJ AND 
+        ordersMatch(ippForMe,
+                    CIDesc,
+                    &targetSortKey,
+                    targetCharInputs,
+                    partiallyInOrderOK,
+                    probesForceSynchronousAccess))
+    {
+      probesInOrder = TRUE;
+      if (probesForceSynchronousAccess)
+      {
+        // The probes form a complete order across all partitions and
+        // the clustering key and partitioning key are the same. So, the
+        // only asynchronous I/O we will see will be due to ESPs. So,
+        // limit the count of streams in DP2 by the count of streams in ESP.
+
+        // Get the logPhysPartitioningFunction, which we will use
+        // to get the logical partitioning function. If it's NULL,
+        // it means the table was not partitioned at all, so we don't
+        // need to limit anything since there already is no asynch I/O.
+
+     // TODO: lppf is always null in Trafodion; figure out what to do instead...
+        const LogPhysPartitioningFunction* lppf =
+            partFunc_->castToLogPhysPartitioningFunction();
+        if (lppf != NULL)
+        {
+          PartitioningFunction* logPartFunc =
+            lppf->getLogPartitioningFunction();
+          // Get the number of ESPs:
+          CostScalar numParts = logPartFunc->getCountOfPartitions();
+
+          countOfAsynchronousStreams = MINOF(numParts,
+                                             countOfAsynchronousStreams);
+        } // lppf != NULL
+      } // probesForceSynchronousAccess
+    } // probes are in order
+  } // if input physical properties exist
+
+  CostScalar currentCpus = 
+    (CostScalar)myContext->getPlan()->getPhysicalProperty()->getCurrentCountOfCPUs();
+  CostScalar activeCpus = MINOF(countOfAsynchronousStreams, currentCpus);
+  CostScalar streamsPerCpu =
+    (countOfAsynchronousStreams / activeCpus).getCeiling();
+
+
+  CostScalar noOfProbesPerPartition(csOne);
+
+  CostScalar numRowsToDelete(csOne);
+  CostScalar numRowsToScan(csOne);
+
+  CostScalar commonComputation;
+
+  // Determine # of rows to scan and to delete
+
+  if (delOp->getSearchKey() && delOp->getSearchKey()->isUnique() &&
+    (noOfProbes_ == 1))
+  {
+    // unique access
+
+    activePartitions = csOne;
+    countOfAsynchronousStreams = csOne;
+    activeCpus = csOne;
+    streamsPerCpu = csOne;
+    numRowsToScan = csOne;
+    // assume the 1 row always satisfies any executor predicates so
+    // we'll always do the Delete
+    numRowsToDelete = csOne;
+  }
+  else
+  {
+    // non-unique access
+
+    numRowsToDelete =
+      ((myRowCount_ / activePartitions).getCeiling()).minCsOne();
+    noOfProbesPerPartition =
+      ((noOfProbes_ / countOfAsynchronousStreams).getCeiling()).minCsOne();
+
+    // need to compute the number of rows that satisfy the key predicates
+    // to compute the I/Os that must be performed
+
+    // need to create a new histogram, since the one from input logical
+    // prop. has the histogram for the table after all executor preds are
+    // applied (i.e. the result cardinality)
+    IndexDescHistograms histograms(*CIDesc,CIDesc->getIndexKey().entries());
+
+    // retrieve all of the key preds in key column order
+    ColumnOrderList keyPredsByCol(CIDesc->getIndexKey());
+    delOp->getSearchKey()->getKeyPredicatesByColumn(keyPredsByCol);
+
+    if ( NOT allKeyColumnsHaveHistogramStatistics( histograms, CIDesc ) )
+    {
+      // All key columns do not have histogram data, the best we can
+      // do is use the number of rows that satisfy all predicates
+      // (i.e. the number of rows we will be updating)
+      numRowsToScan = numRowsToDelete;
+    }
+    else
+    {
+      numRowsToScan = numRowsToScanWhenAllKeyColumnsHaveHistograms(
+	histograms,
+	keyPredsByCol,
+	activePartitions,
+	CIDesc
+	);
+      if (numRowsToScan < numRowsToDelete) // sanity test
+      {
+        // we will scan at least as many rows as we delete
+        numRowsToScan = numRowsToDelete;
+      }
+    }
+  }
+
+  // Notes: At execution time, several different TCBs can be created
+  // for a delete. We can class them three ways: Unique, Subset, and
+  // Rowset. Representative examples of the three classes are:
+  //
+  //   ExHbaseUMDtrafUniqueTaskTcb
+  //   ExHbaseUMDtrafSubsetTaskTcb
+  //   ExHbaseAccessSQRowsetTcb
+  //
+  // The theory of operation of each of these differs somewhat. 
+  //
+  // For the Unique variant, we use an HBase "get" to obtain a row, apply
+  // a predicate to it, then do an HBase "delete" to delete it if the
+  // predicate is true. (If there is no predicate, we'll simply do a
+  // "checkAndDelete" so there would be no "get" cost.) 
+  //
+  // For the Subset variant, we use an HBase "scan" to obtain a sequence
+  // of rows, apply a predicate to each, then do an HBase "delete" on
+  // each row that passes the predicate.
+  //
+  // For the Rowset variant, we simply pass all the input keys to 
+  // HBase in batches in HBase "deleteRows" calls. (In Explain plans,
+  // this TCB shows up as "trafodion_delete_vsbb", while the first two
+  // show up as "trafodion_delete".) There is no "get" cost. In plans
+  // with this TCB, there is a separate Scan TCB to obtain the keys,
+  // which then flow to this Rowset TCB via a tuple flow or nested join.
+  // (Such a separate Scan might exist with the first two TCBs also,
+  // e.g., when an index is used to decide which rows to delete.)
+  // The messaging cost to HBase is also reduced since multiple delete
+  // keys are sent per HBase interaction.
+  //
+  // Unfortunately the decisions as to which TCB will be used are
+  // currently made in the generator code and so aren't easily 
+  // available to us here. For the moment then, we make no attempt 
+  // to distinguish a separate "get" cost, nor do we take into account
+  // possible reduced message cost in the Rowset case. Should this
+  // choice be refactored in the future to push it into the Optimizer,
+  // then we can do a better job here. We did attempt to distinguish
+  // the unique case here from the others, but even there our criteria
+  // are not quite the same as in the generator. So at best, this attempt
+  // simply sharpens the cost estimate in this one particular case.
+
+
+  // Compute the I/O cost
+
+  computeIOCostsForCursorOperation(
+    randomIOs /* out */,
+    sequentialIOs /* out */,
+    CIDesc,
+    numRowsToScan,
+    probesInOrder
+    );
+
+  // Compute the tuple cost
+
+  tuplesProduced = numRowsToDelete;
+  tuplesProcessed = numRowsToScan; 
+  tuplesSent = numRowsToDelete;
+
+  CostScalar rowSize = delOp->getIndexDesc()->getRecordLength();
+  CostScalar rowSizeFactor = scmRowSizeFactor(rowSize); 
+  CostScalar outputRowSize = delOp->getGroupAttr()->getRecordLength();
+  CostScalar outputRowSizeFactor = scmRowSizeFactor(outputRowSize);
+
+  tuplesProcessed *= rowSizeFactor;
+  tuplesSent *= rowSizeFactor;
+  tuplesProduced *= outputRowSizeFactor;
+
+
+  // ---------------------------------------------------------------------
+  // Synthesize and return cost object.
+  // ---------------------------------------------------------------------
+
+  CostScalar probeRowSize = delOp->getIndexDesc()->getKeyLength();
+  Cost * updateCost = 
+    scmCost(tuplesProcessed, tuplesProduced, tuplesSent, randomIOs, sequentialIOs, noOfProbes_,
+	    rowSize, csZero, outputRowSize, probeRowSize);
+
+#ifndef NDEBUG
+if ( CmpCommon::getDefault( OPTIMIZER_PRINT_COST ) == DF_ON )
+    {
+      pfp = stdout;
+      fprintf(pfp, "HbaseDelete::scmComputeOperatorCostInternal()\n");
+      updateCost->getScmCplr().print(pfp);
+      fprintf(pfp, "HBase Delete elapsed time: ");
+      fprintf(pfp,"%f", updateCost->
+              convertToElapsedTime(
+                   myContext->getReqdPhysicalProperty()).
+              value());
+      fprintf(pfp,"\n");
+      fprintf(pfp,"CountOfStreams returned %d\n",countOfStreams);
+    }
+#endif
+
+  return updateCost;
 }
 
 // ----QUICKSEARCH FOR HbaseDelete........................................


Mime
View raw message