incubator-blur-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From amccu...@apache.org
Subject [4/5] git commit: Fixed BLUR-437 BLUR-435
Date Mon, 22 Jun 2015 13:15:37 GMT
Fixed BLUR-437 BLUR-435


Project: http://git-wip-us.apache.org/repos/asf/incubator-blur/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-blur/commit/4c8ec52c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-blur/tree/4c8ec52c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-blur/diff/4c8ec52c

Branch: refs/heads/blur-0.2.4-parcel
Commit: 4c8ec52c7cc4a0537bfa3ce10f66d84e7a754da2
Parents: d6f46d2
Author: Aaron McCurry <amccurry@gmail.com>
Authored: Mon Jun 22 09:12:40 2015 -0400
Committer: Aaron McCurry <amccurry@gmail.com>
Committed: Mon Jun 22 09:12:40 2015 -0400

----------------------------------------------------------------------
 .../blur/mapreduce/lib/update/Driver.java       | 39 +++++++++++++++++---
 1 file changed, 34 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/4c8ec52c/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/update/Driver.java
----------------------------------------------------------------------
diff --git a/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/update/Driver.java b/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/update/Driver.java
index fb7d570..9051daf 100644
--- a/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/update/Driver.java
+++ b/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/update/Driver.java
@@ -19,14 +19,18 @@ package org.apache.blur.mapreduce.lib.update;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
-import java.util.UUID;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.blur.log.Log;
 import org.apache.blur.log.LogFactory;
 import org.apache.blur.mapreduce.lib.BlurInputFormat;
 import org.apache.blur.mapreduce.lib.BlurOutputFormat;
+import org.apache.blur.thirdparty.thrift_0_9_0.TException;
 import org.apache.blur.thrift.BlurClient;
 import org.apache.blur.thrift.generated.Blur.Iface;
+import org.apache.blur.thrift.generated.BlurException;
 import org.apache.blur.thrift.generated.TableDescriptor;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configured;
@@ -43,6 +47,7 @@ import org.apache.hadoop.util.ToolRunner;
 
 public class Driver extends Configured implements Tool {
 
+  public static final String MRUPDATE_SNAPSHOT = "mrupdate-snapshot";
   public static final String CACHE = "cache";
   public static final String COMPLETE = "complete";
   public static final String INPROGRESS = "inprogress";
@@ -95,19 +100,21 @@ public class Driver extends Configured implements Tool {
 
     List<Path> inprogressPathList = new ArrayList<Path>();
     boolean success = false;
+    Iface client = null;
     try {
       inprogressPathList = movePathList(fileSystem, inprogressData, srcPathList);
 
       Job job = Job.getInstance(getConf(), "Blur Row Updater for table [" + table + "]");
 
-      Iface client = BlurClient.getClientFromZooKeeperConnectionStr(blurZkConnection);
-      String snapshotId = UUID.randomUUID().toString();
-      client.createSnapshot(table, snapshotId);
+      waitForOtherSnapshotsToBeRemoved(client, table, MRUPDATE_SNAPSHOT);
+
+      client = BlurClient.getClientFromZooKeeperConnectionStr(blurZkConnection);
+      client.createSnapshot(table, MRUPDATE_SNAPSHOT);
       TableDescriptor descriptor = client.describe(table);
       Path tablePath = new Path(descriptor.getTableUri());
 
       BlurInputFormat.setLocalCachePath(job, fileCache);
-      BlurInputFormat.addTable(job, descriptor, snapshotId);
+      BlurInputFormat.addTable(job, descriptor, MRUPDATE_SNAPSHOT);
       MultipleInputs.addInputPath(job, tablePath, BlurInputFormat.class, MapperForExistingData.class);
       for (Path p : inprogressPathList) {
         FileInputFormat.addInputPath(job, p);
@@ -137,6 +144,7 @@ public class Driver extends Configured implements Tool {
         LOG.error("Indexing job failed!");
         movePathList(fileSystem, newData, inprogressPathList);
       }
+      client.removeSnapshot(table, MRUPDATE_SNAPSHOT);
     }
 
     if (success) {
@@ -147,6 +155,27 @@ public class Driver extends Configured implements Tool {
 
   }
 
+  private void waitForOtherSnapshotsToBeRemoved(Iface client, String table, String snapshot) throws BlurException,
+      TException, InterruptedException {
+    while (true) {
+      Map<String, List<String>> listSnapshots = client.listSnapshots(table);
+      boolean mrupdateSnapshots = false;
+      for (Entry<String, List<String>> e : listSnapshots.entrySet()) {
+        List<String> value = e.getValue();
+        if (value.contains(snapshot)) {
+          mrupdateSnapshots = true;
+        }
+      }
+      if (!mrupdateSnapshots) {
+        return;
+      } else {
+        LOG.info(snapshot + " Snapshot for table [{0}] already exists", table);
+        Thread.sleep(TimeUnit.SECONDS.toMillis(10));
+        LOG.info("Retrying");
+      }
+    }
+  }
+
   private List<Path> movePathList(FileSystem fileSystem, Path dstDir, List<Path> lst) throws IOException {
     List<Path> result = new ArrayList<Path>();
     for (Path src : lst) {


Mime
View raw message