lucene-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jbern...@apache.org
Subject [lucene-solr] 01/03: SOLR-13625: Add CsvStream, TsvStream Streaming Expressions and supporting Stream Evaluators
Date Tue, 30 Jul 2019 15:32:22 GMT
This is an automated email from the ASF dual-hosted git repository.

jbernste pushed a commit to branch branch_8x
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git

commit eb280c4808f587817cb0038c108d7fd5f7ea862c
Author: Joel Bernstein <jbernste@apache.org>
AuthorDate: Thu Jul 25 10:57:30 2019 -0400

    SOLR-13625: Add CsvStream, TsvStream Streaming Expressions and supporting Stream Evaluators
---
 .../java/org/apache/solr/client/solrj/io/Lang.java |   5 +
 .../solr/client/solrj/io/eval/DateEvaluator.java   |  73 +++++++++
 .../solr/client/solrj/io/eval/DoubleEvaluator.java |  50 ++++++
 .../solr/client/solrj/io/eval/LongEvaluator.java   |  50 ++++++
 .../solr/client/solrj/io/stream/CsvStream.java     | 171 +++++++++++++++++++++
 .../solr/client/solrj/io/stream/TsvStream.java     |  84 ++++++++++
 .../org/apache/solr/client/solrj/io/TestLang.java  |   2 +-
 .../solrj/io/stream/StreamDecoratorTest.java       | 169 ++++++++++++++++++++
 8 files changed, 603 insertions(+), 1 deletion(-)

diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/Lang.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/Lang.java
index d64e320..60ce437 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/Lang.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/Lang.java
@@ -292,6 +292,11 @@ public class Lang {
         .withFunctionName("isNull", IsNullEvaluator.class)
         .withFunctionName("matches", MatchesEvaluator.class)
         .withFunctionName("projectToBorder", ProjectToBorderEvaluator.class)
+        .withFunctionName("parseCSV", CsvStream.class)
+        .withFunctionName("parseTSV", TsvStream.class)
+        .withFunctionName("double", DoubleEvaluator.class)
+        .withFunctionName("long", LongEvaluator.class)
+        .withFunctionName("dateTime", DateEvaluator.class)
 
         // Boolean Stream Evaluators
 
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/eval/DateEvaluator.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/eval/DateEvaluator.java
new file mode 100644
index 0000000..582e3b6
--- /dev/null
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/eval/DateEvaluator.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.client.solrj.io.eval;
+
+import java.io.IOException;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.TimeZone;
+import java.util.Locale;
+
+import org.apache.solr.client.solrj.io.stream.expr.StreamExpression;
+import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
+
+/**
+ * Evaluator that parses a date string using a caller-supplied {@link SimpleDateFormat} template
+ * and returns it re-formatted as {@code yyyy-MM-dd'T'HH:mm:ss'Z'} in UTC.
+ *
+ * <p>Operands, in order: the date string, the parse template, and an optional time zone id that
+ * is applied to the parser (UTC when omitted). If either of the first two operands starts with a
+ * double quote, every double quote in it is removed before use.
+ *
+ * <p>The parser is built on the first call to {@link #doWork(Object[])} and reused afterwards, so
+ * the template and time zone seen on that first call apply to every later call on this instance.
+ * Instances are not safe for concurrent use because {@link SimpleDateFormat} is not.
+ */
+public class DateEvaluator extends RecursiveObjectEvaluator implements ManyValueWorker {
+  protected static final long serialVersionUID = 1L;
+
+  private static final String OUTPUT_PATTERN = "yyyy-MM-dd'T'HH:mm:ss'Z'";
+
+  // SimpleDateFormat keeps mutable state and is not synchronized, so each evaluator owns its
+  // own output formatter instead of sharing a single static instance across all expressions.
+  private SimpleDateFormat dateFormat;
+  private SimpleDateFormat parseFormat;
+
+  public DateEvaluator(StreamExpression expression, StreamFactory factory) throws IOException{
+    super(expression, factory);
+  }
+
+  /**
+   * Parses {@code values[0]} with the template in {@code values[1]} and formats the result in UTC.
+   *
+   * @param values the date string, the parse template and, optionally, a time zone id
+   * @return the date formatted as {@code yyyy-MM-dd'T'HH:mm:ss'Z'}
+   * @throws IOException if fewer than two operands are given, an operand is null, or the date
+   *         string cannot be parsed with the template
+   */
+  @Override
+  public Object doWork(Object values[]) throws IOException {
+    if(values == null || values.length < 2) {
+      throw new IOException("dateTime expects a date string and a parse template, optionally followed by a time zone");
+    }
+
+    for(Object value : values) {
+      if(value == null) {
+        throw new IOException("dateTime does not accept null operands");
+      }
+    }
+
+    String sdate = stripQuotes(values[0].toString());
+    String template = stripQuotes(values[1].toString());
+
+    if(parseFormat == null) {
+      String timeZone = "UTC";
+      if(values.length == 3) {
+        timeZone = values[2].toString();
+      }
+      parseFormat = new SimpleDateFormat(template, Locale.US);
+      parseFormat.setTimeZone(TimeZone.getTimeZone(timeZone));
+    }
+
+    if(dateFormat == null) {
+      dateFormat = new SimpleDateFormat(OUTPUT_PATTERN, Locale.US);
+      dateFormat.setTimeZone(TimeZone.getTimeZone("UTC"));
+    }
+
+    try {
+      Date date = parseFormat.parse(sdate);
+      return dateFormat.format(date);
+    } catch(Exception e) {
+      throw new IOException(e);
+    }
+  }
+
+  /** Removes every double quote from {@code s} when it starts with one; otherwise returns it as is. */
+  private static String stripQuotes(String s) {
+    return s.startsWith("\"") ? s.replace("\"", "") : s;
+  }
+}
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/eval/DoubleEvaluator.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/eval/DoubleEvaluator.java
new file mode 100644
index 0000000..7fce45f
--- /dev/null
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/eval/DoubleEvaluator.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.client.solrj.io.eval;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Locale;
+import java.util.stream.Collectors;
+
+import org.apache.solr.client.solrj.io.stream.expr.StreamExpression;
+import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
+
+/**
+ * Evaluator that converts its single operand to a {@link Double} by parsing the operand's
+ * string form. A {@link List} operand is converted element by element; null stays null.
+ */
+public class DoubleEvaluator extends RecursiveObjectEvaluator implements OneValueWorker {
+  protected static final long serialVersionUID = 1L;
+
+  public DoubleEvaluator(StreamExpression expression, StreamFactory factory) throws IOException{
+    super(expression, factory);
+
+    // Exactly one operand is accepted.
+    if(containedEvaluators.size() != 1){
+      String message = String.format(Locale.ROOT,"Invalid expression %s - expecting exactly 1 value but found %d",expression,containedEvaluators.size());
+      throw new IOException(message);
+    }
+  }
+
+  @Override
+  public Object doWork(Object value){
+    if(value == null){
+      return null;
+    }
+
+    if(value instanceof List){
+      // Convert each element with the same rules, preserving order.
+      List<?> elements = (List<?>)value;
+      return elements.stream().map(element -> doWork(element)).collect(Collectors.toList());
+    }
+
+    String text = value.toString();
+    return Double.valueOf(text);
+  }
+}
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/eval/LongEvaluator.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/eval/LongEvaluator.java
new file mode 100644
index 0000000..4547d8c
--- /dev/null
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/eval/LongEvaluator.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.client.solrj.io.eval;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Locale;
+import java.util.stream.Collectors;
+
+import org.apache.solr.client.solrj.io.stream.expr.StreamExpression;
+import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
+
+/**
+ * Evaluator that converts its single operand to a {@link Long} by parsing the operand's
+ * string form. A {@link List} operand is converted element by element; null stays null.
+ */
+public class LongEvaluator extends RecursiveObjectEvaluator implements OneValueWorker {
+  protected static final long serialVersionUID = 1L;
+
+  public LongEvaluator(StreamExpression expression, StreamFactory factory) throws IOException{
+    super(expression, factory);
+
+    // Exactly one operand is accepted.
+    if(containedEvaluators.size() != 1){
+      String message = String.format(Locale.ROOT,"Invalid expression %s - expecting exactly 1 value but found %d",expression,containedEvaluators.size());
+      throw new IOException(message);
+    }
+  }
+
+  @Override
+  public Object doWork(Object value){
+    if(value == null){
+      return null;
+    }
+
+    if(value instanceof List){
+      // Convert each element with the same rules, preserving order.
+      List<?> elements = (List<?>)value;
+      return elements.stream().map(element -> doWork(element)).collect(Collectors.toList());
+    }
+
+    String text = value.toString();
+    return Long.valueOf(text);
+  }
+}
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/CsvStream.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/CsvStream.java
new file mode 100644
index 0000000..b8c479e
--- /dev/null
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/CsvStream.java
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.client.solrj.io.stream;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Locale;
+
+import org.apache.solr.client.solrj.io.Tuple;
+import org.apache.solr.client.solrj.io.comp.StreamComparator;
+import org.apache.solr.client.solrj.io.stream.expr.Explanation;
+import org.apache.solr.client.solrj.io.stream.expr.Explanation.ExpressionType;
+import org.apache.solr.client.solrj.io.stream.expr.Expressible;
+import org.apache.solr.client.solrj.io.stream.expr.StreamExplanation;
+import org.apache.solr.client.solrj.io.stream.expr.StreamExpression;
+import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
+
+/**
+ * Stream decorator that turns raw CSV lines into tuples keyed by header name.
+ *
+ * <p>Each tuple read from the wrapped stream is expected to carry a {@code file} and a
+ * {@code line} string field. The first line seen for a given {@code file} value is taken as
+ * that file's header row; every following line of the same file is split and emitted as a
+ * tuple mapping header name to field value. Empty fields are left out of the emitted tuple.
+ */
+public class CsvStream extends TupleStream implements Expressible {
+
+  private static final long serialVersionUID = 1;
+
+  // Splits on commas that are followed by balanced double quotes up to the end of the line,
+  // i.e. commas that are not inside a quoted field. Compiled once rather than on every line.
+  private static final java.util.regex.Pattern FIELD_SEPARATOR =
+      java.util.regex.Pattern.compile(",(?=([^\"]|\"[^\"]*\")*$)");
+
+  private String[] headers;
+  private String currentFile;
+  private int lineNumber;
+
+  protected TupleStream originalStream;
+
+  public CsvStream(StreamExpression expression,StreamFactory factory) throws IOException {
+    // grab all parameters out
+    List<StreamExpression> streamExpressions = factory.getExpressionOperandsRepresentingTypes(expression, Expressible.class, TupleStream.class);
+
+    // validate expression contains only what we want.
+    if(expression.getParameters().size() != streamExpressions.size()){
+      throw new IOException(String.format(Locale.ROOT,"Invalid expression %s - unknown operands found", expression));
+    }
+
+    if(1 != streamExpressions.size()){
+      throw new IOException(String.format(Locale.ROOT,"Invalid expression %s - expecting a single stream but found %d",expression, streamExpressions.size()));
+    }
+
+    init(factory.constructStream(streamExpressions.get(0)));
+  }
+
+  private void init(TupleStream stream) throws IOException{
+    this.originalStream = stream;
+  }
+
+  @Override
+  public StreamExpression toExpression(StreamFactory factory) throws IOException{
+    return toExpression(factory, true);
+  }
+
+  /**
+   * Builds the expression for this stream.
+   *
+   * @param includeStreams when true the wrapped stream's expression is embedded; when false a
+   *        {@code <stream>} placeholder is used instead
+   */
+  private StreamExpression toExpression(StreamFactory factory, boolean includeStreams) throws IOException {
+    // function name
+    StreamExpression expression = new StreamExpression(factory.getFunctionName(this.getClass()));
+
+    if(includeStreams){
+      // streams
+      if(originalStream instanceof Expressible){
+        expression.addParameter(((Expressible)originalStream).toExpression(factory));
+      }
+      else{
+        throw new IOException("This CsvStream contains a non-expressible TupleStream - it cannot be converted to an expression");
+      }
+    }
+    else{
+      expression.addParameter("<stream>");
+    }
+
+    return expression;
+  }
+
+  @Override
+  public Explanation toExplanation(StreamFactory factory) throws IOException {
+
+    return new StreamExplanation(getStreamNodeId().toString())
+        .withChildren(new Explanation[] {
+            // the wrapped stream is the only child
+            originalStream.toExplanation(factory)
+        })
+        .withFunctionName(factory.getFunctionName(this.getClass()))
+        .withImplementingClass(this.getClass().getName())
+        .withExpressionType(ExpressionType.STREAM_DECORATOR)
+        .withExpression(toExpression(factory, false).toString());
+  }
+
+  public void setStreamContext(StreamContext context) {
+    this.originalStream.setStreamContext(context);
+  }
+
+  public List<TupleStream> children() {
+    List<TupleStream> l =  new ArrayList<TupleStream>();
+    l.add(originalStream);
+    return l;
+  }
+
+  public void open() throws IOException {
+    originalStream.open();
+  }
+
+  public void close() throws IOException {
+    originalStream.close();
+  }
+
+  /**
+   * Returns the next data row as a tuple, or the wrapped stream's EOF tuple.
+   *
+   * <p>Header rows are consumed in a loop rather than by recursion, so a long run of files that
+   * contain only a header line cannot exhaust the call stack.
+   *
+   * @throws IOException if a tuple lacks the {@code file} or {@code line} field, or a data row
+   *         does not have the same number of fields as its header row
+   */
+  public Tuple read() throws IOException {
+    while(true) {
+      Tuple tuple = originalStream.read();
+      ++lineNumber;
+      if(tuple.EOF) {
+        return tuple;
+      }
+
+      String file = tuple.getString("file");
+      String line = tuple.getString("line");
+      if(file == null || line == null) {
+        throw new IOException("Tuples to parse must contain both a file and a line field");
+      }
+
+      if(!file.equals(currentFile)) {
+        // First line of a new file: remember its headers and move on to the next tuple.
+        this.currentFile = file;
+        this.headers = split(line);
+        this.lineNumber = 1; //New file so reset the lineNumber
+        continue;
+      }
+
+      String[] fields = split(line);
+      if(fields.length != headers.length) {
+        throw new IOException("Headers and lines must have the same number of fields [file:"+file+" line number:"+lineNumber+"]");
+      }
+
+      Tuple out = new Tuple(new HashMap<>());
+      for(int i=0; i<headers.length; i++) {
+        // Empty fields are omitted rather than stored as empty strings.
+        if(fields[i] != null && fields[i].length() > 0) {
+          out.put(headers[i], fields[i]);
+        }
+      }
+      return out;
+    }
+  }
+
+  /**
+   * Splits a CSV line into fields, keeping trailing empty fields and removing one pair of
+   * enclosing double quotes from any field that has them.
+   */
+  protected String[] split(String line) {
+    String[] fields = FIELD_SEPARATOR.split(line, -1);
+    for(int i=0; i<fields.length; i++) {
+      String f = fields[i];
+      // The length check keeps a field consisting of a single quote character from being
+      // treated as both the opening and the closing quote.
+      if(f.length() >= 2 && f.startsWith("\"") && f.endsWith("\"")) {
+        fields[i] = f.substring(1, f.length()-1);
+      }
+    }
+
+    return fields;
+  }
+
+  /** Return the stream sort - ie, the order in which records are returned */
+  public StreamComparator getStreamSort(){
+    return originalStream.getStreamSort();
+  }
+
+  public int getCost() {
+    return 0;
+  }
+
+}
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/TsvStream.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/TsvStream.java
new file mode 100644
index 0000000..7b5d0d8
--- /dev/null
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/TsvStream.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.client.solrj.io.stream;
+
+import java.io.IOException;
+
+import org.apache.solr.client.solrj.io.stream.expr.Explanation;
+import org.apache.solr.client.solrj.io.stream.expr.Explanation.ExpressionType;
+import org.apache.solr.client.solrj.io.stream.expr.Expressible;
+import org.apache.solr.client.solrj.io.stream.expr.StreamExplanation;
+import org.apache.solr.client.solrj.io.stream.expr.StreamExpression;
+import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
+
+/**
+ * Variant of {@link CsvStream} that splits each line on tab characters instead of commas.
+ * Header handling and tuple construction are inherited from {@link CsvStream}.
+ */
+public class TsvStream extends CsvStream implements Expressible {
+
+  private static final long serialVersionUID = 1;
+
+  public TsvStream(StreamExpression expression,StreamFactory factory) throws IOException {
+    super(expression, factory);
+  }
+
+
+  @Override
+  public StreamExpression toExpression(StreamFactory factory) throws IOException{
+    return toExpression(factory, true);
+  }
+
+  /**
+   * Builds the expression for this stream.
+   *
+   * @param includeStreams when true the wrapped stream's expression is embedded; when false a
+   *        {@code <stream>} placeholder is used instead
+   */
+  private StreamExpression toExpression(StreamFactory factory, boolean includeStreams) throws IOException {
+    // function name
+    StreamExpression expression = new StreamExpression(factory.getFunctionName(this.getClass()));
+
+    if(includeStreams){
+      // streams
+      if(originalStream instanceof Expressible){
+        expression.addParameter(((Expressible)originalStream).toExpression(factory));
+      }
+      else{
+        throw new IOException("This TsvStream contains a non-expressible TupleStream - it cannot be converted to an expression");
+      }
+    }
+    else{
+      expression.addParameter("<stream>");
+    }
+
+    return expression;
+  }
+
+  @Override
+  public Explanation toExplanation(StreamFactory factory) throws IOException {
+
+    return new StreamExplanation(getStreamNodeId().toString())
+        .withChildren(new Explanation[] {
+            // the wrapped stream is the only child
+            originalStream.toExplanation(factory)
+        })
+        .withFunctionName(factory.getFunctionName(this.getClass()))
+        .withImplementingClass(this.getClass().getName())
+        .withExpressionType(ExpressionType.STREAM_DECORATOR)
+        .withExpression(toExpression(factory, false).toString());
+  }
+
+  /**
+   * Splits a line on tab characters. The limit of -1 keeps trailing empty fields so that the
+   * number of fields can be compared against the header row.
+   */
+  @Override
+  protected String[] split(String line) {
+    return line.split("\\t", -1);
+  }
+
+}
diff --git a/solr/solrj/src/test/org/apache/solr/client/solrj/io/TestLang.java b/solr/solrj/src/test/org/apache/solr/client/solrj/io/TestLang.java
index 60ac6ec..f2e6a42 100644
--- a/solr/solrj/src/test/org/apache/solr/client/solrj/io/TestLang.java
+++ b/solr/solrj/src/test/org/apache/solr/client/solrj/io/TestLang.java
@@ -76,7 +76,7 @@ public class TestLang extends SolrTestCase {
       "getAmplitude", "getPhase", "getAngularFrequency", "enclosingDisk", "getCenter", "getRadius",
       "getSupportPoints", "pairSort", "log10", "plist", "recip", "pivot", "ltrim", "rtrim",
"export",
       "zplot", "natural", "repeat", "movingMAD", "hashRollup", "noop", "var", "stddev", "recNum",
"isNull",
-      "notNull", "matches", "projectToBorder"};
+      "notNull", "matches", "projectToBorder", "double", "long", "parseCSV", "parseTSV",
"dateTime"};
 
   @Test
   public void testLang() {
diff --git a/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamDecoratorTest.java b/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamDecoratorTest.java
index 504b74b..65815e1 100644
--- a/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamDecoratorTest.java
+++ b/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamDecoratorTest.java
@@ -3151,6 +3151,175 @@ public class StreamDecoratorTest extends SolrCloudTestCase {
     }
   }
 
+
+  // parseCSV: the first line of each file is the header row; quoted commas stay inside a field
+  // and an empty trailing field is absent from the emitted tuple.
+  @Test
+  public void testParseCSV() throws Exception {
+    String expr = "parseCSV(list(tuple(file=\"file1\", line=\"a,b,c\"), " +
+        "                        tuple(file=\"file1\", line=\"1,2,3\")," +
+        "                        tuple(file=\"file1\", line=\"\\\"hello, world\\\",9000,20\")," +
+        "                        tuple(file=\"file2\", line=\"field_1,field_2,field_3\"), "+
+        "                        tuple(file=\"file2\", line=\"8,9,\")))";
+    ModifiableSolrParams paramsLoc = new ModifiableSolrParams();
+    paramsLoc.set("expr", expr);
+    paramsLoc.set("qt", "/stream");
+
+    String url = cluster.getJettySolrRunners().get(0).getBaseUrl().toString() + "/" + COLLECTIONORALIAS;
+    TupleStream solrStream = new SolrStream(url, paramsLoc);
+
+    StreamContext context = new StreamContext();
+    solrStream.setStreamContext(context);
+    List<Tuple> tuples = getTuples(solrStream);
+    // assertEquals takes (expected, actual); that order keeps failure messages accurate.
+    assertEquals(3, tuples.size());
+    assertEquals("1", tuples.get(0).getString("a"));
+    assertEquals("2", tuples.get(0).getString("b"));
+    assertEquals("3", tuples.get(0).getString("c"));
+
+    assertEquals("hello, world", tuples.get(1).getString("a"));
+    assertEquals("9000", tuples.get(1).getString("b"));
+    assertEquals("20", tuples.get(1).getString("c"));
+
+    assertEquals("8", tuples.get(2).getString("field_1"));
+    assertEquals("9", tuples.get(2).getString("field_2"));
+    assertNull(tuples.get(2).get("field_3"));
+  }
+
+
+  @Test
+  public void testParseTSV() throws Exception {
+    String expr = "parseTSV(list(tuple(file=\"file1\", line=\"a\tb\tc\"), " +
+        "                        tuple(file=\"file1\", line=\"1\t2\t3\")," +
+        "                        tuple(file=\"file1\", line=\"hello, world\t9000\t20\"),"
+
+        "                        tuple(file=\"file2\", line=\"field_1\tfield_2\tfield_3\"),
"+
+        "                        tuple(file=\"file2\", line=\"8\t\t9\")))";
+    ModifiableSolrParams paramsLoc = new ModifiableSolrParams();
+    paramsLoc.set("expr", expr);
+    paramsLoc.set("qt", "/stream");
+
+    String url = cluster.getJettySolrRunners().get(0).getBaseUrl().toString() + "/" + COLLECTIONORALIAS;
+    TupleStream solrStream = new SolrStream(url, paramsLoc);
+
+    StreamContext context = new StreamContext();
+    solrStream.setStreamContext(context);
+    List<Tuple> tuples = getTuples(solrStream);
+    assertEquals(tuples.size(),  3);
+    assertEquals(tuples.get(0).getString("a"), "1");
+    assertEquals(tuples.get(0).getString("b"), "2");
+    assertEquals(tuples.get(0).getString("c"), "3");
+
+    assertEquals(tuples.get(1).getString("a"), "hello, world");
+    assertEquals(tuples.get(1).getString("b"), "9000");
+    assertEquals(tuples.get(1).getString("c"), "20");
+
+    assertEquals(tuples.get(2).getString("field_1"), "8");
+    assertNull(tuples.get(2).get("field_2"));
+    assertEquals(tuples.get(2).getString("field_3"), "9");
+
+  }
+
+
+  @Test
+  public void testDateTime() throws Exception {
+    String expr = "select(list(tuple(a=20001011:10:11:01), tuple(a=20071011:14:30:20)), dateTime(a,
yyyyMMdd:kk:mm:ss) as date)";
+    ModifiableSolrParams paramsLoc = new ModifiableSolrParams();
+    paramsLoc.set("expr", expr);
+    paramsLoc.set("qt", "/stream");
+
+    String url = cluster.getJettySolrRunners().get(0).getBaseUrl().toString() + "/" + COLLECTIONORALIAS;
+    TupleStream solrStream = new SolrStream(url, paramsLoc);
+
+    StreamContext context = new StreamContext();
+    solrStream.setStreamContext(context);
+    List<Tuple> tuples = getTuples(solrStream);
+    String date = (String)tuples.get(0).get("date");
+    assertEquals(date, "2000-10-11T10:11:01Z");
+    date = (String)tuples.get(1).get("date");
+    assertEquals(date, "2007-10-11T14:30:20Z");
+  }
+
+  @Test
+  public void testDateTimeTZ() throws Exception {
+    String expr = "select(list(tuple(a=20001011), tuple(a=20071011)), dateTime(a, yyyyMMdd,
UTC) as date, dateTime(a, yyyyMMdd, EST) as date1, dateTime(a, yyyyMMdd) as date2)";
+    ModifiableSolrParams paramsLoc = new ModifiableSolrParams();
+    paramsLoc.set("expr", expr);
+    paramsLoc.set("qt", "/stream");
+
+    String url = cluster.getJettySolrRunners().get(0).getBaseUrl().toString() + "/" + COLLECTIONORALIAS;
+    TupleStream solrStream = new SolrStream(url, paramsLoc);
+
+    StreamContext context = new StreamContext();
+    solrStream.setStreamContext(context);
+    List<Tuple> tuples = getTuples(solrStream);
+    String date = (String)tuples.get(0).get("date");
+    String date1 = (String)tuples.get(0).get("date1");
+    String date2 = (String)tuples.get(0).get("date2");
+
+    assertEquals(date, "2000-10-11T00:00:00Z");
+    assertEquals(date1, "2000-10-11T05:00:00Z");
+    assertEquals(date2, "2000-10-11T00:00:00Z");
+
+
+    date = (String)tuples.get(1).get("date");
+    date1 = (String)tuples.get(1).get("date1");
+    date2 = (String)tuples.get(1).get("date2");
+
+    assertEquals(date, "2007-10-11T00:00:00Z");
+    assertEquals(date1, "2007-10-11T05:00:00Z");
+    assertEquals(date2, "2007-10-11T00:00:00Z");
+  }
+
+
+
+  @Test
+  public void testDoubleLong() throws Exception {
+    String expr = "select(tuple(d=\"1.1\", l=\"5000\"), double(d) as d, long(l) as l)";
+    ModifiableSolrParams paramsLoc = new ModifiableSolrParams();
+    paramsLoc.set("expr", expr);
+    paramsLoc.set("qt", "/stream");
+
+    String url = cluster.getJettySolrRunners().get(0).getBaseUrl().toString() + "/" + COLLECTIONORALIAS;
+    TupleStream solrStream = new SolrStream(url, paramsLoc);
+
+    StreamContext context = new StreamContext();
+    solrStream.setStreamContext(context);
+    List<Tuple> tuples = getTuples(solrStream);
+    assertEquals(tuples.size(),  1);
+    assertTrue(tuples.get(0).get("d") instanceof Double);
+    assertTrue(tuples.get(0).get("l") instanceof Long);
+
+    assertEquals(tuples.get(0).getDouble("d"), 1.1D, 0);
+    assertEquals(tuples.get(0).getLong("l").longValue(), 5000L);
+
+  }
+
+
+  // double() and long() applied to list operands must convert every element.
+  // Annotated with @Test for consistency with the other tests added alongside it.
+  @Test
+  public void testDoubleLongArray() throws Exception {
+    String expr = "let(a=list(tuple(d=\"1.1\", l=\"5000\"), tuple(d=\"1.3\", l=\"7000\"))," +
+        "              b=col(a, d)," +
+        "              c=col(a, l)," +
+        "              tuple(doubles=double(b)," +
+        "                    longs=long(c)))";
+    ModifiableSolrParams paramsLoc = new ModifiableSolrParams();
+    paramsLoc.set("expr", expr);
+    paramsLoc.set("qt", "/stream");
+
+    String url = cluster.getJettySolrRunners().get(0).getBaseUrl().toString() + "/" + COLLECTIONORALIAS;
+    TupleStream solrStream = new SolrStream(url, paramsLoc);
+
+    StreamContext context = new StreamContext();
+    solrStream.setStreamContext(context);
+    List<Tuple> tuples = getTuples(solrStream);
+    // assertEquals takes (expected, actual); that order keeps failure messages accurate.
+    assertEquals(1, tuples.size());
+
+    // The tuple API returns Object, so the element types can only be asserted by casting.
+    @SuppressWarnings("unchecked")
+    List<Double> doubles = (List<Double>)tuples.get(0).get("doubles");
+    @SuppressWarnings("unchecked")
+    List<Long> longs = (List<Long>)tuples.get(0).get("longs");
+    assertEquals(1.1, doubles.get(0), 0);
+    assertEquals(1.3, doubles.get(1), 0);
+
+    assertEquals(5000L, longs.get(0).longValue());
+    assertEquals(7000L, longs.get(1).longValue());
+  }
+
+
   @Test
   public void testCommitStream() throws Exception {
 


Mime
View raw message