Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 80F0E200D0C for ; Tue, 5 Sep 2017 19:21:15 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 7FAAF160BD7; Tue, 5 Sep 2017 17:21:15 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id A9969160BCB for ; Tue, 5 Sep 2017 19:21:14 +0200 (CEST) Received: (qmail 76126 invoked by uid 500); 5 Sep 2017 17:21:13 -0000 Mailing-List: contact commits-help@atlas.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@atlas.apache.org Delivered-To: mailing list commits@atlas.apache.org Received: (qmail 76071 invoked by uid 99); 5 Sep 2017 17:21:13 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 05 Sep 2017 17:21:13 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 0C15FE112D; Tue, 5 Sep 2017 17:21:13 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: amestry@apache.org To: commits@atlas.apache.org Message-Id: <315a04bf5b5a47a6bca6f73adfb6d190@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: atlas git commit: ATLAS-2101: Update Implementation to Eliminate Use of Stopwatch Date: Tue, 5 Sep 2017 17:21:13 +0000 (UTC) archived-at: Tue, 05 Sep 2017 17:21:15 -0000 Repository: atlas Updated Branches: refs/heads/branch-0.8 5bd4d5f6f -> 7780b520a ATLAS-2101: Update Implementation to Eliminate Use of Stopwatch Project: http://git-wip-us.apache.org/repos/asf/atlas/repo Commit: http://git-wip-us.apache.org/repos/asf/atlas/commit/7780b520 Tree: http://git-wip-us.apache.org/repos/asf/atlas/tree/7780b520 Diff: http://git-wip-us.apache.org/repos/asf/atlas/diff/7780b520 Branch: refs/heads/branch-0.8 Commit: 7780b520a5ce8722954a14353fdb66377c69e8b0 Parents: 5bd4d5f Author: ashutoshm Authored: Tue Sep 5 10:20:27 2017 -0700 Committer: ashutoshm Committed: Tue Sep 5 10:20:54 2017 -0700 ---------------------------------------------------------------------- .../database/idassigner/StandardIDPool.java | 259 +++++++++++++++++++ 1 file changed, 259 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/atlas/blob/7780b520/graphdb/titan0/src/main/java/com/thinkaurelius/titan/graphdb/database/idassigner/StandardIDPool.java ---------------------------------------------------------------------- diff --git a/graphdb/titan0/src/main/java/com/thinkaurelius/titan/graphdb/database/idassigner/StandardIDPool.java b/graphdb/titan0/src/main/java/com/thinkaurelius/titan/graphdb/database/idassigner/StandardIDPool.java new file mode 100644 index 0000000..6c7a086 --- /dev/null +++ b/graphdb/titan0/src/main/java/com/thinkaurelius/titan/graphdb/database/idassigner/StandardIDPool.java @@ -0,0 +1,259 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.thinkaurelius.titan.graphdb.database.idassigner; + +import com.google.common.base.Preconditions; +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import com.thinkaurelius.titan.core.TitanException; +import com.thinkaurelius.titan.core.attribute.Duration; +import com.thinkaurelius.titan.diskstorage.BackendException; +import com.thinkaurelius.titan.diskstorage.IDAuthority; +import com.thinkaurelius.titan.diskstorage.IDBlock; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.CancellationException; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +/** + * @author Matthias Broecheler (me@matthiasb.com) + */ + +public class StandardIDPool implements IDPool { + + private static final Logger log = + LoggerFactory.getLogger(StandardIDPool.class); + + private static final TimeUnit SCHEDULING_TIME_UNIT = + TimeUnit.MILLISECONDS; // TODO + + private static final IDBlock ID_POOL_EXHAUSTION = new IDBlock() { + @Override + public long numIds() { + throw new UnsupportedOperationException(); + } + + @Override + public long getId(long index) { + throw new UnsupportedOperationException(); + } + }; + + private static final IDBlock UNINITIALIZED_BLOCK = new IDBlock() { + @Override + public long numIds() { + return 0; + } + + @Override + public long getId(long index) { + throw new ArrayIndexOutOfBoundsException(0); + } + }; + + private static final int RENEW_ID_COUNT = 100; + + private final IDAuthority idAuthority; + private final long idUpperBound; //exclusive + private final int partition; + private final int idNamespace; + + private final Duration renewTimeout; + private final double renewBufferPercentage; + + private IDBlock currentBlock; + private long currentIndex; + private long renewBlockIndex; +// private long nextID; +// private long currentMaxID; +// private long renewBufferID; + + private volatile IDBlock nextBlock; + private Future idBlockFuture; + private final ThreadPoolExecutor exec; + + private volatile boolean initialized; + private volatile boolean closed; + + public StandardIDPool(IDAuthority idAuthority, int partition, int idNamespace, long idUpperBound, Duration renewTimeout, double renewBufferPercentage) { + Preconditions.checkArgument(idUpperBound > 0); + this.idAuthority = idAuthority; + Preconditions.checkArgument(partition>=0); + this.partition = partition; + Preconditions.checkArgument(idNamespace>=0); + this.idNamespace = idNamespace; + this.idUpperBound = idUpperBound; + Preconditions.checkArgument(!renewTimeout.isZeroLength(), "Renew-timeout must be positive"); + this.renewTimeout = renewTimeout; + Preconditions.checkArgument(renewBufferPercentage>0.0 && renewBufferPercentage<=1.0,"Renew-buffer percentage must be in (0.0,1.0]"); + this.renewBufferPercentage = renewBufferPercentage; + + currentBlock = UNINITIALIZED_BLOCK; + currentIndex = 0; + renewBlockIndex = 0; + + nextBlock = null; + + // daemon=true would probably be fine too + exec = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, + new LinkedBlockingQueue(), new ThreadFactoryBuilder() + .setDaemon(false) + .setNameFormat("TitanID(" + partition + ")("+idNamespace+")[%d]") + .build()); + //exec.allowCoreThreadTimeOut(false); + //exec.prestartCoreThread(); + idBlockFuture = null; + + initialized = false; + closed = false; + } + + private void waitForIDRenewer() throws InterruptedException { + + long start = swStart(); + if (null != idBlockFuture) { + try { + idBlockFuture.get(renewTimeout.getLength(SCHEDULING_TIME_UNIT), SCHEDULING_TIME_UNIT); + } catch (ExecutionException e) { + String msg = String.format("ID block allocation on partition(%d)-namespace(%d) failed with an exception in %s", + partition, idNamespace, swStop(start)); + throw new TitanException(msg, e); + } catch (TimeoutException e) { + // Attempt to cancel the renewer + idBlockFuture.cancel(true); + String msg = String.format("ID block allocation on partition(%d)-namespace(%d) timed out in %s", + partition, idNamespace, swStop(start)); + throw new TitanException(msg, e); + } catch (CancellationException e) { + String msg = String.format("ID block allocation on partition(%d)-namespace(%d) was cancelled after %s", + partition, idNamespace, swStop(start)); + throw new TitanException(msg, e); + } finally { + idBlockFuture = null; + } + // Allow InterruptedException to propagate up the stack + } + } + + private long swStop(long start) { + return swStart() - start; + } + + private synchronized void nextBlock() throws InterruptedException { + assert currentIndex == currentBlock.numIds(); + Preconditions.checkState(!closed,"ID Pool has been closed for partition(%s)-namespace(%s) - cannot apply for new id block", + partition,idNamespace); + + waitForIDRenewer(); + if (nextBlock == ID_POOL_EXHAUSTION) + throw new IDPoolExhaustedException("Exhausted ID Pool for partition(" + partition+")-namespace("+idNamespace+")"); + + Preconditions.checkArgument(nextBlock!=null); + + currentBlock = nextBlock; + currentIndex = 0; + + log.debug("ID partition({})-namespace({}) acquired block: [{}]", partition, idNamespace, currentBlock); + + assert currentBlock.numIds()>0; + + nextBlock = null; + + assert RENEW_ID_COUNT>0; + renewBlockIndex = Math.max(0,currentBlock.numIds()-Math.max(RENEW_ID_COUNT, Math.round(currentBlock.numIds()*renewBufferPercentage))); + assert renewBlockIndex=currentIndex; + } + + private void renewBuffer() { + Preconditions.checkArgument(nextBlock == null, nextBlock); + try { + long start = swStart(); + IDBlock idBlock = idAuthority.getIDBlock(partition, idNamespace, renewTimeout); + log.debug("Retrieved ID block from authority on partition({})-namespace({}) in {}", partition, idNamespace, swStop(start)); + Preconditions.checkArgument(idBlock!=null && idBlock.numIds()>0); + nextBlock = idBlock; + } catch (BackendException e) { + throw new TitanException("Could not acquire new ID block from storage", e); + } catch (IDPoolExhaustedException e) { + nextBlock = ID_POOL_EXHAUSTION; + } + } + + private long swStart() { + return System.currentTimeMillis(); + } + + @Override + public synchronized long nextID() { + assert currentIndex <= currentBlock.numIds(); + if (!initialized) { + startNextIDAcquisition(); + initialized = true; + } + + if (currentIndex == currentBlock.numIds()) { + try { + nextBlock(); + } catch (InterruptedException e) { + throw new TitanException("Could not renew id block due to interruption", e); + } + } + + if (currentIndex == renewBlockIndex) { + startNextIDAcquisition(); + } + long returnId = currentBlock.getId(currentIndex); + currentIndex++; + if (returnId >= idUpperBound) throw new IDPoolExhaustedException("Reached id upper bound of " + idUpperBound); + log.trace("partition({})-namespace({}) Returned id: {}", partition, idNamespace, returnId); + return returnId; + } + + @Override + public synchronized void close() { + closed=true; + //Wait for renewer to finish -- call exec.shutdownNow() instead? + try { + waitForIDRenewer(); + } catch (InterruptedException e) { + throw new TitanException("Interrupted while waiting for id renewer thread to finish", e); + } + exec.shutdownNow(); + } + + private void startNextIDAcquisition() { + Preconditions.checkArgument(idBlockFuture == null, idBlockFuture); + if (closed) return; //Don't renew anymore if closed + //Renew buffer + log.debug("Starting id block renewal thread upon {}", currentIndex); + idBlockFuture = exec.submit(new IDBlockRunnable()); + } + + private class IDBlockRunnable implements Runnable { + @Override + public void run() { + renewBuffer(); + } + } +}