lucene-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From no...@apache.org
Subject lucene-solr:master: SOLR-8349: Allow sharing of large in memory data structures across cores
Date Mon, 18 Apr 2016 10:21:40 GMT
Repository: lucene-solr
Updated Branches:
  refs/heads/master 4751b83c9 -> 9a1880aee


SOLR-8349: Allow sharing of large in memory data structures across cores


Project: http://git-wip-us.apache.org/repos/asf/lucene-solr/repo
Commit: http://git-wip-us.apache.org/repos/asf/lucene-solr/commit/9a1880ae
Tree: http://git-wip-us.apache.org/repos/asf/lucene-solr/tree/9a1880ae
Diff: http://git-wip-us.apache.org/repos/asf/lucene-solr/diff/9a1880ae

Branch: refs/heads/master
Commit: 9a1880aee821d4e6e96a8ff2fb15062b1e4c9eb1
Parents: 4751b83
Author: Noble Paul <noble@apache.org>
Authored: Mon Apr 18 15:51:19 2016 +0530
Committer: Noble Paul <noble@apache.org>
Committed: Mon Apr 18 15:51:19 2016 +0530

----------------------------------------------------------------------
 solr/CHANGES.txt                                |   3 +
 .../org/apache/solr/core/BlobRepository.java    | 220 +++++++++++--------
 .../java/org/apache/solr/core/PluginBag.java    |  35 ++-
 .../src/java/org/apache/solr/core/SolrCore.java |  32 +++
 .../solr/configsets/resource-sharing/schema.xml |  25 +++
 .../configsets/resource-sharing/solrconfig.xml  |  51 +++++
 .../solr/core/BlobRepositoryCloudTest.java      | 138 ++++++++++++
 .../solr/core/BlobRepositoryMockingTest.java    | 165 ++++++++++++++
 .../apache/solr/core/TestDynamicLoading.java    |   4 +-
 .../component/ResourceSharingTestComponent.java | 149 +++++++++++++
 10 files changed, 724 insertions(+), 98 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/9a1880ae/solr/CHANGES.txt
----------------------------------------------------------------------
diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt
index 5d81091..6fc116b 100644
--- a/solr/CHANGES.txt
+++ b/solr/CHANGES.txt
@@ -96,6 +96,9 @@ New Features
 * SOLR-8962: Add sort Streaming Expression. The expression takes a single input stream and a
   comparator and outputs tuples in stable order of the comparator. (Dennis Gove)
 
+* SOLR-8349: Allow sharing of large in memory data structures across cores (Gus Heck, noble)
+
+
 Bug Fixes
 ----------------------
 

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/9a1880ae/solr/core/src/java/org/apache/solr/core/BlobRepository.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/core/BlobRepository.java b/solr/core/src/java/org/apache/solr/core/BlobRepository.java
index 09461f0..0f3d1c3 100644
--- a/solr/core/src/java/org/apache/solr/core/BlobRepository.java
+++ b/solr/core/src/java/org/apache/solr/core/BlobRepository.java
@@ -31,9 +31,9 @@ import java.util.List;
 import java.util.Map;
 import java.util.Random;
 import java.util.Set;
+import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.zip.ZipEntry;
-import java.util.zip.ZipInputStream;
+import java.util.regex.Pattern;
 
 import org.apache.http.HttpResponse;
 import org.apache.http.client.HttpClient;
@@ -46,18 +46,21 @@ import org.apache.solr.common.cloud.Replica;
 import org.apache.solr.common.cloud.Slice;
 import org.apache.solr.common.cloud.ZkStateReader;
 import org.apache.solr.handler.admin.CollectionsHandler;
-import org.apache.solr.util.CryptoKeys;
 import org.apache.solr.util.SimplePostTool;
 import org.apache.zookeeper.server.ByteBufferInputStream;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static org.apache.solr.common.SolrException.ErrorCode.SERVICE_UNAVAILABLE;
+import static org.apache.solr.common.cloud.ZkStateReader.BASE_URL_PROP;
+
 /**
  * The purpose of this class is to store the Jars loaded in memory and to keep only one copy of the Jar in a single node.
  */
 public class BlobRepository {
   private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
   static final Random RANDOM;
+  static final Pattern BLOB_KEY_PATTERN_CHECKER = Pattern.compile(".*/\\d+");
 
   static {
     // We try to make things reproducible in the context of our tests by initializing the
random instance
@@ -71,81 +74,113 @@ public class BlobRepository {
   }
 
   private final CoreContainer coreContainer;
-  private Map<String, BlobContent> blobs = new ConcurrentHashMap<>();
+  private Map<String, BlobContent> blobs = createMap();
 
-  public BlobRepository(CoreContainer coreContainer) {
-    this.coreContainer = coreContainer;
+  // for unit tests to override
+  ConcurrentHashMap<String, BlobContent> createMap() {
+    return new ConcurrentHashMap<>();
   }
 
-  public static ByteBuffer getFileContent(BlobContent blobContent, String entryName) throws
IOException {
-    ByteArrayInputStream zipContents = new ByteArrayInputStream(blobContent.buffer.array(),
blobContent.buffer.arrayOffset(), blobContent.buffer.limit());
-    ZipInputStream zis = new ZipInputStream(zipContents);
-    try {
-      ZipEntry entry;
-      while ((entry = zis.getNextEntry()) != null) {
-        if (entryName == null || entryName.equals(entry.getName())) {
-          SimplePostTool.BAOS out = new SimplePostTool.BAOS();
-          byte[] buffer = new byte[2048];
-          int size;
-          while ((size = zis.read(buffer, 0, buffer.length)) != -1) {
-            out.write(buffer, 0, size);
-          }
-          out.close();
-          return out.getByteBuffer();
-        }
-      }
-    } finally {
-      zis.closeEntry();
-    }
-    return null;
+  public BlobRepository(CoreContainer coreContainer) {
+    this.coreContainer = coreContainer;
   }
 
+  // I wanted to {@link SolrCore#loadDecodeAndCacheBlob(String, Decoder)} below but precommit
complains
   /**
-   * Returns the contents of a jar and increments a reference count. Please return the same
object to decrease the refcount
+   * Returns the contents of a blob containing a ByteBuffer and increments a reference count.
Please return the 
+   * same object to decrease the refcount. This is normally used for storing jar files, and
binary raw data.
+   * If you are caching Java Objects you want to use {@code SolrCore#loadDecodeAndCacheBlob(String,
Decoder)}
    *
    * @param key it is a combination of blobname and version like blobName/version
-   * @return The reference of a jar
+   * @return The reference of a blob
    */
-  public BlobContentRef getBlobIncRef(String key) {
-    BlobContent aBlob = blobs.get(key);
-    if (aBlob == null) {
-      if (this.coreContainer.isZooKeeperAware()) {
-        Replica replica = getSystemCollReplica();
-        String url = replica.getStr(BASE_URL_PROP) + "/.system/blob/" + key + "?wt=filestream";
-
-        HttpClient httpClient = coreContainer.getUpdateShardHandler().getHttpClient();
-        HttpGet httpGet = new HttpGet(url);
-        ByteBuffer b;
-        try {
-          HttpResponse entity = httpClient.execute(httpGet, HttpClientUtil.createNewHttpClientRequestContext());
-          int statusCode = entity.getStatusLine().getStatusCode();
-          if (statusCode != 200) {
-            throw new SolrException(SolrException.ErrorCode.NOT_FOUND, "no such blob or version
available: " + key);
-          }
-          b = SimplePostTool.inputStreamToByteArray(entity.getEntity().getContent());
-        } catch (Exception e) {
-          if (e instanceof SolrException) {
-            throw (SolrException) e;
-          } else {
-            throw new SolrException(SolrException.ErrorCode.NOT_FOUND, "could not load :
" + key, e);
+  public BlobContentRef<ByteBuffer> getBlobIncRef(String key) {
+   return getBlobIncRef(key, () -> addBlob(key));
+  }
+  
+  /**
+   * Internal method that returns the contents of a blob and increments a reference count.
Please return the same 
+   * object to decrease the refcount. Only the decoded content will be cached when this method
is used. Component 
+   * authors attempting to share objects across cores should use 
+   * {@code SolrCore#loadDecodeAndCacheBlob(String, Decoder)} which ensures that a proper
close hook is also created.
+   *
+   * @param key it is a combination of blob name and version like blobName/version
+   * @param decoder a decoder that knows how to interpret the bytes from the blob
+   * @return The reference of a blob
+   */
+  BlobContentRef<Object> getBlobIncRef(String key, Decoder<Object> decoder) {
+    return getBlobIncRef(key.concat(decoder.getName()), () -> addBlob(key,decoder));
+  }
+
+  // do the actual work returning the appropriate type...
+  private <T> BlobContentRef<T> getBlobIncRef(String key, Callable<BlobContent<T>>
blobCreator) {
+    BlobContent<T> aBlob;
+    if (this.coreContainer.isZooKeeperAware()) {
+      synchronized (blobs) {
+        aBlob = blobs.get(key);
+        if (aBlob == null) {
+          try {
+            aBlob = blobCreator.call();
+          } catch (Exception e) {
+            throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Blob loading failed:
"+e.getMessage(), e);
           }
-        } finally {
-          httpGet.releaseConnection();
         }
-        blobs.put(key, aBlob = new BlobContent(key, b));
-      } else {
-        throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Jar loading is not
supported in non-cloud mode");
-        // todo
       }
-
+    } else {
+      throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Blob loading is not
supported in non-cloud mode");
+      // todo
     }
-
-    BlobContentRef ref = new BlobContentRef(aBlob);
+    BlobContentRef<T> ref = new BlobContentRef<>(aBlob);
     synchronized (aBlob.references) {
       aBlob.references.add(ref);
     }
     return ref;
+  }
 
+  // For use cases sharing raw bytes
+  private BlobContent<ByteBuffer> addBlob(String key) {
+    ByteBuffer b = fetchBlob(key);
+    BlobContent<ByteBuffer> aBlob  = new BlobContent<>(key, b);
+    blobs.put(key, aBlob);
+    return aBlob;
+  }
+
+  // for use cases sharing java objects
+  private BlobContent<Object> addBlob(String key, Decoder<Object> decoder) {
+    ByteBuffer b = fetchBlob(key);
+    String  keyPlusName = key + decoder.getName();
+    BlobContent<Object> aBlob = new BlobContent<>(keyPlusName, b, decoder);
+    blobs.put(keyPlusName, aBlob);
+    return aBlob;
+  }
+  
+  /**
+   *  Package local for unit tests only please do not use elsewhere
+   */
+  ByteBuffer fetchBlob(String key) {
+    Replica replica = getSystemCollReplica();
+    String url = replica.getStr(BASE_URL_PROP) + "/.system/blob/" + key + "?wt=filestream";
+
+    HttpClient httpClient = coreContainer.getUpdateShardHandler().getHttpClient();
+    HttpGet httpGet = new HttpGet(url);
+    ByteBuffer b;
+    try {
+      HttpResponse entity = httpClient.execute(httpGet);
+      int statusCode = entity.getStatusLine().getStatusCode();
+      if (statusCode != 200) {
+        throw new SolrException(SolrException.ErrorCode.NOT_FOUND, "no such blob or version
available: " + key);
+      }
+      b = SimplePostTool.inputStreamToByteArray(entity.getEntity().getContent());
+    } catch (Exception e) {
+      if (e instanceof SolrException) {
+        throw (SolrException) e;
+      } else {
+        throw new SolrException(SolrException.ErrorCode.NOT_FOUND, "could not load : " +
key, e);
+      }
+    } finally {
+      httpGet.releaseConnection();
+    }
+    return b;
   }
 
   private Replica getSystemCollReplica() {
@@ -193,61 +228,60 @@ public class BlobRepository {
         blobs.remove(ref.blob.key);
       }
     }
-
   }
 
-  public static class BlobContent {
-    private final String key;
-    private Map<String, Object> decodedObjects = null;
-    // TODO move this off-heap
-    private final ByteBuffer buffer;
+  public static class BlobContent<T> {
+    public final String key;
+    private final T content; // holds byte buffer or cached object, holding both is a waste
of memory
     // ref counting mechanism
     private final Set<BlobContentRef> references = new HashSet<>();
 
+    public BlobContent(String key, ByteBuffer buffer, Decoder<T> decoder) {
+      this.key = key;
+      this.content = decoder.decode(new ByteBufferInputStream(buffer));
+    }
 
+    @SuppressWarnings("unchecked")
     public BlobContent(String key, ByteBuffer buffer) {
       this.key = key;
-      this.buffer = buffer;
+      this.content = (T) buffer; 
     }
 
     /**
-     * This method decodes the byte[] to a custom Object
-     *
-     * @param key     The key is used to store the decoded Object. it is possible to have
multiple
-     *                decoders for the same blob (may be unusual).
-     * @param decoder A decoder instance
-     * @return the decoded Object . If it was already decoded, then return from the cache
+     * Get the cached object. 
+     * 
+     * @return the object representing the content that is cached.
      */
-    public <T> T decodeAndCache(String key, Decoder<T> decoder) {
-      if (decodedObjects == null) {
-        synchronized (this) {
-          if (decodedObjects == null) decodedObjects = new ConcurrentHashMap<>();
-        }
-      }
-
-      Object t = decodedObjects.get(key);
-      if (t != null) return (T) t;
-      t = decoder.decode(new ByteBufferInputStream(buffer));
-      decodedObjects.put(key, t);
-      return (T) t;
-
-    }
-
-    public String checkSignature(String base64Sig, CryptoKeys keys) {
-      return keys.verify(base64Sig, buffer);
+    public T get() {
+      return this.content;
     }
 
   }
 
   public interface Decoder<T> {
 
+    /**
+     * A name by which to distinguish this decoding. This only needs to be implemented if
you want to support
+     * decoding the same blob content with more than one decoder.
+     * 
+     * @return The name of the decoding, defaults to empty string.
+     */
+    default String getName() { return ""; }
+
+    /**
+     * A routine that knows how to convert the stream of bytes from the blob into a Java
object.
+     * 
+     * @param inputStream the bytes from a blob
+     * @return A Java object of the specified type.
+     */
     T decode(InputStream inputStream);
   }
 
-  public static class BlobContentRef {
-    public final BlobContent blob;
 
-    private BlobContentRef(BlobContent blob) {
+  public static class BlobContentRef<T> {
+    public final BlobContent<T> blob;
+
+    private BlobContentRef(BlobContent<T> blob) {
       this.blob = blob;
     }
   }

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/9a1880ae/solr/core/src/java/org/apache/solr/core/PluginBag.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/core/PluginBag.java b/solr/core/src/java/org/apache/solr/core/PluginBag.java
index 0defdad..412bd93 100644
--- a/solr/core/src/java/org/apache/solr/core/PluginBag.java
+++ b/solr/core/src/java/org/apache/solr/core/PluginBag.java
@@ -16,6 +16,7 @@
  */
 package org.apache.solr.core;
 
+import java.io.ByteArrayInputStream;
 import java.io.IOException;
 import java.lang.invoke.MethodHandles;
 import java.nio.ByteBuffer;
@@ -28,6 +29,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.zip.ZipEntry;
+import java.util.zip.ZipInputStream;
 
 import org.apache.lucene.analysis.util.ResourceLoader;
 import org.apache.lucene.analysis.util.ResourceLoaderAware;
@@ -38,6 +41,7 @@ import org.apache.solr.handler.RequestHandlerBase;
 import org.apache.solr.handler.component.SearchComponent;
 import org.apache.solr.request.SolrRequestHandler;
 import org.apache.solr.util.CryptoKeys;
+import org.apache.solr.util.SimplePostTool;
 import org.apache.solr.util.plugin.NamedListInitializedPlugin;
 import org.apache.solr.util.plugin.PluginInfoInitialized;
 import org.apache.solr.util.plugin.SolrCoreAware;
@@ -386,7 +390,7 @@ public class PluginBag<T> implements AutoCloseable {
    */
   public static class RuntimeLib implements PluginInfoInitialized, AutoCloseable {
     private String name, version, sig;
-    private BlobRepository.BlobContentRef jarContent;
+    private BlobRepository.BlobContentRef<ByteBuffer> jarContent;
     private final CoreContainer coreContainer;
     private boolean verified = false;
 
@@ -430,10 +434,35 @@ public class PluginBag<T> implements AutoCloseable {
     public ByteBuffer getFileContent(String entryName) throws IOException {
       if (jarContent == null)
         throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "jar not available:
" + name + "/" + version);
-      return BlobRepository.getFileContent(jarContent.blob, entryName);
+      return getFileContent(jarContent.blob, entryName);
 
     }
 
+    public ByteBuffer getFileContent(BlobRepository.BlobContent<ByteBuffer> blobContent,
 String entryName) throws IOException {
+      ByteBuffer buff = blobContent.get();
+      ByteArrayInputStream zipContents = new ByteArrayInputStream(buff.array(), buff.arrayOffset(),
buff.limit());
+      ZipInputStream zis = new ZipInputStream(zipContents);
+      try {
+        ZipEntry entry;
+        while ((entry = zis.getNextEntry()) != null) {
+          if (entryName == null || entryName.equals(entry.getName())) {
+            SimplePostTool.BAOS out = new SimplePostTool.BAOS();
+            byte[] buffer = new byte[2048];
+            int size;
+            while ((size = zis.read(buffer, 0, buffer.length)) != -1) {
+              out.write(buffer, 0, size);
+            }
+            out.close();
+            return out.getByteBuffer();
+          }
+        }
+      } finally {
+        zis.closeEntry();
+      }
+      return null;
+    }
+
+
     @Override
     public void close() throws Exception {
       if (jarContent != null) coreContainer.getBlobRepository().decrementBlobRefCount(jarContent);
@@ -472,7 +501,7 @@ public class PluginBag<T> implements AutoCloseable {
       }
 
       try {
-        String matchedKey = jarContent.blob.checkSignature(sig, new CryptoKeys(keys));
+        String matchedKey = new CryptoKeys(keys).verify(sig, jarContent.blob.get());
         if (matchedKey == null)
           throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "No key matched signature
for jar : " + name + " version: " + version);
         log.info("Jar {} signed with {} successfully verified", name, matchedKey);

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/9a1880ae/solr/core/src/java/org/apache/solr/core/SolrCore.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/core/SolrCore.java b/solr/core/src/java/org/apache/solr/core/SolrCore.java
index faac0a2..bb0cd05 100644
--- a/solr/core/src/java/org/apache/solr/core/SolrCore.java
+++ b/solr/core/src/java/org/apache/solr/core/SolrCore.java
@@ -2569,6 +2569,38 @@ public final class SolrCore implements SolrInfoMBean, Closeable {
     }
     return implicits;
   }
+
+  /**
+   * Convenience method to load a blob. This method minimizes the degree to which component and other code needs
+   * to depend on the structure of solr's object graph and ensures that a proper close hook is registered. This method
+   * should normally be called in {@link SolrCoreAware#inform(SolrCore)}, and should never be called during request
+   * processing. The Decoder will only run on the first invocation; subsequent invocations will return the
+   * cached object.
+   * 
+   * @param key A key in the format of name/version for a blob stored in the .system blob
store via the Blob Store API
+   * @param decoder a decoder with which to convert the blob into a Java Object representation
(first time only)
+   * @return a reference to the blob that has already cached the decoded version.
+   */
+  public BlobRepository.BlobContentRef loadDecodeAndCacheBlob(String key, BlobRepository.Decoder<Object>
decoder) {
+    // make sure component authors don't give us oddball keys with no version...
+    if (!BlobRepository.BLOB_KEY_PATTERN_CHECKER.matcher(key).matches()) {
+      throw new IllegalArgumentException("invalid key format, must end in /N where N is the
version number");
+    }
+    CoreContainer coreContainer = getCoreDescriptor().getCoreContainer();
+    // define the blob
+    BlobRepository.BlobContentRef blobRef = coreContainer.getBlobRepository().getBlobIncRef(key,
decoder);
+    addCloseHook(new CloseHook() {
+      @Override
+      public void preClose(SolrCore core) {
+      }
+
+      @Override
+      public void postClose(SolrCore core) {
+        core.getCoreDescriptor().getCoreContainer().getBlobRepository().decrementBlobRefCount(blobRef);
+      }
+    });
+    return blobRef;
+  }
 }
 
 

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/9a1880ae/solr/core/src/test-files/solr/configsets/resource-sharing/schema.xml
----------------------------------------------------------------------
diff --git a/solr/core/src/test-files/solr/configsets/resource-sharing/schema.xml b/solr/core/src/test-files/solr/configsets/resource-sharing/schema.xml
new file mode 100644
index 0000000..1288cf4
--- /dev/null
+++ b/solr/core/src/test-files/solr/configsets/resource-sharing/schema.xml
@@ -0,0 +1,25 @@
+<?xml version="1.0" encoding="UTF-8" ?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements.  See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<schema name="minimal" version="1.1">
+ <types>
+  <fieldType name="string" class="solr.StrField"/>
+ </types>
+ <fields>
+  <dynamicField name="*" type="string" indexed="true" stored="true" />
+ </fields>
+</schema>

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/9a1880ae/solr/core/src/test-files/solr/configsets/resource-sharing/solrconfig.xml
----------------------------------------------------------------------
diff --git a/solr/core/src/test-files/solr/configsets/resource-sharing/solrconfig.xml b/solr/core/src/test-files/solr/configsets/resource-sharing/solrconfig.xml
new file mode 100644
index 0000000..1dd92fe
--- /dev/null
+++ b/solr/core/src/test-files/solr/configsets/resource-sharing/solrconfig.xml
@@ -0,0 +1,51 @@
+<?xml version="1.0" ?>
+
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements.  See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+<!-- Minimal solrconfig.xml with /select, /admin and /update only -->
+
+<config>
+
+  <dataDir>${solr.data.dir:}</dataDir>
+
+  <directoryFactory name="DirectoryFactory"
+                    class="${solr.directoryFactory:solr.NRTCachingDirectoryFactory}"/>
+  <schemaFactory class="ClassicIndexSchemaFactory"/>
+
+  <luceneMatchVersion>${tests.luceneMatchVersion:LATEST}</luceneMatchVersion>
+
+  <updateHandler class="solr.DirectUpdateHandler2">
+    <commitWithin>
+      <softCommit>${solr.commitwithin.softcommit:true}</softCommit>
+    </commitWithin>
+
+  </updateHandler>
+  <searchComponent name="testComponent" class="org.apache.solr.handler.component.ResourceSharingTestComponent"
/>
+
+  <requestHandler name="/select" class="solr.SearchHandler">
+    <lst name="defaults">
+      <str name="echoParams">explicit</str>
+      <str name="indent">true</str>
+      <str name="df">text</str>
+    </lst>
+    <arr name="first-components">
+      <str>testComponent</str>
+    </arr>
+  </requestHandler>
+</config>
+

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/9a1880ae/solr/core/src/test/org/apache/solr/core/BlobRepositoryCloudTest.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/core/BlobRepositoryCloudTest.java b/solr/core/src/test/org/apache/solr/core/BlobRepositoryCloudTest.java
new file mode 100644
index 0000000..3e51b36
--- /dev/null
+++ b/solr/core/src/test/org/apache/solr/core/BlobRepositoryCloudTest.java
@@ -0,0 +1,138 @@
+package org.apache.solr.core;
+
+
+import java.io.IOException;
+import java.nio.file.Path;
+import java.util.HashMap;
+import java.util.Set;
+
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.entity.ContentType;
+import org.apache.http.entity.StringEntity;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClients;
+import org.apache.solr.client.solrj.SolrQuery;
+import org.apache.solr.client.solrj.SolrServerException;
+import org.apache.solr.client.solrj.impl.CloudSolrClient;
+import org.apache.solr.client.solrj.response.QueryResponse;
+import org.apache.solr.cloud.SolrCloudTestCase;
+import org.apache.solr.cloud.ZkTestServer;
+import org.apache.solr.common.SolrDocumentList;
+import org.apache.solr.common.SolrInputDocument;
+import org.apache.zookeeper.server.DataNode;
+import org.apache.zookeeper.server.DataTree;
+import org.apache.zookeeper.server.ZKDatabase;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+public class BlobRepositoryCloudTest extends SolrCloudTestCase {
+
+  public static final Path TEST_PATH = getFile("solr/configsets").toPath();
+
+  @BeforeClass
+  public static void setupCluster() throws Exception {
+    configureCluster(1)  // only sharing *within* a node
+        .addConfig("configname", TEST_PATH.resolve("resource-sharing"))
+        .configure();
+//    Thread.sleep(2000);
+    HashMap<String, String> params = new HashMap<>();
+    cluster.createCollection(".system", 1, 1, null, params);
+//    Thread.sleep(2000);
+    // test component will fail if it can't find a blob with this data by this name
+    postBlob("testResource", "foo,bar\nbaz,bam");
+//    Thread.sleep(2000);
+    // if these don't load we probably failed to post the blob above
+    cluster.createCollection("col1", 1, 1, "configname", params);
+    cluster.createCollection("col2", 1, 1, "configname", params);
+//    Thread.sleep(2000);
+    SolrInputDocument document = new SolrInputDocument();
+    document.addField("id", "1");
+    document.addField("text", "col1");
+    CloudSolrClient solrClient = cluster.getSolrClient();
+    solrClient.add("col1", document);
+    solrClient.commit("col1");
+    document = new SolrInputDocument();
+    document.addField("id", "1");
+    document.addField("text", "col2");
+    solrClient.add("col2", document);
+    solrClient.commit("col2");
+    Thread.sleep(2000);
+
+  }
+
+  @Test
+  public void test() throws Exception {
+    // This test relies on the installation of ResourceSharingTestComponent which has 2 useful properties:
+    // 1. it will fail to initialize if it doesn't find a 2 line CSV like foo,bar\nbaz,bam thus validating
+    //    that we are properly pulling data from the blob store
+    // 2. It replaces any q for a query request to /select with "text:<name>" where <name> is the name
+    //    of the last collection to run a query. It does this by caching a shared resource of type
+    //    ResourceSharingTestComponent.TestObject, and the following sequence is proof that either
+    //    collection can tell if it was (or was not) the last collection to issue a query by
+    //    consulting the shared object
+    assertLastQueryNotToCollection("col1");
+    assertLastQueryNotToCollection("col2");
+    assertLastQueryNotToCollection("col1");
+    assertLastQueryToCollection("col1");
+    assertLastQueryNotToCollection("col2");
+    assertLastQueryToCollection("col2");
+  }
+
+  // TODO: move this up to parent class? Probably accepting entity, or with alternative signatures
+  private static void postBlob(String name, String string) throws IOException {
+    HttpPost post = new HttpPost(findLiveNodeURI() + "/.system/blob/" + name);
+    StringEntity csv = new StringEntity(string, ContentType.create("application/octet-stream"));
+    post.setEntity(csv);
+    try (CloseableHttpClient httpclient = HttpClients.createDefault()) {
+      httpclient.execute(post);
+    }
+  }
+
+  // TODO: move this up to parent class?
+  private static String findLiveNodeURI() {
+    ZkTestServer zkServer = cluster.getZkServer();
+    ZKDatabase zkDatabase = zkServer.getZKDatabase();
+    DataTree dataTree = zkDatabase.getDataTree();
+    DataNode node = dataTree.getNode("/solr/live_nodes");
+    Set<String> children = node.getChildren();
+    String liveNode = children.iterator().next();
+    String[] split = liveNode.split("_");
+    String host = split[0];
+    String name = split[1];
+    return "http://" + host + "/" + name;
+  }
+
+  private void assertLastQueryToCollection(String collection) throws SolrServerException,
IOException {
+    assertEquals(1, getSolrDocuments(collection).size());
+  }
+
+  private void assertLastQueryNotToCollection(String collection) throws SolrServerException,
IOException {
+    assertEquals(0, getSolrDocuments(collection).size());
+  }
+
+  private SolrDocumentList getSolrDocuments(String collection) throws SolrServerException,
IOException {
+    SolrQuery query = new SolrQuery("*:*");
+    CloudSolrClient client = cluster.getSolrClient();
+    QueryResponse resp1 = client.query(collection, query);
+    return resp1.getResults();
+  }
+
+
+}

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/9a1880ae/solr/core/src/test/org/apache/solr/core/BlobRepositoryMockingTest.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/core/BlobRepositoryMockingTest.java b/solr/core/src/test/org/apache/solr/core/BlobRepositoryMockingTest.java
new file mode 100644
index 0000000..e82915f
--- /dev/null
+++ b/solr/core/src/test/org/apache/solr/core/BlobRepositoryMockingTest.java
@@ -0,0 +1,165 @@
+package org.apache.solr.core;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.StringWriter;
+import java.lang.reflect.Field;
+import java.nio.ByteBuffer;
+import java.nio.charset.Charset;
+import java.util.HashMap;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.solr.common.SolrException;
+import org.easymock.EasyMock;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.easymock.EasyMock.anyObject;
+import static org.easymock.EasyMock.eq;
+import static org.easymock.EasyMock.expect;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Tests {@code BlobRepository.getBlobIncRef} without a running cluster: the repository under
+ * test overrides {@code fetchBlob} to hand back a fixed buffer (recording that it was asked)
+ * and {@code createMap} to hand back an EasyMock map, so each test can script cache hits and
+ * misses.
+ */
+public class BlobRepositoryMockingTest {
+
+  private static final Charset UTF8 = Charset.forName("UTF-8");
+  private static final String[][] PARSED = new String[][]{{"foo", "bar", "baz"}, {"bang", "boom", "bash"}};
+  private static final String BLOBSTR = "foo,bar,baz\nbang,boom,bash";
+  private CoreContainer mockContainer = EasyMock.createMock(CoreContainer.class);
+  @SuppressWarnings("unchecked")
+  private ConcurrentHashMap<String, BlobRepository.BlobContent> mapMock = EasyMock.createMock(ConcurrentHashMap.class);
+  @SuppressWarnings("unchecked")
+  private BlobRepository.Decoder<Object> decoderMock = EasyMock.createMock(BlobRepository.Decoder.class);
+  @SuppressWarnings("unchecked")
+  private BlobRepository.BlobContent<Object> blobContentMock = EasyMock.createMock(BlobRepository.BlobContent.class);
+
+  // Every mock is reset, replayed and verified together through this array.
+  private Object[] mocks = new Object[] {
+      mockContainer,
+      decoderMock,
+      blobContentMock,
+      mapMock
+  };
+
+  BlobRepository repository;
+  ByteBuffer blobData = ByteBuffer.wrap(BLOBSTR.getBytes(UTF8));
+  // Set by the overridden fetchBlob so tests can tell whether (and with which key) a fetch happened.
+  boolean blobFetched = false;
+  String blobKey = "";
+
+
+  /** Clears the fetch-tracking state, resets all mocks and builds the repository under test. */
+  @Before
+  public void setUp() throws IllegalAccessException, NoSuchFieldException {
+    blobFetched = false;
+    blobKey = "";
+    EasyMock.reset(mocks);
+    repository = new BlobRepository(mockContainer) {
+      @Override
+      ByteBuffer fetchBlob(String key) {
+        blobKey = key;
+        blobFetched = true;
+        return blobData;
+      }
+
+      @Override
+      ConcurrentHashMap<String, BlobContent> createMap() {
+        return mapMock;
+      }
+
+    };
+  }
+
+  /** Verifies that every expectation recorded on the mocks was actually exercised. */
+  @After
+  public void tearDown() {
+    EasyMock.verify(mocks);
+  }
+
+  /** A container that is not ZooKeeper-aware must make the lookup fail with a SolrException. */
+  @Test (expected = SolrException.class)
+  public void testCloudOnly() {
+    expect(mockContainer.isZooKeeperAware()).andReturn(false);
+    EasyMock.replay(mocks);
+    // Only the exception matters here; the returned reference is deliberately discarded.
+    repository.getBlobIncRef("foo!");
+  }
+
+  /** Cache miss: the blob is fetched under the requested key, stored in the map and returned. */
+  @SuppressWarnings("unchecked")
+  @Test
+  public void testGetBlobIncrRefString() {
+    expect(mockContainer.isZooKeeperAware()).andReturn(true);
+    expect(mapMock.get("foo!")).andReturn(null);
+    expect(mapMock.put(eq("foo!"), anyObject(BlobRepository.BlobContent.class))).andReturn(null);
+    EasyMock.replay(mocks);
+    BlobRepository.BlobContentRef ref = repository.getBlobIncRef("foo!");
+    // assertEquals (rather than assertTrue on equals) reports both values on failure.
+    assertEquals("foo!", blobKey);
+    assertTrue(blobFetched);
+    assertNotNull(ref.blob);
+    assertEquals(blobData, ref.blob.get());
+  }
+
+  /** Cache hit: the content already in the map is returned and no fetch takes place. */
+  @SuppressWarnings("unchecked")
+  @Test
+  public void testCachedAlready() {
+    expect(mockContainer.isZooKeeperAware()).andReturn(true);
+    expect(mapMock.get("foo!")).andReturn(new BlobRepository.BlobContent<ByteBuffer>("foo!", blobData));
+    EasyMock.replay(mocks);
+    BlobRepository.BlobContentRef ref = repository.getBlobIncRef("foo!");
+    assertEquals("", blobKey);
+    assertFalse(blobFetched);
+    assertNotNull(ref.blob);
+    assertEquals(blobData, ref.blob.get());
+  }
+
+  /**
+   * Cache miss with a decoder: the map is consulted under the key with the decoder's name
+   * appended ("foo!mocked"), the raw blob is fetched under the plain key ("foo!"), and the
+   * reference exposes whatever the decoder returned.
+   */
+  @SuppressWarnings("unchecked")
+  @Test
+  public void testGetBlobIncrRefStringDecoder() {
+    expect(mockContainer.isZooKeeperAware()).andReturn(true);
+    expect(mapMock.get("foo!mocked")).andReturn(null);
+    expect(mapMock.put(eq("foo!mocked"), anyObject(BlobRepository.BlobContent.class))).andReturn(null);
+
+    EasyMock.replay(mocks);
+    BlobRepository.BlobContentRef ref = repository.getBlobIncRef("foo!", new BlobRepository.Decoder<Object>() {
+      @Override
+      public Object decode(InputStream inputStream) {
+        StringWriter writer = new StringWriter();
+        try {
+          IOUtils.copy(inputStream, writer, UTF8);
+        } catch (IOException e) {
+          throw new RuntimeException(e);
+        }
+
+        // The decoder must be handed exactly the bytes that fetchBlob produced.
+        assertEquals(BLOBSTR, writer.toString());
+        return PARSED;
+      }
+
+      @Override
+      public String getName() {
+        return "mocked";
+      }
+    });
+    assertEquals("foo!", blobKey);
+    assertTrue(blobFetched);
+    assertNotNull(ref.blob);
+    assertEquals(PARSED, ref.blob.get());
+  }
+
+
+}

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/9a1880ae/solr/core/src/test/org/apache/solr/core/TestDynamicLoading.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/core/TestDynamicLoading.java b/solr/core/src/test/org/apache/solr/core/TestDynamicLoading.java
index 6570e4a..f7832ef 100644
--- a/solr/core/src/test/org/apache/solr/core/TestDynamicLoading.java
+++ b/solr/core/src/test/org/apache/solr/core/TestDynamicLoading.java
@@ -104,7 +104,7 @@ public class TestDynamicLoading extends AbstractFullDistribZkTestBase
{
     Map map = TestSolrConfigHandler.getRespMap("/test1?wt=json", client);
 
     assertNotNull(TestBlobHandler.getAsString(map), map = (Map) map.get("error"));
-    assertEquals(TestBlobHandler.getAsString(map), ".system collection not available", map.get("msg"));
+    assertTrue(TestBlobHandler.getAsString(map), map.get("msg").toString().contains(".system
collection not available"));
 
 
     TestBlobHandler.createSystemCollection(getHttpSolrClient(baseURL, randomClient.getHttpClient()));
@@ -114,7 +114,7 @@ public class TestDynamicLoading extends AbstractFullDistribZkTestBase
{
 
 
     assertNotNull(map = (Map) map.get("error"));
-    assertEquals("full output " + TestBlobHandler.getAsString(map), "no such blob or version
available: colltest/1" , map.get("msg"));
+    assertTrue("full output " + TestBlobHandler.getAsString(map), map.get("msg").toString().contains("no
such blob or version available: colltest/1" ));
     payload = " {\n" +
         "  'set' : {'watched': {" +
         "                    'x':'X val',\n" +

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/9a1880ae/solr/core/src/test/org/apache/solr/handler/component/ResourceSharingTestComponent.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/handler/component/ResourceSharingTestComponent.java
b/solr/core/src/test/org/apache/solr/handler/component/ResourceSharingTestComponent.java
new file mode 100644
index 0000000..8223fe5
--- /dev/null
+++ b/solr/core/src/test/org/apache/solr/handler/component/ResourceSharingTestComponent.java
@@ -0,0 +1,149 @@
+package org.apache.solr.handler.component;
+
+import org.apache.solr.common.params.ModifiableSolrParams;
+import org.apache.solr.common.params.SolrParams;
+import org.apache.solr.core.BlobRepository;
+import org.apache.solr.core.SolrCore;
+import org.apache.solr.util.plugin.SolrCoreAware;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.lang.invoke.MethodHandles;
+import java.nio.charset.Charset;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.stream.Stream;
+
+import static org.junit.Assert.assertEquals;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Test-only search component. When informed of its core it loads the blob
+ * {@code testResource/1} through {@code SolrCore.loadDecodeAndCacheBlob} with a small CSV
+ * decoder. On every request it replaces the {@code q} parameter with a query for the
+ * collection name last stored in the decoded {@link TestObject}, then stores its own core's
+ * collection name there.
+ */
+public class ResourceSharingTestComponent extends SearchComponent implements SolrCoreAware {
+  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+  private SolrCore core;
+  private volatile BlobRepository.BlobContent<TestObject> blob;
+
+  /**
+   * Rewrites the request so that {@code q} is {@code text:<last collection>} and then records
+   * this core's collection name as the new "last collection".
+   */
+  @Override
+  public void prepare(ResponseBuilder rb) throws IOException {
+    SolrParams params = rb.req.getParams();
+    ModifiableSolrParams mParams = new ModifiableSolrParams(params);
+    TestObject testObj = getTestObj();
+    String q = "text:" + testObj.getLastCollection();
+    mParams.set("q", q); // search for the last collection name.
+    // This should cause the param to show up in the response...
+    rb.req.setParams(mParams);
+    testObj.setLastCollection(core.getCoreDescriptor().getCollectionName());
+  }
+
+  /** No work is done in the process phase. */
+  @Override
+  public void process(ResponseBuilder rb) throws IOException {}
+
+  @Override
+  public String getDescription() {
+    return "ResourceSharingTestComponent";
+  }
+
+  @Override
+  public String getSource() {
+    return null;
+  }
+
+  /** Returns the object held by the blob content that {@link #inform(SolrCore)} loaded. */
+  TestObject getTestObj() {
+    return this.blob.get();
+  }
+
+  /** Remembers the core and loads (or reuses) the decoded blob for {@link #getKey()}. */
+  @SuppressWarnings("unchecked")
+  @Override
+  public void inform(SolrCore core) {
+    log.info("Informing test component...");
+    this.core = core;
+    this.blob =  core.loadDecodeAndCacheBlob(getKey(), new DumbCsvDecoder()).blob;
+    log.info("Test component informed!");
+  }
+
+  /** Blob key in the form {@code <resource name>/<resource version>}. */
+  private String getKey() {
+    return getResourceName() + "/" + getResourceVersion();
+  }
+
+  public String getResourceName() {
+    return "testResource";
+  }
+
+  public String getResourceVersion() {
+    return "1";
+  }
+
+  /**
+   * Decoder that reads the blob as UTF-8 text, one {@code key,value} pair per line, checks
+   * that the expected test pairs are present and returns a fresh {@link TestObject}.
+   */
+  class DumbCsvDecoder implements BlobRepository.Decoder<Object> {
+    private final Map<String, String> dict = new HashMap<>();
+
+    public DumbCsvDecoder() {}
+
+    /**
+     * Stores the first two comma-separated fields of a line as a dictionary entry.
+     * Blank lines are ignored.
+     *
+     * @throws IllegalArgumentException if a non-blank line has fewer than two fields
+     */
+    void processSimpleCsvRow(String string) {
+      if (string.trim().isEmpty()) {
+        return; // tolerate blank lines instead of failing on the missing second field
+      }
+      String[] row = string.split(","); // dumbest csv parser ever... :)
+      if (row.length < 2) {
+        throw new IllegalArgumentException("Expected 'key,value' but got: " + string);
+      }
+      getDict().put(row[0], row[1]);
+    }
+
+    public Map<String, String> getDict() {
+      return dict;
+    }
+
+    @Override
+    public TestObject decode(InputStream inputStream) {
+      // loading a tiny csv like:
+      //
+      // foo,bar
+      // baz,bam
+
+      try (Stream<String> lines = new BufferedReader(new InputStreamReader(inputStream, Charset.forName("UTF-8"))).lines()) {
+          lines.forEach(this::processSimpleCsvRow);
+      } catch (Exception e) {
+        // Pass the exception as the last argument so the stack trace reaches the log.
+        log.error("failed to read dictionary {}", getResourceName(), e);
+        throw new RuntimeException("Cannot load  dictionary " , e);
+      }
+
+      assertEquals("bar", dict.get("foo"));
+      assertEquals("bam", dict.get("baz"));
+      log.info("Loaded {}  using {}", getDict().size(), this.getClass().getClassLoader());
+
+      // if we get here we have seen the data from the blob and all we need is to test that two collections
+      // are able to see the same object..
+      return new TestObject();
+    }
+  }
+
+
+  /** Mutable holder for the name of the collection that most recently ran {@code prepare}. */
+  public static class TestObject {
+    public static final String NEVER_UPDATED = "never updated";
+    private volatile String lastCollection = NEVER_UPDATED;
+
+    public String getLastCollection() {
+      return this.lastCollection;
+    }
+
+    public void setLastCollection(String lastCollection) {
+      this.lastCollection = lastCollection;
+    }
+  }
+
+}


Mime
View raw message