incubator-blur-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From amccu...@apache.org
Subject [4/6] BLUR-97: Repackaged the agent to org.apache
Date Wed, 22 May 2013 02:07:39 GMT
http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/1485db70/src/contrib/blur-console/blur-agent/src/main/java/com/nearinfinity/agent/connections/blur/BlurDatabaseConnection.java
----------------------------------------------------------------------
diff --git a/src/contrib/blur-console/blur-agent/src/main/java/com/nearinfinity/agent/connections/blur/BlurDatabaseConnection.java b/src/contrib/blur-console/blur-agent/src/main/java/com/nearinfinity/agent/connections/blur/BlurDatabaseConnection.java
deleted file mode 100644
index 191ee4e..0000000
--- a/src/contrib/blur-console/blur-agent/src/main/java/com/nearinfinity/agent/connections/blur/BlurDatabaseConnection.java
+++ /dev/null
@@ -1,149 +0,0 @@
-package com.nearinfinity.agent.connections.blur;
-
-import java.util.Collection;
-import java.util.Date;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.blur.thrift.generated.BlurQueryStatus;
-import org.apache.blur.thrift.generated.SimpleQuery;
-import org.apache.commons.lang.StringUtils;
-import org.json.simple.JSONValue;
-import org.springframework.dao.IncorrectResultSizeDataAccessException;
-import org.springframework.jdbc.core.JdbcTemplate;
-
-import com.nearinfinity.agent.connections.blur.interfaces.BlurDatabaseInterface;
-import com.nearinfinity.agent.exceptions.TableCollisionException;
-import com.nearinfinity.agent.exceptions.TableMissingException;
-import com.nearinfinity.agent.exceptions.ZookeeperNameCollisionException;
-import com.nearinfinity.agent.exceptions.ZookeeperNameMissingException;
-import com.nearinfinity.agent.types.TimeHelper;
-
-public class BlurDatabaseConnection implements BlurDatabaseInterface {
-
-	private final JdbcTemplate jdbc;
-
-	public BlurDatabaseConnection(JdbcTemplate jdbc) {
-		this.jdbc = jdbc;
-	}
-
-	@Override
-	public String resolveConnectionString(int zookeeperId) {
-		String queryString = "select distinct node_name from blur_controllers where zookeeper_id = ? and controller_status = 1";
-		List<String> controller_uris = jdbc.queryForList(queryString, new String[] { Integer.toString(zookeeperId) }, String.class);
-		String connection = StringUtils.join(controller_uris, ',');
-		this.jdbc.update("update zookeepers set blur_urls=? where id = ?", connection, zookeeperId);
-		return connection;
-	}
-
-	@Override
-	public String getZookeeperId(final String zookeeperName) throws ZookeeperNameMissingException, ZookeeperNameCollisionException {
-		List<Map<String, Object>> zookeepers = jdbc.queryForList("select id from zookeepers where name = ?", zookeeperName);
-		switch (zookeepers.size()) {
-		case 0:
-			throw new ZookeeperNameMissingException(zookeeperName);
-		case 1:
-			return zookeepers.get(0).get("ID").toString();
-		default:
-			throw new ZookeeperNameCollisionException(zookeepers.size(), zookeeperName);
-		}
-	}
-
-	@Override
-	public List<Map<String, Object>> getClusters(final int zookeeperId) {
-		return jdbc.queryForList("select id, name from clusters where zookeeper_id = ?", zookeeperId);
-	}
-
-	@Override
-	public Map<String, Object> getExistingTable(final String table, final Integer clusterId) throws TableMissingException,
-			TableCollisionException {
-		List<Map<String, Object>> existingTable = jdbc.queryForList(
-				"select id, cluster_id from blur_tables where table_name=? and cluster_id=?", table, clusterId);
-		switch (existingTable.size()) {
-		case 0:
-			throw new TableMissingException(table);
-		case 1:
-			return existingTable.get(0);
-		default:
-			throw new TableCollisionException(existingTable.size(), table);
-		}
-	}
-
-	@Override
-	public int getTableId(int clusterId, String tableName) {
-		try {
-			return jdbc.queryForInt("select id from blur_tables where cluster_id=? and table_name =?", clusterId, tableName);
-		} catch (IncorrectResultSizeDataAccessException e) {
-			return -1;
-		}
-	}
-
-	@Override
-	public void updateTableSchema(final int tableId, final String schema, final String tableAnalyzer) {
-		jdbc.update("update blur_tables set table_schema=?, table_analyzer=? where id=?", new Object[] { schema, tableAnalyzer, tableId });
-	}
-
-	@Override
-	public void updateTableServer(final int tableId, String server) {
-		jdbc.update("update blur_tables set server=? where id=?", new Object[] { server, tableId });
-	}
-
-	@Override
-	public void updateTableStats(final int tableId, Long tableBytes, Long tableQueries, Long tableRecordCount, Long tableRowCount) {
-		jdbc.update("update blur_tables set current_size=?, query_usage=?, record_count=?, row_count=? where id=?", new Object[] { tableBytes,
-				tableQueries, tableRecordCount, tableRowCount, tableId });
-	}
-
-	public Map<String, Object> getQuery(int tableId, long UUID) {
-		try {
-			return this.jdbc.queryForMap("select id, complete_shards, times, state from blur_queries where blur_table_id=? and uuid=?", tableId,
-					UUID);
-		} catch (IncorrectResultSizeDataAccessException e) {
-			return null;
-		}
-	}
-
-	public void createQuery(BlurQueryStatus status, SimpleQuery query, String times, Date startTime, int tableId) {
-		this.jdbc
-				.update(
-						"insert into blur_queries (query_string, times, complete_shards, total_shards, state, uuid, created_at, updated_at, blur_table_id, super_query_on, facets, start, fetch_num, pre_filters, post_filters, selector_column_families, selector_columns, userid, record_only) values (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)",
-						query.getQueryStr(),
-						times,
-						status.getCompleteShards(),
-						status.getTotalShards(),
-						status.getState().getValue(),
-						status.getUuid(),
-						startTime,
-						TimeHelper.now().getTime(),
-						tableId,
-						query.isSuperQueryOn(),
-						StringUtils.join(status.getQuery().getFacets(), ", "),
-						status.getQuery().getStart(),
-						status.getQuery().getFetch(),
-						query.getPreSuperFilter(),
-						query.getPostSuperFilter(),
-						status.getQuery().getSelector() == null ? null : JSONValue.toJSONString(status.getQuery().getSelector()
-								.getColumnFamiliesToFetch()),
-						status.getQuery().getSelector() == null ? null : JSONValue.toJSONString(status.getQuery().getSelector().getColumnsToFetch()),
-						status.getQuery().getUserContext(), status.getQuery().getSelector() == null ? null : status.getQuery().getSelector()
-								.isRecordOnly());
-	}
-
-	public void updateQuery(BlurQueryStatus status, String times, int queryId) {
-		jdbc.update("update blur_queries set times=?, complete_shards=?, total_shards=?, state=?, updated_at=? where id=?", times,
-				status.getCompleteShards(), status.getTotalShards(), status.getState().getValue(), TimeHelper.now().getTime(), queryId);
-	}
-
-	@Override
-	public List<Long> getRunningQueries() {
-		return this.jdbc.queryForList("select uuid from blur_queries where state = 0", Long.class);
-	}
-
-	@Override
-	public void markOrphanedRunningQueriesComplete(Collection<Long> queries) {
-		if (!queries.isEmpty()) {
-			this.jdbc.update("update blur_queries set state=3 where uuid in (" + StringUtils.join(queries, ", ") + ")");
-		}
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/1485db70/src/contrib/blur-console/blur-agent/src/main/java/com/nearinfinity/agent/connections/blur/interfaces/BlurDatabaseInterface.java
----------------------------------------------------------------------
diff --git a/src/contrib/blur-console/blur-agent/src/main/java/com/nearinfinity/agent/connections/blur/interfaces/BlurDatabaseInterface.java b/src/contrib/blur-console/blur-agent/src/main/java/com/nearinfinity/agent/connections/blur/interfaces/BlurDatabaseInterface.java
deleted file mode 100644
index 679e903..0000000
--- a/src/contrib/blur-console/blur-agent/src/main/java/com/nearinfinity/agent/connections/blur/interfaces/BlurDatabaseInterface.java
+++ /dev/null
@@ -1,21 +0,0 @@
-package com.nearinfinity.agent.connections.blur.interfaces;
-
-import java.util.List;
-import java.util.Map;
-
-import com.nearinfinity.agent.connections.blur.interfaces.TableDatabaseInterface;
-import com.nearinfinity.agent.exceptions.TableCollisionException;
-import com.nearinfinity.agent.exceptions.TableMissingException;
-import com.nearinfinity.agent.exceptions.ZookeeperNameCollisionException;
-import com.nearinfinity.agent.exceptions.ZookeeperNameMissingException;
-
-public interface BlurDatabaseInterface extends TableDatabaseInterface, QueryDatabaseInterface {
-	String resolveConnectionString(int zookeeperId);
-
-	String getZookeeperId(final String zookeeperName) throws ZookeeperNameMissingException, ZookeeperNameCollisionException;
-
-	List<Map<String, Object>> getClusters(final int zookeeperId);
-
-	Map<String, Object> getExistingTable(final String table, final Integer clusterId) throws TableMissingException, TableCollisionException;
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/1485db70/src/contrib/blur-console/blur-agent/src/main/java/com/nearinfinity/agent/connections/blur/interfaces/QueryDatabaseInterface.java
----------------------------------------------------------------------
diff --git a/src/contrib/blur-console/blur-agent/src/main/java/com/nearinfinity/agent/connections/blur/interfaces/QueryDatabaseInterface.java b/src/contrib/blur-console/blur-agent/src/main/java/com/nearinfinity/agent/connections/blur/interfaces/QueryDatabaseInterface.java
deleted file mode 100644
index 7a16182..0000000
--- a/src/contrib/blur-console/blur-agent/src/main/java/com/nearinfinity/agent/connections/blur/interfaces/QueryDatabaseInterface.java
+++ /dev/null
@@ -1,21 +0,0 @@
-package com.nearinfinity.agent.connections.blur.interfaces;
-
-import java.util.Collection;
-import java.util.Date;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.blur.thrift.generated.BlurQueryStatus;
-import org.apache.blur.thrift.generated.SimpleQuery;
-
-public interface QueryDatabaseInterface {
-	Map<String, Object> getQuery(int tableId, long UUID);
-
-	List<Long> getRunningQueries();
-
-	void createQuery(BlurQueryStatus status, SimpleQuery query, String times, Date startTime, int tableId);
-
-	void updateQuery(BlurQueryStatus status, String times, int queryId);
-	
-	void markOrphanedRunningQueriesComplete(Collection<Long> queries);
-}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/1485db70/src/contrib/blur-console/blur-agent/src/main/java/com/nearinfinity/agent/connections/blur/interfaces/TableDatabaseInterface.java
----------------------------------------------------------------------
diff --git a/src/contrib/blur-console/blur-agent/src/main/java/com/nearinfinity/agent/connections/blur/interfaces/TableDatabaseInterface.java b/src/contrib/blur-console/blur-agent/src/main/java/com/nearinfinity/agent/connections/blur/interfaces/TableDatabaseInterface.java
deleted file mode 100644
index dbc8e01..0000000
--- a/src/contrib/blur-console/blur-agent/src/main/java/com/nearinfinity/agent/connections/blur/interfaces/TableDatabaseInterface.java
+++ /dev/null
@@ -1,12 +0,0 @@
-package com.nearinfinity.agent.connections.blur.interfaces;
-
-public interface TableDatabaseInterface {
-	int getTableId(int clusterId, String tableName);
-
-	void updateTableSchema(final int tableId, final String schema, final String tableAnalyzer);
-
-	void updateTableServer(final int tableId, final String server);
-
-	void updateTableStats(final int tableId, final Long tableBytes, final Long tableQueries, final Long tableRecordCount,
-			final Long tableRowCount);
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/1485db70/src/contrib/blur-console/blur-agent/src/main/java/com/nearinfinity/agent/connections/cleaners/CleanerDatabaseConnection.java
----------------------------------------------------------------------
diff --git a/src/contrib/blur-console/blur-agent/src/main/java/com/nearinfinity/agent/connections/cleaners/CleanerDatabaseConnection.java b/src/contrib/blur-console/blur-agent/src/main/java/com/nearinfinity/agent/connections/cleaners/CleanerDatabaseConnection.java
deleted file mode 100644
index 6ba5555..0000000
--- a/src/contrib/blur-console/blur-agent/src/main/java/com/nearinfinity/agent/connections/cleaners/CleanerDatabaseConnection.java
+++ /dev/null
@@ -1,43 +0,0 @@
-package com.nearinfinity.agent.connections.cleaners;
-
-import java.util.Calendar;
-import org.springframework.jdbc.core.JdbcTemplate;
-
-import com.nearinfinity.agent.connections.cleaners.interfaces.CleanerDatabaseInterface;
-import com.nearinfinity.agent.types.TimeHelper;
-
-public class CleanerDatabaseConnection implements CleanerDatabaseInterface {
-	private final JdbcTemplate jdbc;
-
-	// Query Expiration Times
-	private final int expireThreshold = 2 * 60 * 1000;
-	private final int deleteThreshold = 2 * 60 * 60 * 1000;
-
-	// Hdfs Expiration Times
-	private final int timeToLive = 14 * 24 * 60 * 60 * 1000;
-
-	public CleanerDatabaseConnection(JdbcTemplate jdbc) {
-		this.jdbc = jdbc;
-	}
-
-	@Override
-	public int deleteOldQueries() {
-		Calendar twoHoursAgo = TimeHelper.getTimeAgo(deleteThreshold);
-		return this.jdbc.update("delete from blur_queries where created_at < ?", twoHoursAgo.getTime());
-	}
-
-	@Override
-	public int expireOldQueries() {
-		Calendar now = TimeHelper.now();
-		Calendar twoMinutesAgo = TimeHelper.getTimeAgo(expireThreshold);
-		return this.jdbc.update("update blur_queries set state=1, updated_at=? where updated_at < ? and state = 0", now.getTime(),
-				twoMinutesAgo);
-	}
-
-	@Override
-	public int deleteOldStats() {
-		Calendar ttlDaysAgo = TimeHelper.getTimeAgo(timeToLive);
-		return jdbc.update("delete from hdfs_stats where created_at < ?", ttlDaysAgo);
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/1485db70/src/contrib/blur-console/blur-agent/src/main/java/com/nearinfinity/agent/connections/cleaners/interfaces/CleanerDatabaseInterface.java
----------------------------------------------------------------------
diff --git a/src/contrib/blur-console/blur-agent/src/main/java/com/nearinfinity/agent/connections/cleaners/interfaces/CleanerDatabaseInterface.java b/src/contrib/blur-console/blur-agent/src/main/java/com/nearinfinity/agent/connections/cleaners/interfaces/CleanerDatabaseInterface.java
deleted file mode 100644
index fef657e..0000000
--- a/src/contrib/blur-console/blur-agent/src/main/java/com/nearinfinity/agent/connections/cleaners/interfaces/CleanerDatabaseInterface.java
+++ /dev/null
@@ -1,5 +0,0 @@
-package com.nearinfinity.agent.connections.cleaners.interfaces;
-
-public interface CleanerDatabaseInterface extends HdfsDatabaseCleanerInterface, QueryDatabaseCleanerInterface {
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/1485db70/src/contrib/blur-console/blur-agent/src/main/java/com/nearinfinity/agent/connections/cleaners/interfaces/HdfsDatabaseCleanerInterface.java
----------------------------------------------------------------------
diff --git a/src/contrib/blur-console/blur-agent/src/main/java/com/nearinfinity/agent/connections/cleaners/interfaces/HdfsDatabaseCleanerInterface.java b/src/contrib/blur-console/blur-agent/src/main/java/com/nearinfinity/agent/connections/cleaners/interfaces/HdfsDatabaseCleanerInterface.java
deleted file mode 100644
index eeb7f48..0000000
--- a/src/contrib/blur-console/blur-agent/src/main/java/com/nearinfinity/agent/connections/cleaners/interfaces/HdfsDatabaseCleanerInterface.java
+++ /dev/null
@@ -1,5 +0,0 @@
-package com.nearinfinity.agent.connections.cleaners.interfaces;
-
-public interface HdfsDatabaseCleanerInterface {
-	int deleteOldStats();
-}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/1485db70/src/contrib/blur-console/blur-agent/src/main/java/com/nearinfinity/agent/connections/cleaners/interfaces/QueryDatabaseCleanerInterface.java
----------------------------------------------------------------------
diff --git a/src/contrib/blur-console/blur-agent/src/main/java/com/nearinfinity/agent/connections/cleaners/interfaces/QueryDatabaseCleanerInterface.java b/src/contrib/blur-console/blur-agent/src/main/java/com/nearinfinity/agent/connections/cleaners/interfaces/QueryDatabaseCleanerInterface.java
deleted file mode 100644
index 5d2b337..0000000
--- a/src/contrib/blur-console/blur-agent/src/main/java/com/nearinfinity/agent/connections/cleaners/interfaces/QueryDatabaseCleanerInterface.java
+++ /dev/null
@@ -1,8 +0,0 @@
-package com.nearinfinity.agent.connections.cleaners.interfaces;
-
-public interface QueryDatabaseCleanerInterface {
-	int deleteOldQueries();
-
-	int expireOldQueries();
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/1485db70/src/contrib/blur-console/blur-agent/src/main/java/com/nearinfinity/agent/connections/hdfs/HdfsDatabaseConnection.java
----------------------------------------------------------------------
diff --git a/src/contrib/blur-console/blur-agent/src/main/java/com/nearinfinity/agent/connections/hdfs/HdfsDatabaseConnection.java b/src/contrib/blur-console/blur-agent/src/main/java/com/nearinfinity/agent/connections/hdfs/HdfsDatabaseConnection.java
deleted file mode 100644
index 6e65620..0000000
--- a/src/contrib/blur-console/blur-agent/src/main/java/com/nearinfinity/agent/connections/hdfs/HdfsDatabaseConnection.java
+++ /dev/null
@@ -1,49 +0,0 @@
-package com.nearinfinity.agent.connections.hdfs;
-
-import java.util.Date;
-import java.util.List;
-import java.util.Map;
-
-import org.springframework.dao.IncorrectResultSizeDataAccessException;
-import org.springframework.jdbc.core.JdbcTemplate;
-
-import com.nearinfinity.agent.connections.hdfs.interfaces.HdfsDatabaseInterface;
-import com.nearinfinity.agent.exceptions.NullReturnedException;
-
-public class HdfsDatabaseConnection implements HdfsDatabaseInterface {
-	private final JdbcTemplate jdbc;
-
-	public HdfsDatabaseConnection(JdbcTemplate jdbc) {
-		this.jdbc = jdbc;
-	}
-
-	@Override
-	public void setHdfsInfo(String name, String host, int port) {
-		List<Map<String, Object>> existingHdfs = jdbc.queryForList("select id from hdfs where name=?", name);
-
-		if (existingHdfs.isEmpty()) {
-			jdbc.update("insert into hdfs (name, host, port) values (?, ?, ?)", name, host, port);
-		} else {
-			jdbc.update("update hdfs set host=?, port=? where id=?", host, port, existingHdfs.get(0).get("ID"));
-		}
-	}
-
-	@Override
-	public int getHdfsId(String name) throws NullReturnedException {
-		try {
-			return jdbc.queryForInt("select id from hdfs where name = ?", name);
-		} catch (IncorrectResultSizeDataAccessException e) {
-			return -1;
-		}
-	}
-
-	@Override
-	public void insertHdfsStats(long capacity, long presentCapacity, long remaining, long used, long logical_used, double d,
-			long underReplicatedBlocksCount, long corruptBlocksCount, long missingBlocksCount, long totalNodes, long liveNodes, long deadNodes,
-			Date time, String host, int port, int hdfsId) {
-		jdbc.update(
-				"insert into hdfs_stats (config_capacity, present_capacity, dfs_remaining, dfs_used_real, dfs_used_logical, dfs_used_percent, under_replicated, corrupt_blocks, missing_blocks, total_nodes, live_nodes, dead_nodes, created_at, host, port, hdfs_id) values (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)",
-				capacity, presentCapacity, remaining, used, logical_used, (((1.0 * used) / presentCapacity) * 100), underReplicatedBlocksCount,
-				corruptBlocksCount, missingBlocksCount, totalNodes, liveNodes, deadNodes, time, host, port, hdfsId);
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/1485db70/src/contrib/blur-console/blur-agent/src/main/java/com/nearinfinity/agent/connections/hdfs/interfaces/HdfsDatabaseInterface.java
----------------------------------------------------------------------
diff --git a/src/contrib/blur-console/blur-agent/src/main/java/com/nearinfinity/agent/connections/hdfs/interfaces/HdfsDatabaseInterface.java b/src/contrib/blur-console/blur-agent/src/main/java/com/nearinfinity/agent/connections/hdfs/interfaces/HdfsDatabaseInterface.java
deleted file mode 100644
index 0d6abe2..0000000
--- a/src/contrib/blur-console/blur-agent/src/main/java/com/nearinfinity/agent/connections/hdfs/interfaces/HdfsDatabaseInterface.java
+++ /dev/null
@@ -1,15 +0,0 @@
-package com.nearinfinity.agent.connections.hdfs.interfaces;
-
-import java.util.Date;
-
-import com.nearinfinity.agent.exceptions.NullReturnedException;
-
-public interface HdfsDatabaseInterface {
-	void setHdfsInfo(String name, String host, int port);
-
-	int getHdfsId(String name) throws NullReturnedException;
-
-	void insertHdfsStats(long capacity, long presentCapacity, long remaining, long used, long logical_used, double d,
-			long underReplicatedBlocksCount, long corruptBlocksCount, long missingBlocksCount, long totalNodes, long liveNodes, long deadNodes,
-			Date time, String host, int port, int hdfsId);
-}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/1485db70/src/contrib/blur-console/blur-agent/src/main/java/com/nearinfinity/agent/connections/zookeeper/ZookeeperDatabaseConnection.java
----------------------------------------------------------------------
diff --git a/src/contrib/blur-console/blur-agent/src/main/java/com/nearinfinity/agent/connections/zookeeper/ZookeeperDatabaseConnection.java b/src/contrib/blur-console/blur-agent/src/main/java/com/nearinfinity/agent/connections/zookeeper/ZookeeperDatabaseConnection.java
deleted file mode 100644
index cca9dae..0000000
--- a/src/contrib/blur-console/blur-agent/src/main/java/com/nearinfinity/agent/connections/zookeeper/ZookeeperDatabaseConnection.java
+++ /dev/null
@@ -1,162 +0,0 @@
-package com.nearinfinity.agent.connections.zookeeper;
-
-import java.util.List;
-import org.apache.commons.lang.StringUtils;
-import org.springframework.dao.EmptyResultDataAccessException;
-import org.springframework.jdbc.core.JdbcTemplate;
-
-import com.nearinfinity.agent.connections.zookeeper.interfaces.ClusterDatabaseInterface;
-import com.nearinfinity.agent.connections.zookeeper.interfaces.ControllerDatabaseInterface;
-import com.nearinfinity.agent.connections.zookeeper.interfaces.ShardsDatabaseInterface;
-import com.nearinfinity.agent.connections.zookeeper.interfaces.TableDatabaseInterface;
-import com.nearinfinity.agent.connections.zookeeper.interfaces.ZookeeperDatabaseInterface;
-import com.nearinfinity.agent.types.TimeHelper;
-
-public class ZookeeperDatabaseConnection implements ZookeeperDatabaseInterface, ControllerDatabaseInterface, TableDatabaseInterface,
-		ShardsDatabaseInterface, ClusterDatabaseInterface {
-
-	private final JdbcTemplate jdbc;
-
-	public ZookeeperDatabaseConnection(JdbcTemplate jdbc) {
-		this.jdbc = jdbc;
-	}
-
-	@Override
-	public void setZookeeperOnline(int zookeeperId) {
-		this.jdbc.update("update zookeepers set zookeeper_status=? where id=?", 1, zookeeperId);
-	}
-
-	@Override
-	public void setZookeeperWarning(int zookeeperId) {
-		this.jdbc.update("update zookeepers set zookeeper_status=? where id=?", 2, zookeeperId);
-	}
-
-	@Override
-	public void setZookeeperFailure(int zookeeperId) {
-		this.jdbc.update("update zookeepers set zookeeper_status=? where id=?", 3, zookeeperId);
-	}
-
-	@Override
-	public void setZookeeperOffline(int zookeeperId) {
-		this.jdbc.update("update zookeepers set zookeeper_status=? where id=?", 0, zookeeperId);
-	}
-
-	@Override
-	public int insertOrUpdateZookeeper(String name, String url, String blurConnection) {
-		int updatedCount = jdbc.update("update zookeepers set url=? where name=?", url, name);
-
-		if (updatedCount == 0) {
-			jdbc.update("insert into zookeepers (name, url, blur_urls) values (?, ?, ?)", name, url, blurConnection);
-		}
-
-		return jdbc.queryForInt("select id from zookeepers where name = ?", name);
-	}
-
-	@Override
-	public int insertOrUpdateCluster(boolean safeMode, String cluster, int zookeeperId) {
-		int updateCount = this.jdbc.update("update clusters set safe_mode=? where name=? and zookeeper_id=?", safeMode, cluster, zookeeperId);
-		if (updateCount == 0) {
-			this.jdbc.update("insert into clusters (name, zookeeper_id, safe_mode) values (?, ?, ?)", cluster, zookeeperId, safeMode);
-		}
-		return this.jdbc.queryForInt("select id from clusters where name=? and zookeeper_id=?", cluster, zookeeperId);
-	}
-
-	@Override
-	public int markOfflineControllers(List<String> onlineControllers, int zookeeperId) {
-		if (onlineControllers.isEmpty()) {
-			return this.jdbc.update("update blur_controllers set controller_status=0, updated_at=? where zookeeper_id = ?", TimeHelper.now().getTime(),
-					zookeeperId);
-		} else {
-			return this.jdbc.update(
-					"update blur_controllers set controller_status=0, updated_at=? where controller_status!=0 and node_name not in ('"
-							+ StringUtils.join(onlineControllers, "','") + "') and zookeeper_id = ?", TimeHelper.now().getTime(), zookeeperId);
-		}
-	}
-
-	@Override
-	public int markOfflineShards(List<String> onlineShards, int clusterId) {
-		if (onlineShards.isEmpty()) {
-			return this.jdbc.update("update blur_shards set shard_status=0 updated_at=? where cluster_id = ?", TimeHelper.now().getTime(), clusterId);
-		} else {
-			return this.jdbc.update(
-					"update blur_shards set shard_status=0, updated_at=? where shard_status!=0 and node_name not in ('" + StringUtils.join(onlineShards, "','")
-							+ "') and cluster_id=?", TimeHelper.now().getTime(), clusterId);
-		}
-	}
-
-	@Override
-	public void markDeletedTables(List<String> onlineTables, int clusterId) {
-		if (onlineTables.isEmpty()) {
-			this.jdbc.update("delete from blur_tables where cluster_id=?", clusterId);
-		} else {
-			this.jdbc.update("delete from blur_tables where cluster_id=? and table_name not in ('"
-					+ StringUtils.join(onlineTables, "','") + "')", clusterId);
-		}
-	}
-
-	@Override
-	public void updateOnlineController(String controller, int zookeeperId, String blurVersion) {
-		int zookeeperStatus = this.jdbc.queryForInt("select zookeeper_status from zookeepers where id=?", zookeeperId);
-		int status = (zookeeperStatus == 0 || zookeeperStatus == 2) ? 2 : 1;
-		int updatedCount = this.jdbc.update(
-				"update blur_controllers set controller_status=?, blur_version=?, updated_at=? where node_name=? and zookeeper_id =?", status, blurVersion,
-				TimeHelper.now().getTime(), controller, zookeeperId);
-
-		if (updatedCount == 0) {
-			this.jdbc.update(
-					"insert into blur_controllers (node_name, controller_status, zookeeper_id, blur_version, created_at, updated_at) values (?, ?, ?, ?, ?, ?)",
-					controller, status, zookeeperId, blurVersion, TimeHelper.now().getTime(), TimeHelper.now().getTime());
-		}
-	}
-
-	@Override
-	public void updateOnlineShard(String shard, int clusterId, String blurVersion) {
-		int zookeeperStatus = this.jdbc.queryForInt(
-				"select zookeepers.zookeeper_status from zookeepers, clusters where clusters.id=? and clusters.zookeeper_id=zookeepers.id;",
-				clusterId);
-		int status = (zookeeperStatus == 0 || zookeeperStatus == 2) ? 2 : 1;
-		int updatedCount = this.jdbc.update("update blur_shards set shard_status=?, blur_version=?, updated_at=? where node_name=? and cluster_id=?",
-				status, blurVersion, TimeHelper.now().getTime(), shard, clusterId);
-
-		if (updatedCount == 0) {
-			this.jdbc.update(
-					"insert into blur_shards (node_name, shard_status, cluster_id, blur_version, created_at, updated_at) values (?, ?, ?, ?, ?, ?)", shard,
-					status, clusterId, blurVersion, TimeHelper.now().getTime(), TimeHelper.now().getTime());
-		}
-	}
-
-	@Override
-	public void updateOnlineTable(String table, int clusterId, String uri, boolean enabled) {
-		try{
-			int currentStatus = this.jdbc.queryForInt("select table_status from blur_tables where table_name=? and cluster_id=?", table, clusterId);
-			if (enabled && currentStatus != 3){
-				this.jdbc.update("update blur_tables set table_uri=?, table_status=?, updated_at=? where table_name=? and cluster_id=?",
-						uri, 4, TimeHelper.now().getTime(), table, clusterId);
-				
-			}
-			if (!enabled && currentStatus != 5) {
-				this.jdbc.update("update blur_tables set table_uri=?, table_status=?, updated_at=? where table_name=? and cluster_id=?",
-						uri, 2, TimeHelper.now().getTime(), table, clusterId);
-			}
-		} catch(EmptyResultDataAccessException e){
-			this.jdbc.update("insert into blur_tables (table_name, table_uri, table_status, cluster_id, updated_at) values (?, ?, ?, ?, ?)", table,
-					uri, (enabled ? 4 : 2), clusterId, TimeHelper.now().getTime());
-		}
-	}
-
-	@Override
-	public void setOnlineEnsembleNodes(String ensembleArray, int zookeeperId) {
-		this.jdbc.update("update zookeepers set online_ensemble_nodes=? where id=?", ensembleArray, zookeeperId);
-	}
-
-	@Override
-	public List<String> getRecentOfflineShardNames(int amount) {
-		return this.jdbc.queryForList("select node_name from blur_shards where shard_status=0 order by updated_at limit 0, " + amount, String.class);
-	}
-
-	@Override
-	public List<String> getRecentOfflineControllerNames(int amount) {
-		return this.jdbc.queryForList("select node_name from blur_controllers where controller_status=0 order by updated_at limit 0, " + amount,
-				String.class);
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/1485db70/src/contrib/blur-console/blur-agent/src/main/java/com/nearinfinity/agent/connections/zookeeper/interfaces/ClusterDatabaseInterface.java
----------------------------------------------------------------------
diff --git a/src/contrib/blur-console/blur-agent/src/main/java/com/nearinfinity/agent/connections/zookeeper/interfaces/ClusterDatabaseInterface.java b/src/contrib/blur-console/blur-agent/src/main/java/com/nearinfinity/agent/connections/zookeeper/interfaces/ClusterDatabaseInterface.java
deleted file mode 100644
index 80713ad..0000000
--- a/src/contrib/blur-console/blur-agent/src/main/java/com/nearinfinity/agent/connections/zookeeper/interfaces/ClusterDatabaseInterface.java
+++ /dev/null
@@ -1,7 +0,0 @@
-package com.nearinfinity.agent.connections.zookeeper.interfaces;
-
-public interface ClusterDatabaseInterface extends ShardsDatabaseInterface, TableDatabaseInterface {
-
-	int insertOrUpdateCluster(boolean safeMode, String cluster, int zookeeperId);
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/1485db70/src/contrib/blur-console/blur-agent/src/main/java/com/nearinfinity/agent/connections/zookeeper/interfaces/ControllerDatabaseInterface.java
----------------------------------------------------------------------
diff --git a/src/contrib/blur-console/blur-agent/src/main/java/com/nearinfinity/agent/connections/zookeeper/interfaces/ControllerDatabaseInterface.java b/src/contrib/blur-console/blur-agent/src/main/java/com/nearinfinity/agent/connections/zookeeper/interfaces/ControllerDatabaseInterface.java
deleted file mode 100644
index 6fd2831..0000000
--- a/src/contrib/blur-console/blur-agent/src/main/java/com/nearinfinity/agent/connections/zookeeper/interfaces/ControllerDatabaseInterface.java
+++ /dev/null
@@ -1,12 +0,0 @@
-package com.nearinfinity.agent.connections.zookeeper.interfaces;
-
-import java.util.List;
-
-public interface ControllerDatabaseInterface {
-
-	int markOfflineControllers(List<String> onlineControllers, int zookeeperId);
-
-	void updateOnlineController(String controller, int zookeeperId, String blurVersion);
-
-	List<String> getRecentOfflineControllerNames(int amount);
-}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/1485db70/src/contrib/blur-console/blur-agent/src/main/java/com/nearinfinity/agent/connections/zookeeper/interfaces/ShardsDatabaseInterface.java
----------------------------------------------------------------------
diff --git a/src/contrib/blur-console/blur-agent/src/main/java/com/nearinfinity/agent/connections/zookeeper/interfaces/ShardsDatabaseInterface.java b/src/contrib/blur-console/blur-agent/src/main/java/com/nearinfinity/agent/connections/zookeeper/interfaces/ShardsDatabaseInterface.java
deleted file mode 100644
index 7215cd4..0000000
--- a/src/contrib/blur-console/blur-agent/src/main/java/com/nearinfinity/agent/connections/zookeeper/interfaces/ShardsDatabaseInterface.java
+++ /dev/null
@@ -1,13 +0,0 @@
-package com.nearinfinity.agent.connections.zookeeper.interfaces;
-
-import java.util.List;
-
-public interface ShardsDatabaseInterface {
-
-	int markOfflineShards(List<String> onlineShards, int clusterId);
-
-	void updateOnlineShard(String shard, int clusterId, String blurVersion);
-
-	List<String> getRecentOfflineShardNames(int amount);
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/1485db70/src/contrib/blur-console/blur-agent/src/main/java/com/nearinfinity/agent/connections/zookeeper/interfaces/TableDatabaseInterface.java
----------------------------------------------------------------------
diff --git a/src/contrib/blur-console/blur-agent/src/main/java/com/nearinfinity/agent/connections/zookeeper/interfaces/TableDatabaseInterface.java b/src/contrib/blur-console/blur-agent/src/main/java/com/nearinfinity/agent/connections/zookeeper/interfaces/TableDatabaseInterface.java
deleted file mode 100644
index f7ede4a..0000000
--- a/src/contrib/blur-console/blur-agent/src/main/java/com/nearinfinity/agent/connections/zookeeper/interfaces/TableDatabaseInterface.java
+++ /dev/null
@@ -1,11 +0,0 @@
-package com.nearinfinity.agent.connections.zookeeper.interfaces;
-
-import java.util.List;
-
-public interface TableDatabaseInterface {
-
-	void markDeletedTables(List<String> onlineTables, int clusterId);
-
-	void updateOnlineTable(String table, int clusterId, String uri, boolean enabled);
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/1485db70/src/contrib/blur-console/blur-agent/src/main/java/com/nearinfinity/agent/connections/zookeeper/interfaces/ZookeeperDatabaseInterface.java
----------------------------------------------------------------------
diff --git a/src/contrib/blur-console/blur-agent/src/main/java/com/nearinfinity/agent/connections/zookeeper/interfaces/ZookeeperDatabaseInterface.java b/src/contrib/blur-console/blur-agent/src/main/java/com/nearinfinity/agent/connections/zookeeper/interfaces/ZookeeperDatabaseInterface.java
deleted file mode 100644
index 47e2ae1..0000000
--- a/src/contrib/blur-console/blur-agent/src/main/java/com/nearinfinity/agent/connections/zookeeper/interfaces/ZookeeperDatabaseInterface.java
+++ /dev/null
@@ -1,17 +0,0 @@
-package com.nearinfinity.agent.connections.zookeeper.interfaces;
-
-public interface ZookeeperDatabaseInterface extends ControllerDatabaseInterface, ClusterDatabaseInterface {
-
-	void setZookeeperOnline(int id);
-
-	void setZookeeperWarning(int id);
-
-	void setZookeeperOffline(int id);
-
-  void setZookeeperFailure(int id);
-
-	int insertOrUpdateZookeeper(String name, String url, String blurConnection);
-
-	void setOnlineEnsembleNodes(String ensembleArray, int zookeeperId);
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/1485db70/src/contrib/blur-console/blur-agent/src/main/java/com/nearinfinity/agent/exceptions/CollisionException.java
----------------------------------------------------------------------
diff --git a/src/contrib/blur-console/blur-agent/src/main/java/com/nearinfinity/agent/exceptions/CollisionException.java b/src/contrib/blur-console/blur-agent/src/main/java/com/nearinfinity/agent/exceptions/CollisionException.java
deleted file mode 100644
index 335217a..0000000
--- a/src/contrib/blur-console/blur-agent/src/main/java/com/nearinfinity/agent/exceptions/CollisionException.java
+++ /dev/null
@@ -1,8 +0,0 @@
-package com.nearinfinity.agent.exceptions;
-
-@SuppressWarnings("serial")
-public class CollisionException extends Exception {
-	public CollisionException(int size, String type, String name) {
-		super("Found [" + size + "] " + type + " by identifier [" + name + "].  Need one and only one result.  Skipping collection.");
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/1485db70/src/contrib/blur-console/blur-agent/src/main/java/com/nearinfinity/agent/exceptions/HdfsThreadException.java
----------------------------------------------------------------------
diff --git a/src/contrib/blur-console/blur-agent/src/main/java/com/nearinfinity/agent/exceptions/HdfsThreadException.java b/src/contrib/blur-console/blur-agent/src/main/java/com/nearinfinity/agent/exceptions/HdfsThreadException.java
deleted file mode 100644
index 8de4997..0000000
--- a/src/contrib/blur-console/blur-agent/src/main/java/com/nearinfinity/agent/exceptions/HdfsThreadException.java
+++ /dev/null
@@ -1,12 +0,0 @@
-package com.nearinfinity.agent.exceptions;
-
-@SuppressWarnings("serial")
-public class HdfsThreadException extends Exception {
-	public HdfsThreadException() {
-		super();
-	}
-
-	public HdfsThreadException(String message) {
-		super(message);
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/1485db70/src/contrib/blur-console/blur-agent/src/main/java/com/nearinfinity/agent/exceptions/InvalidLicenseException.java
----------------------------------------------------------------------
diff --git a/src/contrib/blur-console/blur-agent/src/main/java/com/nearinfinity/agent/exceptions/InvalidLicenseException.java b/src/contrib/blur-console/blur-agent/src/main/java/com/nearinfinity/agent/exceptions/InvalidLicenseException.java
deleted file mode 100644
index f8351e0..0000000
--- a/src/contrib/blur-console/blur-agent/src/main/java/com/nearinfinity/agent/exceptions/InvalidLicenseException.java
+++ /dev/null
@@ -1,13 +0,0 @@
-package com.nearinfinity.agent.exceptions;
-
-public class InvalidLicenseException extends Exception {
-	private static final long serialVersionUID = 1317409878802094298L;
-
-	public InvalidLicenseException(String message) {
-		super(message);
-	}
-
-	public InvalidLicenseException(String message, Throwable cause) {
-		super(message, cause);
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/1485db70/src/contrib/blur-console/blur-agent/src/main/java/com/nearinfinity/agent/exceptions/MissingException.java
----------------------------------------------------------------------
diff --git a/src/contrib/blur-console/blur-agent/src/main/java/com/nearinfinity/agent/exceptions/MissingException.java b/src/contrib/blur-console/blur-agent/src/main/java/com/nearinfinity/agent/exceptions/MissingException.java
deleted file mode 100644
index 917991b..0000000
--- a/src/contrib/blur-console/blur-agent/src/main/java/com/nearinfinity/agent/exceptions/MissingException.java
+++ /dev/null
@@ -1,8 +0,0 @@
-package com.nearinfinity.agent.exceptions;
-
-@SuppressWarnings("serial")
-public class MissingException extends Exception {
-	public MissingException(String type, String name) {
-		super("Couldn't Find a " + type + " by name [" + name + "].  Need a single result.  Skipping collection.");
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/1485db70/src/contrib/blur-console/blur-agent/src/main/java/com/nearinfinity/agent/exceptions/NullReturnedException.java
----------------------------------------------------------------------
diff --git a/src/contrib/blur-console/blur-agent/src/main/java/com/nearinfinity/agent/exceptions/NullReturnedException.java b/src/contrib/blur-console/blur-agent/src/main/java/com/nearinfinity/agent/exceptions/NullReturnedException.java
deleted file mode 100644
index 7761462..0000000
--- a/src/contrib/blur-console/blur-agent/src/main/java/com/nearinfinity/agent/exceptions/NullReturnedException.java
+++ /dev/null
@@ -1,12 +0,0 @@
-package com.nearinfinity.agent.exceptions;
-
-@SuppressWarnings("serial")
-public class NullReturnedException extends Exception {
-	public NullReturnedException() {
-		super();
-	}
-
-	public NullReturnedException(String message) {
-		super(message);
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/1485db70/src/contrib/blur-console/blur-agent/src/main/java/com/nearinfinity/agent/exceptions/TableCollisionException.java
----------------------------------------------------------------------
diff --git a/src/contrib/blur-console/blur-agent/src/main/java/com/nearinfinity/agent/exceptions/TableCollisionException.java b/src/contrib/blur-console/blur-agent/src/main/java/com/nearinfinity/agent/exceptions/TableCollisionException.java
deleted file mode 100644
index e174781..0000000
--- a/src/contrib/blur-console/blur-agent/src/main/java/com/nearinfinity/agent/exceptions/TableCollisionException.java
+++ /dev/null
@@ -1,8 +0,0 @@
-package com.nearinfinity.agent.exceptions;
-
-@SuppressWarnings("serial")
-public class TableCollisionException extends CollisionException {
-	public TableCollisionException(int size, String table) {
-		super(size, "Table", table);
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/1485db70/src/contrib/blur-console/blur-agent/src/main/java/com/nearinfinity/agent/exceptions/TableMissingException.java
----------------------------------------------------------------------
diff --git a/src/contrib/blur-console/blur-agent/src/main/java/com/nearinfinity/agent/exceptions/TableMissingException.java b/src/contrib/blur-console/blur-agent/src/main/java/com/nearinfinity/agent/exceptions/TableMissingException.java
deleted file mode 100644
index f98c7a3..0000000
--- a/src/contrib/blur-console/blur-agent/src/main/java/com/nearinfinity/agent/exceptions/TableMissingException.java
+++ /dev/null
@@ -1,8 +0,0 @@
-package com.nearinfinity.agent.exceptions;
-
-@SuppressWarnings("serial")
-public class TableMissingException extends MissingException {
-	public TableMissingException(String table) {
-		super("Table", table);
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/1485db70/src/contrib/blur-console/blur-agent/src/main/java/com/nearinfinity/agent/exceptions/ZookeeperNameCollisionException.java
----------------------------------------------------------------------
diff --git a/src/contrib/blur-console/blur-agent/src/main/java/com/nearinfinity/agent/exceptions/ZookeeperNameCollisionException.java b/src/contrib/blur-console/blur-agent/src/main/java/com/nearinfinity/agent/exceptions/ZookeeperNameCollisionException.java
deleted file mode 100644
index 6d1dff0..0000000
--- a/src/contrib/blur-console/blur-agent/src/main/java/com/nearinfinity/agent/exceptions/ZookeeperNameCollisionException.java
+++ /dev/null
@@ -1,8 +0,0 @@
-package com.nearinfinity.agent.exceptions;
-
-@SuppressWarnings("serial")
-public class ZookeeperNameCollisionException extends CollisionException {
-	public ZookeeperNameCollisionException(int size, String zookeeperName) {
-		super(size, "Zookeepers", zookeeperName);
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/1485db70/src/contrib/blur-console/blur-agent/src/main/java/com/nearinfinity/agent/exceptions/ZookeeperNameMissingException.java
----------------------------------------------------------------------
diff --git a/src/contrib/blur-console/blur-agent/src/main/java/com/nearinfinity/agent/exceptions/ZookeeperNameMissingException.java b/src/contrib/blur-console/blur-agent/src/main/java/com/nearinfinity/agent/exceptions/ZookeeperNameMissingException.java
deleted file mode 100644
index 61f4d3e..0000000
--- a/src/contrib/blur-console/blur-agent/src/main/java/com/nearinfinity/agent/exceptions/ZookeeperNameMissingException.java
+++ /dev/null
@@ -1,8 +0,0 @@
-package com.nearinfinity.agent.exceptions;
-
-@SuppressWarnings("serial")
-public class ZookeeperNameMissingException extends MissingException {
-	public ZookeeperNameMissingException(String zookeeperName) {
-		super("Zookeeper", zookeeperName);
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/1485db70/src/contrib/blur-console/blur-agent/src/main/java/com/nearinfinity/agent/monitor/ThreadController.java
----------------------------------------------------------------------
diff --git a/src/contrib/blur-console/blur-agent/src/main/java/com/nearinfinity/agent/monitor/ThreadController.java b/src/contrib/blur-console/blur-agent/src/main/java/com/nearinfinity/agent/monitor/ThreadController.java
deleted file mode 100644
index e2d46df..0000000
--- a/src/contrib/blur-console/blur-agent/src/main/java/com/nearinfinity/agent/monitor/ThreadController.java
+++ /dev/null
@@ -1,29 +0,0 @@
-package com.nearinfinity.agent.monitor;
-
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.Set;
-
-public class ThreadController {
-	private static Set<Thread> threads = new HashSet<Thread>();
-	public static boolean exitOnStop = true;
-
-	public static void registerThread(Thread thread) {
-		if (thread != null) {
-			threads.add(thread);
-		}
-	}
-
-	public static void stopAllThreads() {
-		Iterator<Thread> iterator = threads.iterator();
-		while (iterator.hasNext()) {
-			Thread next = iterator.next();
-			next.interrupt();
-			iterator.remove();
-		}
-
-		if (exitOnStop) {
-			System.exit(1);
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/1485db70/src/contrib/blur-console/blur-agent/src/main/java/com/nearinfinity/agent/notifications/AgentMailerAuthenticator.java
----------------------------------------------------------------------
diff --git a/src/contrib/blur-console/blur-agent/src/main/java/com/nearinfinity/agent/notifications/AgentMailerAuthenticator.java b/src/contrib/blur-console/blur-agent/src/main/java/com/nearinfinity/agent/notifications/AgentMailerAuthenticator.java
deleted file mode 100644
index 047bf8c..0000000
--- a/src/contrib/blur-console/blur-agent/src/main/java/com/nearinfinity/agent/notifications/AgentMailerAuthenticator.java
+++ /dev/null
@@ -1,16 +0,0 @@
-package com.nearinfinity.agent.notifications;
-
-import javax.mail.Authenticator;
-import javax.mail.PasswordAuthentication;
-
-public class AgentMailerAuthenticator extends Authenticator {
-	private final PasswordAuthentication authentication;
-
-	public AgentMailerAuthenticator(String username, String password) {
-		this.authentication = new PasswordAuthentication(username, password);
-	}
-
-	protected PasswordAuthentication getPasswordAuthentication() {
-		return this.authentication;
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/1485db70/src/contrib/blur-console/blur-agent/src/main/java/com/nearinfinity/agent/notifications/Mailer.java
----------------------------------------------------------------------
diff --git a/src/contrib/blur-console/blur-agent/src/main/java/com/nearinfinity/agent/notifications/Mailer.java b/src/contrib/blur-console/blur-agent/src/main/java/com/nearinfinity/agent/notifications/Mailer.java
deleted file mode 100644
index 7548df4..0000000
--- a/src/contrib/blur-console/blur-agent/src/main/java/com/nearinfinity/agent/notifications/Mailer.java
+++ /dev/null
@@ -1,78 +0,0 @@
-package com.nearinfinity.agent.notifications;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Properties;
-
-import javax.mail.Message;
-import javax.mail.MessagingException;
-import javax.mail.Session;
-import javax.mail.Transport;
-import javax.mail.internet.AddressException;
-import javax.mail.internet.InternetAddress;
-import javax.mail.internet.MimeMessage;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
-public class Mailer {
-	private static final Log log = LogFactory.getLog(Mailer.class);
-
-	private AgentMailerAuthenticator authenticator;
-	private String automatedSender;
-	private List<InternetAddress> recipients;
-	private Properties mailerProperties;
-	private boolean sendMail = false;
-
-	public Mailer(Properties props) {
-		if (props.containsKey("mail.enabled") && props.getProperty("mail.enabled").equals("true")) {
-			sendMail = true;
-			authenticator = new AgentMailerAuthenticator(props.getProperty("mail.sender.username"), props.getProperty("mail.sender.password"));
-			automatedSender = props.getProperty("mail.from.address", "DoNotReply");
-			setupRecipients(props.getProperty("mail.recipients", ""));			
-			
-			mailerProperties = new Properties();
-			mailerProperties.put("mail.transport.protocol", "smtp");
-			mailerProperties.put("mail.smtp.host", props.getProperty("mail.host", ""));
-			mailerProperties.put("mail.smtp.port", props.getProperty("mail.port", ""));
-			mailerProperties.put("mail.smtp.auth", "true");
-			mailerProperties.put("mail.smtp.starttls.enable", "true");
-		}
-	}
-
-	public void sendMessage(String subject, String messageBody) {
-		if (!sendMail) {
-			log.info("Mailer has been disabled.  No emails will be sent.");
-			return;
-		}
-		if (recipients.isEmpty()) {
-			log.warn("There were no recipients found to send mail to.  Skipping send mail.");
-			return;
-		}
-
-		Session session = Session.getInstance(this.mailerProperties, this.authenticator);
-		try {
-			MimeMessage message = new MimeMessage(session);
-			message.setFrom(new InternetAddress(this.automatedSender));
-			message.addRecipients(Message.RecipientType.TO, recipients.toArray(new InternetAddress[recipients.size()]));
-
-			message.setSubject(subject);
-			message.setContent(messageBody, "text/plain");
-
-			Transport.send(message);
-		} catch (MessagingException e) {
-			log.error("An error occured while sending a warning email.", e);
-		}
-	}
-	
-	private void setupRecipients(String recipientProp) {
-		recipients = new ArrayList<InternetAddress>();
-		for (String email : recipientProp.split("\\|")) {
-			try {
-				recipients.add(new InternetAddress(email));
-			} catch (AddressException e) {
-				log.warn("Invalid email address found.  Ignoring.", e);
-			}
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/1485db70/src/contrib/blur-console/blur-agent/src/main/java/com/nearinfinity/agent/notifications/Messenger.java
----------------------------------------------------------------------
diff --git a/src/contrib/blur-console/blur-agent/src/main/java/com/nearinfinity/agent/notifications/Messenger.java b/src/contrib/blur-console/blur-agent/src/main/java/com/nearinfinity/agent/notifications/Messenger.java
deleted file mode 100644
index 6791375..0000000
--- a/src/contrib/blur-console/blur-agent/src/main/java/com/nearinfinity/agent/notifications/Messenger.java
+++ /dev/null
@@ -1,83 +0,0 @@
-package com.nearinfinity.agent.notifications;
-
-import java.util.HashSet;
-import java.util.Properties;
-import java.util.Set;
-
-import org.apache.commons.lang.StringUtils;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.jivesoftware.smack.Chat;
-import org.jivesoftware.smack.ChatManager;
-import org.jivesoftware.smack.Connection;
-import org.jivesoftware.smack.ConnectionConfiguration;
-import org.jivesoftware.smack.MessageListener;
-import org.jivesoftware.smack.XMPPConnection;
-import org.jivesoftware.smack.XMPPException;
-import org.jivesoftware.smack.packet.Message;
-
-public class Messenger {
-	private static final Log log = LogFactory.getLog(Messenger.class);
-	
-	private String host;
-	private int port;
-	private String username;
-	private String password;
-	private Set<String> recipients;
-	private boolean sendMessage = false;
-	
-	public Messenger(Properties props) {
-		if (props.containsKey("messenger.enabled") && props.getProperty("messenger.enabled").equals("true")) {
-			sendMessage = true;
-			host = props.getProperty("messenger.host");
-			port = Integer.parseInt(props.getProperty("messenger.port", "5222"));
-			username = props.getProperty("messenger.user");
-			password = props.getProperty("messenger.password");
-			
-			recipients = new HashSet<String>();
-			for (String recip : props.getProperty("messenger.recipients", "").split("\\|")) {
-				if (StringUtils.isNotBlank(recip)) {
-					recipients.add(recip);
-				}
-			}
-		}
-	}
-	
-	public void sendMessage(String message) {
-		if (sendMessage && !recipients.isEmpty()) {
-			try {
-				// Create the configuration for this new connection
-				ConnectionConfiguration config = new ConnectionConfiguration(host, port);
-				config.setCompressionEnabled(true);
-				config.setSASLAuthenticationEnabled(true);
-				config.setSendPresence(false);
-				config.setRosterLoadedAtLogin(false);
-				
-				Connection connection = new XMPPConnection(config);
-				// Connect to the server
-				connection.connect();
-				// Log into the server
-				connection.login(username, password);
-				ChatManager manager = connection.getChatManager();
-				for (String recipient : recipients) {
-					Chat chat = manager.createChat(recipient, new MessageListener() {
-						
-						@Override
-						public void processMessage(Chat chat, Message message) {
-							try {
-								chat.sendMessage("Sorry I'm just a bot");
-							} catch (XMPPException e) {
-								//Ignore response
-							}
-						}
-					});
-					chat.sendMessage(message);
-				}
-				// Disconnect from the server
-				connection.disconnect();
-			} catch (Exception e) {
-				log.warn("Unable to send Instant Message: " + e.getMessage());
-			}
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/1485db70/src/contrib/blur-console/blur-agent/src/main/java/com/nearinfinity/agent/notifications/Notifier.java
----------------------------------------------------------------------
diff --git a/src/contrib/blur-console/blur-agent/src/main/java/com/nearinfinity/agent/notifications/Notifier.java b/src/contrib/blur-console/blur-agent/src/main/java/com/nearinfinity/agent/notifications/Notifier.java
deleted file mode 100644
index 21182d2..0000000
--- a/src/contrib/blur-console/blur-agent/src/main/java/com/nearinfinity/agent/notifications/Notifier.java
+++ /dev/null
@@ -1,62 +0,0 @@
-package com.nearinfinity.agent.notifications;
-
-import java.text.MessageFormat;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Properties;
-
-import org.apache.commons.lang.StringUtils;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
-public class Notifier {
-	private static final Log log = LogFactory.getLog(Notifier.class);
-	private static final String SUBJECT = "Blur Console: {0} nodes may have gone offline!";
-	private static final String MESSAGE = "Blur Console has received notice that the following {0} nodes have recently gone offline.\n\n{1}";
-	private static Notifier notifier;
-	
-	private Mailer mailer;
-	private Messenger messenger;
-	
-	private Notifier(Properties props) {
-		mailer = new Mailer(props);
-		messenger = new Messenger(props);
-	}
-	
-	public void notifyZookeeperOffline(String zookeeperName) {
-		sendNotification("Zookeeper", new ArrayList<String>(Arrays.asList(new String[]{zookeeperName})));
-	}
-
-	public void notifyControllerOffline(List<String> controllerNames) {
-		sendNotification("Controller", controllerNames);
-	}
-
-	public void notifyShardOffline(List<String> shardNames) {
-		sendNotification("Shard", shardNames);
-	}
-	
-	private void sendNotification(String nodeType, List<String> nodeNames) {
-		String subject = MessageFormat.format(SUBJECT, nodeType);
-		String message = MessageFormat.format(MESSAGE, nodeType, StringUtils.join(nodeNames, "\n"));
-		
-		mailer.sendMessage(subject, message);
-		messenger.sendMessage(message);
-	}
-	
-	public static Notifier getNotifier(Properties props, boolean forceNew) {
-		if (notifier == null || forceNew) {
-			notifier = new Notifier(props);
-		}
-		return notifier;
-	}
-
-	public static Notifier getNotifier() {
-		if (notifier != null) {
-			return notifier;
-		}
-
-		log.warn("Notifier has not been configured yet.  No notifications will be sent.");
-		return new Notifier(new Properties());
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/1485db70/src/contrib/blur-console/blur-agent/src/main/java/com/nearinfinity/agent/types/Column.java
----------------------------------------------------------------------
diff --git a/src/contrib/blur-console/blur-agent/src/main/java/com/nearinfinity/agent/types/Column.java b/src/contrib/blur-console/blur-agent/src/main/java/com/nearinfinity/agent/types/Column.java
deleted file mode 100644
index c52d347..0000000
--- a/src/contrib/blur-console/blur-agent/src/main/java/com/nearinfinity/agent/types/Column.java
+++ /dev/null
@@ -1,73 +0,0 @@
-package com.nearinfinity.agent.types;
-
-public class Column {
-	String name;
-	String analyzer;
-	boolean fullText;
-	boolean live;
-
-	public Column(String name) {
-		this.name = name;
-	}
-
-	public String getName() {
-		return name;
-	}
-
-	public void setName(String name) {
-		this.name = name;
-	}
-
-	public String getAnalyzer() {
-		return analyzer;
-	}
-
-	public void setAnalyzer(String analyzer) {
-		this.analyzer = analyzer;
-	}
-
-	public boolean isLive() {
-		return live;
-	}
-
-	public void setLive(boolean live) {
-		this.live = live;
-	}
-
-	public boolean isFullText() {
-		return fullText;
-	}
-
-	public void setFullText(boolean fullText) {
-		this.fullText = fullText;
-	}
-
-	public boolean isSearchable() {
-		return (this.analyzer != null && this.analyzer != "");
-	}
-
-	@Override
-	public int hashCode() {
-		final int prime = 31;
-		int result = 1;
-		result = prime * result + ((name == null) ? 0 : name.hashCode());
-		return result;
-	}
-
-	@Override
-	public boolean equals(Object obj) {
-		if (this == obj)
-			return true;
-		if (obj == null)
-			return false;
-		if (getClass() != obj.getClass())
-			return false;
-		Column other = (Column) obj;
-		if (name == null) {
-			if (other.name != null)
-				return false;
-		} else if (!name.equals(other.name))
-			return false;
-		return true;
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/1485db70/src/contrib/blur-console/blur-agent/src/main/java/com/nearinfinity/agent/types/Family.java
----------------------------------------------------------------------
diff --git a/src/contrib/blur-console/blur-agent/src/main/java/com/nearinfinity/agent/types/Family.java b/src/contrib/blur-console/blur-agent/src/main/java/com/nearinfinity/agent/types/Family.java
deleted file mode 100644
index 806f7d0..0000000
--- a/src/contrib/blur-console/blur-agent/src/main/java/com/nearinfinity/agent/types/Family.java
+++ /dev/null
@@ -1,50 +0,0 @@
-package com.nearinfinity.agent.types;
-
-import java.util.ArrayList;
-import java.util.List;
-
-public class Family {
-	String name;
-	List<Column> columns = new ArrayList<Column>();
-
-	public Family(String name) {
-		this.name = name;
-	}
-
-	public String getName() {
-		return name;
-	}
-
-	public void setName(String name) {
-		this.name = name;
-	}
-
-	public List<Column> getColumns() {
-		return columns;
-	}
-
-	@Override
-	public int hashCode() {
-		final int prime = 31;
-		int result = 1;
-		result = prime * result + ((name == null) ? 0 : name.hashCode());
-		return result;
-	}
-
-	@Override
-	public boolean equals(Object obj) {
-		if (this == obj)
-			return true;
-		if (obj == null)
-			return false;
-		if (getClass() != obj.getClass())
-			return false;
-		Family other = (Family) obj;
-		if (name == null) {
-			if (other.name != null)
-				return false;
-		} else if (!name.equals(other.name))
-			return false;
-		return true;
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/1485db70/src/contrib/blur-console/blur-agent/src/main/java/com/nearinfinity/agent/types/TimeHelper.java
----------------------------------------------------------------------
diff --git a/src/contrib/blur-console/blur-agent/src/main/java/com/nearinfinity/agent/types/TimeHelper.java b/src/contrib/blur-console/blur-agent/src/main/java/com/nearinfinity/agent/types/TimeHelper.java
deleted file mode 100644
index 5069031..0000000
--- a/src/contrib/blur-console/blur-agent/src/main/java/com/nearinfinity/agent/types/TimeHelper.java
+++ /dev/null
@@ -1,25 +0,0 @@
-package com.nearinfinity.agent.types;
-
-import java.util.Calendar;
-import java.util.Date;
-import java.util.TimeZone;
-
-public class TimeHelper {
-	public static Calendar now() {
-		return getAdjustedTime(new Date().getTime());
-	}
-
-	public static Calendar getAdjustedTime(long time) {
-		Calendar cal = Calendar.getInstance();
-		TimeZone z = cal.getTimeZone();
-		cal.add(Calendar.MILLISECOND, -(z.getOffset(time)));
-		return cal;
-	}
-	
-	public static Calendar getTimeAgo(int timeAgoInMS){
-		Calendar timeAgo = Calendar.getInstance();
-		timeAgo.setTimeInMillis(now().getTimeInMillis());
-		timeAgo.add(Calendar.MILLISECOND, -timeAgoInMS);
-		return timeAgo;
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/1485db70/src/contrib/blur-console/blur-agent/src/main/java/org/apache/blur/agent/Agent.java
----------------------------------------------------------------------
diff --git a/src/contrib/blur-console/blur-agent/src/main/java/org/apache/blur/agent/Agent.java b/src/contrib/blur-console/blur-agent/src/main/java/org/apache/blur/agent/Agent.java
new file mode 100644
index 0000000..e96c040
--- /dev/null
+++ b/src/contrib/blur-console/blur-agent/src/main/java/org/apache/blur/agent/Agent.java
@@ -0,0 +1,189 @@
+package org.apache.blur.agent;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.PrintWriter;
+import java.lang.management.ManagementFactory;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.blur.agent.cleaners.AgentCleaners;
+import org.apache.blur.agent.collectors.blur.BlurCollector;
+import org.apache.blur.agent.collectors.hdfs.HdfsCollector;
+import org.apache.blur.agent.collectors.zookeeper.ZookeeperCollector;
+import org.apache.blur.agent.connections.JdbcConnection;
+import org.apache.blur.agent.connections.blur.BlurDatabaseConnection;
+import org.apache.blur.agent.connections.cleaners.CleanerDatabaseConnection;
+import org.apache.blur.agent.connections.hdfs.HdfsDatabaseConnection;
+import org.apache.blur.agent.connections.zookeeper.ZookeeperDatabaseConnection;
+import org.apache.blur.agent.exceptions.HdfsThreadException;
+import org.apache.blur.agent.notifications.Notifier;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.log4j.PropertyConfigurator;
+import org.springframework.jdbc.core.JdbcTemplate;
+
+
+public class Agent {
+	public static final long COLLECTOR_SLEEP_TIME = TimeUnit.SECONDS.toMillis(15);
+	public static final long CLEAN_UP_SLEEP_TIME = TimeUnit.SECONDS.toMillis(30);
+
+	private static final Log log = LogFactory.getLog(Agent.class);
+
+	private Agent(Properties props) {
+
+		// Setup database connection
+		JdbcTemplate jdbc = JdbcConnection.createDBConnection(props);
+
+		// Setup the notifier
+		Notifier.getNotifier(props, true);
+
+		List<String> activeCollectors = props.containsKey("active.collectors") ? new ArrayList<String>(Arrays.asList(props.getProperty(
+				"active.collectors").split("\\|"))) : new ArrayList<String>();
+
+		// Setup the collectors
+		setupHdfs(props, jdbc, activeCollectors);
+		setupBlur(props, jdbc, activeCollectors);
+		setupZookeeper(props, jdbc);
+		setupCleaners(jdbc, activeCollectors);
+	}
+
+	public static void main(String[] args) {
+		writePidFile();
+		Properties configProps = loadConfigParams(args);
+		setupLogger(configProps);
+		new Agent(configProps);
+	}
+
+	private void setupCleaners(JdbcTemplate jdbc, List<String> activeCollectors) {
+		new Thread(new AgentCleaners(activeCollectors, new CleanerDatabaseConnection(jdbc)), "Agent Cleaner Thread").start();
+	}
+
+	private void setupBlur(Properties props, JdbcTemplate jdbc, List<String> activeCollectors) {
+		Map<String, String> blurInstances = loadBlurInstances(props);
+		for (Map.Entry<String, String> blurEntry : blurInstances.entrySet()) {
+			final String zookeeperName = blurEntry.getKey();
+			final String connection = blurEntry.getValue();
+			new Thread(new BlurCollector(zookeeperName, connection, activeCollectors, new BlurDatabaseConnection(jdbc), jdbc),
+					"Blur Collector thread - " + zookeeperName).start();
+		}
+	}
+
+	private void setupHdfs(Properties props, final JdbcTemplate jdbc, List<String> activeCollectors) {
+		Map<String, Map<String, String>> hdfsInstances = loadHdfsInstances(props);
+		for (Map<String, String> instance : hdfsInstances.values()) {
+			final String name = instance.get("name");
+			final String thriftUri = instance.get("url.thrift");
+			final String defaultUri = instance.get("url.default");
+			final String user = props.getProperty("hdfs." + name + ".login.user");
+			try {
+				new Thread(new HdfsCollector(name, defaultUri, thriftUri, user, activeCollectors, new HdfsDatabaseConnection(jdbc)),
+						"Hdfs Collector - " + name).start();
+			} catch (HdfsThreadException e) {
+				log.error("The collector for hdfs [" + name + "] will not execute.");
+				continue;
+			}
+		}
+	}
+
+	private void setupZookeeper(Properties props, JdbcTemplate jdbc) {
+		if (props.containsKey("zk.instances")) {
+			List<String> zooKeeperInstances = new ArrayList<String>(Arrays.asList(props.getProperty("zk.instances").split("\\|")));
+			for (String zkInstance : zooKeeperInstances) {
+				String zkUrl = props.getProperty("zk." + zkInstance + ".url");
+				String blurConnection = props.getProperty("blur." + zkInstance + ".url");
+				new Thread(new ZookeeperCollector(zkUrl, zkInstance, blurConnection, new ZookeeperDatabaseConnection(jdbc)), "Zookeeper - "
+						+ zkInstance).start();
+			}
+		}
+	}
+
+	private static void setupLogger(Properties props) {
+		String log4jPropsFile = props.getProperty("log4j.properties", "../conf/log4j.properties");
+
+		if (new File(log4jPropsFile).exists()) {
+			PropertyConfigurator.configure(log4jPropsFile);
+		} else {
+			log.warn("Unable to find log4j properties file.  Using default logging");
+		}
+	}
+
+	private static Properties loadConfigParams(String[] args) {
+		String configFileName;
+		if (args.length == 0) {
+			configFileName = "../conf/blur-agent.config";
+		} else {
+			configFileName = args[0];
+		}
+		File configFile = new File(configFileName);
+
+		if (!configFile.exists() || !configFile.isFile()) {
+			log.fatal("Unable to find config file at " + configFile.getAbsolutePath());
+			System.exit(1);
+		}
+
+		Properties configProps = new Properties();
+		try {
+			configProps.load(new FileInputStream(configFile));
+		} catch (Exception e) {
+			log.fatal("Config File is not a valid properties file: " + e.getMessage());
+			System.exit(1);
+		}
+		return configProps;
+	}
+
+	private static void writePidFile() {
+		try {
+			File pidFile = new File("../agent.pid");
+			PrintWriter pidOut = new PrintWriter(pidFile);
+			log.info("Wrote pid file to: " + pidFile.getAbsolutePath());
+			String nameOfRunningVM = ManagementFactory.getRuntimeMXBean().getName();
+			int p = nameOfRunningVM.indexOf('@');
+			String pid = nameOfRunningVM.substring(0, p);
+			pidOut.write(pid);
+			pidOut.write("\n");
+			pidOut.close();
+		} catch (FileNotFoundException e) {
+			log.fatal("Unable to find pid file. " + e.getMessage());
+			System.exit(1);
+		}
+	}
+
+	private Map<String, String> loadBlurInstances(Properties props) {
+		Map<String, String> instances = new HashMap<String, String>();
+
+		if (props.containsKey("blur.instances")) {
+			String[] blurNames = props.getProperty("blur.instances").split("\\|");
+
+			for (String blur : blurNames) {
+				instances.put(blur, props.getProperty("blur." + blur + ".url"));
+			}
+		}
+
+		return instances;
+	}
+
+	private Map<String, Map<String, String>> loadHdfsInstances(Properties props) {
+		Map<String, Map<String, String>> instances = new HashMap<String, Map<String, String>>();
+
+		if (props.containsKey("hdfs.instances")) {
+			String[] hdfsNames = props.getProperty("hdfs.instances").split("\\|");
+
+			for (String hdfs : hdfsNames) {
+				Map<String, String> instanceInfo = new HashMap<String, String>();
+				instanceInfo.put("url.thrift", props.getProperty("hdfs." + hdfs + ".thrift.url"));
+				instanceInfo.put("url.default", props.getProperty("hdfs." + hdfs + ".url"));
+				instanceInfo.put("name", hdfs);
+				instances.put(hdfs, instanceInfo);
+			}
+		}
+
+		return instances;
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/1485db70/src/contrib/blur-console/blur-agent/src/main/java/org/apache/blur/agent/cleaners/AgentCleaners.java
----------------------------------------------------------------------
diff --git a/src/contrib/blur-console/blur-agent/src/main/java/org/apache/blur/agent/cleaners/AgentCleaners.java b/src/contrib/blur-console/blur-agent/src/main/java/org/apache/blur/agent/cleaners/AgentCleaners.java
new file mode 100644
index 0000000..5308e67
--- /dev/null
+++ b/src/contrib/blur-console/blur-agent/src/main/java/org/apache/blur/agent/cleaners/AgentCleaners.java
@@ -0,0 +1,40 @@
+package org.apache.blur.agent.cleaners;
+
+import java.util.List;
+
+import org.apache.blur.agent.Agent;
+import org.apache.blur.agent.connections.cleaners.interfaces.CleanerDatabaseInterface;
+
+
+public class AgentCleaners implements Runnable {
+
+	private final boolean cleanQueries;
+	private final boolean cleanHdfsStats;
+	private final CleanerDatabaseInterface database;
+
+	public AgentCleaners(final List<String> activeCollectors, CleanerDatabaseInterface database) {
+		this.cleanQueries = activeCollectors.contains("queries");
+		this.cleanHdfsStats = activeCollectors.contains("hdfs");
+		this.database = database;
+	}
+
+	@Override
+	public void run() {
+		while (true) {
+			if (this.cleanQueries) {
+				new Thread(new QueriesCleaner(this.database), "Query Cleaner").start();
+			}
+
+			if (this.cleanHdfsStats) {
+				new Thread(new HdfsStatsCleaner(this.database), "Hdfs Stats Cleaner").start();
+			}
+
+			try {
+				Thread.sleep(Agent.CLEAN_UP_SLEEP_TIME);
+			} catch (InterruptedException e) {
+				break;
+			}
+		}
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/1485db70/src/contrib/blur-console/blur-agent/src/main/java/org/apache/blur/agent/cleaners/HdfsStatsCleaner.java
----------------------------------------------------------------------
diff --git a/src/contrib/blur-console/blur-agent/src/main/java/org/apache/blur/agent/cleaners/HdfsStatsCleaner.java b/src/contrib/blur-console/blur-agent/src/main/java/org/apache/blur/agent/cleaners/HdfsStatsCleaner.java
new file mode 100644
index 0000000..903a0f5
--- /dev/null
+++ b/src/contrib/blur-console/blur-agent/src/main/java/org/apache/blur/agent/cleaners/HdfsStatsCleaner.java
@@ -0,0 +1,28 @@
+package org.apache.blur.agent.cleaners;
+
+import org.apache.blur.agent.connections.cleaners.interfaces.HdfsDatabaseCleanerInterface;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.springframework.dao.DataAccessException;
+
+
+public class HdfsStatsCleaner implements Runnable {
+	private static final Log log = LogFactory.getLog(QueriesCleaner.class);
+
+	private final HdfsDatabaseCleanerInterface database;
+
+	public HdfsStatsCleaner(HdfsDatabaseCleanerInterface database) {
+		this.database = database;
+	}
+
+	@Override
+	public void run() {
+		try {
+			this.database.deleteOldStats();
+		} catch (DataAccessException e) {
+			log.error("An error occured while deleting hdfs stats from the database!", e);
+		} catch (Exception e) {
+			log.error("An unkown error occured while cleaning up the hdfs stats!", e);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/1485db70/src/contrib/blur-console/blur-agent/src/main/java/org/apache/blur/agent/cleaners/QueriesCleaner.java
----------------------------------------------------------------------
diff --git a/src/contrib/blur-console/blur-agent/src/main/java/org/apache/blur/agent/cleaners/QueriesCleaner.java b/src/contrib/blur-console/blur-agent/src/main/java/org/apache/blur/agent/cleaners/QueriesCleaner.java
new file mode 100644
index 0000000..5dbd124
--- /dev/null
+++ b/src/contrib/blur-console/blur-agent/src/main/java/org/apache/blur/agent/cleaners/QueriesCleaner.java
@@ -0,0 +1,32 @@
+package org.apache.blur.agent.cleaners;
+
+import org.apache.blur.agent.connections.cleaners.interfaces.QueryDatabaseCleanerInterface;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.springframework.dao.DataAccessException;
+
+
+public class QueriesCleaner implements Runnable {
+	private static final Log log = LogFactory.getLog(QueriesCleaner.class);
+
+	private final QueryDatabaseCleanerInterface database;
+
+	public QueriesCleaner(final QueryDatabaseCleanerInterface database) {
+		this.database = database;
+	}
+
+	@Override
+	public void run() {
+		try {
+			int deletedQueries = this.database.deleteOldQueries();
+			int expiredQueries = this.database.expireOldQueries();
+			if ((deletedQueries + expiredQueries) > 0) {
+				log.info("Removed " + deletedQueries + " queries and " + "Expired " + expiredQueries + " queries, in this pass!");
+			}
+		} catch (DataAccessException e) {
+			log.error("An error occured while deleting queries from the database!", e);
+		} catch (Exception e) {
+			log.error("An unkown error occured while cleaning up the queries!", e);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/1485db70/src/contrib/blur-console/blur-agent/src/main/java/org/apache/blur/agent/collectors/blur/BlurCollector.java
----------------------------------------------------------------------
diff --git a/src/contrib/blur-console/blur-agent/src/main/java/org/apache/blur/agent/collectors/blur/BlurCollector.java b/src/contrib/blur-console/blur-agent/src/main/java/org/apache/blur/agent/collectors/blur/BlurCollector.java
new file mode 100644
index 0000000..49a3d53
--- /dev/null
+++ b/src/contrib/blur-console/blur-agent/src/main/java/org/apache/blur/agent/collectors/blur/BlurCollector.java
@@ -0,0 +1,119 @@
+package org.apache.blur.agent.collectors.blur;
+
+import java.util.List;
+import java.util.Map;
+
+import org.apache.blur.agent.Agent;
+import org.apache.blur.agent.collectors.blur.query.QueryCollector;
+import org.apache.blur.agent.collectors.blur.table.TableCollector;
+import org.apache.blur.agent.connections.blur.interfaces.BlurDatabaseInterface;
+import org.apache.blur.agent.exceptions.ZookeeperNameCollisionException;
+import org.apache.blur.agent.exceptions.ZookeeperNameMissingException;
+import org.apache.blur.thrift.BlurClient;
+import org.apache.blur.thrift.generated.Blur.Iface;
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.springframework.jdbc.core.JdbcTemplate;
+
+
+public class BlurCollector implements Runnable {
+	private static final Log log = LogFactory.getLog(BlurCollector.class);
+
+	private final String zookeeperName;
+	private final BlurDatabaseInterface database;
+	private final boolean collectTables;
+	private final boolean collectQueries;
+
+	private String connection;
+
+	public BlurCollector(final String zookeeperName, final String connection, final List<String> activeCollectors,
+			final BlurDatabaseInterface database, final JdbcTemplate jdbc) {
+		this.zookeeperName = zookeeperName;
+		this.connection = connection;
+		this.database = database;
+		this.collectTables = activeCollectors.contains("tables");
+		this.collectQueries = activeCollectors.contains("queries");
+	}
+
+	@Override
+	public void run() {
+		while (true) {
+			// Retrieve the zookeeper id
+			int zookeeperId = getZookeeperId();
+
+			// If the connection string is blank then we need to build it from the
+			// online controllers from the database
+			String resolvedConnection = getResolvedConnection(zookeeperId);
+
+			if (StringUtils.isBlank(resolvedConnection)) {
+				try {
+					Thread.sleep(Agent.COLLECTOR_SLEEP_TIME);
+				} catch (InterruptedException e) {
+					break;
+				}
+				continue;
+			}
+
+			Iface blurConnection = BlurClient.getClient(resolvedConnection);
+
+			/* Retrieve the clusters and their info */
+			for (Map<String, Object> cluster : this.database.getClusters(zookeeperId)) {
+				String clusterName = (String) cluster.get("NAME");
+				Integer clusterId = (Integer) cluster.get("ID");
+
+				List<String> tables;
+				try {
+					tables = blurConnection.tableListByCluster(clusterName);
+				} catch (Exception e) {
+					log.error("An error occured while trying to retrieve the table list for cluster[" + clusterName + "], skipping cluster", e);
+					continue;
+				}
+
+				for (final String tableName : tables) {
+					int tableId = this.database.getTableId(clusterId, tableName);
+					if (tableId == -1) {
+						continue;
+					}
+
+					if (this.collectTables) {
+						new Thread(new TableCollector(blurConnection, tableName, tableId, this.database), "Table Collector - " + tableName).start();
+					}
+
+					if (this.collectQueries) {
+						new Thread(new QueryCollector(blurConnection, tableName, tableId, this.database), "Query Collector - " + tableName).start();
+					}
+				}
+			}
+
+			try {
+				Thread.sleep(Agent.COLLECTOR_SLEEP_TIME);
+			} catch (InterruptedException e) {
+				break;
+			}
+		}
+	}
+
+	private String getResolvedConnection(int zookeeperId) {
+		if (StringUtils.isBlank(this.connection)) {
+			return this.database.resolveConnectionString(zookeeperId);
+		} else {
+			return this.connection;
+		}
+	}
+
+	private int getZookeeperId() {
+		try {
+			return Integer.parseInt(this.database.getZookeeperId(this.zookeeperName));
+		} catch (NumberFormatException e) {
+			log.error("The returned zookeeperId is not a valid number", e);
+			return -1;
+		} catch (ZookeeperNameMissingException e) {
+			log.error(e.getMessage(), e);
+			return -1;
+		} catch (ZookeeperNameCollisionException e) {
+			log.error(e.getMessage(), e);
+			return -1;
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/1485db70/src/contrib/blur-console/blur-agent/src/main/java/org/apache/blur/agent/collectors/blur/query/QueryCollector.java
----------------------------------------------------------------------
diff --git a/src/contrib/blur-console/blur-agent/src/main/java/org/apache/blur/agent/collectors/blur/query/QueryCollector.java b/src/contrib/blur-console/blur-agent/src/main/java/org/apache/blur/agent/collectors/blur/query/QueryCollector.java
new file mode 100644
index 0000000..f300379
--- /dev/null
+++ b/src/contrib/blur-console/blur-agent/src/main/java/org/apache/blur/agent/collectors/blur/query/QueryCollector.java
@@ -0,0 +1,89 @@
+package org.apache.blur.agent.collectors.blur.query;
+
+import java.util.Date;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.blur.agent.connections.blur.interfaces.QueryDatabaseInterface;
+import org.apache.blur.agent.types.TimeHelper;
+import org.apache.blur.thrift.generated.Blur.Iface;
+import org.apache.blur.thrift.generated.BlurQueryStatus;
+import org.apache.blur.thrift.generated.SimpleQuery;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.codehaus.jackson.map.ObjectMapper;
+
+
+public class QueryCollector implements Runnable {
+	private static final Log log = LogFactory.getLog(QueryCollector.class);
+
+	private final Iface blurConnection;
+	private final String tableName;
+	private final int tableId;
+	private final QueryDatabaseInterface database;
+
+	public QueryCollector(Iface connection, String tableName, int tableId, QueryDatabaseInterface database) {
+		this.blurConnection = connection;
+		this.tableName = tableName;
+		this.tableId = tableId;
+		this.database = database;
+	}
+
+	@SuppressWarnings("unchecked")
+	@Override
+	public void run() {
+		Set<Long> currentQueries = new HashSet<Long>();
+		try {
+			currentQueries.addAll(blurConnection.queryStatusIdList(tableName));
+			//currentQueries.addAll(this.database.getRunningQueries());
+		} catch (Exception e) {
+			log.error("Unable to get the list of current queries [" + tableName + "]." + e.getMessage());
+			return;
+		}
+		
+		// Mark running queries that can't be found as complete - unknown
+		this.database.markOrphanedRunningQueriesComplete(CollectionUtils.subtract(this.database.getRunningQueries(), currentQueries));
+		
+
+		for (Long queryUUID : currentQueries) {
+			BlurQueryStatus status;
+			try {
+				status = blurConnection.queryStatusById(tableName, queryUUID);
+			} catch (Exception e) {
+				log.error("Unable to get query status for query [" + queryUUID + "]." + e.getMessage());
+				continue;
+			}
+
+			Map<String, Object> oldQuery = this.database.getQuery(this.tableId, queryUUID);
+
+			String times;
+			try {
+				times = new ObjectMapper().writeValueAsString(status.getCpuTimes());
+			} catch (Exception e) {
+				log.error("Unable to parse cpu times.", e);
+				times = null;
+			}
+
+			if (oldQuery == null) {
+				SimpleQuery query = status.getQuery().getSimpleQuery();
+				long startTimeLong = status.getQuery().getStartTime();
+
+				// Set the query creation time to now or given start time
+				Date startTime = (startTimeLong > 0) ? TimeHelper.getAdjustedTime(startTimeLong).getTime() : TimeHelper.now().getTime();
+
+				this.database.createQuery(status, query, times, startTime, this.tableId);
+			} else if (queryHasChanged(status, times, oldQuery)) {
+				this.database.updateQuery(status, times, (Integer) oldQuery.get("ID"));
+			}
+		}
+	}
+
+	private static boolean queryHasChanged(BlurQueryStatus blurQueryStatus, String timesJSON, Map<String, Object> oldQueryInfo) {
+		return blurQueryStatus.getState().getValue() == 0
+				|| !(timesJSON.equals(oldQueryInfo.get("TIMES"))
+						&& blurQueryStatus.getCompleteShards() == (Integer) oldQueryInfo.get("COMPLETE_SHARDS") && blurQueryStatus.getState()
+						.getValue() == (Integer) oldQueryInfo.get("STATE"));
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/1485db70/src/contrib/blur-console/blur-agent/src/main/java/org/apache/blur/agent/collectors/blur/table/SchemaCollector.java
----------------------------------------------------------------------
diff --git a/src/contrib/blur-console/blur-agent/src/main/java/org/apache/blur/agent/collectors/blur/table/SchemaCollector.java b/src/contrib/blur-console/blur-agent/src/main/java/org/apache/blur/agent/collectors/blur/table/SchemaCollector.java
new file mode 100644
index 0000000..9a149fe
--- /dev/null
+++ b/src/contrib/blur-console/blur-agent/src/main/java/org/apache/blur/agent/collectors/blur/table/SchemaCollector.java
@@ -0,0 +1,143 @@
+package org.apache.blur.agent.collectors.blur.table;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.blur.agent.connections.blur.interfaces.TableDatabaseInterface;
+import org.apache.blur.agent.exceptions.NullReturnedException;
+import org.apache.blur.agent.types.Column;
+import org.apache.blur.agent.types.Family;
+import org.apache.blur.thrift.generated.AnalyzerDefinition;
+import org.apache.blur.thrift.generated.Blur.Iface;
+import org.apache.blur.thrift.generated.BlurException;
+import org.apache.blur.thrift.generated.ColumnDefinition;
+import org.apache.blur.thrift.generated.ColumnFamilyDefinition;
+import org.apache.blur.thrift.generated.Schema;
+import org.apache.blur.thrift.generated.TableDescriptor;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.codehaus.jackson.JsonProcessingException;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.springframework.dao.DataAccessException;
+
+
+public class SchemaCollector implements Runnable {
+	private static final Log log = LogFactory.getLog(SchemaCollector.class);
+
+	private final Iface blurConnection;
+	private final String tableName;
+	private final int tableId;
+	private final TableDescriptor descriptor;
+	private final TableDatabaseInterface database;
+
+	public SchemaCollector(Iface connection, String tableName, int tableId, TableDescriptor descriptor, TableDatabaseInterface database) {
+		this.blurConnection = connection;
+		this.tableName = tableName;
+		this.tableId = tableId;
+		this.descriptor = descriptor;
+		this.database = database;
+	}
+
+	@Override
+	public void run() {
+		try {
+			Schema schema = null;
+			schema = blurConnection.schema(tableName);
+			if (schema == null || descriptor == null) {
+				throw new NullReturnedException("No Schema or Descriptor Defined!");
+			}
+
+			List<Family> columnDefs = getColumnDefinitions(schema);
+
+			AnalyzerDefinition analyzerDefinition = descriptor.getAnalyzerDefinition();
+			if (analyzerDefinition != null) {
+				Map<String, ColumnFamilyDefinition> columnFamilyDefinitions = analyzerDefinition.getColumnFamilyDefinitions();
+				ColumnDefinition analyzerDefaultDefinition = analyzerDefinition.getDefaultDefinition();
+				if (columnFamilyDefinitions == null) {
+					for (Family family : columnDefs) {
+						for (Column column : family.getColumns()) {
+							if (analyzerDefaultDefinition == null) {
+								column.setAnalyzer("UNKNOWN");
+							} else {
+								column.setAnalyzer(analyzerDefaultDefinition.getAnalyzerClassName());
+								column.setFullText(analyzerDefaultDefinition.isFullTextIndex());
+							}
+						}
+					}
+				} else {
+					for (Map.Entry<String, ColumnFamilyDefinition> describeEntry : columnFamilyDefinitions.entrySet()) {
+						Family family = new Family(describeEntry.getKey());
+						int familyIndex = columnDefs.indexOf(family);
+
+						if (familyIndex == -1) {
+							columnDefs.add(family);
+						} else {
+							family = columnDefs.get(familyIndex);
+						}
+
+						Map<String, ColumnDefinition> columnDefinitions = describeEntry.getValue().getColumnDefinitions();
+						ColumnDefinition columnDefaultDefinition = describeEntry.getValue().getDefaultDefinition();
+						if (columnDefinitions == null) {
+							for (Column column : family.getColumns()) {
+								if (columnDefaultDefinition == null && analyzerDefaultDefinition == null) {
+									column.setAnalyzer("UNKNOWN");
+								} else if (columnDefaultDefinition == null) {
+									column.setAnalyzer(analyzerDefaultDefinition.getAnalyzerClassName());
+									column.setFullText(analyzerDefaultDefinition.isFullTextIndex());
+								} else {
+									column.setAnalyzer(columnDefaultDefinition.getAnalyzerClassName());
+									column.setFullText(columnDefaultDefinition.isFullTextIndex());
+								}
+							}
+						} else {
+							for (Map.Entry<String, ColumnDefinition> columnDescription : columnDefinitions.entrySet()) {
+								Column column = new Column(columnDescription.getKey());
+								int columnIndex = family.getColumns().indexOf(column);
+
+								if (columnIndex == -1) {
+									family.getColumns().add(column);
+								} else {
+									column = family.getColumns().get(columnIndex);
+								}
+
+								column.setAnalyzer(columnDescription.getValue().getAnalyzerClassName());
+								column.setFullText(columnDescription.getValue().isFullTextIndex());
+							}
+						}
+					}
+				}
+			}
+			this.database.updateTableSchema(this.tableId, new ObjectMapper().writeValueAsString(columnDefs), this.descriptor
+					.getAnalyzerDefinition().getFullTextAnalyzerClassName());
+		} catch (BlurException e) {
+			log.error("Unable to get the shard schema for table [" + tableName + "].", e);
+		} catch (JsonProcessingException e) {
+			log.error("Unable to convert shard schema to json.", e);
+		} catch (DataAccessException e) {
+			log.error("An error occurred while writing the schema to the database.", e);
+		} catch (NullReturnedException e) {
+			log.error(e.getMessage(), e);
+		} catch (Exception e) {
+			log.error("An unknown error occurred in the TableSchemaCollector.", e);
+		}
+	}
+
+	private List<Family> getColumnDefinitions(final Schema schema) {
+		List<Family> columnDefs = new ArrayList<Family>();
+		Map<String, Set<String>> columnFamilies = schema.getColumnFamilies();
+		if (columnFamilies != null) {
+			for (Map.Entry<String, Set<String>> schemaEntry : columnFamilies.entrySet()) {
+				Family family = new Family(schemaEntry.getKey());
+				for (String columnName : schemaEntry.getValue()) {
+					Column column = new Column(columnName);
+					column.setLive(true);
+					family.getColumns().add(column);
+				}
+				columnDefs.add(family);
+			}
+		}
+		return columnDefs;
+	}
+}


Mime
View raw message