From: amccurry@apache.org
To: blur-commits@incubator.apache.org
Reply-To: blur-dev@incubator.apache.org
Date: Tue, 30 Aug 2016 01:57:49 -0000
Subject: [04/13] git commit: Third round of updates.

Third round of updates.
Project: http://git-wip-us.apache.org/repos/asf/incubator-blur/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-blur/commit/ea50630a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-blur/tree/ea50630a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-blur/diff/ea50630a

Branch: refs/heads/master
Commit: ea50630a38d67675a61a916b144f3c0ce85d7f7a
Parents: 0141656
Author: Aaron McCurry
Authored: Sat May 7 13:11:54 2016 -0400
Committer: Aaron McCurry
Committed: Sat May 7 13:11:54 2016 -0400

----------------------------------------------------------------------
 blur-indexer/pom.xml                            |  58 +++
 blur-indexer/src/main/assemble/bin.xml          |  45 ++
 .../mapreduce/lib/update/BlurIndexCounter.java  |  17 +
 .../mapreduce/lib/update/ClusterDriver.java     | 362 ++++++++++++++
 .../blur/mapreduce/lib/update/FasterDriver.java | 486 +++++++++++++++++++
 .../update/HdfsConfigurationNamespaceMerge.java | 115 +++++
 .../lib/update/InputSplitPruneUtil.java         | 133 +++++
 .../lib/update/LookupBuilderMapper.java         |  18 +
 .../lib/update/LookupBuilderReducer.java        | 165 +++++++
 .../lib/update/MapperForExistingDataMod.java    |  46 ++
 .../MapperForExistingDataWithIndexLookup.java   | 228 +++++++++
 .../lib/update/MapperForNewDataMod.java         |  82 ++++
 .../lib/update/MergeSortRowIdMatcher.java       | 372 ++++++++++++++
 .../lib/update/PrunedBlurInputFormat.java       |  57 +++
 .../update/PrunedSequenceFileInputFormat.java   |  59 +++
 .../src/main/resources/blur-site.properties     |   1 +
 .../src/main/resources/program-log4j.xml        |  29 ++
 blur-indexer/src/main/resources/test-log4j.xml  |  46 ++
 18 files changed, 2319 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/ea50630a/blur-indexer/pom.xml
----------------------------------------------------------------------
diff --git a/blur-indexer/pom.xml b/blur-indexer/pom.xml
new file mode 100644
index 0000000..c7c1753
--- /dev/null
+++ b/blur-indexer/pom.xml
@@ -0,0 +1,58 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <groupId>org.apache.blur</groupId>
+  <artifactId>blur-indexer</artifactId>
+  <version>0.2.8</version>
+  <name>blur-indexer</name>
+  <packaging>jar</packaging>
+
+  <properties>
+    <blur.version>0.3.0.incubating.2.5.0.cdh5.3.3-SNAPSHOT</blur.version>
+  </properties>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.blur</groupId>
+      <artifactId>blur-mapred</artifactId>
+      <version>${blur.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+      <version>4.9</version>
+      <scope>test</scope>
+    </dependency>
+  </dependencies>
+
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-compiler-plugin</artifactId>
+        <configuration>
+          <source>1.8</source>
+          <target>1.8</target>
+        </configuration>
+      </plugin>
+      <plugin>
+        <artifactId>maven-assembly-plugin</artifactId>
+        <configuration>
+          <descriptor>src/main/assemble/bin.xml</descriptor>
+          <finalName>blur-indexer-${project.version}</finalName>
+        </configuration>
+        <executions>
+          <execution>
+            <phase>package</phase>
+            <goals>
+              <goal>single</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
+    </plugins>
+  </build>
+</project>

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/ea50630a/blur-indexer/src/main/assemble/bin.xml
----------------------------------------------------------------------
diff --git a/blur-indexer/src/main/assemble/bin.xml b/blur-indexer/src/main/assemble/bin.xml
new file mode 100644
index 0000000..5fddd56
--- /dev/null
+++ b/blur-indexer/src/main/assemble/bin.xml
@@ -0,0 +1,45 @@
+<assembly>
+  <formats>
+    <format>tar.gz</format>
+  </formats>
+  <includeBaseDirectory>false</includeBaseDirectory>
+  <dependencySets>
+    <dependencySet>
+      <useProjectArtifact>true</useProjectArtifact>
+      <outputDirectory>blur-indexer-${project.version}/lib</outputDirectory>
+      <unpack>false</unpack>
+      <includes>
+        <include>org.apache.blur:blur-indexer</include>
+        <include>org.apache.blur:*</include>
+        <include>org.apache.zookeeper:zookeeper</include>
+        <include>org.slf4j:slf4j-api</include>
+        <include>org.slf4j:slf4j-log4j12</include>
+        <include>org.json:json</include>
+        <include>log4j:log4j</include>
+        <include>com.yammer.metrics:*</include>
+        <include>com.google.guava:guava</include>
+        <include>org.apache.httpcomponents:*</include>
+        <include>org.apache.lucene:*</include>
+        <include>com.spatial4j:spatial4j</include>
+        <include>commons-cli:commons-cli</include>
+        <include>org.eclipse.jetty:*</include>
+        <include>com.googlecode.concurrentlinkedhashmap:concurrentlinkedhashmap-lru</include>
+        <include>jline:jline</include>
+        <include>com.fasterxml.jackson.core:*</include>
+      </includes>
+    </dependencySet>
+  </dependencySets>
+  <fileSets>
+    <fileSet>
+      <directory>${project.build.scriptSourceDirectory}</directory>
+      <outputDirectory>blur-indexer-${project.version}</outputDirectory>
+      <excludes>
+        <exclude>**/.empty</exclude>
+      </excludes>
+    </fileSet>
+  </fileSets>
+</assembly>
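The module is packaged as a pair of Hadoop Tools: ClusterDriver is the long-running entry point that watches every enabled table and submits update jobs, while FasterDriver performs one incremental update pass for a single table. As a point of reference only, a single pass can be driven by hand roughly as sketched below; the table name, HDFS paths, and ZooKeeper connection string are placeholder assumptions, not values from this commit, and the argument order simply mirrors the usage message in FasterDriver.run() further down.

// Sketch only: placeholder table name, paths, and ZooKeeper string.
import org.apache.blur.mapreduce.lib.update.FasterDriver;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.ToolRunner;

public class RunSingleUpdatePass {
  public static void main(String[] args) throws Exception {
    // Argument order mirrors FasterDriver.run():
    // <table> <mr inc working path> <output path> <zk connection> <reducer multiplier> [extra config files...]
    String[] driverArgs = new String[] {
        "example_table",                       // Blur table name (placeholder)
        "hdfs://nameservice1/blur/mrinc",      // working dir holding the new/, inprogress/, complete/, cache/ subdirs
        "hdfs://nameservice1/blur/import-tmp", // job output path, loaded into the table on success
        "zk1:2181/blur",                       // Blur ZooKeeper connection string (placeholder)
        "1"                                    // reducer multiplier
    };
    int rc = ToolRunner.run(new Configuration(), new FasterDriver(), driverArgs);
    System.exit(rc);
  }
}

In the deployed setup, ClusterDriver submits these passes itself, one worker per enabled table, resolving the working path from the table property referenced by BlurConstants.BLUR_BULK_UPDATE_WORKING_PATH.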
http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/ea50630a/blur-indexer/src/main/java/org/apache/blur/mapreduce/lib/update/BlurIndexCounter.java ---------------------------------------------------------------------- diff --git a/blur-indexer/src/main/java/org/apache/blur/mapreduce/lib/update/BlurIndexCounter.java b/blur-indexer/src/main/java/org/apache/blur/mapreduce/lib/update/BlurIndexCounter.java new file mode 100644 index 0000000..a9caabb --- /dev/null +++ b/blur-indexer/src/main/java/org/apache/blur/mapreduce/lib/update/BlurIndexCounter.java @@ -0,0 +1,17 @@ +package org.apache.blur.mapreduce.lib.update; + +public enum BlurIndexCounter { + + NEW_RECORDS, + ROW_IDS_FROM_INDEX, + ROW_IDS_TO_UPDATE_FROM_NEW_DATA, + ROW_IDS_FROM_NEW_DATA, + + INPUT_FORMAT_MAPPER, + INPUT_FORMAT_EXISTING_RECORDS, + + LOOKUP_MAPPER, + LOOKUP_MAPPER_EXISTING_RECORDS, + LOOKUP_MAPPER_ROW_LOOKUP_ATTEMPT + +} http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/ea50630a/blur-indexer/src/main/java/org/apache/blur/mapreduce/lib/update/ClusterDriver.java ---------------------------------------------------------------------- diff --git a/blur-indexer/src/main/java/org/apache/blur/mapreduce/lib/update/ClusterDriver.java b/blur-indexer/src/main/java/org/apache/blur/mapreduce/lib/update/ClusterDriver.java new file mode 100644 index 0000000..d44adf1 --- /dev/null +++ b/blur-indexer/src/main/java/org/apache/blur/mapreduce/lib/update/ClusterDriver.java @@ -0,0 +1,362 @@ +package org.apache.blur.mapreduce.lib.update; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.net.URL; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.apache.blur.log.Log; +import org.apache.blur.log.LogFactory; +import org.apache.blur.mapreduce.lib.BlurInputFormat; +import org.apache.blur.thirdparty.thrift_0_9_0.TException; +import org.apache.blur.thrift.BlurClient; +import org.apache.blur.thrift.generated.Blur.Iface; +import org.apache.blur.thrift.generated.BlurException; +import org.apache.blur.thrift.generated.TableDescriptor; +import org.apache.blur.thrift.generated.TableStats; +import org.apache.blur.utils.BlurConstants; +import org.apache.commons.io.IOUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.conf.Configured; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.permission.FsAction; +import org.apache.hadoop.mapreduce.Cluster; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.JobID; +import org.apache.hadoop.mapreduce.JobStatus; +import org.apache.hadoop.util.Tool; +import org.apache.hadoop.util.ToolRunner; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.log4j.LogManager; +import org.apache.log4j.xml.DOMConfigurator; + +public class ClusterDriver extends Configured implements Tool { + + private static final String BLUR_ENV = "blur.env"; + private static final Log LOG = 
LogFactory.getLog(ClusterDriver.class); + private static final String _SEP = "_"; + private static final String IMPORT = "import"; + + public static void main(String[] args) throws Exception { + String logFilePath = System.getenv("BLUR_INDEXER_LOG_FILE"); + System.out.println("Log file path [" + logFilePath + "]"); + System.setProperty("BLUR_INDEXER_LOG_FILE", logFilePath); + URL url = ClusterDriver.class.getResource("/program-log4j.xml"); + if (url != null) { + LOG.info("Reseting log4j config from classpath resource [{0}]", url); + LogManager.resetConfiguration(); + DOMConfigurator.configure(url); + } + int res = ToolRunner.run(new Configuration(), new ClusterDriver(), args); + System.exit(res); + } + + @Override + public int run(String[] args) throws Exception { + int c = 0; + final String blurEnv = args[c++]; + final String blurZkConnection = args[c++]; + final String extraConfig = args[c++]; + final int reducerMultiplier = Integer.parseInt(args[c++]); + final Configuration conf = getConf(); + + final ExecutorService service = Executors.newCachedThreadPool(); + final AtomicBoolean running = new AtomicBoolean(); + running.set(true); + + // Load configs for all filesystems. + Path path = new Path(extraConfig); + Configuration mergeHdfsConfigs = HdfsConfigurationNamespaceMerge.mergeHdfsConfigs(path.getFileSystem(conf), path); + conf.addResource(mergeHdfsConfigs); + conf.set(BlurConstants.BLUR_ZOOKEEPER_CONNECTION, blurZkConnection); + conf.set(BLUR_ENV, blurEnv); + + final Iface client = BlurClient.getClientFromZooKeeperConnectionStr(blurZkConnection); + + stopAllExistingMRJobs(blurEnv, conf); + cleanUpOldImportDirs(client, conf); + moveInprogressDirsBackToNew(client, conf); + unlockLockedTables(client); + + Map> futures = new HashMap>(); + while (running.get()) { + LOG.debug("Starting index update check for blur cluster [" + blurZkConnection + "]."); + try { + List tableList = client.tableList(); + startMissingIndexerThreads(tableList, service, futures, blurZkConnection, conf, client, reducerMultiplier); + } catch (TException t) { + LOG.error("Unknown Blur Thrift Error, Retrying...", t); + } + Thread.sleep(TimeUnit.SECONDS.toMillis(10)); + } + return 0; + } + + private void unlockLockedTables(Iface client) throws BlurException, TException { + List tableList = client.tableList(); + for (String table : tableList) { + TableDescriptor tableDescriptor = client.describe(table); + if (tableDescriptor.isEnabled()) { + unlockLockedTables(client, table); + } + } + } + + private void unlockLockedTables(Iface client, String table) throws BlurException, TException { + Map> listSnapshots = client.listSnapshots(table); + for (Entry> e : listSnapshots.entrySet()) { + List value = e.getValue(); + if (value.contains(FasterDriver.MRUPDATE_SNAPSHOT)) { + LOG.info("Unlocking table [{0}]", table); + client.removeSnapshot(table, FasterDriver.MRUPDATE_SNAPSHOT); + return; + } + } + } + + private void moveInprogressDirsBackToNew(Iface client, Configuration conf) throws BlurException, TException, + IOException { + List tableList = client.tableList(); + for (String table : tableList) { + String mrIncWorkingPathStr = getMRIncWorkingPathStr(client, table); + Path mrIncWorkingPath = new Path(mrIncWorkingPathStr); + Path newData = new Path(mrIncWorkingPath, FasterDriver.NEW); + Path inprogressData = new Path(mrIncWorkingPath, FasterDriver.INPROGRESS); + FileSystem fileSystem = inprogressData.getFileSystem(conf); + FileStatus[] listStatus = fileSystem.listStatus(inprogressData); + for (FileStatus fileStatus : 
listStatus) { + Path src = fileStatus.getPath(); + Path dst = new Path(newData, src.getName()); + if (fileSystem.rename(src, dst)) { + LOG.info("Moved [{0}] to [{1}] to be reprocessed.", src, dst); + } else { + LOG.error("Could not move [{0}] to [{1}] to be reprocessed.", src, dst); + } + } + } + } + + private void cleanUpOldImportDirs(Iface client, Configuration conf) throws BlurException, TException, IOException { + List tableList = client.tableList(); + for (String table : tableList) { + cleanUpOldImportDirs(client, conf, table); + } + } + + private void cleanUpOldImportDirs(Iface client, Configuration conf, String table) throws BlurException, TException, + IOException { + TableDescriptor descriptor = client.describe(table); + String tableUri = descriptor.getTableUri(); + Path tablePath = new Path(tableUri); + FileSystem fileSystem = tablePath.getFileSystem(getConf()); + Path importPath = new Path(tablePath, IMPORT); + if (fileSystem.exists(importPath)) { + for (FileStatus fileStatus : fileSystem.listStatus(importPath)) { + Path path = fileStatus.getPath(); + LOG.info("Removing failed import [{0}]", path); + fileSystem.delete(path, true); + } + } + } + + private void stopAllExistingMRJobs(String blurEnv, Configuration conf) throws YarnException, IOException, + InterruptedException { + Cluster cluster = new Cluster(conf); + JobStatus[] allJobStatuses = cluster.getAllJobStatuses(); + for (JobStatus jobStatus : allJobStatuses) { + if (jobStatus.isJobComplete()) { + continue; + } + String jobFile = jobStatus.getJobFile(); + JobID jobID = jobStatus.getJobID(); + Job job = cluster.getJob(jobID); + FileSystem fileSystem = FileSystem.get(job.getConfiguration()); + Configuration configuration = new Configuration(false); + Path path = new Path(jobFile); + Path makeQualified = path.makeQualified(fileSystem.getUri(), fileSystem.getWorkingDirectory()); + if (hasReadAccess(fileSystem, makeQualified)) { + try (FSDataInputStream in = fileSystem.open(makeQualified)) { + configuration.addResource(copy(in)); + } + String jobBlurEnv = configuration.get(BLUR_ENV); + LOG.info("Checking job [{0}] has env [{1}] current env set to [{2}]", jobID, jobBlurEnv, blurEnv); + if (blurEnv.equals(jobBlurEnv)) { + LOG.info("Killing running job [{0}]", jobID); + job.killJob(); + } + } + } + } + + private static InputStream copy(FSDataInputStream input) throws IOException { + try (ByteArrayOutputStream outputStream = new ByteArrayOutputStream()) { + IOUtils.copy(input, outputStream); + return new ByteArrayInputStream(outputStream.toByteArray()); + } + } + + private static boolean hasReadAccess(FileSystem fileSystem, Path p) { + try { + fileSystem.access(p, FsAction.READ); + return true; + } catch (IOException e) { + return false; + } + } + + private Callable getCallable(final String blurZkConnection, final Configuration conf, final Iface client, + final String table, final int reducerMultiplier) { + return new Callable() { + @Override + public Void call() throws Exception { + String originalThreadName = Thread.currentThread().getName(); + try { + Thread.currentThread().setName(table); + if (!isEnabled(client, table)) { + LOG.info("Table [" + table + "] is not enabled."); + return null; + } + waitForDataToLoad(client, table); + LOG.debug("Starting index update for table [" + table + "]."); + final String mrIncWorkingPathStr = getMRIncWorkingPathStr(client, table); + final String outputPathStr = getOutputPathStr(client, table); + Path path = new Path(outputPathStr); + FileSystem fileSystem = path.getFileSystem(getConf()); + 
+ Configuration configuration = new Configuration(conf); + BlurInputFormat.setMaxNumberOfMaps(configuration, 10000); + + FasterDriver driver = new FasterDriver(); + driver.setConf(configuration); + try { + driver.run(new String[] { table, mrIncWorkingPathStr, outputPathStr, blurZkConnection, + Integer.toString(reducerMultiplier) }); + } finally { + if (fileSystem.exists(path)) { + fileSystem.delete(path, true); + } + } + return null; + } finally { + Thread.currentThread().setName(originalThreadName); + } + } + }; + } + + private void startMissingIndexerThreads(List tableList, ExecutorService service, + Map> futures, final String blurZkConnection, final Configuration conf, final Iface client, + int reducerMultiplier) throws BlurException, TException { + Set tables = new HashSet(tableList); + + // remove futures that are complete + for (String table : tables) { + Future future = futures.get(table); + if (future != null) { + if (future.isDone()) { + try { + future.get(); + } catch (InterruptedException e) { + LOG.error("Unknown error while processing table [" + table + "].", e); + } catch (ExecutionException e) { + LOG.error("Unknown error while processing table [" + table + "].", e.getCause()); + } + futures.remove(table); + } else { + LOG.info("Update for table [" + table + "] still running."); + } + } + } + + // start missing tables + for (String table : tables) { + if (!futures.containsKey(table)) { + if (isEnabled(client, table)) { + Future future = service.submit(getCallable(blurZkConnection, conf, client, table, reducerMultiplier)); + futures.put(table, future); + } + } + } + } + + public static void waitForDataToLoad(Iface client, String table) throws BlurException, TException, + InterruptedException { + if (isFullyLoaded(client.tableStats(table))) { + return; + } + while (true) { + TableStats tableStats = client.tableStats(table); + if (isFullyLoaded(tableStats)) { + LOG.info("Data load complete in table [" + table + "] [" + tableStats + "]"); + return; + } + LOG.info("Waiting for data to load in table [" + table + "] [" + tableStats + "]"); + Thread.sleep(5000); + } + } + + private static boolean isFullyLoaded(TableStats tableStats) { + if (tableStats.getSegmentImportInProgressCount() == 0 && tableStats.getSegmentImportPendingCount() == 0) { + return true; + } + return false; + } + + private boolean isEnabled(Iface client, String table) throws BlurException, TException { + TableDescriptor tableDescriptor = client.describe(table); + return tableDescriptor.isEnabled(); + } + + private void mkdirs(FileSystem fileSystem, Path path) throws IOException { + if (fileSystem.exists(path)) { + return; + } + LOG.info("Creating path [" + path + "]."); + if (!fileSystem.mkdirs(path)) { + LOG.error("Path [" + path + "] could not be created."); + } + } + + public static String getMRIncWorkingPathStr(Iface client, String table) throws BlurException, TException, IOException { + TableDescriptor descriptor = client.describe(table); + Map tableProperties = descriptor.getTableProperties(); + if (tableProperties != null) { + String workingPath = tableProperties.get(BlurConstants.BLUR_BULK_UPDATE_WORKING_PATH); + if (workingPath != null) { + return workingPath; + } + } + throw new IOException("Table [" + table + "] does not have the property [" + + BlurConstants.BLUR_BULK_UPDATE_WORKING_PATH + "] setup correctly."); + } + + private String getOutputPathStr(Iface client, String table) throws BlurException, TException, IOException { + TableDescriptor descriptor = client.describe(table); + String tableUri = 
descriptor.getTableUri(); + Path tablePath = new Path(tableUri); + FileSystem fileSystem = tablePath.getFileSystem(getConf()); + Path importPath = new Path(tablePath, IMPORT); + mkdirs(fileSystem, importPath); + return new Path(importPath, IMPORT + _SEP + System.currentTimeMillis() + _SEP + UUID.randomUUID().toString()) + .toString(); + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/ea50630a/blur-indexer/src/main/java/org/apache/blur/mapreduce/lib/update/FasterDriver.java ---------------------------------------------------------------------- diff --git a/blur-indexer/src/main/java/org/apache/blur/mapreduce/lib/update/FasterDriver.java b/blur-indexer/src/main/java/org/apache/blur/mapreduce/lib/update/FasterDriver.java new file mode 100644 index 0000000..f43cba5 --- /dev/null +++ b/blur-indexer/src/main/java/org/apache/blur/mapreduce/lib/update/FasterDriver.java @@ -0,0 +1,486 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.blur.mapreduce.lib.update; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.UUID; +import java.util.concurrent.TimeUnit; + +import org.apache.blur.log.Log; +import org.apache.blur.log.LogFactory; +import org.apache.blur.mapreduce.lib.BlurInputFormat; +import org.apache.blur.mapreduce.lib.BlurOutputFormat; +import org.apache.blur.thirdparty.thrift_0_9_0.TException; +import org.apache.blur.thrift.BlurClient; +import org.apache.blur.thrift.generated.Blur.Iface; +import org.apache.blur.thrift.generated.BlurException; +import org.apache.blur.thrift.generated.TableDescriptor; +import org.apache.blur.thrift.generated.TableStats; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.conf.Configured; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.PathFilter; +import org.apache.hadoop.io.BooleanWritable; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapreduce.Counter; +import org.apache.hadoop.mapreduce.Counters; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.TaskReport; +import org.apache.hadoop.mapreduce.TaskType; +import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; +import org.apache.hadoop.mapreduce.lib.input.MultipleInputs; +import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat; +import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; +import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat; +import org.apache.hadoop.util.Tool; +import org.apache.hadoop.util.ToolRunner; + +public class FasterDriver extends 
Configured implements Tool { + + public static final String BLUR_UPDATE_ID = "blur.update.id"; + private static final String BLUR_EXEC_TYPE = "blur.exec.type"; + public static final String TMP = "tmp"; + + public enum EXEC { + MR_ONLY, MR_WITH_LOOKUP, AUTOMATIC + } + + public static final String MRUPDATE_SNAPSHOT = "mrupdate-snapshot"; + public static final String CACHE = "cache"; + public static final String COMPLETE = "complete"; + public static final String INPROGRESS = "inprogress"; + public static final String NEW = "new"; + private static final Log LOG = LogFactory.getLog(FasterDriver.class); + + public static void main(String[] args) throws Exception { + int res = ToolRunner.run(new Configuration(), new FasterDriver(), args); + System.exit(res); + } + + static class PartitionedInputResult { + final Path _partitionedInputData; + final Counters _counters; + final long[] _rowIdsFromNewData; + final long[] _rowIdsToUpdateFromNewData; + final long[] _rowIdsFromIndex; + + PartitionedInputResult(Path partitionedInputData, Counters counters, int shards, TaskReport[] taskReports) { + _partitionedInputData = partitionedInputData; + _counters = counters; + _rowIdsFromNewData = new long[shards]; + _rowIdsToUpdateFromNewData = new long[shards]; + _rowIdsFromIndex = new long[shards]; + for (TaskReport tr : taskReports) { + int id = tr.getTaskID().getId(); + Counters taskCounters = tr.getTaskCounters(); + Counter total = taskCounters.findCounter(BlurIndexCounter.ROW_IDS_FROM_NEW_DATA); + _rowIdsFromNewData[id] = total.getValue(); + Counter update = taskCounters.findCounter(BlurIndexCounter.ROW_IDS_TO_UPDATE_FROM_NEW_DATA); + _rowIdsToUpdateFromNewData[id] = update.getValue(); + Counter index = taskCounters.findCounter(BlurIndexCounter.ROW_IDS_FROM_INDEX); + _rowIdsFromIndex[id] = index.getValue(); + } + } + + } + + @Override + public int run(String[] args) throws Exception { + int c = 0; + if (args.length < 5) { + System.err + .println("Usage Driver "); + return 1; + } + String table = args[c++]; + String mrIncWorkingPathStr = args[c++]; + String outputPathStr = args[c++]; + String blurZkConnection = args[c++]; + int reducerMultipler = Integer.parseInt(args[c++]); + for (; c < args.length; c++) { + String externalConfigFileToAdd = args[c]; + getConf().addResource(new Path(externalConfigFileToAdd)); + } + + Path outputPath = new Path(outputPathStr); + Path mrIncWorkingPath = new Path(mrIncWorkingPathStr); + FileSystem fileSystem = mrIncWorkingPath.getFileSystem(getConf()); + + Path newData = new Path(mrIncWorkingPath, NEW); + Path inprogressData = new Path(mrIncWorkingPath, INPROGRESS); + Path completeData = new Path(mrIncWorkingPath, COMPLETE); + Path fileCache = new Path(mrIncWorkingPath, CACHE); + Path tmpPathDontDelete = new Path(mrIncWorkingPath, TMP); + + Path tmpPath = new Path(tmpPathDontDelete, UUID.randomUUID().toString()); + + fileSystem.mkdirs(newData); + fileSystem.mkdirs(inprogressData); + fileSystem.mkdirs(completeData); + fileSystem.mkdirs(fileCache); + + List srcPathList = new ArrayList(); + for (FileStatus fileStatus : fileSystem.listStatus(newData)) { + srcPathList.add(fileStatus.getPath()); + } + if (srcPathList.isEmpty()) { + return 0; + } + + List inprogressPathList = new ArrayList(); + boolean success = false; + Iface client = null; + + EXEC exec = EXEC.valueOf(getConf().get(BLUR_EXEC_TYPE, EXEC.AUTOMATIC.name()).toUpperCase()); + + String uuid = UUID.randomUUID().toString(); + + try { + client = BlurClient.getClientFromZooKeeperConnectionStr(blurZkConnection); + 
TableDescriptor descriptor = client.describe(table); + Map tableProperties = descriptor.getTableProperties(); + String fastDir = tableProperties.get("blur.table.disable.fast.dir"); + if (fastDir == null || !fastDir.equals("true")) { + LOG.error("Table [{0}] has blur.table.disable.fast.dir enabled, not supported in fast MR update.", table); + return 1; + } + + waitForOtherSnapshotsToBeRemoved(client, table, MRUPDATE_SNAPSHOT); + client.createSnapshot(table, MRUPDATE_SNAPSHOT); + TableStats tableStats = client.tableStats(table); + + inprogressPathList = movePathList(fileSystem, inprogressData, srcPathList); + + switch (exec) { + case MR_ONLY: + success = runMrOnly(descriptor, inprogressPathList, table, fileCache, outputPath, reducerMultipler); + break; + case MR_WITH_LOOKUP: + success = runMrWithLookup(uuid, descriptor, inprogressPathList, table, fileCache, outputPath, reducerMultipler, + tmpPath, tableStats, MRUPDATE_SNAPSHOT); + break; + case AUTOMATIC: + success = runAutomatic(uuid, descriptor, inprogressPathList, table, fileCache, outputPath, reducerMultipler, + tmpPath, tableStats, MRUPDATE_SNAPSHOT); + break; + default: + throw new RuntimeException("Exec type [" + exec + "] not supported."); + } + } finally { + if (success) { + LOG.info("Associate lookup cache with new data!"); + associateLookupCache(uuid, fileCache, outputPath); + LOG.info("Indexing job succeeded!"); + client.loadData(table, outputPathStr); + LOG.info("Load data called"); + movePathList(fileSystem, completeData, inprogressPathList); + LOG.info("Input data moved to complete"); + ClusterDriver.waitForDataToLoad(client, table); + LOG.info("Data loaded"); + } else { + LOG.error("Indexing job failed!"); + movePathList(fileSystem, newData, inprogressPathList); + } + fileSystem.delete(tmpPath, true); + if (client != null) { + client.removeSnapshot(table, MRUPDATE_SNAPSHOT); + } + } + + if (success) { + return 0; + } else { + return 1; + } + } + + private void associateLookupCache(String uuid, Path fileCache, Path outputPath) throws IOException { + FileSystem fileSystem = fileCache.getFileSystem(getConf()); + cleanupExtraFileFromSpecX(fileSystem, uuid, fileCache); + associateLookupCache(fileSystem, uuid, fileSystem.getFileStatus(fileCache), outputPath); + } + + private void cleanupExtraFileFromSpecX(FileSystem fileSystem, String uuid, Path fileCache) throws IOException { + FileStatus[] listStatus = fileSystem.listStatus(fileCache); + List uuidPaths = new ArrayList(); + for (FileStatus fs : listStatus) { + Path path = fs.getPath(); + if (fs.isDirectory()) { + cleanupExtraFileFromSpecX(fileSystem, uuid, path); + } else if (path.getName().startsWith(uuid)) { + uuidPaths.add(fs); + } + } + if (uuidPaths.size() > 1) { + deleteIncomplete(fileSystem, uuidPaths); + } + } + + private void deleteIncomplete(FileSystem fileSystem, List uuidPaths) throws IOException { + long max = 0; + FileStatus keeper = null; + for (FileStatus fs : uuidPaths) { + long len = fs.getLen(); + if (len > max) { + keeper = fs; + max = len; + } + } + for (FileStatus fs : uuidPaths) { + if (fs != keeper) { + LOG.info("Deleteing incomplete cache file [{0}]", fs.getPath()); + fileSystem.delete(fs.getPath(), false); + } + } + } + + private void associateLookupCache(FileSystem fileSystem, String uuid, FileStatus fileCache, Path outputPath) + throws IOException { + Path path = fileCache.getPath(); + if (fileCache.isDirectory()) { + FileStatus[] listStatus = fileSystem.listStatus(path); + for (FileStatus fs : listStatus) { + associateLookupCache(fileSystem, uuid, fs, 
outputPath); + } + } else if (path.getName().startsWith(uuid)) { + Path parent = path.getParent(); + String shardName = parent.getName(); + Path indexPath = findOutputDirPath(outputPath, shardName); + LOG.info("Path found for shard [{0}] outputPath [{1}]", shardName, outputPath); + String id = MergeSortRowIdMatcher.getIdForSingleSegmentIndex(getConf(), indexPath); + Path file = new Path(path.getParent(), id + ".seq"); + MergeSortRowIdMatcher.commitWriter(getConf(), file, path); + } + } + + private Path findOutputDirPath(Path outputPath, String shardName) throws IOException { + FileSystem fileSystem = outputPath.getFileSystem(getConf()); + Path shardPath = new Path(outputPath, shardName); + if (!fileSystem.exists(shardPath)) { + throw new IOException("Shard path [" + shardPath + "]"); + } + FileStatus[] listStatus = fileSystem.listStatus(shardPath, new PathFilter() { + @Override + public boolean accept(Path path) { + return path.getName().endsWith(".commit"); + } + }); + if (listStatus.length == 1) { + FileStatus fs = listStatus[0]; + return fs.getPath(); + } else { + throw new IOException("More than one sub dir [" + shardPath + "]"); + } + } + + private boolean runAutomatic(String uuid, TableDescriptor descriptor, List inprogressPathList, String table, + Path fileCache, Path outputPath, int reducerMultipler, Path tmpPath, TableStats tableStats, String snapshot) + throws ClassNotFoundException, IOException, InterruptedException { + PartitionedInputResult result = buildPartitionedInputData(uuid, tmpPath, descriptor, inprogressPathList, snapshot, + fileCache); + + Job job = Job.getInstance(getConf(), "Blur Row Updater for table [" + table + "]"); + + InputSplitPruneUtil.setBlurLookupRowIdFromNewDataCounts(job, table, result._rowIdsFromNewData); + InputSplitPruneUtil.setBlurLookupRowIdUpdateFromNewDataCounts(job, table, result._rowIdsToUpdateFromNewData); + InputSplitPruneUtil.setBlurLookupRowIdFromIndexCounts(job, table, result._rowIdsFromIndex); + InputSplitPruneUtil.setTable(job, table); + + BlurInputFormat.setLocalCachePath(job, fileCache); + + // Existing data - This adds the copy data files first open and stream + // through all documents. 
+ { + Path tablePath = new Path(descriptor.getTableUri()); + BlurInputFormat.addTable(job, descriptor, MRUPDATE_SNAPSHOT); + MultipleInputs.addInputPath(job, tablePath, PrunedBlurInputFormat.class, MapperForExistingDataMod.class); + } + + // Existing data - This adds the row id lookup + { + MapperForExistingDataWithIndexLookup.setSnapshot(job, MRUPDATE_SNAPSHOT); + FileInputFormat.addInputPath(job, result._partitionedInputData); + MultipleInputs.addInputPath(job, result._partitionedInputData, PrunedSequenceFileInputFormat.class, + MapperForExistingDataWithIndexLookup.class); + } + + // New Data + for (Path p : inprogressPathList) { + FileInputFormat.addInputPath(job, p); + MultipleInputs.addInputPath(job, p, SequenceFileInputFormat.class, MapperForNewDataMod.class); + } + + BlurOutputFormat.setOutputPath(job, outputPath); + BlurOutputFormat.setupJob(job, descriptor); + + job.setReducerClass(UpdateReducer.class); + job.setMapOutputKeyClass(IndexKey.class); + job.setMapOutputValueClass(IndexValue.class); + job.setPartitionerClass(IndexKeyPartitioner.class); + job.setGroupingComparatorClass(IndexKeyWritableComparator.class); + + BlurOutputFormat.setReducerMultiplier(job, reducerMultipler); + + boolean success = job.waitForCompletion(true); + Counters counters = job.getCounters(); + LOG.info("Counters [" + counters + "]"); + return success; + } + + private boolean runMrWithLookup(String uuid, TableDescriptor descriptor, List inprogressPathList, String table, + Path fileCache, Path outputPath, int reducerMultipler, Path tmpPath, TableStats tableStats, String snapshot) + throws ClassNotFoundException, IOException, InterruptedException { + PartitionedInputResult result = buildPartitionedInputData(uuid, tmpPath, descriptor, inprogressPathList, snapshot, + fileCache); + + Job job = Job.getInstance(getConf(), "Blur Row Updater for table [" + table + "]"); + + MapperForExistingDataWithIndexLookup.setSnapshot(job, MRUPDATE_SNAPSHOT); + FileInputFormat.addInputPath(job, result._partitionedInputData); + MultipleInputs.addInputPath(job, result._partitionedInputData, SequenceFileInputFormat.class, + MapperForExistingDataWithIndexLookup.class); + + for (Path p : inprogressPathList) { + FileInputFormat.addInputPath(job, p); + MultipleInputs.addInputPath(job, p, SequenceFileInputFormat.class, MapperForNewDataMod.class); + } + + BlurOutputFormat.setOutputPath(job, outputPath); + BlurOutputFormat.setupJob(job, descriptor); + + job.setReducerClass(UpdateReducer.class); + job.setMapOutputKeyClass(IndexKey.class); + job.setMapOutputValueClass(IndexValue.class); + job.setPartitionerClass(IndexKeyPartitioner.class); + job.setGroupingComparatorClass(IndexKeyWritableComparator.class); + + BlurOutputFormat.setReducerMultiplier(job, reducerMultipler); + + boolean success = job.waitForCompletion(true); + Counters counters = job.getCounters(); + LOG.info("Counters [" + counters + "]"); + return success; + } + + private boolean runMrOnly(TableDescriptor descriptor, List inprogressPathList, String table, Path fileCache, + Path outputPath, int reducerMultipler) throws IOException, ClassNotFoundException, InterruptedException { + Job job = Job.getInstance(getConf(), "Blur Row Updater for table [" + table + "]"); + Path tablePath = new Path(descriptor.getTableUri()); + BlurInputFormat.setLocalCachePath(job, fileCache); + BlurInputFormat.addTable(job, descriptor, MRUPDATE_SNAPSHOT); + MultipleInputs.addInputPath(job, tablePath, BlurInputFormat.class, MapperForExistingDataMod.class); + + for (Path p : inprogressPathList) { + 
FileInputFormat.addInputPath(job, p); + MultipleInputs.addInputPath(job, p, SequenceFileInputFormat.class, MapperForNewDataMod.class); + } + + BlurOutputFormat.setOutputPath(job, outputPath); + BlurOutputFormat.setupJob(job, descriptor); + + job.setReducerClass(UpdateReducer.class); + job.setMapOutputKeyClass(IndexKey.class); + job.setMapOutputValueClass(IndexValue.class); + job.setPartitionerClass(IndexKeyPartitioner.class); + job.setGroupingComparatorClass(IndexKeyWritableComparator.class); + + BlurOutputFormat.setReducerMultiplier(job, reducerMultipler); + + boolean success = job.waitForCompletion(true); + Counters counters = job.getCounters(); + LOG.info("Counters [" + counters + "]"); + return success; + } + + private PartitionedInputResult buildPartitionedInputData(String uuid, Path tmpPath, TableDescriptor descriptor, + List inprogressPathList, String snapshot, Path fileCachePath) throws IOException, ClassNotFoundException, + InterruptedException { + Job job = Job.getInstance(getConf(), "Partitioning data for table [" + descriptor.getName() + "]"); + job.getConfiguration().set(BLUR_UPDATE_ID, uuid); + + // Needed for the bloom filter path information. + BlurOutputFormat.setTableDescriptor(job, descriptor); + BlurInputFormat.setLocalCachePath(job, fileCachePath); + MapperForExistingDataWithIndexLookup.setSnapshot(job, snapshot); + + for (Path p : inprogressPathList) { + FileInputFormat.addInputPath(job, p); + } + Path outputPath = new Path(tmpPath, UUID.randomUUID().toString()); + job.setJarByClass(getClass()); + job.setMapperClass(LookupBuilderMapper.class); + job.setReducerClass(LookupBuilderReducer.class); + + int shardCount = descriptor.getShardCount(); + job.setNumReduceTasks(shardCount); + job.setInputFormatClass(SequenceFileInputFormat.class); + job.setMapOutputKeyClass(Text.class); + job.setMapOutputValueClass(NullWritable.class); + job.setOutputFormatClass(SequenceFileOutputFormat.class); + job.setOutputKeyClass(Text.class); + job.setOutputValueClass(BooleanWritable.class); + FileOutputFormat.setOutputPath(job, outputPath); + if (job.waitForCompletion(true)) { + return new PartitionedInputResult(outputPath, job.getCounters(), shardCount, job.getTaskReports(TaskType.REDUCE)); + } else { + throw new IOException("Partitioning failed!"); + } + } + + private void waitForOtherSnapshotsToBeRemoved(Iface client, String table, String snapshot) throws BlurException, + TException, InterruptedException { + while (true) { + Map> listSnapshots = client.listSnapshots(table); + boolean mrupdateSnapshots = false; + for (Entry> e : listSnapshots.entrySet()) { + List value = e.getValue(); + if (value.contains(snapshot)) { + mrupdateSnapshots = true; + } + } + if (!mrupdateSnapshots) { + return; + } else { + LOG.info(snapshot + " Snapshot for table [{0}] already exists", table); + Thread.sleep(TimeUnit.SECONDS.toMillis(10)); + LOG.info("Retrying"); + } + } + } + + private List movePathList(FileSystem fileSystem, Path dstDir, List lst) throws IOException { + List result = new ArrayList(); + for (Path src : lst) { + Path dst = new Path(dstDir, src.getName()); + if (fileSystem.rename(src, dst)) { + LOG.info("Moving [{0}] to [{1}]", src, dst); + result.add(dst); + } else { + LOG.error("Could not move [{0}] to [{1}]", src, dst); + } + } + return result; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/ea50630a/blur-indexer/src/main/java/org/apache/blur/mapreduce/lib/update/HdfsConfigurationNamespaceMerge.java ---------------------------------------------------------------------- 
diff --git a/blur-indexer/src/main/java/org/apache/blur/mapreduce/lib/update/HdfsConfigurationNamespaceMerge.java b/blur-indexer/src/main/java/org/apache/blur/mapreduce/lib/update/HdfsConfigurationNamespaceMerge.java new file mode 100644 index 0000000..34d3e99 --- /dev/null +++ b/blur-indexer/src/main/java/org/apache/blur/mapreduce/lib/update/HdfsConfigurationNamespaceMerge.java @@ -0,0 +1,115 @@ +package org.apache.blur.mapreduce.lib.update; + +import java.io.IOException; +import java.net.InetAddress; +import java.net.UnknownHostException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashSet; +import java.util.List; +import java.util.Map.Entry; +import java.util.Set; + +import org.apache.commons.lang.StringUtils; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; + +public class HdfsConfigurationNamespaceMerge { + + private static final String DFS_NAMESERVICES = "dfs.nameservices"; + private static final Log LOG = LogFactory.getLog(HdfsConfigurationNamespaceMerge.class); + + public static void main(String[] args) throws IOException { + Path p = new Path("./src/main/scripts/conf/hdfs"); + + Configuration configuration = mergeHdfsConfigs(p.getFileSystem(new Configuration()), p); + + // configuration.writeXml(System.out); + + Collection nameServices = configuration.getStringCollection(DFS_NAMESERVICES); + for (String name : nameServices) { + Path path = new Path("hdfs://" + name + "/"); + FileSystem fileSystem = path.getFileSystem(configuration); + FileStatus[] listStatus = fileSystem.listStatus(path); + for (FileStatus fileStatus : listStatus) { + System.out.println(fileStatus.getPath()); + } + } + } + + private static boolean checkHostName(String host) { + try { + InetAddress.getAllByName(host); + return true; + } catch (UnknownHostException e) { + LOG.warn("Host not found [" + host + "]"); + return false; + } + } + + public static Configuration mergeHdfsConfigs(FileSystem fs, Path p) throws IOException { + List configList = new ArrayList(); + gatherConfigs(fs, p, configList); + return merge(configList); + } + + public static Configuration merge(List configList) throws IOException { + Configuration merge = new Configuration(false); + Set nameServices = new HashSet(); + for (Configuration configuration : configList) { + String nameService = configuration.get(DFS_NAMESERVICES); + if (nameServices.contains(nameService)) { + throw new IOException("Multiple confs define namespace [" + nameService + "]"); + } + nameServices.add(nameService); + if (shouldAdd(configuration, nameService)) { + for (Entry e : configuration) { + String key = e.getKey(); + if (key.contains(nameService)) { + String value = e.getValue(); + merge.set(key, value); + } + } + } + } + merge.set(DFS_NAMESERVICES, StringUtils.join(nameServices, ",")); + return merge; + } + + private static boolean shouldAdd(Configuration configuration, String nameService) { + for (Entry e : configuration) { + String key = e.getKey(); + if (key.contains(nameService) && key.startsWith("dfs.namenode.rpc-address.")) { + return checkHostName(getHost(e.getValue())); + } + } + return false; + } + + private static String getHost(String host) { + return host.substring(0, host.indexOf(":")); + } + + public static void gatherConfigs(FileSystem fs, Path p, List configList) throws IOException { + if (fs.isFile(p)) { + if 
(p.getName().endsWith(".xml")) { + LOG.info("Loading file [" + p + "]"); + Configuration configuration = new Configuration(false); + configuration.addResource(p); + configList.add(configuration); + } else { + LOG.info("Skipping file [" + p + "]"); + } + } else { + FileStatus[] listStatus = fs.listStatus(p); + for (FileStatus fileStatus : listStatus) { + gatherConfigs(fs, fileStatus.getPath(), configList); + } + } + } + +} http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/ea50630a/blur-indexer/src/main/java/org/apache/blur/mapreduce/lib/update/InputSplitPruneUtil.java ---------------------------------------------------------------------- diff --git a/blur-indexer/src/main/java/org/apache/blur/mapreduce/lib/update/InputSplitPruneUtil.java b/blur-indexer/src/main/java/org/apache/blur/mapreduce/lib/update/InputSplitPruneUtil.java new file mode 100644 index 0000000..e295073 --- /dev/null +++ b/blur-indexer/src/main/java/org/apache/blur/mapreduce/lib/update/InputSplitPruneUtil.java @@ -0,0 +1,133 @@ +package org.apache.blur.mapreduce.lib.update; + +import org.apache.blur.utils.ShardUtil; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.mapreduce.Job; + +public class InputSplitPruneUtil { + + private static final String BLUR_LOOKUP_ROWID_UPDATE_FROM_NEW_DATA_COUNT_PREFIX = "blur.lookup.rowid.update.from.new.data.count"; + private static final String BLUR_LOOKUP_ROWID_FROM_NEW_DATA_COUNT_PREFIX = "blur.lookup.rowid.from.new.data.count."; + private static final String BLUR_LOOKUP_ROWID_FROM_INDEX_COUNT_PREFIX = "blur.lookup.rowid.from.index.count."; + + private static final String BLUR_LOOKUP_TABLE = "blur.lookup.table"; + private static final String BLUR_LOOKUP_RATIO_PER_SHARD = "blur.lookup.ratio.per.shard"; + private static final String BLUR_LOOKUP_MAX_TOTAL_PER_SHARD = "blur.lookup.max.total.per.shard"; + + private static final double DEFAULT_LOOKUP_RATIO = 0.5; + private static final long DEFAULT_LOOKUP_MAX_TOTAL = Long.MAX_VALUE; + + public static boolean shouldLookupExecuteOnShard(Configuration configuration, String table, int shard) { + double lookupRatio = getLookupRatio(configuration); + long maxLookupCount = getMaxLookupCount(configuration); + long rowIdFromNewDataCount = getBlurLookupRowIdFromNewDataCount(configuration, table, shard); + long rowIdUpdateFromNewDataCount = getBlurLookupRowIdUpdateFromNewDataCount(configuration, table, shard); + long rowIdFromIndexCount = getBlurLookupRowIdFromIndexCount(configuration, table, shard); + return shouldLookupRun(rowIdFromIndexCount, rowIdFromNewDataCount, rowIdUpdateFromNewDataCount, lookupRatio, + maxLookupCount); + } + + private static boolean shouldLookupRun(long rowIdFromIndexCount, long rowIdFromNewDataCount, + long rowIdUpdateFromNewDataCount, double lookupRatio, long maxLookupCount) { + if (rowIdUpdateFromNewDataCount > maxLookupCount) { + return false; + } + double d = (double) rowIdUpdateFromNewDataCount / (double) rowIdFromIndexCount; + if (d <= lookupRatio) { + return true; + } + return false; + } + + public static double getLookupRatio(Configuration configuration) { + return configuration.getDouble(BLUR_LOOKUP_RATIO_PER_SHARD, DEFAULT_LOOKUP_RATIO); + } + + private static long getMaxLookupCount(Configuration configuration) { + return configuration.getLong(BLUR_LOOKUP_MAX_TOTAL_PER_SHARD, DEFAULT_LOOKUP_MAX_TOTAL); + } + + public static void setTable(Job job, String table) { + setTable(job.getConfiguration(), table); + } + + public static void setTable(Configuration 
configuration, String table) { + configuration.set(BLUR_LOOKUP_TABLE, table); + } + + public static String getTable(Configuration configuration) { + return configuration.get(BLUR_LOOKUP_TABLE); + } + + public static String getBlurLookupRowIdFromIndexCountName(String table) { + return BLUR_LOOKUP_ROWID_FROM_INDEX_COUNT_PREFIX + table; + } + + public static String getBlurLookupRowIdFromNewDataCountName(String table) { + return BLUR_LOOKUP_ROWID_FROM_NEW_DATA_COUNT_PREFIX + table; + } + + public static String getBlurLookupRowIdUpdateFromNewDataCountName(String table) { + return BLUR_LOOKUP_ROWID_UPDATE_FROM_NEW_DATA_COUNT_PREFIX + table; + } + + public static long getBlurLookupRowIdUpdateFromNewDataCount(Configuration configuration, String table, int shard) { + String[] strings = configuration.getStrings(getBlurLookupRowIdUpdateFromNewDataCountName(table)); + return getCount(strings, shard); + } + + public static long getBlurLookupRowIdFromNewDataCount(Configuration configuration, String table, int shard) { + String[] strings = configuration.getStrings(getBlurLookupRowIdFromNewDataCountName(table)); + return getCount(strings, shard); + } + + public static long getBlurLookupRowIdFromIndexCount(Configuration configuration, String table, int shard) { + String[] strings = configuration.getStrings(getBlurLookupRowIdFromIndexCountName(table)); + return getCount(strings, shard); + } + + public static void setBlurLookupRowIdFromNewDataCounts(Job job, String table, long[] counts) { + setBlurLookupRowIdFromNewDataCounts(job.getConfiguration(), table, counts); + } + + public static void setBlurLookupRowIdFromNewDataCounts(Configuration configuration, String table, long[] counts) { + configuration.setStrings(getBlurLookupRowIdFromNewDataCountName(table), toStrings(counts)); + } + + public static void setBlurLookupRowIdUpdateFromNewDataCounts(Job job, String table, long[] counts) { + setBlurLookupRowIdUpdateFromNewDataCounts(job.getConfiguration(), table, counts); + } + + public static void setBlurLookupRowIdUpdateFromNewDataCounts(Configuration configuration, String table, long[] counts) { + configuration.setStrings(getBlurLookupRowIdUpdateFromNewDataCountName(table), toStrings(counts)); + } + + public static void setBlurLookupRowIdFromIndexCounts(Job job, String table, long[] counts) { + setBlurLookupRowIdFromIndexCounts(job.getConfiguration(), table, counts); + } + + public static void setBlurLookupRowIdFromIndexCounts(Configuration configuration, String table, long[] counts) { + configuration.setStrings(getBlurLookupRowIdFromIndexCountName(table), toStrings(counts)); + } + + public static long getCount(String[] strings, int shard) { + return Long.parseLong(strings[shard]); + } + + public static int getShardFromDirectoryPath(Path path) { + return ShardUtil.getShardIndex(path.getName()); + } + + public static String[] toStrings(long[] counts) { + if (counts == null) { + return null; + } + String[] strs = new String[counts.length]; + for (int i = 0; i < counts.length; i++) { + strs[i] = Long.toString(counts[i]); + } + return strs; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/ea50630a/blur-indexer/src/main/java/org/apache/blur/mapreduce/lib/update/LookupBuilderMapper.java ---------------------------------------------------------------------- diff --git a/blur-indexer/src/main/java/org/apache/blur/mapreduce/lib/update/LookupBuilderMapper.java b/blur-indexer/src/main/java/org/apache/blur/mapreduce/lib/update/LookupBuilderMapper.java new file mode 100644 index 0000000..ac0d91f --- 
/dev/null +++ b/blur-indexer/src/main/java/org/apache/blur/mapreduce/lib/update/LookupBuilderMapper.java @@ -0,0 +1,18 @@ +package org.apache.blur.mapreduce.lib.update; + +import java.io.IOException; + +import org.apache.blur.mapreduce.lib.BlurRecord; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapreduce.Mapper; + +public class LookupBuilderMapper extends Mapper { + + @Override + protected void map(Text key, BlurRecord value, Mapper.Context context) + throws IOException, InterruptedException { + context.write(new Text(value.getRowId()), NullWritable.get()); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/ea50630a/blur-indexer/src/main/java/org/apache/blur/mapreduce/lib/update/LookupBuilderReducer.java ---------------------------------------------------------------------- diff --git a/blur-indexer/src/main/java/org/apache/blur/mapreduce/lib/update/LookupBuilderReducer.java b/blur-indexer/src/main/java/org/apache/blur/mapreduce/lib/update/LookupBuilderReducer.java new file mode 100644 index 0000000..1983cae --- /dev/null +++ b/blur-indexer/src/main/java/org/apache/blur/mapreduce/lib/update/LookupBuilderReducer.java @@ -0,0 +1,165 @@ +package org.apache.blur.mapreduce.lib.update; + +import java.io.IOException; +import java.util.List; + +import org.apache.blur.BlurConfiguration; +import org.apache.blur.manager.BlurPartitioner; +import org.apache.blur.manager.writer.SnapshotIndexDeletionPolicy; +import org.apache.blur.mapreduce.lib.BlurInputFormat; +import org.apache.blur.mapreduce.lib.BlurOutputFormat; +import org.apache.blur.mapreduce.lib.update.MergeSortRowIdMatcher.Action; +import org.apache.blur.store.BlockCacheDirectoryFactoryV2; +import org.apache.blur.store.hdfs.HdfsDirectory; +import org.apache.blur.thrift.generated.TableDescriptor; +import org.apache.blur.utils.BlurConstants; +import org.apache.blur.utils.ShardUtil; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.BooleanWritable; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.io.SequenceFile.Writer; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapreduce.Counter; +import org.apache.hadoop.mapreduce.Reducer; +import org.apache.hadoop.mapreduce.TaskAttemptID; +import org.apache.lucene.index.AtomicReader; +import org.apache.lucene.index.AtomicReaderContext; +import org.apache.lucene.index.DirectoryReader; +import org.apache.lucene.index.IndexCommit; +import org.apache.lucene.index.Terms; +import org.apache.lucene.store.Directory; + +import com.google.common.io.Closer; + +public class LookupBuilderReducer extends Reducer { + + public static final String BLUR_CACHE_DIR_TOTAL_BYTES = "blur.cache.dir.total.bytes"; + private Counter _rowIds; + private Counter _rowIdsToUpdate; + + private MergeSortRowIdMatcher _matcher; + private int _numberOfShardsInTable; + private Configuration _configuration; + private String _snapshot; + private Path _tablePath; + private Counter _rowIdsFromIndex; + private long _totalNumberOfBytes; + private Action _action; + private Closer _closer; + private Path _cachePath; + private String _table; + private Writer _writer; + + @Override + protected void setup(Reducer.Context context) throws IOException, + InterruptedException { + _configuration = context.getConfiguration(); + _rowIds = context.getCounter(BlurIndexCounter.ROW_IDS_FROM_NEW_DATA); + _rowIdsToUpdate = context.getCounter(BlurIndexCounter.ROW_IDS_TO_UPDATE_FROM_NEW_DATA); + 
_rowIdsFromIndex = context.getCounter(BlurIndexCounter.ROW_IDS_FROM_INDEX); + TableDescriptor tableDescriptor = BlurOutputFormat.getTableDescriptor(_configuration); + _numberOfShardsInTable = tableDescriptor.getShardCount(); + _tablePath = new Path(tableDescriptor.getTableUri()); + _snapshot = MapperForExistingDataWithIndexLookup.getSnapshot(_configuration); + _totalNumberOfBytes = _configuration.getLong(BLUR_CACHE_DIR_TOTAL_BYTES, 128 * 1024 * 1024); + _cachePath = BlurInputFormat.getLocalCachePath(_configuration); + _table = tableDescriptor.getName(); + _closer = Closer.create(); + } + + @Override + protected void reduce(Text rowId, Iterable nothing, + Reducer.Context context) throws IOException, InterruptedException { + if (_matcher == null) { + _matcher = getMergeSortRowIdMatcher(rowId, context); + } + if (_writer == null) { + _writer = getRowIdWriter(rowId, context); + } + _writer.append(rowId, NullWritable.get()); + _rowIds.increment(1); + if (_action == null) { + _action = new Action() { + @Override + public void found(Text rowId) throws IOException { + _rowIdsToUpdate.increment(1); + try { + context.write(rowId, new BooleanWritable(true)); + } catch (InterruptedException e) { + throw new IOException(e); + } + } + }; + } + _matcher.lookup(rowId, _action); + } + + private Writer getRowIdWriter(Text rowId, Reducer.Context context) + throws IOException { + BlurPartitioner blurPartitioner = new BlurPartitioner(); + int shard = blurPartitioner.getShard(rowId, _numberOfShardsInTable); + String shardName = ShardUtil.getShardName(shard); + Path cachePath = MergeSortRowIdMatcher.getCachePath(_cachePath, _table, shardName); + Configuration configuration = context.getConfiguration(); + String uuid = configuration.get(FasterDriver.BLUR_UPDATE_ID); + Path tmpPath = new Path(cachePath, uuid + "_" + getAttemptString(context)); + return _closer.register(MergeSortRowIdMatcher.createWriter(_configuration, tmpPath)); + } + + private String getAttemptString(Reducer.Context context) { + TaskAttemptID taskAttemptID = context.getTaskAttemptID(); + return taskAttemptID.toString(); + } + + @Override + protected void cleanup(Reducer.Context context) throws IOException, + InterruptedException { + _closer.close(); + } + + private MergeSortRowIdMatcher getMergeSortRowIdMatcher(Text rowId, + Reducer.Context context) throws IOException { + BlurPartitioner blurPartitioner = new BlurPartitioner(); + int shard = blurPartitioner.getShard(rowId, _numberOfShardsInTable); + String shardName = ShardUtil.getShardName(shard); + + Path shardPath = new Path(_tablePath, shardName); + HdfsDirectory hdfsDirectory = new HdfsDirectory(_configuration, shardPath); + SnapshotIndexDeletionPolicy policy = new SnapshotIndexDeletionPolicy(_configuration, + SnapshotIndexDeletionPolicy.getGenerationsPath(shardPath)); + Long generation = policy.getGeneration(_snapshot); + if (generation == null) { + hdfsDirectory.close(); + throw new IOException("Snapshot [" + _snapshot + "] not found in shard [" + shardPath + "]"); + } + + BlurConfiguration bc = new BlurConfiguration(); + BlockCacheDirectoryFactoryV2 blockCacheDirectoryFactoryV2 = new BlockCacheDirectoryFactoryV2(bc, + _totalNumberOfBytes); + _closer.register(blockCacheDirectoryFactoryV2); + Directory dir = blockCacheDirectoryFactoryV2.newDirectory("table", "shard", hdfsDirectory, null); + List listCommits = DirectoryReader.listCommits(dir); + IndexCommit indexCommit = MapperForExistingDataWithIndexLookup.findIndexCommit(listCommits, generation, shardPath); + DirectoryReader reader = 
+    _rowIdsFromIndex.setValue(getTotalNumberOfRowIds(reader));
+
+    Path cachePath = MergeSortRowIdMatcher.getCachePath(_cachePath, _table, shardName);
+    return new MergeSortRowIdMatcher(dir, generation, _configuration, cachePath, context);
+  }
+
+  private long getTotalNumberOfRowIds(DirectoryReader reader) throws IOException {
+    long total = 0;
+    List<AtomicReaderContext> leaves = reader.leaves();
+    for (AtomicReaderContext context : leaves) {
+      AtomicReader atomicReader = context.reader();
+      Terms terms = atomicReader.terms(BlurConstants.ROW_ID);
+      long expectedInsertions = terms.size();
+      if (expectedInsertions < 0) {
+        return -1;
+      }
+      total += expectedInsertions;
+    }
+    return total;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/ea50630a/blur-indexer/src/main/java/org/apache/blur/mapreduce/lib/update/MapperForExistingDataMod.java
----------------------------------------------------------------------
diff --git a/blur-indexer/src/main/java/org/apache/blur/mapreduce/lib/update/MapperForExistingDataMod.java b/blur-indexer/src/main/java/org/apache/blur/mapreduce/lib/update/MapperForExistingDataMod.java
new file mode 100644
index 0000000..bf86e19
--- /dev/null
+++ b/blur-indexer/src/main/java/org/apache/blur/mapreduce/lib/update/MapperForExistingDataMod.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.blur.mapreduce.lib.update;
+
+import java.io.IOException;
+
+import org.apache.blur.mapreduce.lib.BlurRecord;
+import org.apache.blur.mapreduce.lib.TableBlurRecord;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Counter;
+import org.apache.hadoop.mapreduce.Mapper;
+
+public class MapperForExistingDataMod extends Mapper<Text, TableBlurRecord, IndexKey, IndexValue> {
+
+  private Counter _existingRecords;
+
+  @Override
+  protected void setup(Context context) throws IOException, InterruptedException {
+    Counter counter = context.getCounter(BlurIndexCounter.INPUT_FORMAT_MAPPER);
+    counter.increment(1);
+    _existingRecords = context.getCounter(BlurIndexCounter.INPUT_FORMAT_EXISTING_RECORDS);
+  }
+
+  @Override
+  protected void map(Text key, TableBlurRecord value, Context context) throws IOException, InterruptedException {
+    BlurRecord blurRecord = value.getBlurRecord();
+    IndexKey oldDataKey = IndexKey.oldData(blurRecord.getRowId(), blurRecord.getRecordId());
+    context.write(oldDataKey, new IndexValue(blurRecord));
+    _existingRecords.increment(1L);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/ea50630a/blur-indexer/src/main/java/org/apache/blur/mapreduce/lib/update/MapperForExistingDataWithIndexLookup.java
----------------------------------------------------------------------
diff --git a/blur-indexer/src/main/java/org/apache/blur/mapreduce/lib/update/MapperForExistingDataWithIndexLookup.java b/blur-indexer/src/main/java/org/apache/blur/mapreduce/lib/update/MapperForExistingDataWithIndexLookup.java
new file mode 100644
index 0000000..0e2fe66
--- /dev/null
+++ b/blur-indexer/src/main/java/org/apache/blur/mapreduce/lib/update/MapperForExistingDataWithIndexLookup.java
@@ -0,0 +1,228 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.blur.mapreduce.lib.update;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.blur.BlurConfiguration;
+import org.apache.blur.manager.BlurPartitioner;
+import org.apache.blur.manager.writer.SnapshotIndexDeletionPolicy;
+import org.apache.blur.mapreduce.lib.BlurOutputFormat;
+import org.apache.blur.mapreduce.lib.BlurRecord;
+import org.apache.blur.store.BlockCacheDirectoryFactoryV2;
+import org.apache.blur.store.hdfs.HdfsDirectory;
+import org.apache.blur.thrift.generated.Column;
+import org.apache.blur.thrift.generated.FetchRecordResult;
+import org.apache.blur.thrift.generated.Record;
+import org.apache.blur.thrift.generated.TableDescriptor;
+import org.apache.blur.utils.BlurConstants;
+import org.apache.blur.utils.RowDocumentUtil;
+import org.apache.blur.utils.ShardUtil;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.BooleanWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Counter;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.lucene.document.Document;
+import org.apache.lucene.index.AtomicReader;
+import org.apache.lucene.index.AtomicReaderContext;
+import org.apache.lucene.index.DirectoryReader;
+import org.apache.lucene.index.IndexCommit;
+import org.apache.lucene.index.Term;
+import org.apache.lucene.search.Collector;
+import org.apache.lucene.search.IndexSearcher;
+import org.apache.lucene.search.Scorer;
+import org.apache.lucene.search.TermQuery;
+import org.apache.lucene.store.Directory;
+
+import com.google.common.io.Closer;
+
+public class MapperForExistingDataWithIndexLookup extends Mapper<Text, BooleanWritable, IndexKey, IndexValue> {
+
+  private static final Log LOG = LogFactory.getLog(MapperForExistingDataWithIndexLookup.class);
+
+  private static final String BLUR_SNAPSHOT = "blur.snapshot";
+  private Counter _existingRecords;
+  private Counter _rowLookup;
+  private BlurPartitioner _blurPartitioner;
+  private Path _tablePath;
+  private int _numberOfShardsInTable;
+  private Configuration _configuration;
+  private String _snapshot;
+
+  private int _indexShard = -1;
+  private DirectoryReader _reader;
+  private IndexSearcher _indexSearcher;
+  private long _totalNumberOfBytes;
+  private Closer _closer;
+
+  @Override
+  protected void setup(Context context) throws IOException, InterruptedException {
+    Counter counter = context.getCounter(BlurIndexCounter.LOOKUP_MAPPER);
+    counter.increment(1);
+
+    _configuration = context.getConfiguration();
+    _existingRecords = context.getCounter(BlurIndexCounter.LOOKUP_MAPPER_EXISTING_RECORDS);
+    _rowLookup = context.getCounter(BlurIndexCounter.LOOKUP_MAPPER_ROW_LOOKUP_ATTEMPT);
+    _blurPartitioner = new BlurPartitioner();
+    TableDescriptor tableDescriptor = BlurOutputFormat.getTableDescriptor(_configuration);
+    _numberOfShardsInTable = tableDescriptor.getShardCount();
+    _tablePath = new Path(tableDescriptor.getTableUri());
+    _snapshot = getSnapshot(_configuration);
+    _totalNumberOfBytes = _configuration.getLong(LookupBuilderReducer.BLUR_CACHE_DIR_TOTAL_BYTES, 128 * 1024 * 1024);
+    _closer = Closer.create();
+  }
+
+  @Override
+  protected void map(Text key, BooleanWritable value, Context context) throws IOException, InterruptedException {
+    if (value.get()) {
+      String rowId = key.toString();
+      LOG.debug("Looking up rowid [" + rowId + "]");
+      _rowLookup.increment(1);
+      IndexSearcher indexSearcher = getIndexSearcher(rowId);
+      Term term = new Term(BlurConstants.ROW_ID, rowId);
+      RowCollector collector = getCollector(context);
+      indexSearcher.search(new TermQuery(term), collector);
+      LOG.debug("Looking for rowid [" + rowId + "] has [" + collector.records + "] records");
+    }
+  }
+
+  @Override
+  protected void cleanup(Mapper<Text, BooleanWritable, IndexKey, IndexValue>.Context context) throws IOException,
+      InterruptedException {
+    _closer.close();
+  }
+
+  static class RowCollector extends Collector {
+
+    private AtomicReader reader;
+    private Mapper<Text, BooleanWritable, IndexKey, IndexValue>.Context _context;
+    private Counter _existingRecords;
+    int records;
+
+    RowCollector(Mapper<Text, BooleanWritable, IndexKey, IndexValue>.Context context, Counter existingRecords) {
+      _context = context;
+      _existingRecords = existingRecords;
+    }
+
+    @Override
+    public void setScorer(Scorer scorer) throws IOException {
+
+    }
+
+    @Override
+    public void setNextReader(AtomicReaderContext context) throws IOException {
+      reader = context.reader();
+    }
+
+    @Override
+    public void collect(int doc) throws IOException {
+      Document document = reader.document(doc);
+      FetchRecordResult result = RowDocumentUtil.getRecord(document);
+      String rowid = result.getRowid();
+      Record record = result.getRecord();
+      String recordId = record.getRecordId();
+      IndexKey oldDataKey = IndexKey.oldData(rowid, recordId);
+      try {
+        _context.write(oldDataKey, new IndexValue(toBlurRecord(rowid, record)));
+      } catch (InterruptedException e) {
+        throw new IOException(e);
+      }
+      _existingRecords.increment(1L);
+    }
+
+    private BlurRecord toBlurRecord(String rowId, Record record) {
+      BlurRecord blurRecord = new BlurRecord();
+      blurRecord.setRowId(rowId);
+      blurRecord.setRecordId(record.getRecordId());
+      blurRecord.setFamily(record.getFamily());
+      List<Column> columns = record.getColumns();
+      for (Column column : columns) {
+        blurRecord.addColumn(column.getName(), column.getValue());
+      }
+      return blurRecord;
+    }
+
+    @Override
+    public boolean acceptsDocsOutOfOrder() {
+      return false;
+    }
+  }
+
+  private RowCollector getCollector(Mapper<Text, BooleanWritable, IndexKey, IndexValue>.Context context) {
+    return new RowCollector(context, _existingRecords);
+  }
+
+  private IndexSearcher getIndexSearcher(String rowId) throws IOException {
+    int shard = _blurPartitioner.getShard(rowId, _numberOfShardsInTable);
+    if (_indexSearcher != null) {
+      if (shard != _indexShard) {
+        throw new IOException("Input data is not partitioned correctly.");
+      }
+      return _indexSearcher;
+    } else {
+      _indexShard = shard;
+      Path shardPath = new Path(_tablePath, ShardUtil.getShardName(_indexShard));
+      HdfsDirectory hdfsDirectory = new HdfsDirectory(_configuration, shardPath);
+      SnapshotIndexDeletionPolicy policy = new SnapshotIndexDeletionPolicy(_configuration,
+          SnapshotIndexDeletionPolicy.getGenerationsPath(shardPath));
+      Long generation = policy.getGeneration(_snapshot);
+      if (generation == null) {
+        hdfsDirectory.close();
+        throw new IOException("Snapshot [" + _snapshot + "] not found in shard [" + shardPath + "]");
+      }
+
+      BlurConfiguration bc = new BlurConfiguration();
+      BlockCacheDirectoryFactoryV2 blockCacheDirectoryFactoryV2 = new BlockCacheDirectoryFactoryV2(bc,
+          _totalNumberOfBytes);
+      _closer.register(blockCacheDirectoryFactoryV2);
+      Directory dir = blockCacheDirectoryFactoryV2.newDirectory("table", "shard", hdfsDirectory, null);
+
+      List<IndexCommit> listCommits = DirectoryReader.listCommits(dir);
+      IndexCommit indexCommit = findIndexCommit(listCommits, generation, shardPath);
+      _reader = DirectoryReader.open(indexCommit);
+      return _indexSearcher = new IndexSearcher(_reader);
+    }
+  }
+
+  public static IndexCommit findIndexCommit(List<IndexCommit> listCommits, long generation, Path shardDir)
+      throws IOException {
+    for (IndexCommit commit : listCommits) {
+      if (commit.getGeneration() == generation) {
+        return commit;
+      }
+    }
+    throw new IOException("Generation [" + generation + "] not found in shard [" + shardDir + "]");
+  }
+
+  public static void setSnapshot(Job job, String snapshot) {
+    setSnapshot(job.getConfiguration(), snapshot);
+  }
+
+  public static void setSnapshot(Configuration configuration, String snapshot) {
+    configuration.set(BLUR_SNAPSHOT, snapshot);
+  }
+
+  public static String getSnapshot(Configuration configuration) {
+    return configuration.get(BLUR_SNAPSHOT);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/ea50630a/blur-indexer/src/main/java/org/apache/blur/mapreduce/lib/update/MapperForNewDataMod.java
----------------------------------------------------------------------
diff --git a/blur-indexer/src/main/java/org/apache/blur/mapreduce/lib/update/MapperForNewDataMod.java b/blur-indexer/src/main/java/org/apache/blur/mapreduce/lib/update/MapperForNewDataMod.java
new file mode 100644
index 0000000..d91d1f5
--- /dev/null
+++ b/blur-indexer/src/main/java/org/apache/blur/mapreduce/lib/update/MapperForNewDataMod.java
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.blur.mapreduce.lib.update;
+
+import java.io.IOException;
+import java.lang.reflect.Field;
+
+import org.apache.blur.mapreduce.lib.BlurRecord;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Counter;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+
+public class MapperForNewDataMod extends Mapper<Text, BlurRecord, IndexKey, IndexValue> {
+
+  private static final IndexValue EMPTY_RECORD = new IndexValue();
+  private long _timestamp;
+  private Counter _newRecords;
+
+  @Override
+  protected void setup(Context context) throws IOException, InterruptedException {
+    InputSplit inputSplit = context.getInputSplit();
+    FileSplit fileSplit = getFileSplit(inputSplit);
+    Path path = fileSplit.getPath();
+    FileSystem fileSystem = path.getFileSystem(context.getConfiguration());
+    FileStatus fileStatus = fileSystem.getFileStatus(path);
+    _timestamp = fileStatus.getModificationTime();
+    _newRecords = context.getCounter(BlurIndexCounter.NEW_RECORDS);
+  }
+
+  private FileSplit getFileSplit(InputSplit inputSplit) throws IOException {
+    if (inputSplit instanceof FileSplit) {
+      return (FileSplit) inputSplit;
+    }
+    if (inputSplit.getClass().getName().equals("org.apache.hadoop.mapreduce.lib.input.TaggedInputSplit")) {
+      try {
+        Field declaredField = inputSplit.getClass().getDeclaredField("inputSplit");
+        declaredField.setAccessible(true);
+        return getFileSplit((InputSplit) declaredField.get(inputSplit));
+      } catch (NoSuchFieldException e) {
+        throw new IOException(e);
+      } catch (SecurityException e) {
+        throw new IOException(e);
+      } catch (IllegalArgumentException e) {
+        throw new IOException(e);
+      } catch (IllegalAccessException e) {
+        throw new IOException(e);
+      }
+    } else {
+      throw new IOException("Unknown input split type [" + inputSplit + "] [" + inputSplit.getClass() + "]");
+    }
+  }
+
+  @Override
+  protected void map(Text key, BlurRecord blurRecord, Context context) throws IOException, InterruptedException {
+    IndexKey newDataKey = IndexKey.newData(blurRecord.getRowId(), blurRecord.getRecordId(), _timestamp);
+    context.write(newDataKey, new IndexValue(blurRecord));
+    _newRecords.increment(1L);
+
+    IndexKey newDataMarker = IndexKey.newDataMarker(blurRecord.getRowId());
+    context.write(newDataMarker, EMPTY_RECORD);
+  }
+
+}
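Both the index-lookup mapper and the lookup-builder reducer read their settings straight from the job Configuration: the snapshot name under "blur.snapshot" (via the setSnapshot/getSnapshot helpers above) and the block-cache byte budget under "blur.cache.dir.total.bytes" (defaulting to 128 MB). A minimal sketch of setting those two knobs on a job follows; the job name, snapshot value, and cache size are placeholder assumptions, and the actual job wiring (input formats, reducer, output) is handled by the FasterDriver/ClusterDriver classes added elsewhere in this commit.

package org.apache.blur.mapreduce.lib.update;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;

// Sketch only: shows the two Configuration entries the classes above read in setup().
// The snapshot name and cache size here are illustrative, not part of this commit.
public class UpdateJobConfigSketch {
  public static void main(String[] args) throws Exception {
    Job job = Job.getInstance(new Configuration(), "blur-indexer-update");

    // Snapshot of the existing shard indexes that the lookup code opens
    // (stored under the "blur.snapshot" key by setSnapshot).
    MapperForExistingDataWithIndexLookup.setSnapshot(job, "update-snapshot");

    // Byte budget for BlockCacheDirectoryFactoryV2 when a shard index is opened;
    // both readers fall back to 128 MB when this key is unset.
    job.getConfiguration().setLong(LookupBuilderReducer.BLUR_CACHE_DIR_TOTAL_BYTES, 256L * 1024 * 1024);
  }
}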