incubator-blur-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Aaron McCurry <amccu...@gmail.com>
Subject Re: [3/3] git commit: Adding a new feature to allow for shards to pull directly from a queue like interface.
Date Wed, 26 Feb 2014 13:07:01 GMT
Yeah I think that's a good idea. Thanks Tim. 

Sent from my iPhone

On Feb 26, 2014, at 8:02 AM, Tim Williams <williamstw@gmail.com> wrote:

> On Sat, Feb 22, 2014 at 9:47 PM,  <amccurry@apache.org> wrote:
>> Adding a new feature to allow for shards to pull directly from a queue like interface.
>> 
>> 
>> Project: http://git-wip-us.apache.org/repos/asf/incubator-blur/repo
>> Commit: http://git-wip-us.apache.org/repos/asf/incubator-blur/commit/052c131e
>> Tree: http://git-wip-us.apache.org/repos/asf/incubator-blur/tree/052c131e
>> Diff: http://git-wip-us.apache.org/repos/asf/incubator-blur/diff/052c131e
>> 
>> Branch: refs/heads/apache-blur-0.2
>> Commit: 052c131e92e0caefb0c513fe52098ad6c6e04d3a
>> Parents: 31f23a3
>> Author: Aaron McCurry <amccurry@gmail.com>
>> Authored: Sat Feb 22 21:47:25 2014 -0500
>> Committer: Aaron McCurry <amccurry@gmail.com>
>> Committed: Sat Feb 22 21:47:25 2014 -0500
>> 
>> ----------------------------------------------------------------------
>> .../org/apache/blur/manager/IndexManager.java   |  49 +-----
>> .../manager/writer/BlurIndexSimpleWriter.java   |   5 +-
>> .../blur/manager/writer/MutatableAction.java    |  53 +++++++
>> .../apache/blur/manager/writer/QueueReader.java | 125 +++++++++++++++
>> .../org/apache/blur/server/TableContext.java    |  33 ++++
>> .../writer/BlurIndexSimpleWriterTest.java       |   3 -
>> .../writer/QueueReaderBasicInMemory.java        |  49 ++++++
>> .../blur/manager/writer/QueueReaderTest.java    | 158 +++++++++++++++++++
>> .../org/apache/blur/utils/BlurConstants.java    |   4 +
>> 9 files changed, 429 insertions(+), 50 deletions(-)
>> ----------------------------------------------------------------------
>> 
>> 
>> http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/052c131e/blur-core/src/main/java/org/apache/blur/manager/IndexManager.java
>> ----------------------------------------------------------------------
>> diff --git a/blur-core/src/main/java/org/apache/blur/manager/IndexManager.java b/blur-core/src/main/java/org/apache/blur/manager/IndexManager.java
>> index 093cd81..c8822d0 100644
>> --- a/blur-core/src/main/java/org/apache/blur/manager/IndexManager.java
>> +++ b/blur-core/src/main/java/org/apache/blur/manager/IndexManager.java
>> @@ -460,7 +460,8 @@ public class IndexManager {
>>       }
>>       String rowId = blurQuery.getRowId();
>>       if (rowId != null) {
>> -        // reduce the index selection down to the only one that would contain the
row.
>> +        // reduce the index selection down to the only one that would contain
>> +        // the row.
>>         Map<String, BlurIndex> map = new HashMap<String, BlurIndex>();
>>         String shard = MutationHelper.getShardName(table, rowId, getNumberOfShards(table),
_blurPartitioner);
>>         BlurIndex index = getBlurIndex(table, shard);
>> @@ -1207,25 +1208,7 @@ public class IndexManager {
>>       }
>>       ShardContext shardContext = blurIndex.getShardContext();
>>       final MutatableAction mutatableAction = new MutatableAction(shardContext);
>> -      for (int i = 0; i < mutations.size(); i++) {
>> -        RowMutation mutation = mutations.get(i);
>> -        RowMutationType type = mutation.rowMutationType;
>> -        switch (type) {
>> -        case REPLACE_ROW:
>> -          Row row = MutationHelper.getRowFromMutations(mutation.rowId, mutation.recordMutations);
>> -          mutatableAction.replaceRow(row);
>> -          break;
>> -        case UPDATE_ROW:
>> -          doUpdateRowMutation(mutation, mutatableAction);
>> -          break;
>> -        case DELETE_ROW:
>> -          mutatableAction.deleteRow(mutation.rowId);
>> -          break;
>> -        default:
>> -          throw new RuntimeException("Not supported [" + type + "]");
>> -        }
>> -      }
>> -
>> +      mutatableAction.mutate(mutations);
>>       return _mutateExecutor.submit(new Callable<Void>() {
>>         @Override
>>         public Void call() throws Exception {
>> @@ -1253,32 +1236,6 @@ public class IndexManager {
>>     return map;
>>   }
>> 
>> -  private void doUpdateRowMutation(RowMutation mutation, MutatableAction mutatableAction)
throws BlurException,
>> -      IOException {
>> -    String rowId = mutation.getRowId();
>> -
>> -    for (RecordMutation recordMutation : mutation.getRecordMutations()) {
>> -      RecordMutationType type = recordMutation.recordMutationType;
>> -      Record record = recordMutation.getRecord();
>> -      switch (type) {
>> -      case DELETE_ENTIRE_RECORD:
>> -        mutatableAction.deleteRecord(rowId, record.getRecordId());
>> -        break;
>> -      case APPEND_COLUMN_VALUES:
>> -        mutatableAction.appendColumns(rowId, record);
>> -        break;
>> -      case REPLACE_ENTIRE_RECORD:
>> -        mutatableAction.replaceRecord(rowId, record);
>> -        break;
>> -      case REPLACE_COLUMNS:
>> -        mutatableAction.replaceColumns(rowId, record);
>> -        break;
>> -      default:
>> -        throw new RuntimeException("Unsupported record mutation type [" + type +
"]");
>> -      }
>> -    }
>> -  }
>> -
>>   private int getNumberOfShards(String table) {
>>     return getTableContext(table).getDescriptor().getShardCount();
>>   }
>> 
>> http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/052c131e/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexSimpleWriter.java
>> ----------------------------------------------------------------------
>> diff --git a/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexSimpleWriter.java
b/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexSimpleWriter.java
>> index f9dc1c8..306479b 100644
>> --- a/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexSimpleWriter.java
>> +++ b/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexSimpleWriter.java
>> @@ -67,6 +67,7 @@ public class BlurIndexSimpleWriter extends BlurIndex {
>>   private final AtomicReference<BlurIndexWriter> _writer = new AtomicReference<BlurIndexWriter>();
>>   private final boolean _makeReaderExitable = true;
>>   private IndexImporter _indexImporter;
>> +  private QueueReader _queueReader;
>>   private final ReadWriteLock _lock = new ReentrantReadWriteLock();
>>   private final Lock _writeLock = _lock.writeLock();
>>   private final ReadWriteLock _indexRefreshLock = new ReentrantReadWriteLock();
>> @@ -134,6 +135,7 @@ public class BlurIndexSimpleWriter extends BlurIndex {
>> 
>>   private Thread getWriterOpener(ShardContext shardContext) {
>>     Thread thread = new Thread(new Runnable() {
>> +
>>       @Override
>>       public void run() {
>>         try {
>> @@ -142,6 +144,7 @@ public class BlurIndexSimpleWriter extends BlurIndex {
>>             _writer.notify();
>>           }
>>           _indexImporter = new IndexImporter(BlurIndexSimpleWriter.this, _shardContext,
TimeUnit.SECONDS, 10);
>> +          _queueReader = _tableContext.getQueueReader(BlurIndexSimpleWriter.this,
_shardContext);
>>         } catch (IOException e) {
>>           LOG.error("Unknown error on index writer open.", e);
>>         }
>> @@ -206,7 +209,7 @@ public class BlurIndexSimpleWriter extends BlurIndex {
>>   @Override
>>   public void close() throws IOException {
>>     _isClosed.set(true);
>> -    IOUtils.cleanup(LOG, _indexImporter, _writer.get(), _indexReader.get());
>> +    IOUtils.cleanup(LOG, _indexImporter, _queueReader, _writer.get(), _indexReader.get());
>>   }
>> 
>>   @Override
>> 
>> http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/052c131e/blur-core/src/main/java/org/apache/blur/manager/writer/MutatableAction.java
>> ----------------------------------------------------------------------
>> diff --git a/blur-core/src/main/java/org/apache/blur/manager/writer/MutatableAction.java
b/blur-core/src/main/java/org/apache/blur/manager/writer/MutatableAction.java
>> index c8f0dfd..2a7159b 100644
>> --- a/blur-core/src/main/java/org/apache/blur/manager/writer/MutatableAction.java
>> +++ b/blur-core/src/main/java/org/apache/blur/manager/writer/MutatableAction.java
>> @@ -33,11 +33,16 @@ import org.apache.blur.manager.IndexManager;
>> import org.apache.blur.server.IndexSearcherClosable;
>> import org.apache.blur.server.ShardContext;
>> import org.apache.blur.server.TableContext;
>> +import org.apache.blur.thrift.MutationHelper;
>> import org.apache.blur.thrift.generated.Column;
>> import org.apache.blur.thrift.generated.FetchResult;
>> import org.apache.blur.thrift.generated.FetchRowResult;
>> import org.apache.blur.thrift.generated.Record;
>> +import org.apache.blur.thrift.generated.RecordMutation;
>> +import org.apache.blur.thrift.generated.RecordMutationType;
>> import org.apache.blur.thrift.generated.Row;
>> +import org.apache.blur.thrift.generated.RowMutation;
>> +import org.apache.blur.thrift.generated.RowMutationType;
>> import org.apache.blur.thrift.generated.Selector;
>> import org.apache.blur.utils.BlurConstants;
>> import org.apache.blur.utils.RowDocumentUtil;
>> @@ -376,4 +381,52 @@ public class MutatableAction extends IndexAction {
>> 
>>   }
>> 
>> +  public void mutate(RowMutation mutation) {
>> +    RowMutationType type = mutation.rowMutationType;
>> +    switch (type) {
>> +    case REPLACE_ROW:
>> +      Row row = MutationHelper.getRowFromMutations(mutation.rowId, mutation.recordMutations);
>> +      replaceRow(row);
>> +      break;
>> +    case UPDATE_ROW:
>> +      doUpdateRowMutation(mutation, this);
>> +      break;
>> +    case DELETE_ROW:
>> +      deleteRow(mutation.rowId);
>> +      break;
>> +    default:
>> +      throw new RuntimeException("Not supported [" + type + "]");
>> +    }
>> +  }
>> +
>> +  private void doUpdateRowMutation(RowMutation mutation, MutatableAction mutatableAction)
{
>> +    String rowId = mutation.getRowId();
>> +    for (RecordMutation recordMutation : mutation.getRecordMutations()) {
>> +      RecordMutationType type = recordMutation.recordMutationType;
>> +      Record record = recordMutation.getRecord();
>> +      switch (type) {
>> +      case DELETE_ENTIRE_RECORD:
>> +        mutatableAction.deleteRecord(rowId, record.getRecordId());
>> +        break;
>> +      case APPEND_COLUMN_VALUES:
>> +        mutatableAction.appendColumns(rowId, record);
>> +        break;
>> +      case REPLACE_ENTIRE_RECORD:
>> +        mutatableAction.replaceRecord(rowId, record);
>> +        break;
>> +      case REPLACE_COLUMNS:
>> +        mutatableAction.replaceColumns(rowId, record);
>> +        break;
>> +      default:
>> +        throw new RuntimeException("Unsupported record mutation type [" + type +
"]");
>> +      }
>> +    }
>> +  }
>> +
>> +  public void mutate(List<RowMutation> mutations) {
>> +    for (int i = 0; i < mutations.size(); i++) {
>> +      mutate(mutations.get(i));
>> +    }
>> +  }
>> +
>> }
>> 
>> http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/052c131e/blur-core/src/main/java/org/apache/blur/manager/writer/QueueReader.java
>> ----------------------------------------------------------------------
>> diff --git a/blur-core/src/main/java/org/apache/blur/manager/writer/QueueReader.java
b/blur-core/src/main/java/org/apache/blur/manager/writer/QueueReader.java
>> new file mode 100644
>> index 0000000..f44dea9
>> --- /dev/null
>> +++ b/blur-core/src/main/java/org/apache/blur/manager/writer/QueueReader.java
>> @@ -0,0 +1,125 @@
>> +/**
>> + * Licensed to the Apache Software Foundation (ASF) under one or more
>> + * contributor license agreements.  See the NOTICE file distributed with
>> + * this work for additional information regarding copyright ownership.
>> + * The ASF licenses this file to You under the Apache License, Version 2.0
>> + * (the "License"); you may not use this file except in compliance with
>> + * the License.  You may obtain a copy of the License at
>> + *
>> + *     http://www.apache.org/licenses/LICENSE-2.0
>> + *
>> + * Unless required by applicable law or agreed to in writing, software
>> + * distributed under the License is distributed on an "AS IS" BASIS,
>> + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
>> + * See the License for the specific language governing permissions and
>> + * limitations under the License.
>> + */
>> +package org.apache.blur.manager.writer;
>> +
>> +import static org.apache.blur.utils.BlurConstants.BLUR_SHARD_INDEX_QUEUE_READER_BACKOFF;
>> +import static org.apache.blur.utils.BlurConstants.BLUR_SHARD_INDEX_QUEUE_READER_MAX;
>> +
>> +import java.io.Closeable;
>> +import java.io.IOException;
>> +import java.util.ArrayList;
>> +import java.util.List;
>> +import java.util.concurrent.atomic.AtomicBoolean;
>> +
>> +import org.apache.blur.BlurConfiguration;
>> +import org.apache.blur.log.Log;
>> +import org.apache.blur.log.LogFactory;
>> +import org.apache.blur.server.ShardContext;
>> +import org.apache.blur.server.TableContext;
>> +import org.apache.blur.thrift.generated.RowMutation;
>> +
>> +public abstract class QueueReader implements Closeable, Runnable {
>> +
>> +  private static final Log LOG = LogFactory.getLog(QueueReader.class);
>> +
>> +  protected final ShardContext _shardContext;
>> +  protected final BlurIndex _index;
>> +  protected final long _backOff;
>> +  protected final Thread _daemon;
>> +  protected final AtomicBoolean _running = new AtomicBoolean();
>> +  protected final int _max;
>> +  protected final TableContext _tableContext;
>> +
>> +  public QueueReader(BlurIndex index, ShardContext shardContext) {
>> +    _running.set(true);
>> +    _index = index;
>> +    _shardContext = shardContext;
>> +    _tableContext = _shardContext.getTableContext();
>> +    BlurConfiguration configuration = _tableContext.getBlurConfiguration();
>> +    _backOff = configuration.getLong(BLUR_SHARD_INDEX_QUEUE_READER_BACKOFF, 500);
>> +    _max = configuration.getInt(BLUR_SHARD_INDEX_QUEUE_READER_MAX, 500);
>> +    _daemon = new Thread(this);
>> +    _daemon.setName("Queue Loader for [" + _tableContext.getTable() + "/" + shardContext.getShard()
+ "]");
>> +    _daemon.setDaemon(true);
>> +    _daemon.start();
>> +  }
> 
> Hmm... there's a timing issue here.  Any subclasses that implement
> queue-like things are likely to have costly initialization work to do.
> The problem is that this thread starts [or could start] "taking"
> before the subclass' constructor has completed its initialization. If
> we keep this, then every implementation would be forced to create a
> block on the initial take.
> 
> We could move the runnable to an inner class, but that'd force callers
> to call 'start' or 'listen' or somesuch, which we could use to start
> the thread... thoughts?
> 
> Thanks,
> --tim

Mime
View raw message