incubator-blur-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From amccu...@apache.org
Subject [7/7] git commit: Modified code to make use of new api.
Date Tue, 30 Dec 2014 14:53:06 GMT
Modified code to make use of new api.


Project: http://git-wip-us.apache.org/repos/asf/incubator-blur/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-blur/commit/491f8930
Tree: http://git-wip-us.apache.org/repos/asf/incubator-blur/tree/491f8930
Diff: http://git-wip-us.apache.org/repos/asf/incubator-blur/diff/491f8930

Branch: refs/heads/master
Commit: 491f893059cc0463a93b1faf2a24360a085bce36
Parents: 36c8130
Author: Aaron McCurry <amccurry@gmail.com>
Authored: Tue Dec 30 09:50:17 2014 -0500
Committer: Aaron McCurry <amccurry@gmail.com>
Committed: Tue Dec 30 09:51:30 2014 -0500

----------------------------------------------------------------------
 .../org/apache/blur/manager/IndexManager.java   |   6 ++
 .../manager/writer/BlurIndexSimpleWriter.java   |  52 +++++----
 .../manager/writer/RowMutationWritable.java     | 105 +++++++++++++++++++
 .../apache/blur/server/FilteredBlurServer.java  |   6 ++
 .../blur/thrift/BlurControllerServer.java       |   6 ++
 .../org/apache/blur/thrift/BlurShardServer.java |  14 +++
 .../org/apache/blur/thrift/util/LoadData.java   |   4 +-
 7 files changed, 173 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/491f8930/blur-core/src/main/java/org/apache/blur/manager/IndexManager.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/manager/IndexManager.java b/blur-core/src/main/java/org/apache/blur/manager/IndexManager.java
index 0dbc634..46023cc 100644
--- a/blur-core/src/main/java/org/apache/blur/manager/IndexManager.java
+++ b/blur-core/src/main/java/org/apache/blur/manager/IndexManager.java
@@ -1313,4 +1313,10 @@ public class IndexManager {
     }
   }
 
+  public void bulkMutateAddMultiple(String table, String bulkId, List<RowMutation> rowMutations) throws BlurException, IOException {
+    for (RowMutation mutation : rowMutations) {
+      bulkMutateAdd(table, bulkId, mutation);
+    }
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/491f8930/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexSimpleWriter.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexSimpleWriter.java b/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexSimpleWriter.java
index a07cc34..c12c268 100644
--- a/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexSimpleWriter.java
+++ b/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexSimpleWriter.java
@@ -51,18 +51,21 @@ import org.apache.blur.thrift.generated.BlurException;
 import org.apache.blur.thrift.generated.RowMutation;
 import org.apache.blur.trace.Trace;
 import org.apache.blur.trace.Tracer;
-import org.apache.blur.utils.BlurUtil;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.SequenceFile.CompressionType;
 import org.apache.hadoop.io.SequenceFile.Reader;
 import org.apache.hadoop.io.SequenceFile.Sorter;
 import org.apache.hadoop.io.SequenceFile.Writer;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.DefaultCodec;
+import org.apache.hadoop.io.compress.SnappyCodec;
+import org.apache.hadoop.util.Progressable;
 import org.apache.lucene.analysis.Analyzer;
 import org.apache.lucene.index.BlurIndexWriter;
 import org.apache.lucene.index.DirectoryReader;
@@ -417,7 +420,26 @@ public class BlurIndexSimpleWriter extends BlurIndex {
       Path path = new Path(bulkInstance, _shardContext.getShard() + ".notsorted.seq");
       Configuration configuration = _tableContext.getConfiguration();
       FileSystem fileSystem = path.getFileSystem(configuration);
-      Writer writer = new SequenceFile.Writer(fileSystem, configuration, path, Text.class, BytesWritable.class);
+
+      Progressable progress = new Progressable() {
+        @Override
+        public void progress() {
+
+        }
+      };
+      final CompressionCodec codec;
+      final CompressionType type;
+      if (SnappyCodec.isNativeSnappyLoaded(configuration)) {
+        codec = new SnappyCodec();
+        type = CompressionType.BLOCK;
+      } else {
+        codec = new DefaultCodec();
+        type = CompressionType.NONE;
+      }
+
+      Writer writer = SequenceFile.createWriter(fileSystem, configuration, path, Text.class, RowMutationWritable.class,
+          type, codec, progress);
+
       _bulkWriters.put(bulkId, new BulkEntry(writer, path));
     } else {
       LOG.info("Bulk [{0}] mutate already started on shard [{1}] in table [{2}].", bulkId, _shardContext.getShard(),
@@ -454,7 +476,8 @@ public class BlurIndexSimpleWriter extends BlurIndex {
               public void performMutate(IndexSearcherClosable searcher, IndexWriter writer) throws IOException {
                 Configuration configuration = _tableContext.getConfiguration();
 
-                SequenceFile.Sorter sorter = new Sorter(fileSystem, Text.class, BytesWritable.class, configuration);
+                SequenceFile.Sorter sorter = new Sorter(fileSystem, Text.class, RowMutationWritable.class,
+                    configuration);
 
                 _sorted = new Path(path.getParent(), shard + ".sorted.seq");
 
@@ -466,7 +489,7 @@ public class BlurIndexSimpleWriter extends BlurIndex {
                 Reader reader = new SequenceFile.Reader(fileSystem, _sorted, configuration);
 
                 Text key = new Text();
-                BytesWritable value = new BytesWritable();
+                RowMutationWritable value = new RowMutationWritable();
 
                 Text last = null;
                 List<RowMutation> list = new ArrayList<RowMutation>();
@@ -476,7 +499,7 @@ public class BlurIndexSimpleWriter extends BlurIndex {
                     last = new Text(key);
                     list.clear();
                   }
-                  list.add(fromBytesWritable(value));
+                  list.add(value.getRowMutation().deepCopy());
                 }
                 flushMutates(searcher, writer, list);
                 reader.close();
@@ -549,24 +572,17 @@ public class BlurIndexSimpleWriter extends BlurIndex {
     if (bulkEntry == null) {
       throw new IOException("Bulk writer for [" + bulkId + "] not found.");
     }
-    bulkEntry._writer.append(getKey(mutation), toBytesWritable(mutation));
+    RowMutationWritable rowMutationWritable = new RowMutationWritable();
+    rowMutationWritable.setRowMutation(mutation);
+    synchronized (bulkEntry._writer) {
+      bulkEntry._writer.append(getKey(mutation), rowMutationWritable);
+    }
   }
 
   private Text getKey(RowMutation mutation) {
     return new Text(mutation.getRowId());
   }
 
-  private BytesWritable toBytesWritable(RowMutation mutation) {
-    BytesWritable value = new BytesWritable();
-    byte[] bytes = BlurUtil.toBytes(mutation);
-    value.set(bytes, 0, bytes.length);
-    return value;
-  }
-
-  private RowMutation fromBytesWritable(BytesWritable value) {
-    return (RowMutation) BlurUtil.fromBytes(value.getBytes(), 0, value.getLength());
-  }
-
   private static void removeParentIfLastFile(final FileSystem fileSystem, Path parent) throws IOException {
     FileStatus[] listStatus = fileSystem.listStatus(parent);
     if (listStatus != null) {

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/491f8930/blur-core/src/main/java/org/apache/blur/manager/writer/RowMutationWritable.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/manager/writer/RowMutationWritable.java b/blur-core/src/main/java/org/apache/blur/manager/writer/RowMutationWritable.java
new file mode 100644
index 0000000..393d00c
--- /dev/null
+++ b/blur-core/src/main/java/org/apache/blur/manager/writer/RowMutationWritable.java
@@ -0,0 +1,105 @@
+package org.apache.blur.manager.writer;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.blur.thirdparty.thrift_0_9_0.TException;
+import org.apache.blur.thirdparty.thrift_0_9_0.protocol.TCompactProtocol;
+import org.apache.blur.thirdparty.thrift_0_9_0.protocol.TProtocol;
+import org.apache.blur.thirdparty.thrift_0_9_0.transport.TTransport;
+import org.apache.blur.thirdparty.thrift_0_9_0.transport.TTransportException;
+import org.apache.blur.thrift.generated.RowMutation;
+import org.apache.hadoop.io.Writable;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+public class RowMutationWritable implements Writable {
+
+  private RowMutation _rowMutation = new RowMutation();
+
+  public RowMutation getRowMutation() {
+    return _rowMutation;
+  }
+
+  public void setRowMutation(RowMutation rowMutation) {
+    this._rowMutation = rowMutation;
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    try {
+      _rowMutation.read(getProtocol(in, null));
+    } catch (TException e) {
+      throw new IOException(e);
+    }
+  }
+
+  private TProtocol getProtocol(DataInput in, DataOutput out) {
+    return new TCompactProtocol(getTransport(in, out));
+  }
+
+  private TTransport getTransport(final DataInput in, final DataOutput out) {
+    return new TTransport() {
+
+      @Override
+      public void write(byte[] buf, int off, int len) throws TTransportException {
+        try {
+          out.write(buf, off, len);
+        } catch (IOException e) {
+          throw new TTransportException(e);
+        }
+      }
+
+      @Override
+      public int read(byte[] buf, int off, int len) throws TTransportException {
+        try {
+          in.readFully(buf, off, len);
+        } catch (IOException e) {
+          throw new TTransportException(e);
+        }
+        return len;
+      }
+
+      @Override
+      public void open() throws TTransportException {
+        throw new RuntimeException("Not Implemented");
+      }
+
+      @Override
+      public boolean isOpen() {
+        throw new RuntimeException("Not Implemented");
+      }
+
+      @Override
+      public void close() {
+        throw new RuntimeException("Not Implemented");
+      }
+    };
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    try {
+      _rowMutation.write(getProtocol(null, out));
+    } catch (TException e) {
+      throw new IOException(e);
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/491f8930/blur-core/src/main/java/org/apache/blur/server/FilteredBlurServer.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/server/FilteredBlurServer.java b/blur-core/src/main/java/org/apache/blur/server/FilteredBlurServer.java
index 47a9376..c9ebef0 100644
--- a/blur-core/src/main/java/org/apache/blur/server/FilteredBlurServer.java
+++ b/blur-core/src/main/java/org/apache/blur/server/FilteredBlurServer.java
@@ -306,4 +306,10 @@ public class FilteredBlurServer implements Iface {
     _iface.bulkMutateFinish(table, bulkId, apply, blockUntilComplete);
   }
 
+  @Override
+  public void bulkMutateAddMultiple(String table, String bulkId, List<RowMutation> rowMutations) throws BlurException,
+      TException {
+    _iface.bulkMutateAddMultiple(table, bulkId, rowMutations);
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/491f8930/blur-core/src/main/java/org/apache/blur/thrift/BlurControllerServer.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/thrift/BlurControllerServer.java b/blur-core/src/main/java/org/apache/blur/thrift/BlurControllerServer.java
index 4cd2327..dbef564 100644
--- a/blur-core/src/main/java/org/apache/blur/thrift/BlurControllerServer.java
+++ b/blur-core/src/main/java/org/apache/blur/thrift/BlurControllerServer.java
@@ -1748,4 +1748,10 @@ public class BlurControllerServer extends TableAdmin implements Iface {
     }
   }
 
+  @Override
+  public void bulkMutateAddMultiple(String table, String bulkId, List<RowMutation> rowMutations) throws BlurException,
+      TException {
+    throw new RuntimeException("Not Implemented");
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/491f8930/blur-core/src/main/java/org/apache/blur/thrift/BlurShardServer.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/thrift/BlurShardServer.java b/blur-core/src/main/java/org/apache/blur/thrift/BlurShardServer.java
index 976896c..0986b6d 100644
--- a/blur-core/src/main/java/org/apache/blur/thrift/BlurShardServer.java
+++ b/blur-core/src/main/java/org/apache/blur/thrift/BlurShardServer.java
@@ -734,4 +734,18 @@ public class BlurShardServer extends TableAdmin implements Iface {
     }
   }
 
+  @Override
+  public void bulkMutateAddMultiple(String table, String bulkId, List<RowMutation> rowMutations) throws BlurException,
+      TException {
+    try {
+      _indexManager.bulkMutateAddMultiple(table, bulkId, rowMutations);
+    } catch (Exception e) {
+      LOG.error("Unknown error while trying to add to a bulk mutate on table [" + table + "]", e);
+      if (e instanceof BlurException) {
+        throw (BlurException) e;
+      }
+      throw new BException(e.getMessage(), e);
+    }
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/491f8930/blur-thrift/src/main/java/org/apache/blur/thrift/util/LoadData.java
----------------------------------------------------------------------
diff --git a/blur-thrift/src/main/java/org/apache/blur/thrift/util/LoadData.java b/blur-thrift/src/main/java/org/apache/blur/thrift/util/LoadData.java
index ceac9f6..0fdec4d 100644
--- a/blur-thrift/src/main/java/org/apache/blur/thrift/util/LoadData.java
+++ b/blur-thrift/src/main/java/org/apache/blur/thrift/util/LoadData.java
@@ -128,7 +128,7 @@ public class LoadData {
     out.flush();
   }
 
-  private static void loadWords() throws IOException {
+  public static void loadWords() throws IOException {
     InputStream inputStream = LoadData.class.getResourceAsStream("words.txt");
     BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream));
     String word;
@@ -170,7 +170,7 @@ public class LoadData {
     return "fam" + random.nextInt(numberOfFamilies);
   }
 
-  private static String getWord() {
+  public static String getWord() {
     return makeUpperCaseRandomly(words.get(random.nextInt(words.size())), random);
   }
 


Mime
View raw message