incubator-blur-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From amccu...@apache.org
Subject [05/50] [abbrv] Initial Blur Console commit.
Date Fri, 17 May 2013 03:24:32 GMT
http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/b5b86c7e/src/contrib/blur-console/blur-agent/src/main/java/com/nearinfinity/agent/collectors/blur/table/SchemaCollector.java
----------------------------------------------------------------------
diff --git a/src/contrib/blur-console/blur-agent/src/main/java/com/nearinfinity/agent/collectors/blur/table/SchemaCollector.java b/src/contrib/blur-console/blur-agent/src/main/java/com/nearinfinity/agent/collectors/blur/table/SchemaCollector.java
new file mode 100644
index 0000000..029b472
--- /dev/null
+++ b/src/contrib/blur-console/blur-agent/src/main/java/com/nearinfinity/agent/collectors/blur/table/SchemaCollector.java
@@ -0,0 +1,143 @@
+package com.nearinfinity.agent.collectors.blur.table;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.codehaus.jackson.JsonProcessingException;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.springframework.dao.DataAccessException;
+
+import com.nearinfinity.agent.connections.blur.interfaces.TableDatabaseInterface;
+import com.nearinfinity.agent.exceptions.NullReturnedException;
+import com.nearinfinity.agent.types.Column;
+import com.nearinfinity.agent.types.Family;
+import com.nearinfinity.blur.thrift.generated.AnalyzerDefinition;
+import com.nearinfinity.blur.thrift.generated.BlurException;
+import com.nearinfinity.blur.thrift.generated.ColumnDefinition;
+import com.nearinfinity.blur.thrift.generated.ColumnFamilyDefinition;
+import com.nearinfinity.blur.thrift.generated.Schema;
+import com.nearinfinity.blur.thrift.generated.TableDescriptor;
+import com.nearinfinity.blur.thrift.generated.Blur.Iface;
+
+public class SchemaCollector implements Runnable {
+	private static final Log log = LogFactory.getLog(SchemaCollector.class);
+
+	private final Iface blurConnection;
+	private final String tableName;
+	private final int tableId;
+	private final TableDescriptor descriptor;
+	private final TableDatabaseInterface database;
+
+	public SchemaCollector(Iface connection, String tableName, int tableId, TableDescriptor descriptor, TableDatabaseInterface database) {
+		this.blurConnection = connection;
+		this.tableName = tableName;
+		this.tableId = tableId;
+		this.descriptor = descriptor;
+		this.database = database;
+	}
+
+	@Override
+	public void run() {
+		try {
+			Schema schema = null;
+			schema = blurConnection.schema(tableName);
+			if (schema == null || descriptor == null) {
+				throw new NullReturnedException("No Schema or Descriptor Defined!");
+			}
+
+			List<Family> columnDefs = getColumnDefinitions(schema);
+
+			AnalyzerDefinition analyzerDefinition = descriptor.getAnalyzerDefinition();
+			if (analyzerDefinition != null) {
+				Map<String, ColumnFamilyDefinition> columnFamilyDefinitions = analyzerDefinition.getColumnFamilyDefinitions();
+				ColumnDefinition analyzerDefaultDefinition = analyzerDefinition.getDefaultDefinition();
+				if (columnFamilyDefinitions == null) {
+					for (Family family : columnDefs) {
+						for (Column column : family.getColumns()) {
+							if (analyzerDefaultDefinition == null) {
+								column.setAnalyzer("UNKNOWN");
+							} else {
+								column.setAnalyzer(analyzerDefaultDefinition.getAnalyzerClassName());
+								column.setFullText(analyzerDefaultDefinition.isFullTextIndex());
+							}
+						}
+					}
+				} else {
+					for (Map.Entry<String, ColumnFamilyDefinition> describeEntry : columnFamilyDefinitions.entrySet()) {
+						Family family = new Family(describeEntry.getKey());
+						int familyIndex = columnDefs.indexOf(family);
+
+						if (familyIndex == -1) {
+							columnDefs.add(family);
+						} else {
+							family = columnDefs.get(familyIndex);
+						}
+
+						Map<String, ColumnDefinition> columnDefinitions = describeEntry.getValue().getColumnDefinitions();
+						ColumnDefinition columnDefaultDefinition = describeEntry.getValue().getDefaultDefinition();
+						if (columnDefinitions == null) {
+							for (Column column : family.getColumns()) {
+								if (columnDefaultDefinition == null && analyzerDefaultDefinition == null) {
+									column.setAnalyzer("UNKNOWN");
+								} else if (columnDefaultDefinition == null) {
+									column.setAnalyzer(analyzerDefaultDefinition.getAnalyzerClassName());
+									column.setFullText(analyzerDefaultDefinition.isFullTextIndex());
+								} else {
+									column.setAnalyzer(columnDefaultDefinition.getAnalyzerClassName());
+									column.setFullText(columnDefaultDefinition.isFullTextIndex());
+								}
+							}
+						} else {
+							for (Map.Entry<String, ColumnDefinition> columnDescription : columnDefinitions.entrySet()) {
+								Column column = new Column(columnDescription.getKey());
+								int columnIndex = family.getColumns().indexOf(column);
+
+								if (columnIndex == -1) {
+									family.getColumns().add(column);
+								} else {
+									column = family.getColumns().get(columnIndex);
+								}
+
+								column.setAnalyzer(columnDescription.getValue().getAnalyzerClassName());
+								column.setFullText(columnDescription.getValue().isFullTextIndex());
+							}
+						}
+					}
+				}
+			}
+			this.database.updateTableSchema(this.tableId, new ObjectMapper().writeValueAsString(columnDefs), this.descriptor
+					.getAnalyzerDefinition().getFullTextAnalyzerClassName());
+		} catch (BlurException e) {
+			log.error("Unable to get the shard schema for table [" + tableName + "].", e);
+		} catch (JsonProcessingException e) {
+			log.error("Unable to convert shard schema to json.", e);
+		} catch (DataAccessException e) {
+			log.error("An error occurred while writing the schema to the database.", e);
+		} catch (NullReturnedException e) {
+			log.error(e.getMessage(), e);
+		} catch (Exception e) {
+			log.error("An unknown error occurred in the TableSchemaCollector.", e);
+		}
+	}
+
+	private List<Family> getColumnDefinitions(final Schema schema) {
+		List<Family> columnDefs = new ArrayList<Family>();
+		Map<String, Set<String>> columnFamilies = schema.getColumnFamilies();
+		if (columnFamilies != null) {
+			for (Map.Entry<String, Set<String>> schemaEntry : columnFamilies.entrySet()) {
+				Family family = new Family(schemaEntry.getKey());
+				for (String columnName : schemaEntry.getValue()) {
+					Column column = new Column(columnName);
+					column.setLive(true);
+					family.getColumns().add(column);
+				}
+				columnDefs.add(family);
+			}
+		}
+		return columnDefs;
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/b5b86c7e/src/contrib/blur-console/blur-agent/src/main/java/com/nearinfinity/agent/collectors/blur/table/ServerCollector.java
----------------------------------------------------------------------
diff --git a/src/contrib/blur-console/blur-agent/src/main/java/com/nearinfinity/agent/collectors/blur/table/ServerCollector.java b/src/contrib/blur-console/blur-agent/src/main/java/com/nearinfinity/agent/collectors/blur/table/ServerCollector.java
new file mode 100644
index 0000000..31c8fbc
--- /dev/null
+++ b/src/contrib/blur-console/blur-agent/src/main/java/com/nearinfinity/agent/collectors/blur/table/ServerCollector.java
@@ -0,0 +1,69 @@
+package com.nearinfinity.agent.collectors.blur.table;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.codehaus.jackson.JsonProcessingException;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.springframework.dao.DataAccessException;
+
+import com.nearinfinity.agent.connections.blur.interfaces.TableDatabaseInterface;
+import com.nearinfinity.agent.exceptions.NullReturnedException;
+import com.nearinfinity.blur.thrift.generated.BlurException;
+import com.nearinfinity.blur.thrift.generated.Blur.Iface;
+
+public class ServerCollector implements Runnable {
+	private static final Log log = LogFactory.getLog(ServerCollector.class);
+
+	private final Iface blurConnection;
+	private final String tableName;
+	private final int tableId;
+	private final TableDatabaseInterface database;
+
+	public ServerCollector(Iface connection, String tableName, int tableId, TableDatabaseInterface database) {
+		this.blurConnection = connection;
+		this.tableName = tableName;
+		this.tableId = tableId;
+		this.database = database;
+	}
+
+	@Override
+	public void run() {
+		try {
+			Map<String, String> shardServerLayout = blurConnection.shardServerLayout(tableName);
+			if (shardServerLayout == null) {
+				throw new NullReturnedException("No server layout was returned!");
+			}
+			Map<String, ArrayList<String>> serverLayout = getServerLayout(shardServerLayout);
+			this.database.updateTableServer(tableId, new ObjectMapper().writeValueAsString(serverLayout));
+
+		} catch (BlurException e) {
+			log.error("Unable to get shard server layout for table [" + tableName + "].", e);
+		} catch (JsonProcessingException e) {
+			log.error("Unable to convert the shard layout to json.", e);
+		} catch (DataAccessException e) {
+			log.error("An error occurred while writing the server to the database.", e);
+		} catch (NullReturnedException e) {
+			log.error(e.getMessage(), e);
+		} catch (Exception e) {
+			log.error("An unknown error occurred in the TableServerCollector.", e);
+		}
+	}
+
+	private Map<String, ArrayList<String>> getServerLayout(Map<String, String> shardServerLayout) {
+		Map<String, ArrayList<String>> formattedShard = new HashMap<String, ArrayList<String>>();
+		for (String shard : shardServerLayout.keySet()) {
+			String host = shardServerLayout.get(shard);
+			if (formattedShard.get(host) != null) {
+				formattedShard.get(host).add(shard);
+			} else {
+				formattedShard.put(host, new ArrayList<String>(Arrays.asList(shard)));
+			}
+		}
+		return formattedShard;
+	}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/b5b86c7e/src/contrib/blur-console/blur-agent/src/main/java/com/nearinfinity/agent/collectors/blur/table/StatsCollector.java
----------------------------------------------------------------------
diff --git a/src/contrib/blur-console/blur-agent/src/main/java/com/nearinfinity/agent/collectors/blur/table/StatsCollector.java b/src/contrib/blur-console/blur-agent/src/main/java/com/nearinfinity/agent/collectors/blur/table/StatsCollector.java
new file mode 100644
index 0000000..3b91f8f
--- /dev/null
+++ b/src/contrib/blur-console/blur-agent/src/main/java/com/nearinfinity/agent/collectors/blur/table/StatsCollector.java
@@ -0,0 +1,49 @@
+package com.nearinfinity.agent.collectors.blur.table;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.springframework.dao.DataAccessException;
+
+import com.nearinfinity.agent.connections.blur.interfaces.TableDatabaseInterface;
+import com.nearinfinity.agent.exceptions.NullReturnedException;
+import com.nearinfinity.blur.thrift.generated.BlurException;
+import com.nearinfinity.blur.thrift.generated.TableStats;
+import com.nearinfinity.blur.thrift.generated.Blur.Iface;
+
+public class StatsCollector implements Runnable {
+	private static final Log log = LogFactory.getLog(StatsCollector.class);
+
+	private final Iface blurConnection;
+	private final String tableName;
+	private final int tableId;
+	private final TableDatabaseInterface database;
+
+	public StatsCollector(Iface connection, String tableName, int tableId, TableDatabaseInterface database) {
+		this.blurConnection = connection;
+		this.tableName = tableName;
+		this.tableId = tableId;
+		this.database = database;
+	}
+
+	@Override
+	public void run() {
+		try {
+			TableStats tableStats = blurConnection.getTableStats(this.tableName);
+
+			if (tableStats == null) {
+				throw new NullReturnedException("No table statistics were returned!");
+			}
+
+			this.database.updateTableStats(tableId, tableStats.getBytes(), tableStats.getQueries(), tableStats.getRecordCount(),
+					tableStats.getRowCount());
+		} catch (BlurException e) {
+			log.error("Unable to get table stats for table [" + tableId + "].", e);
+		} catch (DataAccessException e) {
+			log.error("An error occurred while writing the server to the database.", e);
+		} catch (NullReturnedException e) {
+			log.error(e.getMessage(), e);
+		} catch (Exception e) {
+			log.error("An unknown error occurred in the TableServerCollector.", e);
+		}
+	}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/b5b86c7e/src/contrib/blur-console/blur-agent/src/main/java/com/nearinfinity/agent/collectors/blur/table/TableCollector.java
----------------------------------------------------------------------
diff --git a/src/contrib/blur-console/blur-agent/src/main/java/com/nearinfinity/agent/collectors/blur/table/TableCollector.java b/src/contrib/blur-console/blur-agent/src/main/java/com/nearinfinity/agent/collectors/blur/table/TableCollector.java
new file mode 100644
index 0000000..606092a
--- /dev/null
+++ b/src/contrib/blur-console/blur-agent/src/main/java/com/nearinfinity/agent/collectors/blur/table/TableCollector.java
@@ -0,0 +1,50 @@
+package com.nearinfinity.agent.collectors.blur.table;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import com.nearinfinity.agent.connections.blur.interfaces.TableDatabaseInterface;
+import com.nearinfinity.blur.thrift.generated.Blur.Iface;
+import com.nearinfinity.blur.thrift.generated.TableDescriptor;
+
+public class TableCollector implements Runnable {
+	private static final Log log = LogFactory.getLog(TableCollector.class);
+
+	private final Iface blurConnection;
+	private final String tableName;
+	private final int tableId;
+	private final TableDatabaseInterface database;
+
+	public TableCollector(Iface connection, String tableName, int tableId, TableDatabaseInterface database) {
+		this.blurConnection = connection;
+		this.tableName = tableName;
+		this.database = database;
+		this.tableId = tableId;
+	}
+
+	@Override
+	public void run() {
+		try {
+			TableDescriptor descriptor;
+			try {
+				descriptor = blurConnection.describe(tableName);
+			} catch (Exception e) {
+				log.error("An error occured while trying to describe the table [" + tableName + "], skipping table", e);
+				return;
+			}
+
+			/* spawn the different table info collectors */
+			if (descriptor.isEnabled) {
+				new Thread(new SchemaCollector(this.blurConnection, this.tableName, this.tableId, descriptor, this.database),
+						"Table Schema Collector - " + this.tableName).start();
+			}
+			new Thread(new ServerCollector(this.blurConnection, this.tableName, this.tableId, this.database), "Table Server Collector - "
+					+ this.tableName).start();
+			new Thread(new StatsCollector(this.blurConnection, this.tableName, this.tableId, this.database), "Table Stats Collector - "
+					+ this.tableName).start();
+
+		} catch (Exception e) {
+			log.error("An unknown error occurred.", e);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/b5b86c7e/src/contrib/blur-console/blur-agent/src/main/java/com/nearinfinity/agent/collectors/hdfs/HdfsCollector.java
----------------------------------------------------------------------
diff --git a/src/contrib/blur-console/blur-agent/src/main/java/com/nearinfinity/agent/collectors/hdfs/HdfsCollector.java b/src/contrib/blur-console/blur-agent/src/main/java/com/nearinfinity/agent/collectors/hdfs/HdfsCollector.java
new file mode 100644
index 0000000..4802799
--- /dev/null
+++ b/src/contrib/blur-console/blur-agent/src/main/java/com/nearinfinity/agent/collectors/hdfs/HdfsCollector.java
@@ -0,0 +1,62 @@
+package com.nearinfinity.agent.collectors.hdfs;
+
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import com.nearinfinity.agent.Agent;
+import com.nearinfinity.agent.connections.hdfs.interfaces.HdfsDatabaseInterface;
+import com.nearinfinity.agent.exceptions.HdfsThreadException;
+
+public class HdfsCollector implements Runnable {
+	private static final Log log = LogFactory.getLog(HdfsCollector.class);
+
+	private final URI defaultUri;
+	private final String hdfsName;
+	private final String user;
+	private final HdfsDatabaseInterface databaseConnection;
+	private final boolean collectHdfs;
+
+	public HdfsCollector(final String hdfsName, final String defaultUri, final String thriftUri, final String user,
+			final List<String> activeCollectors, final HdfsDatabaseInterface databaseConnection) throws HdfsThreadException {
+		try {
+			this.defaultUri = new URI(defaultUri);
+			this.hdfsName = hdfsName;
+			this.user = user;
+			this.databaseConnection = databaseConnection;
+			this.collectHdfs = activeCollectors.contains("hdfs");
+
+			initializeHdfs(hdfsName, thriftUri);
+
+		} catch (URISyntaxException e) {
+			log.error(e.getMessage(), e);
+			throw new HdfsThreadException();
+		} catch (Exception e) {
+			log.error("An unkown error occured while creating the collector.", e);
+			throw new HdfsThreadException();
+		}
+	}
+
+	@Override
+	public void run() {
+		while (true) {
+			if (this.collectHdfs) {
+				new Thread(new HdfsStatsCollector(this.hdfsName, defaultUri, this.user, this.databaseConnection), "Hdfs Collector - "
+						+ this.hdfsName).start();
+			}
+
+			try {
+				Thread.sleep(Agent.COLLECTOR_SLEEP_TIME);
+			} catch (InterruptedException e) {
+				break;
+			}
+		}
+	}
+
+	private void initializeHdfs(String name, String thriftUri) throws URISyntaxException {
+		URI parsedThriftUri = new URI(thriftUri);
+		this.databaseConnection.setHdfsInfo(name, parsedThriftUri.getHost(), parsedThriftUri.getPort());
+	}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/b5b86c7e/src/contrib/blur-console/blur-agent/src/main/java/com/nearinfinity/agent/collectors/hdfs/HdfsStatsCollector.java
----------------------------------------------------------------------
diff --git a/src/contrib/blur-console/blur-agent/src/main/java/com/nearinfinity/agent/collectors/hdfs/HdfsStatsCollector.java b/src/contrib/blur-console/blur-agent/src/main/java/com/nearinfinity/agent/collectors/hdfs/HdfsStatsCollector.java
new file mode 100644
index 0000000..faa5eb3
--- /dev/null
+++ b/src/contrib/blur-console/blur-agent/src/main/java/com/nearinfinity/agent/collectors/hdfs/HdfsStatsCollector.java
@@ -0,0 +1,93 @@
+package com.nearinfinity.agent.collectors.hdfs;
+
+import java.io.IOException;
+import java.net.URI;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FsStatus;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.FSConstants.DatanodeReportType;
+import org.springframework.dao.DataAccessException;
+
+import com.nearinfinity.agent.connections.hdfs.interfaces.HdfsDatabaseInterface;
+import com.nearinfinity.agent.exceptions.NullReturnedException;
+import com.nearinfinity.agent.types.TimeHelper;
+
+public class HdfsStatsCollector implements Runnable {
+	private final static Log log = LogFactory.getLog(HdfsStatsCollector.class);
+
+	private final String hdfsName;
+	private final URI uri;
+	private final String host;
+	private final int port;
+	private final String user;
+	private final HdfsDatabaseInterface database;
+
+	public HdfsStatsCollector(final String hdfsName, final URI uri, final String user, final HdfsDatabaseInterface database) {
+		this.uri = uri;
+		this.host = uri.getHost();
+		this.port = uri.getPort();
+		this.hdfsName = hdfsName;
+		this.user = user;
+		this.database = database;
+	}
+
+	@Override
+	public void run() {
+		try {
+			int hdfsId = this.database.getHdfsId(this.hdfsName);
+			if (hdfsId == -1) {
+				log.error("The HDFS [" + this.hdfsName + "] does not exist in the database");
+				return;
+			}
+
+			// Creates a filesystem connection (if a user is given
+			// then the filesystem can get additional information)
+			FileSystem fileSystem = (this.user != null) ? FileSystem.get(this.uri, new Configuration(), this.user) : FileSystem.get(this.uri,
+					new Configuration());
+
+			if (fileSystem instanceof DistributedFileSystem) {
+				DistributedFileSystem dfs = (DistributedFileSystem) fileSystem;
+
+				FsStatus ds = dfs.getStatus();
+				long capacity = ds.getCapacity();
+				long used = ds.getUsed();
+				long logical_used = used / dfs.getDefaultReplication();
+				long remaining = ds.getRemaining();
+				long presentCapacity = used + remaining;
+
+				long liveNodes = -1;
+				long deadNodes = -1;
+				long totalNodes = -1;
+
+				try {
+					DatanodeInfo[] live = dfs.getClient().datanodeReport(DatanodeReportType.LIVE);
+					DatanodeInfo[] dead = dfs.getClient().datanodeReport(DatanodeReportType.DEAD);
+
+					liveNodes = live.length;
+					deadNodes = dead.length;
+					totalNodes = liveNodes + deadNodes;
+				} catch (Exception e) {
+					log.warn("Access denied for user. Skipping node information.");
+				}
+
+				this.database.insertHdfsStats(capacity, presentCapacity, remaining, used, logical_used, (((1.0 * used) / presentCapacity) * 100),
+						dfs.getUnderReplicatedBlocksCount(), dfs.getCorruptBlocksCount(), dfs.getMissingBlocksCount(), totalNodes, liveNodes,
+						deadNodes, TimeHelper.now().getTime(), host, port, hdfsId);
+
+				dfs.close();
+			}
+		} catch (IOException e) {
+			log.error("An IO error occurred while communicating with the DFS.", e);
+		} catch (DataAccessException e) {
+			log.error("An error occurred while writing the HDFSStats to the DB.", e);
+		} catch (NullReturnedException e) {
+			log.error(e.getMessage(), e);
+		} catch (Exception e) {
+			log.error("An unknown error occurred in the CollectorManager.", e);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/b5b86c7e/src/contrib/blur-console/blur-agent/src/main/java/com/nearinfinity/agent/collectors/zookeeper/ClusterCollector.java
----------------------------------------------------------------------
diff --git a/src/contrib/blur-console/blur-agent/src/main/java/com/nearinfinity/agent/collectors/zookeeper/ClusterCollector.java b/src/contrib/blur-console/blur-agent/src/main/java/com/nearinfinity/agent/collectors/zookeeper/ClusterCollector.java
new file mode 100644
index 0000000..f08cf15
--- /dev/null
+++ b/src/contrib/blur-console/blur-agent/src/main/java/com/nearinfinity/agent/collectors/zookeeper/ClusterCollector.java
@@ -0,0 +1,70 @@
+package com.nearinfinity.agent.collectors.zookeeper;
+
+import java.util.List;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.data.Stat;
+
+import com.nearinfinity.agent.connections.zookeeper.interfaces.ClusterDatabaseInterface;
+
+public class ClusterCollector implements Runnable {
+	private static final Log log = LogFactory.getLog(ClusterCollector.class);
+
+	private final int zookeeperId;
+	private final ZooKeeper zookeeper;
+	private final ClusterDatabaseInterface database;
+
+	public ClusterCollector(int zookeeperId, ZooKeeper zookeeper, ClusterDatabaseInterface database) {
+		this.zookeeperId = zookeeperId;
+		this.zookeeper = zookeeper;
+		this.database = database;
+	}
+
+	@Override
+	public void run() {
+		List<String> onlineClusters;
+		try {
+			onlineClusters = zookeeper.getChildren("/blur/clusters", false);
+		} catch (Exception e) {
+			log.error("Error getting clusters from zookeeper in ClusterCollector.", e);
+			return;
+		}
+
+		for (String cluster : onlineClusters) {
+			try {
+				boolean safeMode = isClusterInSafeMode(cluster);
+				int clusterId = this.database.insertOrUpdateCluster(safeMode, cluster, zookeeperId);
+
+				new Thread(new ShardCollector(clusterId, cluster, this.zookeeper, this.database), "Shard Collector - " + cluster).start();
+				new Thread(new TableCollector(clusterId, cluster, this.zookeeper, this.database), "Table Collector - " + cluster).start();
+			} catch (KeeperException e) {
+				log.error("Error talking to zookeeper in ClusterCollector.", e);
+			} catch (InterruptedException e) {
+				log.error("Zookeeper session expired in ClusterCollector.", e);
+			}
+		}
+
+	}
+
+	private boolean isClusterInSafeMode(String cluster) throws KeeperException, InterruptedException {
+		String blurSafemodePath = "/blur/clusters/" + cluster + "/safemode";
+		Stat stat = this.zookeeper.exists(blurSafemodePath, false);
+		if (stat == null) {
+			return false;
+		}
+
+		byte[] data = this.zookeeper.getData(blurSafemodePath, false, stat);
+		if (data == null) {
+			return false;
+		}
+
+		long timestamp = Long.parseLong(new String(data));
+		long waitTime = timestamp - System.currentTimeMillis();
+		if (waitTime > 0) {
+			return true;
+		}
+		return false;
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/b5b86c7e/src/contrib/blur-console/blur-agent/src/main/java/com/nearinfinity/agent/collectors/zookeeper/ControllerCollector.java
----------------------------------------------------------------------
diff --git a/src/contrib/blur-console/blur-agent/src/main/java/com/nearinfinity/agent/collectors/zookeeper/ControllerCollector.java b/src/contrib/blur-console/blur-agent/src/main/java/com/nearinfinity/agent/collectors/zookeeper/ControllerCollector.java
new file mode 100644
index 0000000..3a87497
--- /dev/null
+++ b/src/contrib/blur-console/blur-agent/src/main/java/com/nearinfinity/agent/collectors/zookeeper/ControllerCollector.java
@@ -0,0 +1,55 @@
+package com.nearinfinity.agent.collectors.zookeeper;
+
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.ZooKeeper;
+
+import com.nearinfinity.agent.connections.zookeeper.interfaces.ControllerDatabaseInterface;
+import com.nearinfinity.agent.notifications.Notifier;
+
+public class ControllerCollector implements Runnable {
+	private static final Log log = LogFactory.getLog(ControllerCollector.class);
+
+	private final int zookeeperId;
+	private final ZooKeeper zookeeper;
+	private final ControllerDatabaseInterface database;
+
+	public ControllerCollector(int zookeeperId, ZooKeeper zookeeper, ControllerDatabaseInterface database) {
+		this.zookeeperId = zookeeperId;
+		this.zookeeper = zookeeper;
+		this.database = database;
+	}
+
+	@Override
+	public void run() {
+		try {
+			List<String> onlineControllers = this.zookeeper.getChildren("/blur/online-controller-nodes", false);
+			int recentlyOffline = this.database.markOfflineControllers(onlineControllers, this.zookeeperId);
+			if (recentlyOffline > 0) {
+				Notifier.getNotifier().notifyControllerOffline(this.database.getRecentOfflineControllerNames(recentlyOffline));
+			}
+			updateOnlineControllers(onlineControllers);
+		} catch (KeeperException e) {
+			log.error("Error talking to zookeeper in ControllerCollector.", e);
+		} catch (InterruptedException e) {
+			log.error("Zookeeper session expired in ControllerCollector.", e);
+		}
+
+	}
+
+	private void updateOnlineControllers(List<String> controllers) throws KeeperException, InterruptedException {
+		for (String controller : controllers) {
+			String blurVersion = "UNKNOWN";
+
+			byte[] b = this.zookeeper.getData("/blur/online-controller-nodes/" + controller, false, null);
+			if (b != null && b.length > 0) {
+				blurVersion = new String(b);
+			}
+
+			this.database.updateOnlineController(controller, zookeeperId, blurVersion);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/b5b86c7e/src/contrib/blur-console/blur-agent/src/main/java/com/nearinfinity/agent/collectors/zookeeper/ShardCollector.java
----------------------------------------------------------------------
diff --git a/src/contrib/blur-console/blur-agent/src/main/java/com/nearinfinity/agent/collectors/zookeeper/ShardCollector.java b/src/contrib/blur-console/blur-agent/src/main/java/com/nearinfinity/agent/collectors/zookeeper/ShardCollector.java
new file mode 100644
index 0000000..c081478
--- /dev/null
+++ b/src/contrib/blur-console/blur-agent/src/main/java/com/nearinfinity/agent/collectors/zookeeper/ShardCollector.java
@@ -0,0 +1,56 @@
+package com.nearinfinity.agent.collectors.zookeeper;
+
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.ZooKeeper;
+
+import com.nearinfinity.agent.connections.zookeeper.interfaces.ShardsDatabaseInterface;
+import com.nearinfinity.agent.notifications.Notifier;
+
+public class ShardCollector implements Runnable {
+	private static final Log log = LogFactory.getLog(ShardCollector.class);
+
+	private final int clusterId;
+	private final String clusterName;
+	private final ZooKeeper zookeeper;
+	private final ShardsDatabaseInterface database;
+
+	public ShardCollector(int clusterId, String clusterName, ZooKeeper zookeeper, ShardsDatabaseInterface database) {
+		this.clusterId = clusterId;
+		this.clusterName = clusterName;
+		this.zookeeper = zookeeper;
+		this.database = database;
+	}
+
+	@Override
+	public void run() {
+		try {
+			List<String> shards = this.zookeeper.getChildren("/blur/clusters/" + clusterName + "/online/shard-nodes", false);
+			int recentlyOffline = this.database.markOfflineShards(shards, this.clusterId);
+			if (recentlyOffline > 0) {
+				Notifier.getNotifier().notifyShardOffline(this.database.getRecentOfflineShardNames(recentlyOffline));
+			}
+			updateOnlineShards(shards);
+		} catch (KeeperException e) {
+			log.error("Error talking to zookeeper in ShardCollector.", e);
+		} catch (InterruptedException e) {
+			log.error("Zookeeper session expired in ShardCollector.", e);
+		}
+	}
+
+	private void updateOnlineShards(List<String> shards) throws KeeperException, InterruptedException {
+		for (String shard : shards) {
+			String blurVersion = "UNKNOWN";
+
+			byte[] b = this.zookeeper.getData("/blur/clusters/" + clusterName + "/online/shard-nodes/" + shard, false, null);
+			if (b != null && b.length > 0) {
+				blurVersion = new String(b);
+			}
+
+			this.database.updateOnlineShard(shard, this.clusterId, blurVersion);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/b5b86c7e/src/contrib/blur-console/blur-agent/src/main/java/com/nearinfinity/agent/collectors/zookeeper/TableCollector.java
----------------------------------------------------------------------
diff --git a/src/contrib/blur-console/blur-agent/src/main/java/com/nearinfinity/agent/collectors/zookeeper/TableCollector.java b/src/contrib/blur-console/blur-agent/src/main/java/com/nearinfinity/agent/collectors/zookeeper/TableCollector.java
new file mode 100644
index 0000000..f218a13
--- /dev/null
+++ b/src/contrib/blur-console/blur-agent/src/main/java/com/nearinfinity/agent/collectors/zookeeper/TableCollector.java
@@ -0,0 +1,50 @@
+package com.nearinfinity.agent.collectors.zookeeper;
+
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.ZooKeeper;
+
+import com.nearinfinity.agent.connections.zookeeper.interfaces.TableDatabaseInterface;
+
+public class TableCollector implements Runnable {
+	private static final Log log = LogFactory.getLog(TableCollector.class);
+
+	private final int clusterId;
+	private final String clusterName;
+	private final ZooKeeper zookeeper;
+	private final TableDatabaseInterface database;
+
+	public TableCollector(int clusterId, String clusterName, ZooKeeper zookeeper, TableDatabaseInterface database) {
+		this.clusterId = clusterId;
+		this.clusterName = clusterName;
+		this.zookeeper = zookeeper;
+		this.database = database;
+	}
+
+	@Override
+	public void run() {
+		try {
+			List<String> tables = this.zookeeper.getChildren("/blur/clusters/" + clusterName + "/tables", false);
+			this.database.markDeletedTables(tables, this.clusterId);
+			updateOnlineTables(tables);
+		} catch (KeeperException e) {
+			log.error("Error talking to zookeeper in TableCollector.", e);
+		} catch (InterruptedException e) {
+			log.error("Zookeeper session expired in TableCollector.", e);
+		}
+	}
+
+	private void updateOnlineTables(List<String> tables) throws KeeperException, InterruptedException {
+		for (String table : tables) {
+			String tablePath = "/blur/clusters/" + clusterName + "/tables/" + table;
+
+			String uri = new String(this.zookeeper.getData(tablePath + "/uri", false, null));
+			boolean enabled = this.zookeeper.getChildren(tablePath, false).contains("enabled");
+
+			this.database.updateOnlineTable(table, this.clusterId, uri, enabled);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/b5b86c7e/src/contrib/blur-console/blur-agent/src/main/java/com/nearinfinity/agent/collectors/zookeeper/ZookeeperCollector.java
----------------------------------------------------------------------
diff --git a/src/contrib/blur-console/blur-agent/src/main/java/com/nearinfinity/agent/collectors/zookeeper/ZookeeperCollector.java b/src/contrib/blur-console/blur-agent/src/main/java/com/nearinfinity/agent/collectors/zookeeper/ZookeeperCollector.java
new file mode 100644
index 0000000..107327e
--- /dev/null
+++ b/src/contrib/blur-console/blur-agent/src/main/java/com/nearinfinity/agent/collectors/zookeeper/ZookeeperCollector.java
@@ -0,0 +1,145 @@
+package com.nearinfinity.agent.collectors.zookeeper;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.net.URI;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.Watcher.Event.KeeperState;
+import org.apache.zookeeper.ZooKeeper;
+import org.codehaus.jackson.map.ObjectMapper;
+
+import com.nearinfinity.agent.Agent;
+import com.nearinfinity.agent.connections.zookeeper.interfaces.ZookeeperDatabaseInterface;
+import com.nearinfinity.agent.notifications.Notifier;
+
+public class ZookeeperCollector implements Runnable {
+	private static final Log log = LogFactory.getLog(ZookeeperCollector.class);
+
+	private ZooKeeper zookeeper;
+	private boolean connected;
+
+	private final String url;
+	private final String name;
+	private final int id;
+	private final ZookeeperDatabaseInterface database;
+
+	public ZookeeperCollector(String url, String name, String blurConnection, ZookeeperDatabaseInterface database) {
+		this.url = url;
+		this.name = name;
+		this.database = database;
+		this.id = database.insertOrUpdateZookeeper(name, url, blurConnection);
+	}
+
+	@Override
+	public void run() {
+		while (true) {
+			try {
+				if (!this.connected) {
+					this.zookeeper = new ZooKeeper(this.url, 3000, new Watcher() {
+						@Override
+						public void process(WatchedEvent event) {
+							KeeperState state = event.getState();
+							if (state == KeeperState.Disconnected || state == KeeperState.Expired) {
+								log.warn("Zookeeper [" + name + "] disconnected event.");
+								database.setZookeeperOffline(id);
+								Notifier.getNotifier().notifyZookeeperOffline(name);
+								connected = false;
+							} else if (state == KeeperState.SyncConnected) {
+								log.info("Zookeeper [" + name + "] session established.");
+								connected = true;
+							}
+						}
+					});
+				}
+			} catch (IOException e) {
+				log.error("A zookeeper [" + this.name + "] connection could not be created, waiting 30 seconds.");
+				// Sleep the thread for 30secs to give the Zookeeper a chance to become
+				// available.
+				try {
+					Thread.sleep(30000);
+					continue;
+				} catch (InterruptedException ex) {
+					log.info("Exiting Zookeeper [" + this.name + "] instance");
+					return;
+				}
+			}
+
+			if (this.connected) {
+				new Thread(new ControllerCollector(this.id, this.zookeeper, this.database), "Controller Collector - " + this.name).start();
+				new Thread(new ClusterCollector(this.id, this.zookeeper, this.database), "Cluster Collector - " + this.name).start();
+			}
+
+			testEnsembleHealth();
+
+			try {
+				Thread.sleep(Agent.COLLECTOR_SLEEP_TIME);
+			} catch (InterruptedException e) {
+				log.info("Exiting Zookeeper [" + this.name + "] instance");
+				return;
+			}
+		}
+	}
+
+	private void testEnsembleHealth() {
+		String[] connections = this.url.split(",");
+		List<String> onlineZookeepers = new ArrayList<String>();
+		for (String connection : connections) {
+			try {
+				URI parsedConnection = new URI("my://" + connection);
+				String host = parsedConnection.getHost();
+				int port = parsedConnection.getPort() >= 0 ? parsedConnection.getPort() : 2181;
+				byte[] reqBytes = new byte[4];
+				ByteBuffer req = ByteBuffer.wrap(reqBytes);
+				req.putInt(ByteBuffer.wrap("ruok".getBytes()).getInt());
+				Socket socket = new Socket();
+				socket.setSoLinger(false, 10);
+				socket.setSoTimeout(20000);
+				parsedConnection.getPort();
+				socket.connect(new InetSocketAddress(host, port));
+
+				InputStream response = socket.getInputStream();
+				OutputStream question = socket.getOutputStream();
+
+				question.write(reqBytes);
+
+				byte[] resBytes = new byte[4];
+
+				response.read(resBytes);
+				String status = new String(resBytes);
+				if (status.equals("imok")) {
+					onlineZookeepers.add(connection);
+				}
+				socket.close();
+				response.close();
+				question.close();
+			} catch (Exception e) {
+				log.error("A connection to " + connection + " could not be made.", e);
+			}
+		}
+		try {
+			if (connections.length == onlineZookeepers.size()){
+				this.database.setZookeeperOnline(id);
+			} else if (connections.length < onlineZookeepers.size() * 2) {
+				this.database.setZookeeperWarning(this.id);
+			} else if (this.connected){
+				this.database.setZookeeperFailure(this.id);
+			} else {
+				this.database.setZookeeperOffline(this.id);
+			}
+			this.database.setOnlineEnsembleNodes(new ObjectMapper().writeValueAsString(onlineZookeepers), this.id);
+		} catch (Exception e) {
+			log.error("The online ensemble nodes array could not be created, writing that they are all offline!");
+			this.database.setOnlineEnsembleNodes("{}", this.id);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/b5b86c7e/src/contrib/blur-console/blur-agent/src/main/java/com/nearinfinity/agent/connections/JdbcConnection.java
----------------------------------------------------------------------
diff --git a/src/contrib/blur-console/blur-agent/src/main/java/com/nearinfinity/agent/connections/JdbcConnection.java b/src/contrib/blur-console/blur-agent/src/main/java/com/nearinfinity/agent/connections/JdbcConnection.java
new file mode 100644
index 0000000..3804c89
--- /dev/null
+++ b/src/contrib/blur-console/blur-agent/src/main/java/com/nearinfinity/agent/connections/JdbcConnection.java
@@ -0,0 +1,26 @@
+package com.nearinfinity.agent.connections;
+
+import java.util.Properties;
+
+import org.apache.commons.dbcp.BasicDataSource;
+import org.springframework.jdbc.core.JdbcTemplate;
+
+public class JdbcConnection {
+	public static JdbcTemplate createDBConnection(Properties props) {
+		String url = props.getProperty("store.url");
+		BasicDataSource dataSource = new BasicDataSource();
+		dataSource.setDriverClassName("com.mysql.jdbc.Driver");
+		dataSource.setUrl(url);
+		dataSource.setUsername(props.getProperty("store.user"));
+		dataSource.setPassword(props.getProperty("store.password"));
+		dataSource.setMaxActive(80);
+		dataSource.setMinIdle(2);
+		dataSource.setMaxWait(10000);
+		dataSource.setMaxIdle(-1);
+		dataSource.setRemoveAbandoned(true);
+		dataSource.setRemoveAbandonedTimeout(60);
+		dataSource.setDefaultAutoCommit(true);
+
+		return new JdbcTemplate(dataSource);
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/b5b86c7e/src/contrib/blur-console/blur-agent/src/main/java/com/nearinfinity/agent/connections/blur/BlurDatabaseConnection.java
----------------------------------------------------------------------
diff --git a/src/contrib/blur-console/blur-agent/src/main/java/com/nearinfinity/agent/connections/blur/BlurDatabaseConnection.java b/src/contrib/blur-console/blur-agent/src/main/java/com/nearinfinity/agent/connections/blur/BlurDatabaseConnection.java
new file mode 100644
index 0000000..311fba2
--- /dev/null
+++ b/src/contrib/blur-console/blur-agent/src/main/java/com/nearinfinity/agent/connections/blur/BlurDatabaseConnection.java
@@ -0,0 +1,149 @@
+package com.nearinfinity.agent.connections.blur;
+
+import java.util.Collection;
+import java.util.Date;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.lang.StringUtils;
+import org.json.simple.JSONValue;
+import org.springframework.dao.IncorrectResultSizeDataAccessException;
+import org.springframework.jdbc.core.JdbcTemplate;
+
+import com.nearinfinity.agent.connections.blur.interfaces.BlurDatabaseInterface;
+import com.nearinfinity.agent.exceptions.TableCollisionException;
+import com.nearinfinity.agent.exceptions.TableMissingException;
+import com.nearinfinity.agent.exceptions.ZookeeperNameCollisionException;
+import com.nearinfinity.agent.exceptions.ZookeeperNameMissingException;
+import com.nearinfinity.agent.types.TimeHelper;
+import com.nearinfinity.blur.thrift.generated.BlurQueryStatus;
+import com.nearinfinity.blur.thrift.generated.SimpleQuery;
+
+public class BlurDatabaseConnection implements BlurDatabaseInterface {
+
+	private final JdbcTemplate jdbc;
+
+	public BlurDatabaseConnection(JdbcTemplate jdbc) {
+		this.jdbc = jdbc;
+	}
+
+	@Override
+	public String resolveConnectionString(int zookeeperId) {
+		String queryString = "select distinct node_name from blur_controllers where zookeeper_id = ? and controller_status = 1";
+		List<String> controller_uris = jdbc.queryForList(queryString, new String[] { Integer.toString(zookeeperId) }, String.class);
+		String connection = StringUtils.join(controller_uris, ',');
+		this.jdbc.update("update zookeepers set blur_urls=? where id = ?", connection, zookeeperId);
+		return connection;
+	}
+
+	@Override
+	public String getZookeeperId(final String zookeeperName) throws ZookeeperNameMissingException, ZookeeperNameCollisionException {
+		List<Map<String, Object>> zookeepers = jdbc.queryForList("select id from zookeepers where name = ?", zookeeperName);
+		switch (zookeepers.size()) {
+		case 0:
+			throw new ZookeeperNameMissingException(zookeeperName);
+		case 1:
+			return zookeepers.get(0).get("ID").toString();
+		default:
+			throw new ZookeeperNameCollisionException(zookeepers.size(), zookeeperName);
+		}
+	}
+
+	@Override
+	public List<Map<String, Object>> getClusters(final int zookeeperId) {
+		return jdbc.queryForList("select id, name from clusters where zookeeper_id = ?", zookeeperId);
+	}
+
+	@Override
+	public Map<String, Object> getExistingTable(final String table, final Integer clusterId) throws TableMissingException,
+			TableCollisionException {
+		List<Map<String, Object>> existingTable = jdbc.queryForList(
+				"select id, cluster_id from blur_tables where table_name=? and cluster_id=?", table, clusterId);
+		switch (existingTable.size()) {
+		case 0:
+			throw new TableMissingException(table);
+		case 1:
+			return existingTable.get(0);
+		default:
+			throw new TableCollisionException(existingTable.size(), table);
+		}
+	}
+
+	@Override
+	public int getTableId(int clusterId, String tableName) {
+		try {
+			return jdbc.queryForInt("select id from blur_tables where cluster_id=? and table_name =?", clusterId, tableName);
+		} catch (IncorrectResultSizeDataAccessException e) {
+			return -1;
+		}
+	}
+
+	@Override
+	public void updateTableSchema(final int tableId, final String schema, final String tableAnalyzer) {
+		jdbc.update("update blur_tables set table_schema=?, table_analyzer=? where id=?", new Object[] { schema, tableAnalyzer, tableId });
+	}
+
+	@Override
+	public void updateTableServer(final int tableId, String server) {
+		jdbc.update("update blur_tables set server=? where id=?", new Object[] { server, tableId });
+	}
+
+	@Override
+	public void updateTableStats(final int tableId, Long tableBytes, Long tableQueries, Long tableRecordCount, Long tableRowCount) {
+		jdbc.update("update blur_tables set current_size=?, query_usage=?, record_count=?, row_count=? where id=?", new Object[] { tableBytes,
+				tableQueries, tableRecordCount, tableRowCount, tableId });
+	}
+
+	public Map<String, Object> getQuery(int tableId, long UUID) {
+		try {
+			return this.jdbc.queryForMap("select id, complete_shards, times, state from blur_queries where blur_table_id=? and uuid=?", tableId,
+					UUID);
+		} catch (IncorrectResultSizeDataAccessException e) {
+			return null;
+		}
+	}
+
+	public void createQuery(BlurQueryStatus status, SimpleQuery query, String times, Date startTime, int tableId) {
+		this.jdbc
+				.update(
+						"insert into blur_queries (query_string, times, complete_shards, total_shards, state, uuid, created_at, updated_at, blur_table_id, super_query_on, facets, start, fetch_num, pre_filters, post_filters, selector_column_families, selector_columns, userid, record_only) values (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)",
+						query.getQueryStr(),
+						times,
+						status.getCompleteShards(),
+						status.getTotalShards(),
+						status.getState().getValue(),
+						status.getUuid(),
+						startTime,
+						TimeHelper.now().getTime(),
+						tableId,
+						query.isSuperQueryOn(),
+						StringUtils.join(status.getQuery().getFacets(), ", "),
+						status.getQuery().getStart(),
+						status.getQuery().getFetch(),
+						query.getPreSuperFilter(),
+						query.getPostSuperFilter(),
+						status.getQuery().getSelector() == null ? null : JSONValue.toJSONString(status.getQuery().getSelector()
+								.getColumnFamiliesToFetch()),
+						status.getQuery().getSelector() == null ? null : JSONValue.toJSONString(status.getQuery().getSelector().getColumnsToFetch()),
+						status.getQuery().getUserContext(), status.getQuery().getSelector() == null ? null : status.getQuery().getSelector()
+								.isRecordOnly());
+	}
+
+	public void updateQuery(BlurQueryStatus status, String times, int queryId) {
+		jdbc.update("update blur_queries set times=?, complete_shards=?, total_shards=?, state=?, updated_at=? where id=?", times,
+				status.getCompleteShards(), status.getTotalShards(), status.getState().getValue(), TimeHelper.now().getTime(), queryId);
+	}
+
+	@Override
+	public List<Long> getRunningQueries() {
+		return this.jdbc.queryForList("select uuid from blur_queries where state = 0", Long.class);
+	}
+
+	@Override
+	public void markOrphanedRunningQueriesComplete(Collection<Long> queries) {
+		if (!queries.isEmpty()) {
+			this.jdbc.update("update blur_queries set state=3 where uuid in (" + StringUtils.join(queries, ", ") + ")");
+		}
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/b5b86c7e/src/contrib/blur-console/blur-agent/src/main/java/com/nearinfinity/agent/connections/blur/interfaces/BlurDatabaseInterface.java
----------------------------------------------------------------------
diff --git a/src/contrib/blur-console/blur-agent/src/main/java/com/nearinfinity/agent/connections/blur/interfaces/BlurDatabaseInterface.java b/src/contrib/blur-console/blur-agent/src/main/java/com/nearinfinity/agent/connections/blur/interfaces/BlurDatabaseInterface.java
new file mode 100644
index 0000000..679e903
--- /dev/null
+++ b/src/contrib/blur-console/blur-agent/src/main/java/com/nearinfinity/agent/connections/blur/interfaces/BlurDatabaseInterface.java
@@ -0,0 +1,21 @@
+package com.nearinfinity.agent.connections.blur.interfaces;
+
+import java.util.List;
+import java.util.Map;
+
+import com.nearinfinity.agent.connections.blur.interfaces.TableDatabaseInterface;
+import com.nearinfinity.agent.exceptions.TableCollisionException;
+import com.nearinfinity.agent.exceptions.TableMissingException;
+import com.nearinfinity.agent.exceptions.ZookeeperNameCollisionException;
+import com.nearinfinity.agent.exceptions.ZookeeperNameMissingException;
+
+public interface BlurDatabaseInterface extends TableDatabaseInterface, QueryDatabaseInterface {
+	String resolveConnectionString(int zookeeperId);
+
+	String getZookeeperId(final String zookeeperName) throws ZookeeperNameMissingException, ZookeeperNameCollisionException;
+
+	List<Map<String, Object>> getClusters(final int zookeeperId);
+
+	Map<String, Object> getExistingTable(final String table, final Integer clusterId) throws TableMissingException, TableCollisionException;
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/b5b86c7e/src/contrib/blur-console/blur-agent/src/main/java/com/nearinfinity/agent/connections/blur/interfaces/QueryDatabaseInterface.java
----------------------------------------------------------------------
diff --git a/src/contrib/blur-console/blur-agent/src/main/java/com/nearinfinity/agent/connections/blur/interfaces/QueryDatabaseInterface.java b/src/contrib/blur-console/blur-agent/src/main/java/com/nearinfinity/agent/connections/blur/interfaces/QueryDatabaseInterface.java
new file mode 100644
index 0000000..e9436b1
--- /dev/null
+++ b/src/contrib/blur-console/blur-agent/src/main/java/com/nearinfinity/agent/connections/blur/interfaces/QueryDatabaseInterface.java
@@ -0,0 +1,21 @@
+package com.nearinfinity.agent.connections.blur.interfaces;
+
+import java.util.Collection;
+import java.util.Date;
+import java.util.List;
+import java.util.Map;
+
+import com.nearinfinity.blur.thrift.generated.BlurQueryStatus;
+import com.nearinfinity.blur.thrift.generated.SimpleQuery;
+
+public interface QueryDatabaseInterface {
+	Map<String, Object> getQuery(int tableId, long UUID);
+
+	List<Long> getRunningQueries();
+
+	void createQuery(BlurQueryStatus status, SimpleQuery query, String times, Date startTime, int tableId);
+
+	void updateQuery(BlurQueryStatus status, String times, int queryId);
+	
+	void markOrphanedRunningQueriesComplete(Collection<Long> queries);
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/b5b86c7e/src/contrib/blur-console/blur-agent/src/main/java/com/nearinfinity/agent/connections/blur/interfaces/TableDatabaseInterface.java
----------------------------------------------------------------------
diff --git a/src/contrib/blur-console/blur-agent/src/main/java/com/nearinfinity/agent/connections/blur/interfaces/TableDatabaseInterface.java b/src/contrib/blur-console/blur-agent/src/main/java/com/nearinfinity/agent/connections/blur/interfaces/TableDatabaseInterface.java
new file mode 100644
index 0000000..dbc8e01
--- /dev/null
+++ b/src/contrib/blur-console/blur-agent/src/main/java/com/nearinfinity/agent/connections/blur/interfaces/TableDatabaseInterface.java
@@ -0,0 +1,12 @@
+package com.nearinfinity.agent.connections.blur.interfaces;
+
+public interface TableDatabaseInterface {
+	int getTableId(int clusterId, String tableName);
+
+	void updateTableSchema(final int tableId, final String schema, final String tableAnalyzer);
+
+	void updateTableServer(final int tableId, final String server);
+
+	void updateTableStats(final int tableId, final Long tableBytes, final Long tableQueries, final Long tableRecordCount,
+			final Long tableRowCount);
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/b5b86c7e/src/contrib/blur-console/blur-agent/src/main/java/com/nearinfinity/agent/connections/cleaners/CleanerDatabaseConnection.java
----------------------------------------------------------------------
diff --git a/src/contrib/blur-console/blur-agent/src/main/java/com/nearinfinity/agent/connections/cleaners/CleanerDatabaseConnection.java b/src/contrib/blur-console/blur-agent/src/main/java/com/nearinfinity/agent/connections/cleaners/CleanerDatabaseConnection.java
new file mode 100644
index 0000000..6ba5555
--- /dev/null
+++ b/src/contrib/blur-console/blur-agent/src/main/java/com/nearinfinity/agent/connections/cleaners/CleanerDatabaseConnection.java
@@ -0,0 +1,43 @@
+package com.nearinfinity.agent.connections.cleaners;
+
+import java.util.Calendar;
+import org.springframework.jdbc.core.JdbcTemplate;
+
+import com.nearinfinity.agent.connections.cleaners.interfaces.CleanerDatabaseInterface;
+import com.nearinfinity.agent.types.TimeHelper;
+
+public class CleanerDatabaseConnection implements CleanerDatabaseInterface {
+	private final JdbcTemplate jdbc;
+
+	// Query Expiration Times
+	private final int expireThreshold = 2 * 60 * 1000;
+	private final int deleteThreshold = 2 * 60 * 60 * 1000;
+
+	// Hdfs Expiration Times
+	private final int timeToLive = 14 * 24 * 60 * 60 * 1000;
+
+	public CleanerDatabaseConnection(JdbcTemplate jdbc) {
+		this.jdbc = jdbc;
+	}
+
+	@Override
+	public int deleteOldQueries() {
+		Calendar twoHoursAgo = TimeHelper.getTimeAgo(deleteThreshold);
+		return this.jdbc.update("delete from blur_queries where created_at < ?", twoHoursAgo.getTime());
+	}
+
+	@Override
+	public int expireOldQueries() {
+		Calendar now = TimeHelper.now();
+		Calendar twoMinutesAgo = TimeHelper.getTimeAgo(expireThreshold);
+		return this.jdbc.update("update blur_queries set state=1, updated_at=? where updated_at < ? and state = 0", now.getTime(),
+				twoMinutesAgo);
+	}
+
+	@Override
+	public int deleteOldStats() {
+		Calendar ttlDaysAgo = TimeHelper.getTimeAgo(timeToLive);
+		return jdbc.update("delete from hdfs_stats where created_at < ?", ttlDaysAgo);
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/b5b86c7e/src/contrib/blur-console/blur-agent/src/main/java/com/nearinfinity/agent/connections/cleaners/interfaces/CleanerDatabaseInterface.java
----------------------------------------------------------------------
diff --git a/src/contrib/blur-console/blur-agent/src/main/java/com/nearinfinity/agent/connections/cleaners/interfaces/CleanerDatabaseInterface.java b/src/contrib/blur-console/blur-agent/src/main/java/com/nearinfinity/agent/connections/cleaners/interfaces/CleanerDatabaseInterface.java
new file mode 100644
index 0000000..fef657e
--- /dev/null
+++ b/src/contrib/blur-console/blur-agent/src/main/java/com/nearinfinity/agent/connections/cleaners/interfaces/CleanerDatabaseInterface.java
@@ -0,0 +1,5 @@
+package com.nearinfinity.agent.connections.cleaners.interfaces;
+
+public interface CleanerDatabaseInterface extends HdfsDatabaseCleanerInterface, QueryDatabaseCleanerInterface {
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/b5b86c7e/src/contrib/blur-console/blur-agent/src/main/java/com/nearinfinity/agent/connections/cleaners/interfaces/HdfsDatabaseCleanerInterface.java
----------------------------------------------------------------------
diff --git a/src/contrib/blur-console/blur-agent/src/main/java/com/nearinfinity/agent/connections/cleaners/interfaces/HdfsDatabaseCleanerInterface.java b/src/contrib/blur-console/blur-agent/src/main/java/com/nearinfinity/agent/connections/cleaners/interfaces/HdfsDatabaseCleanerInterface.java
new file mode 100644
index 0000000..eeb7f48
--- /dev/null
+++ b/src/contrib/blur-console/blur-agent/src/main/java/com/nearinfinity/agent/connections/cleaners/interfaces/HdfsDatabaseCleanerInterface.java
@@ -0,0 +1,5 @@
+package com.nearinfinity.agent.connections.cleaners.interfaces;
+
+public interface HdfsDatabaseCleanerInterface {
+	int deleteOldStats();
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/b5b86c7e/src/contrib/blur-console/blur-agent/src/main/java/com/nearinfinity/agent/connections/cleaners/interfaces/QueryDatabaseCleanerInterface.java
----------------------------------------------------------------------
diff --git a/src/contrib/blur-console/blur-agent/src/main/java/com/nearinfinity/agent/connections/cleaners/interfaces/QueryDatabaseCleanerInterface.java b/src/contrib/blur-console/blur-agent/src/main/java/com/nearinfinity/agent/connections/cleaners/interfaces/QueryDatabaseCleanerInterface.java
new file mode 100644
index 0000000..5d2b337
--- /dev/null
+++ b/src/contrib/blur-console/blur-agent/src/main/java/com/nearinfinity/agent/connections/cleaners/interfaces/QueryDatabaseCleanerInterface.java
@@ -0,0 +1,8 @@
+package com.nearinfinity.agent.connections.cleaners.interfaces;
+
+public interface QueryDatabaseCleanerInterface {
+	int deleteOldQueries();
+
+	int expireOldQueries();
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/b5b86c7e/src/contrib/blur-console/blur-agent/src/main/java/com/nearinfinity/agent/connections/hdfs/HdfsDatabaseConnection.java
----------------------------------------------------------------------
diff --git a/src/contrib/blur-console/blur-agent/src/main/java/com/nearinfinity/agent/connections/hdfs/HdfsDatabaseConnection.java b/src/contrib/blur-console/blur-agent/src/main/java/com/nearinfinity/agent/connections/hdfs/HdfsDatabaseConnection.java
new file mode 100644
index 0000000..6e65620
--- /dev/null
+++ b/src/contrib/blur-console/blur-agent/src/main/java/com/nearinfinity/agent/connections/hdfs/HdfsDatabaseConnection.java
@@ -0,0 +1,49 @@
+package com.nearinfinity.agent.connections.hdfs;
+
+import java.util.Date;
+import java.util.List;
+import java.util.Map;
+
+import org.springframework.dao.IncorrectResultSizeDataAccessException;
+import org.springframework.jdbc.core.JdbcTemplate;
+
+import com.nearinfinity.agent.connections.hdfs.interfaces.HdfsDatabaseInterface;
+import com.nearinfinity.agent.exceptions.NullReturnedException;
+
+public class HdfsDatabaseConnection implements HdfsDatabaseInterface {
+	private final JdbcTemplate jdbc;
+
+	public HdfsDatabaseConnection(JdbcTemplate jdbc) {
+		this.jdbc = jdbc;
+	}
+
+	@Override
+	public void setHdfsInfo(String name, String host, int port) {
+		List<Map<String, Object>> existingHdfs = jdbc.queryForList("select id from hdfs where name=?", name);
+
+		if (existingHdfs.isEmpty()) {
+			jdbc.update("insert into hdfs (name, host, port) values (?, ?, ?)", name, host, port);
+		} else {
+			jdbc.update("update hdfs set host=?, port=? where id=?", host, port, existingHdfs.get(0).get("ID"));
+		}
+	}
+
+	@Override
+	public int getHdfsId(String name) throws NullReturnedException {
+		try {
+			return jdbc.queryForInt("select id from hdfs where name = ?", name);
+		} catch (IncorrectResultSizeDataAccessException e) {
+			return -1;
+		}
+	}
+
+	@Override
+	public void insertHdfsStats(long capacity, long presentCapacity, long remaining, long used, long logical_used, double d,
+			long underReplicatedBlocksCount, long corruptBlocksCount, long missingBlocksCount, long totalNodes, long liveNodes, long deadNodes,
+			Date time, String host, int port, int hdfsId) {
+		jdbc.update(
+				"insert into hdfs_stats (config_capacity, present_capacity, dfs_remaining, dfs_used_real, dfs_used_logical, dfs_used_percent, under_replicated, corrupt_blocks, missing_blocks, total_nodes, live_nodes, dead_nodes, created_at, host, port, hdfs_id) values (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)",
+				capacity, presentCapacity, remaining, used, logical_used, (((1.0 * used) / presentCapacity) * 100), underReplicatedBlocksCount,
+				corruptBlocksCount, missingBlocksCount, totalNodes, liveNodes, deadNodes, time, host, port, hdfsId);
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/b5b86c7e/src/contrib/blur-console/blur-agent/src/main/java/com/nearinfinity/agent/connections/hdfs/interfaces/HdfsDatabaseInterface.java
----------------------------------------------------------------------
diff --git a/src/contrib/blur-console/blur-agent/src/main/java/com/nearinfinity/agent/connections/hdfs/interfaces/HdfsDatabaseInterface.java b/src/contrib/blur-console/blur-agent/src/main/java/com/nearinfinity/agent/connections/hdfs/interfaces/HdfsDatabaseInterface.java
new file mode 100644
index 0000000..0d6abe2
--- /dev/null
+++ b/src/contrib/blur-console/blur-agent/src/main/java/com/nearinfinity/agent/connections/hdfs/interfaces/HdfsDatabaseInterface.java
@@ -0,0 +1,15 @@
+package com.nearinfinity.agent.connections.hdfs.interfaces;
+
+import java.util.Date;
+
+import com.nearinfinity.agent.exceptions.NullReturnedException;
+
+public interface HdfsDatabaseInterface {
+	void setHdfsInfo(String name, String host, int port);
+
+	int getHdfsId(String name) throws NullReturnedException;
+
+	void insertHdfsStats(long capacity, long presentCapacity, long remaining, long used, long logical_used, double d,
+			long underReplicatedBlocksCount, long corruptBlocksCount, long missingBlocksCount, long totalNodes, long liveNodes, long deadNodes,
+			Date time, String host, int port, int hdfsId);
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/b5b86c7e/src/contrib/blur-console/blur-agent/src/main/java/com/nearinfinity/agent/connections/zookeeper/ZookeeperDatabaseConnection.java
----------------------------------------------------------------------
diff --git a/src/contrib/blur-console/blur-agent/src/main/java/com/nearinfinity/agent/connections/zookeeper/ZookeeperDatabaseConnection.java b/src/contrib/blur-console/blur-agent/src/main/java/com/nearinfinity/agent/connections/zookeeper/ZookeeperDatabaseConnection.java
new file mode 100644
index 0000000..cca9dae
--- /dev/null
+++ b/src/contrib/blur-console/blur-agent/src/main/java/com/nearinfinity/agent/connections/zookeeper/ZookeeperDatabaseConnection.java
@@ -0,0 +1,162 @@
+package com.nearinfinity.agent.connections.zookeeper;
+
+import java.util.List;
+import org.apache.commons.lang.StringUtils;
+import org.springframework.dao.EmptyResultDataAccessException;
+import org.springframework.jdbc.core.JdbcTemplate;
+
+import com.nearinfinity.agent.connections.zookeeper.interfaces.ClusterDatabaseInterface;
+import com.nearinfinity.agent.connections.zookeeper.interfaces.ControllerDatabaseInterface;
+import com.nearinfinity.agent.connections.zookeeper.interfaces.ShardsDatabaseInterface;
+import com.nearinfinity.agent.connections.zookeeper.interfaces.TableDatabaseInterface;
+import com.nearinfinity.agent.connections.zookeeper.interfaces.ZookeeperDatabaseInterface;
+import com.nearinfinity.agent.types.TimeHelper;
+
+public class ZookeeperDatabaseConnection implements ZookeeperDatabaseInterface, ControllerDatabaseInterface, TableDatabaseInterface,
+		ShardsDatabaseInterface, ClusterDatabaseInterface {
+
+	private final JdbcTemplate jdbc;
+
+	public ZookeeperDatabaseConnection(JdbcTemplate jdbc) {
+		this.jdbc = jdbc;
+	}
+
+	@Override
+	public void setZookeeperOnline(int zookeeperId) {
+		this.jdbc.update("update zookeepers set zookeeper_status=? where id=?", 1, zookeeperId);
+	}
+
+	@Override
+	public void setZookeeperWarning(int zookeeperId) {
+		this.jdbc.update("update zookeepers set zookeeper_status=? where id=?", 2, zookeeperId);
+	}
+
+	@Override
+	public void setZookeeperFailure(int zookeeperId) {
+		this.jdbc.update("update zookeepers set zookeeper_status=? where id=?", 3, zookeeperId);
+	}
+
+	@Override
+	public void setZookeeperOffline(int zookeeperId) {
+		this.jdbc.update("update zookeepers set zookeeper_status=? where id=?", 0, zookeeperId);
+	}
+
+	@Override
+	public int insertOrUpdateZookeeper(String name, String url, String blurConnection) {
+		int updatedCount = jdbc.update("update zookeepers set url=? where name=?", url, name);
+
+		if (updatedCount == 0) {
+			jdbc.update("insert into zookeepers (name, url, blur_urls) values (?, ?, ?)", name, url, blurConnection);
+		}
+
+		return jdbc.queryForInt("select id from zookeepers where name = ?", name);
+	}
+
+	@Override
+	public int insertOrUpdateCluster(boolean safeMode, String cluster, int zookeeperId) {
+		int updateCount = this.jdbc.update("update clusters set safe_mode=? where name=? and zookeeper_id=?", safeMode, cluster, zookeeperId);
+		if (updateCount == 0) {
+			this.jdbc.update("insert into clusters (name, zookeeper_id, safe_mode) values (?, ?, ?)", cluster, zookeeperId, safeMode);
+		}
+		return this.jdbc.queryForInt("select id from clusters where name=? and zookeeper_id=?", cluster, zookeeperId);
+	}
+
+	@Override
+	public int markOfflineControllers(List<String> onlineControllers, int zookeeperId) {
+		if (onlineControllers.isEmpty()) {
+			return this.jdbc.update("update blur_controllers set controller_status=0, updated_at=? where zookeeper_id = ?", TimeHelper.now().getTime(),
+					zookeeperId);
+		} else {
+			return this.jdbc.update(
+					"update blur_controllers set controller_status=0, updated_at=? where controller_status!=0 and node_name not in ('"
+							+ StringUtils.join(onlineControllers, "','") + "') and zookeeper_id = ?", TimeHelper.now().getTime(), zookeeperId);
+		}
+	}
+
+	@Override
+	public int markOfflineShards(List<String> onlineShards, int clusterId) {
+		if (onlineShards.isEmpty()) {
+			return this.jdbc.update("update blur_shards set shard_status=0 updated_at=? where cluster_id = ?", TimeHelper.now().getTime(), clusterId);
+		} else {
+			return this.jdbc.update(
+					"update blur_shards set shard_status=0, updated_at=? where shard_status!=0 and node_name not in ('" + StringUtils.join(onlineShards, "','")
+							+ "') and cluster_id=?", TimeHelper.now().getTime(), clusterId);
+		}
+	}
+
+	@Override
+	public void markDeletedTables(List<String> onlineTables, int clusterId) {
+		if (onlineTables.isEmpty()) {
+			this.jdbc.update("delete from blur_tables where cluster_id=?", clusterId);
+		} else {
+			this.jdbc.update("delete from blur_tables where cluster_id=? and table_name not in ('"
+					+ StringUtils.join(onlineTables, "','") + "')", clusterId);
+		}
+	}
+
+	@Override
+	public void updateOnlineController(String controller, int zookeeperId, String blurVersion) {
+		int zookeeperStatus = this.jdbc.queryForInt("select zookeeper_status from zookeepers where id=?", zookeeperId);
+		int status = (zookeeperStatus == 0 || zookeeperStatus == 2) ? 2 : 1;
+		int updatedCount = this.jdbc.update(
+				"update blur_controllers set controller_status=?, blur_version=?, updated_at=? where node_name=? and zookeeper_id =?", status, blurVersion,
+				TimeHelper.now().getTime(), controller, zookeeperId);
+
+		if (updatedCount == 0) {
+			this.jdbc.update(
+					"insert into blur_controllers (node_name, controller_status, zookeeper_id, blur_version, created_at, updated_at) values (?, ?, ?, ?, ?, ?)",
+					controller, status, zookeeperId, blurVersion, TimeHelper.now().getTime(), TimeHelper.now().getTime());
+		}
+	}
+
+	@Override
+	public void updateOnlineShard(String shard, int clusterId, String blurVersion) {
+		int zookeeperStatus = this.jdbc.queryForInt(
+				"select zookeepers.zookeeper_status from zookeepers, clusters where clusters.id=? and clusters.zookeeper_id=zookeepers.id;",
+				clusterId);
+		int status = (zookeeperStatus == 0 || zookeeperStatus == 2) ? 2 : 1;
+		int updatedCount = this.jdbc.update("update blur_shards set shard_status=?, blur_version=?, updated_at=? where node_name=? and cluster_id=?",
+				status, blurVersion, TimeHelper.now().getTime(), shard, clusterId);
+
+		if (updatedCount == 0) {
+			this.jdbc.update(
+					"insert into blur_shards (node_name, shard_status, cluster_id, blur_version, created_at, updated_at) values (?, ?, ?, ?, ?, ?)", shard,
+					status, clusterId, blurVersion, TimeHelper.now().getTime(), TimeHelper.now().getTime());
+		}
+	}
+
+	@Override
+	public void updateOnlineTable(String table, int clusterId, String uri, boolean enabled) {
+		try{
+			int currentStatus = this.jdbc.queryForInt("select table_status from blur_tables where table_name=? and cluster_id=?", table, clusterId);
+			if (enabled && currentStatus != 3){
+				this.jdbc.update("update blur_tables set table_uri=?, table_status=?, updated_at=? where table_name=? and cluster_id=?",
+						uri, 4, TimeHelper.now().getTime(), table, clusterId);
+				
+			}
+			if (!enabled && currentStatus != 5) {
+				this.jdbc.update("update blur_tables set table_uri=?, table_status=?, updated_at=? where table_name=? and cluster_id=?",
+						uri, 2, TimeHelper.now().getTime(), table, clusterId);
+			}
+		} catch(EmptyResultDataAccessException e){
+			this.jdbc.update("insert into blur_tables (table_name, table_uri, table_status, cluster_id, updated_at) values (?, ?, ?, ?, ?)", table,
+					uri, (enabled ? 4 : 2), clusterId, TimeHelper.now().getTime());
+		}
+	}
+
+	@Override
+	public void setOnlineEnsembleNodes(String ensembleArray, int zookeeperId) {
+		this.jdbc.update("update zookeepers set online_ensemble_nodes=? where id=?", ensembleArray, zookeeperId);
+	}
+
+	@Override
+	public List<String> getRecentOfflineShardNames(int amount) {
+		return this.jdbc.queryForList("select node_name from blur_shards where shard_status=0 order by updated_at limit 0, " + amount, String.class);
+	}
+
+	@Override
+	public List<String> getRecentOfflineControllerNames(int amount) {
+		return this.jdbc.queryForList("select node_name from blur_controllers where controller_status=0 order by updated_at limit 0, " + amount,
+				String.class);
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/b5b86c7e/src/contrib/blur-console/blur-agent/src/main/java/com/nearinfinity/agent/connections/zookeeper/interfaces/ClusterDatabaseInterface.java
----------------------------------------------------------------------
diff --git a/src/contrib/blur-console/blur-agent/src/main/java/com/nearinfinity/agent/connections/zookeeper/interfaces/ClusterDatabaseInterface.java b/src/contrib/blur-console/blur-agent/src/main/java/com/nearinfinity/agent/connections/zookeeper/interfaces/ClusterDatabaseInterface.java
new file mode 100644
index 0000000..80713ad
--- /dev/null
+++ b/src/contrib/blur-console/blur-agent/src/main/java/com/nearinfinity/agent/connections/zookeeper/interfaces/ClusterDatabaseInterface.java
@@ -0,0 +1,7 @@
+package com.nearinfinity.agent.connections.zookeeper.interfaces;
+
+public interface ClusterDatabaseInterface extends ShardsDatabaseInterface, TableDatabaseInterface {
+
+	int insertOrUpdateCluster(boolean safeMode, String cluster, int zookeeperId);
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/b5b86c7e/src/contrib/blur-console/blur-agent/src/main/java/com/nearinfinity/agent/connections/zookeeper/interfaces/ControllerDatabaseInterface.java
----------------------------------------------------------------------
diff --git a/src/contrib/blur-console/blur-agent/src/main/java/com/nearinfinity/agent/connections/zookeeper/interfaces/ControllerDatabaseInterface.java b/src/contrib/blur-console/blur-agent/src/main/java/com/nearinfinity/agent/connections/zookeeper/interfaces/ControllerDatabaseInterface.java
new file mode 100644
index 0000000..6fd2831
--- /dev/null
+++ b/src/contrib/blur-console/blur-agent/src/main/java/com/nearinfinity/agent/connections/zookeeper/interfaces/ControllerDatabaseInterface.java
@@ -0,0 +1,12 @@
+package com.nearinfinity.agent.connections.zookeeper.interfaces;
+
+import java.util.List;
+
+public interface ControllerDatabaseInterface {
+
+	int markOfflineControllers(List<String> onlineControllers, int zookeeperId);
+
+	void updateOnlineController(String controller, int zookeeperId, String blurVersion);
+
+	List<String> getRecentOfflineControllerNames(int amount);
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/b5b86c7e/src/contrib/blur-console/blur-agent/src/main/java/com/nearinfinity/agent/connections/zookeeper/interfaces/ShardsDatabaseInterface.java
----------------------------------------------------------------------
diff --git a/src/contrib/blur-console/blur-agent/src/main/java/com/nearinfinity/agent/connections/zookeeper/interfaces/ShardsDatabaseInterface.java b/src/contrib/blur-console/blur-agent/src/main/java/com/nearinfinity/agent/connections/zookeeper/interfaces/ShardsDatabaseInterface.java
new file mode 100644
index 0000000..7215cd4
--- /dev/null
+++ b/src/contrib/blur-console/blur-agent/src/main/java/com/nearinfinity/agent/connections/zookeeper/interfaces/ShardsDatabaseInterface.java
@@ -0,0 +1,13 @@
+package com.nearinfinity.agent.connections.zookeeper.interfaces;
+
+import java.util.List;
+
+public interface ShardsDatabaseInterface {
+
+	int markOfflineShards(List<String> onlineShards, int clusterId);
+
+	void updateOnlineShard(String shard, int clusterId, String blurVersion);
+
+	List<String> getRecentOfflineShardNames(int amount);
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/b5b86c7e/src/contrib/blur-console/blur-agent/src/main/java/com/nearinfinity/agent/connections/zookeeper/interfaces/TableDatabaseInterface.java
----------------------------------------------------------------------
diff --git a/src/contrib/blur-console/blur-agent/src/main/java/com/nearinfinity/agent/connections/zookeeper/interfaces/TableDatabaseInterface.java b/src/contrib/blur-console/blur-agent/src/main/java/com/nearinfinity/agent/connections/zookeeper/interfaces/TableDatabaseInterface.java
new file mode 100644
index 0000000..f7ede4a
--- /dev/null
+++ b/src/contrib/blur-console/blur-agent/src/main/java/com/nearinfinity/agent/connections/zookeeper/interfaces/TableDatabaseInterface.java
@@ -0,0 +1,11 @@
+package com.nearinfinity.agent.connections.zookeeper.interfaces;
+
+import java.util.List;
+
+public interface TableDatabaseInterface {
+
+	void markDeletedTables(List<String> onlineTables, int clusterId);
+
+	void updateOnlineTable(String table, int clusterId, String uri, boolean enabled);
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/b5b86c7e/src/contrib/blur-console/blur-agent/src/main/java/com/nearinfinity/agent/connections/zookeeper/interfaces/ZookeeperDatabaseInterface.java
----------------------------------------------------------------------
diff --git a/src/contrib/blur-console/blur-agent/src/main/java/com/nearinfinity/agent/connections/zookeeper/interfaces/ZookeeperDatabaseInterface.java b/src/contrib/blur-console/blur-agent/src/main/java/com/nearinfinity/agent/connections/zookeeper/interfaces/ZookeeperDatabaseInterface.java
new file mode 100644
index 0000000..47e2ae1
--- /dev/null
+++ b/src/contrib/blur-console/blur-agent/src/main/java/com/nearinfinity/agent/connections/zookeeper/interfaces/ZookeeperDatabaseInterface.java
@@ -0,0 +1,17 @@
+package com.nearinfinity.agent.connections.zookeeper.interfaces;
+
+public interface ZookeeperDatabaseInterface extends ControllerDatabaseInterface, ClusterDatabaseInterface {
+
+	void setZookeeperOnline(int id);
+
+	void setZookeeperWarning(int id);
+
+	void setZookeeperOffline(int id);
+
+  void setZookeeperFailure(int id);
+
+	int insertOrUpdateZookeeper(String name, String url, String blurConnection);
+
+	void setOnlineEnsembleNodes(String ensembleArray, int zookeeperId);
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/b5b86c7e/src/contrib/blur-console/blur-agent/src/main/java/com/nearinfinity/agent/exceptions/CollisionException.java
----------------------------------------------------------------------
diff --git a/src/contrib/blur-console/blur-agent/src/main/java/com/nearinfinity/agent/exceptions/CollisionException.java b/src/contrib/blur-console/blur-agent/src/main/java/com/nearinfinity/agent/exceptions/CollisionException.java
new file mode 100644
index 0000000..335217a
--- /dev/null
+++ b/src/contrib/blur-console/blur-agent/src/main/java/com/nearinfinity/agent/exceptions/CollisionException.java
@@ -0,0 +1,8 @@
+package com.nearinfinity.agent.exceptions;
+
+@SuppressWarnings("serial")
+public class CollisionException extends Exception {
+	public CollisionException(int size, String type, String name) {
+		super("Found [" + size + "] " + type + " by identifier [" + name + "].  Need one and only one result.  Skipping collection.");
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/b5b86c7e/src/contrib/blur-console/blur-agent/src/main/java/com/nearinfinity/agent/exceptions/HdfsThreadException.java
----------------------------------------------------------------------
diff --git a/src/contrib/blur-console/blur-agent/src/main/java/com/nearinfinity/agent/exceptions/HdfsThreadException.java b/src/contrib/blur-console/blur-agent/src/main/java/com/nearinfinity/agent/exceptions/HdfsThreadException.java
new file mode 100644
index 0000000..8de4997
--- /dev/null
+++ b/src/contrib/blur-console/blur-agent/src/main/java/com/nearinfinity/agent/exceptions/HdfsThreadException.java
@@ -0,0 +1,12 @@
+package com.nearinfinity.agent.exceptions;
+
+@SuppressWarnings("serial")
+public class HdfsThreadException extends Exception {
+	public HdfsThreadException() {
+		super();
+	}
+
+	public HdfsThreadException(String message) {
+		super(message);
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/b5b86c7e/src/contrib/blur-console/blur-agent/src/main/java/com/nearinfinity/agent/exceptions/InvalidLicenseException.java
----------------------------------------------------------------------
diff --git a/src/contrib/blur-console/blur-agent/src/main/java/com/nearinfinity/agent/exceptions/InvalidLicenseException.java b/src/contrib/blur-console/blur-agent/src/main/java/com/nearinfinity/agent/exceptions/InvalidLicenseException.java
new file mode 100644
index 0000000..f8351e0
--- /dev/null
+++ b/src/contrib/blur-console/blur-agent/src/main/java/com/nearinfinity/agent/exceptions/InvalidLicenseException.java
@@ -0,0 +1,13 @@
+package com.nearinfinity.agent.exceptions;
+
+public class InvalidLicenseException extends Exception {
+	private static final long serialVersionUID = 1317409878802094298L;
+
+	public InvalidLicenseException(String message) {
+		super(message);
+	}
+
+	public InvalidLicenseException(String message, Throwable cause) {
+		super(message, cause);
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/b5b86c7e/src/contrib/blur-console/blur-agent/src/main/java/com/nearinfinity/agent/exceptions/MissingException.java
----------------------------------------------------------------------
diff --git a/src/contrib/blur-console/blur-agent/src/main/java/com/nearinfinity/agent/exceptions/MissingException.java b/src/contrib/blur-console/blur-agent/src/main/java/com/nearinfinity/agent/exceptions/MissingException.java
new file mode 100644
index 0000000..917991b
--- /dev/null
+++ b/src/contrib/blur-console/blur-agent/src/main/java/com/nearinfinity/agent/exceptions/MissingException.java
@@ -0,0 +1,8 @@
+package com.nearinfinity.agent.exceptions;
+
+@SuppressWarnings("serial")
+public class MissingException extends Exception {
+	public MissingException(String type, String name) {
+		super("Couldn't Find a " + type + " by name [" + name + "].  Need a single result.  Skipping collection.");
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/b5b86c7e/src/contrib/blur-console/blur-agent/src/main/java/com/nearinfinity/agent/exceptions/NullReturnedException.java
----------------------------------------------------------------------
diff --git a/src/contrib/blur-console/blur-agent/src/main/java/com/nearinfinity/agent/exceptions/NullReturnedException.java b/src/contrib/blur-console/blur-agent/src/main/java/com/nearinfinity/agent/exceptions/NullReturnedException.java
new file mode 100644
index 0000000..7761462
--- /dev/null
+++ b/src/contrib/blur-console/blur-agent/src/main/java/com/nearinfinity/agent/exceptions/NullReturnedException.java
@@ -0,0 +1,12 @@
+package com.nearinfinity.agent.exceptions;
+
+@SuppressWarnings("serial")
+public class NullReturnedException extends Exception {
+	public NullReturnedException() {
+		super();
+	}
+
+	public NullReturnedException(String message) {
+		super(message);
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/b5b86c7e/src/contrib/blur-console/blur-agent/src/main/java/com/nearinfinity/agent/exceptions/TableCollisionException.java
----------------------------------------------------------------------
diff --git a/src/contrib/blur-console/blur-agent/src/main/java/com/nearinfinity/agent/exceptions/TableCollisionException.java b/src/contrib/blur-console/blur-agent/src/main/java/com/nearinfinity/agent/exceptions/TableCollisionException.java
new file mode 100644
index 0000000..e174781
--- /dev/null
+++ b/src/contrib/blur-console/blur-agent/src/main/java/com/nearinfinity/agent/exceptions/TableCollisionException.java
@@ -0,0 +1,8 @@
+package com.nearinfinity.agent.exceptions;
+
+@SuppressWarnings("serial")
+public class TableCollisionException extends CollisionException {
+	public TableCollisionException(int size, String table) {
+		super(size, "Table", table);
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/b5b86c7e/src/contrib/blur-console/blur-agent/src/main/java/com/nearinfinity/agent/exceptions/TableMissingException.java
----------------------------------------------------------------------
diff --git a/src/contrib/blur-console/blur-agent/src/main/java/com/nearinfinity/agent/exceptions/TableMissingException.java b/src/contrib/blur-console/blur-agent/src/main/java/com/nearinfinity/agent/exceptions/TableMissingException.java
new file mode 100644
index 0000000..f98c7a3
--- /dev/null
+++ b/src/contrib/blur-console/blur-agent/src/main/java/com/nearinfinity/agent/exceptions/TableMissingException.java
@@ -0,0 +1,8 @@
+package com.nearinfinity.agent.exceptions;
+
+@SuppressWarnings("serial")
+public class TableMissingException extends MissingException {
+	public TableMissingException(String table) {
+		super("Table", table);
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/b5b86c7e/src/contrib/blur-console/blur-agent/src/main/java/com/nearinfinity/agent/exceptions/ZookeeperNameCollisionException.java
----------------------------------------------------------------------
diff --git a/src/contrib/blur-console/blur-agent/src/main/java/com/nearinfinity/agent/exceptions/ZookeeperNameCollisionException.java b/src/contrib/blur-console/blur-agent/src/main/java/com/nearinfinity/agent/exceptions/ZookeeperNameCollisionException.java
new file mode 100644
index 0000000..6d1dff0
--- /dev/null
+++ b/src/contrib/blur-console/blur-agent/src/main/java/com/nearinfinity/agent/exceptions/ZookeeperNameCollisionException.java
@@ -0,0 +1,8 @@
+package com.nearinfinity.agent.exceptions;
+
+@SuppressWarnings("serial")
+public class ZookeeperNameCollisionException extends CollisionException {
+	public ZookeeperNameCollisionException(int size, String zookeeperName) {
+		super(size, "Zookeepers", zookeeperName);
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/b5b86c7e/src/contrib/blur-console/blur-agent/src/main/java/com/nearinfinity/agent/exceptions/ZookeeperNameMissingException.java
----------------------------------------------------------------------
diff --git a/src/contrib/blur-console/blur-agent/src/main/java/com/nearinfinity/agent/exceptions/ZookeeperNameMissingException.java b/src/contrib/blur-console/blur-agent/src/main/java/com/nearinfinity/agent/exceptions/ZookeeperNameMissingException.java
new file mode 100644
index 0000000..61f4d3e
--- /dev/null
+++ b/src/contrib/blur-console/blur-agent/src/main/java/com/nearinfinity/agent/exceptions/ZookeeperNameMissingException.java
@@ -0,0 +1,8 @@
+package com.nearinfinity.agent.exceptions;
+
+@SuppressWarnings("serial")
+public class ZookeeperNameMissingException extends MissingException {
+	public ZookeeperNameMissingException(String zookeeperName) {
+		super("Zookeeper", zookeeperName);
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/b5b86c7e/src/contrib/blur-console/blur-agent/src/main/java/com/nearinfinity/agent/monitor/ThreadController.java
----------------------------------------------------------------------
diff --git a/src/contrib/blur-console/blur-agent/src/main/java/com/nearinfinity/agent/monitor/ThreadController.java b/src/contrib/blur-console/blur-agent/src/main/java/com/nearinfinity/agent/monitor/ThreadController.java
new file mode 100644
index 0000000..e2d46df
--- /dev/null
+++ b/src/contrib/blur-console/blur-agent/src/main/java/com/nearinfinity/agent/monitor/ThreadController.java
@@ -0,0 +1,29 @@
+package com.nearinfinity.agent.monitor;
+
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Set;
+
+public class ThreadController {
+	private static Set<Thread> threads = new HashSet<Thread>();
+	public static boolean exitOnStop = true;
+
+	public static void registerThread(Thread thread) {
+		if (thread != null) {
+			threads.add(thread);
+		}
+	}
+
+	public static void stopAllThreads() {
+		Iterator<Thread> iterator = threads.iterator();
+		while (iterator.hasNext()) {
+			Thread next = iterator.next();
+			next.interrupt();
+			iterator.remove();
+		}
+
+		if (exitOnStop) {
+			System.exit(1);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/b5b86c7e/src/contrib/blur-console/blur-agent/src/main/java/com/nearinfinity/agent/notifications/AgentMailerAuthenticator.java
----------------------------------------------------------------------
diff --git a/src/contrib/blur-console/blur-agent/src/main/java/com/nearinfinity/agent/notifications/AgentMailerAuthenticator.java b/src/contrib/blur-console/blur-agent/src/main/java/com/nearinfinity/agent/notifications/AgentMailerAuthenticator.java
new file mode 100644
index 0000000..047bf8c
--- /dev/null
+++ b/src/contrib/blur-console/blur-agent/src/main/java/com/nearinfinity/agent/notifications/AgentMailerAuthenticator.java
@@ -0,0 +1,16 @@
+package com.nearinfinity.agent.notifications;
+
+import javax.mail.Authenticator;
+import javax.mail.PasswordAuthentication;
+
+public class AgentMailerAuthenticator extends Authenticator {
+	private final PasswordAuthentication authentication;
+
+	public AgentMailerAuthenticator(String username, String password) {
+		this.authentication = new PasswordAuthentication(username, password);
+	}
+
+	protected PasswordAuthentication getPasswordAuthentication() {
+		return this.authentication;
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/b5b86c7e/src/contrib/blur-console/blur-agent/src/main/java/com/nearinfinity/agent/notifications/Mailer.java
----------------------------------------------------------------------
diff --git a/src/contrib/blur-console/blur-agent/src/main/java/com/nearinfinity/agent/notifications/Mailer.java b/src/contrib/blur-console/blur-agent/src/main/java/com/nearinfinity/agent/notifications/Mailer.java
new file mode 100644
index 0000000..7548df4
--- /dev/null
+++ b/src/contrib/blur-console/blur-agent/src/main/java/com/nearinfinity/agent/notifications/Mailer.java
@@ -0,0 +1,78 @@
+package com.nearinfinity.agent.notifications;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+
+import javax.mail.Message;
+import javax.mail.MessagingException;
+import javax.mail.Session;
+import javax.mail.Transport;
+import javax.mail.internet.AddressException;
+import javax.mail.internet.InternetAddress;
+import javax.mail.internet.MimeMessage;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+public class Mailer {
+	private static final Log log = LogFactory.getLog(Mailer.class);
+
+	private AgentMailerAuthenticator authenticator;
+	private String automatedSender;
+	private List<InternetAddress> recipients;
+	private Properties mailerProperties;
+	private boolean sendMail = false;
+
+	public Mailer(Properties props) {
+		if (props.containsKey("mail.enabled") && props.getProperty("mail.enabled").equals("true")) {
+			sendMail = true;
+			authenticator = new AgentMailerAuthenticator(props.getProperty("mail.sender.username"), props.getProperty("mail.sender.password"));
+			automatedSender = props.getProperty("mail.from.address", "DoNotReply");
+			setupRecipients(props.getProperty("mail.recipients", ""));			
+			
+			mailerProperties = new Properties();
+			mailerProperties.put("mail.transport.protocol", "smtp");
+			mailerProperties.put("mail.smtp.host", props.getProperty("mail.host", ""));
+			mailerProperties.put("mail.smtp.port", props.getProperty("mail.port", ""));
+			mailerProperties.put("mail.smtp.auth", "true");
+			mailerProperties.put("mail.smtp.starttls.enable", "true");
+		}
+	}
+
+	public void sendMessage(String subject, String messageBody) {
+		if (!sendMail) {
+			log.info("Mailer has been disabled.  No emails will be sent.");
+			return;
+		}
+		if (recipients.isEmpty()) {
+			log.warn("There were no recipients found to send mail to.  Skipping send mail.");
+			return;
+		}
+
+		Session session = Session.getInstance(this.mailerProperties, this.authenticator);
+		try {
+			MimeMessage message = new MimeMessage(session);
+			message.setFrom(new InternetAddress(this.automatedSender));
+			message.addRecipients(Message.RecipientType.TO, recipients.toArray(new InternetAddress[recipients.size()]));
+
+			message.setSubject(subject);
+			message.setContent(messageBody, "text/plain");
+
+			Transport.send(message);
+		} catch (MessagingException e) {
+			log.error("An error occured while sending a warning email.", e);
+		}
+	}
+	
+	private void setupRecipients(String recipientProp) {
+		recipients = new ArrayList<InternetAddress>();
+		for (String email : recipientProp.split("\\|")) {
+			try {
+				recipients.add(new InternetAddress(email));
+			} catch (AddressException e) {
+				log.warn("Invalid email address found.  Ignoring.", e);
+			}
+		}
+	}
+}


Mime
View raw message