phoenix-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From JamesRTaylor <...@git.apache.org>
Subject [GitHub] phoenix pull request #308: Client-side hash aggregation
Date Mon, 16 Jul 2018 22:18:19 GMT
Github user JamesRTaylor commented on a diff in the pull request:

    https://github.com/apache/phoenix/pull/308#discussion_r202842452
  
    --- Diff: phoenix-core/src/main/java/org/apache/phoenix/iterate/ClientHashAggregatingResultIterator.java
---
    @@ -0,0 +1,155 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.phoenix.iterate;
    +
    +import static org.apache.phoenix.query.QueryConstants.AGG_TIMESTAMP;
    +import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN;
    +import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN_FAMILY;
    +
    +import java.io.IOException;
    +import java.sql.SQLException;
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.Map;
    +import java.util.HashMap;
    +import java.util.Iterator;
    +import java.util.List;
    +
    +import org.apache.hadoop.hbase.Cell;
    +import org.apache.hadoop.hbase.KeyValue;
    +import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
    +import org.apache.phoenix.expression.Expression;
    +import org.apache.phoenix.expression.aggregator.Aggregator;
    +import org.apache.phoenix.expression.aggregator.Aggregators;
    +import org.apache.phoenix.schema.tuple.MultiKeyValueTuple;
    +import org.apache.phoenix.schema.tuple.Tuple;
    +import org.apache.phoenix.util.KeyValueUtil;
    +import org.apache.phoenix.util.TupleUtil;
    +
    +/**
    + * 
    + * This class implements client-side hash aggregation in memory.
    + * Issue https://issues.apache.org/jira/browse/PHOENIX-4751.
    + * 
    + */
    +public class ClientHashAggregatingResultIterator
    +    implements AggregatingResultIterator {
    +
    +    private static final int HASH_AGG_INIT_SIZE = 64*1024;
    +    private static final byte[] UNITIALIZED_KEY_BUFFER = new byte[0];
    +    private final ResultIterator resultIterator;
    +    private final Aggregators aggregators;
    +    private final List<Expression> groupByExpressions;
    +    private HashMap<ImmutableBytesWritable, Aggregator[]> hash;
    +    private List<ImmutableBytesWritable> keyList;
    +    private Iterator<ImmutableBytesWritable> keyIterator;
    +
    +    public ClientHashAggregatingResultIterator(ResultIterator resultIterator, Aggregators
aggregators, List<Expression> groupByExpressions) {
    +        if (resultIterator == null) throw new NullPointerException();
    +        if (aggregators == null) throw new NullPointerException();
    +        if (groupByExpressions == null) throw new NullPointerException();
    +        this.resultIterator = resultIterator;
    +        this.aggregators = aggregators;
    +        this.groupByExpressions = groupByExpressions;
    +    }
    +
    +    @Override
    +    public Tuple next() throws SQLException {
    +        if (keyIterator == null) {
    +            populateHash();
    +            sortKeys();
    +            keyIterator = keyList.iterator();
    +        }
    +
    +        if (!keyIterator.hasNext()) {
    +            return null;
    +        }
    +
    +        ImmutableBytesWritable key = keyIterator.next();
    +        Aggregator[] rowAggregators = hash.get(key);
    +        byte[] value = aggregators.toBytes(rowAggregators);
    +        Tuple tuple = wrapKeyValueAsResult(KeyValueUtil.newKeyValue(key, SINGLE_COLUMN_FAMILY,
SINGLE_COLUMN, AGG_TIMESTAMP, value, 0, value.length));
    +        return tuple;
    +    }
    +
    +    @Override
    +    public void close() throws SQLException {
    +        keyIterator = null;
    +        keyList = null;
    +        hash = null;
    +        resultIterator.close();
    +    }
    +    
    +    @Override
    +    public Aggregator[] aggregate(Tuple result) {
    +        Aggregator[] rowAggregators = aggregators.getAggregators();
    +        aggregators.reset(rowAggregators);
    +        aggregators.aggregate(rowAggregators, result);
    +        return rowAggregators;
    +    }
    +
    +    @Override
    +    public void explain(List<String> planSteps) {
    +        resultIterator.explain(planSteps);
    +    }
    +
    +    @Override
    +    public String toString() {
    +        return "ClientHashAggregatingResultIterator [resultIterator=" 
    +            + resultIterator + ", aggregators=" + aggregators + ", groupByExpressions="
    +            + groupByExpressions + "]";
    +    }
    +
    +    // Copied from ClientGroupedAggregatingResultIterator
    +    protected ImmutableBytesWritable getGroupingKey(Tuple tuple, ImmutableBytesWritable
ptr) throws SQLException {
    +        try {
    +            ImmutableBytesWritable key = TupleUtil.getConcatenatedValue(tuple, groupByExpressions);
    +            ptr.set(key.get(), key.getOffset(), key.getLength());
    +            return ptr;
    +        } catch (IOException e) {
    +            throw new SQLException(e);
    +        }
    +    }
    +
    +    // Copied from ClientGroupedAggregatingResultIterator
    +    protected Tuple wrapKeyValueAsResult(KeyValue keyValue) {
    +        return new MultiKeyValueTuple(Collections.<Cell> singletonList(keyValue));
    +    }
    +
    +    private void populateHash() throws SQLException {
    --- End diff --
    
    @geraldss - memory management is tracked by the GlobalMemoryManager. Operations that potentially
use memory allocate (and eventually free) a set of MemoryChunk instances. You can see an example
of this in GroupedAggregateRegionObserver (the runtime code for aggregation). If the memory
used goes over a threshold (phoenix.query.maxGlobalMemoryPercentage and phoenix.query.maxTenantMemoryPercentage
as the allowed percentage of Java heap across all queries that is allowed to be used), then
the query will fail. Most typically, this mechanism is used on the server side as we don't
typically use a lot of memory on the client-side (as we're mostly doing merge joins). One
example where we use this on the client side is for our broadcast join implementation (see
HashCacheClient) to track memory held onto for Hash Join caches.
    
    Classes you may want to look at (or perhaps you already have?): OrderedResultIterator
and MappedByteBufferSortedQueue. Above a certain configurable threshold (phoenix.query.spoolThresholdBytes
defaults to 20MB), we output results into memory mapped files. Have you tried decreasing that
threshold?
     
    Couple of JIRAs you may want to take a look at: PHOENIX-2405 (unclear if this is still
an issue) and PHOENIX-3289. Are you running into issues with too many memory mapped files?


---

Mime
View raw message