[ https://issues.apache.org/jira/browse/DRILL-4134?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15064359#comment-15064359
]
ASF GitHub Bot commented on DRILL-4134:
---------------------------------------
Github user adeneche commented on a diff in the pull request:
https://github.com/apache/drill/pull/283#discussion_r48051860
--- Diff: exec/memory/base/src/main/java/org/apache/drill/exec/memory/Accountant.java
---
@@ -0,0 +1,269 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.memory;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+import javax.annotation.concurrent.ThreadSafe;
+
+import org.apache.drill.exec.exception.OutOfMemoryException;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * Provides a concurrent way to manage account for memory usage without locking. Used
as basis for Allocators. All
+ * operations are threadsafe (except for close).
+ */
+@ThreadSafe
+class Accountant implements AutoCloseable {
+ // private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(Accountant.class);
+
+ /**
+ * The parent allocator
+ */
+ protected final Accountant parent;
+
+ /**
+ * The amount of memory reserved for this allocator. Releases below this amount of
memory will not be returned to the
+ * parent Accountant until this Accountant is closed.
+ */
+ protected final long reservation;
+
+ private final AtomicLong peakAllocation = new AtomicLong();
+
+ /**
+ * Maximum local memory that can be held. This can be externally updated. Changing
it won't cause past memory to
+ * change but will change responses to future allocation efforts
+ */
+ private final AtomicLong allocationLimit = new AtomicLong();
+
+ /**
+ * Currently allocated amount of memory;
+ */
+ private final AtomicLong locallyHeldMemory = new AtomicLong();
+
+ public Accountant(Accountant parent, long reservation, long maxAllocation) {
+ Preconditions.checkArgument(reservation >= 0, "The initial reservation size must
be non-negative.");
+ Preconditions.checkArgument(maxAllocation >= 0, "The maximum allocation limit
must be non-negative.");
+ Preconditions.checkArgument(reservation <= maxAllocation,
+ "The initial reservation size must be <= the maximum allocation.");
+ Preconditions.checkArgument(reservation == 0 || parent != null, "The root accountant
can't reserve memory.");
+
+ this.parent = parent;
+ this.reservation = reservation;
+ this.allocationLimit.set(maxAllocation);
+
+ if (reservation != 0) {
+ // we will allocate a reservation from our parent.
+ final AllocationOutcome outcome = parent.allocateBytes(reservation);
+ if (!outcome.isOk()) {
+ throw new OutOfMemoryException(String.format(
+ "Failure trying to allocate initial reservation for Allocator. "
+ + "Attempted to allocate %d bytes and received an outcome of %s.", reservation,
outcome.name()));
+ }
+ }
+ }
+
+ /**
+ * Attempt to allocate the requested amount of memory. Either completely succeeds or
completely fails. Constructs a a
+ * log of delta
+ *
+ * If it fails, no changes are made to accounting.
+ *
+ * @param size
+ * The amount of memory to reserve in bytes.
+ * @return True if the allocation was successful, false if the allocation failed.
+ */
+ AllocationOutcome allocateBytes(long size) {
+ final AllocationOutcome outcome = allocate(size, true, false);
+ if (!outcome.isOk()) {
+ releaseBytes(size);
+ }
+ return outcome;
+ }
+
+ private void updatePeak() {
+ final long currentMemory = locallyHeldMemory.get();
+
+ long previousPeak;
+ do {
+ previousPeak = peakAllocation.get();
+ if (peakAllocation.compareAndSet(previousPeak, currentMemory)) {
--- End diff --
Just got the results of Vicky's tests, we are no longer seeing the concurrency issue.
I do need more time to go through the PR, I want to make sure everything's good, especially
that more patches will be coming that depend on this PR.
> Incorporate remaining patches from DRILL-1942 Allocator refactor
> ----------------------------------------------------------------
>
> Key: DRILL-4134
> URL: https://issues.apache.org/jira/browse/DRILL-4134
> Project: Apache Drill
> Issue Type: Sub-task
> Components: Execution - Flow
> Reporter: Jacques Nadeau
> Assignee: Jacques Nadeau
> Fix For: 1.5.0
>
>
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
|