phoenix-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jamestay...@apache.org
Subject [2/2] phoenix git commit: PHOENIX-2950 Implement CRON job to start async index builds and restart failed jobs (Ravi Kishore Valeti)
Date Mon, 20 Jun 2016 17:43:27 GMT
PHOENIX-2950 Implement CRON job to start async index builds and restart failed jobs (Ravi Kishore Valeti)


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/72bbfc46
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/72bbfc46
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/72bbfc46

Branch: refs/heads/4.x-HBase-1.0
Commit: 72bbfc46657d559562ffc397da0aeafe66b8a0f6
Parents: f3d78e1
Author: James Taylor <jamestaylor@apache.org>
Authored: Mon Jun 20 10:43:40 2016 -0700
Committer: James Taylor <jamestaylor@apache.org>
Committed: Mon Jun 20 10:46:29 2016 -0700

----------------------------------------------------------------------
 .../phoenix/mapreduce/index/IndexTool.java      |   2 +-
 .../index/automation/PhoenixAsyncIndex.java     |  75 +++++
 .../index/automation/PhoenixMRJobCallable.java  |  73 +++++
 .../index/automation/PhoenixMRJobSubmitter.java | 281 +++++++++++++++++++
 .../index/automation/YarnApplication.java       | 208 ++++++++++++++
 .../apache/phoenix/util/PhoenixMRJobUtil.java   | 233 +++++++++++++++
 .../phoenix/util/ZKBasedMasterElectionUtil.java |  70 +++++
 .../index/automated/TestMRJobSubmitter.java     | 137 +++++++++
 8 files changed, 1078 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/72bbfc46/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java
index db8aba9..576dbd3 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java
@@ -97,7 +97,7 @@ public class IndexTool extends Configured implements Tool {
     private static final Option OUTPUT_PATH_OPTION = new Option("op", "output-path", true,
             "Output path where the files are written");
     private static final Option HELP_OPTION = new Option("h", "help", false, "Help");
-    private static final String INDEX_JOB_NAME_TEMPLATE = "PHOENIX_%s_INDX_%s";
+    public static final String INDEX_JOB_NAME_TEMPLATE = "PHOENIX_%s_INDX_%s";
 
     private Options getOptions() {
         final Options options = new Options();

http://git-wip-us.apache.org/repos/asf/phoenix/blob/72bbfc46/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/automation/PhoenixAsyncIndex.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/automation/PhoenixAsyncIndex.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/automation/PhoenixAsyncIndex.java
new file mode 100644
index 0000000..3e88cd0
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/automation/PhoenixAsyncIndex.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.mapreduce.index.automation;
+
+import org.apache.phoenix.mapreduce.index.IndexTool;
+import org.apache.phoenix.schema.PTable.IndexType;
+
+public class PhoenixAsyncIndex {
+    private String tableName;
+    private IndexType indexType;
+    private String tableSchem;
+    private String dataTableName;
+
+    public String getTableName() {
+        return tableName;
+    }
+
+    public void setTableName(String tableName) {
+        this.tableName = tableName;
+    }
+
+    public IndexType getIndexType() {
+        return indexType;
+    }
+
+    public void setIndexType(IndexType indexType) {
+        this.indexType = indexType;
+    }
+
+    public String getTableSchem() {
+        return tableSchem;
+    }
+
+    public void setTableSchem(String tableSchem) {
+        this.tableSchem = tableSchem;
+    }
+
+    public String getDataTableName() {
+        return dataTableName;
+    }
+
+    public void setDataTableName(String dataTableName) {
+        this.dataTableName = dataTableName;
+    }
+
+    public String getJobName() {
+        return String.format(IndexTool.INDEX_JOB_NAME_TEMPLATE, dataTableName, tableName);
+    }
+
+    @Override
+    public String toString() {
+        StringBuilder builder = new StringBuilder();
+        builder.append("TableName = " + tableName)
+                .append(" ; IndexType = " + (indexType == null ? "" : indexType.toString()))
+                .append(" ; TableSchem = " + tableSchem)
+                .append(" ; DataTableName = " + dataTableName);
+        return builder.toString();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/72bbfc46/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/automation/PhoenixMRJobCallable.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/automation/PhoenixMRJobCallable.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/automation/PhoenixMRJobCallable.java
new file mode 100644
index 0000000..720d2c7
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/automation/PhoenixMRJobCallable.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.mapreduce.index.automation;
+
+import java.util.concurrent.Callable;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
+import org.apache.phoenix.mapreduce.index.IndexTool;
+import org.apache.phoenix.schema.PTable.IndexType;
+import org.apache.phoenix.util.MetaDataUtil;
+import org.apache.phoenix.util.SchemaUtil;
+
+public class PhoenixMRJobCallable implements Callable<Boolean> {
+
+    private PhoenixAsyncIndex indexInfo;
+    private String basePath;
+    private Configuration conf;
+
+    public PhoenixMRJobCallable(Configuration conf, final PhoenixAsyncIndex indexInfo,
+            String basePath) {
+        this.conf = conf;
+        this.indexInfo = indexInfo;
+        this.basePath = basePath;
+    }
+
+    @Override
+    public Boolean call() throws Exception {
+        StringBuilder commandLineArgBuilder = new StringBuilder();
+        commandLineArgBuilder.append(" -dt " + indexInfo.getDataTableName());
+        commandLineArgBuilder.append(" -it " + indexInfo.getTableName());
+        commandLineArgBuilder.append(" -direct");
+        commandLineArgBuilder.append(" -op " + (basePath.endsWith("/") ? basePath : basePath + "/")
+                + indexInfo.getTableName());
+
+        if (indexInfo.getTableSchem() != null && indexInfo.getTableSchem().trim().length() > 0) {
+            commandLineArgBuilder.append(" -s " + indexInfo.getTableSchem());
+        }
+        // Setting the configuration here again (in addition to IndexTool.java) to doubly sure
+        // configurations are set
+        final String qDataTable =
+                SchemaUtil.getTableName(indexInfo.getTableSchem(), indexInfo.getDataTableName());
+        final String qIndexTable =
+                SchemaUtil.getTableName(indexInfo.getTableSchem(), indexInfo.getTableName());
+        String physicalIndexTable = qIndexTable;
+
+        if (IndexType.LOCAL.equals(indexInfo.getIndexType())) {
+            physicalIndexTable = MetaDataUtil.getLocalIndexTableName(qDataTable);
+        }
+        conf.set(TableOutputFormat.OUTPUT_TABLE, physicalIndexTable);
+
+        IndexTool tool = new IndexTool();
+        tool.setConf(conf);
+        int result = tool.run(commandLineArgBuilder.toString().split(" "));
+        return result == 0 ? true : false;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/72bbfc46/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/automation/PhoenixMRJobSubmitter.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/automation/PhoenixMRJobSubmitter.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/automation/PhoenixMRJobSubmitter.java
new file mode 100644
index 0000000..9b8d5c8
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/automation/PhoenixMRJobSubmitter.java
@@ -0,0 +1,281 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.mapreduce.index.automation;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import javax.security.auth.login.AppConfigurationEntry;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
+import org.apache.phoenix.mapreduce.index.IndexTool;
+import org.apache.phoenix.schema.PIndexState;
+import org.apache.phoenix.schema.PTable.IndexType;
+import org.apache.phoenix.schema.PTableType;
+import org.apache.phoenix.util.PhoenixMRJobUtil;
+import org.apache.phoenix.util.PhoenixMRJobUtil.MR_SCHEDULER_TYPE;
+import org.apache.phoenix.util.ZKBasedMasterElectionUtil;
+import org.codehaus.jettison.json.JSONArray;
+import org.codehaus.jettison.json.JSONObject;
+
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+import com.google.gson.reflect.TypeToken;
+
+public class PhoenixMRJobSubmitter {
+
+    // Lock to elect a master node that submits the Phoenix Secondary Index MR Jobs
+    private static final String PHOENIX_LOCKS_PARENT =
+            "/phoenix/automated-mr-index-build-leader-election";
+    private static final String AUTO_INDEX_BUILD_LOCK_NAME = "ActiveStandbyElectorLock";
+
+    private static final String CANDIDATE_INDEX_INFO_QUERY = "SELECT "
+            + PhoenixDatabaseMetaData.INDEX_TYPE + ", " + PhoenixDatabaseMetaData.DATA_TABLE_NAME
+            + ", " + PhoenixDatabaseMetaData.TABLE_SCHEM + ", "
+            + PhoenixDatabaseMetaData.TABLE_NAME + " FROM "
+            + PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME + " WHERE "
+            + PhoenixDatabaseMetaData.COLUMN_NAME + " is null and "
+            + PhoenixDatabaseMetaData.TABLE_TYPE + " = '" + PTableType.INDEX.getSerializedValue()
+            + "' and " + PhoenixDatabaseMetaData.INDEX_STATE + " = '"
+            + PIndexState.BUILDING.getSerializedValue() + "'";
+    // TODO - Move this to a property?
+    private static final int JOB_SUBMIT_POOL_TIMEOUT = 5;
+    private Configuration conf;
+    private String zkQuorum;
+    private static final Log LOG = LogFactory.getLog(PhoenixMRJobSubmitter.class);
+
+    public PhoenixMRJobSubmitter() throws IOException {
+        this(null);
+    }
+
+    public PhoenixMRJobSubmitter(Configuration conf) throws IOException {
+        if (conf == null) {
+            conf = HBaseConfiguration.create();
+        }
+        this.conf = conf;
+
+        PhoenixMRJobUtil.updateTimeoutsToFailFast(conf);
+        String schedulerType =
+                conf.get(PhoenixMRJobUtil.PHOENIX_MR_SCHEDULER_TYPE_NAME,
+                    MR_SCHEDULER_TYPE.NONE.toString());
+
+        MR_SCHEDULER_TYPE type = MR_SCHEDULER_TYPE.valueOf(schedulerType);
+
+        switch (type) {
+        case CAPACITY:
+            LOG.info("Applying the Capacity Scheduler Queue Configurations");
+            PhoenixMRJobUtil.updateCapacityQueueInfo(conf);
+            break;
+        case FAIR:
+            LOG.warn("Fair Scheduler type is not yet supported");
+            throw new IOException("Fair Scheduler is not yet supported");
+        case NONE:
+        default:
+            break;
+        }
+        zkQuorum = conf.get(HConstants.ZOOKEEPER_QUORUM);
+        // Use UGI.loginUserFromKeytab to login and work with secure clusters
+        enableKeyTabSecurity();
+    }
+
+    private void enableKeyTabSecurity() throws IOException {
+
+        final String PRINCIPAL = "principal";
+        final String KEYTAB = "keyTab";
+        // Login with the credentials from the keytab to retrieve the TGT . The
+        // renewal of the TGT happens in a Zookeeper thread
+        String principal = null;
+        String keyTabPath = null;
+        AppConfigurationEntry entries[] =
+                javax.security.auth.login.Configuration.getConfiguration()
+                        .getAppConfigurationEntry("Client");
+        LOG.info("Security - Fetched App Login Configuration Entries");
+        if (entries != null) {
+            for (AppConfigurationEntry entry : entries) {
+                if (entry.getOptions().get(PRINCIPAL) != null) {
+                    principal = (String) entry.getOptions().get(PRINCIPAL);
+                }
+                if (entry.getOptions().get(KEYTAB) != null) {
+                    keyTabPath = (String) entry.getOptions().get(KEYTAB);
+                }
+            }
+            LOG.info("Security - Got Principal = " + principal + "");
+            if (principal != null && keyTabPath != null) {
+                LOG.info("Security - Retreiving the TGT with principal:" + principal
+                        + " and keytab:" + keyTabPath);
+                UserGroupInformation.loginUserFromKeytab(principal, keyTabPath);
+                LOG.info("Security - Retrieved TGT with principal:" + principal + " and keytab:"
+                        + keyTabPath);
+            }
+        }
+    }
+
+    public Map<String, PhoenixAsyncIndex> getCandidateJobs() throws SQLException {
+        Connection con = DriverManager.getConnection("jdbc:phoenix:" + zkQuorum);
+        Statement s = con.createStatement();
+        ResultSet rs = s.executeQuery(CANDIDATE_INDEX_INFO_QUERY);
+        Map<String, PhoenixAsyncIndex> candidateIndexes = new HashMap<String, PhoenixAsyncIndex>();
+        while (rs.next()) {
+            PhoenixAsyncIndex indexInfo = new PhoenixAsyncIndex();
+            indexInfo.setIndexType(IndexType.fromSerializedValue(rs
+                    .getByte(PhoenixDatabaseMetaData.INDEX_TYPE)));
+            indexInfo.setDataTableName(rs.getString(PhoenixDatabaseMetaData.DATA_TABLE_NAME));
+            indexInfo.setTableSchem(rs.getString(PhoenixDatabaseMetaData.TABLE_SCHEM));
+            indexInfo.setTableName(rs.getString(PhoenixDatabaseMetaData.TABLE_NAME));
+            candidateIndexes.put(String.format(IndexTool.INDEX_JOB_NAME_TEMPLATE,
+                indexInfo.getDataTableName(), indexInfo.getTableName()), indexInfo);
+        }
+
+        return candidateIndexes;
+    }
+
+    public int scheduleIndexBuilds() throws Exception {
+
+        ZooKeeperWatcher zookeeperWatcher =
+                new ZooKeeperWatcher(conf, "phoenixAutomatedMRIndexBuild", null);
+
+        if (!ZKBasedMasterElectionUtil.acquireLock(zookeeperWatcher, PHOENIX_LOCKS_PARENT,
+            AUTO_INDEX_BUILD_LOCK_NAME)) {
+            LOG.info("Some other node is already running Automated Index Build. Skipping execution!");
+            return -1;
+        }
+        // 1) Query Phoenix SYSTEM.CATALOG table to get a list of all candidate indexes to be built
+        // (in state 'b')
+        // 2) Get a list of all ACCEPTED, SUBMITTED AND RUNNING jobs from Yarn Resource Manager
+        // 3) Get the jobs to submit (list from 1 - list from 2)
+
+        // Get Candidate indexes to be built
+        Map<String, PhoenixAsyncIndex> candidateJobs = getCandidateJobs();
+        LOG.info("Candidate Indexes to be built as seen from SYSTEM.CATALOG - " + candidateJobs);
+
+        // Get already scheduled Jobs list from Yarn Resource Manager
+        Set<String> submittedJobs = getSubmittedYarnApps();
+        LOG.info("Already Submitted/Running MR index build jobs - " + submittedJobs);
+
+        // Get final jobs to submit
+        Set<PhoenixAsyncIndex> jobsToSchedule = getJobsToSubmit(candidateJobs, submittedJobs);
+
+        LOG.info("Final indexes to be built - " + jobsToSchedule);
+        List<Future<Boolean>> results = new ArrayList<Future<Boolean>>(jobsToSchedule.size());
+
+        int failedJobSubmissionCount = 0;
+        int timedoutJobSubmissionCount = 0;
+        ExecutorService jobSubmitPool = Executors.newFixedThreadPool(10);
+        LOG.info("Attempt to submit MR index build jobs for - " + jobsToSchedule);
+
+        try {
+            for (PhoenixAsyncIndex indexToBuild : jobsToSchedule) {
+                PhoenixMRJobCallable task =
+                        new PhoenixMRJobCallable(HBaseConfiguration.create(conf), indexToBuild, "/");
+                results.add(jobSubmitPool.submit(task));
+            }
+            for (Future<Boolean> result : results) {
+                try {
+                    result.get(JOB_SUBMIT_POOL_TIMEOUT, TimeUnit.MINUTES);
+                } catch (InterruptedException e) {
+                    failedJobSubmissionCount++;
+                } catch (ExecutionException e) {
+                    failedJobSubmissionCount++;
+                } catch (TimeoutException e) {
+                    timedoutJobSubmissionCount++;
+                }
+            }
+        } finally {
+            PhoenixMRJobUtil.shutdown(jobSubmitPool);
+        }
+
+        LOG.info("Result of Attempt to Submit MR index build Jobs - Jobs attempted = "
+                + jobsToSchedule.size() + " ; Failed to Submit = " + failedJobSubmissionCount
+                + " ; Timed out = " + timedoutJobSubmissionCount);
+        return failedJobSubmissionCount;
+    }
+
+    public Set<PhoenixAsyncIndex> getJobsToSubmit(Map<String, PhoenixAsyncIndex> candidateJobs,
+            Set<String> submittedJobs) {
+        Set<PhoenixAsyncIndex> toScheduleJobs =
+                new HashSet<PhoenixAsyncIndex>(candidateJobs.values());
+        for (String jobId : submittedJobs) {
+            if (candidateJobs.containsKey(jobId)) {
+                toScheduleJobs.remove(candidateJobs.get(jobId));
+            }
+        }
+        toScheduleJobs.removeAll(submittedJobs);
+        return toScheduleJobs;
+    }
+
+    public Set<String> getSubmittedYarnApps() throws Exception {
+        String rmHost = PhoenixMRJobUtil.getActiveResourceManagerHost(conf, zkQuorum);
+        Map<String, String> urlParams = new HashMap<String, String>();
+        urlParams.put(YarnApplication.APP_STATES_ELEMENT, YarnApplication.state.NEW.toString()
+                + "," + YarnApplication.state.ACCEPTED + "," + YarnApplication.state.SUBMITTED
+                + "," + YarnApplication.state.RUNNING);
+        int rmPort = PhoenixMRJobUtil.getRMPort(conf);
+        String response = PhoenixMRJobUtil.getJobsInformationFromRM(rmHost, rmPort, urlParams);
+        LOG.debug("Already Submitted/Running Apps = " + response);
+        JSONObject jobsJson = new JSONObject(response);
+        JSONObject appsJson = jobsJson.optJSONObject(YarnApplication.APPS_ELEMENT);
+        Set<String> yarnApplicationSet = new HashSet<String>();
+
+        if (appsJson == null) {
+            return yarnApplicationSet;
+        }
+        JSONArray appJson = appsJson.optJSONArray(YarnApplication.APP_ELEMENT);
+        if (appJson == null) {
+            return yarnApplicationSet;
+        }
+        for (int i = 0; i < appJson.length(); i++) {
+
+            Gson gson = new GsonBuilder().create();
+            YarnApplication yarnApplication =
+                    gson.fromJson(appJson.getJSONObject(i).toString(),
+                        new TypeToken<YarnApplication>() {
+                        }.getType());
+            yarnApplicationSet.add(yarnApplication.getName());
+        }
+
+        return yarnApplicationSet;
+    }
+
+    public static void main(String[] args) throws Exception {
+        PhoenixMRJobSubmitter t = new PhoenixMRJobSubmitter();
+        t.scheduleIndexBuilds();
+    }
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/72bbfc46/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/automation/YarnApplication.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/automation/YarnApplication.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/automation/YarnApplication.java
new file mode 100644
index 0000000..926aea3
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/automation/YarnApplication.java
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.mapreduce.index.automation;
+
+public class YarnApplication {
+
+    public static final String APP_ELEMENT = "app";
+    public static final String APPS_ELEMENT = "apps";
+    public static final String APP_STATES_ELEMENT = "states";
+
+    private long finishedTime;
+    private String amContainerLogs;
+    private String trackingUI;
+
+    public enum state {
+        NEW, ACCEPTED, SUBMITTED, RUNNING, FINISHED
+    }
+
+    private String user;
+    private String id;
+    private String clusterId;
+
+    public enum finalStatus {
+        SUCCEEDED, FAILED, KILLED, UNDEFINED
+    }
+
+    private String amHostHttpAddress;
+    private double progress;
+    private String name;
+    private long startedTime;
+    private long elapsedTime;
+    private String diagnostics;
+    private String trackingUrl;
+    private String queue;
+    private int allocatedMB;
+    private int allocatedVCores;
+    private int runningContainers;
+    private int memorySeconds;
+    private int vcoreSeconds;
+
+    public long getFinishedTime() {
+        return finishedTime;
+    }
+
+    public void setFinishedTime(long finishedTime) {
+        this.finishedTime = finishedTime;
+    }
+
+    public String getAmContainerLogs() {
+        return amContainerLogs;
+    }
+
+    public void setAmContainerLogs(String amContainerLogs) {
+        this.amContainerLogs = amContainerLogs;
+    }
+
+    public String getTrackingUI() {
+        return trackingUI;
+    }
+
+    public void setTrackingUI(String trackingUI) {
+        this.trackingUI = trackingUI;
+    }
+
+    public String getUser() {
+        return user;
+    }
+
+    public void setUser(String user) {
+        this.user = user;
+    }
+
+    public String getId() {
+        return id;
+    }
+
+    public void setId(String id) {
+        this.id = id;
+    }
+
+    public String getClusterId() {
+        return clusterId;
+    }
+
+    public void setClusterId(String clusterId) {
+        this.clusterId = clusterId;
+    }
+
+    public String getAmHostHttpAddress() {
+        return amHostHttpAddress;
+    }
+
+    public void setAmHostHttpAddress(String amHostHttpAddress) {
+        this.amHostHttpAddress = amHostHttpAddress;
+    }
+
+    public double getProgress() {
+        return progress;
+    }
+
+    public void setProgress(double progress) {
+        this.progress = progress;
+    }
+
+    public String getName() {
+        return name;
+    }
+
+    public void setName(String name) {
+        this.name = name;
+    }
+
+    public long getStartedTime() {
+        return startedTime;
+    }
+
+    public void setStartedTime(long startedTime) {
+        this.startedTime = startedTime;
+    }
+
+    public long getElapsedTime() {
+        return elapsedTime;
+    }
+
+    public void setElapsedTime(long elapsedTime) {
+        this.elapsedTime = elapsedTime;
+    }
+
+    public String getDiagnostics() {
+        return diagnostics;
+    }
+
+    public void setDiagnostics(String diagnostics) {
+        this.diagnostics = diagnostics;
+    }
+
+    public String getTrackingUrl() {
+        return trackingUrl;
+    }
+
+    public void setTrackingUrl(String trackingUrl) {
+        this.trackingUrl = trackingUrl;
+    }
+
+    public String getQueue() {
+        return queue;
+    }
+
+    public void setQueue(String queue) {
+        this.queue = queue;
+    }
+
+    public int getAllocatedMB() {
+        return allocatedMB;
+    }
+
+    public void setAllocatedMB(int allocatedMB) {
+        this.allocatedMB = allocatedMB;
+    }
+
+    public int getAllocatedVCores() {
+        return allocatedVCores;
+    }
+
+    public void setAllocatedVCores(int allocatedVCores) {
+        this.allocatedVCores = allocatedVCores;
+    }
+
+    public int getRunningContainers() {
+        return runningContainers;
+    }
+
+    public void setRunningContainers(int runningContainers) {
+        this.runningContainers = runningContainers;
+    }
+
+    public int getMemorySeconds() {
+        return memorySeconds;
+    }
+
+    public void setMemorySeconds(int memorySeconds) {
+        this.memorySeconds = memorySeconds;
+    }
+
+    public int getVcoreSeconds() {
+        return vcoreSeconds;
+    }
+
+    public void setVcoreSeconds(int vcoreSeconds) {
+        this.vcoreSeconds = vcoreSeconds;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/72bbfc46/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixMRJobUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixMRJobUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixMRJobUtil.java
new file mode 100644
index 0000000..f12d49d
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixMRJobUtil.java
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.util;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.UnsupportedEncodingException;
+import java.net.HttpURLConnection;
+import java.net.MalformedURLException;
+import java.net.ProtocolException;
+import java.net.URL;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.ZooKeeperConnectionException;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
+import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.ActiveRMInfoProto;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.data.Stat;
+import org.codehaus.jettison.json.JSONException;
+
+import com.google.protobuf.InvalidProtocolBufferException;
+
+public class PhoenixMRJobUtil {
+
+    // ZooKeeper node under which getActiveResourceManagerHost() searches for the active RM.
+    private static final String YARN_LEADER_ELECTION = "/yarn-leader-election";
+    // Name of the child node whose data is parsed as an ActiveRMInfoProto.
+    private static final String ACTIVE_STANDBY_ELECTOR_LOCK = "ActiveStandbyElectorLock";
+    // Path appended to "<scheme>://<host>:<port>" by getJobsInformationFromRM().
+    private static final String RM_APPS_GET_ENDPOINT = "/ws/v1/cluster/apps";
+
+    // Reduced HBase Client Retries
+    // These three values are written into a Configuration by updateTimeoutsToFailFast().
+    private static final int CLIENT_RETRIES_NUMBER = 2;
+    private static final long CLIENT_PAUSE_TIME = 1000;
+    private static final int ZOOKEEPER_RECOVERY_RETRY_COUNT = 1;
+
+    // Property names read by updateCapacityQueueInfo() to pick the queue and the map memory.
+    public static final String PHOENIX_INDEX_MR_QUEUE_NAME_PROPERTY =
+            "phoenix.index.mr.scheduler.capacity.queuename";
+    public static final String PHOENIX_INDEX_MR_MAP_MEMORY_PROPERTY =
+            "phoenix.index.mr.scheduler.capacity.mapMemoryMB";
+
+    // Default MR Capacity Scheduler Configurations for Phoenix MR Index Build
+    // Jobs
+    public static final String DEFAULT_QUEUE_NAME = "default";
+    // NOTE(review): "MEMROY" looks like a typo for "MEMORY"; the constant is public, so
+    // renaming it would affect any external user of the name.
+    public static final int DEFAULT_MAP_MEMROY_MB = 5120;
+    // Prefix used by updateCapacityQueueInfo() when composing the map task's Java options.
+    public static final String XMX_OPT = "-Xmx";
+
+    // Scheme used when building the ResourceManager URL.
+    public static final String RM_HTTP_SCHEME = "http";
+    // TODO - Move these as properties?
+    // Connect timeout: 10 seconds. Read timeout: 10 minutes.
+    public static final int RM_CONNECT_TIMEOUT_MILLIS = 10 * 1000;
+    public static final int RM_READ_TIMEOUT_MILLIS = 10 * 60 * 1000;
+
+    private static final Log LOG = LogFactory.getLog(PhoenixMRJobUtil.class);
+
+    // Property name for the scheduler type; it is not read anywhere in this class.
+    public static final String PHOENIX_MR_SCHEDULER_TYPE_NAME = "phoenix.index.mr.scheduler.type";
+
+    // Presumably the values accepted for PHOENIX_MR_SCHEDULER_TYPE_NAME - the consumer is
+    // outside this class, so confirm there.
+    public enum MR_SCHEDULER_TYPE {
+        CAPACITY, FAIR, NONE
+    };
+
+    /**
+     * Looks up the hostname of the active YARN ResourceManager through ZooKeeper.
+     * <p>
+     * Every child of {@code YARN_LEADER_ELECTION} is scanned. When a child holds an entry whose
+     * name contains {@code ACTIVE_STANDBY_ELECTOR_LOCK}, the lock node's data is parsed as an
+     * {@link ActiveRMInfoProto} and its RM id is resolved to a hostname by reading
+     * {@code YarnConfiguration.RM_HOSTNAME + "." + rmId} from {@code config}. If several lock
+     * nodes are found, the last one scanned wins.
+     *
+     * @param config configuration used to create the watcher and to resolve the RM hostname
+     * @param zkQuorum ZooKeeper connect string
+     * @return the configured hostname of the active RM, or {@code null} when no lock node is
+     *         found or the hostname property is not set
+     * @throws IOException if the ZooKeeper connection cannot be created
+     * @throws InterruptedException if interrupted while talking to or closing ZooKeeper
+     * @throws KeeperException if a ZooKeeper read fails
+     * @throws InvalidProtocolBufferException if the lock node data cannot be parsed
+     */
+    public static String getActiveResourceManagerHost(Configuration config, String zkQuorum)
+            throws IOException, InterruptedException, JSONException, KeeperException,
+            InvalidProtocolBufferException, ZooKeeperConnectionException {
+        ZooKeeperWatcher zkw = null;
+        ZooKeeper zk = null;
+        String activeRMHost = null;
+        try {
+            zkw = new ZooKeeperWatcher(config, "get-active-yarnmanager", null);
+            zk = new ZooKeeper(zkQuorum, 30000, zkw, false);
+
+            List<String> children = zk.getChildren(YARN_LEADER_ELECTION, zkw);
+            for (String subEntry : children) {
+                String electionNode = YARN_LEADER_ELECTION + "/" + subEntry;
+                for (String eachEntry : zk.getChildren(electionNode, zkw)) {
+                    if (!eachEntry.contains(ACTIVE_STANDBY_ELECTOR_LOCK)) {
+                        continue;
+                    }
+                    String path = electionNode + "/" + ACTIVE_STANDBY_ELECTOR_LOCK;
+                    byte[] data = zk.getData(path, zkw, new Stat());
+                    // Read the id once instead of calling getRmId() repeatedly.
+                    String rmId = ActiveRMInfoProto.parseFrom(data).getRmId();
+                    LOG.info("Active RmId : " + rmId);
+
+                    activeRMHost = config.get(YarnConfiguration.RM_HOSTNAME + "." + rmId);
+                    LOG.info("activeResourceManagerHostname = " + activeRMHost);
+                }
+            }
+        } finally {
+            // Nested so that the ZooKeeper handle is released even if closing the watcher fails.
+            try {
+                if (zkw != null) zkw.close();
+            } finally {
+                if (zk != null) zk.close();
+            }
+        }
+
+        return activeRMHost;
+    }
+
+    /**
+     * Issues an HTTP GET against the ResourceManager applications endpoint and returns the
+     * response body as text.
+     * <p>
+     * The URL is {@code RM_HTTP_SCHEME://rmhost:rmport + RM_APPS_GET_ENDPOINT}, followed by
+     * {@code ?k1=v1&k2=v2...} when {@code urlParams} is non-empty.
+     * NOTE(review): keys and values are appended verbatim (no URL encoding), so callers must
+     * pass values that are already safe to place in a query string.
+     *
+     * @param rmhost ResourceManager host name
+     * @param rmport ResourceManager web port
+     * @param urlParams query parameters to append; may be {@code null} or empty
+     * @return the response body as produced by {@link #getTextContent(InputStream)}
+     * @throws MalformedURLException if the assembled URL is not valid
+     * @throws IOException if the connection or the read fails
+     */
+    public static String getJobsInformationFromRM(String rmhost, int rmport,
+            Map<String, String> urlParams) throws MalformedURLException, ProtocolException,
+            UnsupportedEncodingException, IOException {
+        StringBuilder urlBuilder = new StringBuilder();
+        urlBuilder.append(RM_HTTP_SCHEME).append("://").append(rmhost).append(":")
+                .append(rmport).append(RM_APPS_GET_ENDPOINT);
+
+        if (urlParams != null && !urlParams.isEmpty()) {
+            // The first parameter is introduced by '?', every following one by '&'.
+            char separator = '?';
+            for (Map.Entry<String, String> param : urlParams.entrySet()) {
+                urlBuilder.append(separator).append(param.getKey()).append('=')
+                        .append(param.getValue());
+                separator = '&';
+            }
+        }
+
+        String url = urlBuilder.toString();
+        LOG.info("Attempt to get running/submitted jobs information from RM URL = " + url);
+
+        HttpURLConnection con = null;
+        String response;
+        int responseCode;
+        try {
+            con = (HttpURLConnection) new URL(url).openConnection();
+            con.setInstanceFollowRedirects(true);
+            con.setRequestMethod("GET");
+
+            con.setConnectTimeout(RM_CONNECT_TIMEOUT_MILLIS);
+            con.setReadTimeout(RM_READ_TIMEOUT_MILLIS);
+
+            response = getTextContent(con.getInputStream());
+            // Capture the status while the connection is still open, not after disconnect().
+            responseCode = con.getResponseCode();
+        } finally {
+            if (con != null) con.disconnect();
+        }
+
+        LOG.info("Result of attempt to get running/submitted jobs from RM - URL=" + url
+                + ",ResponseCode=" + responseCode + ",Response=" + response);
+
+        return response;
+    }
+
+    /**
+     * Reads the whole stream as UTF-8 text and closes it.
+     * <p>
+     * The content is read line by line; each line is re-emitted followed by a single
+     * {@code '\n'}, so original line terminators are normalised and a non-empty result always
+     * ends with a newline. The stream is closed whether or not reading succeeds.
+     *
+     * @param is the stream to drain; closed by this method
+     * @return the text content, or an empty string for an empty stream
+     * @throws IOException if reading or closing the stream fails
+     */
+    public static String getTextContent(InputStream is) throws IOException {
+        BufferedReader in = null;
+        StringBuilder response = null;
+        try {
+            // Decode explicitly as UTF-8 instead of relying on the JVM's default charset.
+            in = new BufferedReader(new InputStreamReader(is, "UTF-8"));
+            String inputLine;
+            response = new StringBuilder();
+            while ((inputLine = in.readLine()) != null) {
+                response.append(inputLine).append("\n");
+            }
+        } finally {
+            if (in != null) in.close();
+            if (is != null) {
+                is.close();
+            }
+        }
+        return response.toString();
+    }
+
+    public static void shutdown(ExecutorService pool) throws InterruptedException {
+        pool.shutdown();
+        LOG.debug("Shutdown called");
+        pool.awaitTermination(200, TimeUnit.MILLISECONDS);
+        LOG.debug("Await termination called to wait for 200 msec");
+        if (!pool.isShutdown()) {
+            pool.shutdownNow();
+            LOG.debug("Await termination called to wait for 200 msec");
+            pool.awaitTermination(100, TimeUnit.MILLISECONDS);
+        }
+        if (!pool.isShutdown()) {
+            LOG.warn("Pool did not shutdown");
+        }
+    }
+
+    /**
+     * Extracts the port from the {@code YarnConfiguration.RM_WEBAPP_ADDRESS} property, which
+     * is expected to have the form {@code host:port}.
+     *
+     * @param conf configuration holding the ResourceManager web address
+     * @return the port part of the address
+     * @throws IOException if the property is missing, is not of the form {@code host:port},
+     *         or the port is not a valid integer
+     */
+    public static int getRMPort(Configuration conf) throws IOException {
+        String rmHostPortStr = conf.get(YarnConfiguration.RM_WEBAPP_ADDRESS);
+        // An unset property yields null; report it like any other malformed value.
+        String[] rmHostPort = rmHostPortStr == null ? null : rmHostPortStr.split(":");
+        if (rmHostPort == null || rmHostPort.length != 2) {
+            throw new IOException("Invalid value for property "
+                    + YarnConfiguration.RM_WEBAPP_ADDRESS + " = " + rmHostPortStr);
+        }
+        try {
+            return Integer.parseInt(rmHostPort[1].trim());
+        } catch (NumberFormatException e) {
+            // Keep the declared contract: malformed configuration surfaces as IOException.
+            throw new IOException("Invalid port in property "
+                    + YarnConfiguration.RM_WEBAPP_ADDRESS + " = " + rmHostPortStr, e);
+        }
+    }
+
+    /**
+     * Overwrites three client settings on {@code conf} with this class's reduced values:
+     * {@code hbase.client.retries.number}, {@code zookeeper.recovery.retry} and
+     * {@code hbase.client.pause}.
+     *
+     * @param conf the configuration to modify in place
+     */
+    public static void updateTimeoutsToFailFast(Configuration conf) {
+        conf.setInt("hbase.client.retries.number", CLIENT_RETRIES_NUMBER);
+        conf.setInt("zookeeper.recovery.retry", ZOOKEEPER_RECOVERY_RETRY_COUNT);
+        conf.setLong("hbase.client.pause", CLIENT_PAUSE_TIME);
+    }
+
+    /**
+     * This method set the configuration values for Capacity scheduler.
+     * @param conf - Configuration to which Capacity Queue information to be added
+     */
+    public static void updateCapacityQueueInfo(Configuration conf) {
+        conf.set(MRJobConfig.QUEUE_NAME,
+            conf.get(PHOENIX_INDEX_MR_QUEUE_NAME_PROPERTY, DEFAULT_QUEUE_NAME));
+        int mapMemoryMB = conf.getInt(PHOENIX_INDEX_MR_MAP_MEMORY_PROPERTY, DEFAULT_MAP_MEMROY_MB);
+
+        conf.setInt(MRJobConfig.MAP_MEMORY_MB, mapMemoryMB);
+        conf.set(MRJobConfig.MAP_JAVA_OPTS, XMX_OPT + ((int) (mapMemoryMB * 0.9)) + "m");
+
+        LOG.info("Queue Name=" + conf.get(MRJobConfig.QUEUE_NAME) + ";" + "Map Meory MB="
+                + conf.get(MRJobConfig.MAP_MEMORY_MB) + ";" + "Map Java Opts="
+                + conf.get(MRJobConfig.MAP_JAVA_OPTS));
+    }
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/72bbfc46/phoenix-core/src/main/java/org/apache/phoenix/util/ZKBasedMasterElectionUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/ZKBasedMasterElectionUtil.java
b/phoenix-core/src/main/java/org/apache/phoenix/util/ZKBasedMasterElectionUtil.java
new file mode 100644
index 0000000..69ef0b5
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/ZKBasedMasterElectionUtil.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.util;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.UUID;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.zookeeper.ZKUtil;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.ZooDefs.Ids;
+
+public class ZKBasedMasterElectionUtil {
+
+    private static final Log LOG = LogFactory.getLog(ZKBasedMasterElectionUtil.class);
+
+    public static boolean acquireLock(ZooKeeperWatcher zooKeeperWatcher, String parentNode,
+            String lockName) throws KeeperException, InterruptedException {
+        // Create the parent node as Persistent
+        LOG.info("Creating the parent lock node:" + parentNode);
+        ZKUtil.createWithParents(zooKeeperWatcher, parentNode);
+
+        // Create the ephemeral node
+        String lockNode = parentNode + "/" + lockName;
+        String nodeValue = getHostName() + "_" + UUID.randomUUID().toString();
+        LOG.info("Trying to acquire the lock by creating node:" + lockNode + " value:" +
nodeValue);
+        // Create the ephemeral node
+        try {
+            zooKeeperWatcher.getRecoverableZooKeeper().create(lockNode, Bytes.toBytes(nodeValue),
+                Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
+        } catch (KeeperException.NodeExistsException e) {
+            LOG.info("Could not acquire lock. Another process had already acquired the lock
on Node "
+                    + lockName);
+            return false;
+        }
+        LOG.info("Obtained the lock :" + lockNode);
+        return true;
+    }
+
+    private static String getHostName() {
+        String host = "";
+        try {
+            host = InetAddress.getLocalHost().getCanonicalHostName();
+        } catch (UnknownHostException e) {
+            LOG.error("UnknownHostException while trying to get the Local Host address :
", e);
+        }
+        return host;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/72bbfc46/phoenix-core/src/test/java/org/apache/phoenix/index/automated/TestMRJobSubmitter.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/index/automated/TestMRJobSubmitter.java
b/phoenix-core/src/test/java/org/apache/phoenix/index/automated/TestMRJobSubmitter.java
new file mode 100644
index 0000000..649f22a
--- /dev/null
+++ b/phoenix-core/src/test/java/org/apache/phoenix/index/automated/TestMRJobSubmitter.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.index.automated;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.phoenix.mapreduce.index.IndexTool;
+import org.apache.phoenix.mapreduce.index.automation.PhoenixAsyncIndex;
+import org.apache.phoenix.mapreduce.index.automation.PhoenixMRJobSubmitter;
+import org.apache.phoenix.schema.PTable.IndexType;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestMRJobSubmitter {
+
+    private Map<String, PhoenixAsyncIndex> candidateJobs =
+            new LinkedHashMap<String, PhoenixAsyncIndex>();
+    private Set<String> submittedJobs = new HashSet<String>();
+
+    @Before
+    public void prepare() {
+        PhoenixAsyncIndex index1 = new PhoenixAsyncIndex();
+        index1.setDataTableName("DT1");
+        index1.setTableName("IT1");
+        index1.setIndexType(IndexType.LOCAL);
+
+        candidateJobs.put(String.format(IndexTool.INDEX_JOB_NAME_TEMPLATE,
+            index1.getDataTableName(), index1.getTableName()), index1);
+
+        PhoenixAsyncIndex index2 = new PhoenixAsyncIndex();
+        index2.setDataTableName("DT2");
+        index2.setTableName("IT2");
+        index2.setIndexType(IndexType.LOCAL);
+
+        candidateJobs.put(String.format(IndexTool.INDEX_JOB_NAME_TEMPLATE,
+            index2.getDataTableName(), index2.getTableName()), index2);
+    }
+
+    @Test
+    public void testLocalIndexJobsSubmission() throws IOException {
+
+        // Set the index type to LOCAL
+        for (String jobId : candidateJobs.keySet()) {
+            candidateJobs.get(jobId).setIndexType(IndexType.LOCAL);
+        }
+        PhoenixMRJobSubmitter submitter = new PhoenixMRJobSubmitter();
+        Set<PhoenixAsyncIndex> jobsToSubmit =
+                submitter.getJobsToSubmit(candidateJobs, submittedJobs);
+        assertEquals(2, jobsToSubmit.size());
+    }
+
+    @Test
+    public void testGlobalIndexJobsForSubmission() throws IOException {
+
+        // Set the index type to GLOBAL
+        for (String jobId : candidateJobs.keySet()) {
+            candidateJobs.get(jobId).setIndexType(IndexType.GLOBAL);
+        }
+        PhoenixMRJobSubmitter submitter = new PhoenixMRJobSubmitter();
+        Set<PhoenixAsyncIndex> jobsToSubmit =
+                submitter.getJobsToSubmit(candidateJobs, submittedJobs);
+        assertEquals(2, jobsToSubmit.size());
+        assertEquals(true, jobsToSubmit.containsAll(candidateJobs.values()));
+    }
+
+    @Test
+    public void testSkipSubmittedJob() throws IOException {
+        PhoenixAsyncIndex[] jobs = new PhoenixAsyncIndex[candidateJobs.size()];
+        candidateJobs.values().toArray(jobs);
+        
+        // Mark one job as running
+        submittedJobs.add(String.format(IndexTool.INDEX_JOB_NAME_TEMPLATE,
+            jobs[0].getDataTableName(), jobs[0].getTableName()));
+
+        PhoenixMRJobSubmitter submitter = new PhoenixMRJobSubmitter();
+        Set<PhoenixAsyncIndex> jobsToSubmit =
+                submitter.getJobsToSubmit(candidateJobs, submittedJobs);
+        
+        // Should not contain the running job
+        assertEquals(1, jobsToSubmit.size());
+        assertEquals(false, jobsToSubmit.containsAll(candidateJobs.values()));
+        assertEquals(true, jobsToSubmit.contains(jobs[1]));
+    }
+
+    @Test
+    public void testSkipAllSubmittedJobs() throws IOException {
+        PhoenixAsyncIndex[] jobs = new PhoenixAsyncIndex[candidateJobs.size()];
+        candidateJobs.values().toArray(jobs);
+        
+        // Mark all the candidate jobs as running/in-progress
+        submittedJobs.add(String.format(IndexTool.INDEX_JOB_NAME_TEMPLATE,
+            jobs[0].getDataTableName(), jobs[0].getTableName()));
+        submittedJobs.add(String.format(IndexTool.INDEX_JOB_NAME_TEMPLATE,
+            jobs[1].getDataTableName(), jobs[1].getTableName()));
+
+        PhoenixMRJobSubmitter submitter = new PhoenixMRJobSubmitter();
+        Set<PhoenixAsyncIndex> jobsToSubmit =
+                submitter.getJobsToSubmit(candidateJobs, submittedJobs);
+        assertEquals(0, jobsToSubmit.size());
+    }
+    
+    @Test
+    public void testNoJobsToSubmit() throws IOException {
+        // Clear candidate jobs
+        candidateJobs.clear();
+        // Add some dummy running jobs to the submitted list
+        submittedJobs.add(String.format(IndexTool.INDEX_JOB_NAME_TEMPLATE,
+            "d1", "i1"));
+        submittedJobs.add(String.format(IndexTool.INDEX_JOB_NAME_TEMPLATE,
+            "d2", "i2"));
+        PhoenixMRJobSubmitter submitter = new PhoenixMRJobSubmitter();
+        Set<PhoenixAsyncIndex> jobsToSubmit =
+                submitter.getJobsToSubmit(candidateJobs, submittedJobs);
+        assertEquals(0, jobsToSubmit.size());
+    }
+}
\ No newline at end of file


Mime
View raw message