From: sumasai@apache.org
To: commits@atlas.incubator.apache.org
Date: Thu, 03 Dec 2015 05:46:12 -0000
Subject: [1/5] incubator-atlas git commit: ATLAS-352 Improve write performance on type and entity creation with Hbase(sumasai)

Repository: incubator-atlas
Updated Branches:
  refs/heads/master 91ad0218f -> 919120f65


http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/919120f6/titan/src/main/java/com/thinkaurelius/titan/diskstorage/solr/Solr5Index.java
----------------------------------------------------------------------
diff --git a/titan/src/main/java/com/thinkaurelius/titan/diskstorage/solr/Solr5Index.java b/titan/src/main/java/com/thinkaurelius/titan/diskstorage/solr/Solr5Index.java
new file mode 100644
index 0000000..f756a4a
--- /dev/null
+++ b/titan/src/main/java/com/thinkaurelius/titan/diskstorage/solr/Solr5Index.java
@@ -0,0 +1,973 @@
+/*
+ * Copyright 2012-2013 Aurelius LLC
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.thinkaurelius.titan.diskstorage.solr;
+
+import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Sets;
+import com.thinkaurelius.titan.core.Order;
+import com.thinkaurelius.titan.core.TitanElement;
+import com.thinkaurelius.titan.core.attribute.Cmp;
+import com.thinkaurelius.titan.core.attribute.Geo;
+import com.thinkaurelius.titan.core.attribute.Geoshape;
+import com.thinkaurelius.titan.core.attribute.Text;
+import com.thinkaurelius.titan.core.schema.Mapping;
+import com.thinkaurelius.titan.diskstorage.BackendException;
+import com.thinkaurelius.titan.diskstorage.BaseTransaction;
+import com.thinkaurelius.titan.diskstorage.BaseTransactionConfig;
+import com.thinkaurelius.titan.diskstorage.BaseTransactionConfigurable;
+import com.thinkaurelius.titan.diskstorage.PermanentBackendException;
+import com.thinkaurelius.titan.diskstorage.TemporaryBackendException;
+import com.thinkaurelius.titan.diskstorage.configuration.ConfigNamespace;
+import com.thinkaurelius.titan.diskstorage.configuration.ConfigOption;
+import com.thinkaurelius.titan.diskstorage.configuration.Configuration;
+import com.thinkaurelius.titan.diskstorage.indexing.IndexEntry;
+import com.thinkaurelius.titan.diskstorage.indexing.IndexFeatures;
+import com.thinkaurelius.titan.diskstorage.indexing.IndexMutation;
+import com.thinkaurelius.titan.diskstorage.indexing.IndexProvider;
+import com.thinkaurelius.titan.diskstorage.indexing.IndexQuery;
+import com.thinkaurelius.titan.diskstorage.indexing.KeyInformation;
+import com.thinkaurelius.titan.diskstorage.indexing.RawQuery;
+import com.thinkaurelius.titan.diskstorage.util.DefaultTransaction;
+import com.thinkaurelius.titan.graphdb.configuration.GraphDatabaseConfiguration;
+import com.thinkaurelius.titan.graphdb.configuration.PreInitializeConfigOptions;
+import com.thinkaurelius.titan.graphdb.database.serialize.AttributeUtil;
+import com.thinkaurelius.titan.graphdb.database.serialize.attribute.AbstractDecimal;
+import com.thinkaurelius.titan.graphdb.query.TitanPredicate;
+import com.thinkaurelius.titan.graphdb.query.condition.And;
+import com.thinkaurelius.titan.graphdb.query.condition.Condition;
+import com.thinkaurelius.titan.graphdb.query.condition.Not;
+import com.thinkaurelius.titan.graphdb.query.condition.Or;
+import com.thinkaurelius.titan.graphdb.query.condition.PredicateCondition;
+import com.thinkaurelius.titan.graphdb.types.ParameterType;
+import org.apache.commons.lang.StringUtils;
+import org.apache.http.client.HttpClient;
+import org.apache.solr.client.solrj.SolrClient;
+import org.apache.solr.client.solrj.SolrQuery;
+import org.apache.solr.client.solrj.SolrServerException;
+import org.apache.solr.client.solrj.impl.CloudSolrClient;
+import org.apache.solr.client.solrj.impl.HttpClientUtil;
+import org.apache.solr.client.solrj.impl.HttpSolrClient;
+import org.apache.solr.client.solrj.impl.LBHttpSolrClient;
+import org.apache.solr.client.solrj.request.CollectionAdminRequest;
+import org.apache.solr.client.solrj.request.UpdateRequest;
+import org.apache.solr.client.solrj.response.CollectionAdminResponse;
+import org.apache.solr.client.solrj.response.QueryResponse;
+import org.apache.solr.client.solrj.util.ClientUtils;
+import org.apache.solr.common.SolrDocument;
+import org.apache.solr.common.SolrInputDocument;
+import org.apache.solr.common.cloud.ClusterState;
+import org.apache.solr.common.cloud.Replica;
+import org.apache.solr.common.cloud.Slice;
+import org.apache.solr.common.cloud.ZkStateReader;
+import org.apache.solr.common.params.ModifiableSolrParams;
+import org.apache.zookeeper.KeeperException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.text.DateFormat;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.TimeZone;
+import java.util.UUID;
+
+import static com.thinkaurelius.titan.core.attribute.Cmp.*;
+import static com.thinkaurelius.titan.core.schema.Mapping.*;
+
+/**
+ * NOTE: Copied from Titan to support Solr 5. Do not change.
+ */
+@PreInitializeConfigOptions
+public class Solr5Index implements IndexProvider {
+
+    private static final Logger logger = LoggerFactory.getLogger(Solr5Index.class);
+
+    private static final String DEFAULT_ID_FIELD = "id";
+
+    private enum Mode {
+        HTTP, CLOUD;
+
+        public static Mode parse(String mode) {
+            for (Mode m : Mode.values()) {
+                if (m.toString().equalsIgnoreCase(mode)) return m;
+            }
+            throw new IllegalArgumentException("Unrecognized mode: " + mode);
+        }
+    }
+
+    public static final ConfigNamespace SOLR_NS =
+            new ConfigNamespace(GraphDatabaseConfiguration.INDEX_NS, "solr", "Solr index configuration");
+
+    public static final ConfigOption<String> SOLR_MODE = new ConfigOption<String>(SOLR_NS, "mode",
+            "The operation mode for Solr which is either via HTTP (`http`) or using SolrCloud (`cloud`)",
+            ConfigOption.Type.GLOBAL_OFFLINE, "cloud");
+
+    public static final ConfigOption<Boolean> DYNAMIC_FIELDS = new ConfigOption<Boolean>(SOLR_NS, "dyn-fields",
+            "Whether to use dynamic fields (which appends the data type to the field name). If dynamic fields are disabled, "
+                    + "the user must map field names and define them explicitly in the schema.",
+            ConfigOption.Type.GLOBAL_OFFLINE, true);
+
+    public static final ConfigOption<String[]> KEY_FIELD_NAMES = new ConfigOption<String[]>(SOLR_NS, "key-field-names",
+            "Field name that uniquely identifies each document in Solr. Must be specified as a list of `collection=field`.",
+            ConfigOption.Type.GLOBAL, String[].class);
+
+    public static final ConfigOption<String> TTL_FIELD = new ConfigOption<String>(SOLR_NS, "ttl_field",
+            "Name of the TTL field for Solr collections.",
+            ConfigOption.Type.GLOBAL_OFFLINE, "ttl");
+
+    /** SolrCloud Configuration */
+
+    public static final ConfigOption<String> ZOOKEEPER_URL = new ConfigOption<String>(SOLR_NS, "zookeeper-url",
+            "URL of the Zookeeper instance coordinating the SolrCloud cluster",
+            ConfigOption.Type.MASKABLE, "localhost:2181");
+
+    public static final ConfigOption<Integer> NUM_SHARDS = new ConfigOption<Integer>(SOLR_NS, "num-shards",
+            "Number of shards for a collection. This applies when creating a new collection which is only supported under the SolrCloud operation mode.",
+            ConfigOption.Type.GLOBAL_OFFLINE, 1);
+
+    public static final ConfigOption<Integer> MAX_SHARDS_PER_NODE = new ConfigOption<Integer>(SOLR_NS, "max-shards-per-node",
+            "Maximum number of shards per node. This applies when creating a new collection which is only supported under the SolrCloud operation mode.",
+            ConfigOption.Type.GLOBAL_OFFLINE, 1);
+
+    public static final ConfigOption<Integer> REPLICATION_FACTOR = new ConfigOption<Integer>(SOLR_NS, "replication-factor",
+            "Replication factor for a collection. This applies when creating a new collection which is only supported under the SolrCloud operation mode.",
+            ConfigOption.Type.GLOBAL_OFFLINE, 1);
+
+    /** HTTP Configuration */
+
+    public static final ConfigOption<String[]> HTTP_URLS = new ConfigOption<String[]>(SOLR_NS, "http-urls",
+            "List of URLs to use to connect to Solr Servers (LBHttpSolrClient is used), don't add core or collection name to the URL.",
+            ConfigOption.Type.MASKABLE, new String[] { "http://localhost:8983/solr" });
+
+    public static final ConfigOption<Integer> HTTP_CONNECTION_TIMEOUT = new ConfigOption<Integer>(SOLR_NS, "http-connection-timeout",
+            "Solr HTTP connection timeout.",
+            ConfigOption.Type.MASKABLE, 5000);
+
+    public static final ConfigOption<Boolean> HTTP_ALLOW_COMPRESSION = new ConfigOption<Boolean>(SOLR_NS, "http-compression",
+            "Enable/disable compression on the HTTP connections made to Solr.",
+            ConfigOption.Type.MASKABLE, false);
+
+    public static final ConfigOption<Integer> HTTP_MAX_CONNECTIONS_PER_HOST = new ConfigOption<Integer>(SOLR_NS, "http-max-per-host",
+            "Maximum number of HTTP connections per Solr host.",
+            ConfigOption.Type.MASKABLE, 20);
+
+    public static final ConfigOption<Integer> HTTP_GLOBAL_MAX_CONNECTIONS = new ConfigOption<Integer>(SOLR_NS, "http-max",
+            "Maximum number of HTTP connections in total to all Solr servers.",
+            ConfigOption.Type.MASKABLE, 100);
+
+    public static final ConfigOption<Boolean> WAIT_SEARCHER = new ConfigOption<Boolean>(SOLR_NS, "wait-searcher",
+            "When mutating - wait for the index to reflect new mutations before returning. This can have a negative impact on performance.",
+            ConfigOption.Type.LOCAL, false);
+
+    private static final IndexFeatures SOLR_FEATURES = new IndexFeatures.Builder().supportsDocumentTTL()
+            .setDefaultStringMapping(TEXT).supportedStringMappings(TEXT, STRING).build();
+
+    private final SolrClient solrClient;
+    private final Configuration configuration;
+    private final Mode mode;
+    private final boolean dynFields;
+    private final Map<String, String> keyFieldIds;
+    private final String ttlField;
+    private final int maxResults;
+    private final boolean waitSearcher;
+
+    public Solr5Index(final Configuration config) throws BackendException {
+        Preconditions.checkArgument(config != null);
+        configuration = config;
+
+        mode = Mode.parse(config.get(SOLR_MODE));
+        dynFields = config.get(DYNAMIC_FIELDS);
+        keyFieldIds = parseKeyFieldsForCollections(config);
+        maxResults = config.get(GraphDatabaseConfiguration.INDEX_MAX_RESULT_SET_SIZE);
+        ttlField = config.get(TTL_FIELD);
+        waitSearcher = config.get(WAIT_SEARCHER);
+
+        if (mode == Mode.CLOUD) {
+            String zookeeperUrl = config.get(Solr5Index.ZOOKEEPER_URL);
+            CloudSolrClient cloudServer = new CloudSolrClient(zookeeperUrl, true);
+            cloudServer.connect();
+            solrClient = cloudServer;
+        } else if (mode == Mode.HTTP) {
+            HttpClient clientParams = HttpClientUtil.createClient(new ModifiableSolrParams() {{
+                add(HttpClientUtil.PROP_ALLOW_COMPRESSION, config.get(HTTP_ALLOW_COMPRESSION).toString());
+                add(HttpClientUtil.PROP_CONNECTION_TIMEOUT, config.get(HTTP_CONNECTION_TIMEOUT).toString());
+                add(HttpClientUtil.PROP_MAX_CONNECTIONS_PER_HOST, config.get(HTTP_MAX_CONNECTIONS_PER_HOST).toString());
+                add(HttpClientUtil.PROP_MAX_CONNECTIONS, config.get(HTTP_GLOBAL_MAX_CONNECTIONS).toString());
+            }});
+
+            solrClient = new LBHttpSolrClient(clientParams, config.get(HTTP_URLS));
+        } else {
+            throw new IllegalArgumentException("Unsupported Solr operation mode: " + mode);
+        }
+    }
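
    // Illustration (not part of this patch): in Atlas these options arrive through the titan
    // configuration, where Titan option paths are assumed to gain Atlas's "atlas.graph." prefix;
    // the property names below follow that convention and are not taken from this commit.
    // A minimal SolrCloud-mode configuration might look like:
    //
    //   atlas.graph.index.search.backend=solr5
    //   atlas.graph.index.search.solr.mode=cloud
    //   atlas.graph.index.search.solr.zookeeper-url=localhost:2181
    //   atlas.graph.index.search.solr.num-shards=1
    //   atlas.graph.index.search.solr.replication-factor=1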
+
+    private Map<String, String> parseKeyFieldsForCollections(Configuration config) throws BackendException {
+        Map<String, String> keyFieldNames = new HashMap<String, String>();
+        String[] collectionFieldStatements = config.has(KEY_FIELD_NAMES) ? config.get(KEY_FIELD_NAMES) : new String[0];
+        for (String collectionFieldStatement : collectionFieldStatements) {
+            String[] parts = collectionFieldStatement.trim().split("=");
+            if (parts.length != 2) {
+                throw new PermanentBackendException("Unable to parse the collection name / key field name pair. It should be of the format collection=field");
+            }
+            String collectionName = parts[0];
+            String keyFieldName = parts[1];
+            keyFieldNames.put(collectionName, keyFieldName);
+        }
+        return keyFieldNames;
+    }
+
+    private String getKeyFieldId(String collection) {
+        String field = keyFieldIds.get(collection);
+        if (field == null) field = DEFAULT_ID_FIELD;
+        return field;
+    }
+
+    /**
+     * Unlike the ElasticSearch index, which is schema-free, Solr requires a schema to
+     * support searching. This means that you will need to modify the Solr schema with the
+     * appropriate field definitions in order for indexing to work properly. If you have a running
+     * instance of Solr and you modify its schema with new fields, don't forget to re-index!
+     * @param store Index store
+     * @param key New key to register
+     * @param information Data type to register for the key
+     * @param tx enclosing transaction
+     * @throws com.thinkaurelius.titan.diskstorage.BackendException
+     */
+    @Override
+    public void register(String store, String key, KeyInformation information, BaseTransaction tx) throws BackendException {
+        if (mode == Mode.CLOUD) {
+            CloudSolrClient client = (CloudSolrClient) solrClient;
+            try {
+                createCollectionIfNotExists(client, configuration, store);
+            } catch (IOException e) {
+                throw new PermanentBackendException(e);
+            } catch (SolrServerException e) {
+                throw new PermanentBackendException(e);
+            } catch (InterruptedException e) {
+                throw new PermanentBackendException(e);
+            } catch (KeeperException e) {
+                throw new PermanentBackendException(e);
+            }
+        }
+        //Since all data types must be defined in the schema.xml, pre-registering a type does not work
+    }
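
    // Illustration (not part of this patch): with the key-field-names option set to, say,
    //   index.search.solr.key-field-names=vertex_index=docid,edge_index=docid
    // parseKeyFieldsForCollections() above yields {vertex_index=docid, edge_index=docid},
    // and getKeyFieldId("some_other_collection") falls back to the default "id" field.
    // The collection and field names here are hypothetical.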
+
+    @Override
+    public void mutate(Map<String, Map<String, IndexMutation>> mutations, KeyInformation.IndexRetriever informations, BaseTransaction tx) throws BackendException {
+        logger.debug("Mutating SOLR");
+        try {
+            for (Map.Entry<String, Map<String, IndexMutation>> stores : mutations.entrySet()) {
+                String collectionName = stores.getKey();
+                String keyIdField = getKeyFieldId(collectionName);
+
+                List<String> deleteIds = new ArrayList<String>();
+                Collection<SolrInputDocument> changes = new ArrayList<SolrInputDocument>();
+
+                for (Map.Entry<String, IndexMutation> entry : stores.getValue().entrySet()) {
+                    String docId = entry.getKey();
+                    IndexMutation mutation = entry.getValue();
+                    Preconditions.checkArgument(!(mutation.isNew() && mutation.isDeleted()));
+                    Preconditions.checkArgument(!mutation.isNew() || !mutation.hasDeletions());
+                    Preconditions.checkArgument(!mutation.isDeleted() || !mutation.hasAdditions());
+
+                    //Handle any deletions
+                    if (mutation.hasDeletions()) {
+                        if (mutation.isDeleted()) {
+                            logger.trace("Deleting entire document {}", docId);
+                            deleteIds.add(docId);
+                        } else {
+                            HashSet<IndexEntry> fieldDeletions = Sets.newHashSet(mutation.getDeletions());
+                            if (mutation.hasAdditions()) {
+                                for (IndexEntry indexEntry : mutation.getAdditions()) {
+                                    fieldDeletions.remove(indexEntry);
+                                }
+                            }
+                            deleteIndividualFieldsFromIndex(collectionName, keyIdField, docId, fieldDeletions);
+                        }
+                    }
+
+                    if (mutation.hasAdditions()) {
+                        int ttl = mutation.determineTTL();
+
+                        SolrInputDocument doc = new SolrInputDocument();
+                        doc.setField(keyIdField, docId);
+
+                        boolean isNewDoc = mutation.isNew();
+
+                        if (isNewDoc)
+                            logger.trace("Adding new document {}", docId);
+
+                        for (IndexEntry e : mutation.getAdditions()) {
+                            final Object fieldValue = convertValue(e.value);
+                            doc.setField(e.field, isNewDoc
+                                    ? fieldValue : new HashMap<String, Object>(1) {{ put("set", fieldValue); }});
+                        }
+                        if (ttl > 0) {
+                            Preconditions.checkArgument(isNewDoc, "Solr only supports TTL on new documents [%s]", docId);
+                            doc.setField(ttlField, String.format("+%dSECONDS", ttl));
+                        }
+                        changes.add(doc);
+                    }
+                }
+
+                commitDeletes(collectionName, deleteIds);
+                commitDocumentChanges(collectionName, changes);
+            }
+        } catch (Exception e) {
+            throw storageException(e);
+        }
+    }
+
+    private Object convertValue(Object value) throws BackendException {
+        if (value instanceof Geoshape)
+            return GeoToWktConverter.convertToWktString((Geoshape) value);
+        // In order to serialize/deserialize properly, Solr would need access to the Titan
+        // source that defines the Decimal type, so for now we simply convert to
+        // double and let Solr do the same thing or fail.
+        if (value instanceof AbstractDecimal)
+            return ((AbstractDecimal) value).doubleValue();
+        if (value instanceof UUID)
+            return value.toString();
+        return value;
+    }
+
+    @Override
+    public void restore(Map<String, Map<String, List<IndexEntry>>> documents, KeyInformation.IndexRetriever informations, BaseTransaction tx) throws BackendException {
+        try {
+            for (Map.Entry<String, Map<String, List<IndexEntry>>> stores : documents.entrySet()) {
+                final String collectionName = stores.getKey();
+
+                List<String> deleteIds = new ArrayList<String>();
+                List<SolrInputDocument> newDocuments = new ArrayList<SolrInputDocument>();
+
+                for (Map.Entry<String, List<IndexEntry>> entry : stores.getValue().entrySet()) {
+                    final String docID = entry.getKey();
+                    final List<IndexEntry> content = entry.getValue();
+
+                    if (content == null || content.isEmpty()) {
+                        if (logger.isTraceEnabled())
+                            logger.trace("Deleting document [{}]", docID);
+
+                        deleteIds.add(docID);
+                        continue;
+                    }
+
+                    newDocuments.add(new SolrInputDocument() {{
+                        setField(getKeyFieldId(collectionName), docID);
+
+                        for (IndexEntry addition : content) {
+                            Object fieldValue = addition.value;
+                            setField(addition.field, convertValue(fieldValue));
+                        }
+                    }});
+                }
+
+                commitDeletes(collectionName, deleteIds);
+                commitDocumentChanges(collectionName, newDocuments);
+            }
+        } catch (Exception e) {
+            throw new TemporaryBackendException("Could not restore Solr index", e);
+        }
+    }
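
    // Illustration (not part of this patch): for an existing document the mutate() loop above
    // wraps each value in a {"set": value} map, which SolrJ serializes as a Solr atomic update,
    // e.g.
    //   { "docid": "vertex-42", "name_s": { "set": "Jane" } }
    // so only the named field is rewritten; a new document sends plain field values instead.
    // "docid", "name_s", and "vertex-42" are made-up examples.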
+
+    private void deleteIndividualFieldsFromIndex(String collectionName, String keyIdField, String docId, HashSet<IndexEntry> fieldDeletions) throws SolrServerException, IOException {
+        if (fieldDeletions.isEmpty()) return;
+
+        Map<String, String> fieldDeletes = new HashMap<String, String>(1) {{ put("set", null); }};
+
+        SolrInputDocument doc = new SolrInputDocument();
+        doc.addField(keyIdField, docId);
+        StringBuilder sb = new StringBuilder();
+        for (IndexEntry fieldToDelete : fieldDeletions) {
+            doc.addField(fieldToDelete.field, fieldDeletes);
+            sb.append(fieldToDelete).append(",");
+        }
+
+        if (logger.isTraceEnabled())
+            logger.trace("Deleting individual fields [{}] for document {}", sb.toString(), docId);
+
+        UpdateRequest singleDocument = newUpdateRequest();
+        singleDocument.add(doc);
+        solrClient.request(singleDocument, collectionName);
+    }
+
+    private void commitDocumentChanges(String collectionName, Collection<SolrInputDocument> documents) throws SolrServerException, IOException {
+        if (documents.size() == 0) return;
+
+        try {
+            solrClient.request(newUpdateRequest().add(documents), collectionName);
+        } catch (HttpSolrClient.RemoteSolrException rse) {
+            logger.error("Unable to save documents to Solr as one of the shape objects stored was not compatible with Solr.", rse);
+            logger.error("Details in failed document batch: ");
+            for (SolrInputDocument d : documents) {
+                Collection<String> fieldNames = d.getFieldNames();
+                for (String name : fieldNames) {
+                    logger.error(name + ":" + d.getFieldValue(name).toString());
+                }
+            }
+
+            throw rse;
+        }
+    }
+
+    private void commitDeletes(String collectionName, List<String> deleteIds) throws SolrServerException, IOException {
+        if (deleteIds.size() == 0) return;
+        solrClient.request(newUpdateRequest().deleteById(deleteIds), collectionName);
+    }
+
+    @Override
+    public List<String> query(IndexQuery query, KeyInformation.IndexRetriever informations, BaseTransaction tx) throws BackendException {
+        List<String> result;
+        String collection = query.getStore();
+        String keyIdField = getKeyFieldId(collection);
+        SolrQuery solrQuery = new SolrQuery("*:*");
+        String queryFilter = buildQueryFilter(query.getCondition(), informations.get(collection));
+        solrQuery.addFilterQuery(queryFilter);
+        if (!query.getOrder().isEmpty()) {
+            List<IndexQuery.OrderEntry> orders = query.getOrder();
+            for (IndexQuery.OrderEntry order1 : orders) {
+                String item = order1.getKey();
+                SolrQuery.ORDER order = order1.getOrder() == Order.ASC ? SolrQuery.ORDER.asc : SolrQuery.ORDER.desc;
+                solrQuery.addSort(new SolrQuery.SortClause(item, order));
+            }
+        }
+        solrQuery.setStart(0);
+        if (query.hasLimit()) {
+            solrQuery.setRows(query.getLimit());
+        } else {
+            solrQuery.setRows(maxResults);
+        }
+        try {
+            QueryResponse response = solrClient.query(collection, solrQuery);
+
+            if (logger.isDebugEnabled())
+                logger.debug("Executed query [{}] in {} ms", query.getCondition(), response.getElapsedTime());
+
+            int totalHits = response.getResults().size();
+
+            if (!query.hasLimit() && totalHits >= maxResults)
+                logger.warn("Query result set truncated to first [{}] elements for query: {}", maxResults, query);
+
+            result = new ArrayList<String>(totalHits);
+            for (SolrDocument hit : response.getResults()) {
+                result.add(hit.getFieldValue(keyIdField).toString());
+            }
+        } catch (IOException e) {
+            logger.error("Query did not complete : ", e);
+            throw new PermanentBackendException(e);
+        } catch (SolrServerException e) {
+            logger.error("Unable to query Solr index.", e);
+            throw new PermanentBackendException(e);
+        }
+        return result;
+    }
+
+    @Override
+    public Iterable<RawQuery.Result<String>> query(RawQuery query, KeyInformation.IndexRetriever informations, BaseTransaction tx) throws BackendException {
+        List<RawQuery.Result<String>> result;
+        String collection = query.getStore();
+        String keyIdField = getKeyFieldId(collection);
+        SolrQuery solrQuery = new SolrQuery(query.getQuery())
+                .addField(keyIdField)
+                .setIncludeScore(true)
+                .setStart(query.getOffset())
+                .setRows(query.hasLimit() ? query.getLimit() : maxResults);
+
+        try {
+            QueryResponse response = solrClient.query(collection, solrQuery);
+            if (logger.isDebugEnabled())
+                logger.debug("Executed query [{}] in {} ms", query.getQuery(), response.getElapsedTime());
+
+            int totalHits = response.getResults().size();
+            if (!query.hasLimit() && totalHits >= maxResults) {
+                logger.warn("Query result set truncated to first [{}] elements for query: {}", maxResults, query);
+            }
+            result = new ArrayList<RawQuery.Result<String>>(totalHits);
+
+            for (SolrDocument hit : response.getResults()) {
+                double score = Double.parseDouble(hit.getFieldValue("score").toString());
+                result.add(new RawQuery.Result<String>(hit.getFieldValue(keyIdField).toString(), score));
+            }
+        } catch (IOException e) {
+            logger.error("Query did not complete : ", e);
+            throw new PermanentBackendException(e);
+        } catch (SolrServerException e) {
+            logger.error("Unable to query Solr index.", e);
+            throw new PermanentBackendException(e);
+        }
+        return result;
+    }
+
+    private static String escapeValue(Object value) {
+        return ClientUtils.escapeQueryChars(value.toString());
+    }
+
+    public String buildQueryFilter(Condition<TitanElement> condition, KeyInformation.StoreRetriever informations) {
+        if (condition instanceof PredicateCondition) {
+            PredicateCondition<String, TitanElement> atom = (PredicateCondition<String, TitanElement>) condition;
+            Object value = atom.getValue();
+            String key = atom.getKey();
+            TitanPredicate titanPredicate = atom.getPredicate();
+
+            if (value instanceof Number) {
+                String queryValue = escapeValue(value);
+                Preconditions.checkArgument(titanPredicate instanceof Cmp, "Relation not supported on numeric types: " + titanPredicate);
+                Cmp numRel = (Cmp) titanPredicate;
+                switch (numRel) {
+                    case EQUAL:
+                        return (key + ":" + queryValue);
+                    case NOT_EQUAL:
+                        return ("-" + key + ":" + queryValue);
+                    case LESS_THAN:
+                        //use right curly to mean up to but not including value
+                        return (key + ":[* TO " + queryValue + "}");
+                    case LESS_THAN_EQUAL:
+                        return (key + ":[* TO " + queryValue + "]");
+                    case GREATER_THAN:
+                        //use left curly to mean greater than but not including value
+                        return (key + ":{" + queryValue + " TO *]");
+                    case GREATER_THAN_EQUAL:
+                        return (key + ":[" + queryValue + " TO *]");
+                    default: throw new IllegalArgumentException("Unexpected relation: " + numRel);
+                }
+            } else if (value instanceof String) {
+                Mapping map = getStringMapping(informations.get(key));
+                assert map == TEXT || map == STRING;
+                if (map == TEXT && !titanPredicate.toString().startsWith("CONTAINS"))
+                    throw new IllegalArgumentException("Text mapped string values only support CONTAINS queries and not: " + titanPredicate);
+                if (map == STRING && titanPredicate.toString().startsWith("CONTAINS"))
+                    throw new IllegalArgumentException("String mapped string values do not support CONTAINS queries: " + titanPredicate);
+
+                //Special case
+                if (titanPredicate == Text.CONTAINS) {
+                    //e.g. if the terms "tomorrow" and "world" were supplied, and fq=text:(tomorrow world),
+                    //a sample data set would return 2 documents: one where text = "Tomorrow is the World"
+                    //and a second where text = "Hello World". Hence, we are decomposing the query string
+                    //and building an AND query explicitly because we need AND semantics
+                    value = ((String) value).toLowerCase();
+                    List<String> terms = Text.tokenize((String) value);
+
+                    if (terms.isEmpty()) {
+                        return "";
+                    } else if (terms.size() == 1) {
+                        return (key + ":(" + escapeValue(terms.get(0)) + ")");
+                    } else {
+                        And<TitanElement> andTerms = new And<TitanElement>();
+                        for (String term : terms) {
+                            andTerms.add(new PredicateCondition<String, TitanElement>(key, titanPredicate, term));
+                        }
+                        return buildQueryFilter(andTerms, informations);
+                    }
+                }
+                if (titanPredicate == Text.PREFIX || titanPredicate == Text.CONTAINS_PREFIX) {
+                    return (key + ":" + escapeValue(value) + "*");
+                } else if (titanPredicate == Text.REGEX || titanPredicate == Text.CONTAINS_REGEX) {
+                    return (key + ":/" + value + "/");
+                } else if (titanPredicate == EQUAL) {
+                    return (key + ":\"" + escapeValue(value) + "\"");
+                } else if (titanPredicate == NOT_EQUAL) {
+                    return ("-" + key + ":\"" + escapeValue(value) + "\"");
+                } else {
+                    throw new IllegalArgumentException("Relation is not supported for string value: " + titanPredicate);
+                }
+            } else if (value instanceof Geoshape) {
+                Geoshape geo = (Geoshape) value;
+                if (geo.getType() == Geoshape.Type.CIRCLE) {
+                    Geoshape.Point center = geo.getPoint();
+                    return ("{!geofilt sfield=" + key +
+                            " pt=" + center.getLatitude() + "," + center.getLongitude() +
+                            " d=" + geo.getRadius() + "} distErrPct=0"); //distance in kilometers
+                } else if (geo.getType() == Geoshape.Type.BOX) {
+                    Geoshape.Point southwest = geo.getPoint(0);
+                    Geoshape.Point northeast = geo.getPoint(1);
+                    return (key + ":[" + southwest.getLatitude() + "," + southwest.getLongitude() +
+                            " TO " + northeast.getLatitude() + "," + northeast.getLongitude() + "]");
+                } else if (geo.getType() == Geoshape.Type.POLYGON) {
+                    List<Geoshape.Point> coordinates = getPolygonPoints(geo);
+                    StringBuilder poly = new StringBuilder(key + ":\"IsWithin(POLYGON((");
+                    for (Geoshape.Point coordinate : coordinates) {
+                        poly.append(coordinate.getLongitude()).append(" ").append(coordinate.getLatitude()).append(", ");
+                    }
+                    //close the polygon with the first coordinate
+                    poly.append(coordinates.get(0).getLongitude()).append(" ").append(coordinates.get(0).getLatitude());
+                    poly.append(")))\" distErrPct=0");
+                    return (poly.toString());
+                }
+            } else if (value instanceof Date) {
+                String queryValue = escapeValue(toIsoDate((Date) value));
+                Preconditions.checkArgument(titanPredicate instanceof Cmp, "Relation not supported on date types: " + titanPredicate);
+                Cmp numRel = (Cmp) titanPredicate;
+
+                switch (numRel) {
+                    case EQUAL:
+                        return (key + ":" + queryValue);
+                    case NOT_EQUAL:
+                        return ("-" + key + ":" + queryValue);
+                    case LESS_THAN:
+                        //use right curly to mean up to but not including value
+                        return (key + ":[* TO " + queryValue + "}");
+                    case LESS_THAN_EQUAL:
+                        return (key + ":[* TO " + queryValue + "]");
+                    case GREATER_THAN:
+                        //use left curly to mean greater than but not including value
+                        return (key + ":{" + queryValue + " TO *]");
+                    case GREATER_THAN_EQUAL:
+                        return (key + ":[" + queryValue + " TO *]");
+                    default: throw new IllegalArgumentException("Unexpected relation: " + numRel);
+                }
+            } else if (value instanceof Boolean) {
+                Cmp numRel = (Cmp) titanPredicate;
+                String queryValue = escapeValue(value);
+                switch (numRel) {
+                    case EQUAL:
+                        return (key + ":" + queryValue);
+                    case NOT_EQUAL:
+                        return ("-" + key + ":" + queryValue);
+                    default:
+                        throw new IllegalArgumentException("Boolean types only support EQUAL or NOT_EQUAL");
+                }
+            } else if (value instanceof UUID) {
+                if (titanPredicate == EQUAL) {
+                    return (key + ":\"" + escapeValue(value) + "\"");
+                } else if (titanPredicate == NOT_EQUAL) {
+                    return ("-" + key + ":\"" + escapeValue(value) + "\"");
+                } else {
+                    throw new IllegalArgumentException("Relation is not supported for uuid value: " + titanPredicate);
+                }
+            } else throw new IllegalArgumentException("Unsupported type: " + value);
+        } else if (condition instanceof Not) {
+            String sub = buildQueryFilter(((Not<TitanElement>) condition).getChild(), informations);
+            if (StringUtils.isNotBlank(sub)) return "-(" + sub + ")";
+            else return "";
+        } else if (condition instanceof And) {
+            int numChildren = ((And) condition).size();
+            StringBuilder sb = new StringBuilder();
+            for (Condition<TitanElement> c : condition.getChildren()) {
+                String sub = buildQueryFilter(c, informations);
+
+                if (StringUtils.isBlank(sub))
+                    continue;
+
+                // we don't have to add the "+" (which means AND) iff
+                // a. it's a NOT query, or
+                // b. the expression is a single statement in the AND.
+                if (!sub.startsWith("-") && numChildren > 1)
+                    sb.append("+");
+
+                sb.append(sub).append(" ");
+            }
+            return sb.toString();
+        } else if (condition instanceof Or) {
+            StringBuilder sb = new StringBuilder();
+            int element = 0;
+            for (Condition<TitanElement> c : condition.getChildren()) {
+                String sub = buildQueryFilter(c, informations);
+                if (StringUtils.isBlank(sub)) continue;
+                if (element == 0) sb.append("(");
+                else sb.append(" OR ");
+                sb.append(sub);
+                element++;
+            }
+            if (element > 0) sb.append(")");
+            return sb.toString();
+        } else {
+            throw new IllegalArgumentException("Invalid condition: " + condition);
+        }
+        return null;
+    }
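
    // Illustration (not part of this patch): sample filter strings this method produces,
    // using hypothetical field names:
    //   CONTAINS "tomorrow world" on text_t  ->  +text_t:(tomorrow) +text_t:(world)   (AND semantics)
    //   age_i GREATER_THAN 30                ->  age_i:{30 TO *]
    //   name_s NOT_EQUAL "jane"              ->  -name_s:"jane"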
+
+    private String toIsoDate(Date value) {
+        TimeZone tz = TimeZone.getTimeZone("UTC");
+        DateFormat df = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss'Z'");
+        df.setTimeZone(tz);
+        return df.format(value);
+    }
+
+    private List<Geoshape.Point> getPolygonPoints(Geoshape polygon) {
+        List<Geoshape.Point> locations = new ArrayList<Geoshape.Point>();
+
+        int index = 0;
+        boolean hasCoordinates = true;
+        while (hasCoordinates) {
+            try {
+                locations.add(polygon.getPoint(index));
+                index++; //advance to the next point, otherwise the loop never terminates
+            } catch (ArrayIndexOutOfBoundsException ignore) {
+                //just means we asked for a point past the size of the list
+                //of known coordinates
+                hasCoordinates = false;
+            }
+        }
+
+        return locations;
+    }
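
    // Illustration (not part of this patch): toIsoDate() renders instants in UTC, e.g.
    // new Date(0L) -> "1970-01-01T00:00:00Z", which is the literal form Solr date fields
    // accept in range filters such as created_dt:[1970-01-01T00:00:00Z TO *].
    // ("created_dt" is a made-up field name.)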
+
+    /**
+     * Solr handles all transactions on the server side. That means a
+     * commit, optimize, or rollback applies to everything since the last commit/optimize/rollback.
+     * The Solr documentation recommends updating Solr from a single process to avoid
+     * race conditions.
+     *
+     * @return New Transaction Handle
+     * @throws com.thinkaurelius.titan.diskstorage.BackendException
+     */
+    @Override
+    public BaseTransactionConfigurable beginTransaction(BaseTransactionConfig config) throws BackendException {
+        return new DefaultTransaction(config);
+    }
+
+    @Override
+    public void close() throws BackendException {
+        logger.trace("Shutting down connection to Solr [{}]", solrClient);
+        try {
+            solrClient.close();
+        } catch (IOException e) {
+            throw new TemporaryBackendException(e);
+        }
+    }
+
+    @Override
+    public void clearStorage() throws BackendException {
+        try {
+            if (mode != Mode.CLOUD) throw new UnsupportedOperationException("Operation only supported for SolrCloud");
+            logger.debug("Clearing storage from Solr: {}", solrClient);
+            ZkStateReader zkStateReader = ((CloudSolrClient) solrClient).getZkStateReader();
+            zkStateReader.updateClusterState(true);
+            ClusterState clusterState = zkStateReader.getClusterState();
+            for (String collection : clusterState.getCollections()) {
+                logger.debug("Clearing collection [{}] in Solr", collection);
+                UpdateRequest deleteAll = newUpdateRequest();
+                deleteAll.deleteByQuery("*:*");
+                solrClient.request(deleteAll, collection);
+            }
+        } catch (SolrServerException e) {
+            logger.error("Unable to clear storage from index due to server error on Solr.", e);
+            throw new PermanentBackendException(e);
+        } catch (IOException e) {
+            logger.error("Unable to clear storage from index due to low-level I/O error.", e);
+            throw new PermanentBackendException(e);
+        } catch (Exception e) {
+            logger.error("Unable to clear storage from index due to general error.", e);
+            throw new PermanentBackendException(e);
+        }
+    }
+
+    @Override
+    public boolean supports(KeyInformation information, TitanPredicate titanPredicate) {
+        Class<?> dataType = information.getDataType();
+        Mapping mapping = getMapping(information);
+        if (mapping != DEFAULT && !AttributeUtil.isString(dataType)) return false;
+
+        if (Number.class.isAssignableFrom(dataType)) {
+            return titanPredicate instanceof Cmp;
+        } else if (dataType == Geoshape.class) {
+            return titanPredicate == Geo.WITHIN;
+        } else if (AttributeUtil.isString(dataType)) {
+            switch (mapping) {
+                case DEFAULT:
+                case TEXT:
+                    return titanPredicate == Text.CONTAINS || titanPredicate == Text.CONTAINS_PREFIX || titanPredicate == Text.CONTAINS_REGEX;
+                case STRING:
+                    return titanPredicate == EQUAL || titanPredicate == NOT_EQUAL || titanPredicate == Text.REGEX || titanPredicate == Text.PREFIX;
+//                case TEXTSTRING:
+//                    return (titanPredicate instanceof Text) || titanPredicate == Cmp.EQUAL || titanPredicate == Cmp.NOT_EQUAL;
+            }
+        } else if (dataType == Date.class) {
+            if (titanPredicate instanceof Cmp) return true;
+        } else if (dataType == Boolean.class) {
+            return titanPredicate == EQUAL || titanPredicate == NOT_EQUAL;
+        } else if (dataType == UUID.class) {
+            return titanPredicate == EQUAL || titanPredicate == NOT_EQUAL;
+        }
+        return false;
+    }
+
+    @Override
+    public boolean supports(KeyInformation information) {
+        Class<?> dataType = information.getDataType();
+        Mapping mapping = getMapping(information);
+        if (Number.class.isAssignableFrom(dataType) || dataType == Geoshape.class || dataType == Date.class || dataType == Boolean.class || dataType == UUID.class) {
+            if (mapping == DEFAULT) return true;
+        } else if (AttributeUtil.isString(dataType)) {
+            if (mapping == DEFAULT || mapping == TEXT || mapping == STRING) return true;
+        }
+        return false;
+    }
+
+    @Override
+    public String mapKey2Field(String key, KeyInformation keyInfo) {
+        Preconditions.checkArgument(!StringUtils.containsAny(key, new char[]{' '}), "Invalid key name provided: %s", key);
+        if (!dynFields) return key;
+        if (ParameterType.MAPPED_NAME.hasParameter(keyInfo.getParameters())) return key;
+        String postfix;
+        Class<?> datatype = keyInfo.getDataType();
+        if (AttributeUtil.isString(datatype)) {
+            Mapping map = getStringMapping(keyInfo);
+            switch (map) {
+                case TEXT: postfix = "_t"; break;
+                case STRING: postfix = "_s"; break;
+                default: throw new IllegalArgumentException("Unsupported string mapping: " + map);
+            }
+        } else if (AttributeUtil.isWholeNumber(datatype)) {
+            if (datatype.equals(Long.class)) postfix = "_l";
+            else postfix = "_i";
+        } else if (AttributeUtil.isDecimal(datatype)) {
+            if (datatype.equals(Float.class)) postfix = "_f";
+            else postfix = "_d";
+        } else if (datatype.equals(Geoshape.class)) {
+            postfix = "_g";
+        } else if (datatype.equals(Date.class)) {
+            postfix = "_dt";
+        } else if (datatype.equals(Boolean.class)) {
+            postfix = "_b";
+        } else if (datatype.equals(UUID.class)) {
+            postfix = "_uuid";
+        } else throw new IllegalArgumentException("Unsupported data type [" + datatype + "] for field: " + key);
+        return key + postfix;
+    }
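
    // Illustration (not part of this patch): dynamic-field names produced by mapKey2Field()
    // when dyn-fields is enabled and no mapped-name parameter is set (example keys invented):
    //   "name"  + String (STRING mapping) -> "name_s"
    //   "bio"   + String (TEXT mapping)   -> "bio_t"
    //   "age"   + Integer                 -> "age_i"    (Long -> "_l")
    //   "score" + Double                  -> "score_d"  (Float -> "_f")
    //   "when"  + Date                    -> "when_dt"; Boolean -> "_b", UUID -> "_uuid", Geoshape -> "_g"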
+
+    @Override
+    public IndexFeatures getFeatures() {
+        return SOLR_FEATURES;
+    }
+
+    /*
+    ################# UTILITY METHODS #######################
+     */
+
+    private static Mapping getStringMapping(KeyInformation information) {
+        assert AttributeUtil.isString(information.getDataType());
+        Mapping map = getMapping(information);
+        if (map == DEFAULT) map = TEXT;
+        return map;
+    }
+
+    private UpdateRequest newUpdateRequest() {
+        UpdateRequest req = new UpdateRequest();
+        //honor the wait-searcher option: only block on a new searcher when configured to
+        req.setAction(UpdateRequest.ACTION.COMMIT, true, waitSearcher);
+        return req;
+    }
+
+    private BackendException storageException(Exception solrException) {
+        return new TemporaryBackendException("Unable to complete query on Solr.", solrException);
+    }
+
+    private static void createCollectionIfNotExists(CloudSolrClient client, Configuration config, String collection)
+            throws IOException, SolrServerException, KeeperException, InterruptedException {
+        if (!checkIfCollectionExists(client, collection)) {
+            Integer numShards = config.get(NUM_SHARDS);
+            Integer maxShardsPerNode = config.get(MAX_SHARDS_PER_NODE);
+            Integer replicationFactor = config.get(REPLICATION_FACTOR);
+
+            CollectionAdminRequest.Create createRequest = new CollectionAdminRequest.Create();
+
+            createRequest.setConfigName(collection);
+            createRequest.setCollectionName(collection);
+            createRequest.setNumShards(numShards);
+            createRequest.setMaxShardsPerNode(maxShardsPerNode);
+            createRequest.setReplicationFactor(replicationFactor);
+
+            CollectionAdminResponse createResponse = createRequest.process(client);
+            if (createResponse.isSuccess()) {
+                logger.trace("Collection {} successfully created.", collection);
+            } else {
+                throw new SolrServerException(Joiner.on("\n").join(createResponse.getErrorMessages()));
+            }
+        }
+
+        waitForRecoveriesToFinish(client, collection);
+    }
+
+    /**
+     * Checks if the collection has already been created in Solr.
+     */
+    private static boolean checkIfCollectionExists(CloudSolrClient server, String collection) throws KeeperException, InterruptedException {
+        ZkStateReader zkStateReader = server.getZkStateReader();
+        zkStateReader.updateClusterState(true);
+        ClusterState clusterState = zkStateReader.getClusterState();
+        return clusterState.getCollectionOrNull(collection) != null;
+    }
+
+    /**
+     * Wait for all the collection shards to be ready.
+     */
+    private static void waitForRecoveriesToFinish(CloudSolrClient server, String collection) throws KeeperException, InterruptedException {
+        ZkStateReader zkStateReader = server.getZkStateReader();
+        try {
+            boolean cont = true;
+
+            while (cont) {
+                boolean sawLiveRecovering = false;
+                zkStateReader.updateClusterState(true);
+                ClusterState clusterState = zkStateReader.getClusterState();
+                Map<String, Slice> slices = clusterState.getSlicesMap(collection);
+                Preconditions.checkNotNull(slices, "Could not find collection: " + collection);
+
+                for (Map.Entry<String, Slice> entry : slices.entrySet()) {
+                    Map<String, Replica> shards = entry.getValue().getReplicasMap();
+                    for (Map.Entry<String, Replica> shard : shards.entrySet()) {
+                        String state = shard.getValue().getStr(ZkStateReader.STATE_PROP);
+                        if ((state.equals(ZkStateReader.RECOVERING)
+                                || state.equals(ZkStateReader.SYNC)
+                                || state.equals(ZkStateReader.DOWN))
+                                && clusterState.liveNodesContain(shard.getValue().getStr(ZkStateReader.NODE_NAME_PROP))) {
+                            sawLiveRecovering = true;
+                        }
+                    }
+                }
+                if (!sawLiveRecovering) {
+                    cont = false;
+                } else {
+                    Thread.sleep(1000);
+                }
+            }
+        } finally {
+            logger.info("Exiting solr wait");
+        }
+    }
+
+    private static class GeoToWktConverter {
+        /**
+         * {@link com.thinkaurelius.titan.core.attribute.Geoshape} stores Points in the String format: point[X.0,Y.0].
+         * Solr needs it to be in Well-Known Text format: POINT(X.0 Y.0)
+         */
+        static String convertToWktString(Geoshape fieldValue) throws BackendException {
+            if (fieldValue.getType() == Geoshape.Type.POINT) {
+                Geoshape.Point point = fieldValue.getPoint();
+                return "POINT(" + point.getLongitude() + " " + point.getLatitude() + ")";
+            } else {
+                throw new PermanentBackendException("Cannot index " + fieldValue.getType());
+            }
+        }
+    }
+}
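
The point conversion GeoToWktConverter performs can be seen in isolation below. This is a
sketch for illustration only, not code from the patch, and the coordinate values are invented:
Titan's Geoshape prints a point as point[lat,lon], while Solr's spatial field expects WKT with
longitude first.

    public class WktPointDemo {
        public static void main(String[] args) {
            double latitude = 37.77;    // a Geoshape point[37.77,-122.42] stores latitude first
            double longitude = -122.42;
            // WKT swaps the order: longitude, then latitude
            System.out.println("POINT(" + longitude + " " + latitude + ")"); // POINT(-122.42 37.77)
        }
    }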

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/919120f6/titan/src/test/java/com/thinkaurelius/titan/diskstorage/locking/LocalLockMediatorTest.java
----------------------------------------------------------------------
diff --git a/titan/src/test/java/com/thinkaurelius/titan/diskstorage/locking/LocalLockMediatorTest.java b/titan/src/test/java/com/thinkaurelius/titan/diskstorage/locking/LocalLockMediatorTest.java
new file mode 100644
index 0000000..d0fd401
--- /dev/null
+++ b/titan/src/test/java/com/thinkaurelius/titan/diskstorage/locking/LocalLockMediatorTest.java
@@ -0,0 +1,60 @@
+/*
+ * Copyright 2012-2013 Aurelius LLC
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.thinkaurelius.titan.diskstorage.locking;
+
+import com.thinkaurelius.titan.diskstorage.hbase.HBaseTransaction;
+import com.thinkaurelius.titan.diskstorage.util.time.TimestampProvider;
+import com.thinkaurelius.titan.diskstorage.util.time.Timestamps;
+import com.thinkaurelius.titan.diskstorage.StaticBuffer;
+import com.thinkaurelius.titan.diskstorage.util.KeyColumn;
+import com.thinkaurelius.titan.diskstorage.util.StaticArrayBuffer;
+import org.mockito.Mockito;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import java.util.concurrent.TimeUnit;
+
+public class LocalLockMediatorTest {
+
+    private static final String LOCK_NAMESPACE = "test";
+    private static final StaticBuffer LOCK_ROW = StaticArrayBuffer.of(new byte[]{1});
+    private static final StaticBuffer LOCK_COL = StaticArrayBuffer.of(new byte[]{1});
+    private static final KeyColumn kc = new KeyColumn(LOCK_ROW, LOCK_COL);
+    private static final HBaseTransaction mockTx1 = Mockito.mock(HBaseTransaction.class);
+    private static final HBaseTransaction mockTx2 = Mockito.mock(HBaseTransaction.class);
+
+    @Test
+    public void testLock() throws InterruptedException {
+        TimestampProvider times = Timestamps.MICRO;
+        LocalLockMediator<HBaseTransaction> llm =
+                new LocalLockMediator<HBaseTransaction>(LOCK_NAMESPACE, times);
+
+        //Expire immediately
+        Assert.assertTrue(llm.lock(kc, mockTx1, times.getTime(0, TimeUnit.NANOSECONDS)));
+        Assert.assertTrue(llm.lock(kc, mockTx2, times.getTime(Long.MAX_VALUE, TimeUnit.NANOSECONDS)));
+
+        llm = new LocalLockMediator<HBaseTransaction>(LOCK_NAMESPACE, times);
+
+        //Expire later
+        Assert.assertTrue(llm.lock(kc, mockTx1, times.getTime(Long.MAX_VALUE, TimeUnit.NANOSECONDS)));
+        //So second lock should fail on same keyCol
+        Assert.assertFalse(llm.lock(kc, mockTx2, times.getTime(Long.MAX_VALUE, TimeUnit.NANOSECONDS)));
+
+        //Unlock
+        Assert.assertTrue(llm.unlock(kc, mockTx1));
+        //Now locking should succeed
+        Assert.assertTrue(llm.lock(kc, mockTx2, times.getTime(Long.MAX_VALUE, TimeUnit.NANOSECONDS)));
+    }
+}
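
The assertions above all follow from expiration-based locking: a claim only blocks rivals until
its expiration instant passes, and an explicit unlock releases it early. The self-contained
sketch below mirrors those semantics for illustration; it is not Titan's LocalLockMediator,
which additionally works with TimestampProvider instants and per-namespace state.

    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.ConcurrentMap;

    public class ExpiringLockSketch {
        private final ConcurrentMap<String, Claim> locks = new ConcurrentHashMap<String, Claim>();

        private static final class Claim {
            final Object holder;
            final long expiresAtNanos;
            Claim(Object holder, long expiresAtNanos) {
                this.holder = holder;
                this.expiresAtNanos = expiresAtNanos;
            }
        }

        public boolean lock(String key, Object holder, long expiresAtNanos) {
            Claim fresh = new Claim(holder, expiresAtNanos);
            Claim existing = locks.putIfAbsent(key, fresh);
            if (existing == null) return true;                   // no prior claim on this key
            if (existing.expiresAtNanos <= System.nanoTime()) {  // prior claim already expired
                return locks.replace(key, existing, fresh);
            }
            return existing.holder == holder;                    // reentrant for the same holder
        }

        public boolean unlock(String key, Object holder) {
            Claim existing = locks.get(key);
            return existing != null && existing.holder == holder && locks.remove(key, existing);
        }

        public static void main(String[] args) {
            ExpiringLockSketch llm = new ExpiringLockSketch();
            Object tx1 = new Object(), tx2 = new Object();
            System.out.println(llm.lock("kc", tx1, 0L));             // true: first claim (expires at once)
            System.out.println(llm.lock("kc", tx2, Long.MAX_VALUE)); // true: tx1's claim had expired
            System.out.println(llm.lock("kc", tx1, Long.MAX_VALUE)); // false: tx2 now holds the key
            System.out.println(llm.unlock("kc", tx2));               // true: explicit release
            System.out.println(llm.lock("kc", tx1, Long.MAX_VALUE)); // true: free again after unlock
        }
    }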