incubator-blur-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Tim Williams <william...@gmail.com>
Subject Re: [3/3] git commit: Adding a new feature to allow for shards to pull directly from a queue like interface.
Date Wed, 26 Feb 2014 13:02:33 GMT
On Sat, Feb 22, 2014 at 9:47 PM,  <amccurry@apache.org> wrote:
> Adding a new feature to allow for shards to pull directly from a queue like interface.
>
>
> Project: http://git-wip-us.apache.org/repos/asf/incubator-blur/repo
> Commit: http://git-wip-us.apache.org/repos/asf/incubator-blur/commit/052c131e
> Tree: http://git-wip-us.apache.org/repos/asf/incubator-blur/tree/052c131e
> Diff: http://git-wip-us.apache.org/repos/asf/incubator-blur/diff/052c131e
>
> Branch: refs/heads/apache-blur-0.2
> Commit: 052c131e92e0caefb0c513fe52098ad6c6e04d3a
> Parents: 31f23a3
> Author: Aaron McCurry <amccurry@gmail.com>
> Authored: Sat Feb 22 21:47:25 2014 -0500
> Committer: Aaron McCurry <amccurry@gmail.com>
> Committed: Sat Feb 22 21:47:25 2014 -0500
>
> ----------------------------------------------------------------------
>  .../org/apache/blur/manager/IndexManager.java   |  49 +-----
>  .../manager/writer/BlurIndexSimpleWriter.java   |   5 +-
>  .../blur/manager/writer/MutatableAction.java    |  53 +++++++
>  .../apache/blur/manager/writer/QueueReader.java | 125 +++++++++++++++
>  .../org/apache/blur/server/TableContext.java    |  33 ++++
>  .../writer/BlurIndexSimpleWriterTest.java       |   3 -
>  .../writer/QueueReaderBasicInMemory.java        |  49 ++++++
>  .../blur/manager/writer/QueueReaderTest.java    | 158 +++++++++++++++++++
>  .../org/apache/blur/utils/BlurConstants.java    |   4 +
>  9 files changed, 429 insertions(+), 50 deletions(-)
> ----------------------------------------------------------------------
>
>
> http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/052c131e/blur-core/src/main/java/org/apache/blur/manager/IndexManager.java
> ----------------------------------------------------------------------
> diff --git a/blur-core/src/main/java/org/apache/blur/manager/IndexManager.java b/blur-core/src/main/java/org/apache/blur/manager/IndexManager.java
> index 093cd81..c8822d0 100644
> --- a/blur-core/src/main/java/org/apache/blur/manager/IndexManager.java
> +++ b/blur-core/src/main/java/org/apache/blur/manager/IndexManager.java
> @@ -460,7 +460,8 @@ public class IndexManager {
>        }
>        String rowId = blurQuery.getRowId();
>        if (rowId != null) {
> -        // reduce the index selection down to the only one that would contain the row.
> +        // reduce the index selection down to the only one that would contain
> +        // the row.
>          Map<String, BlurIndex> map = new HashMap<String, BlurIndex>();
>          String shard = MutationHelper.getShardName(table, rowId, getNumberOfShards(table),
_blurPartitioner);
>          BlurIndex index = getBlurIndex(table, shard);
> @@ -1207,25 +1208,7 @@ public class IndexManager {
>        }
>        ShardContext shardContext = blurIndex.getShardContext();
>        final MutatableAction mutatableAction = new MutatableAction(shardContext);
> -      for (int i = 0; i < mutations.size(); i++) {
> -        RowMutation mutation = mutations.get(i);
> -        RowMutationType type = mutation.rowMutationType;
> -        switch (type) {
> -        case REPLACE_ROW:
> -          Row row = MutationHelper.getRowFromMutations(mutation.rowId, mutation.recordMutations);
> -          mutatableAction.replaceRow(row);
> -          break;
> -        case UPDATE_ROW:
> -          doUpdateRowMutation(mutation, mutatableAction);
> -          break;
> -        case DELETE_ROW:
> -          mutatableAction.deleteRow(mutation.rowId);
> -          break;
> -        default:
> -          throw new RuntimeException("Not supported [" + type + "]");
> -        }
> -      }
> -
> +      mutatableAction.mutate(mutations);
>        return _mutateExecutor.submit(new Callable<Void>() {
>          @Override
>          public Void call() throws Exception {
> @@ -1253,32 +1236,6 @@ public class IndexManager {
>      return map;
>    }
>
> -  private void doUpdateRowMutation(RowMutation mutation, MutatableAction mutatableAction)
throws BlurException,
> -      IOException {
> -    String rowId = mutation.getRowId();
> -
> -    for (RecordMutation recordMutation : mutation.getRecordMutations()) {
> -      RecordMutationType type = recordMutation.recordMutationType;
> -      Record record = recordMutation.getRecord();
> -      switch (type) {
> -      case DELETE_ENTIRE_RECORD:
> -        mutatableAction.deleteRecord(rowId, record.getRecordId());
> -        break;
> -      case APPEND_COLUMN_VALUES:
> -        mutatableAction.appendColumns(rowId, record);
> -        break;
> -      case REPLACE_ENTIRE_RECORD:
> -        mutatableAction.replaceRecord(rowId, record);
> -        break;
> -      case REPLACE_COLUMNS:
> -        mutatableAction.replaceColumns(rowId, record);
> -        break;
> -      default:
> -        throw new RuntimeException("Unsupported record mutation type [" + type + "]");
> -      }
> -    }
> -  }
> -
>    private int getNumberOfShards(String table) {
>      return getTableContext(table).getDescriptor().getShardCount();
>    }
>
> http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/052c131e/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexSimpleWriter.java
> ----------------------------------------------------------------------
> diff --git a/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexSimpleWriter.java
b/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexSimpleWriter.java
> index f9dc1c8..306479b 100644
> --- a/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexSimpleWriter.java
> +++ b/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexSimpleWriter.java
> @@ -67,6 +67,7 @@ public class BlurIndexSimpleWriter extends BlurIndex {
>    private final AtomicReference<BlurIndexWriter> _writer = new AtomicReference<BlurIndexWriter>();
>    private final boolean _makeReaderExitable = true;
>    private IndexImporter _indexImporter;
> +  private QueueReader _queueReader;
>    private final ReadWriteLock _lock = new ReentrantReadWriteLock();
>    private final Lock _writeLock = _lock.writeLock();
>    private final ReadWriteLock _indexRefreshLock = new ReentrantReadWriteLock();
> @@ -134,6 +135,7 @@ public class BlurIndexSimpleWriter extends BlurIndex {
>
>    private Thread getWriterOpener(ShardContext shardContext) {
>      Thread thread = new Thread(new Runnable() {
> +
>        @Override
>        public void run() {
>          try {
> @@ -142,6 +144,7 @@ public class BlurIndexSimpleWriter extends BlurIndex {
>              _writer.notify();
>            }
>            _indexImporter = new IndexImporter(BlurIndexSimpleWriter.this, _shardContext,
TimeUnit.SECONDS, 10);
> +          _queueReader = _tableContext.getQueueReader(BlurIndexSimpleWriter.this, _shardContext);
>          } catch (IOException e) {
>            LOG.error("Unknown error on index writer open.", e);
>          }
> @@ -206,7 +209,7 @@ public class BlurIndexSimpleWriter extends BlurIndex {
>    @Override
>    public void close() throws IOException {
>      _isClosed.set(true);
> -    IOUtils.cleanup(LOG, _indexImporter, _writer.get(), _indexReader.get());
> +    IOUtils.cleanup(LOG, _indexImporter, _queueReader, _writer.get(), _indexReader.get());
>    }
>
>    @Override
>
> http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/052c131e/blur-core/src/main/java/org/apache/blur/manager/writer/MutatableAction.java
> ----------------------------------------------------------------------
> diff --git a/blur-core/src/main/java/org/apache/blur/manager/writer/MutatableAction.java
b/blur-core/src/main/java/org/apache/blur/manager/writer/MutatableAction.java
> index c8f0dfd..2a7159b 100644
> --- a/blur-core/src/main/java/org/apache/blur/manager/writer/MutatableAction.java
> +++ b/blur-core/src/main/java/org/apache/blur/manager/writer/MutatableAction.java
> @@ -33,11 +33,16 @@ import org.apache.blur.manager.IndexManager;
>  import org.apache.blur.server.IndexSearcherClosable;
>  import org.apache.blur.server.ShardContext;
>  import org.apache.blur.server.TableContext;
> +import org.apache.blur.thrift.MutationHelper;
>  import org.apache.blur.thrift.generated.Column;
>  import org.apache.blur.thrift.generated.FetchResult;
>  import org.apache.blur.thrift.generated.FetchRowResult;
>  import org.apache.blur.thrift.generated.Record;
> +import org.apache.blur.thrift.generated.RecordMutation;
> +import org.apache.blur.thrift.generated.RecordMutationType;
>  import org.apache.blur.thrift.generated.Row;
> +import org.apache.blur.thrift.generated.RowMutation;
> +import org.apache.blur.thrift.generated.RowMutationType;
>  import org.apache.blur.thrift.generated.Selector;
>  import org.apache.blur.utils.BlurConstants;
>  import org.apache.blur.utils.RowDocumentUtil;
> @@ -376,4 +381,52 @@ public class MutatableAction extends IndexAction {
>
>    }
>
> +  public void mutate(RowMutation mutation) {
> +    RowMutationType type = mutation.rowMutationType;
> +    switch (type) {
> +    case REPLACE_ROW:
> +      Row row = MutationHelper.getRowFromMutations(mutation.rowId, mutation.recordMutations);
> +      replaceRow(row);
> +      break;
> +    case UPDATE_ROW:
> +      doUpdateRowMutation(mutation, this);
> +      break;
> +    case DELETE_ROW:
> +      deleteRow(mutation.rowId);
> +      break;
> +    default:
> +      throw new RuntimeException("Not supported [" + type + "]");
> +    }
> +  }
> +
> +  private void doUpdateRowMutation(RowMutation mutation, MutatableAction mutatableAction)
{
> +    String rowId = mutation.getRowId();
> +    for (RecordMutation recordMutation : mutation.getRecordMutations()) {
> +      RecordMutationType type = recordMutation.recordMutationType;
> +      Record record = recordMutation.getRecord();
> +      switch (type) {
> +      case DELETE_ENTIRE_RECORD:
> +        mutatableAction.deleteRecord(rowId, record.getRecordId());
> +        break;
> +      case APPEND_COLUMN_VALUES:
> +        mutatableAction.appendColumns(rowId, record);
> +        break;
> +      case REPLACE_ENTIRE_RECORD:
> +        mutatableAction.replaceRecord(rowId, record);
> +        break;
> +      case REPLACE_COLUMNS:
> +        mutatableAction.replaceColumns(rowId, record);
> +        break;
> +      default:
> +        throw new RuntimeException("Unsupported record mutation type [" + type + "]");
> +      }
> +    }
> +  }
> +
> +  public void mutate(List<RowMutation> mutations) {
> +    for (int i = 0; i < mutations.size(); i++) {
> +      mutate(mutations.get(i));
> +    }
> +  }
> +
>  }
>
> http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/052c131e/blur-core/src/main/java/org/apache/blur/manager/writer/QueueReader.java
> ----------------------------------------------------------------------
> diff --git a/blur-core/src/main/java/org/apache/blur/manager/writer/QueueReader.java
b/blur-core/src/main/java/org/apache/blur/manager/writer/QueueReader.java
> new file mode 100644
> index 0000000..f44dea9
> --- /dev/null
> +++ b/blur-core/src/main/java/org/apache/blur/manager/writer/QueueReader.java
> @@ -0,0 +1,125 @@
> +/**
> + * Licensed to the Apache Software Foundation (ASF) under one or more
> + * contributor license agreements.  See the NOTICE file distributed with
> + * this work for additional information regarding copyright ownership.
> + * The ASF licenses this file to You under the Apache License, Version 2.0
> + * (the "License"); you may not use this file except in compliance with
> + * the License.  You may obtain a copy of the License at
> + *
> + *     http://www.apache.org/licenses/LICENSE-2.0
> + *
> + * Unless required by applicable law or agreed to in writing, software
> + * distributed under the License is distributed on an "AS IS" BASIS,
> + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> + * See the License for the specific language governing permissions and
> + * limitations under the License.
> + */
> +package org.apache.blur.manager.writer;
> +
> +import static org.apache.blur.utils.BlurConstants.BLUR_SHARD_INDEX_QUEUE_READER_BACKOFF;
> +import static org.apache.blur.utils.BlurConstants.BLUR_SHARD_INDEX_QUEUE_READER_MAX;
> +
> +import java.io.Closeable;
> +import java.io.IOException;
> +import java.util.ArrayList;
> +import java.util.List;
> +import java.util.concurrent.atomic.AtomicBoolean;
> +
> +import org.apache.blur.BlurConfiguration;
> +import org.apache.blur.log.Log;
> +import org.apache.blur.log.LogFactory;
> +import org.apache.blur.server.ShardContext;
> +import org.apache.blur.server.TableContext;
> +import org.apache.blur.thrift.generated.RowMutation;
> +
> +public abstract class QueueReader implements Closeable, Runnable {
> +
> +  private static final Log LOG = LogFactory.getLog(QueueReader.class);
> +
> +  protected final ShardContext _shardContext;
> +  protected final BlurIndex _index;
> +  protected final long _backOff;
> +  protected final Thread _daemon;
> +  protected final AtomicBoolean _running = new AtomicBoolean();
> +  protected final int _max;
> +  protected final TableContext _tableContext;
> +
> +  public QueueReader(BlurIndex index, ShardContext shardContext) {
> +    _running.set(true);
> +    _index = index;
> +    _shardContext = shardContext;
> +    _tableContext = _shardContext.getTableContext();
> +    BlurConfiguration configuration = _tableContext.getBlurConfiguration();
> +    _backOff = configuration.getLong(BLUR_SHARD_INDEX_QUEUE_READER_BACKOFF, 500);
> +    _max = configuration.getInt(BLUR_SHARD_INDEX_QUEUE_READER_MAX, 500);
> +    _daemon = new Thread(this);
> +    _daemon.setName("Queue Loader for [" + _tableContext.getTable() + "/" + shardContext.getShard()
+ "]");
> +    _daemon.setDaemon(true);
> +    _daemon.start();
> +  }

Hmm... there's a timing issue here.  Any subclasses that implement
queue-like things are likely to have costly initialization work to do.
 The problem is that this thread starts [or could start] "taking"
before the subclass' constructor has completed its initialization. If
we keep this, then every implementation would be forced to create a
block on the initial take.

We could move the runnable to an inner class, but that'd force callers
to call 'start' or 'listen' or somesuch, which we could use to start
the thread... thoughts?

Thanks,
--tim

Mime
View raw message