From: ajayyadava@apache.org
To: commits@falcon.apache.org
Reply-To: dev@falcon.apache.org
Date: Tue, 11 Aug 2015 08:20:03 -0000
Subject: [3/5] falcon git commit: FALCON-1188 Falcon support for Hive Replication. Contributed by Venkat Ranganathan.

http://git-wip-us.apache.org/repos/asf/falcon/blob/cbb38ce1/addons/hivedr/src/main/resources/log4j.xml
----------------------------------------------------------------------
diff --git a/addons/hivedr/src/main/resources/log4j.xml b/addons/hivedr/src/main/resources/log4j.xml new file mode 100644 index 0000000..f83a9a9 --- /dev/null +++ b/addons/hivedr/src/main/resources/log4j.xml @@ -0,0 +1,54 @@
[54 added lines: log4j XML configuration; the markup did not survive the archive's HTML-to-text conversion.]

http://git-wip-us.apache.org/repos/asf/falcon/blob/cbb38ce1/addons/hivedr/src/test/java/org/apache/falcon/hive/DBReplicationStatusTest.java
----------------------------------------------------------------------
diff --git a/addons/hivedr/src/test/java/org/apache/falcon/hive/DBReplicationStatusTest.java b/addons/hivedr/src/test/java/org/apache/falcon/hive/DBReplicationStatusTest.java new file mode 100644 index 0000000..bfeca8d --- /dev/null +++ b/addons/hivedr/src/test/java/org/apache/falcon/hive/DBReplicationStatusTest.java @@ -0,0 +1,230 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +package org.apache.falcon.hive; + +import org.apache.falcon.hive.exception.HiveReplicationException; +import org.apache.falcon.hive.util.DBReplicationStatus; +import org.apache.falcon.hive.util.ReplicationStatus; +import org.testng.Assert; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +import java.util.HashMap; +import java.util.Map; + +/** + * Unit tests for DBReplicationStatus. + */ +@Test +public class DBReplicationStatusTest { + + private Map tableStatuses = new HashMap(); + private ReplicationStatus dbReplicationStatus; + private ReplicationStatus tableStatus1; + + public DBReplicationStatusTest() { + } + + @BeforeClass + public void prepare() throws Exception { + dbReplicationStatus = new ReplicationStatus("source", "target", "jobname", + "Default1", null, ReplicationStatus.Status.FAILURE, 20L); + tableStatus1 = new ReplicationStatus("source", "target", "jobname", + "default1", "Table1", ReplicationStatus.Status.SUCCESS, 20L); + tableStatuses.put("Table1", tableStatus1); + + } + + public void dBReplicationStatusSerializeTest() throws Exception { + DBReplicationStatus replicationStatus = new DBReplicationStatus(dbReplicationStatus, tableStatuses); + + String expected = "{\n" + " \"db_status\": {\n" + + " \"sourceUri\": \"source\",\n" + " \"targetUri\": \"target\",\n" + + " \"jobName\": \"jobname\",\n" + " \"database\": \"default1\",\n" + + " \"status\": \"FAILURE\",\n" + " \"eventId\": 20\n" + " },\n" + + " \"table_status\": {\"table1\": {\n" + " \"sourceUri\": \"source\",\n" + + " \"targetUri\": \"target\",\n" + " \"jobName\": \"jobname\",\n" + + " \"database\": \"default1\",\n" + " \"table\": \"table1\",\n" + + " \"status\": \"SUCCESS\",\n" + " \"eventId\": 20\n" + " }}\n" + "}"; + String actual = replicationStatus.toJsonString(); + Assert.assertEquals(actual, expected); + } + + public void dBReplicationStatusDeserializeTest() throws Exception { + + String jsonString = "{\"db_status\":{\"sourceUri\":\"source\"," + + "\"targetUri\":\"target\",\"jobName\":\"jobname\",\"database\":\"default1\",\"status\":\"SUCCESS\"," + + "\"eventId\":20},\"table_status\":{\"Table1\":{\"sourceUri\":\"source\",\"targetUri\":\"target\"," + + "\"jobName\":\"jobname\",\"database\":\"default1\",\"table\":\"Table1\",\"status\":\"SUCCESS\"," + + "\"eventId\":20},\"table3\":{\"sourceUri\":\"source\",\"targetUri\":\"target\"," + + "\"jobName\":\"jobname\", \"database\":\"Default1\",\"table\":\"table3\",\"status\":\"FAILURE\"," + + "\"eventId\":10}, \"table2\":{\"sourceUri\":\"source\",\"targetUri\":\"target\"," + + "\"jobName\":\"jobname\", \"database\":\"default1\",\"table\":\"table2\",\"status\":\"INIT\"}}}"; + + DBReplicationStatus dbStatus = new DBReplicationStatus(jsonString); + Assert.assertEquals(dbStatus.getDatabaseStatus().getDatabase(), "default1"); + Assert.assertEquals(dbStatus.getDatabaseStatus().getJobName(), "jobname"); + Assert.assertEquals(dbStatus.getDatabaseStatus().getEventId(), 20); + + Assert.assertEquals(dbStatus.getTableStatuses().get("table1").getEventId(), 20); + Assert.assertEquals(dbStatus.getTableStatuses().get("table1").getStatus(), ReplicationStatus.Status.SUCCESS); + Assert.assertEquals(dbStatus.getTableStatuses().get("table2").getEventId(), -1); + Assert.assertEquals(dbStatus.getTableStatuses().get("table2").getStatus(), ReplicationStatus.Status.INIT); + Assert.assertEquals(dbStatus.getTableStatuses().get("table3").getEventId(), 10); + Assert.assertEquals(dbStatus.getTableStatuses().get("table3").getStatus(), 
ReplicationStatus.Status.FAILURE); + + + } + + public void wrongDBForTableTest() throws Exception { + + ReplicationStatus newDbStatus = new ReplicationStatus("source", "target", "jobname", + "wrongDb", null, ReplicationStatus.Status.FAILURE, 20L); + new DBReplicationStatus(newDbStatus); + + try { + new DBReplicationStatus(newDbStatus, tableStatuses); + Assert.fail(); + } catch (HiveReplicationException e) { + Assert.assertEquals(e.getMessage(), + "Cannot set status for table default1.table1, It does not belong to DB wrongdb"); + } + + String jsonString = "{\n" + " \"db_status\": {\n" + + " \"sourceUri\": \"source\",\n" + " \"targetUri\": \"target\",\n" + + " \"jobName\": \"jobname\",\n" + " \"database\": \"wrongdb\",\n" + + " \"status\": \"FAILURE\",\n" + " \"eventId\": 20\n" + " },\n" + + " \"table_status\": {\"table1\": {\n" + " \"sourceUri\": \"source\",\n" + + " \"targetUri\": \"target\",\n" + " \"jobName\": \"jobname\",\n" + + " \"database\": \"default1\",\n" + " \"table\": \"table1\",\n" + + " \"status\": \"SUCCESS\",\n" + " \"eventId\": 20\n" + " }}\n" + "}"; + + try { + new DBReplicationStatus(jsonString); + Assert.fail(); + } catch (HiveReplicationException e) { + Assert.assertEquals(e.getMessage(), + "Unable to create DBReplicationStatus from JsonString. Cannot set status for " + + "table default1.table1, It does not belong to DB wrongdb"); + } + } + + public void updateTableStatusTest() throws Exception { + DBReplicationStatus replicationStatus = new DBReplicationStatus(dbReplicationStatus, tableStatuses); + replicationStatus.updateTableStatus(tableStatus1); + + // wrong DB test + try { + replicationStatus.updateTableStatus(new ReplicationStatus("source", "target", "jobname", + "wrongDB", "table2", ReplicationStatus.Status.INIT, -1L)); + Assert.fail(); + } catch (HiveReplicationException e) { + Assert.assertEquals(e.getMessage(), + "Cannot update Table Status. TableDB wrongdb does not match current DB default1"); + } + + // wrong status test + try { + replicationStatus.updateTableStatus(dbReplicationStatus); + Assert.fail(); + } catch (HiveReplicationException e) { + Assert.assertEquals(e.getMessage(), + "Cannot update Table Status. Table name is empty."); + } + + } + + public void updateDBStatusTest() throws Exception { + DBReplicationStatus replicationStatus = new DBReplicationStatus(dbReplicationStatus, tableStatuses); + replicationStatus.updateDbStatus(dbReplicationStatus); + + // wrong DB test + try { + replicationStatus.updateDbStatus(new ReplicationStatus("source", "target", "jobname", + "wrongDB", null, ReplicationStatus.Status.INIT, -1L)); + Assert.fail(); + } catch (HiveReplicationException e) { + Assert.assertEquals(e.getMessage(), + "Cannot update Database Status. StatusDB wrongdb does not match current DB default1"); + } + + // wrong status test + try { + replicationStatus.updateDbStatus(tableStatus1); + Assert.fail(); + } catch (HiveReplicationException e) { + Assert.assertEquals(e.getMessage(), + "Cannot update DB Status. 
This is table level status."); + } + } + + public void updateDbStatusFromTableStatusesTest() throws Exception { + + ReplicationStatus dbStatus = new ReplicationStatus("source", "target", "jobname", + "default1", null, ReplicationStatus.Status.SUCCESS, 20L); + ReplicationStatus table1 = new ReplicationStatus("source", "target", "jobname", + "default1", "table1", ReplicationStatus.Status.SUCCESS, 20L); + ReplicationStatus table2 = new ReplicationStatus("source", "target", "jobname", + "Default1", "table2", ReplicationStatus.Status.INIT, -1L); + ReplicationStatus table3 = new ReplicationStatus("source", "target", "jobname", + "default1", "Table3", ReplicationStatus.Status.FAILURE, 15L); + ReplicationStatus table4 = new ReplicationStatus("source", "target", "jobname", + "Default1", "Table4", ReplicationStatus.Status.FAILURE, 18L); + Map tables = new HashMap(); + + tables.put("table1", table1); + tables.put("table2", table2); + tables.put("table3", table3); + tables.put("table4", table4); + + // If there is a failue, last eventId should be lowest eventId of failed tables + DBReplicationStatus status = new DBReplicationStatus(dbStatus, tables); + Assert.assertEquals(status.getDatabaseStatus().getEventId(), 20); + Assert.assertEquals(status.getDatabaseStatus().getStatus(), ReplicationStatus.Status.SUCCESS); + status.updateDbStatusFromTableStatuses(); + Assert.assertEquals(status.getDatabaseStatus().getEventId(), 15); + Assert.assertEquals(status.getDatabaseStatus().getStatus(), ReplicationStatus.Status.FAILURE); + + // If all tables succeed, last eventId should be highest eventId of success tables + table3 = new ReplicationStatus("source", "target", "jobname", + "default1", "table3", ReplicationStatus.Status.SUCCESS, 25L); + table4 = new ReplicationStatus("source", "target", "jobname", + "default1", "table4", ReplicationStatus.Status.SUCCESS, 22L); + tables.put("Table3", table3); + tables.put("Table4", table4); + status = new DBReplicationStatus(dbStatus, tables); + status.updateDbStatusFromTableStatuses(); + Assert.assertEquals(status.getDatabaseStatus().getEventId(), 25); + Assert.assertEquals(status.getDatabaseStatus().getStatus(), ReplicationStatus.Status.SUCCESS); + + // Init tables should not change DB status. + Map initOnlyTables = new HashMap(); + initOnlyTables.put("table2", table2); + dbStatus = new ReplicationStatus("source", "target", "jobname", + "default1", null, ReplicationStatus.Status.SUCCESS, 20L); + status = new DBReplicationStatus(dbStatus, initOnlyTables); + Assert.assertEquals(status.getDatabaseStatus().getEventId(), 20); + Assert.assertEquals(status.getDatabaseStatus().getStatus(), ReplicationStatus.Status.SUCCESS); + status.updateDbStatusFromTableStatuses(); + Assert.assertEquals(status.getDatabaseStatus().getEventId(), 20); + Assert.assertEquals(status.getDatabaseStatus().getStatus(), ReplicationStatus.Status.SUCCESS); + + + } + +} http://git-wip-us.apache.org/repos/asf/falcon/blob/cbb38ce1/addons/hivedr/src/test/java/org/apache/falcon/hive/DRTest.java ---------------------------------------------------------------------- diff --git a/addons/hivedr/src/test/java/org/apache/falcon/hive/DRTest.java b/addons/hivedr/src/test/java/org/apache/falcon/hive/DRTest.java new file mode 100644 index 0000000..1f44b62 --- /dev/null +++ b/addons/hivedr/src/test/java/org/apache/falcon/hive/DRTest.java @@ -0,0 +1,45 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.falcon.hive; + +/** + * Test class for DR. + */ +public class DRTest { + public void testHiveDr(String[] args) { + String[] testArgs = { + "-sourceMetastoreUri", "thrift://localhost:9083", + "-sourceDatabase", "default", + "-sourceTable", "test", + "-sourceStagingPath", "/apps/hive/tools/dr", + "-sourceNN", "hdfs://localhost:8020", + "-sourceRM", "local", + + "-targetMetastoreUri", "thrift://localhost:9083", + "-targetStagingPath", "/apps/hive/tools/dr", + "-targetNN", "hdfs://localhost:8020", + "-targetRM", "local", + + "-maxEvents", "5", + "-replicationMaxMaps", "1", + "-distcpMapBandwidth", "4", + }; + HiveDRTool.main(testArgs); + } +} http://git-wip-us.apache.org/repos/asf/falcon/blob/cbb38ce1/addons/hivedr/src/test/java/org/apache/falcon/hive/HiveDRStatusStoreTest.java ---------------------------------------------------------------------- diff --git a/addons/hivedr/src/test/java/org/apache/falcon/hive/HiveDRStatusStoreTest.java b/addons/hivedr/src/test/java/org/apache/falcon/hive/HiveDRStatusStoreTest.java new file mode 100644 index 0000000..c89c661 --- /dev/null +++ b/addons/hivedr/src/test/java/org/apache/falcon/hive/HiveDRStatusStoreTest.java @@ -0,0 +1,346 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.falcon.hive; + +import org.apache.falcon.cluster.util.EmbeddedCluster; +import org.apache.falcon.hadoop.JailedFileSystem; +import org.apache.falcon.hive.exception.HiveReplicationException; +import org.apache.falcon.hive.util.DRStatusStore; +import org.apache.falcon.hive.util.HiveDRStatusStore; +import org.apache.falcon.hive.util.ReplicationStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.LocalFileSystem; +import org.apache.hadoop.fs.LocatedFileStatus; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.RemoteIterator; +import org.testng.Assert; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Iterator; + +/** + * Unit tests for HiveDRStatusStore. + */ +@Test +public class HiveDRStatusStoreTest { + private HiveDRStatusStore drStatusStore; + private FileSystem fileSystem = new JailedFileSystem(); + + public HiveDRStatusStoreTest() throws Exception { + EmbeddedCluster cluster = EmbeddedCluster.newCluster("hiveReplTest"); + Path storePath = new Path(DRStatusStore.BASE_DEFAULT_STORE_PATH); + + fileSystem.initialize(LocalFileSystem.getDefaultUri(cluster.getConf()), cluster.getConf()); + if (fileSystem.exists(storePath)) { + fileSystem.delete(storePath, true); + } + FileSystem.mkdirs(fileSystem, storePath, DRStatusStore.DEFAULT_STORE_PERMISSION); + try { + new HiveDRStatusStore(fileSystem); + Assert.fail(); + } catch (IOException ie) { + // Exception expected. + Assert.assertEquals(ie.getMessage(), "Base dir jail://hiveReplTest:00" + storePath.toUri() + + " does not have correct ownership/permissions." + + " Please set group to " + DRStatusStore.getStoreGroup() + " and permissions to rwxrwx---"); + } + drStatusStore = new HiveDRStatusStore(fileSystem, fileSystem.getFileStatus(storePath).getGroup()); + } + + @BeforeClass + public void updateReplicationStatusTest() throws Exception { + ReplicationStatus dbStatus = new ReplicationStatus("source", "target", "jobname", + "Default1", null, ReplicationStatus.Status.SUCCESS, 20L); + ReplicationStatus table1 = new ReplicationStatus("source", "target", "jobname", + "Default1", "table1", ReplicationStatus.Status.SUCCESS, 20L); + ReplicationStatus table2 = new ReplicationStatus("source", "target", "jobname", + "default1", "Table2", ReplicationStatus.Status.INIT, -1L); + ReplicationStatus table3 = new ReplicationStatus("source", "target", "jobname", + "Default1", "Table3", ReplicationStatus.Status.FAILURE, 15L); + ReplicationStatus table4 = new ReplicationStatus("source", "target", "jobname", + "default1", "table4", ReplicationStatus.Status.FAILURE, 18L); + ArrayList replicationStatusList = new ArrayList(); + replicationStatusList.add(table1); + replicationStatusList.add(table2); + replicationStatusList.add(table3); + replicationStatusList.add(table4); + replicationStatusList.add(dbStatus); + drStatusStore.updateReplicationStatus("jobname", replicationStatusList); + } + + public void updateReplicationStatusNewTablesTest() throws Exception { + ReplicationStatus dbStatus = new ReplicationStatus("source", "target", "jobname2", + "default2", null, ReplicationStatus.Status.SUCCESS, 20L); + ReplicationStatus table1 = new ReplicationStatus("source", "target", "jobname2", + "Default2", "table1", ReplicationStatus.Status.SUCCESS, 20L); + ReplicationStatus table2 = new ReplicationStatus("source", "target", "jobname2", + "default2", "Table2", 
ReplicationStatus.Status.INIT, -1L); + ReplicationStatus table3 = new ReplicationStatus("source", "target", "jobname2", + "default2", "table3", ReplicationStatus.Status.FAILURE, 15L); + ReplicationStatus table4 = new ReplicationStatus("source", "target", "jobname2", + "Default2", "Table4", ReplicationStatus.Status.FAILURE, 18L); + ArrayList replicationStatusList = new ArrayList(); + replicationStatusList.add(table1); + replicationStatusList.add(table2); + replicationStatusList.add(table3); + replicationStatusList.add(table4); + replicationStatusList.add(dbStatus); + + drStatusStore.updateReplicationStatus("jobname2", replicationStatusList); + ReplicationStatus status = drStatusStore.getReplicationStatus("source", "target", "jobname2", "default2"); + Assert.assertEquals(status.getEventId(), 15); + Assert.assertEquals(status.getStatus(), ReplicationStatus.Status.FAILURE); + Assert.assertEquals(status.getJobName(), "jobname2"); + Assert.assertEquals(status.getTable(), null); + Assert.assertEquals(status.getSourceUri(), "source"); + + Iterator iter = drStatusStore.getTableReplicationStatusesInDb("source", "target", + "jobname2", "default2"); + int size = 0; + while(iter.hasNext()) { + iter.next(); + size++; + } + Assert.assertEquals(4, size); + + table3 = new ReplicationStatus("source", "target", "jobname2", + "default2", "table3", ReplicationStatus.Status.SUCCESS, 25L); + table4 = new ReplicationStatus("source", "target", "jobname2", + "Default2", "table4", ReplicationStatus.Status.SUCCESS, 22L); + ReplicationStatus table5 = new ReplicationStatus("source", "target", "jobname2", + "default2", "Table5", ReplicationStatus.Status.SUCCESS, 18L); + ReplicationStatus db1table1 = new ReplicationStatus("source", "target", "jobname2", + "Default1", "Table1", ReplicationStatus.Status.SUCCESS, 18L); + replicationStatusList = new ArrayList(); + replicationStatusList.add(table5); + replicationStatusList.add(table3); + replicationStatusList.add(table4); + replicationStatusList.add(db1table1); + + drStatusStore.updateReplicationStatus("jobname2", replicationStatusList); + status = drStatusStore.getReplicationStatus("source", "target", "jobname2", "default1"); + Assert.assertEquals(status.getEventId(), 18); + Assert.assertEquals(status.getStatus(), ReplicationStatus.Status.SUCCESS); + + status = drStatusStore.getReplicationStatus("source", "target", "jobname2", "default2"); + Assert.assertEquals(status.getEventId(), 25); + Assert.assertEquals(status.getStatus(), ReplicationStatus.Status.SUCCESS); + + iter = drStatusStore.getTableReplicationStatusesInDb("source", "target", + "jobname2", "default2"); + size = 0; + while(iter.hasNext()) { + iter.next(); + size++; + } + Assert.assertEquals(5, size); + } + + public void getReplicationStatusDBTest() throws HiveReplicationException { + ReplicationStatus status = drStatusStore.getReplicationStatus("source", "target", "jobname", "Default1"); + Assert.assertEquals(status.getEventId(), 15); + Assert.assertEquals(status.getStatus(), ReplicationStatus.Status.FAILURE); + Assert.assertEquals(status.getJobName(), "jobname"); + Assert.assertEquals(status.getTable(), null); + Assert.assertEquals(status.getSourceUri(), "source"); + } + + public void checkReplicationConflictTest() throws HiveReplicationException { + + try { + //same source, same job, same DB, null table : pass + drStatusStore.checkForReplicationConflict("source", "jobname", "default1", null); + + //same source, same job, same DB, same table : pass + drStatusStore.checkForReplicationConflict("source", "jobname", 
"default1", "table1"); + + //same source, same job, different DB, null table : pass + drStatusStore.checkForReplicationConflict("source", "jobname", "diffDB", null); + + //same source, same job, different DB, different table : pass + drStatusStore.checkForReplicationConflict("source", "jobname", "diffDB", "diffTable"); + + // same source, different job, same DB, diff table : pass + drStatusStore.checkForReplicationConflict("source", "diffJob", "default1", "diffTable"); + } catch (Exception e) { + Assert.fail(e.getMessage()); + } + + try { + // different source, same job, same DB, null table : fail + drStatusStore.checkForReplicationConflict("diffSource", "jobname", "default1", null); + Assert.fail(); + } catch (HiveReplicationException e) { + Assert.assertEquals(e.getMessage(), + "Two different sources are attempting to replicate to same db default1." + + " New Source = diffSource, Existing Source = source"); + } + + try { + // same source, different job, same DB, null table : fail + drStatusStore.checkForReplicationConflict("source", "diffJob", "default1", null); + Assert.fail(); + } catch (HiveReplicationException e) { + Assert.assertEquals(e.getMessage(), + "Two different jobs are attempting to replicate to same db default1." + + " New Job = diffJob, Existing Job = jobname"); + } + + try { + // same source, different job, same DB, same table : fail + drStatusStore.checkForReplicationConflict("source", "diffJob", "default1", "table1"); + Assert.fail(); + } catch (HiveReplicationException e) { + Assert.assertEquals(e.getMessage(), + "Two different jobs are trying to replicate to same table table1." + + " New job = diffJob, Existing job = jobname"); + } + + + } + + public void deleteReplicationStatusTest() throws Exception { + ReplicationStatus dbStatus = new ReplicationStatus("source", "target", "deleteJob", + "deleteDB", null, ReplicationStatus.Status.SUCCESS, 20L); + ReplicationStatus table1 = new ReplicationStatus("source", "target", "deleteJob", + "deleteDB", "Table1", ReplicationStatus.Status.SUCCESS, 20L); + ArrayList replicationStatusList = new ArrayList(); + replicationStatusList.add(table1); + replicationStatusList.add(dbStatus); + drStatusStore.updateReplicationStatus("deleteJob", replicationStatusList); + + ReplicationStatus status = drStatusStore.getReplicationStatus("source", "target", "deleteJob", "deleteDB"); + Path statusPath = drStatusStore.getStatusDirPath(status.getDatabase(), status.getJobName()); + Assert.assertEquals(fileSystem.exists(statusPath), true); + + drStatusStore.deleteReplicationStatus("deleteJob", "deleteDB"); + Assert.assertEquals(fileSystem.exists(statusPath), false); + } + + public void getReplicationStatusTableTest() throws HiveReplicationException { + ReplicationStatus status = drStatusStore.getReplicationStatus("source", "target", + "jobname", "default1", "table1"); + Assert.assertEquals(status.getEventId(), 20); + Assert.assertEquals(status.getStatus(), ReplicationStatus.Status.SUCCESS); + Assert.assertEquals(status.getTable(), "table1"); + + status = drStatusStore.getReplicationStatus("source", "target", + "jobname", "Default1", "Table2"); + Assert.assertEquals(status.getEventId(), -1); + Assert.assertEquals(status.getStatus(), ReplicationStatus.Status.INIT); + Assert.assertEquals(status.getTable(), "table2"); + + status = drStatusStore.getReplicationStatus("source", "target", + "jobname", "default1", "Table3"); + Assert.assertEquals(status.getEventId(), 15); + Assert.assertEquals(status.getStatus(), ReplicationStatus.Status.FAILURE); + 
Assert.assertEquals(status.getTable(), "table3"); + + status = drStatusStore.getReplicationStatus("source", "target", + "jobname", "default1", "table4"); + Assert.assertEquals(status.getEventId(), 18); + Assert.assertEquals(status.getStatus(), ReplicationStatus.Status.FAILURE); + Assert.assertEquals(status.getTable(), "table4"); + } + + public void getTableReplicationStatusesInDbTest() throws HiveReplicationException { + Iterator iter = drStatusStore.getTableReplicationStatusesInDb("source", "target", + "jobname", "Default1"); + int size = 0; + while(iter.hasNext()) { + size++; + ReplicationStatus status = iter.next(); + if (status.getTable().equals("table3")) { + Assert.assertEquals(status.getEventId(), 15); + Assert.assertEquals(status.getStatus(), ReplicationStatus.Status.FAILURE); + Assert.assertEquals(status.getTable(), "table3"); + } + } + Assert.assertEquals(4, size); + } + + public void fileRotationTest() throws Exception { + // initialize replication status store for db default3. + // This should init with eventId = -1 and status = INIT + ReplicationStatus status = drStatusStore.getReplicationStatus("source", "target", + "jobname3", "default3"); + Assert.assertEquals(status.getEventId(), -1); + Assert.assertEquals(status.getStatus(), ReplicationStatus.Status.INIT); + + // update status 5 times resulting in 6 files : latest.json + five rotated files + ReplicationStatus dbStatus = new ReplicationStatus("source", "target", "jobname3", + "Default3", null, ReplicationStatus.Status.SUCCESS, 20L); + ReplicationStatus table1 = new ReplicationStatus("source", "target", "jobname3", + "default3", "Table1", ReplicationStatus.Status.SUCCESS, 20L); + ArrayList replicationStatusList = new ArrayList(); + replicationStatusList.add(table1); + replicationStatusList.add(dbStatus); + + for(int i=0; i<5; i++) { + Thread.sleep(2000); + drStatusStore.updateReplicationStatus("jobname3", replicationStatusList); + } + + status = drStatusStore.getReplicationStatus("source", "target", "jobname3", "default3"); + Path statusPath = drStatusStore.getStatusDirPath(status.getDatabase(), status.getJobName()); + RemoteIterator iter = fileSystem.listFiles(statusPath, false); + Assert.assertEquals(getRemoteIterSize(iter), 6); + + drStatusStore.rotateStatusFiles(statusPath, 3, 10000000); + iter = fileSystem.listFiles(statusPath, false); + Assert.assertEquals(getRemoteIterSize(iter), 6); + + drStatusStore.rotateStatusFiles(statusPath, 3, 6000); + iter = fileSystem.listFiles(statusPath, false); + Assert.assertEquals(getRemoteIterSize(iter), 3); + } + + public void wrongJobNameTest() throws Exception { + ReplicationStatus dbStatus = new ReplicationStatus("source", "target", "jobname3", + "Default3", null, ReplicationStatus.Status.SUCCESS, 20L); + ArrayList replicationStatusList = new ArrayList(); + replicationStatusList.add(dbStatus); + + try { + drStatusStore.updateReplicationStatus("jobname2", replicationStatusList); + Assert.fail(); + } catch (HiveReplicationException e) { + // Expected exception due to jobname mismatch + } + } + + @AfterClass + public void cleanUp() throws IOException { + fileSystem.delete(new Path(DRStatusStore.BASE_DEFAULT_STORE_PATH), true); + } + + private int getRemoteIterSize(RemoteIterator iter) throws IOException { + int size = 0; + while(iter.hasNext()) { + iter.next(); + size++; + } + return size; + } + + +} http://git-wip-us.apache.org/repos/asf/falcon/blob/cbb38ce1/addons/hivedr/src/test/java/org/apache/falcon/hive/ReplicationStatusTest.java 
---------------------------------------------------------------------- diff --git a/addons/hivedr/src/test/java/org/apache/falcon/hive/ReplicationStatusTest.java b/addons/hivedr/src/test/java/org/apache/falcon/hive/ReplicationStatusTest.java new file mode 100644 index 0000000..a02639c --- /dev/null +++ b/addons/hivedr/src/test/java/org/apache/falcon/hive/ReplicationStatusTest.java @@ -0,0 +1,137 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.falcon.hive; + +import org.apache.falcon.hive.exception.HiveReplicationException; +import org.apache.falcon.hive.util.ReplicationStatus; +import org.testng.Assert; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +/** + * Unit tests for ReplicationStatus. + */ +@Test +public class ReplicationStatusTest { + + private ReplicationStatus dbStatus, tableStatus; + + public ReplicationStatusTest() {} + + + @BeforeClass + public void prepare() throws Exception { + dbStatus = new ReplicationStatus("source", "target", "jobname", + "default1", null, ReplicationStatus.Status.INIT, 0L); + tableStatus = new ReplicationStatus("source", "target", "jobname", + "testDb", "Table1", ReplicationStatus.Status.SUCCESS, 0L); + } + + public void replicationStatusSerializeTest() throws Exception { + String expected = "{\n \"sourceUri\": \"source\",\n" + + " \"targetUri\": \"target\",\n \"jobName\": \"jobname\",\n" + + " \"database\": \"testdb\",\n \"table\": \"table1\",\n" + + " \"status\": \"SUCCESS\",\n \"eventId\": 0\n}"; + String actual = tableStatus.toJsonString(); + Assert.assertEquals(actual, expected); + + expected = "{\n \"sourceUri\": \"source\",\n \"targetUri\": \"target\",\n" + + " \"jobName\": \"jobname\",\n \"database\": \"default1\",\n" + + " \"status\": \"INIT\",\n \"eventId\": 0\n}"; + actual = dbStatus.toJsonString(); + Assert.assertEquals(actual, expected); + } + + public void replicationStatusDeserializeTest() throws Exception { + String tableInput = "{\n \"sourceUri\": \"source\",\n" + + " \"targetUri\": \"target\",\n \"jobName\": \"testJob\",\n" + + " \"database\": \"Test1\",\n \"table\": \"table1\",\n" + + " \"status\": \"SUCCESS\",\n \"eventId\": 0\n}"; + String dbInput = "{ \"sourceUri\": \"source\", \"targetUri\": \"target\",\"jobName\": \"jobname\",\n" + + " \"database\": \"default1\", \"status\": \"FAILURE\"," + + " \"eventId\": 27, \"statusLog\": \"testLog\"}"; + + ReplicationStatus newDbStatus = new ReplicationStatus(dbInput); + ReplicationStatus newTableStatus = new ReplicationStatus(tableInput); + + Assert.assertEquals(newDbStatus.getTable(), null); + Assert.assertEquals(newDbStatus.getEventId(), 27); + Assert.assertEquals(newDbStatus.getDatabase(), "default1"); + Assert.assertEquals(newDbStatus.getLog(), "testLog"); + 
Assert.assertEquals(newDbStatus.getStatus(), ReplicationStatus.Status.FAILURE); + + + Assert.assertEquals(newTableStatus.getTable(), "table1"); + Assert.assertEquals(newTableStatus.getEventId(), 0); + Assert.assertEquals(newTableStatus.getDatabase(), "test1"); + Assert.assertEquals(newTableStatus.getJobName(), "testJob"); + + // no table, no eventId, no log + dbInput = "{\n \"sourceUri\": \"source\",\n" + + " \"targetUri\": \"target\",\n \"jobName\": \"testJob\",\n" + + " \"database\": \"Test1\",\n" + + " \"status\": \"SUCCESS\"\n}"; + newDbStatus = new ReplicationStatus(dbInput); + + Assert.assertEquals(newDbStatus.getDatabase(), "test1"); + Assert.assertEquals(newDbStatus.getTable(), null); + Assert.assertEquals(newDbStatus.getEventId(), -1); + Assert.assertEquals(newDbStatus.getLog(), null); + + } + + public void invalidEventIdTest() throws Exception { + String tableInput = "{\n \"sourceUri\": \"source\",\n" + + " \"targetUri\": \"target\",\n \"jobName\": \"testJob\",\n" + + " \"database\": \"test1\",\n \"table\": \"table1\",\n" + + " \"status\": \"SUCCESS\",\n \"eventId\": -100\n}"; + + ReplicationStatus newTableStatus = new ReplicationStatus(tableInput); + Assert.assertEquals(newTableStatus.getEventId(), -1); + + newTableStatus.setEventId(-200); + Assert.assertEquals(newTableStatus.getEventId(), -1); + + String expected = "{\n \"sourceUri\": \"source\",\n" + + " \"targetUri\": \"target\",\n \"jobName\": \"testJob\",\n" + + " \"database\": \"test1\",\n \"table\": \"table1\",\n" + + " \"status\": \"SUCCESS\",\n \"eventId\": -1\n}"; + String actual = newTableStatus.toJsonString(); + Assert.assertEquals(actual, expected); + + newTableStatus.setEventId(50); + Assert.assertEquals(newTableStatus.getEventId(), 50); + } + + public void invalidStatusTest() throws Exception { + + String dbInput = "{ \"sourceUri\": \"source\", \"targetUri\": \"target\",\"jobName\": \"jobname\",\n" + + " \"database\": \"default1\", \"status\": \"BLAH\"," + + " \"eventId\": 27, \"statusLog\": \"testLog\"}"; + + try { + new ReplicationStatus(dbInput); + Assert.fail(); + } catch (HiveReplicationException e) { + Assert.assertEquals(e.getMessage(), + "Unable to deserialize jsonString to ReplicationStatus. Invalid status BLAH"); + } + } + + +} http://git-wip-us.apache.org/repos/asf/falcon/blob/cbb38ce1/addons/recipes/hdfs-replication/src/main/resources/hdfs-replication-template.xml ---------------------------------------------------------------------- diff --git a/addons/recipes/hdfs-replication/src/main/resources/hdfs-replication-template.xml b/addons/recipes/hdfs-replication/src/main/resources/hdfs-replication-template.xml index 824e6f5..de0f748 100644 --- a/addons/recipes/hdfs-replication/src/main/resources/hdfs-replication-template.xml +++ b/addons/recipes/hdfs-replication/src/main/resources/hdfs-replication-template.xml @@ -17,30 +17,27 @@ limitations under the License. 
[Template hunk body; the XML markup was stripped by the archive's HTML-to-text conversion. Recoverable changes: the process gains a tags property _falcon_mirroring_type=HDFS; parallel 1, order LAST_ONLY, and timezone UTC are retained; the frequency placeholder is renamed from ##process.frequency## to ##falcon.recipe.frequency##; the file now ends without a trailing newline.]
http://git-wip-us.apache.org/repos/asf/falcon/blob/cbb38ce1/addons/recipes/hdfs-replication/src/main/resources/hdfs-replication-workflow.xml
----------------------------------------------------------------------
diff --git a/addons/recipes/hdfs-replication/src/main/resources/hdfs-replication-workflow.xml b/addons/recipes/hdfs-replication/src/main/resources/hdfs-replication-workflow.xml index 145d489..d6a4ee9 100644 --- a/addons/recipes/hdfs-replication/src/main/resources/hdfs-replication-workflow.xml +++ b/addons/recipes/hdfs-replication/src/main/resources/hdfs-replication-workflow.xml @@ -47,28 +47,78 @@ oozie.launcher.oozie.libpath ${wf:conf("falcon.libpath")} + + oozie.launcher.mapreduce.job.hdfs-servers + ${drSourceClusterFS},${drTargetClusterFS} + org.apache.falcon.replication.FeedReplicator -Dmapred.job.queue.name=${queueName} -Dmapred.job.priority=${jobPriority} -maxMaps - ${maxMaps} + ${distcpMaxMaps} -mapBandwidth - ${mapBandwidth} + ${distcpMapBandwidth} -sourcePaths - ${nameNode}${drSourceDir} + ${drSourceDir} -targetPath ${drTargetClusterFS}${drTargetDir} -falconFeedStorageType FILESYSTEM + -availabilityFlag + ${availabilityFlag == 'NA' ? "NA" : availabilityFlag} + -counterLogDir + ${logDir}/job-${nominalTime} + + + + + + + ${drNotificationReceivers ne 'NA'} + + + + + + + + ${drNotificationReceivers ne 'NA'} + + + + + + + ${drNotificationReceivers} + INFO: HDFS DR workflow ${entityName} completed successfully + + The HDFS DR workflow ${wf:id()} is successful. + Source = ${drSourceDir} + Target = ${drTargetClusterFS}${drTargetDir} + + + + + + + + ${drNotificationReceivers} + ERROR: HDFS DR workflow ${entityName} failed + + The workflow ${wf:id()} had issues and was killed. The error message is: ${wf:errorMessage(wf:lastErrorNode())} + Source = ${drSourceDir} + Target = ${drTargetClusterFS}${drTargetDir} + + - Workflow failed, error message[${wf:errorMessage(wf:lastErrorNode())}] + Workflow action failed, error message[${wf:errorMessage(wf:lastErrorNode())}] - +
http://git-wip-us.apache.org/repos/asf/falcon/blob/cbb38ce1/addons/recipes/hdfs-replication/src/main/resources/hdfs-replication.properties
----------------------------------------------------------------------
diff --git a/addons/recipes/hdfs-replication/src/main/resources/hdfs-replication.properties b/addons/recipes/hdfs-replication/src/main/resources/hdfs-replication.properties index 19b8459..64ab6b8 100644 --- a/addons/recipes/hdfs-replication/src/main/resources/hdfs-replication.properties +++ b/addons/recipes/hdfs-replication/src/main/resources/hdfs-replication.properties @@ -19,47 +19,59 @@ ##### NOTE: This is a TEMPLATE file which can be copied and edited ##### Recipe properties -falcon.recipe.name=hdfs-replication - +##### Unique recipe job name +falcon.recipe.name=sales-monthly ##### Workflow properties - falcon.recipe.workflow.name=hdfs-dr-workflow # Provide Wf absolute path. This can be HDFS or local FS path. If WF is on local FS it will be copied to HDFS -falcon.recipe.workflow.path=/recipes/hdfs-replication/hdfs-replication-workflow.xml +falcon.recipe.workflow.path=/apps/data-mirroring/workflows/hdfs-replication-workflow.xml # Provide Wf lib absolute path. This can be HDFS or local FS path.
If libs are on local FS it will be copied to HDFS #falcon.recipe.workflow.lib.path=/recipes/hdfs-replication/lib ##### Cluster properties +# Cluster where job should run +falcon.recipe.cluster.name=primaryCluster +# Change the cluster hdfs write end point here. This is mandatory. +falcon.recipe.cluster.hdfs.writeEndPoint=hdfs://240.0.0.10:8020 +# Change the cluster validity start time here +falcon.recipe.cluster.validity.start=2015-03-13T00:00Z +# Change the cluster validity end time here +falcon.recipe.cluster.validity.end=2016-12-30T00:00Z -# Change the src cluster name here -falcon.recipe.src.cluster.name=test -# Change the src cluster hdfs write end point here. This is mandatory. -falcon.recipe.src.cluster.hdfs.writeEndPoint=hdfs://sandbox.hortonworks.com:8020 -# Change the src cluster validity start time here -falcon.recipe.src.cluster.validity.start=2014-10-01T00:00Z -# Change the src cluster validity end time here -falcon.recipe.src.cluster.validity.end=2016-12-30T00:00Z +##### Scheduling properties +# Change the recipe frequency here. Valid frequency type are minutes, hours, days, months +falcon.recipe.process.frequency=minutes(5) +##### Tag properties - An optional list of comma separated tags, Key Value Pairs, separated by comma +##### Uncomment to add tags +#falcon.recipe.tags= -##### Scheduling properties +##### Retry policy properties -# Change the process here. Valid frequency type are minutes, hours, days, months -falcon.recipe.process.frequency=minutes(60) +falcon.recipe.retry.policy=periodic +falcon.recipe.retry.delay=minutes(30) +falcon.recipe.retry.attempts=3 +##### ACL properties - Uncomment and change ACL if authorization is enabled + +falcon.recipe.acl.owner=ambari-qa +falcon.recipe.acl.group=users +falcon.recipe.acl.permission=0x755 +falcon.recipe.nn.principal=nn/_HOST@EXAMPLE.COM ##### Custom Job properties -# Specify property names and values for properties defined in recipe template -falcon.recipe.process.property2.name=drSourceDir -falcon.recipe.process.property2.value=/falcon/test/srcCluster/input -falcon.recipe.process.property3.name=drTargetClusterFS -falcon.recipe.process.property3.value=hdfs://sandbox.hortonworks.com:8020 -falcon.recipe.process.property4.name=drTargetDir -falcon.recipe.process.property4.value=/falcon/test/targetCluster/input -falcon.recipe.process.property5.name=drTargetCluster -falcon.recipe.process.property5.value=backupCluster -falcon.recipe.process.property6.name=maxMaps -falcon.recipe.process.property6.value=5 -falcon.recipe.process.property7.name=mapBandwidth -falcon.recipe.process.property7.value=100 +# Specify multiple comma separated source directories +drSourceDir=/user/hrt_qa/dr/test/primaryCluster/input +drSourceClusterFS=hdfs://240.0.0.10:8020 +drTargetDir=/user/hrt_qa/dr/test/backupCluster/input +drTargetClusterFS=hdfs://240.0.0.11:8020 + +# Change it to specify the maximum number of mappers for DistCP +distcpMaxMaps=1 +# Change it to specify the bandwidth in MB for each mapper in DistCP +distcpMapBandwidth=100 + +##### Email on failure +drNotificationReceivers=NA http://git-wip-us.apache.org/repos/asf/falcon/blob/cbb38ce1/addons/recipes/hive-disaster-recovery/README.txt ---------------------------------------------------------------------- diff --git a/addons/recipes/hive-disaster-recovery/README.txt b/addons/recipes/hive-disaster-recovery/README.txt new file mode 100644 index 0000000..ab393b1 --- /dev/null +++ b/addons/recipes/hive-disaster-recovery/README.txt @@ -0,0 +1,58 @@ +# Licensed to the Apache Software Foundation 
(ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +Hive Metastore Disaster Recovery Recipe + +Overview +This recipe replicates Hive metadata and data from one +Hadoop cluster to another Hadoop cluster. +It piggybacks on Falcon's replication solution, which uses the DistCp tool. + +Use Case +* +* + +Limitations +*
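How such a recipe is typically run, as a hypothetical sketch: assuming the Falcon client has recipe support configured and this recipe's template, workflow, and properties files are staged under the client's recipe path, a submission looks roughly like

    falcon recipe -name hive-disaster-recovery

The client substitutes the ##...## placeholders in the template from the properties file and schedules the resulting process entity; the exact flags vary across Falcon versions.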
http://git-wip-us.apache.org/repos/asf/falcon/blob/cbb38ce1/addons/recipes/hive-disaster-recovery/pom.xml
----------------------------------------------------------------------
diff --git a/addons/recipes/hive-disaster-recovery/pom.xml b/addons/recipes/hive-disaster-recovery/pom.xml new file mode 100644 index 0000000..1732907 --- /dev/null +++ b/addons/recipes/hive-disaster-recovery/pom.xml @@ -0,0 +1,32 @@
[32 added lines: Maven POM; XML markup stripped in extraction. Recoverable coordinates: modelVersion 4.0.0, groupId org.apache.falcon.recipes, artifactId falcon-hive-replication-recipe, version 0.7-SNAPSHOT, name "Apache Falcon Hive Disaster Recovery Recipe", description "Apache Falcon Sample Hive Disaster Recovery Recipe", packaging jar.]
http://git-wip-us.apache.org/repos/asf/falcon/blob/cbb38ce1/addons/recipes/hive-disaster-recovery/src/main/resources/hive-disaster-recovery-secure-template.xml
----------------------------------------------------------------------
diff --git a/addons/recipes/hive-disaster-recovery/src/main/resources/hive-disaster-recovery-secure-template.xml b/addons/recipes/hive-disaster-recovery/src/main/resources/hive-disaster-recovery-secure-template.xml new file mode 100644 index 0000000..3afbef0 --- /dev/null +++ b/addons/recipes/hive-disaster-recovery/src/main/resources/hive-disaster-recovery-secure-template.xml @@ -0,0 +1,44 @@
[44 added lines: process entity template; XML markup stripped in extraction. Recoverable values: tags property _falcon_mirroring_type=HIVE, parallel 1, order LAST_ONLY, frequency placeholder ##process.frequency##, timezone UTC.]
http://git-wip-us.apache.org/repos/asf/falcon/blob/cbb38ce1/addons/recipes/hive-disaster-recovery/src/main/resources/hive-disaster-recovery-secure-workflow.xml
----------------------------------------------------------------------
diff --git a/addons/recipes/hive-disaster-recovery/src/main/resources/hive-disaster-recovery-secure-workflow.xml b/addons/recipes/hive-disaster-recovery/src/main/resources/hive-disaster-recovery-secure-workflow.xml new file mode 100644 index 0000000..7362c2e --- /dev/null +++ b/addons/recipes/hive-disaster-recovery/src/main/resources/hive-disaster-recovery-secure-workflow.xml @@ -0,0 +1,401 @@ + + + + + + hcat.metastore.uri + ${sourceMetastoreUri} + + + hcat.metastore.principal + ${sourceHiveMetastoreKerberosPrincipal} + + + + + hcat.metastore.uri + ${targetMetastoreUri} + + + hcat.metastore.principal + ${targetHiveMetastoreKerberosPrincipal} + + + + + hive2.server.principal + ${sourceHive2KerberosPrincipal} + + + hive2.jdbc.url + jdbc:${sourceHiveServer2Uri}/${sourceDatabase} + + + + + hive2.server.principal + ${targetHive2KerberosPrincipal} + + + hive2.jdbc.url + jdbc:${targetHiveServer2Uri}/${sourceDatabase} + + + + + + + ${jobTracker} + ${nameNode} + + + oozie.launcher.mapreduce.job.user.classpath.first + true + + + mapred.job.queue.name + ${queueName} + + + oozie.launcher.mapred.job.priority + ${jobPriority} + + + oozie.use.system.libpath + true + + + oozie.action.sharelib.for.java + distcp,hive,hive2,hcatalog + + + oozie.launcher.mapreduce.job.hdfs-servers + ${sourceNN},${targetNN} + + + mapreduce.job.hdfs-servers + ${sourceNN},${targetNN} + + + org.apache.falcon.hive.HiveDRTool + -Dmapred.job.queue.name=${queueName} + -Dmapred.job.priority=${jobPriority} + -falconLibPath + ${wf:conf("falcon.libpath")} + -sourceCluster + ${sourceCluster} + -sourceMetastoreUri + ${sourceMetastoreUri} + -sourceHiveServer2Uri + ${sourceHiveServer2Uri} + -sourceDatabase + ${sourceDatabase} + -sourceTable + ${sourceTable} + -sourceStagingPath + ${sourceStagingPath} + -sourceNN + ${sourceNN} + -sourceNNKerberosPrincipal + ${sourceNNKerberosPrincipal} + -sourceHiveMetastoreKerberosPrincipal +
${sourceHiveMetastoreKerberosPrincipal} + -sourceHive2KerberosPrincipal + ${sourceHive2KerberosPrincipal} + -targetCluster + ${targetCluster} + -targetMetastoreUri + ${targetMetastoreUri} + -targetHiveServer2Uri + ${targetHiveServer2Uri} + -targetStagingPath + ${targetStagingPath} + -targetNN + ${targetNN} + -targetNNKerberosPrincipal + ${targetNNKerberosPrincipal} + -targetHiveMetastoreKerberosPrincipal + ${targetHiveMetastoreKerberosPrincipal} + -targetHive2KerberosPrincipal + ${targetHive2KerberosPrincipal} + -maxEvents + ${maxEvents} + -clusterForJobRun + ${clusterForJobRun} + -clusterForJobRunWriteEP + ${clusterForJobRunWriteEP} + -clusterForJobNNKerberosPrincipal + ${clusterForJobNNKerberosPrincipal} + -drJobName + ${drJobName}-${nominalTime} + -executionStage + lastevents + + + + + + + + ${jobTracker} + ${nameNode} + + + oozie.launcher.mapreduce.job.user.classpath.first + true + + + mapred.job.queue.name + ${queueName} + + + oozie.launcher.mapred.job.priority + ${jobPriority} + + + oozie.use.system.libpath + true + + + oozie.action.sharelib.for.java + distcp,hive,hive2,hcatalog + + + oozie.launcher.mapreduce.job.hdfs-servers + ${sourceNN},${targetNN} + + + mapreduce.job.hdfs-servers + ${sourceNN},${targetNN} + + + org.apache.falcon.hive.HiveDRTool + -Dmapred.job.queue.name=${queueName} + -Dmapred.job.priority=${jobPriority} + -falconLibPath + ${wf:conf("falcon.libpath")} + -replicationMaxMaps + ${replicationMaxMaps} + -distcpMaxMaps + ${distcpMaxMaps} + -sourceCluster + ${sourceCluster} + -sourceMetastoreUri + ${sourceMetastoreUri} + -sourceHiveServer2Uri + ${sourceHiveServer2Uri} + -sourceDatabase + ${sourceDatabase} + -sourceTable + ${sourceTable} + -sourceStagingPath + ${sourceStagingPath} + -sourceNN + ${sourceNN} + -sourceNNKerberosPrincipal + ${sourceNNKerberosPrincipal} + -sourceHiveMetastoreKerberosPrincipal + ${sourceHiveMetastoreKerberosPrincipal} + -sourceHive2KerberosPrincipal + ${sourceHive2KerberosPrincipal} + -targetCluster + ${targetCluster} + -targetMetastoreUri + ${targetMetastoreUri} + -targetHiveServer2Uri + ${targetHiveServer2Uri} + -targetStagingPath + ${targetStagingPath} + -targetNN + ${targetNN} + -targetNNKerberosPrincipal + ${targetNNKerberosPrincipal} + -targetHiveMetastoreKerberosPrincipal + ${targetHiveMetastoreKerberosPrincipal} + -targetHive2KerberosPrincipal + ${targetHive2KerberosPrincipal} + -maxEvents + ${maxEvents} + -distcpMapBandwidth + ${distcpMapBandwidth} + -clusterForJobRun + ${clusterForJobRun} + -clusterForJobRunWriteEP + ${clusterForJobRunWriteEP} + -clusterForJobNNKerberosPrincipal + ${clusterForJobNNKerberosPrincipal} + -drJobName + ${drJobName}-${nominalTime} + -executionStage + export + + + + + + + + ${jobTracker} + ${nameNode} + + + oozie.launcher.mapreduce.job.user.classpath.first + true + + + mapred.job.queue.name + ${queueName} + + + oozie.launcher.mapred.job.priority + ${jobPriority} + + + oozie.use.system.libpath + true + + + oozie.action.sharelib.for.java + distcp,hive,hive2,hcatalog + + + oozie.launcher.mapreduce.job.hdfs-servers + ${sourceNN},${targetNN} + + + mapreduce.job.hdfs-servers + ${sourceNN},${targetNN} + + + org.apache.falcon.hive.HiveDRTool + -Dmapred.job.queue.name=${queueName} + -Dmapred.job.priority=${jobPriority} + -falconLibPath + ${wf:conf("falcon.libpath")} + -replicationMaxMaps + ${replicationMaxMaps} + -distcpMaxMaps + ${distcpMaxMaps} + -sourceCluster + ${sourceCluster} + -sourceMetastoreUri + ${sourceMetastoreUri} + -sourceHiveServer2Uri + ${sourceHiveServer2Uri} + -sourceDatabase + ${sourceDatabase} + 
-sourceTable + ${sourceTable} + -sourceStagingPath + ${sourceStagingPath} + -sourceNN + ${sourceNN} + -sourceNNKerberosPrincipal + ${sourceNNKerberosPrincipal} + -sourceHiveMetastoreKerberosPrincipal + ${sourceHiveMetastoreKerberosPrincipal} + -sourceHive2KerberosPrincipal + ${sourceHive2KerberosPrincipal} + -targetCluster + ${targetCluster} + -targetMetastoreUri + ${targetMetastoreUri} + -targetHiveServer2Uri + ${targetHiveServer2Uri} + -targetStagingPath + ${targetStagingPath} + -targetNN + ${targetNN} + -targetNNKerberosPrincipal + ${targetNNKerberosPrincipal} + -targetHiveMetastoreKerberosPrincipal + ${targetHiveMetastoreKerberosPrincipal} + -targetHive2KerberosPrincipal + ${targetHive2KerberosPrincipal} + -maxEvents + ${maxEvents} + -distcpMapBandwidth + ${distcpMapBandwidth} + -clusterForJobRun + ${clusterForJobRun} + -clusterForJobRunWriteEP + ${clusterForJobRunWriteEP} + -clusterForJobNNKerberosPrincipal + ${clusterForJobNNKerberosPrincipal} + -drJobName + ${drJobName}-${nominalTime} + -executionStage + import + + + + + + + + ${drNotificationReceivers ne 'NA'} + + + + + + + + ${drNotificationReceivers ne 'NA'} + + + + + + + ${drNotificationReceivers} + INFO: Hive DR workflow ${drJobName} completed successfully + + The Hive DR workflow ${wf:id()} is successful. + Source = ${sourceCluster} + Target = ${targetCluster} + DB Name = ${sourceDatabase} + Table Name = ${sourceTable} + + + + + + + + ${drNotificationReceivers} + ERROR: Hive DR workflow ${drJobName} failed + + The Hive DR workflow ${wf:id()} had issues and was killed. The error message is: ${wf:errorMessage(wf:lastErrorNode())} + Source = ${sourceCluster} + Target = ${targetCluster} + DB Name = ${sourceDatabase} + Table Name = ${sourceTable} + + + + + + + + Workflow action failed, error message[${wf:errorMessage(wf:lastErrorNode())}] + + + + http://git-wip-us.apache.org/repos/asf/falcon/blob/cbb38ce1/addons/recipes/hive-disaster-recovery/src/main/resources/hive-disaster-recovery-secure.properties ---------------------------------------------------------------------- diff --git a/addons/recipes/hive-disaster-recovery/src/main/resources/hive-disaster-recovery-secure.properties b/addons/recipes/hive-disaster-recovery/src/main/resources/hive-disaster-recovery-secure.properties new file mode 100644 index 0000000..b2d670a --- /dev/null +++ b/addons/recipes/hive-disaster-recovery/src/main/resources/hive-disaster-recovery-secure.properties @@ -0,0 +1,104 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +##### NOTE: This is a TEMPLATE file which can be copied and edited + +##### Recipe properties +falcon.recipe.name=hive-disaster-recovery + + +##### Workflow properties +falcon.recipe.workflow.name=hive-dr-workflow +# Provide Wf absolute path. This can be HDFS or local FS path. 
If WF is on local FS it will be copied to HDFS +falcon.recipe.workflow.path=/recipes/hive-replication/hive-disaster-recovery-secure-workflow.xml + +##### Cluster properties + +# Change the cluster name where replication job should run here +falcon.recipe.cluster.name=backupCluster +# Change the cluster hdfs write end point here. This is mandatory. +falcon.recipe.cluster.hdfs.writeEndPoint=hdfs://localhost:8020 +# Change the cluster validity start time here +falcon.recipe.cluster.validity.start=2014-10-01T00:00Z +# Change the cluster validity end time here +falcon.recipe.cluster.validity.end=2016-12-30T00:00Z +# Change the cluster namenode kerberos principal. This is mandatory on secure clusters. +falcon.recipe.nn.principal=nn/_HOST@EXAMPLE.COM + +##### Scheduling properties + +# Change the process frequency here. Valid frequency types are minutes, hours, days, months +falcon.recipe.process.frequency=minutes(60) + +##### Retry policy properties + +falcon.recipe.retry.policy=periodic +falcon.recipe.retry.delay=minutes(30) +falcon.recipe.retry.attempts=3 + +##### Tag properties - An optional list of comma separated tags, Key Value Pairs, separated by comma +##### Uncomment to add tags +#falcon.recipe.tags=owner=landing,pipeline=adtech + +##### ACL properties - Uncomment and change ACL if authorization is enabled + +#falcon.recipe.acl.owner=testuser +#falcon.recipe.acl.group=group +#falcon.recipe.acl.permission=0x755 + +##### Custom Job properties + +##### Source Cluster DR properties +sourceCluster=primaryCluster +sourceMetastoreUri=thrift://localhost:9083 +sourceHiveServer2Uri=hive2://localhost:10000 +# For DB level replication, to replicate multiple databases specify a comma separated list of databases +sourceDatabase=default +# For DB level replication specify * for sourceTable. +# For table level replication, to replicate multiple tables specify a comma separated list of tables +sourceTable=testtable_dr +sourceStagingPath=/apps/hive/tools/dr +sourceNN=hdfs://localhost:8020 +# Specify kerberos principal required to access source namenode and hive servers, optional on non-secure clusters. +sourceNNKerberosPrincipal=nn/_HOST@EXAMPLE.COM +sourceHiveMetastoreKerberosPrincipal=hive/_HOST@EXAMPLE.COM +sourceHive2KerberosPrincipal=hive/_HOST@EXAMPLE.COM + +##### Target Cluster DR properties +targetCluster=backupCluster +targetMetastoreUri=thrift://localhost:9083 +targetHiveServer2Uri=hive2://localhost:10000 +targetStagingPath=/apps/hive/tools/dr +targetNN=hdfs://localhost:8020 +# Specify kerberos principal required to access target namenode and hive servers, optional on non-secure clusters. +targetNNKerberosPrincipal=nn/_HOST@EXAMPLE.COM +targetHiveMetastoreKerberosPrincipal=hive/_HOST@EXAMPLE.COM +targetHive2KerberosPrincipal=hive/_HOST@EXAMPLE.COM + +# To cap the max events processed each time the job runs. Set it to a max value based on your bandwidth limit. +# Setting it to -1 will process all the events but can hog the bandwidth. Use it judiciously!
+maxEvents=-1 +# Change it to specify the maximum number of mappers for replication +replicationMaxMaps=5 +# Change it to specify the maximum number of mappers for DistCP +distcpMaxMaps=1 +# Change it to specify the bandwidth in MB for each mapper in DistCP +distcpMapBandwidth=100 + +##### Email on failure +drNotificationReceivers=NA \ No newline at end of file
http://git-wip-us.apache.org/repos/asf/falcon/blob/cbb38ce1/addons/recipes/hive-disaster-recovery/src/main/resources/hive-disaster-recovery-template.xml
----------------------------------------------------------------------
diff --git a/addons/recipes/hive-disaster-recovery/src/main/resources/hive-disaster-recovery-template.xml b/addons/recipes/hive-disaster-recovery/src/main/resources/hive-disaster-recovery-template.xml new file mode 100644 index 0000000..3afbef0 --- /dev/null +++ b/addons/recipes/hive-disaster-recovery/src/main/resources/hive-disaster-recovery-template.xml @@ -0,0 +1,44 @@
[44 added lines: process entity template, the same blob (3afbef0) as the secure template above; XML markup stripped in extraction. Recoverable values: tags property _falcon_mirroring_type=HIVE, parallel 1, order LAST_ONLY, frequency placeholder ##process.frequency##, timezone UTC.]
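The tests earlier in this commit exercise the status-store API end to end; pieced together, a minimal round-trip sketch looks like the following. The constructor and method signatures mirror the test code, while the FileSystem setup, group name, URIs, database, table, and event id are illustrative assumptions, not values from the commit:

import org.apache.falcon.hive.util.HiveDRStatusStore;
import org.apache.falcon.hive.util.ReplicationStatus;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;

import java.util.ArrayList;

public class StatusStoreSketch {
    public static void main(String[] args) throws Exception {
        // Illustrative setup: any HDFS-backed FileSystem works; the tests use a jailed local FS.
        FileSystem fs = FileSystem.get(new Configuration());

        // The two-argument constructor overrides the store group check; the tests pass the
        // base directory's actual group, so "users" here is only a stand-in.
        HiveDRStatusStore store = new HiveDRStatusStore(fs, "users");

        // One DB-level status (null table) and one table-level status for the same job,
        // mirroring the lists the tests hand to updateReplicationStatus().
        ArrayList<ReplicationStatus> statuses = new ArrayList<ReplicationStatus>();
        statuses.add(new ReplicationStatus("sourceUri", "targetUri", "drJob",
                "salesdb", null, ReplicationStatus.Status.SUCCESS, 42L));
        statuses.add(new ReplicationStatus("sourceUri", "targetUri", "drJob",
                "salesdb", "orders", ReplicationStatus.Status.SUCCESS, 42L));
        store.updateReplicationStatus("drJob", statuses);

        // Read back the DB-level status; eventId marks the last event replicated for the DB.
        ReplicationStatus dbStatus =
                store.getReplicationStatus("sourceUri", "targetUri", "drJob", "salesdb");
        System.out.println(dbStatus.toJsonString());
    }
}

As the tests show, updateReplicationStatus() also rolls table-level results up into the DB-level record (lowest eventId of any failed table on failure, highest successful eventId otherwise), so the read-back status reflects the whole job, not just the DB entry written above.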