ambari-commits mailing list archives

From: swa...@apache.org
Subject: git commit: Revert "AMBARI-6106. Customize the Hadoop metrics sink to write to MySQL store. (swagle)"
Date: Tue, 24 Jun 2014 17:12:45 GMT
Repository: ambari
Updated Branches:
  refs/heads/branch-1.6.1 b66ca31e7 -> 4be63cbba


Revert "AMBARI-6106. Customize the Hadoop metrics sink to write to MySQL store. (swagle)"

This reverts commit ec0c81a9c00908be78c6c60e5fa99a4f7cb762ef.
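
The revert itself is an ordinary "git revert" of the offending commit. For illustration only, a minimal JGit sketch that would produce an equivalent revert commit against a local clone (the clone path and the use of JGit are assumptions; the committer presumably used the git CLI):

import java.io.File;

import org.eclipse.jgit.api.Git;
import org.eclipse.jgit.lib.ObjectId;

public class RevertSketch {
  public static void main(String[] args) throws Exception {
    // Open a local clone of the ambari repository (path is hypothetical).
    try (Git git = Git.open(new File("ambari"))) {
      // Resolve the commit being reverted (SHA taken from the message above).
      ObjectId bad = git.getRepository()
          .resolve("ec0c81a9c00908be78c6c60e5fa99a4f7cb762ef");
      // Record a new commit that undoes it, i.e. "git revert ec0c81a".
      git.revert().include(bad).call();
    }
  }
}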


Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/4be63cbb
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/4be63cbb
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/4be63cbb

Branch: refs/heads/branch-1.6.1
Commit: 4be63cbba8b998b03f9f315665103d96315053dc
Parents: b66ca31
Author: Siddharth Wagle <swagle@hortonworks.com>
Authored: Tue Jun 24 10:13:34 2014 -0700
Committer: Siddharth Wagle <swagle@hortonworks.com>
Committed: Tue Jun 24 10:13:34 2014 -0700

----------------------------------------------------------------------
 .../db/Hadoop-Metrics-MySQL-CREATE.ddl          | 460 -------------------
 .../db/Hadoop-Metrics-SQLServer-CREATE.ddl      |   6 +-
 contrib/ambari-scom/metrics-sink/pom.xml        |   2 -
 .../apache/hadoop/metrics2/sink/MySqlSink.java  |  45 --
 .../hadoop/metrics2/sink/MySqlSinkHadoop1.java  |  45 --
 .../hadoop/metrics2/sink/MySqlSinkHadoop2.java  |  44 --
 .../hadoop/metrics2/sink/SqlServerSink.java     | 284 +++++++++++-
 .../metrics2/sink/SqlServerSinkHadoop1.java     |   4 -
 .../metrics2/sink/SqlServerSinkHadoop2.java     |   4 -
 .../apache/hadoop/metrics2/sink/SqlSink.java    | 357 --------------
 10 files changed, 275 insertions(+), 976 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ambari/blob/4be63cbb/contrib/ambari-scom/metrics-sink/db/Hadoop-Metrics-MySQL-CREATE.ddl
----------------------------------------------------------------------
diff --git a/contrib/ambari-scom/metrics-sink/db/Hadoop-Metrics-MySQL-CREATE.ddl b/contrib/ambari-scom/metrics-sink/db/Hadoop-Metrics-MySQL-CREATE.ddl
deleted file mode 100644
index 88bca41..0000000
--- a/contrib/ambari-scom/metrics-sink/db/Hadoop-Metrics-MySQL-CREATE.ddl
+++ /dev/null
@@ -1,460 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-    http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and
-limitations under the License.
-*/
-
-/*
-Deployment script for HadoopMetrics
-*/
-
-delimiter ;
-
-# CREATE DATABASE `ambarimetrics` /*!40100 DEFAULT CHARACTER SET utf8 */;
-#
-# CREATE USER 'ambari' IDENTIFIED BY 'bigdata';
-
-# USE @schema;
-
-CREATE TABLE CompletedJob (ClusterNodeID INTEGER NOT NULL, TagSetID INTEGER NOT NULL, MapProgressPercent INTEGER NOT NULL, CleanupProgressPercent INTEGER NOT NULL, SetupProgressPercent INTEGER NOT NULL, ReduceProgressPercent INTEGER NOT NULL, RunState INTEGER NOT NULL, StartTime DATETIME NOT NULL, EndTime DATETIME NOT NULL, PRIMARY KEY(ClusterNodeID, TagSetID));
-create index IX_CompletedJob_EndTime on CompletedJob(EndTime);
-create index IX_CompletedJob_TagSetID on CompletedJob(TagSetID);
-
-CREATE TABLE Configuration (RequestedRefreshRate INTEGER NOT NULL);
-
-CREATE TABLE DatabaseVersion (Major INTEGER NOT NULL, Minor INTEGER NOT NULL, Build INTEGER NOT NULL, Revision INTEGER NOT NULL);
-
-CREATE TABLE MetricName (MetricID INTEGER NOT NULL AUTO_INCREMENT, Name VARCHAR(255), PRIMARY KEY(MetricID));
-create index IX_MetricName_Name on MetricName(Name);
-
-CREATE TABLE MetricPair (RecordID BIGINT NOT NULL, MetricID INTEGER NOT NULL, MetricValue VARCHAR(512) NOT NULL, PRIMARY KEY(RecordID, MetricID));
-
-CREATE TABLE MetricRecord (RecordID BIGINT NOT NULL AUTO_INCREMENT, RecordTypeID INTEGER NOT NULL, NodeID INTEGER NOT NULL, SourceIP VARCHAR(255), ClusterNodeID INTEGER NOT NULL, ServiceID INTEGER NOT NULL, TagSetID INTEGER NOT NULL, RecordTimestamp BIGINT NOT NULL, RecordDate DATETIME, PRIMARY KEY(RecordID));
-create index IX_MetricRecord_ClusterNodeID on MetricRecord(ClusterNodeID);
-create index IX_MetricRecord_NodeID_RecordID on MetricRecord(NodeID, RecordID);
-create index IX_MetricRecord_NodeID_RecordTypeID_ClusterNodeID on MetricRecord(NodeID, RecordTypeID, ClusterNodeID);
-create index IX_MetricRecord_NodeID_TagSetID on MetricRecord(NodeID, TagSetID);
-
-CREATE TABLE Service (ServiceID BIGINT NOT NULL AUTO_INCREMENT, Name VARCHAR(255), PRIMARY KEY(ServiceID));
-
-CREATE TABLE Node (NodeID INTEGER NOT NULL AUTO_INCREMENT, Name VARCHAR(255), LastKnownIP VARCHAR(255), LastNameNodeHeartBeat DATETIME, LastJobTrackerHeartBeat DATETIME, LastDataNodeHeartBeat DATETIME, LastTaskTrackerHeartBeat DATETIME, PRIMARY KEY(NodeID));
-create index IX_Node_Name on Node(Name);
-
-CREATE TABLE RecordType (RecordTypeID INTEGER NOT NULL AUTO_INCREMENT, Name VARCHAR(255), Context VARCHAR(255), PRIMARY KEY(RecordTypeID));
-create index IX_RecordType_Context_Name on RecordType(Context, Name);
-
-CREATE TABLE TagSet (TagSetID INTEGER NOT NULL AUTO_INCREMENT, TagPairs VARCHAR(512), PRIMARY KEY(TagSetID));
-create index IX_TagSet_TagPairs on TagSet(TagPairs);
-
-ALTER TABLE CompletedJob ADD CONSTRAINT FK_CompletedJob_TagSet_TagSetID FOREIGN KEY (TagSetID) REFERENCES TagSet (TagSetID) ON DELETE NO ACTION ON UPDATE NO ACTION;
-ALTER TABLE MetricPair ADD CONSTRAINT FK_MetricPair_MetricName_MetricID FOREIGN KEY (MetricID) REFERENCES MetricName (MetricID) ON DELETE NO ACTION ON UPDATE NO ACTION;
-ALTER TABLE MetricPair ADD CONSTRAINT FK_MetricPair_MetricRecord_RecordID FOREIGN KEY (RecordID) REFERENCES MetricRecord (RecordID) ON DELETE NO ACTION ON UPDATE NO ACTION;
-ALTER TABLE MetricRecord ADD CONSTRAINT FK_MetricRecord_Node_NodeID FOREIGN KEY (NodeID) REFERENCES Node (NodeID) ON DELETE NO ACTION ON UPDATE NO ACTION;
-ALTER TABLE MetricRecord ADD CONSTRAINT FK_MetricRecord_RecordType_RecordTypeID FOREIGN KEY (RecordTypeID) REFERENCES RecordType (RecordTypeID) ON DELETE NO ACTION ON UPDATE NO ACTION;
-ALTER TABLE MetricRecord ADD CONSTRAINT FK_MetricRecord_TagSet_TagSetID FOREIGN KEY (TagSetID) REFERENCES TagSet (TagSetID) ON DELETE NO ACTION ON UPDATE NO ACTION;
-
-DROP procedure IF EXISTS `uspInsertMetricValue`;
-
-DELIMITER $$
-
-CREATE DEFINER=`ambari`@`%` PROCEDURE `uspInsertMetricValue`(recordID bigint, metricName nvarchar(256), metricValue nvarchar(512))
-proc_label:BEGIN
-    DECLARE metricID INT;
-    DECLARE has_error INT DEFAULT 0;
-
-    DECLARE CONTINUE HANDLER FOR SQLEXCEPTION SET has_error = 1;
-
-    IF (recordID IS NULL OR metricName IS NULL) THEN
-      LEAVE proc_label;
-    END IF;
-
-    START TRANSACTION;
-		SELECT MetricID FROM MetricName WHERE `Name` = metricName INTO metricID;
-		IF (metricID IS NULL) THEN
-			INSERT INTO MetricName (`Name`) VALUES (metricName);
-			SELECT LAST_INSERT_ID() INTO metricID;
-			IF (has_error = 1) THEN
-				ROLLBACK;
-				LEAVE proc_label;
-			END IF;
-		END IF;
-		COMMIT;
-
-		INSERT INTO MetricPair (RecordID, MetricID, MetricValue) VALUES (recordID, metricID, metricValue);
-		LEAVE proc_label;
-
-	END;
-$$
-DELIMITER ;
-
-DROP procedure IF EXISTS `uspUpdateHeartBeats`;
-DELIMITER $$
-
-CREATE PROCEDURE uspUpdateHeartBeats(NodeID int, SourceIP varchar(256), NameNodeLast datetime, JobTrackerLast datetime,
-		DataNodeLast datetime, TaskTrackerLast datetime, LastKnownIP varchar(256))
-
-	BEGIN
-		IF (NodeID IS NOT NULL) THEN
-			IF (NameNodeLast IS NOT NULL) THEN
-				UPDATE Node as n SET n.LastNameNodeHeartBeat = NameNodeLast WHERE n.NodeID = NodeID;
-			END IF;
-			IF (JobTrackerLast IS NOT NULL) THEN
-				UPDATE Node as n SET n.LastJobTrackerHeartBeat = JobTrackerLast WHERE n.NodeID = NodeID;
-			END IF;
-			IF (DataNodeLast IS NOT NULL) THEN
-				UPDATE Node as n SET n.LastDataNodeHeartBeat = DataNodeLast WHERE n.NodeID = NodeID;
-			END IF;
-			IF (TaskTrackerLast IS NOT NULL) THEN
-				UPDATE Node as n SET n.LastTaskTrackerHeartBeat = TaskTrackerLast WHERE n.NodeID = NodeID;
-			END IF;
-			IF (LastKnownIP IS NULL OR SourceIP <> LastKnownIP) THEN
-				UPDATE Node as n SET n.LastKnownIP = SourceIP WHERE n.NodeID = NodeID;
-			END IF;
-		END IF;
-	END;
-$$
-
-DELIMITER ;
-
-
-DROP procedure IF EXISTS `uspGetMetricRecord`;
-DELIMITER $$
-
-CREATE DEFINER=`ambari`@`%` PROCEDURE `uspGetMetricRecord`(
-		recordTypeContext varchar(255),
-		recordTypeName varchar(255),
-		nodeName varchar(255),
-		sourceIP varchar(255),
-		clusterNodeName varchar(255),
-		serviceName varchar(255),
-		tagPairs varchar(512),
-		recordTimestamp bigint,
-		OUT metricRecordID bigint)
-proc_label: BEGIN
-
-		DECLARE recordTypeID int;
-		DECLARE nodeID int;
-		DECLARE clusterNodeID int;
-		DECLARE tagSetID int;
-		DECLARE serviceID int;
-		DECLARE err int default 0;
-		DECLARE recordIDCutoff bigint;
-
-		DECLARE CONTINUE HANDLER FOR SQLEXCEPTION SET err = 1;
-
-		START TRANSACTION;
-		SELECT max(r.RecordTypeID) INTO recordTypeID FROM RecordType as r WHERE r.`Context` = recordTypeContext AND r.`Name` = recordTypeName;
-
-		IF (recordTypeID IS NULL OR recordTypeID = 0) THEN
-				INSERT INTO RecordType (`Context`, `Name`) VALUES (recordTypeContext, recordTypeName);
-				SELECT LAST_INSERT_ID() INTO recordTypeID;
-				IF (err <> 0) THEN
-					ROLLBACK;
-					SET metricRecordID = NULL;
-					LEAVE proc_label;
-				END IF;
-		END IF;
-		COMMIT;
-
-		START TRANSACTION;
-		SELECT max(s.serviceID) INTO serviceID FROM Service as s WHERE s.`Name` = serviceName;
-		IF (serviceID IS NULL OR serviceID  = 0) THEN
-				INSERT INTO Service (`Name`) VALUES (serviceName);
-				SELECT LAST_INSERT_ID() INTO serviceID;
-				IF (err <> 0) THEN
-					ROLLBACK;
-					SET metricRecordID = NULL;
-					LEAVE proc_label;
-				END IF;
-		END IF;
-		COMMIT;
-
-		START TRANSACTION;
-		SELECT max(n.NodeID) INTO nodeID FROM Node as n WHERE n.`Name` = nodeName;
-
-		IF (nodeID IS NULL OR nodeID  = 0) THEN
-			/* Start with a node type of uninitialized.  HealthNode will determine node type based on metrics delivered over time. */
-				INSERT INTO Node (`Name`, LastKnownIP) VALUES (nodeName, sourceIP);
-				SELECT LAST_INSERT_ID() INTO nodeID;
-				IF (err <> 0) THEN
-					ROLLBACK;
-					SET metricRecordID = NULL;
-					LEAVE proc_label;
-				END IF;
-		END IF;
-		COMMIT;
-
-		-- Do our best to determine the cluster node ID based on completely flaky input from the user, which might be an IP address, a non-FQDN,
-		-- or an FQDN.  Note that worker nodes may have a completely different idea about the name of the namenode (which is the node
-		-- that represents the cluster) compared with the namenode itself.
-
-		START TRANSACTION;
-		IF ((SELECT ufnIsIPAddress(clusterNodeName)) = 1) THEN
-			SELECT n.NodeID from Node as n WHERE n.LastKnownIP = clusterNodeName ORDER BY n.LastNameNodeHeartBeat DESC limit 1 INTO clusterNodeID;
-			IF (clusterNodeID IS NULL) THEN
-				INSERT INTO Node (`Name`, LastKnownIP) VALUES (clusterNodeName, sourceIP);
-				SELECT LAST_INSERT_ID() INTO clusterNodeID;
-				IF (err <> 0) THEN
-					ROLLBACK;
-					SET metricRecordID = NULL;
-					LEAVE proc_label;
-				END IF;
-			END IF;
-		ELSEIF ((SELECT LOCATE('.', clusterNodeName, 1)) > 0) THEN
-			-- IF this is not an IP address, but there is a dot in the name we assume we are looking at an FQDN
-			SELECT max(n.NodeID) FROM Node as n WHERE n.`Name` = clusterNodeName INTO clusterNodeID;
-			IF (clusterNodeID IS NULL OR clusterNodeID = 0) THEN
-				INSERT INTO Node (`Name`, LastKnownIP) VALUES (clusterNodeName, sourceIP);
-				SELECT LAST_INSERT_ID() INTO clusterNodeID;
-				IF (err <> 0) THEN
-					ROLLBACK;
-					SET metricRecordID = NULL;
-					LEAVE proc_label;
-				END IF;
-			END IF;
-		ELSE
-		BEGIN
-			-- We have got a non-FQDN, but the NameNode might know its FQDN, so be careful! We must prefer the FQDN if we can find one.
-			-- Sadly, yes, this could break things if we are monitoring clusters from different domains.  This is now by design!
-			SELECT n.NodeID FROM Node as n WHERE n.`Name` LIKE CONCAT(clusterNodeName, '%') ORDER BY n.LastNameNodeHeartBeat DESC limit 1 INTO clusterNodeID;
-			IF (clusterNodeID IS NULL) THEN
-				SELECT n.NodeID FROM Node as n WHERE n.`Name` = clusterNodeName INTO clusterNodeID;
-				if (clusterNodeID IS NULL) THEN
-					INSERT INTO Node (`Name`, LastKnownIP) VALUES (clusterNodeName, sourceIP);
-					SELECT LAST_INSERT_ID() INTO clusterNodeID;
-					IF (err <> 0) THEN
-						ROLLBACK;
-						SET metricRecordID = NULL;
-						LEAVE proc_label;
-					END IF;
-				END IF;
-			END IF;
-		END;
-		END IF;
-		COMMIT;
-
-		-- Cleanup older metric records and pairs if necessary
-		-- Policy is to keep between 60000 and 90000 metric records and associated metric pairs per node.
-		IF ((SELECT COUNT(*) FROM MetricRecord as mr WHERE mr.NodeID = nodeID) > 90000) THEN
-			SELECT MIN(mr.RecordID) FROM MetricRecord as mr WHERE mr.RecordID IN (select * from (SELECT r.RecordID FROM MetricRecord as r WHERE r.NodeID = nodeID ORDER BY r.RecordDate DESC limit 60000) as records) INTO recordIDCutoff;
-			IF (recordIDCutoff IS NOT NULL) THEN
-				DELETE FROM MetricPair WHERE RecordID IN (
-				SELECT mr.RecordID FROM MetricRecord as mr
-				WHERE mr.RecordID < recordIDCutoff
-				AND mr.NodeID = nodeID);
-
-				DELETE FROM MetricRecord
-				WHERE RecordID < recordIDCutoff AND NodeID = nodeID;
-			END IF;
-		END IF;
-
-
-		START TRANSACTION;
-		SELECT max(t.TagSetID) FROM TagSet as t WHERE t.TagPairs = tagPairs INTO tagSetID;
-		IF (tagSetID IS NULL OR tagSetID = 0) THEN
-				INSERT INTO TagSet (TagPairs) VALUES (tagPairs);
-				SELECT LAST_INSERT_ID() INTO tagSetID;
-				IF (err <> 0) THEN
-					ROLLBACK;
-					SET metricRecordID = NULL;
-					LEAVE proc_label;
-				END IF;
-		END IF;
-		COMMIT;
-
-		START TRANSACTION;
-		SELECT max(mr.RecordID) FROM MetricRecord as mr WHERE mr.RecordTypeID = recordTypeID AND mr.NodeID = nodeID AND mr.ServiceID = serviceID AND mr.TagSetID = tagSetID AND mr.RecordTimestamp = recordTimestamp INTO metricRecordID;
-		IF (metricRecordID IS NULL OR metricRecordID = 0) THEN
-			-- insert into temp_log values (CONCAT(metricRecordID, ', ', RecordTimestamp));
-			INSERT INTO MetricRecord (RecordTypeID, NodeID, SourceIP, ClusterNodeID, ServiceID, TagSetID, RecordTimestamp) VALUES (recordTypeID, nodeID, sourceIP, clusterNodeID, serviceID, tagSetID, recordTimestamp);
-			SELECT LAST_INSERT_ID() INTO metricRecordID;
-			IF (err <> 0) THEN
-				ROLLBACK;
-				SET metricRecordID = NULL;
-				LEAVE proc_label;
-			END IF;
-		END IF;
-		COMMIT;
-
-	END;
-$$
-
-DELIMITER ;
-
-DELIMITER $$
-
-CREATE DEFINER=`ambari`@`%` FUNCTION `ufnIsIPAddress`(inputString varchar(1024)) RETURNS tinyint(4)
-BEGIN
-	DECLARE currentPos bigint default 1;
-	DECLARE nextPos bigint default 0;
-	DECLARE count int default 0;
-
-	if (CHAR_LENGTH(inputString) = 0) THEN
-		RETURN 0;
-	END IF;
-
-	SELECT LOCATE('.', inputString, currentPos) INTO nextPos;
-
-	while_label: WHILE (currentPos <= CHAR_LENGTH(inputString) AND count < 4) DO
-		IF (nextPos = 0) THEN
-			SET nextPos = CHAR_LENGTH(inputString) + 1;
-		END IF;
-		IF ((SELECT SUBSTRING(inputString, currentPos, nextPos - currentPos) REGEXP '^[0-9]+$') = 1) THEN
-			SET count = count + 1;
-			SET currentPos = nextPos + 1;
-			SELECT LOCATE('.', inputString, currentPos) INTO nextPos;
-		ELSE
-			LEAVE while_label;
-		END IF;
-	END WHILE;
-
-	IF (count = 4) THEN
-		RETURN 1;
-	END IF;
-
-	SET currentPos = 1;
-	SET nextPos = 0;
-	SET count = 0;
-
-	WHILE (currentPos <= CHAR_LENGTH(inputString)) DO
-		IF ((SELECT SUBSTRING(inputString, currentPos, 1) REGEXP '[0-9A-Fa-f:]') = 1) THEN
-			IF (SUBSTRING(inputString, currentPos, 1) = ':') THEN
-				SET count = count + 1;
-			END IF;
-			SET currentPos = currentPos + 1;
-		ELSE RETURN 0;
-		END IF;
-	END WHILE;
-	IF (count >= 4) THEN
-		RETURN 1;
-	END IF;
-
-	RETURN 0;
-END;
-$$
-
-DELIMITER ;
-
-DROP procedure IF EXISTS `uspPurgeMetrics`;
-
-DELIMITER $$
-CREATE PROCEDURE uspPurgeMetrics(noOfDays bigint)
-	proc_label: BEGIN
-
-		DECLARE recordIDCutoff BIGINT;
-		DECLARE has_error INT default 0;
-		DECLARE CONTINUE HANDLER FOR SQLEXCEPTION SET has_error = 1;
-
-		IF (noOfDays IS NULL OR noOfDays < 1) THEN
-			LEAVE proc_label;
-		END IF;
-
-		SELECT MAX(RecordID) FROM MetricRecord WHERE DateDiff(CURRENT_TIMESTAMP, RecordDate) >= noOfDays INTO recordIDCutoff;
-
-		IF (recordIDCutoff IS NOT NULL) THEN
-
-			START TRANSACTION;
-
-			DELETE FROM MetricPair WHERE RecordID <= recordIDCutoff;
-			DELETE FROM MetricRecord WHERE RecordID <= recordIDCutoff;
-
-			IF (has_error <> 0) then
-				ROLLBACK;
-				LEAVE proc_label;
-			END IF;
-
-			COMMIT;
-		END IF;
-  END;
-$$
-
-DELIMITER ;
-
-DROP procedure IF EXISTS `ufGetMetrics`;
-DELIMITER $$
-
-CREATE DEFINER=`ambari`@`%` PROCEDURE `ufGetMetrics`(startTimeStamp bigint,
-		 endTimeStamp bigint,
-		 recordTypeContext VARCHAR(256),
-		 recordTypeName VARCHAR(256),
-		 metricName VARCHAR(256),
-		 serviceComponentName VARCHAR(256),
-		 nodeName VARCHAR(256)
-		)
-BEGIN
-		SELECT * from (
-			SELECT  s.RecordTimeStamp AS RecordTimeStamp,
-					mp.MetricValue AS MetricValue
-			FROM MetricPair mp
-			INNER JOIN (SELECT	mr.RecordID AS RecordID,
-								mr.RecordTimeStamp AS RecordTimeStamp
-						FROM MetricRecord mr
-						INNER JOIN RecordType rt ON (mr.RecordTypeId = rt.RecordTypeId)
-						INNER JOIN Node nd ON (mr.NodeID = nd.NodeID)
-						INNER JOIN Service sr ON (mr.ServiceID = sr.ServiceID)
-						WHERE rt.Context = recordTypeContext
-						AND rt.Name = recordTypeName
-						AND (nd.Name = nodeName)
-						AND (sr.Name = serviceComponentName)
-						AND mr.RecordTimestamp >= startTimeStamp
-						AND mr.RecordTimestamp <= endTimeStamp
-						) s ON (mp.RecordID = s.RecordID)
-			INNER JOIN MetricName mn ON (mp.MetricID = mn.MetricID)
-			WHERE (mn.Name = metricName)
-		) as mp_table;
-END;
-$$
-
-DELIMITER ;
-
-DROP procedure IF EXISTS `ufGetAggregatedServiceMetrics`;
-DELIMITER $$
-
-CREATE PROCEDURE ufGetAggregatedServiceMetrics(
-		startTimeStamp bigint,
-		endTimeStamp bigint,
-		recordTypeContext NVARCHAR(256),
-		recordTypeName NVARCHAR(256),
-		metricName NVARCHAR(256),
-		serviceComponentName NVARCHAR(256),
-		period integer
-		)
-BEGIN
-		SELECT * FROM
-		(
-			SELECT FLOOR ((mr.RecordTimeStamp - startTimeStamp) / period) TimeStampBlock, MAX(mr.RecordTimeStamp) RecordTimeStamp,  SUM(CAST(MetricValue AS DECIMAL(18,4))) AggMetricValue
-			FROM MetricPair mp
-			INNER JOIN MetricRecord mr ON (mp.RecordID = mr.RecordID)
-			INNER JOIN RecordType rt ON (rt.RecordTypeID = mr.RecordTypeID)
-			INNER JOIN MetricName mn ON (mn.MetricID = mp.MetricID)
-			INNER JOIN Service sr ON (sr.ServiceID = mr.ServiceID)
-			WHERE mr.RecordTimestamp >= startTimeStamp
-			AND mr.RecordTimestamp <= endTimeStamp
-			AND mn.Name = metricName
-			AND rt.Context = recordTypeContext
-			AND rt.Name = recordTypeName
-			AND sr.Name = serviceComponentName
-			GROUP BY FLOOR ((mr.RecordTimeStamp - startTimeStamp) / period)
-		) as mp_table;
-END;
-$$
-
-DELIMITER ;
-
-
-
-
-
-
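
Together, uspGetMetricRecord and uspInsertMetricValue formed the write path that the (likewise reverted) MySqlSink drove over JDBC: resolve a record ID once per metrics record, then insert one name/value pair per metric. A hedged sketch of that flow follows; the host, metric names, and values are illustrative, and the ambari/bigdata credentials come from the commented-out setup at the top of the DDL.

import java.sql.CallableStatement;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Types;

public class MySqlProcSketch {
  public static void main(String[] args) throws Exception {
    try (Connection conn = DriverManager.getConnection(
        "jdbc:mysql://db.example.com/ambarimetrics", "ambari", "bigdata")) {
      // Resolve (or create) the MetricRecord row; the 9th parameter is OUT.
      try (CallableStatement get = conn.prepareCall(
          "{call uspGetMetricRecord(?, ?, ?, ?, ?, ?, ?, ?, ?)}")) {
        get.setNString(1, "dfs");                   // recordTypeContext
        get.setNString(2, "namenode");              // recordTypeName
        get.setNString(3, "node1.example.com");     // nodeName
        get.setNString(4, "10.0.0.5");              // sourceIP
        get.setNString(5, "node1.example.com");     // clusterNodeName
        get.setNString(6, "namenode");              // serviceName
        get.setNString(7, "sourceName:namenode");   // tagPairs
        get.setLong(8, System.currentTimeMillis()); // recordTimestamp
        get.registerOutParameter(9, Types.BIGINT);  // OUT metricRecordID
        get.execute();
        long recordId = get.getLong(9);

        // One call per metric name/value pair on that record.
        try (CallableStatement ins =
                 conn.prepareCall("{call uspInsertMetricValue(?, ?, ?)}")) {
          ins.setLong(1, recordId);
          ins.setNString(2, "FilesTotal");          // illustrative metric
          ins.setNString(3, "42");
          ins.execute();
        }
      }
    }
  }
}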

http://git-wip-us.apache.org/repos/asf/ambari/blob/4be63cbb/contrib/ambari-scom/metrics-sink/db/Hadoop-Metrics-SQLServer-CREATE.ddl
----------------------------------------------------------------------
diff --git a/contrib/ambari-scom/metrics-sink/db/Hadoop-Metrics-SQLServer-CREATE.ddl b/contrib/ambari-scom/metrics-sink/db/Hadoop-Metrics-SQLServer-CREATE.ddl
index 7561960..b0ee7b7 100644
--- a/contrib/ambari-scom/metrics-sink/db/Hadoop-Metrics-SQLServer-CREATE.ddl
+++ b/contrib/ambari-scom/metrics-sink/db/Hadoop-Metrics-SQLServer-CREATE.ddl
@@ -550,10 +550,10 @@ BEGIN
 			SELECT @recordIDCutoff = MIN(RecordID) FROM MetricRecord WHERE RecordID IN (SELECT TOP 60000 RecordID FROM MetricRecord WHERE NodeID = @nodeID ORDER BY RecordDate DESC);
 			IF @recordIDCutoff IS NOT NULL
 			BEGIN
-				DELETE FROM MetricPair WHERE RecordID IN (
-				SELECT RecordID FROM MetricPair as mp
+				DELETE FROM MetricPair 
+				FROM MetricPair as mp
 				JOIN MetricRecord as mr ON mp.RecordID = mr.RecordID
-				WHERE mr.RecordID < @recordIDCutoff AND mr.NodeID = @nodeID);
+				WHERE mr.RecordID < @recordIDCutoff AND mr.NodeID = @nodeID;
 
 				DELETE FROM MetricRecord
 				WHERE RecordID < @recordIDCutoff AND NodeID = @nodeID;

http://git-wip-us.apache.org/repos/asf/ambari/blob/4be63cbb/contrib/ambari-scom/metrics-sink/pom.xml
----------------------------------------------------------------------
diff --git a/contrib/ambari-scom/metrics-sink/pom.xml b/contrib/ambari-scom/metrics-sink/pom.xml
index c7c2525..7c2706a 100644
--- a/contrib/ambari-scom/metrics-sink/pom.xml
+++ b/contrib/ambari-scom/metrics-sink/pom.xml
@@ -70,7 +70,6 @@
                         <configuration>
                             <excludes>
                                 <exclude>**/SqlServerSinkHadoop2.java</exclude>
-                                <exclude>**/MySqlSinkHadoop2.java</exclude>
                             </excludes>
                             <testExcludes>
                                 <exclude>**/SqlServerSinkHadoop2Test.java</exclude>
@@ -100,7 +99,6 @@
                         <configuration>
                             <excludes>
                                 <exclude>**/SqlServerSinkHadoop1.java</exclude>
-                                <exclude>**/MySqlSinkHadoop1.java</exclude>
                             </excludes>
                             <testExcludes>
                                 <exclude>**/SqlServerSinkHadoop1Test.java</exclude>

http://git-wip-us.apache.org/repos/asf/ambari/blob/4be63cbb/contrib/ambari-scom/metrics-sink/src/main/java/org/apache/hadoop/metrics2/sink/MySqlSink.java
----------------------------------------------------------------------
diff --git a/contrib/ambari-scom/metrics-sink/src/main/java/org/apache/hadoop/metrics2/sink/MySqlSink.java b/contrib/ambari-scom/metrics-sink/src/main/java/org/apache/hadoop/metrics2/sink/MySqlSink.java
deleted file mode 100644
index 8150d28..0000000
--- a/contrib/ambari-scom/metrics-sink/src/main/java/org/apache/hadoop/metrics2/sink/MySqlSink.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.metrics2.sink;
-
-import java.lang.Override;
-
-/**
- * This class stores published metrics to the MySql database.
- */
-public abstract class MySqlSink extends SqlSink {
-  public MySqlSink(String NAMENODE_URL_KEY, String DFS_BLOCK_SIZE_KEY) {
-    super(NAMENODE_URL_KEY, DFS_BLOCK_SIZE_KEY);
-  }
-
-  @Override
-  protected String getInsertMetricsProcedureName() {
-    return SqlSink.insertMetricProc;
-  }
-
-  @Override
-  protected String getGetMetricsProcedureName() {
-    return SqlSink.getMetricRecordProc;
-  }
-
-  @Override
-  protected String getDatabaseDriverClassName() {
-    return "com.mysql.jdbc.Driver";
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ambari/blob/4be63cbb/contrib/ambari-scom/metrics-sink/src/main/java/org/apache/hadoop/metrics2/sink/MySqlSinkHadoop1.java
----------------------------------------------------------------------
diff --git a/contrib/ambari-scom/metrics-sink/src/main/java/org/apache/hadoop/metrics2/sink/MySqlSinkHadoop1.java b/contrib/ambari-scom/metrics-sink/src/main/java/org/apache/hadoop/metrics2/sink/MySqlSinkHadoop1.java
deleted file mode 100644
index a17b30c..0000000
--- a/contrib/ambari-scom/metrics-sink/src/main/java/org/apache/hadoop/metrics2/sink/MySqlSinkHadoop1.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.metrics2.sink;
-
-import org.apache.hadoop.metrics2.Metric;
-import org.apache.hadoop.metrics2.MetricsRecord;
-
-public class MySqlSinkHadoop1 extends MySqlSink {
-  public MySqlSinkHadoop1() {
-    super(SqlSink.HADOOP1_NAMENODE_URL_KEY, SqlSink.HADOOP1_DFS_BLOCK_SIZE_KEY);
-  }
-
-  @Override
-  public void putMetrics(MetricsRecord record) {
-    long metricRecordID = getMetricRecordID(record.context(), record.name(),
-      getLocalNodeName(), getLocalNodeIPAddress(), getClusterNodeName(), getCurrentServiceName(),
-      getTagString(record.tags()), record.timestamp());
-    if (metricRecordID < 0)
-      return;
-
-    for (Metric metric : record.metrics()) {
-      insertMetricValue(metricRecordID, metric.name(), String.valueOf(metric
-        .value()));
-      if (metric.name().equals("BlockCapacity")) {
-        insertMetricValue(metricRecordID, "BlockSize", Integer
-          .toString(getBlockSize()));
-      }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/ambari/blob/4be63cbb/contrib/ambari-scom/metrics-sink/src/main/java/org/apache/hadoop/metrics2/sink/MySqlSinkHadoop2.java
----------------------------------------------------------------------
diff --git a/contrib/ambari-scom/metrics-sink/src/main/java/org/apache/hadoop/metrics2/sink/MySqlSinkHadoop2.java b/contrib/ambari-scom/metrics-sink/src/main/java/org/apache/hadoop/metrics2/sink/MySqlSinkHadoop2.java
deleted file mode 100644
index b023746..0000000
--- a/contrib/ambari-scom/metrics-sink/src/main/java/org/apache/hadoop/metrics2/sink/MySqlSinkHadoop2.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.metrics2.sink;
-
-import org.apache.hadoop.metrics2.AbstractMetric;
-import org.apache.hadoop.metrics2.MetricsRecord;
-
-public class MySqlSinkHadoop2 extends MySqlSink {
-  public MySqlSinkHadoop2() {
-    super(SqlSink.HADOOP2_NAMENODE_URL_KEY, SqlSink.HADOOP2_DFS_BLOCK_SIZE_KEY);
-  }
-
-  @Override
-  public void putMetrics(MetricsRecord record) {
-    long metricRecordID = getMetricRecordID(record.context(), record.name(),
-      getLocalNodeName(), getLocalNodeIPAddress(), getClusterNodeName(), getCurrentServiceName(),
-      getTagString(record.tags()), record.timestamp());
-    if (metricRecordID < 0)
-      return;
-
-    for (AbstractMetric metric : record.metrics()) {
-      insertMetricValue(metricRecordID, metric.name(), String.valueOf(metric.value()));
-      if (metric.name().equals("BlockCapacity")) {
-        insertMetricValue(metricRecordID, "BlockSize", Integer
-          .toString(getBlockSize()));
-      }
-    }
-  }
-}
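
The Hadoop1/Hadoop2 sink pairs above exist mainly because the metrics API type changed (Metric became AbstractMetric) and the relevant configuration keys were renamed between the two major lines; the constants in the deleted SqlSink further below spell this out (fs.default.name vs. fs.defaultFS, dfs.block.size vs. dfs.blocksize). A small sketch of reading either generation's keys from a Hadoop Configuration, trying the Hadoop 2 names first:

import org.apache.hadoop.conf.Configuration;

public class NameNodeKeySketch {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // Prefer the Hadoop 2 key, falling back to the Hadoop 1 spelling.
    String nameNodeUrl = conf.get("fs.defaultFS", conf.get("fs.default.name"));
    String blockSize = conf.get("dfs.blocksize", conf.get("dfs.block.size"));
    System.out.println("namenode = " + nameNodeUrl + ", blockSize = " + blockSize);
  }
}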

http://git-wip-us.apache.org/repos/asf/ambari/blob/4be63cbb/contrib/ambari-scom/metrics-sink/src/main/java/org/apache/hadoop/metrics2/sink/SqlServerSink.java
----------------------------------------------------------------------
diff --git a/contrib/ambari-scom/metrics-sink/src/main/java/org/apache/hadoop/metrics2/sink/SqlServerSink.java b/contrib/ambari-scom/metrics-sink/src/main/java/org/apache/hadoop/metrics2/sink/SqlServerSink.java
index 80aee7e..3e5b70c 100644
--- a/contrib/ambari-scom/metrics-sink/src/main/java/org/apache/hadoop/metrics2/sink/SqlServerSink.java
+++ b/contrib/ambari-scom/metrics-sink/src/main/java/org/apache/hadoop/metrics2/sink/SqlServerSink.java
@@ -18,28 +18,288 @@
 
 package org.apache.hadoop.metrics2.sink;
 
-import java.lang.Override;
+import org.apache.commons.configuration.SubsetConfiguration;
+import org.apache.hadoop.metrics2.MetricsException;
+import org.apache.hadoop.metrics2.MetricsRecord;
+import org.apache.hadoop.metrics2.MetricsSink;
+import org.apache.hadoop.metrics2.MetricsTag;
+import org.apache.log4j.Logger;
+
+import java.net.InetAddress;
+import java.sql.CallableStatement;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
 
 /**
  * This class stores published metrics to the SQL Server database.
  */
-public abstract class SqlServerSink extends SqlSink {
-  public SqlServerSink(String NAMENODE_URL_KEY, String DFS_BLOCK_SIZE_KEY) {
-    super(NAMENODE_URL_KEY, DFS_BLOCK_SIZE_KEY);
-  }
+public abstract class SqlServerSink implements MetricsSink {
+  private static final String DATABASE_URL_KEY = "databaseUrl";
+  private static final boolean DEBUG = true;
+  private static final String NAME_URL_KEY = "fs.default.name";
+  private static final Pattern NAME_URL_REGEX = Pattern.compile(
+      "hdfs://([^ :/]*)", Pattern.CASE_INSENSITIVE);
+  private static final String DFS_BLOCK_SIZE_KEY = "dfs.block.size";
+  private int blockSize = -1;
+  private String currentServiceName = "";
+  private String databaseUrl;
+  private Connection conn = null;
+  StringBuilder tagsListBuffer = new StringBuilder();
+  String nodeName = null;
+  String nodeIPAddress = null;
+  org.apache.hadoop.conf.Configuration hadoopConfig = null;
+  String clusterName = "localhost";
+
+  static Logger logger = Logger.getLogger(SqlServerSink.class);
 
   @Override
-  protected String getInsertMetricsProcedureName() {
-    return "dbo." + SqlSink.insertMetricProc;
+  public void init(SubsetConfiguration conf) {
+    String nameNodeUrl;
+    String blockSizeString;
+
+    logger.info("Entering init");
+
+    currentServiceName = getFirstConfigPrefix(conf);
+
+    databaseUrl = conf.getString(DATABASE_URL_KEY);
+
+    if (databaseUrl == null)
+      throw new MetricsException(
+          "databaseUrl required in the metrics2 configuration for SqlServerSink.");
+
+    try {
+      Class.forName("com.microsoft.sqlserver.jdbc.SQLServerDriver");
+    } catch (ClassNotFoundException cnfe) {
+      throw new MetricsException(
+          "SqlServerSink requires the Microsoft JDBC driver for SQL Server.");
+    }
+
+    hadoopConfig = new org.apache.hadoop.conf.Configuration();
+    if (hadoopConfig != null) {
+      nameNodeUrl = hadoopConfig.get(NAME_URL_KEY);
+      if (nameNodeUrl != null) {
+        Matcher matcher = NAME_URL_REGEX.matcher(nameNodeUrl);
+        if (matcher.find()) {
+          clusterName = matcher.group(1);
+        }
+      }
+      blockSizeString = hadoopConfig.get(DFS_BLOCK_SIZE_KEY);
+      if (blockSizeString != null) {
+        try {
+          blockSize = Integer.parseInt(blockSizeString);
+        } catch (NumberFormatException nfe) {
+          // do nothing
+        }
+      }
+
+    }
+    logger.info("Exit init, cluster name = " + clusterName);
   }
 
-  @Override
-  protected String getGetMetricsProcedureName() {
-    return "dbo." + SqlSink.getMetricRecordProc;
+  private String getFirstConfigPrefix(SubsetConfiguration conf) {
+    while (conf.getParent() instanceof SubsetConfiguration) {
+      conf = (SubsetConfiguration) conf.getParent();
+    }
+    return conf.getPrefix();
   }
 
   @Override
-  protected String getDatabaseDriverClassName() {
-    return "com.microsoft.sqlserver.jdbc.SQLServerDriver";
+  public abstract void putMetrics(MetricsRecord record);
+
+  @Override
+  public void flush() {
+    try {
+      if (conn != null)
+        conn.close();
+    } catch (Exception e) {
+      // do nothing
+    }
+    conn = null;
+  }
+
+  public String getLocalNodeName() {
+    if (nodeName == null) {
+      try {
+        nodeName = InetAddress.getLocalHost().getCanonicalHostName();
+      } catch (Exception e) {
+        if (DEBUG)
+          logger.info("Error during getLocalHostName: " + e.toString());
+      }
+      if (nodeName == null)
+        nodeName = "Unknown";
+    }
+    return nodeName;
+  }
+
+  public String getClusterNodeName() {
+    if (clusterName.equalsIgnoreCase("localhost"))
+      return getLocalNodeName();
+    try {
+      return InetAddress.getByName(clusterName).getCanonicalHostName();
+    } catch (Exception e) {
+      if (DEBUG)
+        logger.info("Error during getClusterNodeName: " + e.toString());
+    }
+
+    return clusterName;
+  }
+
+  public String getLocalNodeIPAddress() {
+    if (nodeIPAddress == null) {
+      try {
+        nodeIPAddress = InetAddress.getLocalHost().getHostAddress();
+      } catch (Exception e) {
+        if (DEBUG)
+          logger.info("Error during getLocalNodeIPAddress: " + e.toString());
+      }
+    }
+    if (nodeIPAddress == null)
+      nodeIPAddress = "127.0.0.1";
+    return nodeIPAddress;
+  }
+
+  /*
+   *  TODO: Keep a cache of all tag strings, potentially caching the TagSetID.
+   *  Caching the TagSetID will require some new stored procedures and new DAL
+   *  methods.
+   */
+  public String getTagString(Iterable<MetricsTag> desiredTags) {
+    /*
+     * We don't return tags (even sourceName) if no tags available. Most likely,
+     * when tags are empty - we don't need to distinguish services
+     */
+    if (desiredTags == null)
+      return null;
+
+    tagsListBuffer.setLength(0);
+    tagsListBuffer.append("sourceName:").append(currentServiceName);
+    String separator = ",";
+    for (MetricsTag tag : desiredTags) {
+      tagsListBuffer.append(separator);
+      tagsListBuffer.append(tag.name());
+      tagsListBuffer.append(":");
+      tagsListBuffer.append(String.valueOf(tag.value()));
+    }
+
+    return tagsListBuffer.toString();
+  }
+
+  public boolean ensureConnection() {
+    if (conn == null) {
+      try {
+        if (databaseUrl != null) {
+          conn = DriverManager.getConnection(databaseUrl);
+        }
+      } catch (Exception e) {
+        if (DEBUG)
+          logger.info("Error during getConnection: " + e.toString());
+      }
+    }
+    return conn != null;
+  }
+
+  public long getMetricRecordID(String recordTypeContext,
+                                String recordTypeName, String nodeName, String sourceIP,
+                                String clusterName, String serviceName, String tagPairs, long recordTimestamp) {
+    CallableStatement cstmt = null;
+    long result;
+    if (recordTypeContext == null || recordTypeName == null || nodeName == null
+        || sourceIP == null || tagPairs == null)
+      return -1;
+
+    int colid = 1;
+    try {
+      if (ensureConnection()) {
+        cstmt = conn
+            .prepareCall("{call dbo.uspGetMetricRecord(?, ?, ?, ?, ?, ?, ?, ?, ?)}");
+        cstmt.setNString(colid++, recordTypeContext);
+        cstmt.setNString(colid++, recordTypeName);
+        cstmt.setNString(colid++, nodeName);
+        cstmt.setNString(colid++, sourceIP);
+        cstmt.setNString(colid++, clusterName);
+        cstmt.setNString(colid++, serviceName);
+        cstmt.setNString(colid++, tagPairs);
+        cstmt.setLong(colid++, recordTimestamp);
+        cstmt.registerOutParameter(colid, java.sql.Types.BIGINT);
+        cstmt.execute();
+        result = cstmt.getLong(colid);
+        if (cstmt.wasNull())
+          return -1;
+        return result;
+      }
+    } catch (Exception e) {
+      if (DEBUG)
+        logger.info("Error during getMetricRecordID call sproc: "
+            + e.toString());
+      flush();
+    } finally {
+      if (cstmt != null) {
+        try {
+          cstmt.close();
+        } catch (SQLException se) {
+          if (DEBUG)
+            logger.info("Error during getMetricRecordID close cstmt: "
+                + se.toString());
+        }
+        /*
+         * We don't close the connection here because we are likely to be
+         * writing
+         * metric values next and it is more efficient to share the connection.
+         */
+      }
+    }
+    return -1;
+  }
+
+  /* 
+   * TODO: Think about sending all of this in one SP call if JDBC supports table
+   * valued parameters.
+   */
+  public void insertMetricValue(long metricRecordID, String metricName,
+                                String metricValue) {
+    CallableStatement cstmt = null;
+    if (metricRecordID < 0 || metricName == null || metricValue == null)
+      return;
+    try {
+      if (ensureConnection()) {
+        cstmt = conn.prepareCall("{call dbo.uspInsertMetricValue(?, ?, ?)}");
+        cstmt.setLong(1, metricRecordID);
+        cstmt.setNString(2, metricName);
+        cstmt.setNString(3, metricValue);
+        cstmt.execute();
+      }
+    } catch (Exception e) {
+      if (DEBUG)
+        logger.info("Error during insertMetricValue call sproc: "
+            + e.toString());
+      flush();
+    } finally {
+      if (cstmt != null) {
+        try {
+          cstmt.close();
+        } catch (SQLException se) {
+          if (DEBUG)
+            logger.info("Error during insertMetricValue close cstmt: "
+                + se.toString());
+        }
+        /*
+         * We don't close the connection here because we are likely to be
+         * writing
+         * more metric values next and it is more efficient to share the
+         * connection.
+         */
+      }
+    }
+  }
+
+  public String getCurrentServiceName() {
+    return currentServiceName;
+  }
+
+  public int getBlockSize() {
+    return blockSize;
   }
 }
\ No newline at end of file
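
After the revert, SqlServerSink again implements MetricsSink directly, with the SQL Server driver class and dbo.-qualified procedure names hard-coded rather than supplied by a SqlSink template. The metrics2 framework normally instantiates and initializes the sink from hadoop-metrics2.properties; the following hand-wired sketch approximates that initialization (the property prefix and database URL are assumptions, and the Microsoft JDBC driver must be on the classpath):

import org.apache.commons.configuration.BaseConfiguration;
import org.apache.commons.configuration.SubsetConfiguration;

public class SinkInitSketch {
  public static void main(String[] args) {
    // metrics2 would normally read something like:
    //   namenode.sink.sql.class=org.apache.hadoop.metrics2.sink.SqlServerSinkHadoop2
    //   namenode.sink.sql.databaseUrl=jdbc:sqlserver://...
    BaseConfiguration props = new BaseConfiguration();
    props.setProperty("namenode.sink.sql.databaseUrl",
        "jdbc:sqlserver://db.example.com:1433;databaseName=HadoopMetrics");

    // Hand the sink the subset view the framework would give it.
    SubsetConfiguration sinkConf =
        new SubsetConfiguration(props, "namenode.sink.sql", ".");

    SqlServerSinkHadoop2 sink = new SqlServerSinkHadoop2();
    sink.init(sinkConf); // reads databaseUrl; throws MetricsException if absent
  }
}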

http://git-wip-us.apache.org/repos/asf/ambari/blob/4be63cbb/contrib/ambari-scom/metrics-sink/src/main/java/org/apache/hadoop/metrics2/sink/SqlServerSinkHadoop1.java
----------------------------------------------------------------------
diff --git a/contrib/ambari-scom/metrics-sink/src/main/java/org/apache/hadoop/metrics2/sink/SqlServerSinkHadoop1.java b/contrib/ambari-scom/metrics-sink/src/main/java/org/apache/hadoop/metrics2/sink/SqlServerSinkHadoop1.java
index 538bf84..852fb81 100644
--- a/contrib/ambari-scom/metrics-sink/src/main/java/org/apache/hadoop/metrics2/sink/SqlServerSinkHadoop1.java
+++ b/contrib/ambari-scom/metrics-sink/src/main/java/org/apache/hadoop/metrics2/sink/SqlServerSinkHadoop1.java
@@ -21,10 +21,6 @@ import org.apache.hadoop.metrics2.Metric;
 import org.apache.hadoop.metrics2.MetricsRecord;
 
 public class SqlServerSinkHadoop1 extends SqlServerSink {
-  public SqlServerSinkHadoop1() {
-    super(SqlSink.HADOOP1_NAMENODE_URL_KEY, SqlSink.HADOOP1_DFS_BLOCK_SIZE_KEY);
-  }
-
   @Override
   public void putMetrics(MetricsRecord record) {
     long metricRecordID = getMetricRecordID(record.context(), record.name(),

http://git-wip-us.apache.org/repos/asf/ambari/blob/4be63cbb/contrib/ambari-scom/metrics-sink/src/main/java/org/apache/hadoop/metrics2/sink/SqlServerSinkHadoop2.java
----------------------------------------------------------------------
diff --git a/contrib/ambari-scom/metrics-sink/src/main/java/org/apache/hadoop/metrics2/sink/SqlServerSinkHadoop2.java b/contrib/ambari-scom/metrics-sink/src/main/java/org/apache/hadoop/metrics2/sink/SqlServerSinkHadoop2.java
index 2cd1b71..eae76a8 100644
--- a/contrib/ambari-scom/metrics-sink/src/main/java/org/apache/hadoop/metrics2/sink/SqlServerSinkHadoop2.java
+++ b/contrib/ambari-scom/metrics-sink/src/main/java/org/apache/hadoop/metrics2/sink/SqlServerSinkHadoop2.java
@@ -21,10 +21,6 @@ import org.apache.hadoop.metrics2.AbstractMetric;
 import org.apache.hadoop.metrics2.MetricsRecord;
 
 public class SqlServerSinkHadoop2 extends SqlServerSink {
-  public SqlServerSinkHadoop2() {
-    super(SqlSink.HADOOP2_NAMENODE_URL_KEY, SqlSink.HADOOP2_DFS_BLOCK_SIZE_KEY);
-  }
-
   @Override
   public void putMetrics(MetricsRecord record) {
     long metricRecordID = getMetricRecordID(record.context(), record.name(),

http://git-wip-us.apache.org/repos/asf/ambari/blob/4be63cbb/contrib/ambari-scom/metrics-sink/src/main/java/org/apache/hadoop/metrics2/sink/SqlSink.java
----------------------------------------------------------------------
diff --git a/contrib/ambari-scom/metrics-sink/src/main/java/org/apache/hadoop/metrics2/sink/SqlSink.java b/contrib/ambari-scom/metrics-sink/src/main/java/org/apache/hadoop/metrics2/sink/SqlSink.java
deleted file mode 100644
index a41152d..0000000
--- a/contrib/ambari-scom/metrics-sink/src/main/java/org/apache/hadoop/metrics2/sink/SqlSink.java
+++ /dev/null
@@ -1,357 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.metrics2.sink;
-
-import org.apache.commons.configuration.SubsetConfiguration;
-import org.apache.hadoop.metrics2.MetricsException;
-import org.apache.hadoop.metrics2.MetricsRecord;
-import org.apache.hadoop.metrics2.MetricsSink;
-import org.apache.hadoop.metrics2.MetricsTag;
-import org.apache.log4j.Logger;
-
-import java.lang.String;
-import java.net.InetAddress;
-import java.sql.CallableStatement;
-import java.sql.Connection;
-import java.sql.DriverManager;
-import java.sql.SQLException;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
-/**
- * This class stores published metrics to the SQL Server database.
- */
-public abstract class SqlSink implements MetricsSink {
-  private static final String DATABASE_URL_KEY = "databaseUrl";
-  private static final boolean DEBUG = true;
-  private final String NAMENODE_URL_KEY;
-  private static final Pattern NAME_URL_REGEX = Pattern.compile(
-    "hdfs://([^ :/]*)", Pattern.CASE_INSENSITIVE);
-  private final String DFS_BLOCK_SIZE_KEY;
-  private int blockSize = -1;
-  private String currentServiceName = "";
-  private String databaseUrl;
-  private Connection conn = null;
-
-  StringBuilder tagsListBuffer = new StringBuilder();
-  String nodeName = null;
-  String nodeIPAddress = null;
-  org.apache.hadoop.conf.Configuration hadoopConfig = null;
-  String clusterName = "localhost";
-
-  static final String updateHeartBeatsProc = "uspUpdateHeartBeats";
-  static final String purgeMetricsProc = "uspPurgeMetrics";
-  static final String insertMetricProc = "uspInsertMetricValue";
-  static final String getMetricRecordProc = "uspGetMetricRecord";
-  static final String getMetricsProc = "ufGetMetrics";
-  static final String getAggregatedMetricsProc = "ufGetAggregatedServiceMetrics";
-
-  static Logger logger = Logger.getLogger(SqlSink.class);
-
-  static String HADOOP1_NAMENODE_URL_KEY = "fs.default.name";
-  static String HADOOP2_NAMENODE_URL_KEY = "fs.defaultFS";
-  static String HADOOP1_DFS_BLOCK_SIZE_KEY = "dfs.block.size";
-  static String HADOOP2_DFS_BLOCK_SIZE_KEY = "dfs.blocksize";
-
-  public SqlSink(String NAMENODE_URL_KEY, String DFS_BLOCK_SIZE_KEY) {
-    this.NAMENODE_URL_KEY = NAMENODE_URL_KEY;
-    this.DFS_BLOCK_SIZE_KEY = DFS_BLOCK_SIZE_KEY;
-  }
-
-  @Override
-  public void init(SubsetConfiguration conf) {
-    String nameNodeUrl;
-    String blockSizeString;
-
-    logger.info("Entering init");
-
-    currentServiceName = getFirstConfigPrefix(conf);
-
-    databaseUrl = conf.getString(DATABASE_URL_KEY);
-
-    if (databaseUrl == null)
-      throw new MetricsException(
-        "databaseUrl required in the metrics2 configuration for SqlSink.");
-
-    try {
-      Class.forName(getDatabaseDriverClassName());
-    } catch (ClassNotFoundException cnfe) {
-      throw new MetricsException(
-        "SqlSink requires the JDBC driver: " + getDatabaseDriverClassName());
-    }
-
-    hadoopConfig = new org.apache.hadoop.conf.Configuration();
-    if (hadoopConfig != null) {
-      nameNodeUrl = hadoopConfig.get(NAMENODE_URL_KEY);
-      if (nameNodeUrl != null) {
-        Matcher matcher = NAME_URL_REGEX.matcher(nameNodeUrl);
-        if (matcher.find()) {
-          clusterName = matcher.group(1);
-        }
-      }
-      blockSizeString = hadoopConfig.get(DFS_BLOCK_SIZE_KEY);
-      if (blockSizeString != null) {
-        try {
-          blockSize = Integer.parseInt(blockSizeString);
-          logger.info("blockSize = " + blockSize);
-        } catch (NumberFormatException nfe) {
-          logger.warn("Exception on init: ", nfe);
-        }
-      }
-
-    }
-    logger.info("Exit init, cluster name = " + clusterName);
-  }
-
-  private String getFirstConfigPrefix(SubsetConfiguration conf) {
-    while (conf.getParent() instanceof SubsetConfiguration) {
-      conf = (SubsetConfiguration) conf.getParent();
-    }
-    return conf.getPrefix();
-  }
-
-  @Override
-  public abstract void putMetrics(MetricsRecord record);
-
-  @Override
-  public void flush() {
-    try {
-      if (conn != null)
-        conn.close();
-    } catch (Exception e) {
-      // do nothing
-    }
-    conn = null;
-  }
-
-  public String getLocalNodeName() {
-    if (nodeName == null) {
-      try {
-        nodeName = InetAddress.getLocalHost().getCanonicalHostName();
-      } catch (Exception e) {
-        if (DEBUG)
-          logger.info("Error during getLocalHostName: " + e.toString());
-      }
-      if (nodeName == null)
-        nodeName = "Unknown";
-    }
-    return nodeName;
-  }
-
-  public String getClusterNodeName() {
-    if (clusterName.equalsIgnoreCase("localhost"))
-      return getLocalNodeName();
-    try {
-      return InetAddress.getByName(clusterName).getCanonicalHostName();
-    } catch (Exception e) {
-      if (DEBUG)
-        logger.info("Error during getClusterNodeName: " + e.toString());
-    }
-
-    return clusterName;
-  }
-
-  public String getLocalNodeIPAddress() {
-    if (nodeIPAddress == null) {
-      try {
-        nodeIPAddress = InetAddress.getLocalHost().getHostAddress();
-      } catch (Exception e) {
-        if (DEBUG)
-          logger.info("Error during getLocalNodeIPAddress: " + e.toString());
-      }
-    }
-    if (nodeIPAddress == null)
-      nodeIPAddress = "127.0.0.1";
-    return nodeIPAddress;
-  }
-
-  /*
-   *  TODO: Keep a cache of all tag strings, potentially caching the TagSetID.
-   *  Caching the TagSetID will require some new stored procedures and new DAL
-   *  methods.
-   */
-  public String getTagString(Iterable<MetricsTag> desiredTags) {
-    /*
-     * We don't return tags (even sourceName) if no tags available. Most likely,
-     * when tags are empty - we don't need to distinguish services
-     */
-    if (desiredTags == null)
-      return null;
-
-    tagsListBuffer.setLength(0);
-    tagsListBuffer.append("sourceName:").append(currentServiceName);
-    String separator = ",";
-    for (MetricsTag tag : desiredTags) {
-      tagsListBuffer.append(separator);
-      tagsListBuffer.append(tag.name());
-      tagsListBuffer.append(":");
-      tagsListBuffer.append(String.valueOf(tag.value()));
-    }
-
-    return tagsListBuffer.toString();
-  }
-
-  public boolean ensureConnection() {
-    if (conn == null) {
-      try {
-        if (databaseUrl != null) {
-          conn = DriverManager.getConnection(databaseUrl);
-        }
-      } catch (Exception e) {
-        logger.warn("Error during getConnection: " + e.toString());
-      }
-    }
-    return conn != null;
-  }
-
-  public long getMetricRecordID(String recordTypeContext,
-                                String recordTypeName, String nodeName, String sourceIP,
-                                String clusterName, String serviceName, String tagPairs, long recordTimestamp) {
-    CallableStatement cstmt = null;
-    long result;
-    logger.trace(
-      "Params: recordTypeContext = " + recordTypeContext
-        + ", recordTypeName = " + recordTypeName
-        + ", nodeName = " + nodeName
-        + ", sourceIP = " + sourceIP
-        + ", tagPairs = " + tagPairs
-        + ", clusterName = " + clusterName
-        + ", serviceName = " + serviceName
-        + ", recordTimestamp = " + recordTimestamp
-    );
-    if (recordTypeContext == null || recordTypeName == null || nodeName == null
-      || sourceIP == null || tagPairs == null)
-      return -1;
-
-    int colid = 1;
-    try {
-      if (ensureConnection()) {
-        String procedureCall =
-          String.format("{call %s(?, ?, ?, ?, ?, ?, ?, ?, ?)}",
-            getGetMetricsProcedureName());
-        cstmt = conn.prepareCall(procedureCall);
-        cstmt.setNString(colid++, recordTypeContext);
-        cstmt.setNString(colid++, recordTypeName);
-        cstmt.setNString(colid++, nodeName);
-        cstmt.setNString(colid++, sourceIP);
-        cstmt.setNString(colid++, clusterName);
-        cstmt.setNString(colid++, serviceName);
-        cstmt.setNString(colid++, tagPairs);
-        cstmt.setLong(colid++, recordTimestamp);
-        cstmt.registerOutParameter(colid, java.sql.Types.BIGINT);
-        cstmt.execute();
-
-        result = cstmt.getLong(colid);
-        if (cstmt.wasNull())
-          return -1;
-        return result;
-      }
-    } catch (Exception e) {
-      if (DEBUG)
-        logger.info("Error during getMetricRecordID call sproc: "
-          + e.toString());
-      flush();
-    } finally {
-      if (cstmt != null) {
-        try {
-          cstmt.close();
-        } catch (SQLException se) {
-          if (DEBUG)
-            logger.info("Error during getMetricRecordID close cstmt: "
-              + se.toString());
-        }
-        /*
-         * We don't close the connection here because we are likely to be
-         * writing
-         * metric values next and it is more efficient to share the connection.
-         */
-      }
-    }
-    return -1;
-  }
-
-  /*
-   * TODO: Think about sending all of this in one SP call if JDBC supports table
-   * valued parameters.
-   */
-  public void insertMetricValue(long metricRecordID, String metricName,
-                                String metricValue) {
-    CallableStatement cstmt = null;
-    if (metricRecordID < 0 || metricName == null || metricValue == null)
-      return;
-    try {
-      logger.trace("Insert metricRecordId : " + metricRecordID + ", " +
-        "metricName : " + metricName + ", metricValue : " + metricValue + ", " +
-        "procedure = " + getInsertMetricsProcedureName());
-      if (ensureConnection()) {
-        String procedureCall =
-          String.format("{call %s(?, ?, ?)}", getInsertMetricsProcedureName());
-        cstmt = conn.prepareCall(procedureCall);
-        cstmt.setLong(1, metricRecordID);
-        cstmt.setNString(2, metricName);
-        cstmt.setNString(3, metricValue);
-        cstmt.execute();
-      }
-    } catch (Exception e) {
-      if (DEBUG)
-        logger.info("Error during insertMetricValue call sproc: "
-          + e.toString());
-      flush();
-    } finally {
-      if (cstmt != null) {
-        try {
-          cstmt.close();
-        } catch (SQLException se) {
-          if (DEBUG)
-            logger.info("Error during insertMetricValue close cstmt: "
-              + se.toString());
-        }
-        /*
-         * We don't close the connection here because we are likely to be
-         * writing
-         * more metric values next and it is more efficient to share the
-         * connection.
-         */
-      }
-    }
-  }
-
-  public String getCurrentServiceName() {
-    return currentServiceName;
-  }
-
-  public int getBlockSize() {
-    return blockSize;
-  }
-
-  /**
-   * Return stored procedure to use.
-   */
-  protected abstract String getInsertMetricsProcedureName();
-
-  /**
-   * Return stored procedure to use.
-   */
-  protected abstract String getGetMetricsProcedureName();
-
-  /**
-   * Return the driver class name to load.
-   */
-  protected abstract String getDatabaseDriverClassName();
-}
\ No newline at end of file

