incubator-blur-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From amccu...@apache.org
Subject [9/9] git commit: Making API changes to remove table from API, and adding unit tests.
Date Tue, 20 Jan 2015 01:44:16 GMT
Making API changes to remove table from API, and adding unit tests.


Project: http://git-wip-us.apache.org/repos/asf/incubator-blur/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-blur/commit/016a3865
Tree: http://git-wip-us.apache.org/repos/asf/incubator-blur/tree/016a3865
Diff: http://git-wip-us.apache.org/repos/asf/incubator-blur/diff/016a3865

Branch: refs/heads/master
Commit: 016a38656c31dccc9008debfefa4d6576a81657d
Parents: dcaf106
Author: Aaron McCurry <amccurry@gmail.com>
Authored: Mon Jan 19 20:43:55 2015 -0500
Committer: Aaron McCurry <amccurry@gmail.com>
Committed: Mon Jan 19 20:43:55 2015 -0500

----------------------------------------------------------------------
 .../org/apache/blur/manager/IndexManager.java   |  17 +--
 .../apache/blur/manager/writer/BlurIndex.java   |   2 -
 .../blur/manager/writer/BlurIndexReadOnly.java  |   5 -
 .../manager/writer/BlurIndexSimpleWriter.java   |  71 ++++++++--
 .../apache/blur/server/FilteredBlurServer.java  |  18 +--
 .../blur/thrift/BlurControllerServer.java       | 132 ++++++++++++-------
 .../org/apache/blur/thrift/BlurShardServer.java |  62 +++++----
 .../java/org/apache/blur/thrift/TableAdmin.java |  12 ++
 .../blur/command/ShardCommandManagerTest.java   |   7 +-
 .../blur/manager/writer/IndexImporterTest.java  |   5 -
 .../apache/blur/thrift/BlurClusterTestBase.java |  41 +++++-
 .../blur/thrift/BlurClusterTestNoSecurity.java  |  13 ++
 .../blur/hive/BlurHiveOutputCommitter.java      |   5 +-
 .../apache/blur/hive/BlurHiveOutputFormat.java  |   2 +-
 .../blur/hive/BlurHiveStorageHandler.java       |   8 +-
 15 files changed, 269 insertions(+), 131 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/016a3865/blur-core/src/main/java/org/apache/blur/manager/IndexManager.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/manager/IndexManager.java b/blur-core/src/main/java/org/apache/blur/manager/IndexManager.java
index cea6f68..56b00b0 100644
--- a/blur-core/src/main/java/org/apache/blur/manager/IndexManager.java
+++ b/blur-core/src/main/java/org/apache/blur/manager/IndexManager.java
@@ -1305,13 +1305,6 @@ public class IndexManager {
     enqueue(Arrays.asList(mutation));
   }
 
-  public void bulkMutateStart(String table, String bulkId) throws BlurException, IOException
{
-    Map<String, BlurIndex> indexes = _indexServer.getIndexes(table);
-    for (BlurIndex index : indexes.values()) {
-      index.startBulkMutate(bulkId);
-    }
-  }
-
   public void bulkMutateAdd(String table, String bulkId, RowMutation mutation) throws BlurException,
IOException {
     String shard = MutationHelper.getShardName(table, mutation.rowId, getNumberOfShards(table),
_blurPartitioner);
     Map<String, BlurIndex> indexes = _indexServer.getIndexes(table);
@@ -1322,11 +1315,13 @@ public class IndexManager {
     blurIndex.addBulkMutate(bulkId, mutation);
   }
 
-  public void bulkMutateFinish(String table, String bulkId, boolean apply, boolean blockUntilComplete)
+  public void bulkMutateFinish(Set<String> potentialTables, String bulkId, boolean
apply, boolean blockUntilComplete)
       throws BlurException, IOException {
-    Map<String, BlurIndex> indexes = _indexServer.getIndexes(table);
-    for (BlurIndex index : indexes.values()) {
-      index.finishBulkMutate(bulkId, apply, blockUntilComplete);
+    for (String table : potentialTables) {
+      Map<String, BlurIndex> indexes = _indexServer.getIndexes(table);
+      for (BlurIndex index : indexes.values()) {
+        index.finishBulkMutate(bulkId, apply, blockUntilComplete);
+      }
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/016a3865/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndex.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndex.java b/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndex.java
index f098ad7..192b6fa 100644
--- a/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndex.java
+++ b/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndex.java
@@ -135,8 +135,6 @@ public abstract class BlurIndex {
 
   public abstract void enqueue(List<RowMutation> mutations) throws IOException;
 
-  public abstract void startBulkMutate(String bulkId) throws IOException;
-
   public abstract void finishBulkMutate(String bulkId, boolean apply, boolean blockUntilComplete)
throws IOException;
 
   public abstract void addBulkMutate(String bulkId, RowMutation mutation) throws IOException;

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/016a3865/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexReadOnly.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexReadOnly.java
b/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexReadOnly.java
index 6025068..9f1ce8a 100644
--- a/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexReadOnly.java
+++ b/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexReadOnly.java
@@ -75,11 +75,6 @@ public class BlurIndexReadOnly extends BlurIndex {
   }
 
   @Override
-  public void startBulkMutate(String bulkId) {
-    throw new RuntimeException("Read-only shard");
-  }
-
-  @Override
   public void finishBulkMutate(String bulkId, boolean apply, boolean blockUntilComplete)
{
     throw new RuntimeException("Read-only shard");
   }

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/016a3865/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexSimpleWriter.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexSimpleWriter.java
b/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexSimpleWriter.java
index 1a66932..7595663 100644
--- a/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexSimpleWriter.java
+++ b/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexSimpleWriter.java
@@ -22,6 +22,7 @@ import static org.apache.blur.utils.BlurConstants.ACL_READ;
 import static org.apache.blur.utils.BlurConstants.BLUR_SHARD_QUEUE_MAX_INMEMORY_LENGTH;
 
 import java.io.IOException;
+import java.lang.reflect.Method;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -79,6 +80,7 @@ import org.apache.hadoop.io.SequenceFile.Writer;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.compress.CompressionCodec;
 import org.apache.hadoop.io.compress.DefaultCodec;
+import org.apache.hadoop.io.compress.SnappyCodec;
 import org.apache.hadoop.util.Progressable;
 import org.apache.lucene.analysis.Analyzer;
 import org.apache.lucene.index.BlurIndexWriter;
@@ -514,8 +516,7 @@ public class BlurIndexSimpleWriter extends BlurIndex {
     }
   }
 
-  @Override
-  public void startBulkMutate(String bulkId) throws IOException {
+  public BulkEntry startBulkMutate(String bulkId) throws IOException {
     BulkEntry bulkEntry = _bulkWriters.get(bulkId);
     if (bulkEntry == null) {
       Path tablePath = _tableContext.getTablePath();
@@ -533,22 +534,70 @@ public class BlurIndexSimpleWriter extends BlurIndex {
       };
       final CompressionCodec codec;
       final CompressionType type;
-      // if (SnappyCodec.isNativeSnappyLoaded(configuration)) {
-      // codec = new SnappyCodec();
-      // type = CompressionType.BLOCK;
-      // } else {
-      codec = new DefaultCodec();
-      type = CompressionType.NONE;
-      // }
+
+      if (isSnappyCodecLoaded(configuration)) {
+        codec = new SnappyCodec();
+        type = CompressionType.BLOCK;
+      } else {
+        codec = new DefaultCodec();
+        type = CompressionType.NONE;
+      }
 
       Writer writer = SequenceFile.createWriter(fileSystem, configuration, path, Text.class,
RowMutationWritable.class,
           type, codec, progress);
 
-      _bulkWriters.put(bulkId, new BulkEntry(writer, path));
+      bulkEntry = new BulkEntry(writer, path);
+      _bulkWriters.put(bulkId, bulkEntry);
     } else {
       LOG.info("Bulk [{0}] mutate already started on shard [{1}] in table [{2}].", bulkId,
_shardContext.getShard(),
           _tableContext.getTable());
     }
+    return bulkEntry;
+  }
+
+  private boolean isSnappyCodecLoaded(Configuration configuration) {
+    try {
+      Method methodHadoop1 = SnappyCodec.class.getMethod("isNativeSnappyLoaded", new Class[]
{ Configuration.class });
+      Boolean loaded = (Boolean) methodHadoop1.invoke(null, new Object[] { configuration
});
+      if (loaded != null && loaded) {
+        LOG.info("Using SnappyCodec");
+        return true;
+      } else {
+        LOG.info("Not using SnappyCodec");
+        return false;
+      }
+    } catch (NoSuchMethodException e) {
+      Method methodHadoop2;
+      try {
+        methodHadoop2 = SnappyCodec.class.getMethod("isNativeCodeLoaded", new Class[] {});
+      } catch (NoSuchMethodException ex) {
+        LOG.info("Can not determine if SnappyCodec is loaded.");
+        return false;
+      } catch (SecurityException ex) {
+        LOG.error("Not allowed.", ex);
+        return false;
+      }
+      Boolean loaded;
+      try {
+        loaded = (Boolean) methodHadoop2.invoke(null);
+        if (loaded != null && loaded) {
+          LOG.info("Using SnappyCodec");
+          return true;
+        } else {
+          LOG.info("Not using SnappyCodec");
+          return false;
+        }
+      } catch (Exception ex) {
+        LOG.info("Unknown error while trying to determine if SnappyCodec is loaded.", ex);
+        return false;
+      }
+    } catch (SecurityException e) {
+      LOG.error("Not allowed.", e);
+      return false;
+    } catch (Exception e) {
+      LOG.info("Unknown error while trying to determine if SnappyCodec is loaded.", e);
+      return false;
+    }
   }
 
   @Override
@@ -674,7 +723,7 @@ public class BlurIndexSimpleWriter extends BlurIndex {
   public void addBulkMutate(String bulkId, RowMutation mutation) throws IOException {
     BulkEntry bulkEntry = _bulkWriters.get(bulkId);
     if (bulkEntry == null) {
-      throw new IOException("Bulk writer for [" + bulkId + "] not found.");
+      bulkEntry = startBulkMutate(bulkId);
     }
     RowMutationWritable rowMutationWritable = new RowMutationWritable();
     rowMutationWritable.setRowMutation(mutation);

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/016a3865/blur-core/src/main/java/org/apache/blur/server/FilteredBlurServer.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/server/FilteredBlurServer.java b/blur-core/src/main/java/org/apache/blur/server/FilteredBlurServer.java
index c9ebef0..6a7eb17 100644
--- a/blur-core/src/main/java/org/apache/blur/server/FilteredBlurServer.java
+++ b/blur-core/src/main/java/org/apache/blur/server/FilteredBlurServer.java
@@ -292,24 +292,24 @@ public class FilteredBlurServer implements Iface {
   }
 
   @Override
-  public void bulkMutateStart(String table, String bulkId) throws BlurException, TException
{
-    _iface.bulkMutateStart(table, bulkId);
+  public void bulkMutateStart(String bulkId) throws BlurException, TException {
+    _iface.bulkMutateStart(bulkId);
   }
 
   @Override
-  public void bulkMutateAdd(String table, String bulkId, RowMutation rowMutation) throws
BlurException, TException {
-    _iface.bulkMutateAdd(table, bulkId, rowMutation);
+  public void bulkMutateAdd(String bulkId, RowMutation rowMutation) throws BlurException,
TException {
+    _iface.bulkMutateAdd(bulkId, rowMutation);
   }
 
   @Override
-  public void bulkMutateFinish(String table, String bulkId, boolean apply, boolean blockUntilComplete)
throws BlurException, TException {
-    _iface.bulkMutateFinish(table, bulkId, apply, blockUntilComplete);
+  public void bulkMutateFinish(String bulkId, boolean apply, boolean blockUntilComplete)
throws BlurException,
+      TException {
+    _iface.bulkMutateFinish(bulkId, apply, blockUntilComplete);
   }
 
   @Override
-  public void bulkMutateAddMultiple(String table, String bulkId, List<RowMutation>
rowMutations) throws BlurException,
-      TException {
-    _iface.bulkMutateAddMultiple(table, bulkId, rowMutations);
+  public void bulkMutateAddMultiple(String bulkId, List<RowMutation> rowMutations)
throws BlurException, TException {
+    _iface.bulkMutateAddMultiple(bulkId, rowMutations);
   }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/016a3865/blur-core/src/main/java/org/apache/blur/thrift/BlurControllerServer.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/thrift/BlurControllerServer.java b/blur-core/src/main/java/org/apache/blur/thrift/BlurControllerServer.java
index dbef564..77c8fdc 100644
--- a/blur-core/src/main/java/org/apache/blur/thrift/BlurControllerServer.java
+++ b/blur-core/src/main/java/org/apache/blur/thrift/BlurControllerServer.java
@@ -1673,85 +1673,129 @@ public class BlurControllerServer extends TableAdmin implements Iface
{
     throw new BException("Not Implemented");
   }
 
+  // @Override
+  // public void bulkMutateStart(final String bulkId) throws BlurException,
+  // TException {
+  // String cluster = getCluster(table);
+  // try {
+  // scatter(cluster, new BlurCommand<Void>() {
+  // @Override
+  // public Void call(Client client) throws BlurException, TException {
+  // client.bulkMutateStart(bulkId);
+  // return null;
+  // }
+  // });
+  // } catch (Exception e) {
+  // LOG.error("Unknown error while trying to get start a bulk mutate [{0}] [{1}]",
+  // e, bulkId);
+  // if (e instanceof BlurException) {
+  // throw (BlurException) e;
+  // }
+  // throw new BException(e.getMessage(), e);
+  // }
+  // }
+  //
   @Override
-  public void bulkMutateStart(final String table, final String bulkId) throws BlurException,
TException {
-    String cluster = getCluster(table);
+  public void bulkMutateAdd(final String bulkId, final RowMutation mutation) throws BlurException,
TException {
     try {
-      scatter(cluster, new BlurCommand<Void>() {
-        @Override
-        public Void call(Client client) throws BlurException, TException {
-          client.bulkMutateStart(table, bulkId);
-          return null;
-        }
-      });
-    } catch (Exception e) {
-      LOG.error("Unknown error while trying to get start a bulk mutate [{0}] [{1}]", e, table,
bulkId);
-      if (e instanceof BlurException) {
-        throw (BlurException) e;
-      }
-      throw new BException(e.getMessage(), e);
-    }
-  }
-
-  @Override
-  public void bulkMutateAdd(final String table, final String bulkId, final RowMutation mutation)
throws BlurException, TException {
-    try {
-      checkTable(mutation.table);
-      checkForUpdates(mutation.table);
+      String table = mutation.getTable();
+      checkTable(table);
+      checkForUpdates(table);
       MutationHelper.validateMutation(mutation);
-      if (!table.equals(mutation.getTable())) {
-        throw new BException("RowMutation table [{0}] has to match method table [{1}]", mutation.getTable(),
table);
-      }
 
       int numberOfShards = getShardCount(table);
       Map<String, String> tableLayout = getTableLayout(table);
       if (tableLayout.size() != numberOfShards) {
         throw new BException("Cannot update data while shard is missing");
       }
-
       String shardName = MutationHelper.getShardName(table, mutation.rowId, numberOfShards,
_blurPartitioner);
       String node = tableLayout.get(shardName);
       _client.execute(node, new BlurCommand<Void>() {
         @Override
         public Void call(Client client) throws BlurException, TException {
-          client.bulkMutateAdd(table, bulkId, mutation);
+          client.bulkMutateAdd(bulkId, mutation);
           return null;
         }
       }, _maxMutateRetries, _mutateDelay, _maxMutateDelay);
     } catch (Exception e) {
-      LOG.error("Unknown error during bulk mutation of [{0}]", e, mutation);
+      LOG.error("Unknown error during bulk mutation of [{0}]", e, bulkId);
       if (e instanceof BlurException) {
         throw (BlurException) e;
       }
-      throw new BException("Unknown error during bulk mutation of [{0}]", e, mutation);
+      throw new BException("Unknown error during bulk mutation of [{0}]", e, bulkId);
     }
   }
 
   @Override
-  public void bulkMutateFinish(final String table, final String bulkId, final boolean apply,
final boolean blockUntilComplete) throws BlurException,
+  public void bulkMutateAddMultiple(final String bulkId, List<RowMutation> rowMutations)
throws BlurException,
       TException {
-    String cluster = getCluster(table);
     try {
-      scatter(cluster, new BlurCommand<Void>() {
-        @Override
-        public Void call(Client client) throws BlurException, TException {
-          client.bulkMutateFinish(table, bulkId, apply, blockUntilComplete);
-          return null;
-        }
-      });
+      Map<String, List<RowMutation>> batches = batchByServer(rowMutations);
+      for (Entry<String, List<RowMutation>> entry : batches.entrySet()) {
+        String node = entry.getKey();
+        final List<RowMutation> batch = entry.getValue();
+        _client.execute(node, new BlurCommand<Void>() {
+          @Override
+          public Void call(Client client) throws BlurException, TException {
+            client.bulkMutateAddMultiple(bulkId, batch);
+            return null;
+          }
+        }, _maxMutateRetries, _mutateDelay, _maxMutateDelay);
+      }
     } catch (Exception e) {
-      LOG.error("Unknown error while trying to get finish a bulk mutate [{0}] [{1}]", e,
table, bulkId);
+      LOG.error("Unknown error during bulk mutation of [{0}]", e, bulkId);
       if (e instanceof BlurException) {
         throw (BlurException) e;
       }
-      throw new BException(e.getMessage(), e);
+      throw new BException("Unknown error during bulk mutation of [{0}]", e, bulkId);
+    }
+  }
+
+  private Map<String, List<RowMutation>> batchByServer(List<RowMutation>
rowMutations) throws TException {
+    Map<String, List<RowMutation>> result = new HashMap<String, List<RowMutation>>();
+    for (RowMutation rowMutation : rowMutations) {
+      String table = rowMutation.getTable();
+      checkTable(table);
+      checkForUpdates(table);
+      int numberOfShards = getShardCount(table);
+      Map<String, String> tableLayout = getTableLayout(table);
+      if (tableLayout.size() != numberOfShards) {
+        throw new BException("Cannot update data while shard is missing");
+      }
+      String shardName = MutationHelper.getShardName(table, rowMutation.getRowId(), numberOfShards,
_blurPartitioner);
+      String node = tableLayout.get(shardName);
+
+      List<RowMutation> list = result.get(node);
+      if (list == null) {
+        result.put(node, list = new ArrayList<RowMutation>());
+      }
+      MutationHelper.validateMutation(rowMutation);
+      list.add(rowMutation);
     }
+    return result;
   }
 
   @Override
-  public void bulkMutateAddMultiple(String table, String bulkId, List<RowMutation>
rowMutations) throws BlurException,
-      TException {
-    throw new RuntimeException("Not Implemented");
+  public void bulkMutateFinish(final String bulkId, final boolean apply, final boolean blockUntilComplete)
+      throws BlurException, TException {
+    List<String> shardClusterList = shardClusterList();
+    for (String cluster : shardClusterList) {
+      try {
+        scatter(cluster, new BlurCommand<Void>() {
+          @Override
+          public Void call(Client client) throws BlurException, TException {
+            client.bulkMutateFinish(bulkId, apply, blockUntilComplete);
+            return null;
+          }
+        });
+      } catch (Exception e) {
+        LOG.error("Unknown error while trying to get finish a bulk mutate [{0}] [{1}]", e,
bulkId);
+        if (e instanceof BlurException) {
+          throw (BlurException) e;
+        }
+        throw new BException(e.getMessage(), e);
+      }
+    }
   }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/016a3865/blur-core/src/main/java/org/apache/blur/thrift/BlurShardServer.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/thrift/BlurShardServer.java b/blur-core/src/main/java/org/apache/blur/thrift/BlurShardServer.java
index 0986b6d..05fdb7d 100644
--- a/blur-core/src/main/java/org/apache/blur/thrift/BlurShardServer.java
+++ b/blur-core/src/main/java/org/apache/blur/thrift/BlurShardServer.java
@@ -23,6 +23,7 @@ import static org.apache.blur.utils.BlurConstants.BLUR_SHARD_DATA_FETCH_THREAD_C
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
@@ -696,11 +697,15 @@ public class BlurShardServer extends TableAdmin implements Iface {
   }
 
   @Override
-  public void bulkMutateStart(String table, String bulkId) throws BlurException, TException
{
+  public void bulkMutateAdd(String bulkId, RowMutation rowMutation) throws BlurException,
TException {
+    String table = rowMutation.getTable();
+    checkTable(table);
+    checkForUpdates(table);
+    MutationHelper.validateMutation(rowMutation);
     try {
-      _indexManager.bulkMutateStart(table, bulkId);
+      _indexManager.bulkMutateAdd(table, bulkId, rowMutation);
     } catch (Exception e) {
-      LOG.error("Unknown error while trying to start a bulk mutate on table [" + table +
"]", e);
+      LOG.error("Unknown error while trying to add to a bulk mutate on table [" + table +
"]", e);
       if (e instanceof BlurException) {
         throw (BlurException) e;
       }
@@ -709,11 +714,13 @@ public class BlurShardServer extends TableAdmin implements Iface {
   }
 
   @Override
-  public void bulkMutateAdd(String table, String bulkId, RowMutation rowMutation) throws
BlurException, TException {
+  public void bulkMutateFinish(String bulkId, boolean apply, boolean blockUntilComplete)
throws BlurException,
+      TException {
     try {
-      _indexManager.bulkMutateAdd(table, bulkId, rowMutation);
+      List<String> tableListByCluster = tableListByCluster(_cluster);
+      _indexManager.bulkMutateFinish(new HashSet<String>(tableListByCluster), bulkId,
apply, blockUntilComplete);
     } catch (Exception e) {
-      LOG.error("Unknown error while trying to add to a bulk mutate on table [" + table +
"]", e);
+      LOG.error("Unknown error while trying to finsh a bulk mutate [" + bulkId + "]", e);
       if (e instanceof BlurException) {
         throw (BlurException) e;
       }
@@ -722,30 +729,37 @@ public class BlurShardServer extends TableAdmin implements Iface {
   }
 
   @Override
-  public void bulkMutateFinish(String table, String bulkId, boolean apply, boolean blockUntilComplete)
throws BlurException, TException {
-    try {
-      _indexManager.bulkMutateFinish(table, bulkId, apply,blockUntilComplete);
-    } catch (Exception e) {
-      LOG.error("Unknown error while trying to finsh a bulk mutate on table [" + table +
"]", e);
-      if (e instanceof BlurException) {
-        throw (BlurException) e;
+  public void bulkMutateAddMultiple(String bulkId, List<RowMutation> rowMutations)
throws BlurException, TException {
+    Map<String, List<RowMutation>> batches = batchByTable(rowMutations);
+    for (Entry<String, List<RowMutation>> entry : batches.entrySet()) {
+      String table = entry.getKey();
+      List<RowMutation> batch = entry.getValue();
+      try {
+        _indexManager.bulkMutateAddMultiple(table, bulkId, batch);
+      } catch (Exception e) {
+        LOG.error("Unknown error while trying to add to a bulk mutate on table [" + table
+ "]", e);
+        if (e instanceof BlurException) {
+          throw (BlurException) e;
+        }
+        throw new BException(e.getMessage(), e);
       }
-      throw new BException(e.getMessage(), e);
     }
   }
 
-  @Override
-  public void bulkMutateAddMultiple(String table, String bulkId, List<RowMutation>
rowMutations) throws BlurException,
-      TException {
-    try {
-      _indexManager.bulkMutateAddMultiple(table, bulkId, rowMutations);
-    } catch (Exception e) {
-      LOG.error("Unknown error while trying to add to a bulk mutate on table [" + table +
"]", e);
-      if (e instanceof BlurException) {
-        throw (BlurException) e;
+  private Map<String, List<RowMutation>> batchByTable(List<RowMutation>
rowMutations) throws BlurException {
+    Map<String, List<RowMutation>> result = new HashMap<String, List<RowMutation>>();
+    for (RowMutation rowMutation : rowMutations) {
+      String table = rowMutation.getTable();
+      checkTable(table);
+      checkForUpdates(table);
+      List<RowMutation> list = result.get(table);
+      if (list == null) {
+        result.put(table, list = new ArrayList<RowMutation>());
       }
-      throw new BException(e.getMessage(), e);
+      MutationHelper.validateMutation(rowMutation);
+      list.add(rowMutation);
     }
+    return result;
   }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/016a3865/blur-core/src/main/java/org/apache/blur/thrift/TableAdmin.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/thrift/TableAdmin.java b/blur-core/src/main/java/org/apache/blur/thrift/TableAdmin.java
index 67271da..7a0e724 100644
--- a/blur-core/src/main/java/org/apache/blur/thrift/TableAdmin.java
+++ b/blur-core/src/main/java/org/apache/blur/thrift/TableAdmin.java
@@ -48,6 +48,7 @@ import org.apache.blur.thrift.generated.ColumnDefinition;
 import org.apache.blur.thrift.generated.CommandDescriptor;
 import org.apache.blur.thrift.generated.Level;
 import org.apache.blur.thrift.generated.Metric;
+import org.apache.blur.thrift.generated.RowMutation;
 import org.apache.blur.thrift.generated.Schema;
 import org.apache.blur.thrift.generated.Selector;
 import org.apache.blur.thrift.generated.ShardState;
@@ -706,4 +707,15 @@ public abstract class TableAdmin implements Iface {
     }
   }
 
+  @Override
+  public void bulkMutateStart(String bulkId) throws BlurException, TException {
+    // TODO Start transaction here...
+  }
+
+//  @Override
+//  public void bulkMutateFinish(String bulkId, boolean apply, boolean blockUntilComplete)
throws BlurException,
+//      TException {
+//    throw new RuntimeException("Not implemented.");
+//  }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/016a3865/blur-core/src/test/java/org/apache/blur/command/ShardCommandManagerTest.java
----------------------------------------------------------------------
diff --git a/blur-core/src/test/java/org/apache/blur/command/ShardCommandManagerTest.java
b/blur-core/src/test/java/org/apache/blur/command/ShardCommandManagerTest.java
index e3ecbe3..e9edf32 100644
--- a/blur-core/src/test/java/org/apache/blur/command/ShardCommandManagerTest.java
+++ b/blur-core/src/test/java/org/apache/blur/command/ShardCommandManagerTest.java
@@ -272,7 +272,7 @@ public class ShardCommandManagerTest {
             return;
           } catch (IOException e) {
             if (e.getCause() instanceof CancellationException) {
-              return;  
+              return;
             }
             e.printStackTrace();
             fail.set(true);
@@ -424,11 +424,6 @@ public class ShardCommandManagerTest {
       }
 
       @Override
-      public void startBulkMutate(String bulkId) throws IOException {
-        throw new RuntimeException("Not implemented.");
-      }
-
-      @Override
       public void finishBulkMutate(String bulkId, boolean apply, boolean blockUntilComplete)
throws IOException {
         throw new RuntimeException("Not implemented.");
       }

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/016a3865/blur-core/src/test/java/org/apache/blur/manager/writer/IndexImporterTest.java
----------------------------------------------------------------------
diff --git a/blur-core/src/test/java/org/apache/blur/manager/writer/IndexImporterTest.java
b/blur-core/src/test/java/org/apache/blur/manager/writer/IndexImporterTest.java
index 40e7720..41c63d6 100644
--- a/blur-core/src/test/java/org/apache/blur/manager/writer/IndexImporterTest.java
+++ b/blur-core/src/test/java/org/apache/blur/manager/writer/IndexImporterTest.java
@@ -189,11 +189,6 @@ public class IndexImporterTest {
       }
 
       @Override
-      public void startBulkMutate(String bulkId) throws IOException {
-        throw new RuntimeException("Not implemented.");
-      }
-
-      @Override
       public void finishBulkMutate(String bulkId, boolean apply, boolean blockUntilComplete)
throws IOException {
         throw new RuntimeException("Not implemented.");
       }

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/016a3865/blur-core/src/test/java/org/apache/blur/thrift/BlurClusterTestBase.java
----------------------------------------------------------------------
diff --git a/blur-core/src/test/java/org/apache/blur/thrift/BlurClusterTestBase.java b/blur-core/src/test/java/org/apache/blur/thrift/BlurClusterTestBase.java
index 83d5adc..edbca2a 100644
--- a/blur-core/src/test/java/org/apache/blur/thrift/BlurClusterTestBase.java
+++ b/blur-core/src/test/java/org/apache/blur/thrift/BlurClusterTestBase.java
@@ -145,7 +145,7 @@ public abstract class BlurClusterTestBase {
     }
   }
 
-  private Iface getClient() {
+  protected Iface getClient() {
     if (controllerConnectionStr == null) {
       controllerConnectionStr = miniCluster.getControllerConnectionStr();
     }
@@ -575,7 +575,7 @@ public abstract class BlurClusterTestBase {
       setDebugRunSlow(tableName, false);
     }
   }
-  
+
   protected abstract Map<String, String> getUserAttributes();
 
   @Test
@@ -962,4 +962,41 @@ public abstract class BlurClusterTestBase {
     assertFalse(client.tableList().contains(tableName));
 
   }
+
+  @Test
+  public void testBulkMutate() throws BlurException, TException, IOException {
+    String tableName = "testBulkMutate";
+    createTable(tableName);
+
+    String bulkId = UUID.randomUUID().toString();
+    Iface client = getClient();
+    client.bulkMutateStart(bulkId);
+    int batchSize = 10;
+    int total = 10000;
+    int maxFacetValue = 100;
+    List<RowMutation> mutations = new ArrayList<RowMutation>();
+    Random random = new Random(1);
+    for (int i = 0; i < total; i++) {
+      String rowId = UUID.randomUUID().toString();
+      RecordMutation mutation = BlurThriftHelper.newRecordMutation("test", rowId,
+          BlurThriftHelper.newColumn("test", "value"),
+          BlurThriftHelper.newColumn("facet", Integer.toString(random.nextInt(maxFacetValue))),
+          BlurThriftHelper.newColumn("facetFixed", "test"));
+      RowMutation rowMutation = BlurThriftHelper.newRowMutation(tableName, rowId, mutation);
+      mutations.add(rowMutation);
+      if (mutations.size() >= batchSize) {
+        client.bulkMutateAddMultiple(bulkId, mutations);
+        mutations.clear();
+      }
+    }
+    if (mutations.size() > 0) {
+      client.bulkMutateAddMultiple(bulkId, mutations);
+      mutations.clear();
+    }
+    client.bulkMutateFinish(bulkId, true, true);
+
+    TableStats tableStats = client.tableStats(tableName);
+    assertEquals(total, tableStats.getRecordCount());
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/016a3865/blur-core/src/test/java/org/apache/blur/thrift/BlurClusterTestNoSecurity.java
----------------------------------------------------------------------
diff --git a/blur-core/src/test/java/org/apache/blur/thrift/BlurClusterTestNoSecurity.java
b/blur-core/src/test/java/org/apache/blur/thrift/BlurClusterTestNoSecurity.java
index 1504f01..a7dc5dd 100644
--- a/blur-core/src/test/java/org/apache/blur/thrift/BlurClusterTestNoSecurity.java
+++ b/blur-core/src/test/java/org/apache/blur/thrift/BlurClusterTestNoSecurity.java
@@ -16,13 +16,26 @@
  */
 package org.apache.blur.thrift;
 
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
+import java.util.Random;
+import java.util.UUID;
 
+import org.apache.blur.thirdparty.thrift_0_9_0.TException;
 import org.apache.blur.thrift.generated.Blur.Iface;
+import org.apache.blur.thrift.generated.BlurException;
+import org.apache.blur.thrift.generated.RecordMutation;
 import org.apache.blur.thrift.generated.RowMutation;
 import org.apache.blur.thrift.generated.TableDescriptor;
+import org.apache.blur.thrift.generated.TableStats;
+import org.apache.blur.thrift.util.BlurThriftHelper;
 import org.apache.blur.user.User;
+import org.junit.Test;
 
 public class BlurClusterTestNoSecurity extends BlurClusterTestBase {
 

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/016a3865/blur-hive/src/main/java/org/apache/blur/hive/BlurHiveOutputCommitter.java
----------------------------------------------------------------------
diff --git a/blur-hive/src/main/java/org/apache/blur/hive/BlurHiveOutputCommitter.java b/blur-hive/src/main/java/org/apache/blur/hive/BlurHiveOutputCommitter.java
index 979cabd..df182b3 100644
--- a/blur-hive/src/main/java/org/apache/blur/hive/BlurHiveOutputCommitter.java
+++ b/blur-hive/src/main/java/org/apache/blur/hive/BlurHiveOutputCommitter.java
@@ -18,12 +18,10 @@ package org.apache.blur.hive;
 
 import java.io.IOException;
 
-import org.apache.blur.mapreduce.lib.BlurOutputFormat;
 import org.apache.blur.thirdparty.thrift_0_9_0.TException;
 import org.apache.blur.thrift.BlurClient;
 import org.apache.blur.thrift.generated.Blur.Iface;
 import org.apache.blur.thrift.generated.BlurException;
-import org.apache.blur.thrift.generated.TableDescriptor;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapred.JobContext;
 import org.apache.hadoop.mapred.OutputCommitter;
@@ -72,10 +70,9 @@ public class BlurHiveOutputCommitter extends OutputCommitter {
     Configuration configuration = context.getConfiguration();
     String connectionStr = configuration.get(BlurSerDe.BLUR_CONTROLLER_CONNECTION_STR);
     Iface client = BlurClient.getClient(connectionStr);
-    TableDescriptor tableDescriptor = BlurOutputFormat.getTableDescriptor(configuration);
     String bulkId = BlurHiveOutputFormat.getBulkId(configuration);
     try {
-      client.bulkMutateFinish(tableDescriptor.getName(), bulkId, apply, false);
+      client.bulkMutateFinish(bulkId, apply, false);
     } catch (BlurException e) {
       throw new IOException(e);
     } catch (TException e) {

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/016a3865/blur-hive/src/main/java/org/apache/blur/hive/BlurHiveOutputFormat.java
----------------------------------------------------------------------
diff --git a/blur-hive/src/main/java/org/apache/blur/hive/BlurHiveOutputFormat.java b/blur-hive/src/main/java/org/apache/blur/hive/BlurHiveOutputFormat.java
index 5c1187e..158f807 100644
--- a/blur-hive/src/main/java/org/apache/blur/hive/BlurHiveOutputFormat.java
+++ b/blur-hive/src/main/java/org/apache/blur/hive/BlurHiveOutputFormat.java
@@ -134,7 +134,7 @@ public class BlurHiveOutputFormat implements HiveOutputFormat<Text,
BlurRecord>
           List<RowMutation> batch = e.getValue();
           if (batch.size() >= max) {
             Iface client = BlurClient.getClient(server);
-            client.bulkMutateAddMultiple(table, bulkId, batch);
+            client.bulkMutateAddMultiple(bulkId, batch);
             batch.clear();
           }
         }

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/016a3865/blur-hive/src/main/java/org/apache/blur/hive/BlurHiveStorageHandler.java
----------------------------------------------------------------------
diff --git a/blur-hive/src/main/java/org/apache/blur/hive/BlurHiveStorageHandler.java b/blur-hive/src/main/java/org/apache/blur/hive/BlurHiveStorageHandler.java
index f9ff7a7..13a63dd 100644
--- a/blur-hive/src/main/java/org/apache/blur/hive/BlurHiveStorageHandler.java
+++ b/blur-hive/src/main/java/org/apache/blur/hive/BlurHiveStorageHandler.java
@@ -16,16 +16,13 @@
  */
 package org.apache.blur.hive;
 
-import java.io.IOException;
 import java.util.Map;
 import java.util.UUID;
 
-import org.apache.blur.mapreduce.lib.BlurOutputFormat;
 import org.apache.blur.thirdparty.thrift_0_9_0.TException;
 import org.apache.blur.thrift.BlurClient;
 import org.apache.blur.thrift.generated.Blur.Iface;
 import org.apache.blur.thrift.generated.BlurException;
-import org.apache.blur.thrift.generated.TableDescriptor;
 import org.apache.hadoop.hive.ql.metadata.DefaultStorageHandler;
 import org.apache.hadoop.hive.ql.plan.TableDesc;
 import org.apache.hadoop.hive.serde2.SerDe;
@@ -57,12 +54,9 @@ public class BlurHiveStorageHandler extends DefaultStorageHandler {
       String bulkId = UUID.randomUUID().toString();
       String connectionStr = jobConf.get(BlurSerDe.BLUR_CONTROLLER_CONNECTION_STR);
       Iface client = BlurClient.getClient(connectionStr);
-      TableDescriptor tableDescriptor = BlurOutputFormat.getTableDescriptor(jobConf);
-      client.bulkMutateStart(tableDescriptor.getName(), bulkId);
+      client.bulkMutateStart(bulkId);
       BlurHiveOutputFormat.setBulkId(jobConf, bulkId);
       jobConf.setOutputCommitter(BlurHiveOutputCommitter.class);
-    } catch (IOException e) {
-      throw new RuntimeException(e);
     } catch (BlurException e) {
       throw new RuntimeException(e);
     } catch (TException e) {


Mime
View raw message