incubator-oodt-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mattm...@apache.org
Subject svn commit: r963480 - in /incubator/oodt/trunk/catalog/src/main/java/org/apache/oodt/cas/catalog/index: DataSourceIndex.java InMemoryIndex.java Index.java IngestService.java WorkflowManagerDataSourceIndex.java
Date Mon, 12 Jul 2010 20:39:12 GMT
Author: mattmann
Date: Mon Jul 12 20:39:12 2010
New Revision: 963480

URL: http://svn.apache.org/viewvc?rev=963480&view=rev
Log:
- progress towards OODT-15 One trunk for all OODT components with top level build

Modified:
    incubator/oodt/trunk/catalog/src/main/java/org/apache/oodt/cas/catalog/index/DataSourceIndex.java
    incubator/oodt/trunk/catalog/src/main/java/org/apache/oodt/cas/catalog/index/InMemoryIndex.java
    incubator/oodt/trunk/catalog/src/main/java/org/apache/oodt/cas/catalog/index/Index.java
    incubator/oodt/trunk/catalog/src/main/java/org/apache/oodt/cas/catalog/index/IngestService.java
    incubator/oodt/trunk/catalog/src/main/java/org/apache/oodt/cas/catalog/index/WorkflowManagerDataSourceIndex.java

Modified: incubator/oodt/trunk/catalog/src/main/java/org/apache/oodt/cas/catalog/index/DataSourceIndex.java
URL: http://svn.apache.org/viewvc/incubator/oodt/trunk/catalog/src/main/java/org/apache/oodt/cas/catalog/index/DataSourceIndex.java?rev=963480&r1=963479&r2=963480&view=diff
==============================================================================
--- incubator/oodt/trunk/catalog/src/main/java/org/apache/oodt/cas/catalog/index/DataSourceIndex.java (original)
+++ incubator/oodt/trunk/catalog/src/main/java/org/apache/oodt/cas/catalog/index/DataSourceIndex.java Mon Jul 12 20:39:12 2010
@@ -1,4 +1,4 @@
-/*
+/**
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
@@ -65,395 +65,453 @@ import org.apache.oodt.cas.commons.date.
  * 
  * DataSource Indexer which supports both ingest and query
  * 
- * @author bfoster
- *
+ * 
  */
 public class DataSourceIndex implements Index, IngestService, QueryService {
 
-	private static final Logger LOG = Logger.getLogger(DataSourceIndex.class.getName());
-	
-	private DataSource dataSource;
-	private boolean useUTF8;
-	
-	public DataSourceIndex(String user, String pass, String driver, String jdbcUrl, boolean useUTF8) {
-		this.dataSource = DatabaseConnectionBuilder.buildDataSource(user, pass, driver, jdbcUrl);
-		this.useUTF8 = useUTF8;
-	}
-	
-	public IndexPager getPager(PageInfo pageInfo) throws CatalogIndexException {
-		return new IndexPager(new ProcessedPageInfo(pageInfo.getPageSize(), pageInfo.getPageNum(), this.getNumOfTransactions()));
-	}
-	
-	protected int getNumOfTransactions() throws CatalogIndexException {
-		Connection conn = null;
-		Statement stmt = null;
-		ResultSet rs = null;
-		try {
-			conn = this.dataSource.getConnection();
-			stmt = conn.createStatement();
-			rs = stmt.executeQuery("SELECT COUNT(transaction_id) AS numTransIds FROM transactions");
-			if (rs.next())
-				return rs.getInt("numTransIds");
-			else
-				throw new Exception("Failed to query for number of transactions");
-		}catch (Exception e) {
-			throw new CatalogIndexException("Failed to get number of transactions : " + e.getMessage(), e);
-		}finally {
-			try {
-				conn.close();
-			}catch(Exception e) {}
-			try {
-				stmt.close();
-			}catch(Exception e) {}
-			try {
-				rs.close();
-			}catch(Exception e) {}
-		}
-	}
-
-	/**
-	 * {@inheritDoc}
-	 */
-	public List<TransactionId<?>> getPage(IndexPager indexPage) throws CatalogIndexException {
-//		Connection conn = null;
-//		Statement stmt = null;
-//		ResultSet rs = null;
-//		try {
-//			conn = this.dataSource.getConnection();
-//			stmt = conn.createStatement();
-//			rs = stmt.executeQuery("SELECT transaction_id,transaction_class,transaction_date FROM transactions");
-//			int startLoc = pager.getPageNum() * pager.getPageSize();
-//			int endLoc = startLoc + pager.getPageSize();
-//			List<IngestReceipt> receipts = new Vector<IngestReceipt>();
-//			for (int i = startLoc; i < endLoc && rs.next(); i++) {
-//				receipts.add(new IngestReceipt(((TransactionId<?>) Class.forName(rs.getString("transaction_class")).getConstructor(String.class).newInstance(rs.getString("transaction_id"))), DateUtils.toCalendar(rs.getString("transaction_date"), DateUtils.FormatType.LOCAL_FORMAT).getTime()));
-//			}
-//			return rs.next();
-//		}catch (Exception e) {
-//			throw new CatalogIndexException("Failed to check for transaction id '" + transactionId + "' : " + e.getMessage(), e);
-//		}finally {
-//			try {
-//				conn.close();
-//			}catch(Exception e) {}
-//			try {
-//				stmt.close();
-//			}catch(Exception e) {}
-//			try {
-//				rs.close();
-//			}catch(Exception e) {}
-//		}
-		return null;
-	}
-
-	/**
-	 * {@inheritDoc}
-	 */
-	public Properties getProperties() throws CatalogIndexException {
-		return new Properties();
-	}
-
-	/**
-	 * {@inheritDoc}
-	 */
-	public String getProperty(String key) throws CatalogIndexException {
-		return null;
-	}
-
-	/**
-	 * {@inheritDoc}
-	 */
-	public TransactionIdFactory getTransactionIdFactory()
-			throws CatalogIndexException {
-		return new UuidTransactionIdFactory();
-	}
-
-	/**
-	 * {@inheritDoc}
-	 */
-	public boolean hasTransactionId(TransactionId<?> transactionId)
-			throws CatalogIndexException {
-		Connection conn = null;
-		Statement stmt = null;
-		ResultSet rs = null;
-		try {
-			conn = this.dataSource.getConnection();
-			stmt = conn.createStatement();
-			rs = stmt.executeQuery("SELECT DISTINCT transaction_id FROM transactions WHERE transaction_id = '" + transactionId + "'");
-			return rs.next();
-		}catch (Exception e) {
-			throw new CatalogIndexException("Failed to check for transaction id '" + transactionId + "' : " + e.getMessage(), e);
-		}finally {
-			try {
-				conn.close();
-			}catch(Exception e) {}
-			try {
-				stmt.close();
-			}catch(Exception e) {}
-			try {
-				rs.close();
-			}catch(Exception e) {}
-		}
-	}
-
-	/**
-	 * {@inheritDoc}
-	 */
-	public boolean delete(TransactionId<?> transactionId)
-			throws IngestServiceException {
-		Connection conn = null;
-		Statement stmt = null;
-		try {
-			conn = this.dataSource.getConnection();
-			stmt = conn.createStatement();
-			stmt.execute("DELETE FROM transactions WHERE transaction_id = '" + transactionId + "'");
-			stmt.execute("DELETE FROM transaction_terms WHERE transaction_id = '" + transactionId + "'");
-			conn.commit();
-			return true;
-		}catch (Exception e) {
-			throw new IngestServiceException("Failed to delete transaction id '" + transactionId + "' : " + e.getMessage(), e);
-		}finally {
-			try {
-				conn.close();
-			}catch(Exception e) {}
-			try {
-				stmt.close();
-			}catch(Exception e) {}
-		}
-	}
-
-	/**
-	 * {@inheritDoc}
-	 */
-	public IngestReceipt ingest(List<TermBucket> termBuckets) throws IngestServiceException {
-		Connection conn = null;
-		Statement stmt = null;
-		TransactionId<?> catalogTransactionId = null;
-		try {
-			catalogTransactionId = this.getTransactionIdFactory().createNewTransactionId();
-			conn = this.dataSource.getConnection();
-			stmt = conn.createStatement();
-			Calendar calendar = DateUtils.getCurrentLocalTime();
-			stmt.execute("INSERT INTO transactions VALUES ('" + catalogTransactionId + "','" + DateUtils.toString(calendar) + "')");
-			for (TermBucket termBucket : termBuckets) {
-				for (Term term : termBucket.getTerms()) {
-					for (String value : term.getValues()) {
-						try {
-							stmt.execute("INSERT INTO transaction_terms VALUES ('" + catalogTransactionId + "','" + termBucket.getName() + "','" + term.getName() + "','" + (this.useUTF8 ? URLEncoder.encode(value, "UTF8") : value) + "')");
-						}catch (Exception e) {
-							LOG.log(Level.WARNING, "Failed to ingest term: '" + catalogTransactionId + "','" + termBucket.getName() + "','" + term.getName() + "','" + value + "'");
-						}
-					}
-				}
-			}
-			conn.commit();
-			return new IngestReceipt(catalogTransactionId, calendar.getTime());
-		}catch (Exception e) {
-			throw new IngestServiceException("Failed to ingest metadata for transaction id '" + catalogTransactionId + "' : " + e.getMessage(), e);
-		}finally {
-			try {
-				conn.close();
-			}catch(Exception e) {}
-			try {
-				stmt.close();
-			}catch(Exception e) {}
-		}
-	}
-
-	/**
-	 * {@inheritDoc}
-	 */
-	public boolean reduce(TransactionId<?> transactionId,
-			List<TermBucket> termBuckets) throws IngestServiceException {
-		Connection conn = null;
-		Statement stmt = null;
-		try {
-			conn = this.dataSource.getConnection();
-			stmt = conn.createStatement();
-			for (TermBucket termBucket : termBuckets) 
-				for (Term term : termBucket.getTerms()) 
-					for (String value : term.getValues()) 
-						try {
-							stmt.execute("DELETE FROM transaction_terms WHERE transaction_id = '" + transactionId + "' AND term_name = '" + term.getName() + "' AND term_value = '" + (this.useUTF8 ? URLEncoder.encode(value, "UTF8") : value) + "'");
-						}catch (Exception e) {
-							LOG.log(Level.WARNING, "Failed to delete term: '" + transactionId + "','" + termBucket.getName() + "','" + term.getName() + "','" + value + "'");
-						}
-			conn.commit();
-			return true;
-		}catch (Exception e) {
-			throw new IngestServiceException("Failed to delete transaction id '" + transactionId + "' : " + e.getMessage(), e);
-		}finally {
-			try {
-				conn.close();
-			}catch(Exception e) {}
-			try {
-				stmt.close();
-			}catch(Exception e) {}
-		}
-	}
-
-	/**
-	 * {@inheritDoc}
-	 */
-	public IngestReceipt update(TransactionId<?> transactionId,
-			List<TermBucket> termBuckets) throws IngestServiceException {
-		this.reduce(transactionId, termBuckets);
-		Connection conn = null;
-		Statement stmt = null;
-		try {
-			conn = this.dataSource.getConnection();
-			stmt = conn.createStatement();
-			for (TermBucket termBucket : termBuckets) 
-				for (Term term : termBucket.getTerms()) 
-					for (String value : term.getValues())
-						try {
-							stmt.execute("INSERT INTO transaction_terms VALUES ('" + transactionId + "','" + termBucket.getName() + "','" + term.getName() + "','" + (this.useUTF8 ? URLEncoder.encode(value, "UTF8") : value) + "')");
-						}catch (Exception e) {
-							LOG.log(Level.WARNING, "Failed to ingest term: '" + transactionId + "','" + termBucket.getName() + "','" + term.getName() + "','" + value + "'");
-						}
-			Calendar calendar = DateUtils.getCurrentLocalTime();
-			stmt.execute("UPDATE transactions SET transaction_date = '" + DateUtils.toString(calendar) + "' WHERE transaction_id = '" + transactionId + "'");
-			return new IngestReceipt(transactionId, calendar.getTime());
-		}catch (Exception e) {
-			throw new IngestServiceException("Failed to ingest metadata for transaction id '" + transactionId + "' : " + e.getMessage(), e);
-		}finally {
-			try {
-				conn.close();
-			}catch(Exception e) {}
-			try {
-				stmt.close();
-			}catch(Exception e) {}
-		}
-	}
-
-	/**
-	 * {@inheritDoc}
-	 */
-	public List<TermBucket> getBuckets(TransactionId<?> transactionId)
-			throws QueryServiceException {
-		Connection conn = null;
-		Statement stmt = null;
-		ResultSet rs = null;
-		try {
-			HashMap<String, TermBucket> termBuckets = new HashMap<String, TermBucket>();
-			conn = this.dataSource.getConnection();
-			stmt = conn.createStatement();
-			rs = stmt.executeQuery("SELECT bucket_name,term_name,term_value FROM transaction_terms WHERE transaction_id = '" + transactionId + "'");
-			while (rs.next()) {
-                String bucketName = rs.getString("bucket_name");
-                String termName = rs.getString("term_name");
-                String termValue = rs.getString("term_value");
-                TermBucket bucket = termBuckets.get(bucketName);
-                if (bucket == null)
-                	bucket = new TermBucket(bucketName);
-                Term term = new Term(termName, Collections.singletonList((this.useUTF8 ? URLDecoder.decode(termValue, "UTF8") : termValue)));
-                bucket.addTerm(term);
-                termBuckets.put(bucketName, bucket);
-			}
-			return new Vector<TermBucket>(termBuckets.values());
-		}catch (Exception e) {
-			throw new QueryServiceException("Failed to get term buckets for transaction id '" + transactionId + "' : " + e.getMessage(), e);
-		}finally {
-			try {
-				conn.close();
-			}catch(Exception e) {}
-			try {
-				stmt.close();
-			}catch(Exception e) {}
-			try {
-				rs.close();
-			}catch(Exception e) {}
-		}
-	}
-
-	/**
-	 * {@inheritDoc}
-	 */
-	public Map<TransactionId<?>, List<TermBucket>> getBuckets(
-			List<TransactionId<?>> transactionIds) throws QueryServiceException {
-		HashMap<TransactionId<?>, List<TermBucket>> map = new HashMap<TransactionId<?>, List<TermBucket>>();
-		for (TransactionId<?> transactionId : transactionIds) 
-			map.put(transactionId, this.getBuckets(transactionId));
-		return map;
-	}
-
-	/**
-	 * {@inheritDoc}
-	 */
-	public List<IngestReceipt> query(QueryExpression queryExpression)
-			throws QueryServiceException {
-		Connection conn = null;
-		Statement stmt = null;
-		ResultSet rs = null;
-		try {
-			conn = this.dataSource.getConnection();
-			stmt = conn.createStatement();
-			String sqlQuery = "SELECT DISTINCT transaction_id,transaction_date FROM transactions WHERE transaction_id IN (" + this.getSqlQuery(queryExpression) + ")";
-//	        LOG.log(Level.INFO, "Performing Query: " + sqlQuery);
-			rs = stmt.executeQuery(sqlQuery);	
-
-			List<IngestReceipt> receipts = new Vector<IngestReceipt>();
-			while (rs.next()) 
-				receipts.add(new IngestReceipt(this.getTransactionIdFactory().createTransactionId(rs.getString("transaction_id")), DateUtils.toCalendar(rs.getString("transaction_date"), DateUtils.FormatType.LOCAL_FORMAT).getTime()));
-			return receipts;
-		}catch (Exception e) {
-			throw new QueryServiceException("Failed to query Workflow Instances Database : " + e.getMessage(), e);
-		}finally {
-			try {
-				conn.close();
-			}catch(Exception e) {}
-			try {
-				stmt.close();
-			}catch(Exception e) {}
-			try {
-				rs.close();
-			}catch(Exception e) {}
-		}
-	}
-	
-    private String getSqlQuery(QueryExpression queryExpression) throws QueryServiceException, UnsupportedEncodingException {
-        String sqlQuery = null;
-        if (queryExpression instanceof QueryLogicalGroup) {
-        	QueryLogicalGroup qlg = (QueryLogicalGroup) queryExpression;
-            sqlQuery = "(" + this.getSqlQuery(qlg.getExpressions().get(0));
-            String op = qlg.getOperator() == QueryLogicalGroup.Operator.AND ? "INTERSECT" : "UNION";
-            for (int i = 1; i < qlg.getExpressions().size(); i++) 
-                sqlQuery += ") " + op + " (" + this.getSqlQuery(qlg.getExpressions().get(i));
-            sqlQuery += ")";
-        }else if (queryExpression instanceof ComparisonQueryExpression){
-        	ComparisonQueryExpression cqe = (ComparisonQueryExpression) queryExpression;
-        	String operator = null;
-            if (cqe.getOperator().equals(ComparisonQueryExpression.Operator.EQUAL_TO)) {
-            	operator = "=";
-            } else if (cqe.getOperator().equals(ComparisonQueryExpression.Operator.GREATER_THAN)) {
-            	operator = ">";
-            } else if (cqe.getOperator().equals(ComparisonQueryExpression.Operator.GREATER_THAN_EQUAL_TO)) {
-            	operator = ">=";
-            } else if (cqe.getOperator().equals(ComparisonQueryExpression.Operator.LESS_THAN)) {
-            	operator = "<";
-            } else if (cqe.getOperator().equals(ComparisonQueryExpression.Operator.LESS_THAN_EQUAL_TO)) {
-            	operator = "<=";
-            } else {
-                throw new QueryServiceException("Invalid ComparisonQueryExpression Operator '" + cqe.getOperator() + "'");
+  private static final Logger LOG = Logger.getLogger(DataSourceIndex.class
+      .getName());
+
+  private DataSource dataSource;
+  private boolean useUTF8;
+
+  public DataSourceIndex(String user, String pass, String driver,
+      String jdbcUrl, boolean useUTF8) {
+    this.dataSource = DatabaseConnectionBuilder.buildDataSource(user, pass,
+        driver, jdbcUrl);
+    this.useUTF8 = useUTF8;
+  }
+
+  public IndexPager getPager(PageInfo pageInfo) throws CatalogIndexException {
+    return new IndexPager(new ProcessedPageInfo(pageInfo.getPageSize(),
+        pageInfo.getPageNum(), this.getNumOfTransactions()));
+  }
+
+  protected int getNumOfTransactions() throws CatalogIndexException {
+    Connection conn = null;
+    Statement stmt = null;
+    ResultSet rs = null;
+    try {
+      conn = this.dataSource.getConnection();
+      stmt = conn.createStatement();
+      rs = stmt
+          .executeQuery("SELECT COUNT(transaction_id) AS numTransIds FROM transactions");
+      if (rs.next())
+        return rs.getInt("numTransIds");
+      else
+        throw new Exception("Failed to query for number of transactions");
+    } catch (Exception e) {
+      throw new CatalogIndexException("Failed to get number of transactions : "
+          + e.getMessage(), e);
+    } finally {
+      try {
+        conn.close();
+      } catch (Exception e) {
+      }
+      try {
+        stmt.close();
+      } catch (Exception e) {
+      }
+      try {
+        rs.close();
+      } catch (Exception e) {
+      }
+    }
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  public List<TransactionId<?>> getPage(IndexPager indexPage)
+      throws CatalogIndexException {
+    return null;
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  public Properties getProperties() throws CatalogIndexException {
+    return new Properties();
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  public String getProperty(String key) throws CatalogIndexException {
+    return null;
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  public TransactionIdFactory getTransactionIdFactory()
+      throws CatalogIndexException {
+    return new UuidTransactionIdFactory();
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  public boolean hasTransactionId(TransactionId<?> transactionId)
+      throws CatalogIndexException {
+    Connection conn = null;
+    Statement stmt = null;
+    ResultSet rs = null;
+    try {
+      conn = this.dataSource.getConnection();
+      stmt = conn.createStatement();
+      rs = stmt
+          .executeQuery("SELECT DISTINCT transaction_id FROM transactions WHERE transaction_id = '"
+              + transactionId + "'");
+      return rs.next();
+    } catch (Exception e) {
+      throw new CatalogIndexException("Failed to check for transaction id '"
+          + transactionId + "' : " + e.getMessage(), e);
+    } finally {
+      try {
+        conn.close();
+      } catch (Exception e) {
+      }
+      try {
+        stmt.close();
+      } catch (Exception e) {
+      }
+      try {
+        rs.close();
+      } catch (Exception e) {
+      }
+    }
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  public boolean delete(TransactionId<?> transactionId)
+      throws IngestServiceException {
+    Connection conn = null;
+    Statement stmt = null;
+    try {
+      conn = this.dataSource.getConnection();
+      stmt = conn.createStatement();
+      stmt.execute("DELETE FROM transactions WHERE transaction_id = '"
+          + transactionId + "'");
+      stmt.execute("DELETE FROM transaction_terms WHERE transaction_id = '"
+          + transactionId + "'");
+      conn.commit();
+      return true;
+    } catch (Exception e) {
+      throw new IngestServiceException("Failed to delete transaction id '"
+          + transactionId + "' : " + e.getMessage(), e);
+    } finally {
+      try {
+        conn.close();
+      } catch (Exception e) {
+      }
+      try {
+        stmt.close();
+      } catch (Exception e) {
+      }
+    }
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  public IngestReceipt ingest(List<TermBucket> termBuckets)
+      throws IngestServiceException {
+    Connection conn = null;
+    Statement stmt = null;
+    TransactionId<?> catalogTransactionId = null;
+    try {
+      catalogTransactionId = this.getTransactionIdFactory()
+          .createNewTransactionId();
+      conn = this.dataSource.getConnection();
+      stmt = conn.createStatement();
+      Calendar calendar = DateUtils.getCurrentLocalTime();
+      stmt.execute("INSERT INTO transactions VALUES ('" + catalogTransactionId
+          + "','" + DateUtils.toString(calendar) + "')");
+      for (TermBucket termBucket : termBuckets) {
+        for (Term term : termBucket.getTerms()) {
+          for (String value : term.getValues()) {
+            try {
+              stmt.execute("INSERT INTO transaction_terms VALUES ('"
+                  + catalogTransactionId + "','" + termBucket.getName() + "','"
+                  + term.getName() + "','"
+                  + (this.useUTF8 ? URLEncoder.encode(value, "UTF8") : value)
+                  + "')");
+            } catch (Exception e) {
+              LOG.log(Level.WARNING, "Failed to ingest term: '"
+                  + catalogTransactionId + "','" + termBucket.getName() + "','"
+                  + term.getName() + "','" + value + "'");
             }
-            
-            sqlQuery = "SELECT DISTINCT transaction_id FROM transaction_terms WHERE term_name = '" + cqe.getTerm().getName() + "' AND (";
-        	for (int i = 0; i < cqe.getTerm().getValues().size(); i++) {
-        		String value = cqe.getTerm().getValues().get(i);
-                sqlQuery += "term_value " + operator + " '" + (this.useUTF8 ? URLEncoder.encode(value, "UTF-8") : value) + "'";
-	            if ((i + 1) < cqe.getTerm().getValues().size())
-	            	sqlQuery += "OR";
-        	}
-        	sqlQuery += ")";
-        }else if (queryExpression instanceof NotQueryExpression) {
-        	NotQueryExpression nqe = (NotQueryExpression) queryExpression;
-            sqlQuery = "SELECT DISTINCT transaction_id FROM transaction_terms WHERE NOT (" + this.getSqlQuery(nqe.getQueryExpression()) + ")";
-        }else if (queryExpression instanceof StdQueryExpression) {
-            sqlQuery = "SELECT DISTINCT transaction_id FROM transaction_terms";
-        }else {
-            throw new QueryServiceException("Invalid QueryExpression '" + queryExpression.getClass().getCanonicalName() + "'");
+          }
         }
-        return sqlQuery;
+      }
+      conn.commit();
+      return new IngestReceipt(catalogTransactionId, calendar.getTime());
+    } catch (Exception e) {
+      throw new IngestServiceException(
+          "Failed to ingest metadata for transaction id '"
+              + catalogTransactionId + "' : " + e.getMessage(), e);
+    } finally {
+      try {
+        conn.close();
+      } catch (Exception e) {
+      }
+      try {
+        stmt.close();
+      } catch (Exception e) {
+      }
+    }
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  public boolean reduce(TransactionId<?> transactionId,
+      List<TermBucket> termBuckets) throws IngestServiceException {
+    Connection conn = null;
+    Statement stmt = null;
+    try {
+      conn = this.dataSource.getConnection();
+      stmt = conn.createStatement();
+      for (TermBucket termBucket : termBuckets)
+        for (Term term : termBucket.getTerms())
+          for (String value : term.getValues())
+            try {
+              stmt
+                  .execute("DELETE FROM transaction_terms WHERE transaction_id = '"
+                      + transactionId
+                      + "' AND term_name = '"
+                      + term.getName()
+                      + "' AND term_value = '"
+                      + (this.useUTF8 ? URLEncoder.encode(value, "UTF8")
+                          : value) + "'");
+            } catch (Exception e) {
+              LOG.log(Level.WARNING, "Failed to delete term: '" + transactionId
+                  + "','" + termBucket.getName() + "','" + term.getName()
+                  + "','" + value + "'");
+            }
+      conn.commit();
+      return true;
+    } catch (Exception e) {
+      throw new IngestServiceException("Failed to delete transaction id '"
+          + transactionId + "' : " + e.getMessage(), e);
+    } finally {
+      try {
+        conn.close();
+      } catch (Exception e) {
+      }
+      try {
+        stmt.close();
+      } catch (Exception e) {
+      }
+    }
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  public IngestReceipt update(TransactionId<?> transactionId,
+      List<TermBucket> termBuckets) throws IngestServiceException {
+    this.reduce(transactionId, termBuckets);
+    Connection conn = null;
+    Statement stmt = null;
+    try {
+      conn = this.dataSource.getConnection();
+      stmt = conn.createStatement();
+      for (TermBucket termBucket : termBuckets)
+        for (Term term : termBucket.getTerms())
+          for (String value : term.getValues())
+            try {
+              stmt.execute("INSERT INTO transaction_terms VALUES ('"
+                  + transactionId + "','" + termBucket.getName() + "','"
+                  + term.getName() + "','"
+                  + (this.useUTF8 ? URLEncoder.encode(value, "UTF8") : value)
+                  + "')");
+            } catch (Exception e) {
+              LOG.log(Level.WARNING, "Failed to ingest term: '" + transactionId
+                  + "','" + termBucket.getName() + "','" + term.getName()
+                  + "','" + value + "'");
+            }
+      Calendar calendar = DateUtils.getCurrentLocalTime();
+      stmt.execute("UPDATE transactions SET transaction_date = '"
+          + DateUtils.toString(calendar) + "' WHERE transaction_id = '"
+          + transactionId + "'");
+      return new IngestReceipt(transactionId, calendar.getTime());
+    } catch (Exception e) {
+      throw new IngestServiceException(
+          "Failed to ingest metadata for transaction id '" + transactionId
+              + "' : " + e.getMessage(), e);
+    } finally {
+      try {
+        conn.close();
+      } catch (Exception e) {
+      }
+      try {
+        stmt.close();
+      } catch (Exception e) {
+      }
+    }
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  public List<TermBucket> getBuckets(TransactionId<?> transactionId)
+      throws QueryServiceException {
+    Connection conn = null;
+    Statement stmt = null;
+    ResultSet rs = null;
+    try {
+      HashMap<String, TermBucket> termBuckets = new HashMap<String, TermBucket>();
+      conn = this.dataSource.getConnection();
+      stmt = conn.createStatement();
+      rs = stmt
+          .executeQuery("SELECT bucket_name,term_name,term_value FROM transaction_terms WHERE transaction_id = '"
+              + transactionId + "'");
+      while (rs.next()) {
+        String bucketName = rs.getString("bucket_name");
+        String termName = rs.getString("term_name");
+        String termValue = rs.getString("term_value");
+        TermBucket bucket = termBuckets.get(bucketName);
+        if (bucket == null)
+          bucket = new TermBucket(bucketName);
+        Term term = new Term(termName, Collections
+            .singletonList((this.useUTF8 ? URLDecoder.decode(termValue, "UTF8")
+                : termValue)));
+        bucket.addTerm(term);
+        termBuckets.put(bucketName, bucket);
+      }
+      return new Vector<TermBucket>(termBuckets.values());
+    } catch (Exception e) {
+      throw new QueryServiceException(
+          "Failed to get term buckets for transaction id '" + transactionId
+              + "' : " + e.getMessage(), e);
+    } finally {
+      try {
+        conn.close();
+      } catch (Exception e) {
+      }
+      try {
+        stmt.close();
+      } catch (Exception e) {
+      }
+      try {
+        rs.close();
+      } catch (Exception e) {
+      }
+    }
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  public Map<TransactionId<?>, List<TermBucket>> getBuckets(
+      List<TransactionId<?>> transactionIds) throws QueryServiceException {
+    HashMap<TransactionId<?>, List<TermBucket>> map = new HashMap<TransactionId<?>, List<TermBucket>>();
+    for (TransactionId<?> transactionId : transactionIds)
+      map.put(transactionId, this.getBuckets(transactionId));
+    return map;
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  public List<IngestReceipt> query(QueryExpression queryExpression)
+      throws QueryServiceException {
+    Connection conn = null;
+    Statement stmt = null;
+    ResultSet rs = null;
+    try {
+      conn = this.dataSource.getConnection();
+      stmt = conn.createStatement();
+      String sqlQuery = "SELECT DISTINCT transaction_id,transaction_date FROM transactions WHERE transaction_id IN ("
+          + this.getSqlQuery(queryExpression) + ")";
+      rs = stmt.executeQuery(sqlQuery);
+
+      List<IngestReceipt> receipts = new Vector<IngestReceipt>();
+      while (rs.next())
+        receipts.add(new IngestReceipt(this.getTransactionIdFactory()
+            .createTransactionId(rs.getString("transaction_id")), DateUtils
+            .toCalendar(rs.getString("transaction_date"),
+                DateUtils.FormatType.LOCAL_FORMAT).getTime()));
+      return receipts;
+    } catch (Exception e) {
+      throw new QueryServiceException(
+          "Failed to query Workflow Instances Database : " + e.getMessage(), e);
+    } finally {
+      try {
+        conn.close();
+      } catch (Exception e) {
+      }
+      try {
+        stmt.close();
+      } catch (Exception e) {
+      }
+      try {
+        rs.close();
+      } catch (Exception e) {
+      }
+    }
+  }
+
+  private String getSqlQuery(QueryExpression queryExpression)
+      throws QueryServiceException, UnsupportedEncodingException {
+    String sqlQuery = null;
+    if (queryExpression instanceof QueryLogicalGroup) {
+      QueryLogicalGroup qlg = (QueryLogicalGroup) queryExpression;
+      sqlQuery = "(" + this.getSqlQuery(qlg.getExpressions().get(0));
+      String op = qlg.getOperator() == QueryLogicalGroup.Operator.AND ? "INTERSECT"
+          : "UNION";
+      for (int i = 1; i < qlg.getExpressions().size(); i++)
+        sqlQuery += ") " + op + " ("
+            + this.getSqlQuery(qlg.getExpressions().get(i));
+      sqlQuery += ")";
+    } else if (queryExpression instanceof ComparisonQueryExpression) {
+      ComparisonQueryExpression cqe = (ComparisonQueryExpression) queryExpression;
+      String operator = null;
+      if (cqe.getOperator().equals(ComparisonQueryExpression.Operator.EQUAL_TO)) {
+        operator = "=";
+      } else if (cqe.getOperator().equals(
+          ComparisonQueryExpression.Operator.GREATER_THAN)) {
+        operator = ">";
+      } else if (cqe.getOperator().equals(
+          ComparisonQueryExpression.Operator.GREATER_THAN_EQUAL_TO)) {
+        operator = ">=";
+      } else if (cqe.getOperator().equals(
+          ComparisonQueryExpression.Operator.LESS_THAN)) {
+        operator = "<";
+      } else if (cqe.getOperator().equals(
+          ComparisonQueryExpression.Operator.LESS_THAN_EQUAL_TO)) {
+        operator = "<=";
+      } else {
+        throw new QueryServiceException(
+            "Invalid ComparisonQueryExpression Operator '" + cqe.getOperator()
+                + "'");
+      }
+
+      sqlQuery = "SELECT DISTINCT transaction_id FROM transaction_terms WHERE term_name = '"
+          + cqe.getTerm().getName() + "' AND (";
+      for (int i = 0; i < cqe.getTerm().getValues().size(); i++) {
+        String value = cqe.getTerm().getValues().get(i);
+        sqlQuery += "term_value " + operator + " '"
+            + (this.useUTF8 ? URLEncoder.encode(value, "UTF-8") : value) + "'";
+        if ((i + 1) < cqe.getTerm().getValues().size())
+          sqlQuery += "OR";
+      }
+      sqlQuery += ")";
+    } else if (queryExpression instanceof NotQueryExpression) {
+      NotQueryExpression nqe = (NotQueryExpression) queryExpression;
+      sqlQuery = "SELECT DISTINCT transaction_id FROM transaction_terms WHERE NOT ("
+          + this.getSqlQuery(nqe.getQueryExpression()) + ")";
+    } else if (queryExpression instanceof StdQueryExpression) {
+      sqlQuery = "SELECT DISTINCT transaction_id FROM transaction_terms";
+    } else {
+      throw new QueryServiceException("Invalid QueryExpression '"
+          + queryExpression.getClass().getCanonicalName() + "'");
     }
+    return sqlQuery;
+  }
 
 }
-	

Modified: incubator/oodt/trunk/catalog/src/main/java/org/apache/oodt/cas/catalog/index/InMemoryIndex.java
URL: http://svn.apache.org/viewvc/incubator/oodt/trunk/catalog/src/main/java/org/apache/oodt/cas/catalog/index/InMemoryIndex.java?rev=963480&r1=963479&r2=963480&view=diff
==============================================================================
--- incubator/oodt/trunk/catalog/src/main/java/org/apache/oodt/cas/catalog/index/InMemoryIndex.java (original)
+++ incubator/oodt/trunk/catalog/src/main/java/org/apache/oodt/cas/catalog/index/InMemoryIndex.java Mon Jul 12 20:39:12 2010
@@ -1,4 +1,4 @@
-/*
+/**
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
@@ -38,83 +38,85 @@ import org.apache.oodt.cas.catalog.struc
 import org.apache.oodt.cas.catalog.term.TermBucket;
 
 /**
- * @author bfoster
- * @version $Revision$
- *
- * <p>
+ * 
  * A in memory Index which is ingestable
- * <p>
+ * 
  */
 public class InMemoryIndex implements Index, IngestService {
 
-	private static final long serialVersionUID = -3978455064365343116L;
-	
-	public HashMap<TransactionId<?>, List<TermBucket>> transactionIdToBucketsMap;
-	public HashMap<TransactionId<?>, Date> transactionIdToTransactionDate;
-	
-	public InMemoryIndex() {
-		this.transactionIdToBucketsMap = new HashMap<TransactionId<?>, List<TermBucket>>();
-		this.transactionIdToTransactionDate = new HashMap<TransactionId<?>, Date>();
-	}
-	
-	public String getProperty(String key) throws CatalogIndexException {
-		return null;
-	}
-	
-	public Properties getProperties() throws CatalogIndexException {
-		return new Properties();
-	}
-	
-	public List<TransactionId<?>> getPage(IndexPager indexPage) {
-		List<TransactionId<?>> returnList = new Vector<TransactionId<?>>();
-		int skipToLocation = (int) ((int) indexPage.getPageSize() * indexPage.getPageNum());
-		List<TransactionId<?>> transactionIds = new Vector<TransactionId<?>>(this.transactionIdToBucketsMap.keySet());
-		for (int i = skipToLocation; i < transactionIds.size() && i < (skipToLocation + indexPage.getPageSize()); i++)
-			returnList.add(transactionIds.get(i));
-		if (returnList.size() > 0)
-			return returnList;
-		else 
-			return Collections.emptyList();
-	}
-
-
-	public TransactionIdFactory getTransactionIdFactory() throws CatalogIndexException {
-		return new UuidTransactionIdFactory();
-	}
-
-	public boolean delete(TransactionId<?> transactionId)
-			throws IngestServiceException {
-		return this.transactionIdToBucketsMap.remove(transactionId) != null;
-	}
-
-	public IngestReceipt ingest(List<TermBucket> termBuckets) throws IngestServiceException {
-		TransactionId<?> transactionId = null;
-		try {
-			transactionId = this.getTransactionIdFactory().createNewTransactionId();
-			this.transactionIdToBucketsMap.put(transactionId, termBuckets);
-			Date transactionDate = new Date();
-			this.transactionIdToTransactionDate.put(transactionId, transactionDate);
-			return new IngestReceipt(transactionId, transactionDate);
-		}catch (Exception e) {
-			throw new IngestServiceException("Failed to ingest '" + transactionId + "' : " + e.getMessage());
-		}
-	}
-
-	public boolean reduce(TransactionId<?> transactionId,
-			List<TermBucket> termBuckets) throws IngestServiceException {
-		// TODO Auto-generated method stub
-		return false;
-	}
-
-	public IngestReceipt update(TransactionId<?> transactionId,
-			List<TermBucket> termBuckets) throws IngestServiceException {
-		// TODO Auto-generated method stub
-		return null;
-	}
-
-	public boolean hasTransactionId(TransactionId<?> catalogTransactionid)
-			throws CatalogIndexException {
-		return this.transactionIdToBucketsMap.containsKey(catalogTransactionid);
-	}
+  private static final long serialVersionUID = -3978455064365343116L;
+
+  public HashMap<TransactionId<?>, List<TermBucket>> transactionIdToBucketsMap;
+  public HashMap<TransactionId<?>, Date> transactionIdToTransactionDate;
+
+  public InMemoryIndex() {
+    this.transactionIdToBucketsMap = new HashMap<TransactionId<?>, List<TermBucket>>();
+    this.transactionIdToTransactionDate = new HashMap<TransactionId<?>, Date>();
+  }
+
+  public String getProperty(String key) throws CatalogIndexException {
+    return null;
+  }
+
+  public Properties getProperties() throws CatalogIndexException {
+    return new Properties();
+  }
+
+  public List<TransactionId<?>> getPage(IndexPager indexPage) {
+    List<TransactionId<?>> returnList = new Vector<TransactionId<?>>();
+    int skipToLocation = (int) ((int) indexPage.getPageSize() * indexPage
+        .getPageNum());
+    List<TransactionId<?>> transactionIds = new Vector<TransactionId<?>>(
+        this.transactionIdToBucketsMap.keySet());
+    for (int i = skipToLocation; i < transactionIds.size()
+        && i < (skipToLocation + indexPage.getPageSize()); i++)
+      returnList.add(transactionIds.get(i));
+    if (returnList.size() > 0)
+      return returnList;
+    else
+      return Collections.emptyList();
+  }
+
+  public TransactionIdFactory getTransactionIdFactory()
+      throws CatalogIndexException {
+    return new UuidTransactionIdFactory();
+  }
+
+  public boolean delete(TransactionId<?> transactionId)
+      throws IngestServiceException {
+    return this.transactionIdToBucketsMap.remove(transactionId) != null;
+  }
+
+  public IngestReceipt ingest(List<TermBucket> termBuckets)
+      throws IngestServiceException {
+    TransactionId<?> transactionId = null;
+    try {
+      transactionId = this.getTransactionIdFactory().createNewTransactionId();
+      this.transactionIdToBucketsMap.put(transactionId, termBuckets);
+      Date transactionDate = new Date();
+      this.transactionIdToTransactionDate.put(transactionId, transactionDate);
+      return new IngestReceipt(transactionId, transactionDate);
+    } catch (Exception e) {
+      throw new IngestServiceException("Failed to ingest '" + transactionId
+          + "' : " + e.getMessage());
+    }
+  }
+
+  public boolean reduce(TransactionId<?> transactionId,
+      List<TermBucket> termBuckets) throws IngestServiceException {
+    // TODO Auto-generated method stub
+    return false;
+  }
+
+  public IngestReceipt update(TransactionId<?> transactionId,
+      List<TermBucket> termBuckets) throws IngestServiceException {
+    // TODO Auto-generated method stub
+    return null;
+  }
+
+  public boolean hasTransactionId(TransactionId<?> catalogTransactionid)
+      throws CatalogIndexException {
+    return this.transactionIdToBucketsMap.containsKey(catalogTransactionid);
+  }
 
 }

Modified: incubator/oodt/trunk/catalog/src/main/java/org/apache/oodt/cas/catalog/index/Index.java
URL: http://svn.apache.org/viewvc/incubator/oodt/trunk/catalog/src/main/java/org/apache/oodt/cas/catalog/index/Index.java?rev=963480&r1=963479&r2=963480&view=diff
==============================================================================
--- incubator/oodt/trunk/catalog/src/main/java/org/apache/oodt/cas/catalog/index/Index.java (original)
+++ incubator/oodt/trunk/catalog/src/main/java/org/apache/oodt/cas/catalog/index/Index.java Mon Jul 12 20:39:12 2010
@@ -1,4 +1,4 @@
-/*
+/**
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
@@ -26,42 +26,43 @@ import java.util.List;
 import java.util.Properties;
 
 /**
- * @author bfoster
- * @version $Revision$
- *
- * <p>
- * A Interface for Communicating with an Term Index.  Should also implement
- * IngestService to allow Term ingest to this Index and/or implement 
+ * 
+ * An interface for communicating with a Term Index. Should also implement
+ * IngestService to allow Term ingest to this Index and/or implement
  * QueryService to allow Term query on this Index.
- * <p>
+ * 
  */
 public interface Index {
-	
-	public Properties getProperties() throws CatalogIndexException;
-	
-	public String getProperty(String key) throws CatalogIndexException;
-	
-	/**
-	 * Returns a list of TransactionIds associated with the 
-	 * given Index page.
-	 * @param indexPage The page for which TransactionIds will be returned
-	 * @return A page of TransactionIds, if page does not exist,
-	 * then returns null.
-	 */
-	public List<TransactionId<?>> getPage(IndexPager indexPage) throws CatalogIndexException;
-	
-	/**
-	 * 
-	 * @return
-	 */
-	public TransactionIdFactory getTransactionIdFactory() throws CatalogIndexException;
-	
-	/**
-	 * 
-	 * @param catalogTransactionid
-	 * @return
-	 * @throws CatalogIndexException
-	 */
-	public boolean hasTransactionId(TransactionId<?> transactionid)  throws CatalogIndexException;
-	
+
+  public Properties getProperties() throws CatalogIndexException;
+
+  public String getProperty(String key) throws CatalogIndexException;
+
+  /**
+   * Returns a list of TransactionIds associated with the given Index page.
+   * 
+   * @param indexPage
+   *          The page for which TransactionIds will be returned
+   * @return A page of TransactionIds, if page does not exist, then returns
+   *         null.
+   */
+  public List<TransactionId<?>> getPage(IndexPager indexPage)
+      throws CatalogIndexException;
+
+  /**
+   * 
+   * @return
+   */
+  public TransactionIdFactory getTransactionIdFactory()
+      throws CatalogIndexException;
+
+  /**
+   * 
+   * @param transactionid
+   * @return
+   * @throws CatalogIndexException
+   */
+  public boolean hasTransactionId(TransactionId<?> transactionid)
+      throws CatalogIndexException;
+
 }

Modified: incubator/oodt/trunk/catalog/src/main/java/org/apache/oodt/cas/catalog/index/IngestService.java
URL: http://svn.apache.org/viewvc/incubator/oodt/trunk/catalog/src/main/java/org/apache/oodt/cas/catalog/index/IngestService.java?rev=963480&r1=963479&r2=963480&view=diff
==============================================================================
--- incubator/oodt/trunk/catalog/src/main/java/org/apache/oodt/cas/catalog/index/IngestService.java (original)
+++ incubator/oodt/trunk/catalog/src/main/java/org/apache/oodt/cas/catalog/index/IngestService.java Mon Jul 12 20:39:12 2010
@@ -1,4 +1,4 @@
-/*
+/**
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
@@ -17,7 +17,7 @@
 
 package org.apache.oodt.cas.catalog.struct;
 
-//OODT imports
+//JDK imports
 import java.util.List;
 
 //OODT imports
@@ -26,48 +26,64 @@ import org.apache.oodt.cas.catalog.page.
 import org.apache.oodt.cas.catalog.term.TermBucket;
 
 /**
- * @author bfoster
- * @version $Revision$
- *
- * <p>
+ * 
  * A Interface for performing ingests to an Index
- * <p>
+ * 
  */
 public interface IngestService {
-	
-	/**
-	 * Indexes the given TermBucket to a TransactionId, and returns a IngestReceipt
-	 * @param termBuckets The List of TermBucket to be ingested
-	 * @return IngestReceipt Receipt of ingest
-	 * @throws IngestServiceException Any error 
-	 */
-	public IngestReceipt ingest(List<TermBucket> termBuckets) throws IngestServiceException;
-	
-	/**
-	 * TermBucket updates to given TransactionId.  A new TransactionId can be returned in IngestReceipt
-	 * if so desired and it will automatically get remapped by CatalogService.  Existing metadata 
-	 * for given TransactionId should not be deleted, just the terms in the given term buckets should
-	 * be modified.  For a complete re-ingest, one should instead delete() then ingest().
-	 * @param transactionId
-	 * @param termBuckets
-	 * @throws IngestServiceException
-	 */
-	public IngestReceipt update(TransactionId<?> transactionId, List<TermBucket> termBuckets) throws IngestServiceException;
-	
-	/**
-	 * Deletes all TermBuckets attached to given TransactionId -- there should be no trace of 
-	 * given transaction after this method is called.
-	 * @param transactionId The ID for given transaction which should be erased
-	 * @throws IngestServiceException Any error 
-	 */
-	public boolean delete(TransactionId<?> transactionId) throws IngestServiceException;
-	
-	/**
-	 * Deletes only the Terms in the given TermBuckets from the given TransactionId
-	 * @param transactionId The TransactionId for which Terms will be deleted
-	 * @param termBuckets The reduction set of Terms for each TermBucket
-	 * @throws IngestServiceException Any error
-	 */
-	public boolean reduce(TransactionId<?> transactionId, List<TermBucket> termBuckets) throws IngestServiceException;
-		
+
+  /**
+   * Indexes the given TermBuckets to a TransactionId, and returns an
+   * IngestReceipt
+   * 
+   * @param termBuckets
+   *          The List of TermBucket to be ingested
+   * @return IngestReceipt Receipt of ingest
+   * @throws IngestServiceException
+   *           Any error
+   */
+  public IngestReceipt ingest(List<TermBucket> termBuckets)
+      throws IngestServiceException;
+
+  /**
+   * Applies TermBucket updates to the given TransactionId. A new TransactionId
+   * can be returned in the IngestReceipt if so desired, and it will
+   * automatically get remapped by CatalogService. Existing metadata for the
+   * given TransactionId should not be deleted; only the terms in the given
+   * term buckets should be modified. For a complete re-ingest, one should
+   * instead delete() then ingest().
+   * 
+   * @param transactionId
+   * @param termBuckets
+   * @throws IngestServiceException
+   */
+  public IngestReceipt update(TransactionId<?> transactionId,
+      List<TermBucket> termBuckets) throws IngestServiceException;
+
+  /**
+   * Deletes all TermBuckets attached to given TransactionId -- there should be
+   * no trace of given transaction after this method is called.
+   * 
+   * @param transactionId
+   *          The ID for given transaction which should be erased
+   * @throws IngestServiceException
+   *           Any error
+   */
+  public boolean delete(TransactionId<?> transactionId)
+      throws IngestServiceException;
+
+  /**
+   * Deletes only the Terms in the given TermBuckets from the given
+   * TransactionId
+   * 
+   * @param transactionId
+   *          The TransactionId for which Terms will be deleted
+   * @param termBuckets
+   *          The reduction set of Terms for each TermBucket
+   * @throws IngestServiceException
+   *           Any error
+   */
+  public boolean reduce(TransactionId<?> transactionId,
+      List<TermBucket> termBuckets) throws IngestServiceException;
+
 }

Modified: incubator/oodt/trunk/catalog/src/main/java/org/apache/oodt/cas/catalog/index/WorkflowManagerDataSourceIndex.java
URL: http://svn.apache.org/viewvc/incubator/oodt/trunk/catalog/src/main/java/org/apache/oodt/cas/catalog/index/WorkflowManagerDataSourceIndex.java?rev=963480&r1=963479&r2=963480&view=diff
==============================================================================
--- incubator/oodt/trunk/catalog/src/main/java/org/apache/oodt/cas/catalog/index/WorkflowManagerDataSourceIndex.java (original)
+++ incubator/oodt/trunk/catalog/src/main/java/org/apache/oodt/cas/catalog/index/WorkflowManagerDataSourceIndex.java Mon Jul 12 20:39:12 2010
@@ -1,4 +1,4 @@
-/*
+/**
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
@@ -32,14 +32,10 @@ import java.util.Properties;
 import java.util.Vector;
 import java.util.logging.Level;
 import java.util.logging.Logger;
-
-//SQL imports
 import javax.sql.DataSource;
 
-//EDA imports
-import jpl.eda.util.DateConvert;
-
 //OODT imports
+import jpl.eda.util.DateConvert;
 import org.apache.oodt.cas.catalog.exception.CatalogIndexException;
 import org.apache.oodt.cas.catalog.exception.QueryServiceException;
 import org.apache.oodt.cas.catalog.page.IndexPager;
@@ -59,180 +55,216 @@ import org.apache.oodt.cas.catalog.term.
 import org.apache.oodt.cas.commons.database.DatabaseConnectionBuilder;
 
 /**
- * @author bfoster
- * @version $Revision$
- *
- * <p>
- * A queriable index for querying for original cas-workflow instance metadata (not for cas-workflow2)
- * <p>
+ * 
+ * A queryable index for querying original cas-workflow instance metadata
+ * (not for cas-workflow2)
+ * 
  */
 public class WorkflowManagerDataSourceIndex implements Index, QueryService {
 
-	private static final Logger LOG = Logger.getLogger(WorkflowManagerDataSourceIndex.class.getName());
-	
-	protected DataSource dataSource;
-	
-	public WorkflowManagerDataSourceIndex(String user, String pass, String driver, String jdbcUrl) throws InstantiationException {
-		this.dataSource = DatabaseConnectionBuilder.buildDataSource(user, pass, driver, jdbcUrl);
-	}
-	
-	public List<TransactionId<?>> getPage(IndexPager indexPage)
-			throws CatalogIndexException {
-		// TODO Auto-generated method stub
-		return null;
-	}
-
-	public Properties getProperties() throws CatalogIndexException {
-		return new Properties();
-	}
-
-	public String getProperty(String key) throws CatalogIndexException {
-		return null;
-	}
-
-	public TransactionIdFactory getTransactionIdFactory() throws CatalogIndexException {
-		return new LongTransactionIdFactory();
-	}
-
-	public boolean hasTransactionId(TransactionId<?> transactionId)
-			throws CatalogIndexException {
-		Connection conn = null;
-		Statement stmt = null;
-		ResultSet rs = null;
-		try {
-			conn = this.dataSource.getConnection();
-			stmt = conn.createStatement();
-			rs = stmt.executeQuery("SELECT DISTINCT workflow_instance_id FROM workflow_instance_metadata WHERE workflow_instance_id = '" + transactionId + "'");	
-			return rs.next();
-		}catch (Exception e) {
-			throw new CatalogIndexException("Failed to check for workflow id '" + transactionId + "' : " + e.getMessage(), e);
-		}finally {
-			try {
-				conn.close();
-			}catch(Exception e) {}
-			try {
-				stmt.close();
-			}catch(Exception e) {}
-			try {
-				rs.close();
-			}catch(Exception e) {}
-		}
-	}
-
-	public List<TermBucket> getBuckets(TransactionId<?> transactionId)
-			throws QueryServiceException {
-		Connection conn = null;
-		Statement stmt = null;
-		ResultSet rs = null;
-		try {
-			conn = this.dataSource.getConnection();
-			stmt = conn.createStatement();
-			rs = stmt.executeQuery("SELECT * FROM workflow_instance_metadata WHERE workflow_instance_id = '" + transactionId + "'");	
-			
-			TermBucket tb = new TermBucket("Workflows");
-			while (rs.next()) {
-                String key = rs.getString("workflow_met_key");
-                String value = URLDecoder.decode(rs.getString("workflow_met_val"), "UTF-8");
-                tb.addTerm(new Term(key, Collections.singletonList(value)));
-            }
-			return Collections.singletonList(tb);
-		}catch (Exception e) {
-			throw new QueryServiceException("Failed to get Workflow Instance Metadata for workflow id '" + transactionId + "' : " + e.getMessage(), e);
-		}finally {
-			try {
-				conn.close();
-			}catch(Exception e) {}
-			try {
-				stmt.close();
-			}catch(Exception e) {}
-			try {
-				rs.close();
-			}catch(Exception e) {}
-		}
-	}
-
-	public Map<TransactionId<?>, List<TermBucket>> getBuckets(
-			List<TransactionId<?>> transactionIds) throws QueryServiceException {
-		Map<TransactionId<?>, List<TermBucket>> returnMap = new HashMap<TransactionId<?>, List<TermBucket>>();
-		for (TransactionId<?> transactionId : transactionIds) 
-			returnMap.put(transactionId, this.getBuckets(transactionId));
-		return returnMap;
-	}
-
-	public List<IngestReceipt> query(QueryExpression queryExpression)
-			throws QueryServiceException {
-		Connection conn = null;
-		Statement stmt = null;
-		ResultSet rs = null;
-		try {
-			conn = this.dataSource.getConnection();
-			stmt = conn.createStatement();
-			String sqlQuery = "SELECT workflow_instance_id,start_date_time FROM workflow_instances WHERE workflow_instance_id IN (" + this.getSqlQuery(queryExpression) + ")";
-	        LOG.log(Level.INFO, "Performing Query: " + sqlQuery);
-			rs = stmt.executeQuery(sqlQuery);
-			
-			List<IngestReceipt> receipts = new Vector<IngestReceipt>();
-			while (rs.next()) 
-                receipts.add(new IngestReceipt(new LongTransactionIdFactory().createTransactionId(rs.getString("workflow_instance_id")), DateConvert.isoParse(rs.getString("start_date_time"))));
-			return receipts;
-		}catch (Exception e) {
-			throw new QueryServiceException("Failed to query Workflow Instances Database : " + e.getMessage(), e);
-		}finally {
-			try {
-				conn.close();
-			}catch(Exception e) {}
-			try {
-				stmt.close();
-			}catch(Exception e) {}
-			try {
-				rs.close();
-			}catch(Exception e) {}
-		}
-	}
-	
-    private String getSqlQuery(QueryExpression queryExpression) throws QueryServiceException, UnsupportedEncodingException {
-        String sqlQuery = null;
-        if (queryExpression instanceof QueryLogicalGroup) {
-        	QueryLogicalGroup qlg = (QueryLogicalGroup) queryExpression;
-            sqlQuery = "(" + this.getSqlQuery(qlg.getExpressions().get(0));
-            String op = qlg.getOperator() == QueryLogicalGroup.Operator.AND ? "INTERSECT" : "UNION";
-            for (int i = 1; i < qlg.getExpressions().size(); i++) 
-                sqlQuery += ") " + op + " (" + this.getSqlQuery(qlg.getExpressions().get(i));
-            sqlQuery += ")";
-        }else if (queryExpression instanceof ComparisonQueryExpression){
-        	ComparisonQueryExpression cqe = (ComparisonQueryExpression) queryExpression;
-        	String operator = null;
-            if (cqe.getOperator().equals(ComparisonQueryExpression.Operator.EQUAL_TO)) {
-            	operator = "=";
-            } else if (cqe.getOperator().equals(ComparisonQueryExpression.Operator.GREATER_THAN)) {
-            	operator = ">";
-            } else if (cqe.getOperator().equals(ComparisonQueryExpression.Operator.GREATER_THAN_EQUAL_TO)) {
-            	operator = ">=";
-            } else if (cqe.getOperator().equals(ComparisonQueryExpression.Operator.LESS_THAN)) {
-            	operator = "<";
-            } else if (cqe.getOperator().equals(ComparisonQueryExpression.Operator.LESS_THAN_EQUAL_TO)) {
-            	operator = "<=";
-            } else {
-                throw new QueryServiceException("Invalid ComparisonQueryExpression Operator '" + cqe.getOperator() + "'");
-            }
-            
-            sqlQuery = "SELECT DISTINCT workflow_instance_id FROM workflow_instance_metadata WHERE workflow_met_key = '" + cqe.getTerm().getName() + "' AND (";
-        	for (int i = 0; i < cqe.getTerm().getValues().size(); i++) {
-        		String value = cqe.getTerm().getValues().get(i);
-                sqlQuery += "workflow_met_val " + operator + " '" + URLEncoder.encode(value, "UTF-8") + "'";
-	            if ((i + 1) < cqe.getTerm().getValues().size())
-	            	sqlQuery += "OR";
-        	}
-        	sqlQuery += ")";
-        }else if (queryExpression instanceof NotQueryExpression) {
-        	NotQueryExpression nqe = (NotQueryExpression) queryExpression;
-            sqlQuery = "SELECT DISTINCT workflow_instance_id FROM workflow_instance_metadata WHERE NOT (" + this.getSqlQuery(nqe.getQueryExpression()) + ")";
-        }else if (queryExpression instanceof StdQueryExpression) {
-            sqlQuery = "SELECT DISTINCT workflow_instance_id FROM workflow_instance_metadata";
-        }else {
-            throw new QueryServiceException("Invalid QueryExpression '" + queryExpression.getClass().getCanonicalName() + "'");
-        }
-        return sqlQuery;
+  private static final Logger LOG = Logger
+      .getLogger(WorkflowManagerDataSourceIndex.class.getName());
+
+  protected DataSource dataSource;
+
+  public WorkflowManagerDataSourceIndex(String user, String pass,
+      String driver, String jdbcUrl) throws InstantiationException {
+    this.dataSource = DatabaseConnectionBuilder.buildDataSource(user, pass,
+        driver, jdbcUrl);
+  }
+
+  public List<TransactionId<?>> getPage(IndexPager indexPage)
+      throws CatalogIndexException {
+    // TODO Auto-generated method stub
+    return null;
+  }
+
+  public Properties getProperties() throws CatalogIndexException {
+    return new Properties();
+  }
+
+  public String getProperty(String key) throws CatalogIndexException {
+    return null;
+  }
+
+  public TransactionIdFactory getTransactionIdFactory()
+      throws CatalogIndexException {
+    return new LongTransactionIdFactory();
+  }
+
+  public boolean hasTransactionId(TransactionId<?> transactionId)
+      throws CatalogIndexException {
+    Connection conn = null;
+    Statement stmt = null;
+    ResultSet rs = null;
+    try {
+      conn = this.dataSource.getConnection();
+      stmt = conn.createStatement();
+      rs = stmt
+          .executeQuery("SELECT DISTINCT workflow_instance_id FROM workflow_instance_metadata WHERE workflow_instance_id = '"
+              + transactionId + "'");
+      return rs.next();
+    } catch (Exception e) {
+      throw new CatalogIndexException("Failed to check for workflow id '"
+          + transactionId + "' : " + e.getMessage(), e);
+    } finally {
+      try {
+        conn.close();
+      } catch (Exception e) {
+      }
+      try {
+        stmt.close();
+      } catch (Exception e) {
+      }
+      try {
+        rs.close();
+      } catch (Exception e) {
+      }
+    }
+  }
+
+  public List<TermBucket> getBuckets(TransactionId<?> transactionId)
+      throws QueryServiceException {
+    Connection conn = null;
+    Statement stmt = null;
+    ResultSet rs = null;
+    try {
+      conn = this.dataSource.getConnection();
+      stmt = conn.createStatement();
+      rs = stmt
+          .executeQuery("SELECT * FROM workflow_instance_metadata WHERE workflow_instance_id = '"
+              + transactionId + "'");
+
+      TermBucket tb = new TermBucket("Workflows");
+      while (rs.next()) {
+        String key = rs.getString("workflow_met_key");
+        String value = URLDecoder.decode(rs.getString("workflow_met_val"),
+            "UTF-8");
+        tb.addTerm(new Term(key, Collections.singletonList(value)));
+      }
+      return Collections.singletonList(tb);
+    } catch (Exception e) {
+      throw new QueryServiceException(
+          "Failed to get Workflow Instance Metadata for workflow id '"
+              + transactionId + "' : " + e.getMessage(), e);
+    } finally {
+      try {
+        conn.close();
+      } catch (Exception e) {
+      }
+      try {
+        stmt.close();
+      } catch (Exception e) {
+      }
+      try {
+        rs.close();
+      } catch (Exception e) {
+      }
+    }
+  }
+
+  public Map<TransactionId<?>, List<TermBucket>> getBuckets(
+      List<TransactionId<?>> transactionIds) throws QueryServiceException {
+    Map<TransactionId<?>, List<TermBucket>> returnMap = new HashMap<TransactionId<?>, List<TermBucket>>();
+    for (TransactionId<?> transactionId : transactionIds)
+      returnMap.put(transactionId, this.getBuckets(transactionId));
+    return returnMap;
+  }
+
+  public List<IngestReceipt> query(QueryExpression queryExpression)
+      throws QueryServiceException {
+    Connection conn = null;
+    Statement stmt = null;
+    ResultSet rs = null;
+    try {
+      conn = this.dataSource.getConnection();
+      stmt = conn.createStatement();
+      String sqlQuery = "SELECT workflow_instance_id,start_date_time FROM workflow_instances WHERE workflow_instance_id IN ("
+          + this.getSqlQuery(queryExpression) + ")";
+      LOG.log(Level.INFO, "Performing Query: " + sqlQuery);
+      rs = stmt.executeQuery(sqlQuery);
+
+      List<IngestReceipt> receipts = new Vector<IngestReceipt>();
+      while (rs.next())
+        receipts.add(new IngestReceipt(new LongTransactionIdFactory()
+            .createTransactionId(rs.getString("workflow_instance_id")),
+            DateConvert.isoParse(rs.getString("start_date_time"))));
+      return receipts;
+    } catch (Exception e) {
+      throw new QueryServiceException(
+          "Failed to query Workflow Instances Database : " + e.getMessage(), e);
+    } finally {
+      try {
+        conn.close();
+      } catch (Exception e) {
+      }
+      try {
+        stmt.close();
+      } catch (Exception e) {
+      }
+      try {
+        rs.close();
+      } catch (Exception e) {
+      }
+    }
+  }
+
+  private String getSqlQuery(QueryExpression queryExpression)
+      throws QueryServiceException, UnsupportedEncodingException {
+    String sqlQuery = null;
+    if (queryExpression instanceof QueryLogicalGroup) {
+      QueryLogicalGroup qlg = (QueryLogicalGroup) queryExpression;
+      sqlQuery = "(" + this.getSqlQuery(qlg.getExpressions().get(0));
+      String op = qlg.getOperator() == QueryLogicalGroup.Operator.AND ? "INTERSECT"
+          : "UNION";
+      for (int i = 1; i < qlg.getExpressions().size(); i++)
+        sqlQuery += ") " + op + " ("
+            + this.getSqlQuery(qlg.getExpressions().get(i));
+      sqlQuery += ")";
+    } else if (queryExpression instanceof ComparisonQueryExpression) {
+      ComparisonQueryExpression cqe = (ComparisonQueryExpression) queryExpression;
+      String operator = null;
+      if (cqe.getOperator().equals(ComparisonQueryExpression.Operator.EQUAL_TO)) {
+        operator = "=";
+      } else if (cqe.getOperator().equals(
+          ComparisonQueryExpression.Operator.GREATER_THAN)) {
+        operator = ">";
+      } else if (cqe.getOperator().equals(
+          ComparisonQueryExpression.Operator.GREATER_THAN_EQUAL_TO)) {
+        operator = ">=";
+      } else if (cqe.getOperator().equals(
+          ComparisonQueryExpression.Operator.LESS_THAN)) {
+        operator = "<";
+      } else if (cqe.getOperator().equals(
+          ComparisonQueryExpression.Operator.LESS_THAN_EQUAL_TO)) {
+        operator = "<=";
+      } else {
+        throw new QueryServiceException(
+            "Invalid ComparisonQueryExpression Operator '" + cqe.getOperator()
+                + "'");
+      }
+
+      sqlQuery = "SELECT DISTINCT workflow_instance_id FROM workflow_instance_metadata WHERE workflow_met_key = '"
+          + cqe.getTerm().getName() + "' AND (";
+      for (int i = 0; i < cqe.getTerm().getValues().size(); i++) {
+        String value = cqe.getTerm().getValues().get(i);
+        sqlQuery += "workflow_met_val " + operator + " '"
+            + URLEncoder.encode(value, "UTF-8") + "'";
+        if ((i + 1) < cqe.getTerm().getValues().size())
+          sqlQuery += "OR";
+      }
+      sqlQuery += ")";
+    } else if (queryExpression instanceof NotQueryExpression) {
+      NotQueryExpression nqe = (NotQueryExpression) queryExpression;
+      sqlQuery = "SELECT DISTINCT workflow_instance_id FROM workflow_instance_metadata WHERE NOT ("
+          + this.getSqlQuery(nqe.getQueryExpression()) + ")";
+    } else if (queryExpression instanceof StdQueryExpression) {
+      sqlQuery = "SELECT DISTINCT workflow_instance_id FROM workflow_instance_metadata";
+    } else {
+      throw new QueryServiceException("Invalid QueryExpression '"
+          + queryExpression.getClass().getCanonicalName() + "'");
     }
+    return sqlQuery;
+  }
 
 }



Mime
View raw message