Return-Path: X-Original-To: apmail-incubator-accumulo-commits-archive@minotaur.apache.org Delivered-To: apmail-incubator-accumulo-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 9D4007BD7 for ; Tue, 27 Dec 2011 18:20:33 +0000 (UTC) Received: (qmail 30795 invoked by uid 500); 27 Dec 2011 18:20:33 -0000 Delivered-To: apmail-incubator-accumulo-commits-archive@incubator.apache.org Received: (qmail 30763 invoked by uid 500); 27 Dec 2011 18:20:33 -0000 Mailing-List: contact accumulo-commits-help@incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: accumulo-dev@incubator.apache.org Delivered-To: mailing list accumulo-commits@incubator.apache.org Received: (qmail 30756 invoked by uid 99); 27 Dec 2011 18:20:33 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 27 Dec 2011 18:20:33 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 27 Dec 2011 18:20:30 +0000 Received: from eris.apache.org (localhost [127.0.0.1]) by eris.apache.org (Postfix) with ESMTP id 7618F2388B9B; Tue, 27 Dec 2011 18:19:46 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1224966 [8/10] - in /incubator/accumulo/branches/1.4: ./ contrib/accumulo_sample/ contrib/accumulo_sample/ingest/src/main/java/aggregator/ contrib/accumulo_sample/ingest/src/main/java/ingest/ contrib/accumulo_sample/ingest/src/test/java/ag... Date: Tue, 27 Dec 2011 18:19:44 -0000 To: accumulo-commits@incubator.apache.org From: ecn@apache.org X-Mailer: svnmailer-1.0.8-patched Message-Id: <20111227181946.7618F2388B9B@eris.apache.org> Modified: incubator/accumulo/branches/1.4/contrib/accumulo_sample/query/src/main/java/logic/AbstractQueryLogic.java URL: http://svn.apache.org/viewvc/incubator/accumulo/branches/1.4/contrib/accumulo_sample/query/src/main/java/logic/AbstractQueryLogic.java?rev=1224966&r1=1224965&r2=1224966&view=diff ============================================================================== --- incubator/accumulo/branches/1.4/contrib/accumulo_sample/query/src/main/java/logic/AbstractQueryLogic.java (original) +++ incubator/accumulo/branches/1.4/contrib/accumulo_sample/query/src/main/java/logic/AbstractQueryLogic.java Tue Dec 27 18:19:43 2011 @@ -1,22 +1,21 @@ /* -* Licensed to the Apache Software Foundation (ASF) under one or more -* contributor license agreements. See the NOTICE file distributed with -* this work for additional information regarding copyright ownership. -* The ASF licenses this file to You under the Apache License, Version 2.0 -* (the "License"); you may not use this file except in compliance with -* the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, software -* distributed under the License is distributed on an "AS IS" BASIS, -* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -* See the License for the specific language governing permissions and -* limitations under the License. -*/ + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package logic; - import ingest.WikipediaMapper; import iterator.BooleanLogicIterator; import iterator.EvaluatingIterator; @@ -39,6 +38,16 @@ import java.util.TreeSet; import normalizer.LcNoDiacriticsNormalizer; import normalizer.Normalizer; +import org.apache.accumulo.core.client.BatchScanner; +import org.apache.accumulo.core.client.Connector; +import org.apache.accumulo.core.client.IteratorSetting; +import org.apache.accumulo.core.client.Scanner; +import org.apache.accumulo.core.client.TableNotFoundException; +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.Range; +import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.iterators.user.RegExFilter; +import org.apache.accumulo.core.security.Authorizations; import org.apache.commons.jexl2.parser.ParserTreeConstants; import org.apache.commons.lang.StringUtils; import org.apache.commons.lang.time.StopWatch; @@ -47,25 +56,14 @@ import org.apache.log4j.Logger; import parser.EventFields; import parser.EventFields.FieldValue; -import parser.JexlOperatorConstants; import parser.FieldIndexQueryReWriter; +import parser.JexlOperatorConstants; import parser.QueryParser; import parser.QueryParser.QueryTerm; import parser.RangeCalculator; import sample.Document; import sample.Field; import sample.Results; -import org.apache.accumulo.core.client.BatchScanner; -import org.apache.accumulo.core.client.Connector; -import org.apache.accumulo.core.client.IteratorSetting; -import org.apache.accumulo.core.client.Scanner; -import org.apache.accumulo.core.client.TableNotFoundException; -import org.apache.accumulo.core.data.Key; -import org.apache.accumulo.core.data.Range; -import org.apache.accumulo.core.data.Value; -import org.apache.accumulo.core.iterators.RegExIterator; -import org.apache.accumulo.core.iterators.filter.RegExFilter; -import org.apache.accumulo.core.security.Authorizations; import com.esotericsoftware.kryo.Kryo; import com.google.common.collect.HashMultimap; @@ -77,830 +75,829 @@ import com.google.common.collect.Multima * Query implementation that works with the JEXL grammar. This * uses the metadata, global index, and partitioned table to return * results based on the query. Example queries: - * + * * Single Term Query * 'foo' - looks in global index for foo, and if any entries are found, then the query * is rewritten to be field1 == 'foo' or field2 == 'foo', etc. This is then passed * down the optimized query path which uses the intersecting iterators on the partitioned * table. - * + * * Boolean expression * field == 'foo' - For fielded queries, those that contain a field, an operator, and a literal (string or number), * the query is parsed and the set of eventFields in the query that are indexed is determined by * querying the metadata table. 
Depending on the conjunctions in the query (or, and, not) and the * eventFields that are indexed, the query may be sent down the optimized path or the full scan path. - * + * * We are not supporting all of the operators that JEXL supports at this time. We are supporting the following operators: - * + * * ==, !=, >, ≥, <, ≤, =~, and !~ - * + * * Custom functions can be created and registered with the Jexl engine. The functions can be used in the queries in conjunction * with other supported operators. A sample function has been created, called between, and is bound to the 'f' namespace. An * example using this function is : "f:between(LATITUDE,60.0, 70.0)" - * + * *
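 *
 * As a further illustration (using hypothetical field names, not part of the sample schema), such a
 * function can be combined with the other supported operators, for example:
 *
 *   TITLE == 'apache' and f:between(LATITUDE, 60.0, 70.0)
 *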

 * Constraints on Query Structure

* Queries that are sent to this class need to be formatted such that there is a space on either side of the operator. We are * rewriting the query in some cases and the current implementation is expecting a space on either side of the operator. If * an error occurs in the evaluation we are skipping the event. - * + * *
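 *
 * For example, with a hypothetical field PAGE_ID, the query should be written as
 *
 *   PAGE_ID == '12345'
 *
 * rather than PAGE_ID=='12345', since the rewriting code expects whitespace around each operator.
 *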

 * Notes on Optimization

* Queries that meet any of the following criteria will perform a full scan of the events in the partitioned table: - * + * * 1. An 'or' conjunction exists in the query but not all of the terms are indexed. * 2. No indexed terms exist in the query * 3. An unsupported operator exists in the query - * + * * - * + * */ public abstract class AbstractQueryLogic { - - protected static Logger log = Logger.getLogger(AbstractQueryLogic.class); + + protected static Logger log = Logger.getLogger(AbstractQueryLogic.class); + + /** + * Set of datatypes to limit the query to. + */ + public static final String DATATYPE_FILTER_SET = "datatype.filter.set"; + + private static class DoNotPerformOptimizedQueryException extends Exception { + private static final long serialVersionUID = 1L; + } + + /** + * Object that is used to hold ranges found in the index. Subclasses may compute the final range set in various ways. + */ + public static abstract class IndexRanges { - /** - * Set of datatypes to limit the query to. - */ - public static final String DATATYPE_FILTER_SET = "datatype.filter.set"; - - private static class DoNotPerformOptimizedQueryException extends Exception { - private static final long serialVersionUID = 1L; - } - - /** - * Object that is used to hold ranges found in the index. Subclasses may compute - * the final range set in various ways. - */ - public static abstract class IndexRanges { - - private Map indexValuesToOriginalValues = null; - private Multimap fieldNamesAndValues = HashMultimap.create(); - private Map termCardinality = new HashMap(); - protected Map> ranges = new HashMap>(); - - - public Multimap getFieldNamesAndValues() { - return fieldNamesAndValues; - } - - public void setFieldNamesAndValues(Multimap fieldNamesAndValues) { - this.fieldNamesAndValues = fieldNamesAndValues; - } - - public final Map getTermCardinality() { - return termCardinality; - } - - public Map getIndexValuesToOriginalValues() { - return indexValuesToOriginalValues; - } - - public void setIndexValuesToOriginalValues( - Map indexValuesToOriginalValues) { - this.indexValuesToOriginalValues = indexValuesToOriginalValues; - } - - public abstract void add(String term, Range r); - - public abstract Set getRanges(); - } - - /** - * Object that computes the ranges by unioning all of the ranges for all - * of the terms together. In the case where ranges overlap, the largest range - * is used. - */ - public static class UnionIndexRanges extends IndexRanges { - - public static String DEFAULT_KEY = "default"; - - public UnionIndexRanges() { - this.ranges.put(DEFAULT_KEY, new TreeSet()); - } - - public Set getRanges() { - //So the set of ranges is ordered. It *should* be the case that - //ranges with partition ids will sort before ranges that point to - //a specific event. Populate a new set of ranges but don't add a - //range for an event where that range is contained in a range already - //added. - Set shardsAdded = new HashSet(); - Set returnSet = new HashSet(); - for (Range r : ranges.get(DEFAULT_KEY)) { - if (!shardsAdded.contains(r.getStartKey().getRow())) { - //Only add ranges with a start key for the entire partition. 
- if (r.getStartKey().getColumnFamily() == null) { - shardsAdded.add(r.getStartKey().getRow()); - } - returnSet.add(r); - } else { - //if (log.isTraceEnabled()) - log.info("Skipping event specific range: " + r.toString() + " because range has already been added: " + shardsAdded.contains(r.getStartKey().getRow())); - } - } - return returnSet; - } - - public void add(String term, Range r) { - ranges.get(DEFAULT_KEY).add(r); - } + private Map indexValuesToOriginalValues = null; + private Multimap fieldNamesAndValues = HashMultimap.create(); + private Map termCardinality = new HashMap(); + protected Map> ranges = new HashMap>(); + + public Multimap getFieldNamesAndValues() { + return fieldNamesAndValues; } - - private String metadataTableName; - private String indexTableName; - private String reverseIndexTableName; - private String tableName; - private int queryThreads = 8; - private String readAheadQueueSize; - private String readAheadTimeOut; - private boolean useReadAheadIterator; - private Kryo kryo = new Kryo(); - private EventFields eventFields = new EventFields(); - private List unevaluatedFields = null; - private int numPartitions = 0; - private Map, Normalizer> normalizerCacheMap = new HashMap, Normalizer>(); - private static final String NULL_BYTE = "\u0000"; - - public AbstractQueryLogic() { - super(); - EventFields.initializeKryo(kryo); + + public void setFieldNamesAndValues(Multimap fieldNamesAndValues) { + this.fieldNamesAndValues = fieldNamesAndValues; } - - /** - * Queries metadata table to determine which terms are indexed. - * @param c - * @param auths - * @param queryLiterals - * @param begin - * @param end - * @param datatypes - optional list of types - * @return map of indexed field names to types to normalizers used in this date range - * @throws TableNotFoundException - * @throws IllegalAccessException - * @throws InstantiationException - */ - protected Map>> findIndexedTerms(Connector c, Authorizations auths, Set queryLiterals, Set datatypes) throws TableNotFoundException, InstantiationException, IllegalAccessException { - - Map>> results = new HashMap>>(); - - for (String literal : queryLiterals) { - if(log.isDebugEnabled()) - log.debug("Querying "+this.getMetadataTableName()+" table for " + literal); - Range range = new Range(literal.toUpperCase()); - Scanner scanner = c.createScanner(this.getMetadataTableName(), auths); - scanner.setRange(range); - scanner.fetchColumnFamily(new Text(WikipediaMapper.METADATA_INDEX_COLUMN_FAMILY)); - for (Entry entry : scanner) { - if (!results.containsKey(literal)) { - Multimap> m = HashMultimap.create(); - results.put(literal, m); - } - //Get the column qualifier from the key. It contains the datatype and normalizer class - String colq = entry.getKey().getColumnQualifier().toString(); - if (null != colq && colq.contains("\0")) { - int idx = colq.indexOf("\0"); - if (idx != -1) { - String type = colq.substring(0, idx); - //If types are specified and this type is not in the list then skip it. 
- if (null != datatypes && !datatypes.contains(type)) - continue; - try { - @SuppressWarnings("unchecked") - Class clazz = (Class) Class.forName(colq.substring(idx+1)); - if (!normalizerCacheMap.containsKey(clazz)) - normalizerCacheMap.put(clazz, clazz.newInstance()); - results.get(literal).put(type, clazz); - } catch (ClassNotFoundException e) { - log.error("Unable to find normalizer on class path: " + colq.substring(idx+1), e); - results.get(literal).put(type, LcNoDiacriticsNormalizer.class); - } - } else { - log.warn("EventMetadata entry did not contain NULL byte: " + entry.getKey().toString()); - } - } else { - log.warn("ColumnQualifier null in EventMetadata for key: " + entry.getKey().toString()); - } - } - } - if (log.isDebugEnabled()) - log.debug("METADATA RESULTS: " + results.toString()); - return results; + + public final Map getTermCardinality() { + return termCardinality; } - - /** - * Performs a lookup in the global index for a single non-fielded term. - * - * @param c - * @param auths - * @param value - * @param begin - * @param end - * @param datatypes - optional list of types - * @return ranges that fit into the date range. - */ - protected abstract IndexRanges getTermIndexInformation(Connector c, Authorizations auths, String value, Set datatypes) throws TableNotFoundException; - - - /** - * Performs a lookup in the global index / reverse index and returns a RangeCalculator - * - * @param c Accumulo connection - * @param auths authset for queries - * @param indexedTerms multimap of indexed field name and Normalizers used - * @param terms multimap of field name and QueryTerm object - * @param begin query begin date - * @param end query end date - * @param dateFormatter - * @param indexTableName - * @param reverseIndexTableName - * @param queryString original query string - * @param queryThreads - * @param datatypes - optional list of types - * @return range calculator - * @throws TableNotFoundException - */ - protected abstract RangeCalculator getTermIndexInformation(Connector c, Authorizations auths, Multimap indexedTerms, Multimap terms, String indexTableName, String reverseIndexTableName, String queryString, int queryThreads, Set datatypes) throws TableNotFoundException, org.apache.commons.jexl2.parser.ParseException; - protected abstract Collection getFullScanRange(Date begin, Date end, Multimap terms); - - public String getMetadataTableName() { - return metadataTableName; + public Map getIndexValuesToOriginalValues() { + return indexValuesToOriginalValues; } - - public String getIndexTableName() { - return indexTableName; + + public void setIndexValuesToOriginalValues(Map indexValuesToOriginalValues) { + this.indexValuesToOriginalValues = indexValuesToOriginalValues; } - - public String getTableName() { - return tableName; + + public abstract void add(String term, Range r); + + public abstract Set getRanges(); + } + + /** + * Object that computes the ranges by unioning all of the ranges for all of the terms together. In the case where ranges overlap, the largest range is used. + */ + public static class UnionIndexRanges extends IndexRanges { + + public static String DEFAULT_KEY = "default"; + + public UnionIndexRanges() { + this.ranges.put(DEFAULT_KEY, new TreeSet()); } - - public void setMetadataTableName(String metadataTableName) { - this.metadataTableName = metadataTableName; + + public Set getRanges() { + // So the set of ranges is ordered. It *should* be the case that + // ranges with partition ids will sort before ranges that point to + // a specific event. 
Populate a new set of ranges but don't add a + // range for an event where that range is contained in a range already + // added. + Set shardsAdded = new HashSet(); + Set returnSet = new HashSet(); + for (Range r : ranges.get(DEFAULT_KEY)) { + if (!shardsAdded.contains(r.getStartKey().getRow())) { + // Only add ranges with a start key for the entire partition. + if (r.getStartKey().getColumnFamily() == null) { + shardsAdded.add(r.getStartKey().getRow()); + } + returnSet.add(r); + } else { + // if (log.isTraceEnabled()) + log.info("Skipping event specific range: " + r.toString() + " because range has already been added: " + + shardsAdded.contains(r.getStartKey().getRow())); + } + } + return returnSet; } - - public void setIndexTableName(String indexTableName) { - this.indexTableName = indexTableName; + + public void add(String term, Range r) { + ranges.get(DEFAULT_KEY).add(r); } - - public void setTableName(String tableName) { - this.tableName = tableName; + } + + private String metadataTableName; + private String indexTableName; + private String reverseIndexTableName; + private String tableName; + private int queryThreads = 8; + private String readAheadQueueSize; + private String readAheadTimeOut; + private boolean useReadAheadIterator; + private Kryo kryo = new Kryo(); + private EventFields eventFields = new EventFields(); + private List unevaluatedFields = null; + private int numPartitions = 0; + private Map,Normalizer> normalizerCacheMap = new HashMap,Normalizer>(); + private static final String NULL_BYTE = "\u0000"; + + public AbstractQueryLogic() { + super(); + EventFields.initializeKryo(kryo); + } + + /** + * Queries metadata table to determine which terms are indexed. + * + * @param c + * @param auths + * @param queryLiterals + * @param begin + * @param end + * @param datatypes + * - optional list of types + * @return map of indexed field names to types to normalizers used in this date range + * @throws TableNotFoundException + * @throws IllegalAccessException + * @throws InstantiationException + */ + protected Map>> findIndexedTerms(Connector c, Authorizations auths, Set queryLiterals, + Set datatypes) throws TableNotFoundException, InstantiationException, IllegalAccessException { + + Map>> results = new HashMap>>(); + + for (String literal : queryLiterals) { + if (log.isDebugEnabled()) + log.debug("Querying " + this.getMetadataTableName() + " table for " + literal); + Range range = new Range(literal.toUpperCase()); + Scanner scanner = c.createScanner(this.getMetadataTableName(), auths); + scanner.setRange(range); + scanner.fetchColumnFamily(new Text(WikipediaMapper.METADATA_INDEX_COLUMN_FAMILY)); + for (Entry entry : scanner) { + if (!results.containsKey(literal)) { + Multimap> m = HashMultimap.create(); + results.put(literal, m); + } + // Get the column qualifier from the key. It contains the datatype and normalizer class + String colq = entry.getKey().getColumnQualifier().toString(); + if (null != colq && colq.contains("\0")) { + int idx = colq.indexOf("\0"); + if (idx != -1) { + String type = colq.substring(0, idx); + // If types are specified and this type is not in the list then skip it. 
+ if (null != datatypes && !datatypes.contains(type)) + continue; + try { + @SuppressWarnings("unchecked") + Class clazz = (Class) Class.forName(colq.substring(idx + 1)); + if (!normalizerCacheMap.containsKey(clazz)) + normalizerCacheMap.put(clazz, clazz.newInstance()); + results.get(literal).put(type, clazz); + } catch (ClassNotFoundException e) { + log.error("Unable to find normalizer on class path: " + colq.substring(idx + 1), e); + results.get(literal).put(type, LcNoDiacriticsNormalizer.class); + } + } else { + log.warn("EventMetadata entry did not contain NULL byte: " + entry.getKey().toString()); + } + } else { + log.warn("ColumnQualifier null in EventMetadata for key: " + entry.getKey().toString()); + } + } + } + if (log.isDebugEnabled()) + log.debug("METADATA RESULTS: " + results.toString()); + return results; + } + + /** + * Performs a lookup in the global index for a single non-fielded term. + * + * @param c + * @param auths + * @param value + * @param begin + * @param end + * @param datatypes + * - optional list of types + * @return ranges that fit into the date range. + */ + protected abstract IndexRanges getTermIndexInformation(Connector c, Authorizations auths, String value, Set datatypes) throws TableNotFoundException; + + /** + * Performs a lookup in the global index / reverse index and returns a RangeCalculator + * + * @param c + * Accumulo connection + * @param auths + * authset for queries + * @param indexedTerms + * multimap of indexed field name and Normalizers used + * @param terms + * multimap of field name and QueryTerm object + * @param begin + * query begin date + * @param end + * query end date + * @param dateFormatter + * @param indexTableName + * @param reverseIndexTableName + * @param queryString + * original query string + * @param queryThreads + * @param datatypes + * - optional list of types + * @return range calculator + * @throws TableNotFoundException + */ + protected abstract RangeCalculator getTermIndexInformation(Connector c, Authorizations auths, Multimap indexedTerms, + Multimap terms, String indexTableName, String reverseIndexTableName, String queryString, int queryThreads, Set datatypes) + throws TableNotFoundException, org.apache.commons.jexl2.parser.ParseException; + + protected abstract Collection getFullScanRange(Date begin, Date end, Multimap terms); + + public String getMetadataTableName() { + return metadataTableName; + } + + public String getIndexTableName() { + return indexTableName; + } + + public String getTableName() { + return tableName; + } + + public void setMetadataTableName(String metadataTableName) { + this.metadataTableName = metadataTableName; + } + + public void setIndexTableName(String indexTableName) { + this.indexTableName = indexTableName; + } + + public void setTableName(String tableName) { + this.tableName = tableName; + } + + public int getQueryThreads() { + return queryThreads; + } + + public void setQueryThreads(int queryThreads) { + this.queryThreads = queryThreads; + } + + public String getReadAheadQueueSize() { + return readAheadQueueSize; + } + + public String getReadAheadTimeOut() { + return readAheadTimeOut; + } + + public boolean isUseReadAheadIterator() { + return useReadAheadIterator; + } + + public void setReadAheadQueueSize(String readAheadQueueSize) { + this.readAheadQueueSize = readAheadQueueSize; + } + + public void setReadAheadTimeOut(String readAheadTimeOut) { + this.readAheadTimeOut = readAheadTimeOut; + } + + public void setUseReadAheadIterator(boolean useReadAheadIterator) { + 
this.useReadAheadIterator = useReadAheadIterator; + } + + public String getReverseIndexTableName() { + return reverseIndexTableName; + } + + public void setReverseIndexTableName(String reverseIndexTableName) { + this.reverseIndexTableName = reverseIndexTableName; + } + + public List getUnevaluatedFields() { + return unevaluatedFields; + } + + public void setUnevaluatedFields(List unevaluatedFields) { + this.unevaluatedFields = unevaluatedFields; + } + + public void setUnevaluatedFields(String unevaluatedFieldList) { + this.unevaluatedFields = new ArrayList(); + for (String field : unevaluatedFieldList.split(",")) + this.unevaluatedFields.add(field); + } + + public int getNumPartitions() { + return numPartitions; + } + + public void setNumPartitions(int numPartitions) { + this.numPartitions = numPartitions; + } + + public Document createDocument(Key key, Value value) { + eventFields.clear(); + ByteBuffer buf = ByteBuffer.wrap(value.get()); + eventFields.readObjectData(kryo, buf); + + Document doc = new Document(); + // Set the id to the document id which is located in the colf + String row = key.getRow().toString(); + String colf = key.getColumnFamily().toString(); + int idx = colf.indexOf(NULL_BYTE); + String type = colf.substring(0, idx); + String id = colf.substring(idx + 1); + doc.setId(id); + for (Entry> entry : eventFields.asMap().entrySet()) { + for (FieldValue fv : entry.getValue()) { + Field val = new Field(); + val.setFieldName(entry.getKey()); + val.setFieldValue(new String(fv.getValue(), Charset.forName("UTF-8"))); + doc.getFields().add(val); + } } - - public int getQueryThreads() { - return queryThreads; + + // Add the pointer for the content. + Field docPointer = new Field(); + docPointer.setFieldName("DOCUMENT"); + docPointer.setFieldValue("DOCUMENT:" + row + "/" + type + "/" + id); + doc.getFields().add(docPointer); + + return doc; + } + + public String getResultsKey(Entry key) { + // Use the colf from the table, it contains the uuid and datatype + return key.getKey().getColumnFamily().toString(); + } + + public Results runQuery(Connector connector, List authorizations, String query, Date beginDate, Date endDate, Set types) { + + if (StringUtils.isEmpty(query)) { + throw new IllegalArgumentException("NULL QueryNode reference passed to " + this.getClass().getSimpleName()); } - - public void setQueryThreads(int queryThreads) { - this.queryThreads = queryThreads; + + Set ranges = new HashSet(); + Set typeFilter = types; + String array[] = authorizations.toArray(new String[0]); + Authorizations auths = new Authorizations(array); + Results results = new Results(); + + // Get the query string + String queryString = query; + + StopWatch abstractQueryLogic = new StopWatch(); + StopWatch optimizedQuery = new StopWatch(); + StopWatch queryGlobalIndex = new StopWatch(); + StopWatch optimizedEventQuery = new StopWatch(); + StopWatch fullScanQuery = new StopWatch(); + StopWatch processResults = new StopWatch(); + + abstractQueryLogic.start(); + + StopWatch parseQuery = new StopWatch(); + parseQuery.start(); + + QueryParser parser; + try { + if (log.isDebugEnabled()) { + log.debug("ShardQueryLogic calling QueryParser.execute"); + } + parser = new QueryParser(); + parser.execute(queryString); + } catch (org.apache.commons.jexl2.parser.ParseException e1) { + throw new IllegalArgumentException("Error parsing query", e1); + } + int hash = parser.getHashValue(); + parseQuery.stop(); + if (log.isDebugEnabled()) { + log.debug(hash + " Query: " + queryString); } - - public String 
getReadAheadQueueSize() { - return readAheadQueueSize; + + Set fields = new HashSet(); + for (String f : parser.getQueryIdentifiers()) { + fields.add(f); + } + if (log.isDebugEnabled()) { + log.debug("getQueryIdentifiers: " + parser.getQueryIdentifiers().toString()); + } + // Remove any negated fields from the fields list, we don't want to lookup negated fields + // in the index. + fields.removeAll(parser.getNegatedTermsForOptimizer()); + + if (log.isDebugEnabled()) { + log.debug("getQueryIdentifiers: " + parser.getQueryIdentifiers().toString()); } - - public String getReadAheadTimeOut() { - return readAheadTimeOut; + // Get the mapping of field name to QueryTerm object from the query. The query term object + // contains the operator, whether its negated or not, and the literal to test against. + Multimap terms = parser.getQueryTerms(); + + // Find out which terms are indexed + // TODO: Should we cache indexed terms or does that not make sense since we are always + // loading data. + StopWatch queryMetadata = new StopWatch(); + queryMetadata.start(); + Map>> metadataResults; + try { + metadataResults = findIndexedTerms(connector, auths, fields, typeFilter); + } catch (Exception e1) { + throw new RuntimeException("Error in metadata lookup", e1); } - - public boolean isUseReadAheadIterator() { - return useReadAheadIterator; + + // Create a map of indexed term to set of normalizers for it + Multimap indexedTerms = HashMultimap.create(); + for (Entry>> entry : metadataResults.entrySet()) { + // Get the normalizer from the normalizer cache + for (Class clazz : entry.getValue().values()) { + indexedTerms.put(entry.getKey(), normalizerCacheMap.get(clazz)); + } + } + queryMetadata.stop(); + if (log.isDebugEnabled()) { + log.debug(hash + " Indexed Terms: " + indexedTerms.toString()); } - - public void setReadAheadQueueSize(String readAheadQueueSize) { - this.readAheadQueueSize = readAheadQueueSize; + + Set orTerms = parser.getOrTermsForOptimizer(); + + // Iterate over the query terms to get the operators specified in the query. + ArrayList unevaluatedExpressions = new ArrayList(); + boolean unsupportedOperatorSpecified = false; + for (Entry entry : terms.entries()) { + if (null == entry.getValue()) { + continue; + } + + if (null != this.unevaluatedFields && this.unevaluatedFields.contains(entry.getKey().trim())) { + unevaluatedExpressions.add(entry.getKey().trim() + " " + entry.getValue().getOperator() + " " + entry.getValue().getValue()); + } + + int operator = JexlOperatorConstants.getJJTNodeType(entry.getValue().getOperator()); + if (!(operator == ParserTreeConstants.JJTEQNODE || operator == ParserTreeConstants.JJTNENODE || operator == ParserTreeConstants.JJTLENODE + || operator == ParserTreeConstants.JJTLTNODE || operator == ParserTreeConstants.JJTGENODE || operator == ParserTreeConstants.JJTGTNODE || operator == ParserTreeConstants.JJTERNODE)) { + unsupportedOperatorSpecified = true; + break; + } + } + if (null != unevaluatedExpressions) + unevaluatedExpressions.trimToSize(); + if (log.isDebugEnabled()) { + log.debug(hash + " unsupportedOperators: " + unsupportedOperatorSpecified + " indexedTerms: " + indexedTerms.toString() + " orTerms: " + + orTerms.toString() + " unevaluatedExpressions: " + unevaluatedExpressions.toString()); } - - public void setReadAheadTimeOut(String readAheadTimeOut) { - this.readAheadTimeOut = readAheadTimeOut; + + // We can use the intersecting iterator over the field index as an optimization under the + // following conditions + // + // 1. 
No unsupported operators in the query. + // 2. No 'or' operators and at least one term indexed + // or + // 1. No unsupported operators in the query. + // 2. and all terms indexed + // or + // 1. All or'd terms are indexed. NOTE, this will potentially skip some queries and push to a full table scan + // // WE should look into finding a better way to handle whether we do an optimized query or not. + boolean optimizationSucceeded = false; + boolean orsAllIndexed = false; + if (orTerms.isEmpty()) { + orsAllIndexed = false; + } else { + orsAllIndexed = indexedTerms.keySet().containsAll(orTerms); } - - public void setUseReadAheadIterator(boolean useReadAheadIterator) { - this.useReadAheadIterator = useReadAheadIterator; + + if (log.isDebugEnabled()) { + log.debug("All or terms are indexed"); } - - public String getReverseIndexTableName() { - return reverseIndexTableName; - } - - public void setReverseIndexTableName(String reverseIndexTableName) { - this.reverseIndexTableName = reverseIndexTableName; - } - - public List getUnevaluatedFields() { - return unevaluatedFields; - } - - public void setUnevaluatedFields(List unevaluatedFields) { - this.unevaluatedFields = unevaluatedFields; - } - - public void setUnevaluatedFields(String unevaluatedFieldList) { - this.unevaluatedFields = new ArrayList(); - for (String field : unevaluatedFieldList.split(",")) - this.unevaluatedFields.add(field); - } - - public int getNumPartitions() { - return numPartitions; - } - - public void setNumPartitions(int numPartitions) { - this.numPartitions = numPartitions; - } - - public Document createDocument(Key key, Value value) { - eventFields.clear(); - ByteBuffer buf = ByteBuffer.wrap(value.get()); - eventFields.readObjectData(kryo, buf); - - Document doc = new Document(); - //Set the id to the document id which is located in the colf - String row = key.getRow().toString(); - String colf = key.getColumnFamily().toString(); - int idx = colf.indexOf(NULL_BYTE); - String type = colf.substring(0, idx); - String id = colf.substring(idx+1); - doc.setId(id); - for (Entry> entry : eventFields.asMap().entrySet()) { - for (FieldValue fv : entry.getValue()) { - Field val = new Field(); - val.setFieldName(entry.getKey()); - val.setFieldValue(new String(fv.getValue(), Charset.forName("UTF-8"))); - doc.getFields().add(val); + + if (!unsupportedOperatorSpecified + && (((null == orTerms || orTerms.isEmpty()) && indexedTerms.size() > 0) || (fields.size() > 0 && indexedTerms.size() == fields.size()) || orsAllIndexed)) { + optimizedQuery.start(); + // Set up intersecting iterator over field index. + + // Get information from the global index for the indexed terms. The results object will contain the term + // mapped to an object that contains the total count, and partitions where this term is located. + + // TODO: Should we cache indexed term information or does that not make sense since we are always loading data + queryGlobalIndex.start(); + IndexRanges termIndexInfo; + try { + // If fields is null or zero, then it's probably the case that the user entered a value + // to search for with no fields. Check for the value in index. + if (fields.isEmpty()) { + termIndexInfo = this.getTermIndexInformation(connector, auths, queryString, typeFilter); + if (null != termIndexInfo && termIndexInfo.getRanges().isEmpty()) { + // Then we didn't find anything in the index for this query. This may happen for an indexed term that has wildcards + // in unhandled locations. 
+ // Break out of here by throwing a named exception and do full scan + throw new DoNotPerformOptimizedQueryException(); + } + // We need to rewrite the query string here so that it's valid. + if (termIndexInfo instanceof UnionIndexRanges) { + UnionIndexRanges union = (UnionIndexRanges) termIndexInfo; + StringBuilder buf = new StringBuilder(); + String sep = ""; + for (String fieldName : union.getFieldNamesAndValues().keySet()) { + buf.append(sep).append(fieldName).append(" == "); + if (!(queryString.startsWith("'") && queryString.endsWith("'"))) { + buf.append("'").append(queryString).append("'"); + } else { + buf.append(queryString); + } + sep = " or "; } - } - - //Add the pointer for the content. - Field docPointer = new Field(); - docPointer.setFieldName("DOCUMENT"); - docPointer.setFieldValue("DOCUMENT:"+row+"/"+type+"/"+id); - doc.getFields().add(docPointer); - - return doc; - } - - public String getResultsKey(Entry key) { - //Use the colf from the table, it contains the uuid and datatype - return key.getKey().getColumnFamily().toString(); - } - - public Results runQuery(Connector connector, List authorizations, String query, Date beginDate, Date endDate, Set types) { - - if (StringUtils.isEmpty(query)) { - throw new IllegalArgumentException("NULL QueryNode reference passed to " + this.getClass().getSimpleName()); - } - - Set ranges = new HashSet(); - Set typeFilter = types; - Authorizations auths = new Authorizations(StringUtils.join(authorizations, "|")); - Results results = new Results(); - - //Get the query string - String queryString = query; - - - StopWatch abstractQueryLogic = new StopWatch(); - StopWatch optimizedQuery = new StopWatch(); - StopWatch queryGlobalIndex = new StopWatch(); - StopWatch optimizedEventQuery = new StopWatch(); - StopWatch fullScanQuery = new StopWatch(); - StopWatch processResults = new StopWatch(); - - abstractQueryLogic.start(); - - StopWatch parseQuery = new StopWatch(); - parseQuery.start(); - - QueryParser parser; - try { - if(log.isDebugEnabled()){ - log.debug("ShardQueryLogic calling QueryParser.execute"); + if (log.isDebugEnabled()) { + log.debug("Rewrote query for non-fielded single term query: " + queryString + " to " + buf.toString()); } - parser = new QueryParser(); - parser.execute(queryString); - } catch (org.apache.commons.jexl2.parser.ParseException e1) { - throw new IllegalArgumentException("Error parsing query", e1); - } - int hash = parser.getHashValue(); - parseQuery.stop(); - if (log.isDebugEnabled()) { - log.debug(hash + " Query: " + queryString); - } - - Set fields = new HashSet(); - for(String f : parser.getQueryIdentifiers()){ - fields.add(f); - } - if(log.isDebugEnabled()){ - log.debug("getQueryIdentifiers: "+parser.getQueryIdentifiers().toString()); - } - //Remove any negated fields from the fields list, we don't want to lookup negated fields - //in the index. - fields.removeAll(parser.getNegatedTermsForOptimizer()); - - if(log.isDebugEnabled()){ - log.debug("getQueryIdentifiers: "+parser.getQueryIdentifiers().toString()); - } - //Get the mapping of field name to QueryTerm object from the query. The query term object - //contains the operator, whether its negated or not, and the literal to test against. - Multimap terms = parser.getQueryTerms(); - - //Find out which terms are indexed - //TODO: Should we cache indexed terms or does that not make sense since we are always - //loading data. 
- StopWatch queryMetadata = new StopWatch(); - queryMetadata.start(); - Map>> metadataResults; - try { - metadataResults = findIndexedTerms(connector, auths, fields, typeFilter); - } catch (Exception e1) { - throw new RuntimeException("Error in metadata lookup", e1); - } + queryString = buf.toString(); + } else { + throw new RuntimeException("Unexpected IndexRanges implementation"); + } + } else { + RangeCalculator calc = this.getTermIndexInformation(connector, auths, indexedTerms, terms, this.getIndexTableName(), this.getReverseIndexTableName(), + queryString, this.queryThreads, typeFilter); + if (null == calc.getResult() || calc.getResult().isEmpty()) { + // Then we didn't find anything in the index for this query. This may happen for an indexed term that has wildcards + // in unhandled locations. + // Break out of here by throwing a named exception and do full scan + throw new DoNotPerformOptimizedQueryException(); + } + termIndexInfo = new UnionIndexRanges(); + termIndexInfo.setIndexValuesToOriginalValues(calc.getIndexValues()); + termIndexInfo.setFieldNamesAndValues(calc.getIndexEntries()); + termIndexInfo.getTermCardinality().putAll(calc.getTermCardinalities()); + for (Range r : calc.getResult()) { + // foo is a placeholder and is ignored. + termIndexInfo.add("foo", r); + } + } + } catch (TableNotFoundException e) { + log.error(this.getIndexTableName() + "not found", e); + throw new RuntimeException(this.getIndexTableName() + "not found", e); + } catch (org.apache.commons.jexl2.parser.ParseException e) { + throw new RuntimeException("Error determining ranges for query: " + queryString, e); + } catch (DoNotPerformOptimizedQueryException e) { + log.info("Indexed fields not found in index, performing full scan"); + termIndexInfo = null; + } + queryGlobalIndex.stop(); + + // Determine if we should proceed with optimized query based on results from the global index + boolean proceed = false; + if (null == termIndexInfo || termIndexInfo.getFieldNamesAndValues().values().size() == 0) { + proceed = false; + } else if (null != orTerms && orTerms.size() > 0 && (termIndexInfo.getFieldNamesAndValues().values().size() == indexedTerms.size())) { + proceed = true; + } else if (termIndexInfo.getFieldNamesAndValues().values().size() > 0) { + proceed = true; + } else if (orsAllIndexed) { + proceed = true; + } else { + proceed = false; + } + if (log.isDebugEnabled()) { + log.debug("Proceed with optimized query: " + proceed); + if (null != termIndexInfo) + log.debug("termIndexInfo.getTermsFound().size(): " + termIndexInfo.getFieldNamesAndValues().values().size() + " indexedTerms.size: " + + indexedTerms.size() + " fields.size: " + fields.size()); + } + if (proceed) { - //Create a map of indexed term to set of normalizers for it - Multimap indexedTerms = HashMultimap.create(); - for (Entry>> entry : metadataResults.entrySet()) { - //Get the normalizer from the normalizer cache - for (Class clazz : entry.getValue().values()) { - indexedTerms.put(entry.getKey(), normalizerCacheMap.get(clazz)); - } + if (log.isDebugEnabled()) { + log.debug(hash + " Performing optimized query"); } - queryMetadata.stop(); + // Use the scan ranges from the GlobalIndexRanges object as the ranges for the batch scanner + ranges = termIndexInfo.getRanges(); if (log.isDebugEnabled()) { - log.debug(hash + " Indexed Terms: " + indexedTerms.toString()); + log.info(hash + " Ranges: count: " + ranges.size() + ", " + ranges.toString()); } - - Set orTerms = parser.getOrTermsForOptimizer(); - - //Iterate over the query terms to get the 
operators specified in the query. - ArrayList unevaluatedExpressions = new ArrayList(); - boolean unsupportedOperatorSpecified = false; - for (Entry entry : terms.entries()) { - if (null == entry.getValue()) { - continue; + + // Create BatchScanner, set the ranges, and setup the iterators. + optimizedEventQuery.start(); + BatchScanner bs = null; + try { + bs = connector.createBatchScanner(this.getTableName(), auths, queryThreads); + bs.setRanges(ranges); + IteratorSetting si = new IteratorSetting(21, "eval", OptimizedQueryIterator.class); + + if (log.isDebugEnabled()) { + log.debug("Setting scan option: " + EvaluatingIterator.QUERY_OPTION + " to " + queryString); + } + // Set the query option + si.addOption(EvaluatingIterator.QUERY_OPTION, queryString); + // Set the Indexed Terms List option. This is the field name and normalized field value pair separated + // by a comma. + StringBuilder buf = new StringBuilder(); + String sep = ""; + for (Entry entry : termIndexInfo.getFieldNamesAndValues().entries()) { + buf.append(sep); + buf.append(entry.getKey()); + buf.append(":"); + buf.append(termIndexInfo.getIndexValuesToOriginalValues().get(entry.getValue())); + buf.append(":"); + buf.append(entry.getValue()); + if (sep.equals("")) { + sep = ";"; + } + } + if (log.isDebugEnabled()) { + log.debug("Setting scan option: " + FieldIndexQueryReWriter.INDEXED_TERMS_LIST + " to " + buf.toString()); + } + FieldIndexQueryReWriter rewriter = new FieldIndexQueryReWriter(); + String q = ""; + try { + q = queryString; + q = rewriter.applyCaseSensitivity(q, true, false);// Set upper/lower case for fieldname/fieldvalue + Map opts = new HashMap(); + opts.put(FieldIndexQueryReWriter.INDEXED_TERMS_LIST, buf.toString()); + q = rewriter.removeNonIndexedTermsAndInvalidRanges(q, opts); + q = rewriter.applyNormalizedTerms(q, opts); + if (log.isDebugEnabled()) { + log.debug("runServerQuery, FieldIndex Query: " + q); } - - if (null != this.unevaluatedFields && this.unevaluatedFields.contains(entry.getKey().trim())) { - unevaluatedExpressions.add(entry.getKey().trim() + " " + entry.getValue().getOperator() + " " + entry.getValue().getValue()); + } catch (org.apache.commons.jexl2.parser.ParseException ex) { + log.error("Could not parse query, Jexl ParseException: " + ex); + } catch (Exception ex) { + log.error("Problem rewriting query, Exception: " + ex.getMessage()); + } + si.addOption(BooleanLogicIterator.FIELD_INDEX_QUERY, q); + + // Set the term cardinality option + sep = ""; + buf.delete(0, buf.length()); + for (Entry entry : termIndexInfo.getTermCardinality().entrySet()) { + buf.append(sep); + buf.append(entry.getKey()); + buf.append(":"); + buf.append(entry.getValue()); + sep = ","; + } + if (log.isDebugEnabled()) + log.debug("Setting scan option: " + BooleanLogicIterator.TERM_CARDINALITIES + " to " + buf.toString()); + si.addOption(BooleanLogicIterator.TERM_CARDINALITIES, buf.toString()); + if (this.useReadAheadIterator) { + if (log.isDebugEnabled()) { + log.debug("Enabling read ahead iterator with queue size: " + this.readAheadQueueSize + " and timeout: " + this.readAheadTimeOut); } + si.addOption(ReadAheadIterator.QUEUE_SIZE, this.readAheadQueueSize); + si.addOption(ReadAheadIterator.TIMEOUT, this.readAheadTimeOut); - int operator = JexlOperatorConstants.getJJTNodeType(entry.getValue().getOperator()); - if (!(operator == ParserTreeConstants.JJTEQNODE - || operator == ParserTreeConstants.JJTNENODE - || operator == ParserTreeConstants.JJTLENODE - || operator == ParserTreeConstants.JJTLTNODE - || operator == 
ParserTreeConstants.JJTGENODE - || operator == ParserTreeConstants.JJTGTNODE - || operator == ParserTreeConstants.JJTERNODE)) { - unsupportedOperatorSpecified = true; - break; - } - } - if (null != unevaluatedExpressions) - unevaluatedExpressions.trimToSize(); + } + + if (null != unevaluatedExpressions) { + StringBuilder unevaluatedExpressionList = new StringBuilder(); + String sep2 = ""; + for (String exp : unevaluatedExpressions) { + unevaluatedExpressionList.append(sep2).append(exp); + sep2 = ","; + } + if (log.isDebugEnabled()) + log.debug("Setting scan option: " + EvaluatingIterator.UNEVALUTED_EXPRESSIONS + " to " + unevaluatedExpressionList.toString()); + si.addOption(EvaluatingIterator.UNEVALUTED_EXPRESSIONS, unevaluatedExpressionList.toString()); + } + + bs.addScanIterator(si); + + processResults.start(); + processResults.suspend(); + long count = 0; + for (Entry entry : bs) { + count++; + // The key that is returned by the EvaluatingIterator is not the same key that is in + // the table. The value that is returned by the EvaluatingIterator is a kryo + // serialized EventFields object. + processResults.resume(); + Document d = this.createDocument(entry.getKey(), entry.getValue()); + results.getResults().add(d); + processResults.suspend(); + } + log.info(count + " matching entries found in optimized query."); + optimizationSucceeded = true; + processResults.stop(); + } catch (TableNotFoundException e) { + log.error(this.getTableName() + "not found", e); + throw new RuntimeException(this.getIndexTableName() + "not found", e); + } finally { + if (bs != null) { + bs.close(); + } + } + optimizedEventQuery.stop(); + } + optimizedQuery.stop(); + } + + // WE should look into finding a better way to handle whether we do an optimized query or not. + // We are not setting up an else condition here because we may have aborted the logic early in the if statement. + if (!optimizationSucceeded || ((null != orTerms && orTerms.size() > 0) && (indexedTerms.size() != fields.size()) && !orsAllIndexed)) { + // if (!optimizationSucceeded || ((null != orTerms && orTerms.size() > 0) && (indexedTerms.size() != fields.size()))) { + fullScanQuery.start(); + if (log.isDebugEnabled()) { + log.debug(hash + " Performing full scan query"); + } + + // Set up a full scan using the date ranges from the query + // Create BatchScanner, set the ranges, and setup the iterators. + BatchScanner bs = null; + try { + // The ranges are the start and end dates + Collection r = getFullScanRange(beginDate, endDate, terms); + ranges.addAll(r); + if (log.isDebugEnabled()) { - log.debug(hash + " unsupportedOperators: " + unsupportedOperatorSpecified - + " indexedTerms: " + indexedTerms.toString() + " orTerms: " + orTerms.toString() + - " unevaluatedExpressions: " + unevaluatedExpressions.toString()); - } - - //We can use the intersecting iterator over the field index as an optimization under the - //following conditions - // - // 1. No unsupported operators in the query. - // 2. No 'or' operators and at least one term indexed - // or - // 1. No unsupported operators in the query. - // 2. and all terms indexed - // or - // 1. All or'd terms are indexed. NOTE, this will potentially skip some queries and push to a full table scan - //// WE should look into finding a better way to handle whether we do an optimized query or not. 
- boolean optimizationSucceeded = false; - boolean orsAllIndexed = false; - if(orTerms.isEmpty()){ - orsAllIndexed = false; - }else{ - orsAllIndexed = indexedTerms.keySet().containsAll(orTerms); + log.debug(hash + " Ranges: count: " + ranges.size() + ", " + ranges.toString()); } - - if(log.isDebugEnabled()){ - log.debug("All or terms are indexed"); + + bs = connector.createBatchScanner(this.getTableName(), auths, queryThreads); + bs.setRanges(ranges); + IteratorSetting si = new IteratorSetting(22, "eval", EvaluatingIterator.class); + // Create datatype regex if needed + if (null != typeFilter) { + StringBuilder buf = new StringBuilder(); + String s = ""; + for (String type : typeFilter) { + buf.append(s).append(type).append(".*"); + s = "|"; + } + if (log.isDebugEnabled()) + log.debug("Setting colf regex iterator to: " + buf.toString()); + IteratorSetting ri = new IteratorSetting(21, "typeFilter", RegExFilter.class); + RegExFilter.setRegexs(ri, null, buf.toString(), null, null, false); + bs.addScanIterator(ri); } - - if (!unsupportedOperatorSpecified && (((null == orTerms || orTerms.isEmpty()) && indexedTerms.size() > 0) || (fields.size() > 0 && indexedTerms.size() == fields.size()) ||orsAllIndexed ) ) { - optimizedQuery.start(); - //Set up intersecting iterator over field index. - - //Get information from the global index for the indexed terms. The results object will contain the term - //mapped to an object that contains the total count, and partitions where this term is located. - - //TODO: Should we cache indexed term information or does that not make sense since we are always loading data - queryGlobalIndex.start(); - IndexRanges termIndexInfo; - Set indexedColumns; - try { - //If fields is null or zero, then it's probably the case that the user entered a value - //to search for with no fields. Check for the value in index. - if (fields.isEmpty()) { - termIndexInfo = this.getTermIndexInformation(connector, auths, queryString, typeFilter); - if (null != termIndexInfo && termIndexInfo.getRanges().isEmpty()) { - //Then we didn't find anything in the index for this query. This may happen for an indexed term that has wildcards - //in unhandled locations. - //Break out of here by throwing a named exception and do full scan - throw new DoNotPerformOptimizedQueryException(); - } - //We need to rewrite the query string here so that it's valid. - if (termIndexInfo instanceof UnionIndexRanges) { - UnionIndexRanges union = (UnionIndexRanges) termIndexInfo; - StringBuilder buf = new StringBuilder(); - String sep = ""; - for (String fieldName : union.getFieldNamesAndValues().keySet()) { - buf.append(sep).append(fieldName).append(" == "); - if (!(queryString.startsWith("'") && queryString.endsWith("'"))) { - buf.append("'").append(queryString).append("'"); - } else { - buf.append(queryString); - } - sep = " or "; - } - if (log.isDebugEnabled()) { - log.debug("Rewrote query for non-fielded single term query: " + queryString + " to " + buf.toString()); - } - queryString = buf.toString(); - //We also need to set the set of indexed terms since we found these in the index. 
- indexedColumns = union.getFieldNamesAndValues().keySet(); - } else { - throw new RuntimeException("Unexpected IndexRanges implementation"); - } - } else { - RangeCalculator calc = this.getTermIndexInformation(connector, auths, indexedTerms, terms, this.getIndexTableName(), this.getReverseIndexTableName(), queryString, this.queryThreads, typeFilter); - if (null == calc.getResult() || calc.getResult().isEmpty()) { - //Then we didn't find anything in the index for this query. This may happen for an indexed term that has wildcards - //in unhandled locations. - //Break out of here by throwing a named exception and do full scan - throw new DoNotPerformOptimizedQueryException(); - } - termIndexInfo = new UnionIndexRanges(); - termIndexInfo.setIndexValuesToOriginalValues(calc.getIndexValues()); - termIndexInfo.setFieldNamesAndValues(calc.getIndexEntries()); - termIndexInfo.getTermCardinality().putAll(calc.getTermCardinalities()); - for (Range r : calc.getResult()) { - //foo is a placeholder and is ignored. - termIndexInfo.add("foo", r); - } - indexedColumns = termIndexInfo.getFieldNamesAndValues().keySet(); - } - } catch (TableNotFoundException e) { - log.error(this.getIndexTableName() + "not found", e); - throw new RuntimeException(this.getIndexTableName() + "not found", e); - } catch (org.apache.commons.jexl2.parser.ParseException e) { - throw new RuntimeException("Error determining ranges for query: " + queryString, e); - } catch (DoNotPerformOptimizedQueryException e) { - log.info("Indexed fields not found in index, performing full scan"); - termIndexInfo = null; - } - queryGlobalIndex.stop(); - - //Determine if we should proceed with optimized query based on results from the global index - boolean proceed = false; - if (null == termIndexInfo || termIndexInfo.getFieldNamesAndValues().values().size() == 0) { - proceed = false; - } else if (null != orTerms && orTerms.size() > 0 && (termIndexInfo.getFieldNamesAndValues().values().size() == indexedTerms.size())) { - proceed = true; - } else if (termIndexInfo.getFieldNamesAndValues().values().size() > 0) { - proceed = true; - }else if(orsAllIndexed){ - proceed = true; - } else { - proceed = false; - } - if (log.isDebugEnabled()) { - log.debug("Proceed with optimized query: " + proceed); - if (null != termIndexInfo) - log.debug("termIndexInfo.getTermsFound().size(): " + termIndexInfo.getFieldNamesAndValues().values().size() + " indexedTerms.size: " + indexedTerms.size() + " fields.size: " + fields.size()); - } - if (proceed) { - - if (log.isDebugEnabled()) { - log.debug(hash + " Performing optimized query"); - } - //Use the scan ranges from the GlobalIndexRanges object as the ranges for the batch scanner - ranges = termIndexInfo.getRanges(); - if (log.isDebugEnabled()) { - log.info(hash + " Ranges: count: " + ranges.size() + ", " + ranges.toString()); - } - - //Create BatchScanner, set the ranges, and setup the iterators. - optimizedEventQuery.start(); - BatchScanner bs = null; - try { - bs = connector.createBatchScanner(this.getTableName(), auths, queryThreads); - bs.setRanges(ranges); - IteratorSetting si = new IteratorSetting(21, "eval", OptimizedQueryIterator.class); - - if (log.isDebugEnabled()) { - log.debug("Setting scan option: " + EvaluatingIterator.QUERY_OPTION + " to " + queryString); - } - //Set the query option - si.addOption(EvaluatingIterator.QUERY_OPTION, queryString); - //Set the Indexed Terms List option. This is the field name and normalized field value pair separated - //by a comma. 
- StringBuilder buf = new StringBuilder(); - String sep = ""; - for (Entry entry : termIndexInfo.getFieldNamesAndValues().entries()) { - buf.append(sep); - buf.append(entry.getKey()); - buf.append(":"); - buf.append(termIndexInfo.getIndexValuesToOriginalValues().get(entry.getValue())); - buf.append(":"); - buf.append(entry.getValue()); - if (sep.equals("")) { - sep = ";"; - } - } - if (log.isDebugEnabled()) { - log.debug("Setting scan option: " + FieldIndexQueryReWriter.INDEXED_TERMS_LIST + " to " + buf.toString()); - } - FieldIndexQueryReWriter rewriter = new FieldIndexQueryReWriter(); - String q=""; - try { - q=queryString; - q = rewriter.applyCaseSensitivity(q, true, false);//Set upper/lower case for fieldname/fieldvalue - Map opts = new HashMap(); - opts.put(FieldIndexQueryReWriter.INDEXED_TERMS_LIST, buf.toString()); - q = rewriter.removeNonIndexedTermsAndInvalidRanges(q, opts); - q = rewriter.applyNormalizedTerms(q, opts); - if(log.isDebugEnabled()){ - log.debug("runServerQuery, FieldIndex Query: "+q); - } - } catch (org.apache.commons.jexl2.parser.ParseException ex) { - log.error("Could not parse query, Jexl ParseException: "+ex); - } catch (Exception ex) { - log.error("Problem rewriting query, Exception: "+ex.getMessage()); - } - si.addOption(BooleanLogicIterator.FIELD_INDEX_QUERY,q); - - //Set the term cardinality option - sep = ""; - buf.delete(0, buf.length()); - for (Entry entry : termIndexInfo.getTermCardinality().entrySet()) { - buf.append(sep); - buf.append(entry.getKey()); - buf.append(":"); - buf.append(entry.getValue()); - sep = ","; - } - if (log.isDebugEnabled()) - log.debug("Setting scan option: " + BooleanLogicIterator.TERM_CARDINALITIES + " to " + buf.toString()); - si.addOption(BooleanLogicIterator.TERM_CARDINALITIES, buf.toString()); - if (this.useReadAheadIterator) { - if (log.isDebugEnabled()) { - log.debug("Enabling read ahead iterator with queue size: " + this.readAheadQueueSize - + " and timeout: " + this.readAheadTimeOut); - } - si.addOption(ReadAheadIterator.QUEUE_SIZE, this.readAheadQueueSize); - si.addOption(ReadAheadIterator.TIMEOUT, this.readAheadTimeOut); - - } - - if (null != unevaluatedExpressions) { - StringBuilder unevaluatedExpressionList = new StringBuilder(); - String sep2 = ""; - for (String exp : unevaluatedExpressions) { - unevaluatedExpressionList.append(sep2).append(exp); - sep2 = ","; - } - if (log.isDebugEnabled()) - log.debug("Setting scan option: " + EvaluatingIterator.UNEVALUTED_EXPRESSIONS + " to " + unevaluatedExpressionList.toString()); - si.addOption(EvaluatingIterator.UNEVALUTED_EXPRESSIONS, unevaluatedExpressionList.toString()); - } - - bs.addScanIterator(si); - - processResults.start(); - processResults.suspend(); - long count = 0; - for (Entry entry : bs) { - count++; - if (log.isDebugEnabled()) { - log.debug("Key: " + entry.getKey());// + "\nValue: " + entry.getValue() + "\n"); - } - //The key that is returned by the EvaluatingIterator is not the same key that is in - //the table. The value that is returned by the EvaluatingIterator is a kryo - //serialized EventFields object. 
- processResults.resume();
- Document d = this.createDocument(entry.getKey(), entry.getValue());
- results.getResults().add(d);
- processResults.suspend();
- }
- log.info(count + " matching entries found in optimized query.");
- optimizationSucceeded = true;
- processResults.stop();
- } catch (TableNotFoundException e) {
- log.error(this.getTableName() + " not found", e);
- throw new RuntimeException(this.getTableName() + " not found", e);
- } finally {
- if (bs != null) {
- bs.close();
- }
- }
- optimizedEventQuery.stop();
- }
- optimizedQuery.stop();
+ if (log.isDebugEnabled()) {
+ log.debug("Setting scan option: " + EvaluatingIterator.QUERY_OPTION + " to " + queryString);
}
-
- // We should look into finding a better way to handle whether we do an optimized query or not.
- //We are not setting up an else condition here because we may have aborted the logic early in the if statement.
- if (!optimizationSucceeded || ((null != orTerms && orTerms.size() > 0) && (indexedTerms.size() != fields.size()) && !orsAllIndexed)) {
- //if (!optimizationSucceeded || ((null != orTerms && orTerms.size() > 0) && (indexedTerms.size() != fields.size()))) {
- fullScanQuery.start();
- if (log.isDebugEnabled()) {
- log.debug(hash + " Performing full scan query");
- }
-
- //Set up a full scan using the date ranges from the query
- //Create the BatchScanner, set the ranges, and set up the iterators.
- BatchScanner bs = null;
- try {
- //The ranges are the start and end dates
- Collection<Range> r = getFullScanRange(beginDate, endDate, terms);
- ranges.addAll(r);
-
- if (log.isDebugEnabled()) {
- log.debug(hash + " Ranges: count: " + ranges.size() + ", " + ranges.toString());
- }
-
- bs = connector.createBatchScanner(this.getTableName(), auths, queryThreads);
- bs.setRanges(ranges);
- IteratorSetting si = new IteratorSetting(22, "eval", EvaluatingIterator.class);
- //Create datatype regex if needed
- if (null != typeFilter) {
- StringBuilder buf = new StringBuilder();
- String s = "";
- for (String type : typeFilter) {
- buf.append(s).append(type).append(".*");
- s = "|";
- }
- if (log.isDebugEnabled())
- log.debug("Setting colf regex iterator to: " + buf.toString());
- IteratorSetting ri = new IteratorSetting(21, "typeFilter", RegExIterator.class);
- ri.addOption(RegExFilter.COLF_REGEX, buf.toString());
- bs.addScanIterator(ri);
- }
- if (log.isDebugEnabled()) {
- log.debug("Setting scan option: " + EvaluatingIterator.QUERY_OPTION + " to " + queryString);
- }
- si.addOption(EvaluatingIterator.QUERY_OPTION, queryString);
- if (null != unevaluatedExpressions) {
- StringBuilder unevaluatedExpressionList = new StringBuilder();
- String sep2 = "";
- for (String exp : unevaluatedExpressions) {
- unevaluatedExpressionList.append(sep2).append(exp);
- sep2 = ",";
- }
- if (log.isDebugEnabled())
- log.debug("Setting scan option: " + EvaluatingIterator.UNEVALUTED_EXPRESSIONS + " to " + unevaluatedExpressionList.toString());
- si.addOption(EvaluatingIterator.UNEVALUTED_EXPRESSIONS, unevaluatedExpressionList.toString());
- }
- bs.addScanIterator(si);
- long count = 0;
- processResults.start();
- processResults.suspend();
- for (Entry<Key,Value> entry : bs) {
- count++;
- //The key that is returned by the EvaluatingIterator is not the same key that is in
- //the partition table. The value that is returned by the EvaluatingIterator is a kryo-
- //serialized EventFields object.
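The full-scan hunk above assembles a column-family regex such as "type1.*|type2.*" from the typeFilter set and attaches it ahead of the evaluating iterator. A minimal sketch of the same filtering step follows, written against the 1.4 user-iterator API (org.apache.accumulo.core.iterators.user.RegExFilter and its setRegexs helper); the iterator name and priority here are illustrative.

    import org.apache.accumulo.core.client.IteratorSetting;
    import org.apache.accumulo.core.client.ScannerBase;
    import org.apache.accumulo.core.iterators.user.RegExFilter;

    public class TypeFilterSketch {
      // Joins the datatype names into a single alternation regex, e.g.
      // "wiki.*|enron.*", and applies it to the column family only.
      public static void applyTypeFilter(ScannerBase scanner, Iterable<String> typeFilter) {
        StringBuilder buf = new StringBuilder();
        String s = "";
        for (String type : typeFilter) {
          buf.append(s).append(type).append(".*");
          s = "|";
        }
        IteratorSetting ri = new IteratorSetting(21, "typeFilter", RegExFilter.class);
        // Row, column qualifier, and value terms are null so only the column
        // family must match; orFields is false since a single term is set.
        RegExFilter.setRegexs(ri, null, buf.toString(), null, null, false);
        scanner.addScanIterator(ri);
      }
    }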
- processResults.resume();
- Document d = this.createDocument(entry.getKey(), entry.getValue());
- results.getResults().add(d);
- processResults.suspend();
- }
- processResults.stop();
- log.info(count + " matching entries found in full scan query.");
- } catch (TableNotFoundException e) {
- log.error(this.getTableName() + " not found", e);
- } finally {
- if (bs != null) {
- bs.close();
- }
- }
- fullScanQuery.stop();
+ si.addOption(EvaluatingIterator.QUERY_OPTION, queryString);
+ if (null != unevaluatedExpressions) {
+ StringBuilder unevaluatedExpressionList = new StringBuilder();
+ String sep2 = "";
+ for (String exp : unevaluatedExpressions) {
+ unevaluatedExpressionList.append(sep2).append(exp);
+ sep2 = ",";
+ }
+ if (log.isDebugEnabled())
+ log.debug("Setting scan option: " + EvaluatingIterator.UNEVALUTED_EXPRESSIONS + " to " + unevaluatedExpressionList.toString());
+ si.addOption(EvaluatingIterator.UNEVALUTED_EXPRESSIONS, unevaluatedExpressionList.toString());
+ }
+ bs.addScanIterator(si);
+ long count = 0;
+ processResults.start();
+ processResults.suspend();
+ for (Entry<Key,Value> entry : bs) {
+ count++;
+ // The key that is returned by the EvaluatingIterator is not the same key that is in
+ // the partition table. The value that is returned by the EvaluatingIterator is a kryo-
+ // serialized EventFields object.
+ processResults.resume();
+ Document d = this.createDocument(entry.getKey(), entry.getValue());
+ results.getResults().add(d);
+ processResults.suspend();
+ }
+ processResults.stop();
+ log.info(count + " matching entries found in full scan query.");
+ } catch (TableNotFoundException e) {
+ log.error(this.getTableName() + " not found", e);
+ } finally {
+ if (bs != null) {
+ bs.close();
}
-
- log.info("AbstractQueryLogic: " + queryString + " " + timeString(abstractQueryLogic.getTime()));
- log.info(" 1) parse query " + timeString(parseQuery.getTime()));
- log.info(" 2) query metadata " + timeString(queryMetadata.getTime()));
- log.info(" 3) full scan query " + timeString(fullScanQuery.getTime()));
- log.info(" 3) optimized query " + timeString(optimizedQuery.getTime()));
- log.info(" 1) process results " + timeString(processResults.getTime()));
- log.info(" 1) query global index " + timeString(queryGlobalIndex.getTime()));
- log.info(hash + " Query completed.");
-
- return results;
- }
- private static String timeString(long millis) {
- return String.format("%4.2f", millis / 1000.);
+ }
+ fullScanQuery.stop();
}
+ log.info("AbstractQueryLogic: " + queryString + " " + timeString(abstractQueryLogic.getTime()));
+ log.info(" 1) parse query " + timeString(parseQuery.getTime()));
+ log.info(" 2) query metadata " + timeString(queryMetadata.getTime()));
+ log.info(" 3) full scan query " + timeString(fullScanQuery.getTime()));
+ log.info(" 3) optimized query " + timeString(optimizedQuery.getTime()));
+ log.info(" 1) process results " + timeString(processResults.getTime()));
+ log.info(" 1) query global index " + timeString(queryGlobalIndex.getTime()));
+ log.info(hash + " Query completed.");
+
+ return results;
+ }
+
+ private static String timeString(long millis) {
+ return String.format("%4.2f", millis / 1000.);
+ }
+
}
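A note on the timings logged above: each phase is measured with a commons-lang StopWatch that is started and immediately suspended, then resumed only around the work of interest, so "process results" counts only document creation and not scanner iteration. A runnable sketch of that pattern, where Thread.sleep stands in for real work:

    import org.apache.commons.lang.time.StopWatch;

    public class TimingSketch {
      // Same formatting as the timeString helper above: seconds, two decimals.
      private static String timeString(long millis) {
        return String.format("%4.2f", millis / 1000.);
      }

      public static void main(String[] args) throws InterruptedException {
        StopWatch processResults = new StopWatch();
        processResults.start();
        processResults.suspend(); // start suspended; accumulate only chosen work
        for (int i = 0; i < 3; i++) {
          processResults.resume();
          Thread.sleep(10); // stands in for createDocument(...) per result
          processResults.suspend();
        }
        processResults.stop(); // stopping from the suspended state is allowed
        System.out.println("process results " + timeString(processResults.getTime()));
      }
    }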