Repository: nifi
Updated Branches:
refs/heads/master 84cecfbee -> 9f919b9b6
NIFI-4583 Restructure nifi-solr-processors
Make all methods static
NIFI-4583 Restructure nifi-solr-processors R2
This closes #2285.
Signed-off-by: Koji Kawamura <ijokarumawak@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/9f919b9b
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/9f919b9b
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/9f919b9b
Branch: refs/heads/master
Commit: 9f919b9b650bbec647d1d5acbc1a63b721ce84e3
Parents: 84cecfb
Author: U-WOODMARK\johannes.peter <johannes.peter@wmc-w8e-nb-176.woodmark.de>
Authored: Tue Nov 21 16:38:18 2017 +0100
Committer: Koji Kawamura <ijokarumawak@apache.org>
Committed: Wed Jan 3 10:10:42 2018 +0900
----------------------------------------------------------------------
.../nifi-solr-processors/pom.xml | 11 +
.../apache/nifi/processors/solr/GetSolr.java | 79 ++----
.../processors/solr/PutSolrContentStream.java | 15 +
.../nifi/processors/solr/SolrProcessor.java | 180 +-----------
.../apache/nifi/processors/solr/SolrUtils.java | 284 +++++++++++++++++++
.../nifi/processors/solr/TestGetSolr.java | 130 +++++----
.../solr/TestPutSolrContentStream.java | 54 ++--
7 files changed, 431 insertions(+), 322 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/nifi/blob/9f919b9b/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/pom.xml b/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/pom.xml
old mode 100644
new mode 100755
index f35927e..920ce73
--- a/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/pom.xml
+++ b/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/pom.xml
@@ -132,6 +132,17 @@
<version>${solr.version}</version>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>com.google.code.gson</groupId>
+ <artifactId>gson</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.xmlunit</groupId>
+ <artifactId>xmlunit-matchers</artifactId>
+ <version>2.2.1</version>
+ <scope>test</scope>
+ </dependency>
</dependencies>
<build>
<plugins>
http://git-wip-us.apache.org/repos/asf/nifi/blob/9f919b9b/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/GetSolr.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/GetSolr.java b/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/GetSolr.java
index 1871f1c..3435ce4 100755
--- a/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/GetSolr.java
+++ b/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/GetSolr.java
@@ -20,7 +20,6 @@ package org.apache.nifi.processors.solr;
import java.io.IOException;
import java.io.OutputStream;
-import java.nio.charset.StandardCharsets;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
@@ -29,7 +28,6 @@ import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
-import java.util.LinkedHashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
@@ -37,7 +35,6 @@ import java.util.Set;
import java.util.TimeZone;
import java.util.concurrent.atomic.AtomicBoolean;
-import org.apache.commons.io.IOUtils;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
import org.apache.nifi.annotation.behavior.Stateful;
@@ -62,11 +59,6 @@ import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.schema.access.SchemaNotFoundException;
import org.apache.nifi.serialization.RecordSetWriter;
import org.apache.nifi.serialization.RecordSetWriterFactory;
-import org.apache.nifi.serialization.record.ListRecordSet;
-import org.apache.nifi.serialization.record.MapRecord;
-import org.apache.nifi.serialization.record.Record;
-import org.apache.nifi.serialization.record.RecordField;
-import org.apache.nifi.serialization.record.RecordFieldType;
import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.serialization.record.RecordSet;
import org.apache.nifi.util.StringUtils;
@@ -75,12 +67,24 @@ import org.apache.solr.client.solrj.SolrQuery;
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.request.QueryRequest;
import org.apache.solr.client.solrj.response.QueryResponse;
-import org.apache.solr.client.solrj.util.ClientUtils;
import org.apache.solr.common.SolrDocument;
import org.apache.solr.common.SolrDocumentList;
-import org.apache.solr.common.SolrInputDocument;
import org.apache.solr.common.params.CursorMarkParams;
+import static org.apache.nifi.processors.solr.SolrUtils.SOLR_TYPE;
+import static org.apache.nifi.processors.solr.SolrUtils.COLLECTION;
+import static org.apache.nifi.processors.solr.SolrUtils.JAAS_CLIENT_APP_NAME;
+import static org.apache.nifi.processors.solr.SolrUtils.SSL_CONTEXT_SERVICE;
+import static org.apache.nifi.processors.solr.SolrUtils.SOLR_SOCKET_TIMEOUT;
+import static org.apache.nifi.processors.solr.SolrUtils.SOLR_CONNECTION_TIMEOUT;
+import static org.apache.nifi.processors.solr.SolrUtils.SOLR_MAX_CONNECTIONS;
+import static org.apache.nifi.processors.solr.SolrUtils.SOLR_MAX_CONNECTIONS_PER_HOST;
+import static org.apache.nifi.processors.solr.SolrUtils.ZK_CLIENT_TIMEOUT;
+import static org.apache.nifi.processors.solr.SolrUtils.ZK_CONNECTION_TIMEOUT;
+import static org.apache.nifi.processors.solr.SolrUtils.SOLR_LOCATION;
+import static org.apache.nifi.processors.solr.SolrUtils.BASIC_USERNAME;
+import static org.apache.nifi.processors.solr.SolrUtils.BASIC_PASSWORD;
+
@Tags({"Apache", "Solr", "Get", "Pull", "Records"})
@InputRequirement(Requirement.INPUT_FORBIDDEN)
@CapabilityDescription("Queries Solr and outputs the results as a FlowFile in the format of XML or using a Record Writer")
@@ -367,13 +371,13 @@ public class GetSolr extends SolrProcessor {
doc.removeFields(dateField);
}
}
- flowFile = session.write(flowFile, new QueryResponseOutputStreamCallback(response));
+ flowFile = session.write(flowFile, SolrUtils.getOutputStreamCallbackToTransformSolrResponseToXml(response));
flowFile = session.putAttribute(flowFile, CoreAttributes.MIME_TYPE.key(), "application/xml");
} else {
final RecordSetWriterFactory writerFactory = context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
final RecordSchema schema = writerFactory.getSchema(null, null);
- final RecordSet recordSet = solrDocumentsToRecordSet(response.getResults(), schema);
+ final RecordSet recordSet = SolrUtils.solrDocumentsToRecordSet(response.getResults(), schema);
final StringBuffer mimeType = new StringBuffer();
flowFile = session.write(flowFile, new OutputStreamCallback() {
@Override
@@ -408,57 +412,6 @@ public class GetSolr extends SolrProcessor {
}
}
- /**
- * Writes each SolrDocument to a record.
- */
- private RecordSet solrDocumentsToRecordSet(final List<SolrDocument> docs, final RecordSchema schema) {
- final List<Record> lr = new ArrayList<Record>();
-
- for (SolrDocument doc : docs) {
- final Map<String, Object> recordValues = new LinkedHashMap<>();
- for (RecordField field : schema.getFields()){
- final Object fieldValue = doc.getFieldValue(field.getFieldName());
- if (fieldValue != null) {
- if (field.getDataType().getFieldType().equals(RecordFieldType.ARRAY)){
- recordValues.put(field.getFieldName(), ((List<Object>) fieldValue).toArray());
- } else {
- recordValues.put(field.getFieldName(), fieldValue);
- }
- }
- }
- lr.add(new MapRecord(schema, recordValues));
- }
- return new ListRecordSet(schema, lr);
- }
- /**
- * Writes each SolrDocument in XML format to the OutputStream.
- */
- private class QueryResponseOutputStreamCallback implements OutputStreamCallback {
- private QueryResponse response;
-
- public QueryResponseOutputStreamCallback(QueryResponse response) {
- this.response = response;
- }
- @Override
- public void process(OutputStream out) throws IOException {
- IOUtils.write("<docs>", out, StandardCharsets.UTF_8);
- for (SolrDocument doc : response.getResults()) {
- final String xml = ClientUtils.toXML(toSolrInputDocument(doc));
- IOUtils.write(xml, out, StandardCharsets.UTF_8);
- }
- IOUtils.write("</docs>", out, StandardCharsets.UTF_8);
- }
-
- public SolrInputDocument toSolrInputDocument(SolrDocument d) {
- final SolrInputDocument doc = new SolrInputDocument();
-
- for (String name : d.getFieldNames()) {
- doc.addField(name, d.getFieldValue(name));
- }
-
- return doc;
- }
- }
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/9f919b9b/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/PutSolrContentStream.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/PutSolrContentStream.java b/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/PutSolrContentStream.java
old mode 100644
new mode 100755
index dc1830c..69bc071
--- a/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/PutSolrContentStream.java
+++ b/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/PutSolrContentStream.java
@@ -57,6 +57,21 @@ import java.util.TreeMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
+import static org.apache.nifi.processors.solr.SolrUtils.SOLR_TYPE_CLOUD;
+import static org.apache.nifi.processors.solr.SolrUtils.SOLR_TYPE;
+import static org.apache.nifi.processors.solr.SolrUtils.COLLECTION;
+import static org.apache.nifi.processors.solr.SolrUtils.JAAS_CLIENT_APP_NAME;
+import static org.apache.nifi.processors.solr.SolrUtils.SSL_CONTEXT_SERVICE;
+import static org.apache.nifi.processors.solr.SolrUtils.SOLR_SOCKET_TIMEOUT;
+import static org.apache.nifi.processors.solr.SolrUtils.SOLR_CONNECTION_TIMEOUT;
+import static org.apache.nifi.processors.solr.SolrUtils.SOLR_MAX_CONNECTIONS;
+import static org.apache.nifi.processors.solr.SolrUtils.SOLR_MAX_CONNECTIONS_PER_HOST;
+import static org.apache.nifi.processors.solr.SolrUtils.ZK_CLIENT_TIMEOUT;
+import static org.apache.nifi.processors.solr.SolrUtils.ZK_CONNECTION_TIMEOUT;
+import static org.apache.nifi.processors.solr.SolrUtils.SOLR_LOCATION;
+import static org.apache.nifi.processors.solr.SolrUtils.BASIC_USERNAME;
+import static org.apache.nifi.processors.solr.SolrUtils.BASIC_PASSWORD;
+
@Tags({"Apache", "Solr", "Put", "Send"})
@InputRequirement(Requirement.INPUT_REQUIRED)
@CapabilityDescription("Sends the contents of a FlowFile as a ContentStream to Solr")
http://git-wip-us.apache.org/repos/asf/nifi/blob/9f919b9b/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/SolrProcessor.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/SolrProcessor.java b/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/SolrProcessor.java
old mode 100644
new mode 100755
index cd29f7c..7c732e9
--- a/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/SolrProcessor.java
+++ b/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/SolrProcessor.java
@@ -19,34 +19,31 @@
package org.apache.nifi.processors.solr;
import org.apache.commons.lang3.StringUtils;
-import org.apache.http.client.HttpClient;
-import org.apache.http.conn.scheme.Scheme;
-import org.apache.http.conn.ssl.SSLSocketFactory;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnStopped;
-import org.apache.nifi.components.AllowableValue;
-import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
-import org.apache.nifi.expression.AttributeExpression;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
-import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.ssl.SSLContextService;
import org.apache.solr.client.solrj.SolrClient;
-import org.apache.solr.client.solrj.impl.CloudSolrClient;
-import org.apache.solr.client.solrj.impl.HttpClientUtil;
-import org.apache.solr.client.solrj.impl.HttpSolrClient;
import org.apache.solr.client.solrj.impl.Krb5HttpClientConfigurer;
-import org.apache.solr.common.params.ModifiableSolrParams;
-import javax.net.ssl.SSLContext;
import javax.security.auth.login.Configuration;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
-import java.util.concurrent.TimeUnit;
+
+import static org.apache.nifi.processors.solr.SolrUtils.SOLR_TYPE_CLOUD;
+import static org.apache.nifi.processors.solr.SolrUtils.SOLR_TYPE;
+import static org.apache.nifi.processors.solr.SolrUtils.SOLR_TYPE_STANDARD;
+import static org.apache.nifi.processors.solr.SolrUtils.COLLECTION;
+import static org.apache.nifi.processors.solr.SolrUtils.JAAS_CLIENT_APP_NAME;
+import static org.apache.nifi.processors.solr.SolrUtils.SSL_CONTEXT_SERVICE;
+import static org.apache.nifi.processors.solr.SolrUtils.SOLR_LOCATION;
+import static org.apache.nifi.processors.solr.SolrUtils.BASIC_USERNAME;
+import static org.apache.nifi.processors.solr.SolrUtils.BASIC_PASSWORD;
/**
* A base class for processors that interact with Apache Solr.
@@ -54,121 +51,6 @@ import java.util.concurrent.TimeUnit;
*/
public abstract class SolrProcessor extends AbstractProcessor {
- public static final AllowableValue SOLR_TYPE_CLOUD = new AllowableValue(
- "Cloud", "Cloud", "A SolrCloud instance.");
-
- public static final AllowableValue SOLR_TYPE_STANDARD = new AllowableValue(
- "Standard", "Standard", "A stand-alone Solr instance.");
-
- public static final PropertyDescriptor SOLR_TYPE = new PropertyDescriptor
- .Builder().name("Solr Type")
- .description("The type of Solr instance, Cloud or Standard.")
- .required(true)
- .allowableValues(SOLR_TYPE_CLOUD, SOLR_TYPE_STANDARD)
- .defaultValue(SOLR_TYPE_STANDARD.getValue())
- .build();
-
- public static final PropertyDescriptor SOLR_LOCATION = new PropertyDescriptor
- .Builder().name("Solr Location")
- .description("The Solr url for a Solr Type of Standard (ex: http://localhost:8984/solr/gettingstarted), " +
- "or the ZooKeeper hosts for a Solr Type of Cloud (ex: localhost:9983).")
- .required(true)
- .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
- .addValidator(StandardValidators.createAttributeExpressionLanguageValidator(AttributeExpression.ResultType.STRING))
- .expressionLanguageSupported(true)
- .build();
-
- public static final PropertyDescriptor COLLECTION = new PropertyDescriptor
- .Builder().name("Collection")
- .description("The Solr collection name, only used with a Solr Type of Cloud")
- .required(false)
- .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
- .expressionLanguageSupported(true)
- .build();
-
- public static final PropertyDescriptor JAAS_CLIENT_APP_NAME = new PropertyDescriptor
- .Builder().name("JAAS Client App Name")
- .description("The name of the JAAS configuration entry to use when performing Kerberos authentication to Solr. If this property is " +
- "not provided, Kerberos authentication will not be attempted. The value must match an entry in the file specified by the " +
- "system property java.security.auth.login.config.")
- .required(false)
- .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
- .build();
-
- public static final PropertyDescriptor BASIC_USERNAME = new PropertyDescriptor
- .Builder().name("Username")
- .description("The username to use when Solr is configured with basic authentication.")
- .required(false)
- .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
- .addValidator(StandardValidators.createAttributeExpressionLanguageValidator(AttributeExpression.ResultType.STRING))
- .expressionLanguageSupported(true)
- .build();
-
- public static final PropertyDescriptor BASIC_PASSWORD = new PropertyDescriptor
- .Builder().name("Password")
- .description("The password to use when Solr is configured with basic authentication.")
- .required(false)
- .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
- .addValidator(StandardValidators.createAttributeExpressionLanguageValidator(AttributeExpression.ResultType.STRING))
- .expressionLanguageSupported(true)
- .sensitive(true)
- .build();
-
- public static final PropertyDescriptor SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder()
- .name("SSL Context Service")
- .description("The Controller Service to use in order to obtain an SSL Context. This property must be set when communicating with a Solr over https.")
- .required(false)
- .identifiesControllerService(SSLContextService.class)
- .build();
-
- public static final PropertyDescriptor SOLR_SOCKET_TIMEOUT = new PropertyDescriptor
- .Builder().name("Solr Socket Timeout")
- .description("The amount of time to wait for data on a socket connection to Solr. A value of 0 indicates an infinite timeout.")
- .required(true)
- .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
- .defaultValue("10 seconds")
- .build();
-
- public static final PropertyDescriptor SOLR_CONNECTION_TIMEOUT = new PropertyDescriptor
- .Builder().name("Solr Connection Timeout")
- .description("The amount of time to wait when establishing a connection to Solr. A value of 0 indicates an infinite timeout.")
- .required(true)
- .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
- .defaultValue("10 seconds")
- .build();
-
- public static final PropertyDescriptor SOLR_MAX_CONNECTIONS = new PropertyDescriptor
- .Builder().name("Solr Maximum Connections")
- .description("The maximum number of total connections allowed from the Solr client to Solr.")
- .required(true)
- .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
- .defaultValue("10")
- .build();
-
- public static final PropertyDescriptor SOLR_MAX_CONNECTIONS_PER_HOST = new PropertyDescriptor
- .Builder().name("Solr Maximum Connections Per Host")
- .description("The maximum number of connections allowed from the Solr client to a single Solr host.")
- .required(true)
- .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
- .defaultValue("5")
- .build();
-
- public static final PropertyDescriptor ZK_CLIENT_TIMEOUT = new PropertyDescriptor
- .Builder().name("ZooKeeper Client Timeout")
- .description("The amount of time to wait for data on a connection to ZooKeeper, only used with a Solr Type of Cloud.")
- .required(false)
- .addValidator(StandardValidators.createTimePeriodValidator(1, TimeUnit.SECONDS, Integer.MAX_VALUE, TimeUnit.SECONDS))
- .defaultValue("10 seconds")
- .build();
-
- public static final PropertyDescriptor ZK_CONNECTION_TIMEOUT = new PropertyDescriptor
- .Builder().name("ZooKeeper Connection Timeout")
- .description("The amount of time to wait when establishing a connection to ZooKeeper, only used with a Solr Type of Cloud.")
- .required(false)
- .addValidator(StandardValidators.createTimePeriodValidator(1, TimeUnit.SECONDS, Integer.MAX_VALUE, TimeUnit.SECONDS))
- .defaultValue("10 seconds")
- .build();
-
private volatile SolrClient solrClient;
private volatile String solrLocation;
private volatile String basicUsername;
@@ -205,47 +87,7 @@ public abstract class SolrProcessor extends AbstractProcessor {
* @return an HttpSolrClient or CloudSolrClient
*/
protected SolrClient createSolrClient(final ProcessContext context, final String solrLocation) {
- final Integer socketTimeout = context.getProperty(SOLR_SOCKET_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue();
- final Integer connectionTimeout = context.getProperty(SOLR_CONNECTION_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue();
- final Integer maxConnections = context.getProperty(SOLR_MAX_CONNECTIONS).asInteger();
- final Integer maxConnectionsPerHost = context.getProperty(SOLR_MAX_CONNECTIONS_PER_HOST).asInteger();
- final SSLContextService sslContextService = context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
- final String jaasClientAppName = context.getProperty(JAAS_CLIENT_APP_NAME).getValue();
-
- final ModifiableSolrParams params = new ModifiableSolrParams();
- params.set(HttpClientUtil.PROP_SO_TIMEOUT, socketTimeout);
- params.set(HttpClientUtil.PROP_CONNECTION_TIMEOUT, connectionTimeout);
- params.set(HttpClientUtil.PROP_MAX_CONNECTIONS, maxConnections);
- params.set(HttpClientUtil.PROP_MAX_CONNECTIONS_PER_HOST, maxConnectionsPerHost);
-
- // has to happen before the client is created below so that correct configurer would be set if neeeded
- if (!StringUtils.isEmpty(jaasClientAppName)) {
- System.setProperty("solr.kerberos.jaas.appname", jaasClientAppName);
- HttpClientUtil.setConfigurer(new Krb5HttpClientConfigurer());
- }
-
- final HttpClient httpClient = HttpClientUtil.createClient(params);
-
- if (sslContextService != null) {
- final SSLContext sslContext = sslContextService.createSSLContext(SSLContextService.ClientAuth.REQUIRED);
- final SSLSocketFactory sslSocketFactory = new SSLSocketFactory(sslContext);
- final Scheme httpsScheme = new Scheme("https", 443, sslSocketFactory);
- httpClient.getConnectionManager().getSchemeRegistry().register(httpsScheme);
- }
-
- if (SOLR_TYPE_STANDARD.equals(context.getProperty(SOLR_TYPE).getValue())) {
- return new HttpSolrClient(solrLocation, httpClient);
- } else {
- final String collection = context.getProperty(COLLECTION).evaluateAttributeExpressions().getValue();
- final Integer zkClientTimeout = context.getProperty(ZK_CLIENT_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue();
- final Integer zkConnectionTimeout = context.getProperty(ZK_CONNECTION_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue();
-
- CloudSolrClient cloudSolrClient = new CloudSolrClient(solrLocation, httpClient);
- cloudSolrClient.setDefaultCollection(collection);
- cloudSolrClient.setZkClientTimeout(zkClientTimeout);
- cloudSolrClient.setZkConnectTimeout(zkConnectionTimeout);
- return cloudSolrClient;
- }
+ return SolrUtils.createSolrClient(context, solrLocation);
}
/**
http://git-wip-us.apache.org/repos/asf/nifi/blob/9f919b9b/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/SolrUtils.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/SolrUtils.java b/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/SolrUtils.java
new file mode 100755
index 0000000..8977b7c
--- /dev/null
+++ b/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/SolrUtils.java
@@ -0,0 +1,284 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.nifi.processors.solr;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.http.client.HttpClient;
+import org.apache.http.conn.scheme.Scheme;
+import org.apache.http.conn.ssl.SSLSocketFactory;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.context.PropertyContext;
+import org.apache.nifi.expression.AttributeExpression;
+import org.apache.nifi.processor.io.OutputStreamCallback;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.serialization.record.ListRecordSet;
+import org.apache.nifi.serialization.record.MapRecord;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.serialization.record.RecordSet;
+import org.apache.nifi.ssl.SSLContextService;
+import org.apache.solr.client.solrj.SolrClient;
+import org.apache.solr.client.solrj.impl.CloudSolrClient;
+import org.apache.solr.client.solrj.impl.HttpClientUtil;
+import org.apache.solr.client.solrj.impl.HttpSolrClient;
+import org.apache.solr.client.solrj.impl.Krb5HttpClientConfigurer;
+import org.apache.solr.client.solrj.response.QueryResponse;
+import org.apache.solr.client.solrj.util.ClientUtils;
+import org.apache.solr.common.SolrDocument;
+import org.apache.solr.common.SolrInputDocument;
+import org.apache.solr.common.params.ModifiableSolrParams;
+
+import javax.net.ssl.SSLContext;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+public class SolrUtils {
+
+ public static final AllowableValue SOLR_TYPE_CLOUD = new AllowableValue(
+ "Cloud", "Cloud", "A SolrCloud instance.");
+
+ public static final AllowableValue SOLR_TYPE_STANDARD = new AllowableValue(
+ "Standard", "Standard", "A stand-alone Solr instance.");
+
+ public static final PropertyDescriptor SOLR_TYPE = new PropertyDescriptor
+ .Builder().name("Solr Type")
+ .description("The type of Solr instance, Cloud or Standard.")
+ .required(true)
+ .allowableValues(SOLR_TYPE_CLOUD, SOLR_TYPE_STANDARD)
+ .defaultValue(SOLR_TYPE_STANDARD.getValue())
+ .build();
+
+ public static final PropertyDescriptor COLLECTION = new PropertyDescriptor
+ .Builder().name("Collection")
+ .description("The Solr collection name, only used with a Solr Type of Cloud")
+ .required(false)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .expressionLanguageSupported(true)
+ .build();
+
+ public static final PropertyDescriptor SOLR_LOCATION = new PropertyDescriptor
+ .Builder().name("Solr Location")
+ .description("The Solr url for a Solr Type of Standard (ex: http://localhost:8984/solr/gettingstarted), " +
+ "or the ZooKeeper hosts for a Solr Type of Cloud (ex: localhost:9983).")
+ .required(true)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .addValidator(StandardValidators.createAttributeExpressionLanguageValidator(AttributeExpression.ResultType.STRING))
+ .expressionLanguageSupported(true)
+ .build();
+
+ public static final PropertyDescriptor BASIC_USERNAME = new PropertyDescriptor
+ .Builder().name("Username")
+ .description("The username to use when Solr is configured with basic authentication.")
+ .required(false)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .addValidator(StandardValidators.createAttributeExpressionLanguageValidator(AttributeExpression.ResultType.STRING))
+ .expressionLanguageSupported(true)
+ .build();
+
+ public static final PropertyDescriptor BASIC_PASSWORD = new PropertyDescriptor
+ .Builder().name("Password")
+ .description("The password to use when Solr is configured with basic authentication.")
+ .required(false)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .addValidator(StandardValidators.createAttributeExpressionLanguageValidator(AttributeExpression.ResultType.STRING))
+ .expressionLanguageSupported(true)
+ .sensitive(true)
+ .build();
+
+ public static final PropertyDescriptor JAAS_CLIENT_APP_NAME = new PropertyDescriptor
+ .Builder().name("JAAS Client App Name")
+ .description("The name of the JAAS configuration entry to use when performing Kerberos authentication to Solr. If this property is " +
+ "not provided, Kerberos authentication will not be attempted. The value must match an entry in the file specified by the " +
+ "system property java.security.auth.login.config.")
+ .required(false)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .build();
+
+ public static final PropertyDescriptor SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder()
+ .name("SSL Context Service")
+ .description("The Controller Service to use in order to obtain an SSL Context. This property must be set when communicating with a Solr over https.")
+ .required(false)
+ .identifiesControllerService(SSLContextService.class)
+ .build();
+
+ public static final PropertyDescriptor SOLR_SOCKET_TIMEOUT = new PropertyDescriptor
+ .Builder().name("Solr Socket Timeout")
+ .description("The amount of time to wait for data on a socket connection to Solr. A value of 0 indicates an infinite timeout.")
+ .required(true)
+ .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+ .defaultValue("10 seconds")
+ .build();
+
+ public static final PropertyDescriptor SOLR_CONNECTION_TIMEOUT = new PropertyDescriptor
+ .Builder().name("Solr Connection Timeout")
+ .description("The amount of time to wait when establishing a connection to Solr. A value of 0 indicates an infinite timeout.")
+ .required(true)
+ .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+ .defaultValue("10 seconds")
+ .build();
+
+ public static final PropertyDescriptor SOLR_MAX_CONNECTIONS = new PropertyDescriptor
+ .Builder().name("Solr Maximum Connections")
+ .description("The maximum number of total connections allowed from the Solr client to Solr.")
+ .required(true)
+ .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+ .defaultValue("10")
+ .build();
+
+ public static final PropertyDescriptor SOLR_MAX_CONNECTIONS_PER_HOST = new PropertyDescriptor
+ .Builder().name("Solr Maximum Connections Per Host")
+ .description("The maximum number of connections allowed from the Solr client to a single Solr host.")
+ .required(true)
+ .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+ .defaultValue("5")
+ .build();
+
+ public static final PropertyDescriptor ZK_CLIENT_TIMEOUT = new PropertyDescriptor
+ .Builder().name("ZooKeeper Client Timeout")
+ .description("The amount of time to wait for data on a connection to ZooKeeper, only used with a Solr Type of Cloud.")
+ .required(false)
+ .addValidator(StandardValidators.createTimePeriodValidator(1, TimeUnit.SECONDS, Integer.MAX_VALUE, TimeUnit.SECONDS))
+ .defaultValue("10 seconds")
+ .build();
+
+ public static final PropertyDescriptor ZK_CONNECTION_TIMEOUT = new PropertyDescriptor
+ .Builder().name("ZooKeeper Connection Timeout")
+ .description("The amount of time to wait when establishing a connection to ZooKeeper, only used with a Solr Type of Cloud.")
+ .required(false)
+ .addValidator(StandardValidators.createTimePeriodValidator(1, TimeUnit.SECONDS, Integer.MAX_VALUE, TimeUnit.SECONDS))
+ .defaultValue("10 seconds")
+ .build();
+
+ public static SolrClient createSolrClient(final PropertyContext context, final String solrLocation) {
+ final Integer socketTimeout = context.getProperty(SOLR_SOCKET_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue();
+ final Integer connectionTimeout = context.getProperty(SOLR_CONNECTION_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue();
+ final Integer maxConnections = context.getProperty(SOLR_MAX_CONNECTIONS).asInteger();
+ final Integer maxConnectionsPerHost = context.getProperty(SOLR_MAX_CONNECTIONS_PER_HOST).asInteger();
+ final SSLContextService sslContextService = context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
+ final String jaasClientAppName = context.getProperty(JAAS_CLIENT_APP_NAME).getValue();
+
+ final ModifiableSolrParams params = new ModifiableSolrParams();
+ params.set(HttpClientUtil.PROP_SO_TIMEOUT, socketTimeout);
+ params.set(HttpClientUtil.PROP_CONNECTION_TIMEOUT, connectionTimeout);
+ params.set(HttpClientUtil.PROP_MAX_CONNECTIONS, maxConnections);
+ params.set(HttpClientUtil.PROP_MAX_CONNECTIONS_PER_HOST, maxConnectionsPerHost);
+
+ // has to happen before the client is created below so that correct configurer would be set if neeeded
+ if (!StringUtils.isEmpty(jaasClientAppName)) {
+ System.setProperty("solr.kerberos.jaas.appname", jaasClientAppName);
+ HttpClientUtil.setConfigurer(new Krb5HttpClientConfigurer());
+ }
+
+ final HttpClient httpClient = HttpClientUtil.createClient(params);
+
+ if (sslContextService != null) {
+ final SSLContext sslContext = sslContextService.createSSLContext(SSLContextService.ClientAuth.REQUIRED);
+ final SSLSocketFactory sslSocketFactory = new SSLSocketFactory(sslContext);
+ final Scheme httpsScheme = new Scheme("https", 443, sslSocketFactory);
+ httpClient.getConnectionManager().getSchemeRegistry().register(httpsScheme);
+ }
+
+ if (SOLR_TYPE_STANDARD.equals(context.getProperty(SOLR_TYPE).getValue())) {
+ return new HttpSolrClient(solrLocation, httpClient);
+ } else {
+ final String collection = context.getProperty(COLLECTION).evaluateAttributeExpressions().getValue();
+ final Integer zkClientTimeout = context.getProperty(ZK_CLIENT_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue();
+ final Integer zkConnectionTimeout = context.getProperty(ZK_CONNECTION_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue();
+
+ CloudSolrClient cloudSolrClient = new CloudSolrClient(solrLocation, httpClient);
+ cloudSolrClient.setDefaultCollection(collection);
+ cloudSolrClient.setZkClientTimeout(zkClientTimeout);
+ cloudSolrClient.setZkConnectTimeout(zkConnectionTimeout);
+ return cloudSolrClient;
+ }
+ }
+
+
+
+ /**
+ * Writes each SolrDocument to a record.
+ */
+ public static RecordSet solrDocumentsToRecordSet(final List<SolrDocument> docs, final RecordSchema schema) {
+ final List<Record> lr = new ArrayList<Record>();
+
+ for (SolrDocument doc : docs) {
+ final Map<String, Object> recordValues = new LinkedHashMap<>();
+ for (RecordField field : schema.getFields()){
+ final Object fieldValue = doc.getFieldValue(field.getFieldName());
+ if (fieldValue != null) {
+ if (field.getDataType().getFieldType().equals(RecordFieldType.ARRAY)){
+ recordValues.put(field.getFieldName(), ((List<Object>) fieldValue).toArray());
+ } else {
+ recordValues.put(field.getFieldName(), fieldValue);
+ }
+ }
+ }
+ lr.add(new MapRecord(schema, recordValues));
+ }
+ return new ListRecordSet(schema, lr);
+ }
+
+
+ public static OutputStreamCallback getOutputStreamCallbackToTransformSolrResponseToXml(QueryResponse response) {
+ return new QueryResponseOutputStreamCallback(response);
+ }
+
+ /**
+ * Writes each SolrDocument in XML format to the OutputStream.
+ */
+ private static class QueryResponseOutputStreamCallback implements OutputStreamCallback {
+ private QueryResponse response;
+
+ public QueryResponseOutputStreamCallback(QueryResponse response) {
+ this.response = response;
+ }
+
+ @Override
+ public void process(OutputStream out) throws IOException {
+ IOUtils.write("<docs>", out, StandardCharsets.UTF_8);
+ for (SolrDocument doc : response.getResults()) {
+ final String xml = ClientUtils.toXML(toSolrInputDocument(doc));
+ IOUtils.write(xml, out, StandardCharsets.UTF_8);
+ }
+ IOUtils.write("</docs>", out, StandardCharsets.UTF_8);
+ }
+
+ public SolrInputDocument toSolrInputDocument(SolrDocument d) {
+ final SolrInputDocument doc = new SolrInputDocument();
+
+ for (String name : d.getFieldNames()) {
+ doc.addField(name, d.getFieldValue(name));
+ }
+
+ return doc;
+ }
+ }
+
+
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/9f919b9b/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/java/org/apache/nifi/processors/solr/TestGetSolr.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/java/org/apache/nifi/processors/solr/TestGetSolr.java b/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/java/org/apache/nifi/processors/solr/TestGetSolr.java
index cb3d9c5..af5f3dd 100755
--- a/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/java/org/apache/nifi/processors/solr/TestGetSolr.java
+++ b/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/java/org/apache/nifi/processors/solr/TestGetSolr.java
@@ -18,6 +18,7 @@
*/
package org.apache.nifi.processors.solr;
+import com.google.gson.stream.JsonReader;
import org.apache.nifi.components.state.Scope;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.processor.ProcessContext;
@@ -34,8 +35,11 @@ import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
+import org.xmlunit.matchers.CompareMatcher;
+import java.io.ByteArrayInputStream;
import java.io.IOException;
+import java.io.InputStreamReader;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.text.SimpleDateFormat;
@@ -43,6 +47,9 @@ import java.util.Date;
import java.util.Locale;
import java.util.TimeZone;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
+
public class TestGetSolr {
static final String DEFAULT_SOLR_CORE = "testCollection";
@@ -91,18 +98,23 @@ public class TestGetSolr {
}
}
+ private static TestRunner createDefaultTestRunner(GetSolr processor) {
+ TestRunner runner = TestRunners.newTestRunner(processor);
+ runner.setProperty(GetSolr.RETURN_TYPE, GetSolr.MODE_XML.getValue());
+ runner.setProperty(SolrUtils.SOLR_TYPE, SolrUtils.SOLR_TYPE_STANDARD.getValue());
+ runner.setProperty(SolrUtils.SOLR_LOCATION, "http://localhost:8443/solr");
+ runner.setProperty(GetSolr.DATE_FIELD, "created");
+ runner.setProperty(GetSolr.RETURN_FIELDS, "id,created");
+ runner.setProperty(SolrUtils.COLLECTION, "testCollection");
+ return runner;
+ }
+
@Test
public void testLessThanBatchSizeShouldProduceOneFlowFile() throws IOException, SolrServerException {
final org.apache.nifi.processors.solr.TestGetSolr.TestableProcessor proc = new org.apache.nifi.processors.solr.TestGetSolr.TestableProcessor(solrClient);
- TestRunner runner = TestRunners.newTestRunner(proc);
- runner.setProperty(GetSolr.RETURN_TYPE, GetSolr.MODE_XML.getValue());
- runner.setProperty(GetSolr.SOLR_TYPE, PutSolrContentStream.SOLR_TYPE_CLOUD.getValue());
- runner.setProperty(GetSolr.SOLR_LOCATION, "http://localhost:8443/solr");
- runner.setProperty(GetSolr.DATE_FIELD, "created");
+ TestRunner runner = createDefaultTestRunner(proc);
runner.setProperty(GetSolr.BATCH_SIZE, "20");
- runner.setProperty(GetSolr.RETURN_FIELDS, "id,created");
- runner.setProperty(GetSolr.COLLECTION, "testCollection");
runner.run();
runner.assertAllFlowFilesTransferred(GetSolr.REL_SUCCESS, 1);
@@ -112,15 +124,9 @@ public class TestGetSolr {
public void testNoResultsShouldProduceNoOutput() throws IOException, SolrServerException {
final org.apache.nifi.processors.solr.TestGetSolr.TestableProcessor proc = new org.apache.nifi.processors.solr.TestGetSolr.TestableProcessor(solrClient);
- TestRunner runner = TestRunners.newTestRunner(proc);
- runner.setProperty(GetSolr.RETURN_TYPE, GetSolr.MODE_XML.getValue());
- runner.setProperty(GetSolr.SOLR_TYPE, PutSolrContentStream.SOLR_TYPE_CLOUD.getValue());
- runner.setProperty(GetSolr.SOLR_LOCATION, "http://localhost:8443/solr");
+ TestRunner runner = createDefaultTestRunner(proc);
runner.setProperty(GetSolr.SOLR_QUERY, "integer_single:1000");
- runner.setProperty(GetSolr.DATE_FIELD, "created");
runner.setProperty(GetSolr.BATCH_SIZE, "1");
- runner.setProperty(GetSolr.RETURN_FIELDS, "id,created");
- runner.setProperty(GetSolr.COLLECTION, "testCollection");
runner.run();
runner.assertAllFlowFilesTransferred(GetSolr.REL_SUCCESS, 0);
@@ -135,12 +141,8 @@ public class TestGetSolr {
public void testValidation() throws IOException, SolrServerException {
final org.apache.nifi.processors.solr.TestGetSolr.TestableProcessor proc = new org.apache.nifi.processors.solr.TestGetSolr.TestableProcessor(solrClient);
- TestRunner runner = TestRunners.newTestRunner(proc);
- runner.setProperty(GetSolr.SOLR_TYPE, PutSolrContentStream.SOLR_TYPE_CLOUD.getValue());
- runner.setProperty(GetSolr.SOLR_LOCATION, "http://localhost:8443/solr");
- runner.setProperty(GetSolr.DATE_FIELD, "created");
+ TestRunner runner = createDefaultTestRunner(proc);
runner.setProperty(GetSolr.BATCH_SIZE, "2");
- runner.setProperty(GetSolr.COLLECTION, "testCollection");
runner.setProperty(GetSolr.RETURN_TYPE, GetSolr.MODE_REC.getValue());
runner.run(1);
@@ -150,14 +152,8 @@ public class TestGetSolr {
public void testCompletenessDespiteUpdates() throws IOException, SolrServerException {
final org.apache.nifi.processors.solr.TestGetSolr.TestableProcessor proc = new org.apache.nifi.processors.solr.TestGetSolr.TestableProcessor(solrClient);
- TestRunner runner = TestRunners.newTestRunner(proc);
- runner.setProperty(GetSolr.RETURN_TYPE, GetSolr.MODE_XML.getValue());
- runner.setProperty(GetSolr.SOLR_TYPE, PutSolrContentStream.SOLR_TYPE_CLOUD.getValue());
- runner.setProperty(GetSolr.SOLR_LOCATION, "http://localhost:8443/solr");
- runner.setProperty(GetSolr.DATE_FIELD, "created");
+ TestRunner runner = createDefaultTestRunner(proc);
runner.setProperty(GetSolr.BATCH_SIZE, "1");
- runner.setProperty(GetSolr.RETURN_FIELDS, "id,created");
- runner.setProperty(GetSolr.COLLECTION, "testCollection");
runner.run(1,false, true);
runner.assertQueueEmpty();
@@ -185,14 +181,8 @@ public class TestGetSolr {
public void testCompletenessDespiteDeletions() throws IOException, SolrServerException {
final org.apache.nifi.processors.solr.TestGetSolr.TestableProcessor proc = new org.apache.nifi.processors.solr.TestGetSolr.TestableProcessor(solrClient);
- TestRunner runner = TestRunners.newTestRunner(proc);
- runner.setProperty(GetSolr.RETURN_TYPE, GetSolr.MODE_XML.getValue());
- runner.setProperty(GetSolr.SOLR_TYPE, PutSolrContentStream.SOLR_TYPE_CLOUD.getValue());
- runner.setProperty(GetSolr.SOLR_LOCATION, "http://localhost:8443/solr");
- runner.setProperty(GetSolr.DATE_FIELD, "created");
+ TestRunner runner = createDefaultTestRunner(proc);
runner.setProperty(GetSolr.BATCH_SIZE, "1");
- runner.setProperty(GetSolr.RETURN_FIELDS, "id,created");
- runner.setProperty(GetSolr.COLLECTION, "testCollection");
runner.run(1,false, true);
runner.assertQueueEmpty();
@@ -227,15 +217,9 @@ public class TestGetSolr {
final org.apache.nifi.processors.solr.TestGetSolr.TestableProcessor proc = new org.apache.nifi.processors.solr.TestGetSolr.TestableProcessor(solrClient);
- TestRunner runner = TestRunners.newTestRunner(proc);
- runner.setProperty(GetSolr.RETURN_TYPE, GetSolr.MODE_XML.getValue());
- runner.setProperty(GetSolr.SOLR_TYPE, PutSolrContentStream.SOLR_TYPE_CLOUD.getValue());
- runner.setProperty(GetSolr.SOLR_LOCATION, "http://localhost:8443/solr");
- runner.setProperty(GetSolr.DATE_FIELD, "created");
+ TestRunner runner = createDefaultTestRunner(proc);
runner.setProperty(GetSolr.DATE_FILTER, df.format(dateToFilter));
runner.setProperty(GetSolr.BATCH_SIZE, "1");
- runner.setProperty(GetSolr.RETURN_FIELDS, "id,created");
- runner.setProperty(GetSolr.COLLECTION, "testCollection");
SolrInputDocument doc10 = new SolrInputDocument();
doc10.addField("id", "doc10");
@@ -258,14 +242,8 @@ public class TestGetSolr {
public void testPropertyModified() throws IOException, SolrServerException {
final org.apache.nifi.processors.solr.TestGetSolr.TestableProcessor proc = new org.apache.nifi.processors.solr.TestGetSolr.TestableProcessor(solrClient);
- TestRunner runner = TestRunners.newTestRunner(proc);
- runner.setProperty(GetSolr.RETURN_TYPE, GetSolr.MODE_XML.getValue());
- runner.setProperty(GetSolr.SOLR_TYPE, PutSolrContentStream.SOLR_TYPE_CLOUD.getValue());
- runner.setProperty(GetSolr.SOLR_LOCATION, "http://localhost:8443/solr");
- runner.setProperty(GetSolr.DATE_FIELD, "created");
+ TestRunner runner = createDefaultTestRunner(proc);
runner.setProperty(GetSolr.BATCH_SIZE, "1");
- runner.setProperty(GetSolr.RETURN_FIELDS, "id,created");
- runner.setProperty(GetSolr.COLLECTION, "testCollection");
runner.run(1,false, true);
runner.assertQueueEmpty();
@@ -291,14 +269,8 @@ public class TestGetSolr {
public void testStateCleared() throws IOException, SolrServerException {
final org.apache.nifi.processors.solr.TestGetSolr.TestableProcessor proc = new org.apache.nifi.processors.solr.TestGetSolr.TestableProcessor(solrClient);
- TestRunner runner = TestRunners.newTestRunner(proc);
- runner.setProperty(GetSolr.RETURN_TYPE, GetSolr.MODE_XML.getValue());
- runner.setProperty(GetSolr.SOLR_TYPE, PutSolrContentStream.SOLR_TYPE_CLOUD.getValue());
- runner.setProperty(GetSolr.SOLR_LOCATION, "http://localhost:8443/solr");
- runner.setProperty(GetSolr.DATE_FIELD, "created");
+ TestRunner runner = createDefaultTestRunner(proc);
runner.setProperty(GetSolr.BATCH_SIZE, "1");
- runner.setProperty(GetSolr.RETURN_FIELDS, "id,created");
- runner.setProperty(GetSolr.COLLECTION, "testCollection");
runner.run(1,false, true);
runner.assertQueueEmpty();
@@ -317,20 +289,16 @@ public class TestGetSolr {
runner.assertQueueEmpty();
runner.assertAllFlowFilesTransferred(GetSolr.REL_SUCCESS, 10);
runner.clearTransferState();
-
-
}
@Test
public void testRecordWriter() throws IOException, SolrServerException, InitializationException {
final org.apache.nifi.processors.solr.TestGetSolr.TestableProcessor proc = new org.apache.nifi.processors.solr.TestGetSolr.TestableProcessor(solrClient);
- TestRunner runner = TestRunners.newTestRunner(proc);
- runner.setProperty(GetSolr.SOLR_TYPE, PutSolrContentStream.SOLR_TYPE_CLOUD.getValue());
- runner.setProperty(GetSolr.SOLR_LOCATION, "http://localhost:8443/solr");
- runner.setProperty(GetSolr.DATE_FIELD, "created");
- runner.setProperty(GetSolr.BATCH_SIZE, "2");
- runner.setProperty(GetSolr.COLLECTION, "testCollection");
+ TestRunner runner = createDefaultTestRunner(proc);
+ runner.setProperty(GetSolr.RETURN_TYPE, GetSolr.MODE_REC.getValue());
+ runner.setProperty(GetSolr.RETURN_FIELDS, "id,created,integer_single");
+ runner.setProperty(GetSolr.BATCH_SIZE, "10");
final String outputSchemaText = new String(Files.readAllBytes(Paths.get("src/test/resources/test-schema.avsc")));
@@ -345,10 +313,46 @@ public class TestGetSolr {
runner.run(1,true, true);
runner.assertQueueEmpty();
- runner.assertAllFlowFilesTransferred(GetSolr.REL_SUCCESS, 5);
+ runner.assertAllFlowFilesTransferred(GetSolr.REL_SUCCESS, 1);
+ runner.assertAllFlowFilesContainAttribute(CoreAttributes.MIME_TYPE.key());
+
+ // Check for valid json
+ JsonReader reader = new JsonReader(new InputStreamReader(new ByteArrayInputStream(
+ runner.getContentAsByteArray(runner.getFlowFilesForRelationship(GetSolr.REL_SUCCESS).get(0)))));
+ reader.beginArray();
+ int controlScore = 0;
+ while (reader.hasNext()) {
+ reader.beginObject();
+ while (reader.hasNext()) {
+ if (reader.nextName().equals("integer_single"))
+ controlScore += reader.nextInt();
+ else
+ reader.skipValue();
+ }
+ reader.endObject();
+ }
+ assertEquals(controlScore, 45);
+ }
+
+ @Test
+ public void testForValidXml() throws IOException, SolrServerException, InitializationException {
+ final org.apache.nifi.processors.solr.TestGetSolr.TestableProcessor proc = new org.apache.nifi.processors.solr.TestGetSolr.TestableProcessor(solrClient);
+
+ TestRunner runner = createDefaultTestRunner(proc);
+ runner.setProperty(GetSolr.SOLR_QUERY, "id:doc1");
+ runner.setProperty(GetSolr.RETURN_FIELDS, "id");
+ runner.setProperty(GetSolr.BATCH_SIZE, "10");
+
+ runner.run(1,true, true);
+ runner.assertQueueEmpty();
+ runner.assertAllFlowFilesTransferred(GetSolr.REL_SUCCESS, 1);
runner.assertAllFlowFilesContainAttribute(CoreAttributes.MIME_TYPE.key());
+
+ String expectedXml = "<docs><doc boost=\"1.0\"><field name=\"id\">doc1</field></doc></docs>";
+ assertThat(expectedXml, CompareMatcher.isIdenticalTo(new String(runner.getContentAsByteArray(runner.getFlowFilesForRelationship(GetSolr.REL_SUCCESS).get(0)))));
}
+
// Override createSolrClient and return the passed in SolrClient
private class TestableProcessor extends GetSolr {
private SolrClient solrClient;
http://git-wip-us.apache.org/repos/asf/nifi/blob/9f919b9b/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/java/org/apache/nifi/processors/solr/TestPutSolrContentStream.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/java/org/apache/nifi/processors/solr/TestPutSolrContentStream.java b/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/java/org/apache/nifi/processors/solr/TestPutSolrContentStream.java
old mode 100644
new mode 100755
index 18948c6..f4eeac2
--- a/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/java/org/apache/nifi/processors/solr/TestPutSolrContentStream.java
+++ b/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/java/org/apache/nifi/processors/solr/TestPutSolrContentStream.java
@@ -94,8 +94,8 @@ public class TestPutSolrContentStream {
*/
private static TestRunner createDefaultTestRunner(PutSolrContentStream processor) {
TestRunner runner = TestRunners.newTestRunner(processor);
- runner.setProperty(PutSolrContentStream.SOLR_TYPE, PutSolrContentStream.SOLR_TYPE_STANDARD.getValue());
- runner.setProperty(PutSolrContentStream.SOLR_LOCATION, "http://localhost:8443/solr");
+ runner.setProperty(SolrUtils.SOLR_TYPE, SolrUtils.SOLR_TYPE_STANDARD.getValue());
+ runner.setProperty(SolrUtils.SOLR_LOCATION, "http://localhost:8443/solr");
return runner;
}
@@ -250,9 +250,9 @@ public class TestPutSolrContentStream {
final CollectionVerifyingProcessor proc = new CollectionVerifyingProcessor(collection);
final TestRunner runner = TestRunners.newTestRunner(proc);
- runner.setProperty(PutSolrContentStream.SOLR_TYPE, PutSolrContentStream.SOLR_TYPE_CLOUD.getValue());
- runner.setProperty(PutSolrContentStream.SOLR_LOCATION, "localhost:9983");
- runner.setProperty(PutSolrContentStream.COLLECTION, "${solr.collection}");
+ runner.setProperty(SolrUtils.SOLR_TYPE, SolrUtils.SOLR_TYPE_CLOUD.getValue());
+ runner.setProperty(SolrUtils.SOLR_LOCATION, "localhost:9983");
+ runner.setProperty(SolrUtils.COLLECTION, "${solr.collection}");
final Map<String,String> attributes = new HashMap<>();
attributes.put("solr.collection", collection);
@@ -349,11 +349,11 @@ public class TestPutSolrContentStream {
@Test
public void testSolrTypeCloudShouldRequireCollection() {
final TestRunner runner = TestRunners.newTestRunner(PutSolrContentStream.class);
- runner.setProperty(PutSolrContentStream.SOLR_TYPE, PutSolrContentStream.SOLR_TYPE_CLOUD.getValue());
- runner.setProperty(PutSolrContentStream.SOLR_LOCATION, "http://localhost:8443/solr");
+ runner.setProperty(SolrUtils.SOLR_TYPE, SolrUtils.SOLR_TYPE_CLOUD.getValue());
+ runner.setProperty(SolrUtils.SOLR_LOCATION, "http://localhost:8443/solr");
runner.assertNotValid();
- runner.setProperty(PutSolrContentStream.COLLECTION, "someCollection1");
+ runner.setProperty(SolrUtils.COLLECTION, "someCollection1");
runner.assertValid();
}
@@ -361,64 +361,64 @@ public class TestPutSolrContentStream {
@Test
public void testSolrTypeStandardShouldNotRequireCollection() {
final TestRunner runner = TestRunners.newTestRunner(PutSolrContentStream.class);
- runner.setProperty(PutSolrContentStream.SOLR_TYPE, PutSolrContentStream.SOLR_TYPE_STANDARD.getValue());
- runner.setProperty(PutSolrContentStream.SOLR_LOCATION, "http://localhost:8443/solr");
+ runner.setProperty(SolrUtils.SOLR_TYPE, SolrUtils.SOLR_TYPE_STANDARD.getValue());
+ runner.setProperty(SolrUtils.SOLR_LOCATION, "http://localhost:8443/solr");
runner.assertValid();
}
@Test
public void testHttpsUrlShouldRequireSSLContext() throws InitializationException {
final TestRunner runner = TestRunners.newTestRunner(PutSolrContentStream.class);
- runner.setProperty(PutSolrContentStream.SOLR_TYPE, PutSolrContentStream.SOLR_TYPE_STANDARD.getValue());
- runner.setProperty(PutSolrContentStream.SOLR_LOCATION, "https://localhost:8443/solr");
+ runner.setProperty(SolrUtils.SOLR_TYPE, SolrUtils.SOLR_TYPE_STANDARD.getValue());
+ runner.setProperty(SolrUtils.SOLR_LOCATION, "https://localhost:8443/solr");
runner.assertNotValid();
final SSLContextService sslContextService = new MockSSLContextService();
runner.addControllerService("ssl-context", sslContextService);
runner.enableControllerService(sslContextService);
- runner.setProperty(PutSolrContentStream.SSL_CONTEXT_SERVICE, "ssl-context");
+ runner.setProperty(SolrUtils.SSL_CONTEXT_SERVICE, "ssl-context");
runner.assertValid();
}
@Test
public void testHttpUrlShouldNotAllowSSLContext() throws InitializationException {
final TestRunner runner = TestRunners.newTestRunner(PutSolrContentStream.class);
- runner.setProperty(PutSolrContentStream.SOLR_TYPE, PutSolrContentStream.SOLR_TYPE_STANDARD.getValue());
- runner.setProperty(PutSolrContentStream.SOLR_LOCATION, "http://localhost:8443/solr");
+ runner.setProperty(SolrUtils.SOLR_TYPE, SolrUtils.SOLR_TYPE_STANDARD.getValue());
+ runner.setProperty(SolrUtils.SOLR_LOCATION, "http://localhost:8443/solr");
runner.assertValid();
final SSLContextService sslContextService = new MockSSLContextService();
runner.addControllerService("ssl-context", sslContextService);
runner.enableControllerService(sslContextService);
- runner.setProperty(PutSolrContentStream.SSL_CONTEXT_SERVICE, "ssl-context");
+ runner.setProperty(SolrUtils.SSL_CONTEXT_SERVICE, "ssl-context");
runner.assertNotValid();
}
@Test
public void testUsernamePasswordValidation() {
final TestRunner runner = TestRunners.newTestRunner(PutSolrContentStream.class);
- runner.setProperty(PutSolrContentStream.SOLR_TYPE, PutSolrContentStream.SOLR_TYPE_STANDARD.getValue());
- runner.setProperty(PutSolrContentStream.SOLR_LOCATION, "http://localhost:8443/solr");
+ runner.setProperty(SolrUtils.SOLR_TYPE, SolrUtils.SOLR_TYPE_STANDARD.getValue());
+ runner.setProperty(SolrUtils.SOLR_LOCATION, "http://localhost:8443/solr");
runner.assertValid();
- runner.setProperty(PutSolrContentStream.BASIC_USERNAME, "user1");
+ runner.setProperty(SolrUtils.BASIC_USERNAME, "user1");
runner.assertNotValid();
- runner.setProperty(PutSolrContentStream.BASIC_PASSWORD, "password");
+ runner.setProperty(SolrUtils.BASIC_PASSWORD, "password");
runner.assertValid();
- runner.setProperty(PutSolrContentStream.BASIC_USERNAME, "");
+ runner.setProperty(SolrUtils.BASIC_USERNAME, "");
runner.assertNotValid();
- runner.setProperty(PutSolrContentStream.BASIC_USERNAME, "${solr.user}");
+ runner.setProperty(SolrUtils.BASIC_USERNAME, "${solr.user}");
runner.assertNotValid();
runner.setVariable("solr.user", "solrRocks");
runner.assertValid();
- runner.setProperty(PutSolrContentStream.BASIC_PASSWORD, "${solr.password}");
+ runner.setProperty(SolrUtils.BASIC_PASSWORD, "${solr.password}");
runner.assertNotValid();
runner.setVariable("solr.password", "solrRocksPassword");
@@ -428,8 +428,8 @@ public class TestPutSolrContentStream {
@Test
public void testJAASClientAppNameValidation() {
final TestRunner runner = TestRunners.newTestRunner(PutSolrContentStream.class);
- runner.setProperty(PutSolrContentStream.SOLR_TYPE, PutSolrContentStream.SOLR_TYPE_STANDARD.getValue());
- runner.setProperty(PutSolrContentStream.SOLR_LOCATION, "http://localhost:8443/solr");
+ runner.setProperty(SolrUtils.SOLR_TYPE, SolrUtils.SOLR_TYPE_STANDARD.getValue());
+ runner.setProperty(SolrUtils.SOLR_LOCATION, "http://localhost:8443/solr");
runner.assertValid();
// clear the jaas config system property if it was set
@@ -439,7 +439,7 @@ public class TestPutSolrContentStream {
}
// should be invalid if we have a client name but not config file
- runner.setProperty(PutSolrContentStream.JAAS_CLIENT_APP_NAME, "Client");
+ runner.setProperty(SolrUtils.JAAS_CLIENT_APP_NAME, "Client");
runner.assertNotValid();
// should be invalid if we have a client name that is not in the config file
@@ -448,7 +448,7 @@ public class TestPutSolrContentStream {
runner.assertNotValid();
// should be valid now that the name matches up with the config file
- runner.setProperty(PutSolrContentStream.JAAS_CLIENT_APP_NAME, "SolrJClient");
+ runner.setProperty(SolrUtils.JAAS_CLIENT_APP_NAME, "SolrJClient");
runner.assertValid();
}
|