lucene-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From a.@apache.org
Subject [38/50] lucene-solr:jira/solr-11458-2: SOLR-11542: Rename TimePartitionedUpdateProcessor to TimeRoutedAliasUpdateProcessor
Date Mon, 04 Dec 2017 17:49:22 GMT
SOLR-11542: Rename TimePartitionedUpdateProcessor to TimeRoutedAliasUpdateProcessor


Project: http://git-wip-us.apache.org/repos/asf/lucene-solr/repo
Commit: http://git-wip-us.apache.org/repos/asf/lucene-solr/commit/7877f5a5
Tree: http://git-wip-us.apache.org/repos/asf/lucene-solr/tree/7877f5a5
Diff: http://git-wip-us.apache.org/repos/asf/lucene-solr/diff/7877f5a5

Branch: refs/heads/jira/solr-11458-2
Commit: 7877f5a511a60e44f2dabd45ac1d6f84626b1161
Parents: 9c0ca9b
Author: David Smiley <dsmiley@apache.org>
Authored: Thu Nov 30 23:25:14 2017 -0500
Committer: David Smiley <dsmiley@apache.org>
Committed: Thu Nov 30 23:25:14 2017 -0500

----------------------------------------------------------------------
 solr/CHANGES.txt                                |   4 +-
 .../DistributedUpdateProcessorFactory.java      |   2 +-
 .../TimePartitionedUpdateProcessor.java         | 294 -------------------
 .../TimeRoutedAliasUpdateProcessor.java         | 294 +++++++++++++++++++
 .../TimePartitionedUpdateProcessorTest.java     | 275 -----------------
 .../TimeRoutedAliasUpdateProcessorTest.java     | 275 +++++++++++++++++
 6 files changed, 572 insertions(+), 572 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/7877f5a5/solr/CHANGES.txt
----------------------------------------------------------------------
diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt
index 737ef60..849a49b 100644
--- a/solr/CHANGES.txt
+++ b/solr/CHANGES.txt
@@ -91,11 +91,11 @@ New Features
 * SOLR-11487: Collection Aliases may now have metadata (currently an internal feature).
   (Gus Heck, David Smiley)
 
-* SOLR-11542: New TimePartitionedUpdateProcessor URP that routes documents to another collection
+* SOLR-11542: New TimeRoutedAliasUpdateProcessor URP that routes documents to another collection
   in the same Alias defined set based on a time field (currently an internal feature).
   (David Smiley)
 
-* SOLR-9743: A new  UTILIZENODE command (noble)
+* SOLR-9743: A new UTILIZENODE command (noble)
 
 * SOLR-11202: Implement a set-property command for AutoScaling API. (ab, shalin)
 

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/7877f5a5/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessorFactory.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessorFactory.java b/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessorFactory.java
index c706e0c..1930c08 100644
--- a/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessorFactory.java
+++ b/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessorFactory.java
@@ -50,7 +50,7 @@ public class DistributedUpdateProcessorFactory
   public UpdateRequestProcessor getInstance(SolrQueryRequest req,
       SolrQueryResponse rsp, UpdateRequestProcessor next) {
     // note: will sometimes return DURP (no overhead) instead of wrapping
-    return TimePartitionedUpdateProcessor.wrap(req, rsp,
+    return TimeRoutedAliasUpdateProcessor.wrap(req, rsp,
         new DistributedUpdateProcessor(req, rsp, next));
   }
   

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/7877f5a5/solr/core/src/java/org/apache/solr/update/processor/TimePartitionedUpdateProcessor.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/update/processor/TimePartitionedUpdateProcessor.java b/solr/core/src/java/org/apache/solr/update/processor/TimePartitionedUpdateProcessor.java
deleted file mode 100644
index e485a3d..0000000
--- a/solr/core/src/java/org/apache/solr/update/processor/TimePartitionedUpdateProcessor.java
+++ /dev/null
@@ -1,294 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.solr.update.processor;
-
-import java.io.IOException;
-import java.lang.invoke.MethodHandles;
-import java.time.Instant;
-import java.time.ZoneOffset;
-import java.time.format.DateTimeFormatter;
-import java.time.format.DateTimeFormatterBuilder;
-import java.time.temporal.ChronoField;
-import java.util.AbstractMap;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Date;
-import java.util.List;
-import java.util.Locale;
-import java.util.Map;
-import java.util.stream.Collectors;
-
-import org.apache.solr.cloud.ZkController;
-import org.apache.solr.common.SolrException;
-import org.apache.solr.common.cloud.Aliases;
-import org.apache.solr.common.cloud.Replica;
-import org.apache.solr.common.cloud.Slice;
-import org.apache.solr.common.cloud.ZkCoreNodeProps;
-import org.apache.solr.common.params.ModifiableSolrParams;
-import org.apache.solr.common.params.SolrParams;
-import org.apache.solr.common.params.UpdateParams;
-import org.apache.solr.core.CoreContainer;
-import org.apache.solr.core.SolrCore;
-import org.apache.solr.request.SolrQueryRequest;
-import org.apache.solr.response.SolrQueryResponse;
-import org.apache.solr.update.AddUpdateCommand;
-import org.apache.solr.update.CommitUpdateCommand;
-import org.apache.solr.update.DeleteUpdateCommand;
-import org.apache.solr.update.SolrCmdDistributor;
-import org.apache.solr.update.processor.DistributedUpdateProcessor.DistribPhase;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import static org.apache.solr.update.processor.DistributedUpdateProcessor.DISTRIB_FROM;
-import static org.apache.solr.update.processor.DistributingUpdateProcessorFactory.DISTRIB_UPDATE_PARAM;
-
-/**
- * Distributes update requests to rolling series of collections partitioned by a timestamp field.
- *
- * Depends on this core having a special core property that points to the alias name that this collection is a part of.
- * And further requires certain metadata on the Alias.
- *
- * @since 7.2.0
- */
-public class TimePartitionedUpdateProcessor extends UpdateRequestProcessor {
-  //TODO do we make this more generic to others who want to partition collections using something else?
-
-  // TODO auto add new collection partitions when cross a timestamp boundary.  That needs to be coordinated to avoid
-  //   race conditions, remembering that even the lead collection might have multiple instances of this URP
-  //   (multiple shards or perhaps just multiple streams thus instances of this URP)
-
-  public static final String ALIAS_DISTRIB_UPDATE_PARAM = "alias." + DISTRIB_UPDATE_PARAM; // param
-  public static final String TIME_PARTITION_ALIAS_NAME_CORE_PROP = "timePartitionAliasName"; // core prop
-  public static final String ROUTER_FIELD_METADATA = "router.field"; // alias metadata
-
-  // This format must be compatible with collection name limitations
-  private static final DateTimeFormatter DATE_TIME_FORMATTER = new DateTimeFormatterBuilder()
-      .append(DateTimeFormatter.ISO_LOCAL_DATE).appendPattern("[_HH[_mm[_ss]]]") //brackets mean optional
-      .parseDefaulting(ChronoField.HOUR_OF_DAY, 0)
-      .parseDefaulting(ChronoField.MINUTE_OF_HOUR, 0)
-      .parseDefaulting(ChronoField.SECOND_OF_MINUTE, 0)
-      .toFormatter(Locale.ROOT).withZone(ZoneOffset.UTC);
-
-  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
-
-  private final String thisCollection;
-  private final String aliasName;
-  private final String routeField;
-
-  private final SolrCmdDistributor cmdDistrib;
-  private final ZkController zkController;
-  private final SolrParams outParamsToLeader;
-
-  private List<Map.Entry<Instant, String>> parsedCollectionsDesc; // k=timestamp (start), v=collection.  Sorted descending
-  private Aliases parsedCollectionsAliases; // a cached reference to the source of what we parse into parsedCollectionsDesc
-
-  public static UpdateRequestProcessor wrap(SolrQueryRequest req, SolrQueryResponse rsp, UpdateRequestProcessor next) {
-    final String timePartitionAliasName = req.getCore().getCoreDescriptor()
-        .getCoreProperty(TIME_PARTITION_ALIAS_NAME_CORE_PROP, null);
-    final DistribPhase shardDistribPhase =
-        DistribPhase.parseParam(req.getParams().get(DISTRIB_UPDATE_PARAM));
-    final DistribPhase aliasDistribPhase =
-        DistribPhase.parseParam(req.getParams().get(ALIAS_DISTRIB_UPDATE_PARAM));
-    if (timePartitionAliasName == null || aliasDistribPhase != DistribPhase.NONE || shardDistribPhase != DistribPhase.NONE) {
-      // if aliasDistribPhase is not NONE, then there is no further collection routing to be done here.
-      //    TODO this may eventually not be true but at the moment it is
-      // if shardDistribPhase is not NONE, then the phase is after the scope of this URP
-      return next;
-    } else {
-      return new TimePartitionedUpdateProcessor(req, rsp, next, timePartitionAliasName, aliasDistribPhase);
-    }
-  }
-
-  protected TimePartitionedUpdateProcessor(SolrQueryRequest req, SolrQueryResponse rsp, UpdateRequestProcessor next,
-                                           String aliasName,
-                                           DistribPhase aliasDistribPhase) {
-    super(next);
-    assert aliasDistribPhase == DistribPhase.NONE;
-    final SolrCore core = req.getCore();
-    this.thisCollection = core.getCoreDescriptor().getCloudDescriptor().getCollectionName();
-    this.aliasName = aliasName;
-    CoreContainer cc = core.getCoreContainer();
-    zkController = cc.getZkController();
-    cmdDistrib = new SolrCmdDistributor(cc.getUpdateShardHandler());
-
-    final Map<String, String> aliasMetadata = zkController.getZkStateReader().getAliases().getCollectionAliasMetadata(aliasName);
-    if (aliasMetadata == null) {
-      throw newAliasMustExistException(); // if it did exist, we'd have a non-null map
-    }
-    routeField = aliasMetadata.get(ROUTER_FIELD_METADATA);
-
-    ModifiableSolrParams outParams = new ModifiableSolrParams(req.getParams());
-    // Don't distribute these params; they will be distributed from the local processCommit separately.
-    //   (See RequestHandlerUtils.handleCommit from which this list was retrieved from)
-    outParams.remove(UpdateParams.OPTIMIZE);
-    outParams.remove(UpdateParams.COMMIT);
-    outParams.remove(UpdateParams.SOFT_COMMIT);
-    outParams.remove(UpdateParams.PREPARE_COMMIT);
-    outParams.remove(UpdateParams.ROLLBACK);
-    // Add these...
-    //  Ensures we skip over URPs prior to DistributedURP (see UpdateRequestProcessorChain)
-    outParams.set(DISTRIB_UPDATE_PARAM, DistribPhase.NONE.toString());
-    //  Signal this is a distributed search from this URP (see #wrap())
-    outParams.set(ALIAS_DISTRIB_UPDATE_PARAM, DistribPhase.TOLEADER.toString());
-    outParams.set(DISTRIB_FROM, ZkCoreNodeProps.getCoreUrl(zkController.getBaseUrl(), core.getName()));
-    outParamsToLeader = outParams;
-  }
-
-  @Override
-  public void processAdd(AddUpdateCommand cmd) throws IOException {
-    final Object routeValue = cmd.getSolrInputDocument().getFieldValue(routeField);
-    final String targetCollection = findTargetCollectionGivenRouteKey(routeValue);
-    if (targetCollection == null) {
-      throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
-          "Doc " + cmd.getPrintableId() + " couldn't be routed with " + routeField + "=" + routeValue);
-    }
-    if (thisCollection.equals(targetCollection)) {
-      // pass on through; we've reached the right collection
-      super.processAdd(cmd);
-    } else {
-      // send to the right collection
-      SolrCmdDistributor.Node targetLeaderNode = lookupShardLeaderOfCollection(targetCollection);
-      cmdDistrib.distribAdd(cmd, Collections.singletonList(targetLeaderNode), new ModifiableSolrParams(outParamsToLeader));
-    }
-  }
-
-  protected String findTargetCollectionGivenRouteKey(Object routeKey) {
-    final Instant docTimestamp;
-    if (routeKey instanceof Instant) {
-      docTimestamp = (Instant) routeKey;
-    } else if (routeKey instanceof Date) {
-      docTimestamp = ((Date)routeKey).toInstant();
-    } else if (routeKey instanceof CharSequence) {
-      docTimestamp = Instant.parse((CharSequence)routeKey);
-    } else {
-      throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Unexpected type of routeKey: " + routeKey);
-    }
-    final Aliases aliases = zkController.getZkStateReader().getAliases(); // note: might be different from last request
-    if (this.parsedCollectionsAliases != aliases) {
-      if (this.parsedCollectionsAliases != null) {
-        log.info("Observing possibly updated alias {}", aliasName);
-      }
-      this.parsedCollectionsDesc = doParseCollections(aliases);
-      this.parsedCollectionsAliases = aliases;
-    }
-    // iterates in reverse chronological order
-    //    We're O(N) here but N should be small, the loop is fast, and usually looking for 1st.
-    for (Map.Entry<Instant, String> entry : parsedCollectionsDesc) {
-      Instant colStartTime = entry.getKey();
-      if (!docTimestamp.isBefore(colStartTime)) {  // i.e. docTimeStamp is >= the colStartTime
-        return entry.getValue(); //found it
-      }
-    }
-    return null;
-  }
-
-  /** Parses the timestamp from the collection list and returns them in reverse sorted order (newest 1st) */
-  private List<Map.Entry<Instant,String>> doParseCollections(Aliases aliases) {
-    final List<String> collections = aliases.getCollectionAliasListMap().get(aliasName);
-    if (collections == null) {
-      throw newAliasMustExistException();
-    }
-    // note: I considered TreeMap but didn't like the log(N) just to grab the head when we use it later
-    List<Map.Entry<Instant,String>> result = new ArrayList<>(collections.size());
-    for (String collection : collections) {
-      Instant colStartTime = parseInstantFromCollectionName(aliasName, collection);
-      result.add(new AbstractMap.SimpleImmutableEntry<>(colStartTime, collection));
-    }
-    result.sort((e1, e2) -> e2.getKey().compareTo(e1.getKey())); // reverse sort by key
-    return result;
-  }
-
-  private SolrException newAliasMustExistException() {
-    throw new SolrException(SolrException.ErrorCode.SERVICE_UNAVAILABLE,
-        "Collection " + thisCollection + " created for use with alias " + aliasName + " which doesn't exist anymore." +
-            " You cannot write to this unless the alias exists.");
-  }
-
-  static Instant parseInstantFromCollectionName(String aliasName, String collection) {
-    final String dateTimePart = collection.substring(aliasName.length() + 1);
-    return DATE_TIME_FORMATTER.parse(dateTimePart, Instant::from);
-  }
-
-  @Override
-  public void processDelete(DeleteUpdateCommand cmd) throws IOException {
-    final List<SolrCmdDistributor.Node> nodes = lookupShardLeadersOfCollections();
-    cmdDistrib.distribDelete(cmd, nodes, new ModifiableSolrParams(outParamsToLeader));
-  }
-
-  @Override
-  public void processCommit(CommitUpdateCommand cmd) throws IOException {
-    final List<SolrCmdDistributor.Node> nodes = lookupShardLeadersOfCollections();
-    cmdDistrib.distribCommit(cmd, nodes, new ModifiableSolrParams(outParamsToLeader));
-    cmdDistrib.blockAndDoRetries(); //TODO shouldn't distribCommit do this implicitly?  It doesn't.
-  }
-
-// Not supported by SolrCmdDistributor and is sketchy any way
-//  @Override
-//  public void processRollback(RollbackUpdateCommand cmd) throws IOException {
-//  }
-
-  @Override
-  public void finish() throws IOException {
-    try {
-      cmdDistrib.finish();
-      final List<SolrCmdDistributor.Error> errors = cmdDistrib.getErrors();
-      if (!errors.isEmpty()) {
-        throw new DistributedUpdateProcessor.DistributedUpdatesAsyncException(errors);
-      }
-    } finally {
-      super.finish();
-    }
-  }
-
-  @Override
-  protected void doClose() {
-    try {
-      cmdDistrib.close();
-    } finally {
-      super.doClose();
-    }
-  }
-
-  private List<SolrCmdDistributor.Node> lookupShardLeadersOfCollections() {
-    final Aliases aliases = zkController.getZkStateReader().getAliases();
-    List<String> collections = aliases.getCollectionAliasListMap().get(aliasName);
-    if (collections == null) {
-      throw newAliasMustExistException();
-    }
-    return collections.stream().map(this::lookupShardLeaderOfCollection).collect(Collectors.toList());
-  }
-
-  private SolrCmdDistributor.Node lookupShardLeaderOfCollection(String collection) {
-    //TODO consider router to get the right slice.  Refactor common code in CloudSolrClient & DistributedUrp
-    final Collection<Slice> activeSlices = zkController.getClusterState().getCollection(collection).getActiveSlices();
-    if (activeSlices.isEmpty()) {
-      throw new SolrException(SolrException.ErrorCode.SERVICE_UNAVAILABLE, "Cannot route to collection " + collection);
-    }
-    final Slice slice = activeSlices.iterator().next();
-    //TODO when should we do StdNode vs RetryNode?
-    final Replica leader = slice.getLeader();
-    if (leader == null) {
-      throw new SolrException(SolrException.ErrorCode.SERVICE_UNAVAILABLE,
-          "No 'leader' replica available for shard " + slice.getName() + " of collection " + collection);
-    }
-    return new SolrCmdDistributor.RetryNode(new ZkCoreNodeProps(leader), zkController.getZkStateReader(),
-        collection, null);
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/7877f5a5/solr/core/src/java/org/apache/solr/update/processor/TimeRoutedAliasUpdateProcessor.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/update/processor/TimeRoutedAliasUpdateProcessor.java b/solr/core/src/java/org/apache/solr/update/processor/TimeRoutedAliasUpdateProcessor.java
new file mode 100644
index 0000000..9148912
--- /dev/null
+++ b/solr/core/src/java/org/apache/solr/update/processor/TimeRoutedAliasUpdateProcessor.java
@@ -0,0 +1,294 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.update.processor;
+
+import java.io.IOException;
+import java.lang.invoke.MethodHandles;
+import java.time.Instant;
+import java.time.ZoneOffset;
+import java.time.format.DateTimeFormatter;
+import java.time.format.DateTimeFormatterBuilder;
+import java.time.temporal.ChronoField;
+import java.util.AbstractMap;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Date;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import org.apache.solr.cloud.ZkController;
+import org.apache.solr.common.SolrException;
+import org.apache.solr.common.cloud.Aliases;
+import org.apache.solr.common.cloud.Replica;
+import org.apache.solr.common.cloud.Slice;
+import org.apache.solr.common.cloud.ZkCoreNodeProps;
+import org.apache.solr.common.params.ModifiableSolrParams;
+import org.apache.solr.common.params.SolrParams;
+import org.apache.solr.common.params.UpdateParams;
+import org.apache.solr.core.CoreContainer;
+import org.apache.solr.core.SolrCore;
+import org.apache.solr.request.SolrQueryRequest;
+import org.apache.solr.response.SolrQueryResponse;
+import org.apache.solr.update.AddUpdateCommand;
+import org.apache.solr.update.CommitUpdateCommand;
+import org.apache.solr.update.DeleteUpdateCommand;
+import org.apache.solr.update.SolrCmdDistributor;
+import org.apache.solr.update.processor.DistributedUpdateProcessor.DistribPhase;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.solr.update.processor.DistributedUpdateProcessor.DISTRIB_FROM;
+import static org.apache.solr.update.processor.DistributingUpdateProcessorFactory.DISTRIB_UPDATE_PARAM;
+
+/**
+ * Distributes update requests to rolling series of collections partitioned by a timestamp field.
+ *
+ * Depends on this core having a special core property that points to the alias name that this collection is a part of.
+ * And further requires certain metadata on the Alias.
+ *
+ * @since 7.2.0
+ */
+public class TimeRoutedAliasUpdateProcessor extends UpdateRequestProcessor {
+  //TODO do we make this more generic to others who want to partition collections using something else?
+
+  // TODO auto add new collection partitions when cross a timestamp boundary.  That needs to be coordinated to avoid
+  //   race conditions, remembering that even the lead collection might have multiple instances of this URP
+  //   (multiple shards or perhaps just multiple streams thus instances of this URP)
+
+  public static final String ALIAS_DISTRIB_UPDATE_PARAM = "alias." + DISTRIB_UPDATE_PARAM; // param
+  public static final String TIME_PARTITION_ALIAS_NAME_CORE_PROP = "timePartitionAliasName"; // core prop
+  public static final String ROUTER_FIELD_METADATA = "router.field"; // alias metadata
+
+  // This format must be compatible with collection name limitations
+  private static final DateTimeFormatter DATE_TIME_FORMATTER = new DateTimeFormatterBuilder()
+      .append(DateTimeFormatter.ISO_LOCAL_DATE).appendPattern("[_HH[_mm[_ss]]]") //brackets mean optional
+      .parseDefaulting(ChronoField.HOUR_OF_DAY, 0)
+      .parseDefaulting(ChronoField.MINUTE_OF_HOUR, 0)
+      .parseDefaulting(ChronoField.SECOND_OF_MINUTE, 0)
+      .toFormatter(Locale.ROOT).withZone(ZoneOffset.UTC);
+
+  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+  private final String thisCollection;
+  private final String aliasName;
+  private final String routeField;
+
+  private final SolrCmdDistributor cmdDistrib;
+  private final ZkController zkController;
+  private final SolrParams outParamsToLeader;
+
+  private List<Map.Entry<Instant, String>> parsedCollectionsDesc; // k=timestamp (start), v=collection.  Sorted descending
+  private Aliases parsedCollectionsAliases; // a cached reference to the source of what we parse into parsedCollectionsDesc
+
+  public static UpdateRequestProcessor wrap(SolrQueryRequest req, SolrQueryResponse rsp, UpdateRequestProcessor next) {
+    final String timePartitionAliasName = req.getCore().getCoreDescriptor()
+        .getCoreProperty(TIME_PARTITION_ALIAS_NAME_CORE_PROP, null);
+    final DistribPhase shardDistribPhase =
+        DistribPhase.parseParam(req.getParams().get(DISTRIB_UPDATE_PARAM));
+    final DistribPhase aliasDistribPhase =
+        DistribPhase.parseParam(req.getParams().get(ALIAS_DISTRIB_UPDATE_PARAM));
+    if (timePartitionAliasName == null || aliasDistribPhase != DistribPhase.NONE || shardDistribPhase != DistribPhase.NONE) {
+      // if aliasDistribPhase is not NONE, then there is no further collection routing to be done here.
+      //    TODO this may eventually not be true but at the moment it is
+      // if shardDistribPhase is not NONE, then the phase is after the scope of this URP
+      return next;
+    } else {
+      return new TimeRoutedAliasUpdateProcessor(req, rsp, next, timePartitionAliasName, aliasDistribPhase);
+    }
+  }
+
+  protected TimeRoutedAliasUpdateProcessor(SolrQueryRequest req, SolrQueryResponse rsp, UpdateRequestProcessor next,
+                                           String aliasName,
+                                           DistribPhase aliasDistribPhase) {
+    super(next);
+    assert aliasDistribPhase == DistribPhase.NONE;
+    final SolrCore core = req.getCore();
+    this.thisCollection = core.getCoreDescriptor().getCloudDescriptor().getCollectionName();
+    this.aliasName = aliasName;
+    CoreContainer cc = core.getCoreContainer();
+    zkController = cc.getZkController();
+    cmdDistrib = new SolrCmdDistributor(cc.getUpdateShardHandler());
+
+    final Map<String, String> aliasMetadata = zkController.getZkStateReader().getAliases().getCollectionAliasMetadata(aliasName);
+    if (aliasMetadata == null) {
+      throw newAliasMustExistException(); // if it did exist, we'd have a non-null map
+    }
+    routeField = aliasMetadata.get(ROUTER_FIELD_METADATA);
+
+    ModifiableSolrParams outParams = new ModifiableSolrParams(req.getParams());
+    // Don't distribute these params; they will be distributed from the local processCommit separately.
+    //   (See RequestHandlerUtils.handleCommit from which this list was retrieved from)
+    outParams.remove(UpdateParams.OPTIMIZE);
+    outParams.remove(UpdateParams.COMMIT);
+    outParams.remove(UpdateParams.SOFT_COMMIT);
+    outParams.remove(UpdateParams.PREPARE_COMMIT);
+    outParams.remove(UpdateParams.ROLLBACK);
+    // Add these...
+    //  Ensures we skip over URPs prior to DistributedURP (see UpdateRequestProcessorChain)
+    outParams.set(DISTRIB_UPDATE_PARAM, DistribPhase.NONE.toString());
+    //  Signal this is a distributed search from this URP (see #wrap())
+    outParams.set(ALIAS_DISTRIB_UPDATE_PARAM, DistribPhase.TOLEADER.toString());
+    outParams.set(DISTRIB_FROM, ZkCoreNodeProps.getCoreUrl(zkController.getBaseUrl(), core.getName()));
+    outParamsToLeader = outParams;
+  }
+
+  @Override
+  public void processAdd(AddUpdateCommand cmd) throws IOException {
+    final Object routeValue = cmd.getSolrInputDocument().getFieldValue(routeField);
+    final String targetCollection = findTargetCollectionGivenRouteKey(routeValue);
+    if (targetCollection == null) {
+      throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
+          "Doc " + cmd.getPrintableId() + " couldn't be routed with " + routeField + "=" + routeValue);
+    }
+    if (thisCollection.equals(targetCollection)) {
+      // pass on through; we've reached the right collection
+      super.processAdd(cmd);
+    } else {
+      // send to the right collection
+      SolrCmdDistributor.Node targetLeaderNode = lookupShardLeaderOfCollection(targetCollection);
+      cmdDistrib.distribAdd(cmd, Collections.singletonList(targetLeaderNode), new ModifiableSolrParams(outParamsToLeader));
+    }
+  }
+
+  protected String findTargetCollectionGivenRouteKey(Object routeKey) {
+    final Instant docTimestamp;
+    if (routeKey instanceof Instant) {
+      docTimestamp = (Instant) routeKey;
+    } else if (routeKey instanceof Date) {
+      docTimestamp = ((Date)routeKey).toInstant();
+    } else if (routeKey instanceof CharSequence) {
+      docTimestamp = Instant.parse((CharSequence)routeKey);
+    } else {
+      throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Unexpected type of routeKey: " + routeKey);
+    }
+    final Aliases aliases = zkController.getZkStateReader().getAliases(); // note: might be different from last request
+    if (this.parsedCollectionsAliases != aliases) {
+      if (this.parsedCollectionsAliases != null) {
+        log.info("Observing possibly updated alias {}", aliasName);
+      }
+      this.parsedCollectionsDesc = doParseCollections(aliases);
+      this.parsedCollectionsAliases = aliases;
+    }
+    // iterates in reverse chronological order
+    //    We're O(N) here but N should be small, the loop is fast, and usually looking for 1st.
+    for (Map.Entry<Instant, String> entry : parsedCollectionsDesc) {
+      Instant colStartTime = entry.getKey();
+      if (!docTimestamp.isBefore(colStartTime)) {  // i.e. docTimeStamp is >= the colStartTime
+        return entry.getValue(); //found it
+      }
+    }
+    return null;
+  }
+
+  /** Parses the timestamp from the collection list and returns them in reverse sorted order (newest 1st) */
+  private List<Map.Entry<Instant,String>> doParseCollections(Aliases aliases) {
+    final List<String> collections = aliases.getCollectionAliasListMap().get(aliasName);
+    if (collections == null) {
+      throw newAliasMustExistException();
+    }
+    // note: I considered TreeMap but didn't like the log(N) just to grab the head when we use it later
+    List<Map.Entry<Instant,String>> result = new ArrayList<>(collections.size());
+    for (String collection : collections) {
+      Instant colStartTime = parseInstantFromCollectionName(aliasName, collection);
+      result.add(new AbstractMap.SimpleImmutableEntry<>(colStartTime, collection));
+    }
+    result.sort((e1, e2) -> e2.getKey().compareTo(e1.getKey())); // reverse sort by key
+    return result;
+  }
+
+  private SolrException newAliasMustExistException() {
+    throw new SolrException(SolrException.ErrorCode.SERVICE_UNAVAILABLE,
+        "Collection " + thisCollection + " created for use with alias " + aliasName + " which doesn't exist anymore." +
+            " You cannot write to this unless the alias exists.");
+  }
+
+  static Instant parseInstantFromCollectionName(String aliasName, String collection) {
+    final String dateTimePart = collection.substring(aliasName.length() + 1);
+    return DATE_TIME_FORMATTER.parse(dateTimePart, Instant::from);
+  }
+
+  @Override
+  public void processDelete(DeleteUpdateCommand cmd) throws IOException {
+    final List<SolrCmdDistributor.Node> nodes = lookupShardLeadersOfCollections();
+    cmdDistrib.distribDelete(cmd, nodes, new ModifiableSolrParams(outParamsToLeader));
+  }
+
+  @Override
+  public void processCommit(CommitUpdateCommand cmd) throws IOException {
+    final List<SolrCmdDistributor.Node> nodes = lookupShardLeadersOfCollections();
+    cmdDistrib.distribCommit(cmd, nodes, new ModifiableSolrParams(outParamsToLeader));
+    cmdDistrib.blockAndDoRetries(); //TODO shouldn't distribCommit do this implicitly?  It doesn't.
+  }
+
+// Not supported by SolrCmdDistributor and is sketchy any way
+//  @Override
+//  public void processRollback(RollbackUpdateCommand cmd) throws IOException {
+//  }
+
+  @Override
+  public void finish() throws IOException {
+    try {
+      cmdDistrib.finish();
+      final List<SolrCmdDistributor.Error> errors = cmdDistrib.getErrors();
+      if (!errors.isEmpty()) {
+        throw new DistributedUpdateProcessor.DistributedUpdatesAsyncException(errors);
+      }
+    } finally {
+      super.finish();
+    }
+  }
+
+  @Override
+  protected void doClose() {
+    try {
+      cmdDistrib.close();
+    } finally {
+      super.doClose();
+    }
+  }
+
+  private List<SolrCmdDistributor.Node> lookupShardLeadersOfCollections() {
+    final Aliases aliases = zkController.getZkStateReader().getAliases();
+    List<String> collections = aliases.getCollectionAliasListMap().get(aliasName);
+    if (collections == null) {
+      throw newAliasMustExistException();
+    }
+    return collections.stream().map(this::lookupShardLeaderOfCollection).collect(Collectors.toList());
+  }
+
+  private SolrCmdDistributor.Node lookupShardLeaderOfCollection(String collection) {
+    //TODO consider router to get the right slice.  Refactor common code in CloudSolrClient & DistributedUrp
+    final Collection<Slice> activeSlices = zkController.getClusterState().getCollection(collection).getActiveSlices();
+    if (activeSlices.isEmpty()) {
+      throw new SolrException(SolrException.ErrorCode.SERVICE_UNAVAILABLE, "Cannot route to collection " + collection);
+    }
+    final Slice slice = activeSlices.iterator().next();
+    //TODO when should we do StdNode vs RetryNode?
+    final Replica leader = slice.getLeader();
+    if (leader == null) {
+      throw new SolrException(SolrException.ErrorCode.SERVICE_UNAVAILABLE,
+          "No 'leader' replica available for shard " + slice.getName() + " of collection " + collection);
+    }
+    return new SolrCmdDistributor.RetryNode(new ZkCoreNodeProps(leader), zkController.getZkStateReader(),
+        collection, null);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/7877f5a5/solr/core/src/test/org/apache/solr/update/processor/TimePartitionedUpdateProcessorTest.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/update/processor/TimePartitionedUpdateProcessorTest.java b/solr/core/src/test/org/apache/solr/update/processor/TimePartitionedUpdateProcessorTest.java
deleted file mode 100644
index eca6fbb..0000000
--- a/solr/core/src/test/org/apache/solr/update/processor/TimePartitionedUpdateProcessorTest.java
+++ /dev/null
@@ -1,275 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.solr.update.processor;
-
-import java.io.IOException;
-import java.time.Instant;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Date;
-import java.util.List;
-import java.util.function.UnaryOperator;
-
-import org.apache.lucene.util.IOUtils;
-import org.apache.solr.client.solrj.SolrClient;
-import org.apache.solr.client.solrj.SolrRequest;
-import org.apache.solr.client.solrj.SolrServerException;
-import org.apache.solr.client.solrj.request.CollectionAdminRequest;
-import org.apache.solr.client.solrj.request.ConfigSetAdminRequest;
-import org.apache.solr.client.solrj.request.V2Request;
-import org.apache.solr.client.solrj.response.FieldStatsInfo;
-import org.apache.solr.client.solrj.response.QueryResponse;
-import org.apache.solr.client.solrj.response.UpdateResponse;
-import org.apache.solr.cloud.SolrCloudTestCase;
-import org.apache.solr.common.SolrDocumentList;
-import org.apache.solr.common.SolrException;
-import org.apache.solr.common.SolrInputDocument;
-import org.apache.solr.common.cloud.Aliases;
-import org.apache.solr.common.cloud.ZkStateReader;
-import org.apache.solr.common.util.NamedList;
-import org.apache.solr.request.SolrQueryRequest;
-import org.apache.solr.response.SolrQueryResponse;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-public class TimePartitionedUpdateProcessorTest extends SolrCloudTestCase {
-
-  static final String configName = "timeConfig";
-  static final String alias = "myalias";
-  static final String timeField = "timestamp";
-  static final String intField = "integer_i";
-
-  static SolrClient solrClient;
-
-  private int lastDocId = 0;
-  private int numDocsDeletedOrFailed = 0;
-
-  @BeforeClass
-  public static void setupCluster() throws Exception {
-    configureCluster(2).configure();
-    solrClient = getCloudSolrClient(cluster);
-  }
-
-  @AfterClass
-  public static void finish() throws Exception {
-    IOUtils.close(solrClient);
-  }
-
-  @Test
-  public void test() throws Exception {
-    // First create a config using REST API.  To do this, we create a collection with the name of the eventual config.
-    // We configure it, and ultimately delete it the collection, leaving a config with the same name behind.
-    // Then when we create the "real" collections referencing this config.
-    CollectionAdminRequest.createCollection(configName, 1, 1).process(solrClient);
-    // manipulate the config...
-    checkNoError(solrClient.request(new V2Request.Builder("/collections/" + configName + "/config")
-        .withMethod(SolrRequest.METHOD.POST)
-        .withPayload("{" +
-            "  'set-user-property' : {'timePartitionAliasName':'" + alias + "'}," + // no data driven
-            "  'set-user-property' : {'update.autoCreateFields':false}," + // no data driven
-            "  'add-updateprocessor' : {" +
-            "    'name':'tolerant', 'class':'solr.TolerantUpdateProcessorFactory'" +
-            "  }," +
-            "  'add-updateprocessor' : {" + // for testing
-            "    'name':'inc', 'class':'" + IncrementURPFactory.class.getName() + "'," +
-            "    'fieldName':'" + intField + "'" +
-            "  }," +
-            "}").build()));
-    checkNoError(solrClient.request(new V2Request.Builder("/collections/" + configName + "/config/params")
-        .withMethod(SolrRequest.METHOD.POST)
-        .withPayload("{" +
-            "  'set' : {" +
-            "    '_UPDATE' : {'processor':'inc,tolerant'}" +
-            "  }" +
-            "}").build()));
-    CollectionAdminRequest.deleteCollection(configName).process(solrClient);
-
-    // start with one collection and an alias for it
-    final String col23rd = alias + "_2017-10-23";
-    CollectionAdminRequest.createCollection(col23rd, configName, 1, 1)
-        .withProperty(TimePartitionedUpdateProcessor.TIME_PARTITION_ALIAS_NAME_CORE_PROP, alias)
-        .process(solrClient);
-
-    assertEquals("We only expect 2 configSets",
-        Arrays.asList("_default", configName), new ConfigSetAdminRequest.List().process(solrClient).getConfigSets());
-
-    CollectionAdminRequest.createAlias(alias, col23rd).process(solrClient);
-    //TODO use SOLR-11617 client API to set alias metadata
-    final ZkStateReader zkStateReader = cluster.getSolrClient().getZkStateReader();
-    UnaryOperator<Aliases> op = a -> a.cloneWithCollectionAliasMetadata(alias, TimePartitionedUpdateProcessor.ROUTER_FIELD_METADATA, timeField);
-    zkStateReader.aliasesHolder.applyModificationAndExportToZk(op);
-
-
-    // now we index a document
-    solrClient.add(alias, newDoc(Instant.parse("2017-10-23T00:00:00Z")));
-    solrClient.commit(alias);
-    //assertDocRoutedToCol(lastDocId, col23rd);
-    assertInvariants();
-
-    // a document that is too old (throws exception... if we have a TolerantUpdateProcessor then we see it there)
-    try {
-      final UpdateResponse resp = solrClient.add(alias, newDoc(Instant.parse("2017-10-01T00:00:00Z")));
-      final Object errors = resp.getResponseHeader().get("errors");
-      assertTrue(errors != null && errors.toString().contains("couldn't be routed"));
-    } catch (SolrException e) {
-      assertTrue(e.getMessage().contains("couldn't be routed"));
-    }
-    numDocsDeletedOrFailed++;
-
-    // add another collection, add to alias  (soonest comes first)
-    final String col24th = alias + "_2017-10-24";
-    CollectionAdminRequest.createCollection(col24th, configName,  2, 2) // more shards and replicas now
-        .setMaxShardsPerNode(2)
-        .withProperty("timePartitionAliasName", alias)
-        .process(solrClient);
-    CollectionAdminRequest.createAlias(alias, col24th + "," + col23rd).process(solrClient);
-
-    // index 3 documents in a random fashion
-    addDocsAndCommit(
-        newDoc(Instant.parse("2017-10-23T00:00:00Z")),
-        newDoc(Instant.parse("2017-10-24T01:00:00Z")),
-        newDoc(Instant.parse("2017-10-24T02:00:00Z"))
-    );
-    assertInvariants();
-
-    // assert that the IncrementURP has updated all '0' to '1'
-    final SolrDocumentList checkIncResults = solrClient.query(alias, params("q", "NOT " + intField + ":1")).getResults();
-    assertEquals(checkIncResults.toString(), 0, checkIncResults.getNumFound());
-
-    //delete a random document id; ensure we don't find it
-    int idToDelete = 1 + random().nextInt(lastDocId);
-    if (idToDelete == 2) { // #2 didn't make it
-      idToDelete++;
-    }
-    solrClient.deleteById(alias, Integer.toString(idToDelete));
-    solrClient.commit(alias);
-    numDocsDeletedOrFailed++;
-    assertInvariants();
-  }
-
-  private void checkNoError(NamedList<Object> response) {
-    Object errors = response.get("errorMessages");
-    assertNull("" + errors, errors);
-  }
-
-  /** Adds these documents and commits, returning when they are committed.
-   * We randomly go about this in different ways. */
-  private void addDocsAndCommit(SolrInputDocument... solrInputDocuments) throws Exception {
-    // we assume these are not old docs!
-
-    // this is a list of the collections & the alias name.  Use to pick randomly where to send.
-    //   (it doesn't matter where we send docs since the alias is honored at the URP level)
-    List<String> collections = new ArrayList<>();
-    collections.add(alias);
-    collections.addAll(new CollectionAdminRequest.ListAliases().process(solrClient).getAliasesAsLists().get(alias));
-
-    int commitWithin = random().nextBoolean() ? -1 : 500; // if -1, we commit explicitly instead
-    int numDocsBefore = queryNumDocs();
-    if (random().nextBoolean()) {
-      // send in separate requests
-      for (SolrInputDocument solrInputDocument : solrInputDocuments) {
-        String col = collections.get(random().nextInt(collections.size()));
-        solrClient.add(col, solrInputDocument, commitWithin);
-      }
-    } else {
-      // send in a batch.
-      String col = collections.get(random().nextInt(collections.size()));
-      solrClient.add(col, Arrays.asList(solrInputDocuments), commitWithin);
-    }
-    String col = collections.get(random().nextInt(collections.size()));
-    if (commitWithin == -1) {
-      solrClient.commit(col);
-    } else {
-      // check that it all got committed eventually
-      int numDocs = queryNumDocs();
-      if (numDocs == numDocsBefore + solrInputDocuments.length) {
-        System.err.println("Docs committed sooner than expected.  Bug or slow test env?");
-        return;
-      }
-      // wait until it's committed, plus some play time for commit to become visible
-      Thread.sleep(commitWithin + 200);
-      numDocs = queryNumDocs();
-      assertEquals("not committed.  Bug or a slow test?",
-          numDocsBefore + solrInputDocuments.length, numDocs);
-    }
-  }
-
-  private int queryNumDocs() throws SolrServerException, IOException {
-    return (int) solrClient.query(alias, params("q", "*:*", "rows", "0")).getResults().getNumFound();
-  }
-
-  private void assertInvariants() throws IOException, SolrServerException {
-    final int expectNumFound = lastDocId - numDocsDeletedOrFailed; //lastDocId is effectively # generated docs
-
-    final List<String> cols = new CollectionAdminRequest.ListAliases().process(solrClient).getAliasesAsLists().get(alias);
-    assert !cols.isEmpty();
-
-    int totalNumFound = 0;
-    Instant colEndInstant = null; // exclusive end
-    for (String col : cols) {
-      final Instant colStartInstant = TimePartitionedUpdateProcessor.parseInstantFromCollectionName(alias, col);
-      //TODO do this in parallel threads
-      final QueryResponse colStatsResp = solrClient.query(col, params(
-          "q", "*:*",
-          "rows", "0",
-          "stats", "true",
-          "stats.field", timeField));
-      long numFound = colStatsResp.getResults().getNumFound();
-      if (numFound > 0) {
-        totalNumFound += numFound;
-        final FieldStatsInfo timestampStats = colStatsResp.getFieldStatsInfo().get(timeField);
-        assertTrue(colStartInstant.toEpochMilli() <= ((Date)timestampStats.getMin()).getTime());
-        if (colEndInstant != null) {
-          assertTrue(colEndInstant.toEpochMilli() > ((Date)timestampStats.getMax()).getTime());
-        }
-      }
-
-      colEndInstant = colStartInstant; // next older segment will max out at our current start time
-    }
-    assertEquals(expectNumFound, totalNumFound);
-  }
-
-  private SolrInputDocument newDoc(Instant timestamp) {
-    return sdoc("id", Integer.toString(++lastDocId),
-        timeField, timestamp.toString(),
-        intField, "0"); // always 0
-  }
-
-  @Test
-  public void testParse() {
-    assertEquals(Instant.parse("2017-10-02T03:04:05Z"),
-      TimePartitionedUpdateProcessor.parseInstantFromCollectionName(alias, alias + "_2017-10-02_03_04_05"));
-    assertEquals(Instant.parse("2017-10-02T03:04:00Z"),
-      TimePartitionedUpdateProcessor.parseInstantFromCollectionName(alias, alias + "_2017-10-02_03_04"));
-    assertEquals(Instant.parse("2017-10-02T03:00:00Z"),
-      TimePartitionedUpdateProcessor.parseInstantFromCollectionName(alias, alias + "_2017-10-02_03"));
-    assertEquals(Instant.parse("2017-10-02T00:00:00Z"),
-      TimePartitionedUpdateProcessor.parseInstantFromCollectionName(alias, alias + "_2017-10-02"));
-  }
-
-  public static class IncrementURPFactory extends FieldMutatingUpdateProcessorFactory {
-
-    @Override
-    public UpdateRequestProcessor getInstance(SolrQueryRequest req, SolrQueryResponse rsp, UpdateRequestProcessor next) {
-      return FieldValueMutatingUpdateProcessor.valueMutator( getSelector(), next,
-          (src) -> Integer.valueOf(src.toString()) + 1);
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/7877f5a5/solr/core/src/test/org/apache/solr/update/processor/TimeRoutedAliasUpdateProcessorTest.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/update/processor/TimeRoutedAliasUpdateProcessorTest.java b/solr/core/src/test/org/apache/solr/update/processor/TimeRoutedAliasUpdateProcessorTest.java
new file mode 100644
index 0000000..f7f200f
--- /dev/null
+++ b/solr/core/src/test/org/apache/solr/update/processor/TimeRoutedAliasUpdateProcessorTest.java
@@ -0,0 +1,275 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.update.processor;
+
+import java.io.IOException;
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Date;
+import java.util.List;
+import java.util.function.UnaryOperator;
+
+import org.apache.lucene.util.IOUtils;
+import org.apache.solr.client.solrj.SolrClient;
+import org.apache.solr.client.solrj.SolrRequest;
+import org.apache.solr.client.solrj.SolrServerException;
+import org.apache.solr.client.solrj.request.CollectionAdminRequest;
+import org.apache.solr.client.solrj.request.ConfigSetAdminRequest;
+import org.apache.solr.client.solrj.request.V2Request;
+import org.apache.solr.client.solrj.response.FieldStatsInfo;
+import org.apache.solr.client.solrj.response.QueryResponse;
+import org.apache.solr.client.solrj.response.UpdateResponse;
+import org.apache.solr.cloud.SolrCloudTestCase;
+import org.apache.solr.common.SolrDocumentList;
+import org.apache.solr.common.SolrException;
+import org.apache.solr.common.SolrInputDocument;
+import org.apache.solr.common.cloud.Aliases;
+import org.apache.solr.common.cloud.ZkStateReader;
+import org.apache.solr.common.util.NamedList;
+import org.apache.solr.request.SolrQueryRequest;
+import org.apache.solr.response.SolrQueryResponse;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class TimeRoutedAliasUpdateProcessorTest extends SolrCloudTestCase {
+
+  static final String configName = "timeConfig";
+  static final String alias = "myalias";
+  static final String timeField = "timestamp";
+  static final String intField = "integer_i";
+
+  static SolrClient solrClient;
+
+  private int lastDocId = 0;
+  private int numDocsDeletedOrFailed = 0;
+
+  @BeforeClass
+  public static void setupCluster() throws Exception {
+    configureCluster(2).configure();
+    solrClient = getCloudSolrClient(cluster);
+  }
+
+  @AfterClass
+  public static void finish() throws Exception {
+    IOUtils.close(solrClient);
+  }
+
+  @Test
+  public void test() throws Exception {
+    // First create a config using REST API.  To do this, we create a collection with the name of the eventual config.
+    // We configure it, and ultimately delete it the collection, leaving a config with the same name behind.
+    // Then when we create the "real" collections referencing this config.
+    CollectionAdminRequest.createCollection(configName, 1, 1).process(solrClient);
+    // manipulate the config...
+    checkNoError(solrClient.request(new V2Request.Builder("/collections/" + configName + "/config")
+        .withMethod(SolrRequest.METHOD.POST)
+        .withPayload("{" +
+            "  'set-user-property' : {'timePartitionAliasName':'" + alias + "'}," + // no data driven
+            "  'set-user-property' : {'update.autoCreateFields':false}," + // no data driven
+            "  'add-updateprocessor' : {" +
+            "    'name':'tolerant', 'class':'solr.TolerantUpdateProcessorFactory'" +
+            "  }," +
+            "  'add-updateprocessor' : {" + // for testing
+            "    'name':'inc', 'class':'" + IncrementURPFactory.class.getName() + "'," +
+            "    'fieldName':'" + intField + "'" +
+            "  }," +
+            "}").build()));
+    checkNoError(solrClient.request(new V2Request.Builder("/collections/" + configName + "/config/params")
+        .withMethod(SolrRequest.METHOD.POST)
+        .withPayload("{" +
+            "  'set' : {" +
+            "    '_UPDATE' : {'processor':'inc,tolerant'}" +
+            "  }" +
+            "}").build()));
+    CollectionAdminRequest.deleteCollection(configName).process(solrClient);
+
+    // start with one collection and an alias for it
+    final String col23rd = alias + "_2017-10-23";
+    CollectionAdminRequest.createCollection(col23rd, configName, 1, 1)
+        .withProperty(TimeRoutedAliasUpdateProcessor.TIME_PARTITION_ALIAS_NAME_CORE_PROP, alias)
+        .process(solrClient);
+
+    assertEquals("We only expect 2 configSets",
+        Arrays.asList("_default", configName), new ConfigSetAdminRequest.List().process(solrClient).getConfigSets());
+
+    CollectionAdminRequest.createAlias(alias, col23rd).process(solrClient);
+    //TODO use SOLR-11617 client API to set alias metadata
+    final ZkStateReader zkStateReader = cluster.getSolrClient().getZkStateReader();
+    UnaryOperator<Aliases> op = a -> a.cloneWithCollectionAliasMetadata(alias, TimeRoutedAliasUpdateProcessor.ROUTER_FIELD_METADATA, timeField);
+    zkStateReader.aliasesHolder.applyModificationAndExportToZk(op);
+
+
+    // now we index a document
+    solrClient.add(alias, newDoc(Instant.parse("2017-10-23T00:00:00Z")));
+    solrClient.commit(alias);
+    //assertDocRoutedToCol(lastDocId, col23rd);
+    assertInvariants();
+
+    // a document that is too old (throws exception... if we have a TolerantUpdateProcessor then we see it there)
+    try {
+      final UpdateResponse resp = solrClient.add(alias, newDoc(Instant.parse("2017-10-01T00:00:00Z")));
+      final Object errors = resp.getResponseHeader().get("errors");
+      assertTrue(errors != null && errors.toString().contains("couldn't be routed"));
+    } catch (SolrException e) {
+      assertTrue(e.getMessage().contains("couldn't be routed"));
+    }
+    numDocsDeletedOrFailed++;
+
+    // add another collection, add to alias  (soonest comes first)
+    final String col24th = alias + "_2017-10-24";
+    CollectionAdminRequest.createCollection(col24th, configName,  2, 2) // more shards and replicas now
+        .setMaxShardsPerNode(2)
+        .withProperty("timePartitionAliasName", alias)
+        .process(solrClient);
+    CollectionAdminRequest.createAlias(alias, col24th + "," + col23rd).process(solrClient);
+
+    // index 3 documents in a random fashion
+    addDocsAndCommit(
+        newDoc(Instant.parse("2017-10-23T00:00:00Z")),
+        newDoc(Instant.parse("2017-10-24T01:00:00Z")),
+        newDoc(Instant.parse("2017-10-24T02:00:00Z"))
+    );
+    assertInvariants();
+
+    // assert that the IncrementURP has updated all '0' to '1'
+    final SolrDocumentList checkIncResults = solrClient.query(alias, params("q", "NOT " + intField + ":1")).getResults();
+    assertEquals(checkIncResults.toString(), 0, checkIncResults.getNumFound());
+
+    //delete a random document id; ensure we don't find it
+    int idToDelete = 1 + random().nextInt(lastDocId);
+    if (idToDelete == 2) { // #2 didn't make it
+      idToDelete++;
+    }
+    solrClient.deleteById(alias, Integer.toString(idToDelete));
+    solrClient.commit(alias);
+    numDocsDeletedOrFailed++;
+    assertInvariants();
+  }
+
+  private void checkNoError(NamedList<Object> response) {
+    Object errors = response.get("errorMessages");
+    assertNull("" + errors, errors);
+  }
+
+  /** Adds these documents and commits, returning when they are committed.
+   * We randomly go about this in different ways. */
+  private void addDocsAndCommit(SolrInputDocument... solrInputDocuments) throws Exception {
+    // we assume these are not old docs!
+
+    // this is a list of the collections & the alias name.  Use to pick randomly where to send.
+    //   (it doesn't matter where we send docs since the alias is honored at the URP level)
+    List<String> collections = new ArrayList<>();
+    collections.add(alias);
+    collections.addAll(new CollectionAdminRequest.ListAliases().process(solrClient).getAliasesAsLists().get(alias));
+
+    int commitWithin = random().nextBoolean() ? -1 : 500; // if -1, we commit explicitly instead
+    int numDocsBefore = queryNumDocs();
+    if (random().nextBoolean()) {
+      // send in separate requests
+      for (SolrInputDocument solrInputDocument : solrInputDocuments) {
+        String col = collections.get(random().nextInt(collections.size()));
+        solrClient.add(col, solrInputDocument, commitWithin);
+      }
+    } else {
+      // send in a batch.
+      String col = collections.get(random().nextInt(collections.size()));
+      solrClient.add(col, Arrays.asList(solrInputDocuments), commitWithin);
+    }
+    String col = collections.get(random().nextInt(collections.size()));
+    if (commitWithin == -1) {
+      solrClient.commit(col);
+    } else {
+      // check that it all got committed eventually
+      int numDocs = queryNumDocs();
+      if (numDocs == numDocsBefore + solrInputDocuments.length) {
+        System.err.println("Docs committed sooner than expected.  Bug or slow test env?");
+        return;
+      }
+      // wait until it's committed, plus some play time for commit to become visible
+      Thread.sleep(commitWithin + 200);
+      numDocs = queryNumDocs();
+      assertEquals("not committed.  Bug or a slow test?",
+          numDocsBefore + solrInputDocuments.length, numDocs);
+    }
+  }
+
+  private int queryNumDocs() throws SolrServerException, IOException {
+    return (int) solrClient.query(alias, params("q", "*:*", "rows", "0")).getResults().getNumFound();
+  }
+
+  private void assertInvariants() throws IOException, SolrServerException {
+    final int expectNumFound = lastDocId - numDocsDeletedOrFailed; //lastDocId is effectively # generated docs
+
+    final List<String> cols = new CollectionAdminRequest.ListAliases().process(solrClient).getAliasesAsLists().get(alias);
+    assert !cols.isEmpty();
+
+    int totalNumFound = 0;
+    Instant colEndInstant = null; // exclusive end
+    for (String col : cols) {
+      final Instant colStartInstant = TimeRoutedAliasUpdateProcessor.parseInstantFromCollectionName(alias, col);
+      //TODO do this in parallel threads
+      final QueryResponse colStatsResp = solrClient.query(col, params(
+          "q", "*:*",
+          "rows", "0",
+          "stats", "true",
+          "stats.field", timeField));
+      long numFound = colStatsResp.getResults().getNumFound();
+      if (numFound > 0) {
+        totalNumFound += numFound;
+        final FieldStatsInfo timestampStats = colStatsResp.getFieldStatsInfo().get(timeField);
+        assertTrue(colStartInstant.toEpochMilli() <= ((Date)timestampStats.getMin()).getTime());
+        if (colEndInstant != null) {
+          assertTrue(colEndInstant.toEpochMilli() > ((Date)timestampStats.getMax()).getTime());
+        }
+      }
+
+      colEndInstant = colStartInstant; // next older segment will max out at our current start time
+    }
+    assertEquals(expectNumFound, totalNumFound);
+  }
+
+  private SolrInputDocument newDoc(Instant timestamp) {
+    return sdoc("id", Integer.toString(++lastDocId),
+        timeField, timestamp.toString(),
+        intField, "0"); // always 0
+  }
+
+  @Test
+  public void testParse() {
+    assertEquals(Instant.parse("2017-10-02T03:04:05Z"),
+      TimeRoutedAliasUpdateProcessor.parseInstantFromCollectionName(alias, alias + "_2017-10-02_03_04_05"));
+    assertEquals(Instant.parse("2017-10-02T03:04:00Z"),
+      TimeRoutedAliasUpdateProcessor.parseInstantFromCollectionName(alias, alias + "_2017-10-02_03_04"));
+    assertEquals(Instant.parse("2017-10-02T03:00:00Z"),
+      TimeRoutedAliasUpdateProcessor.parseInstantFromCollectionName(alias, alias + "_2017-10-02_03"));
+    assertEquals(Instant.parse("2017-10-02T00:00:00Z"),
+      TimeRoutedAliasUpdateProcessor.parseInstantFromCollectionName(alias, alias + "_2017-10-02"));
+  }
+
+  public static class IncrementURPFactory extends FieldMutatingUpdateProcessorFactory {
+
+    @Override
+    public UpdateRequestProcessor getInstance(SolrQueryRequest req, SolrQueryResponse rsp, UpdateRequestProcessor next) {
+      return FieldValueMutatingUpdateProcessor.valueMutator( getSelector(), next,
+          (src) -> Integer.valueOf(src.toString()) + 1);
+    }
+  }
+
+}


Mime
View raw message