  IMPALA-7352 (parent issue: IMPALA-7350 More accurate memory estimates for admission)

HdfsTableSink doesn't take into account insert clustering



    Description

      I noticed that computeResourceProfile() doesn't check whether the insert is clustered. A clustered insert writes only a single partition at a time, so the estimate should not assume that all partitions for an instance are buffered concurrently.

        @Override
        public void computeResourceProfile(TQueryOptions queryOptions) {
          HdfsTable table = (HdfsTable) targetTable_;
          // TODO: Estimate the memory requirements more accurately by partition type.
          HdfsFileFormat format = table.getMajorityFormat();
          PlanNode inputNode = fragment_.getPlanRoot();
          int numInstances = fragment_.getNumInstances(queryOptions.getMt_dop());
          // Compute the per-instance number of partitions, taking the number of nodes
          // and the data partition of the fragment executing this sink into account.
          long numPartitionsPerInstance =
              fragment_.getPerInstanceNdv(queryOptions.getMt_dop(), partitionKeyExprs_);
          if (numPartitionsPerInstance == -1) {
            numPartitionsPerInstance = DEFAULT_NUM_PARTITIONS;
          }
          long perPartitionMemReq = getPerPartitionMemReq(format);
      
          long perInstanceMemEstimate;
          // The estimate is based purely on the per-partition mem req if the input cardinality_
          // or the avg row size is unknown.
          if (inputNode.getCardinality() == -1 || inputNode.getAvgRowSize() == -1) {
            perInstanceMemEstimate = numPartitionsPerInstance * perPartitionMemReq;
          } else {
            // The per-partition estimate may be higher than the memory required to buffer
            // the entire input data.
            long perInstanceInputCardinality =
                Math.max(1L, inputNode.getCardinality() / numInstances);
            long perInstanceInputBytes =
                (long) Math.ceil(perInstanceInputCardinality * inputNode.getAvgRowSize());
            long perInstanceMemReq =
                PlanNode.checkedMultiply(numPartitionsPerInstance, perPartitionMemReq);
            perInstanceMemEstimate = Math.min(perInstanceInputBytes, perInstanceMemReq);
          }
          resourceProfile_ = ResourceProfile.noReservation(perInstanceMemEstimate);
        }
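
      A minimal sketch of the kind of check the description is asking for, written as a
      standalone model of the estimate logic above. The isClustered flag, the method name,
      and the DEFAULT_NUM_PARTITIONS value are illustrative assumptions, not the actual
      HdfsTableSink API; the real fix would consult whatever planner state marks the
      insert as clustered.

        // Simplified, self-contained model of the per-instance memory estimate,
        // extended with a clustering check.
        public class ClusteredSinkEstimateSketch {
          // Stand-in for HdfsTableSink.DEFAULT_NUM_PARTITIONS (value assumed).
          private static final long DEFAULT_NUM_PARTITIONS = 10;

          static long perInstanceMemEstimate(boolean isClustered,
              long numPartitionsPerInstance, long perPartitionMemReq,
              long inputCardinality, double avgRowSize, int numInstances) {
            if (numPartitionsPerInstance == -1) {
              numPartitionsPerInstance = DEFAULT_NUM_PARTITIONS;
            }
            // A clustered insert receives its input grouped by partition key, so the
            // sink keeps only one partition writer open at a time.
            if (isClustered) numPartitionsPerInstance = 1;
            long perInstanceMemReq = numPartitionsPerInstance * perPartitionMemReq;
            if (inputCardinality == -1 || avgRowSize < 0) return perInstanceMemReq;
            long perInstanceInputCardinality =
                Math.max(1L, inputCardinality / numInstances);
            long perInstanceInputBytes =
                (long) Math.ceil(perInstanceInputCardinality * avgRowSize);
            // Never estimate more than buffering the entire per-instance input would cost.
            return Math.min(perInstanceInputBytes, perInstanceMemReq);
          }

          public static void main(String[] args) {
            long perPartitionMemReq = 256L * 1024 * 1024; // e.g. one writer buffer per partition
            long cardinality = 1_000_000_000L;            // 1B input rows
            double avgRowSize = 100.0;                    // bytes per row
            int numInstances = 10;
            // 100 partitions per instance: without the clustering check the estimate is
            // bounded only by the input size (~10 GB); with it, one partition (~256 MB).
            System.out.println("unclustered: " + perInstanceMemEstimate(false, 100,
                perPartitionMemReq, cardinality, avgRowSize, numInstances));
            System.out.println("clustered:   " + perInstanceMemEstimate(true, 100,
                perPartitionMemReq, cardinality, avgRowSize, numInstances));
          }
        }

      With the clustering check, the admission estimate collapses from
      numPartitionsPerInstance * perPartitionMemReq to a single partition's requirement,
      which matches what a clustered insert actually holds in memory.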
      
      



          People

            Assignee: Pooja Nilangekar (poojanilangekar)
            Reporter: Tim Armstrong (tarmstrong)
