Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 9380C200BF0 for ; Fri, 30 Dec 2016 21:34:21 +0100 (CET) Received: by cust-asf.ponee.io (Postfix) id 92153160B32; Fri, 30 Dec 2016 20:34:21 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 91D04160B19 for ; Fri, 30 Dec 2016 21:34:20 +0100 (CET) Received: (qmail 91298 invoked by uid 500); 30 Dec 2016 20:34:19 -0000 Mailing-List: contact commits-help@lucene.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@lucene.apache.org Delivered-To: mailing list commits@lucene.apache.org Received: (qmail 91289 invoked by uid 99); 30 Dec 2016 20:34:19 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 30 Dec 2016 20:34:19 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 9EFF6DFBE6; Fri, 30 Dec 2016 20:34:19 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: jbernste@apache.org To: commits@lucene.apache.org Message-Id: <6eca0293730f4e7a8097352b85e3c099@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: lucene-solr:branch_6x: SOLR-9684: Add schedule Streaming Expression Date: Fri, 30 Dec 2016 20:34:19 +0000 (UTC) archived-at: Fri, 30 Dec 2016 20:34:21 -0000 Repository: lucene-solr Updated Branches: refs/heads/branch_6x b2d54f645 -> be119d2aa SOLR-9684: Add schedule Streaming Expression Project: http://git-wip-us.apache.org/repos/asf/lucene-solr/repo Commit: http://git-wip-us.apache.org/repos/asf/lucene-solr/commit/be119d2a Tree: 
http://git-wip-us.apache.org/repos/asf/lucene-solr/tree/be119d2a Diff: http://git-wip-us.apache.org/repos/asf/lucene-solr/diff/be119d2a Branch: refs/heads/branch_6x Commit: be119d2aa082e176c88dd72c674dbd406d5ec9a2 Parents: b2d54f6 Author: Joel Bernstein Authored: Fri Dec 30 14:34:00 2016 -0500 Committer: Joel Bernstein Committed: Fri Dec 30 15:34:13 2016 -0500 ---------------------------------------------------------------------- .../org/apache/solr/handler/StreamHandler.java | 1 + .../client/solrj/io/stream/SchedulerStream.java | 161 +++++++++++++++++++ .../solrj/io/stream/StreamExpressionTest.java | 149 +++++++++++++++++ 3 files changed, 311 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/be119d2a/solr/core/src/java/org/apache/solr/handler/StreamHandler.java ---------------------------------------------------------------------- diff --git a/solr/core/src/java/org/apache/solr/handler/StreamHandler.java b/solr/core/src/java/org/apache/solr/handler/StreamHandler.java index 8c0d6ac..485f692 100644 --- a/solr/core/src/java/org/apache/solr/handler/StreamHandler.java +++ b/solr/core/src/java/org/apache/solr/handler/StreamHandler.java @@ -141,6 +141,7 @@ public class StreamHandler extends RequestHandlerBase implements SolrCoreAware, .withFunctionName("fetch", FetchStream.class) .withFunctionName("executor", ExecutorStream.class) .withFunctionName("null", NullStream.class) + .withFunctionName("schedule", SchedulerStream.class) // metrics .withFunctionName("min", MinMetric.class) .withFunctionName("max", MaxMetric.class) http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/be119d2a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/SchedulerStream.java ---------------------------------------------------------------------- diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/SchedulerStream.java 
b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/SchedulerStream.java new file mode 100644 index 0000000..f8506b9 --- /dev/null +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/SchedulerStream.java @@ -0,0 +1,161 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.solr.client.solrj.io.stream; + +import java.io.IOException; +import java.lang.invoke.MethodHandles; +import java.util.ArrayList; +import java.util.List; +import java.util.Locale; + +import org.apache.solr.client.solrj.io.Tuple; +import org.apache.solr.client.solrj.io.comp.StreamComparator; +import org.apache.solr.client.solrj.io.stream.expr.Explanation; +import org.apache.solr.client.solrj.io.stream.expr.Explanation.ExpressionType; +import org.apache.solr.client.solrj.io.stream.expr.Expressible; +import org.apache.solr.client.solrj.io.stream.expr.StreamExplanation; +import org.apache.solr.client.solrj.io.stream.expr.StreamExpression; +import org.apache.solr.client.solrj.io.stream.expr.StreamFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * The scheduler wraps two topics that represent high priority and low priority task queues. 
+ * Each time the scheduler is called it will check to see if there are any high priority tasks in the queue. If there + * are high priority tasks, then the high priority queue will be read until it returns the EOF Tuple. + * + * If there are no tasks in the high priority queue, then the lower priority task queue will be opened and read until the EOF Tuple is + * returned. + * + * The scheduler is designed to be wrapped by the executor function and a daemon function can be used to call the executor iteratively. + **/ + +public class SchedulerStream extends TupleStream implements Expressible { + + private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + + private PushBackStream highPriorityTasks; + private PushBackStream tasks; + private TupleStream currentStream; + + public SchedulerStream(StreamExpression expression, StreamFactory factory) throws IOException { + // grab all parameters out + List streamExpressions = factory.getExpressionOperandsRepresentingTypes(expression, Expressible.class, TupleStream.class); + + + if(2 != streamExpressions.size()){ + throw new IOException(String.format(Locale.ROOT,"Invalid expression %s - expecting a single stream but found %d",expression, streamExpressions.size())); + } + + TupleStream stream1 = factory.constructStream(streamExpressions.get(0)); + TupleStream stream2 = factory.constructStream(streamExpressions.get(1)); + + if(!(stream1 instanceof TopicStream) || !(stream2 instanceof TopicStream)) { + throw new IOException("The scheduler expects both stream parameters to be topics."); + } + + init(new PushBackStream(stream1), new PushBackStream(stream2)); + } + + private void init(PushBackStream stream1, PushBackStream stream2) throws IOException{ + this.highPriorityTasks = stream1; + this.tasks = stream2; + } + + @Override + public StreamExpression toExpression(StreamFactory factory) throws IOException { + return toExpression(factory, true); + } + + private StreamExpression 
toExpression(StreamFactory factory, boolean includeStreams) throws IOException { + + // function name + StreamExpression expression = new StreamExpression(factory.getFunctionName(this.getClass())); + + // stream + if(includeStreams) { + if (highPriorityTasks instanceof Expressible) { + expression.addParameter(((Expressible) highPriorityTasks).toExpression(factory)); + } else { + throw new IOException("The SchedulerStream contains a non-expressible TupleStream - it cannot be converted to an expression"); + } + + if (tasks instanceof Expressible) { + expression.addParameter(((Expressible) tasks).toExpression(factory)); + } else { + throw new IOException("The SchedulerStream contains a non-expressible TupleStream - it cannot be converted to an expression"); + } + } + + return expression; + } + + @Override + public Explanation toExplanation(StreamFactory factory) throws IOException { + + return new StreamExplanation(getStreamNodeId().toString()) + .withChildren(new Explanation[]{ + highPriorityTasks.toExplanation(factory), tasks.toExplanation(factory) + }) + .withFunctionName(factory.getFunctionName(this.getClass())) + .withImplementingClass(this.getClass().getName()) + .withExpressionType(ExpressionType.STREAM_DECORATOR) + .withExpression(toExpression(factory, false).toString()); + } + + public void setStreamContext(StreamContext streamContext) { + this.highPriorityTasks.setStreamContext(streamContext); + tasks.setStreamContext(streamContext); + } + + public List children() { + List l = new ArrayList(); + l.add(highPriorityTasks); + l.add(tasks); + return l; + } + + public void open() throws IOException { + highPriorityTasks.open(); + Tuple tuple = highPriorityTasks.read(); + if(tuple.EOF) { + highPriorityTasks.close(); + tasks.open(); + currentStream = tasks; + } else { + highPriorityTasks.pushBack(tuple); + currentStream = highPriorityTasks; + } + } + + public void close() throws IOException { + currentStream.close(); + } + + public Tuple read() throws IOException { 
+ return currentStream.read(); + } + + public StreamComparator getStreamSort(){ + return null; + } + + public int getCost() { + return 0; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/be119d2a/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamExpressionTest.java ---------------------------------------------------------------------- diff --git a/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamExpressionTest.java b/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamExpressionTest.java index 0c9d5b3..3bfe129 100644 --- a/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamExpressionTest.java +++ b/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamExpressionTest.java @@ -2816,6 +2816,155 @@ public class StreamExpressionTest extends SolrCloudTestCase { } @Test + public void testSchedulerStream() throws Exception { + Assume.assumeTrue(!useAlias); + + new UpdateRequest() + .add(id, "0", "a_s", "hello1", "a_i", "0", "a_f", "1") + .add(id, "2", "a_s", "hello1", "a_i", "2", "a_f", "2") + .add(id, "3", "a_s", "hello1", "a_i", "3", "a_f", "3") + .add(id, "4", "a_s", "hello1", "a_i", "4", "a_f", "4") + .add(id, "1", "a_s", "hello1", "a_i", "1", "a_f", "5") + .add(id, "5", "a_s", "hello", "a_i", "10", "a_f", "6") + .add(id, "6", "a_s", "hello", "a_i", "11", "a_f", "7") + .add(id, "7", "a_s", "hello", "a_i", "12", "a_f", "8") + .add(id, "8", "a_s", "hello", "a_i", "13", "a_f", "9") + .add(id, "9", "a_s", "hello1", "a_i", "14", "a_f", "10") + .commit(cluster.getSolrClient(), COLLECTIONORALIAS); + + StreamFactory factory = new StreamFactory() + .withCollectionZkHost("collection1", cluster.getZkServer().getZkAddress()) + .withFunctionName("topic", TopicStream.class) + .withFunctionName("schedule", SchedulerStream.class); + + StreamExpression expression; + TupleStream stream; + List tuples; + + SolrClientCache cache = new SolrClientCache(); + + try { + 
+ FieldComparator comp = new FieldComparator("a_i", ComparatorOrder.ASCENDING); + + expression = StreamExpressionParser.parse("schedule(topic(collection1, collection1, q=\"a_s:hello\", fl=\"id,a_i\", id=1000000, initialCheckpoint=0)," + + "topic(collection1, collection1, q=\"a_s:hello1\", fl=\"id,a_i\", id=2000000, initialCheckpoint=0))"); + stream = factory.constructStream(expression); + StreamContext context = new StreamContext(); + context.setSolrClientCache(cache); + stream.setStreamContext(context); + tuples = getTuples(stream); + + Collections.sort(tuples, comp); + //The tuples from the first topic (high priority) should be returned. + + assertEquals(tuples.size(), 4); + assertOrder(tuples, 5, 6, 7, 8); + + expression = StreamExpressionParser.parse("schedule(topic(collection1, collection1, q=\"a_s:hello\", fl=\"id,a_i\", id=1000000, initialCheckpoint=0)," + + "topic(collection1, collection1, q=\"a_s:hello1\", fl=\"id,a_i\", id=2000000, initialCheckpoint=0))"); + stream = factory.constructStream(expression); + context = new StreamContext(); + context.setSolrClientCache(cache); + stream.setStreamContext(context); + tuples = getTuples(stream); + Collections.sort(tuples, comp); + + //The Tuples from the second topic (Low priority) should be returned. + assertEquals(tuples.size(), 6); + assertOrder(tuples, 0, 1, 2, 3, 4, 9); + + expression = StreamExpressionParser.parse("schedule(topic(collection1, collection1, q=\"a_s:hello\", fl=\"id,a_i\", id=1000000, initialCheckpoint=0)," + + "topic(collection1, collection1, q=\"a_s:hello1\", fl=\"id,a_i\", id=2000000, initialCheckpoint=0))"); + stream = factory.constructStream(expression); + context = new StreamContext(); + context.setSolrClientCache(cache); + stream.setStreamContext(context); + tuples = getTuples(stream); + + //Both queues are empty. 
+ assertEquals(tuples.size(), 0); + + } finally { + cache.close(); + } + } + + @Test + public void testParallelSchedulerStream() throws Exception { + Assume.assumeTrue(!useAlias); + + new UpdateRequest() + .add(id, "0", "a_s", "hello1", "a_i", "0", "a_f", "1") + .add(id, "2", "a_s", "hello1", "a_i", "2", "a_f", "2") + .add(id, "3", "a_s", "hello1", "a_i", "3", "a_f", "3") + .add(id, "4", "a_s", "hello1", "a_i", "4", "a_f", "4") + .add(id, "1", "a_s", "hello1", "a_i", "1", "a_f", "5") + .add(id, "5", "a_s", "hello", "a_i", "10", "a_f", "6") + .add(id, "6", "a_s", "hello", "a_i", "11", "a_f", "7") + .add(id, "7", "a_s", "hello", "a_i", "12", "a_f", "8") + .add(id, "8", "a_s", "hello", "a_i", "13", "a_f", "9") + .add(id, "9", "a_s", "hello1", "a_i", "14", "a_f", "10") + .commit(cluster.getSolrClient(), COLLECTIONORALIAS); + + StreamFactory factory = new StreamFactory() + .withCollectionZkHost("collection1", cluster.getZkServer().getZkAddress()) + .withFunctionName("topic", TopicStream.class) + .withFunctionName("parallel", ParallelStream.class) + .withFunctionName("schedule", SchedulerStream.class); + + StreamExpression expression; + TupleStream stream; + List tuples; + + SolrClientCache cache = new SolrClientCache(); + + try { + FieldComparator comp = new FieldComparator("a_i", ComparatorOrder.ASCENDING); + + expression = StreamExpressionParser.parse("parallel(collection1, workers=2, sort=\"_version_ asc\", schedule(topic(collection1, collection1, q=\"a_s:hello\", fl=\"id,a_i\", id=1000000, initialCheckpoint=0, partitionKeys=id)," + + "topic(collection1, collection1, q=\"a_s:hello1\", fl=\"id,a_i\", id=2000000, initialCheckpoint=0, partitionKeys=id)))"); + stream = factory.constructStream(expression); + StreamContext context = new StreamContext(); + context.setSolrClientCache(cache); + stream.setStreamContext(context); + tuples = getTuples(stream); + + Collections.sort(tuples, comp); + //The tuples from the first topic (high priority) should be returned. 
+ + assertEquals(tuples.size(), 4); + assertOrder(tuples, 5, 6, 7, 8); + + expression = StreamExpressionParser.parse("parallel(collection1, workers=2, sort=\"_version_ asc\", schedule(topic(collection1, collection1, q=\"a_s:hello\", fl=\"id,a_i\", id=1000000, initialCheckpoint=0, partitionKeys=id)," + + "topic(collection1, collection1, q=\"a_s:hello1\", fl=\"id,a_i\", id=2000000, initialCheckpoint=0, partitionKeys=id)))"); + stream = factory.constructStream(expression); + context = new StreamContext(); + context.setSolrClientCache(cache); + stream.setStreamContext(context); + tuples = getTuples(stream); + Collections.sort(tuples, comp); + + //The Tuples from the second topic (Low priority) should be returned. + assertEquals(tuples.size(), 6); + assertOrder(tuples, 0, 1, 2, 3, 4, 9); + + expression = StreamExpressionParser.parse("parallel(collection1, workers=2, sort=\"_version_ asc\", schedule(topic(collection1, collection1, q=\"a_s:hello\", fl=\"id,a_i\", id=1000000, initialCheckpoint=0, partitionKeys=id)," + + "topic(collection1, collection1, q=\"a_s:hello1\", fl=\"id,a_i\", id=2000000, initialCheckpoint=0, partitionKeys=id)))"); + stream = factory.constructStream(expression); + context = new StreamContext(); + context.setSolrClientCache(cache); + stream.setStreamContext(context); + tuples = getTuples(stream); + + //Both queues are empty. + assertEquals(tuples.size(), 0); + + } finally { + cache.close(); + } + } + + @Test public void testParallelTopicStream() throws Exception { Assume.assumeTrue(!useAlias);