activemq-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From michaelandrepearce <...@git.apache.org>
Subject [GitHub] activemq-artemis pull request #2490: V2 196
Date Wed, 09 Jan 2019 19:51:48 GMT
Github user michaelandrepearce commented on a diff in the pull request:

    https://github.com/apache/activemq-artemis/pull/2490#discussion_r246516870
  
    --- Diff: artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueConsumersImpl.java
---
    @@ -0,0 +1,124 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements. See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License. You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.activemq.artemis.core.server.impl;
    +
    +import org.apache.activemq.artemis.core.PriorityAware;
    +import org.apache.activemq.artemis.utils.collections.PriorityCollection;
    +import org.apache.activemq.artemis.utils.collections.ResettableIterator;
    +
    +import java.util.Collection;
    +import java.util.Collections;
    +import java.util.Iterator;
    +import java.util.Set;
    +import java.util.Spliterator;
    +import java.util.concurrent.CopyOnWriteArraySet;
    +import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
    +import java.util.function.Consumer;
    +
    +/**
    + * This class's purpose is to hold the consumers.
    + *
    + * CopyOnWriteArraySet is used as the underlying collection to the PriorityCollection,
as it is concurrent safe,
    + * but also lock less for a read path, which is our HOT path.
    + * Also it was the underlying collection previously used in QueueImpl, before we abstracted
it out to support priority consumers.
    + *
    + * There can only be one resettable iterable view,
    + * A new iterable view is created on modification, this is to keep the read HOT path
performent, BUT
    + * the iterable view changes only after reset so changes in the underlying collection
are only seen after a reset,
    + *
    + * All other iterators created by iterators() method are not reset-able and are created
on delegating iterator().
    + *
    + * @param <T> The type this class may hold, this is generic as can be anything
that extends PriorityAware,
    + *         but intent is this is the QueueImpl:ConsumerHolder.
    + */
    +public class QueueConsumersImpl<T extends PriorityAware> implements QueueConsumers<T>
{
    +
    +   private final PriorityCollection<T> consumers = new PriorityCollection<>(CopyOnWriteArraySet::new);
    +   private final Collection<T> unmodifiableConsumers = Collections.unmodifiableCollection(consumers);
    +   private final AtomicReferenceFieldUpdater<QueueConsumersImpl, ResettableIterator>
changedIteratorFieldUpdater =  AtomicReferenceFieldUpdater.newUpdater(QueueConsumersImpl.class,
ResettableIterator.class, "changedIterator");
    +   private volatile ResettableIterator<T> changedIterator;
    +   private ResettableIterator<T> currentIterator = consumers.resettableIterator();
    +
    +   @Override
    +   public Set<Integer> getPriorites() {
    +      return consumers.getPriorites();
    +   }
    +
    +   @Override
    +   public boolean hasNext() {
    +      return currentIterator.hasNext();
    +   }
    +
    +   @Override
    +   public T next() {
    +      return currentIterator.next();
    +   }
    +
    +   @Override
    +   public QueueConsumers<T> reset() {
    +      if (changedIterator != null) {
    +         currentIterator = changedIterator;
    +         changedIteratorFieldUpdater.compareAndSet(this, changedIterator, null);
    +      } else {
    +         currentIterator.reset();
    --- End diff --
    
    Like any iterator and iterator should only be interacted by one thread at a time.


---

Mime
View raw message