incubator-blur-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From amccu...@apache.org
Subject git commit: Updating the Hive project to make use of the multi-add call in the bulk indexing API.
Date Tue, 30 Dec 2014 15:20:10 GMT
Repository: incubator-blur
Updated Branches:
  refs/heads/master 491f89305 -> 64d2d2a0e


Updating the Hive project to make use of the multi-add call in the bulk indexing API.


Project: http://git-wip-us.apache.org/repos/asf/incubator-blur/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-blur/commit/64d2d2a0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-blur/tree/64d2d2a0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-blur/diff/64d2d2a0

Branch: refs/heads/master
Commit: 64d2d2a0e6ab8ca35af787485a42c2891612f648
Parents: 491f893
Author: Aaron McCurry <amccurry@gmail.com>
Authored: Tue Dec 30 10:18:56 2014 -0500
Committer: Aaron McCurry <amccurry@gmail.com>
Committed: Tue Dec 30 10:18:56 2014 -0500

----------------------------------------------------------------------
 contrib/blur-hive/pom.xml                       | 16 +++++
 .../apache/blur/hive/BlurHiveOutputFormat.java  | 61 +++++++++++++-------
 contrib/blur-hive/src/test/java/test.hive       |  3 +-
 3 files changed, 56 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/64d2d2a0/contrib/blur-hive/pom.xml
----------------------------------------------------------------------
diff --git a/contrib/blur-hive/pom.xml b/contrib/blur-hive/pom.xml
index 945da1f..7a09502 100644
--- a/contrib/blur-hive/pom.xml
+++ b/contrib/blur-hive/pom.xml
@@ -108,6 +108,22 @@
 					<target>1.6</target>
 				</configuration>
 			</plugin>
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-shade-plugin</artifactId>
+				<version>2.2</version>
+				<executions>
+					<execution>
+						<phase>package</phase>
+						<goals>
+							<goal>shade</goal>
+						</goals>
+						<configuration>
+							<createDependencyReducedPom>false</createDependencyReducedPom>
+						</configuration>
+					</execution>
+				</executions>
+			</plugin>
 		</plugins>
 	</build>
 	<profiles>

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/64d2d2a0/contrib/blur-hive/src/main/java/org/apache/blur/hive/BlurHiveOutputFormat.java
----------------------------------------------------------------------
diff --git a/contrib/blur-hive/src/main/java/org/apache/blur/hive/BlurHiveOutputFormat.java b/contrib/blur-hive/src/main/java/org/apache/blur/hive/BlurHiveOutputFormat.java
index b2f9267..5c1187e 100644
--- a/contrib/blur-hive/src/main/java/org/apache/blur/hive/BlurHiveOutputFormat.java
+++ b/contrib/blur-hive/src/main/java/org/apache/blur/hive/BlurHiveOutputFormat.java
@@ -18,10 +18,11 @@ package org.apache.blur.hive;
 
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.Properties;
+import java.util.concurrent.ConcurrentHashMap;
 
 import org.apache.blur.manager.BlurPartitioner;
 import org.apache.blur.mapreduce.lib.BlurColumn;
@@ -86,6 +87,9 @@ public class BlurHiveOutputFormat implements HiveOutputFormat<Text, BlurRecord>
     return new org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter() {
 
       private BlurPartitioner _blurPartitioner = new BlurPartitioner();
+      private Map<String, List<RowMutation>> _serverBatches = new ConcurrentHashMap<String,
List<RowMutation>>();
+      private int _capacity = 100;
+      private Map<String, String> _shardToServerLayout;
 
       @Override
       public void write(Writable w) throws IOException {
@@ -99,8 +103,24 @@ public class BlurHiveOutputFormat implements HiveOutputFormat<Text, BlurRecord>
             toRecord(blurRecord)));
 
         try {
-          Iface client = getClient(rowId);
-          client.bulkMutateAdd(table, bulkId, rowMutation);
+          String server = getServer(rowId);
+          List<RowMutation> batch = _serverBatches.get(server);
+          if (batch == null) {
+            _serverBatches.put(server, batch = new ArrayList<RowMutation>(_capacity));
+          }
+          batch.add(rowMutation);
+          checkForFlush(_capacity);
+        } catch (BlurException e) {
+          throw new IOException(e);
+        } catch (TException e) {
+          throw new IOException(e);
+        }
+      }
+
+      @Override
+      public void close(boolean abort) throws IOException {
+        try {
+          checkForFlush(1);
         } catch (BlurException e) {
           throw new IOException(e);
         } catch (TException e) {
@@ -108,34 +128,31 @@ public class BlurHiveOutputFormat implements HiveOutputFormat<Text, BlurRecord>
         }
       }
 
-      private Iface getClient(String rowId) throws BlurException, TException {
+      private void checkForFlush(int max) throws BlurException, TException {
+        for (Entry<String, List<RowMutation>> e : _serverBatches.entrySet())
{
+          String server = e.getKey();
+          List<RowMutation> batch = e.getValue();
+          if (batch.size() >= max) {
+            Iface client = BlurClient.getClient(server);
+            client.bulkMutateAddMultiple(table, bulkId, batch);
+            batch.clear();
+          }
+        }
+      }
+
+      private String getServer(String rowId) throws BlurException, TException {
         int shard = _blurPartitioner.getShard(rowId, numberOfShardsInTable);
         String shardId = ShardUtil.getShardName(shard);
-        return getClientFromShardId(table, shardId);
+        return getServerFromShardId(table, shardId);
       }
 
-      private Map<String, String> _shardToServerLayout;
-      private Map<String, Iface> _shardClients = new HashMap<String, Iface>();
-
-      private Iface getClientFromShardId(String table, String shardId) throws BlurException,
TException {
+      private String getServerFromShardId(String table, String shardId) throws BlurException,
TException {
         if (_shardToServerLayout == null) {
           _shardToServerLayout = controllerClient.shardServerLayout(table);
         }
-        return getClientFromConnectionStr(_shardToServerLayout.get(shardId));
+        return _shardToServerLayout.get(shardId);
       }
 
-      private Iface getClientFromConnectionStr(String connectionStr) {
-        Iface iface = _shardClients.get(connectionStr);
-        if (iface == null) {
-          _shardClients.put(connectionStr, iface = BlurClient.getClient(connectionStr));
-        }
-        return iface;
-      }
-
-      @Override
-      public void close(boolean abort) throws IOException {
-
-      }
     };
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/64d2d2a0/contrib/blur-hive/src/test/java/test.hive
----------------------------------------------------------------------
diff --git a/contrib/blur-hive/src/test/java/test.hive b/contrib/blur-hive/src/test/java/test.hive
index 64baa82..5f950a8 100644
--- a/contrib/blur-hive/src/test/java/test.hive
+++ b/contrib/blur-hive/src/test/java/test.hive
@@ -16,8 +16,7 @@
 set mapred.job.tracker=localhost:9001;
 set hive.metastore.warehouse.dir=hdfs://localhost:9000/user/hive/warehouse;
 
--- add jar file:///Users/amccurry/Development/incubator-blur/contrib/blur-hive/target/blur-hive-0.2.4-incubating-SNAPSHOT-hadoop1-all.jar;
-add jar file:///Users/amccurry/Development/incubator-blur/contrib/blur-hive/target/all-mod.zip;
+add jar file:///Users/amccurry/Development/incubator-blur/contrib/blur-hive/target/blur-hive-0.2.4-incubating-SNAPSHOT-hadoop1.jar;
 
 create database if not exists test;
 use test;


Mime
View raw message