lucene-solr-user mailing list archives

From Ryan Cutter <ryancut...@gmail.com>
Subject Re: Streaming Expression joins not returning all results
Date Mon, 16 May 2016 14:51:40 GMT
Thanks for all this info, Joel.  I found that if I artificially limit the
triples stream to 3M and use the /export handler with only 2 workers, I can
get results in about 20 seconds and Solr doesn't tip over.  That seems to be
the best config for this local/single instance.

It's also clear I'm not using streaming expressions optimally so I need to
do some more thinking!  I don't want to stream all 26M triples (much less
billions of docs) just for a simple join in which I expect a couple hundred
results.  I wanted to see if I could directly port a SQL join into this
framework using normalized Solr docs and a single streaming expression.  I'll
do some more tinkering.

Thanks again, Ryan

On Sun, May 15, 2016 at 4:14 PM, Joel Bernstein <joelsolr@gmail.com> wrote:

> One other thing to keep in mind is how the partitioning is done when you add
> the partitionKeys.
>
> Partitioning is done using the HashQParserPlugin, which builds a filter for
> each worker. Under the covers this is using the normal filter query
> mechanism. So after the filters are built and cached they are effectively
> free from a performance standpoint. But on the first run they need to be
> built and they need to be rebuilt after each commit. This means several
> things:
>
> 1) If you have 8 workers then 8 filters need to be computed. The workers
> call down to the shards in parallel so the filters will build in parallel.
> But this can take time and the larger the index, the more time it takes.
>
> 2) Like all filters, the partitioning filters can be pre-computed using
> warming queries. You can check the logs and look for the {!hash ...} filter
> queries to see the syntax. But basically you would need a warming query for
> each worker ID.
>
> 3) If you don't pre-warm the partitioning filters then there will be a
> performance penalty the first time they are computed. The next query will
> be much faster.
>
> 4) This is another area where having more shards helps with performance,
> because having fewer documents per shard means faster times building the
> partition filters.
>
> In the future we'll switch to segment level partitioning filters, so that
> following each commit only the new segments need to be built. But this is
> still on the TODO list.
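
Point 2 above calls for one warming query per worker ID. A minimal Python sketch that generates the parameters for them follows. It assumes the filter takes the form `{!hash workers=N worker=i}`, that worker IDs run from 0 to N-1, and that the keys travel in a separate `partitionKeys` request parameter. None of that is confirmed in this thread, so compare the output against the `{!hash ...}` entries in your own logs first. The helper name `hash_warming_params` is hypothetical.

```python
def hash_warming_params(workers, partition_keys, q="*:*"):
    """Return one dict of query parameters per worker ID (0..workers-1)."""
    if workers < 1:
        raise ValueError("workers must be at least 1")
    return [
        {
            "q": q,
            # Assumed filter syntax; verify against the {!hash ...} lines in your logs.
            "fq": "{!hash workers=%d worker=%d}" % (workers, worker_id),
            # Assumed to be sent as its own request parameter.
            "partitionKeys": partition_keys,
        }
        for worker_id in range(workers)
    ]


# Print the eight filters that an 8-worker parallel() over type_id would need.
for params in hash_warming_params(8, "type_id"):
    print(params["fq"], "partitionKeys=" + params["partitionKeys"])
```

Each dict would correspond to one warming query, so 8 workers means 8 entries to keep in sync with the `workers` value used in the expression.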
>
>
> Joel Bernstein
> http://joelsolr.blogspot.com/
>
> On Sun, May 15, 2016 at 5:38 PM, Joel Bernstein <joelsolr@gmail.com>
> wrote:
>
> > Ah, you also used 4 shards. That means with 8 workers there were 32
> > concurrent queries against the /select handler each requesting 100,000
> > rows. That's a really heavy load!
> >
> > You can still try out the approach from my last email on the 4 shards
> > setup. As you add workers gradually, you'll ramp up the
> > parallelism on the machine. With a single worker you'll have 4 shards
> > working in parallel. With 8 workers you'll have 32 threads working in
> > parallel.
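
The load figure above is plain multiplication: each worker sends its own request to every shard. A small Python sketch of that arithmetic, using a hypothetical helper name:

```python
def concurrent_shard_requests(shards, workers):
    """Each worker queries every shard once, so the request count multiplies."""
    return shards * workers


# Ramp up workers gradually on a 4-shard collection, as suggested above.
for workers in (1, 2, 4, 8):
    print(workers, "workers ->", concurrent_shard_requests(4, workers), "concurrent requests")
```

This counts requests for a single search() stream; a join with two search() streams would issue that many requests per stream.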
> >
> > Joel Bernstein
> > http://joelsolr.blogspot.com/
> >
> > On Sun, May 15, 2016 at 5:23 PM, Joel Bernstein <joelsolr@gmail.com>
> > wrote:
> >
> >> Hi Ryan,
> >>
> >> The rows=100000 on the /select handler is likely going to cause problems
> >> with 8 workers. This is calling the /select handler with 8 concurrent
> >> workers each retrieving 100,000 rows. The /select handler bogs down as
> >> the number of rows increases. So using the rows parameter with the
> >> /select handler is really not a strategy for limiting the size of the
> >> join. To limit the size of the join you would need to place some kind of
> >> filter on the query and still use the /export handler.
> >>
> >> The /export handler was developed to handle large exports and not get
> >> bogged down.
> >>
> >> You may want to start by getting an understanding of how much data a
> >> single node can export, and how long it takes.
> >>
> >> 1) Try running a single *:* search() using the /export handler on the
> >> triple collection. Time how long it takes. If you run into problems
> >> getting this to complete then attach a memory profiler. It may be that
> >> 8 gigs is not enough to hold the docValues in memory and process the
> >> query. The /export handler does not use more memory as the result set
> >> rises, so the /export handler should be able to process the entire query
> >> (30,000,000 docs). But it does take a lot of memory to hold the docValues
> >> fields in memory. This query will likely take some time to complete
> >> though, as you are sorting and exporting 30,000,000 docs from a single
> >> node.
> >>
> >> 2) Then try running the same *:* search() against the /export handler in
> >> parallel() gradually increasing the number of workers. Time how long it
> >> takes as you add workers and watch the load it places on the server.
> >> Eventually you'll max out your performance.
> >>
> >>
> >> Then you'll start to get an idea of how fast a single node can sort and
> >> export data.
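
One way to time step 1 is to send the search() expression to the /stream handler and read the response to the end. The Python sketch below is a rough aid, not a benchmark harness. The helper names `stream_url` and `time_export` are hypothetical, and the base URL in the usage note is an assumption about a default local install.

```python
import time
import urllib.parse
import urllib.request


def stream_url(base_url, collection, expr):
    """Build a /stream request URL carrying a streaming expression."""
    query = urllib.parse.urlencode({"expr": expr})
    return "%s/%s/stream?%s" % (base_url.rstrip("/"), collection, query)


def time_export(url, chunk_size=1 << 20):
    """Read the whole response in chunks; return (seconds, bytes_read)."""
    start = time.perf_counter()
    total = 0
    with urllib.request.urlopen(url) as response:
        while True:
            chunk = response.read(chunk_size)
            if not chunk:
                break
            total += len(chunk)
    return time.perf_counter() - start, total
```

For example, `time_export(stream_url("http://localhost:8983/solr", "triple", 'search(triple, q=*:*, fl="subject_id,type_id", sort="type_id asc", qt="/export")'))` would time the single-stream case. Wrapping the same search() in parallel() with a growing `workers` value covers step 2.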
> >>
> >>
> >>
> >>
> >> Joel Bernstein
> >> http://joelsolr.blogspot.com/
> >>
> >> On Sat, May 14, 2016 at 4:14 PM, Ryan Cutter <ryancutter@gmail.com>
> >> wrote:
> >>
> >>> Hello, I'm running Solr on my laptop with -Xmx8g and gave each
> >>> collection 4 shards and 2 replicas.
> >>>
> >>> Even grabbing 100k triple documents (like the following) is taking 20
> >>> seconds to complete and is prone to fall over.  I could try this in a
> >>> proper cluster with multiple hosts and more sharding, etc.  I just
> >>> thought I was tinkering with a small enough data set to use locally.
> >>>
> >>> parallel(
> >>>     triple,
> >>>     innerJoin(
> >>>       search(triple, q=*:*, fl="subject_id,type_id", sort="type_id asc",
> >>>              partitionKeys="type_id", rows="100000"),
> >>>       search(triple_type, q=*:*, fl="triple_type_id",
> >>>              sort="triple_type_id asc", partitionKeys="triple_type_id",
> >>>              qt="/export"),
> >>>       on="type_id=triple_type_id"
> >>>     ),
> >>>     sort="subject_id asc",
> >>>     workers="8")
> >>>
> >>>
> >>> When Solr does crash, it's leaving messages like this.
> >>>
> >>> ERROR - 2016-05-14 20:00:53.892; [c:triple s:shard3 r:core_node2
> >>> x:triple_shard3_replica2] org.apache.solr.common.SolrException;
> >>> null:java.io.IOException: java.util.concurrent.TimeoutException: Idle
> >>> timeout expired: 50001/50000 ms
> >>>     at org.eclipse.jetty.util.SharedBlockingCallback$Blocker.block(SharedBlockingCallback.java:226)
> >>>     at org.eclipse.jetty.server.HttpOutput.write(HttpOutput.java:164)
> >>>     at org.eclipse.jetty.server.HttpOutput.write(HttpOutput.java:530)
> >>>     at org.apache.solr.response.QueryResponseWriterUtil$1.write(QueryResponseWriterUtil.java:54)
> >>>     at java.io.OutputStream.write(OutputStream.java:116)
> >>>     at sun.nio.cs.StreamEncoder.writeBytes(StreamEncoder.java:221)
> >>>     at sun.nio.cs.StreamEncoder.implWrite(StreamEncoder.java:282)
> >>>     at sun.nio.cs.StreamEncoder.write(StreamEncoder.java:125)
> >>>     at java.io.OutputStreamWriter.write(OutputStreamWriter.java:207)
> >>>     at org.apache.solr.util.FastWriter.flush(FastWriter.java:140)
> >>>     at org.apache.solr.util.FastWriter.write(FastWriter.java:54)
> >>>     at org.apache.solr.response.JSONWriter.writeMapCloser(JSONResponseWriter.java:420)
> >>>     at org.apache.solr.response.JSONWriter.writeSolrDocument(JSONResponseWriter.java:364)
> >>>     at org.apache.solr.response.TextResponseWriter.writeDocuments(TextResponseWriter.java:246)
> >>>     at org.apache.solr.response.TextResponseWriter.writeVal(TextResponseWriter.java:150)
> >>>     at org.apache.solr.response.JSONWriter.writeNamedListAsMapWithDups(JSONResponseWriter.java:183)
> >>>
> >>> On Fri, May 13, 2016 at 5:50 PM, Joel Bernstein <joelsolr@gmail.com>
> >>> wrote:
> >>>
> >>> > Also the hashJoin is going to read the entire entity table into
> >>> > memory. If that's a large index that could be using lots of memory.
> >>> >
> >>> > 25 million docs should be ok to /export from one node, as long as you
> >>> > have enough memory to load the docValues for the fields for sorting
> >>> > and exporting.
> >>> >
> >>> > Breaking down the query into its parts will show where the issue is.
> >>> > Also adding more heap might give you enough memory.
> >>> >
> >>> > In my testing the max docs per second I've seen the /export handler
> >>> > push from a single node is 650,000. In order to get 650,000 docs per
> >>> > second on one node you have to partition the stream with workers. In
> >>> > my testing it took 8 workers hitting one node to achieve the 650,000
> >>> > docs per second.
> >>> >
> >>> > But the numbers get big as the cluster grows. With 20 shards and 4
> >>> > replicas and 32 workers, you could export 52,000,000 docs per second.
> >>> > With 40 shards, 5 replicas and 40 workers you could export 130,000,000
> >>> > docs per second.
> >>> >
> >>> > So with large clusters you could do very large distributed joins with
> >>> > sub-second performance.
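
The cluster figures above follow from multiplying the single-node peak by the number of nodes pushing data (shards times replicas). A short Python sketch of that estimate, assuming the idealized case where throughput scales linearly and enough workers are running to keep every node at its peak; the helper name is hypothetical:

```python
DOCS_PER_SEC_PER_NODE = 650_000  # peak single-node /export rate quoted above


def cluster_export_rate(shards, replicas, per_node=DOCS_PER_SEC_PER_NODE):
    """Upper-bound estimate: every replica of every shard pushes at the peak rate."""
    return shards * replicas * per_node


print(cluster_export_rate(20, 4))  # 20 shards x 4 replicas -> 52000000
print(cluster_export_rate(40, 5))  # 40 shards x 5 replicas -> 130000000
```

Treat the result as a ceiling rather than a prediction: network, heap and worker capacity can all cap the real rate well below it.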
> >>> >
> >>> >
> >>> >
> >>> >
> >>> > Joel Bernstein
> >>> > http://joelsolr.blogspot.com/
> >>> >
> >>> > On Fri, May 13, 2016 at 8:11 PM, Ryan Cutter <ryancutter@gmail.com>
> >>> > wrote:
> >>> >
> >>> > > Thanks very much for the advice.  Yes, I'm running in a very basic
> >>> > > single shard environment.  I thought that 25M docs was small enough
> >>> > > to not require anything special but I will try scaling like you
> >>> > > suggest and let you know what happens.
> >>> > >
> >>> > > Cheers, Ryan
> >>> > >
> >>> > > On Fri, May 13, 2016 at 4:53 PM, Joel Bernstein <joelsolr@gmail.com>
> >>> > > wrote:
> >>> > >
> >>> > > > I would try breaking down the second query to see when the
> >>> > > > problems occur.
> >>> > > >
> >>> > > > 1) Start with just a single *:* search from one of the
> >>> > > > collections.
> >>> > > > 2) Then test the innerJoin. The innerJoin won't take much memory
> >>> > > > as it's a streaming merge join.
> >>> > > > 3) Then try the full thing.
> >>> > > >
> >>> > > > If you're running a large join like this all on one host then you
> >>> > > > might not have enough memory for the docValues and the two joins.
> >>> > > > In general streaming is designed to scale by adding servers. It
> >>> > > > scales 3 ways:
> >>> > > >
> >>> > > > 1) Adding shards splits up the index for more pushing power.
> >>> > > > 2) Adding workers partitions the streams and splits up the join /
> >>> > > > merge work.
> >>> > > > 3) Adding replicas: when you have workers you will add pushing
> >>> > > > power by adding replicas. This is because workers will fetch
> >>> > > > partitions of the streams from across the entire cluster. So ALL
> >>> > > > replicas will be pushing at once.
> >>> > > >
> >>> > > > So, imagine a setup with 20 shards, 4 replicas, and 20 workers.
> >>> > > > You can perform massive joins quickly.
> >>> > > >
> >>> > > > But for your scenario and available hardware you can experiment
> >>> > > > with different cluster sizes.
> >>> > > >
> >>> > > >
> >>> > > >
> >>> > > > Joel Bernstein
> >>> > > > http://joelsolr.blogspot.com/
> >>> > > >
> >>> > > > On Fri, May 13, 2016 at 7:27 PM, Ryan Cutter <ryancutter@gmail.com>
> >>> > > > wrote:
> >>> > > >
> >>> > > > > qt="/export" immediately fixed the query in Question #1.  Sorry
> >>> > > > > for missing that in the docs!
> >>> > > > >
> >>> > > > > The second query (with /export) crashes the server so I was
> >>> > > > > going to look at parallelization if you think that's a good
> >>> > > > > idea.  It also seems unwise to join into 26M docs so maybe I
> >>> > > > > can reconfigure the query to run along a more happy path :-)
> >>> > > > > The schema is very RDBMS-centric so maybe that just won't ever
> >>> > > > > work in this framework.
> >>> > > > >
> >>> > > > > Here's the log but it's not very helpful.
> >>> > > > >
> >>> > > > >
> >>> > > > > INFO  - 2016-05-13 23:18:13.214; [c:triple s:shard1 r:core_node1
> >>> > > > > x:triple_shard1_replica1] org.apache.solr.core.SolrCore;
> >>> > > > > [triple_shard1_replica1]  webapp=/solr path=/export
> >>> > > > > params={q=*:*&distrib=false&fl=triple_id,subject_id,type_id&sort=type_id+asc&wt=json&version=2.2}
> >>> > > > > hits=26305619 status=0 QTime=61
> >>> > > > >
> >>> > > > > INFO  - 2016-05-13 23:18:13.747; [c:triple_type s:shard1 r:core_node1
> >>> > > > > x:triple_type_shard1_replica1] org.apache.solr.core.SolrCore;
> >>> > > > > [triple_type_shard1_replica1]  webapp=/solr path=/export
> >>> > > > > params={q=*:*&distrib=false&fl=triple_type_id,triple_type_label&sort=triple_type_id+asc&wt=json&version=2.2}
> >>> > > > > hits=702 status=0 QTime=2
> >>> > > > >
> >>> > > > > INFO  - 2016-05-13 23:18:48.504; [   ]
> >>> > > > > org.apache.solr.common.cloud.ConnectionManager; Watcher
> >>> > > > > org.apache.solr.common.cloud.ConnectionManager@6ad0f304
> >>> > > > > name:ZooKeeperConnection Watcher:localhost:9983 got event WatchedEvent
> >>> > > > > state:Disconnected type:None path:null path:null type:None
> >>> > > > >
> >>> > > > > INFO  - 2016-05-13 23:18:48.504; [   ]
> >>> > > > > org.apache.solr.common.cloud.ConnectionManager; zkClient has
> >>> > > > > disconnected
> >>> > > > >
> >>> > > > > ERROR - 2016-05-13 23:18:51.316; [c:triple s:shard1 r:core_node1
> >>> > > > > x:triple_shard1_replica1] org.apache.solr.common.SolrException;
> >>> > > > > null:Early Client Disconnect
> >>> > > > >
> >>> > > > > WARN  - 2016-05-13 23:18:51.431; [   ]
> >>> > > > > org.apache.zookeeper.ClientCnxn$SendThread; Session 0x154ac66c81e0002
> >>> > > > > for server localhost/0:0:0:0:0:0:0:1:9983, unexpected error, closing
> >>> > > > > socket connection and attempting reconnect
> >>> > > > >
> >>> > > > > java.io.IOException: Connection reset by peer
> >>> > > > >         at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
> >>> > > > >         at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
> >>> > > > >         at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
> >>> > > > >         at sun.nio.ch.IOUtil.read(IOUtil.java:192)
> >>> > > > >         at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:380)
> >>> > > > >         at org.apache.zookeeper.ClientCnxnSocketNIO.doIO(ClientCnxnSocketNIO.java:68)
> >>> > > > >         at org.apache.zookeeper.ClientCnxnSocketNIO.doTransport(ClientCnxnSocketNIO.java:366)
> >>> > > > >         at org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1081)
> >>> > > > >
> >>> > > > > On Fri, May 13, 2016 at 3:09 PM, Joel Bernstein <joelsolr@gmail.com>
> >>> > > > > wrote:
> >>> > > > >
> >>> > > > > > A couple of other things:
> >>> > > > > >
> >>> > > > > > 1) Your innerJoin can be parallelized across workers to improve
> >>> > > > > > performance. Take a look at the docs on the parallel function
> >>> > > > > > for the details.
> >>> > > > > >
> >>> > > > > > 2) It looks like you might be doing graph operations with
> >>> > > > > > joins. You might want to take a look at the gatherNodes
> >>> > > > > > function coming in 6.1:
> >>> > > > > >
> >>> > > > > > https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=62693238
> >>> > > > > >
> >>> > > > > > Joel Bernstein
> >>> > > > > > http://joelsolr.blogspot.com/
> >>> > > > > >
> >>> > > > > > On Fri, May 13, 2016 at 5:57 PM, Joel Bernstein <joelsolr@gmail.com>
> >>> > > > > > wrote:
> >>> > > > > >
> >>> > > > > > > When doing things that require all the results (like joins)
> >>> > > > > > > you need to specify the /export handler in the search
> >>> > > > > > > function.
> >>> > > > > > >
> >>> > > > > > > qt="/export"
> >>> > > > > > >
> >>> > > > > > > The search function defaults to the /select handler, which is
> >>> > > > > > > designed to return the top N results. The /export handler
> >>> > > > > > > always returns all results that match the query. Also keep in
> >>> > > > > > > mind that the /export handler requires that sort fields and
> >>> > > > > > > fl fields have docValues set.
> >>> > > > > > >
> >>> > > > > > > Joel Bernstein
> >>> > > > > > > http://joelsolr.blogspot.com/
> >>> > > > > > >
> >>> > > > > > > On Fri, May 13, 2016 at 5:36 PM, Ryan Cutter <ryancutter@gmail.com>
> >>> > > > > > > wrote:
> >>> > > > > > >
> >>> > > > > > >> Question #1:
> >>> > > > > > >>
> >>> > > > > > >> triple_type collection has a few hundred docs and triple has
> >>> > > > > > >> 25M docs.
> >>> > > > > > >>
> >>> > > > > > >> When I search for a particular subject_id in triple which I
> >>> > > > > > >> know has 14 results and do not pass in 'rows' params, it
> >>> > > > > > >> returns 0 results:
> >>> > > > > > >>
> >>> > > > > > >> innerJoin(
> >>> > > > > > >>     search(triple, q=subject_id:1656521,
> >>> > > > > > >>            fl="triple_id,subject_id,type_id",
> >>> > > > > > >>            sort="type_id asc"),
> >>> > > > > > >>     search(triple_type, q=*:*,
> >>> > > > > > >>            fl="triple_type_id,triple_type_label",
> >>> > > > > > >>            sort="triple_type_id asc"),
> >>> > > > > > >>     on="type_id=triple_type_id"
> >>> > > > > > >> )
> >>> > > > > > >>
> >>> > > > > > >> When I do the same search with rows=10000, it returns 14
> >>> > > > > > >> results:
> >>> > > > > > >>
> >>> > > > > > >> innerJoin(
> >>> > > > > > >>     search(triple, q=subject_id:1656521,
> >>> > > > > > >>            fl="triple_id,subject_id,type_id",
> >>> > > > > > >>            sort="type_id asc", rows=10000),
> >>> > > > > > >>     search(triple_type, q=*:*,
> >>> > > > > > >>            fl="triple_type_id,triple_type_label",
> >>> > > > > > >>            sort="triple_type_id asc", rows=10000),
> >>> > > > > > >>     on="type_id=triple_type_id"
> >>> > > > > > >> )
> >>> > > > > > >>
> >>> > > > > > >> Am I doing this right?  Is there a magic number to pass into
> >>> > > > > > >> rows which says "give me all the results which match this
> >>> > > > > > >> query"?
> >>> > > > > > >>
> >>> > > > > > >>
> >>> > > > > > >> Question #2:
> >>> > > > > > >>
> >>> > > > > > >> Perhaps related to the first question but I want to run the
> >>> > > > > > >> innerJoin() without the subject_id - rather have it use the
> >>> > > > > > >> results of another query.  But this does not return any
> >>> > > > > > >> results.  I'm saying "search for this entity based on id then
> >>> > > > > > >> use that result's entity_id as the subject_id to look through
> >>> > > > > > >> the triple/triple_type collections":
> >>> > > > > > >>
> >>> > > > > > >> hashJoin(
> >>> > > > > > >>     innerJoin(
> >>> > > > > > >>         search(triple, q=*:*,
> >>> > > > > > >>                fl="triple_id,subject_id,type_id",
> >>> > > > > > >>                sort="type_id asc"),
> >>> > > > > > >>         search(triple_type, q=*:*,
> >>> > > > > > >>                fl="triple_type_id,triple_type_label",
> >>> > > > > > >>                sort="triple_type_id asc"),
> >>> > > > > > >>         on="type_id=triple_type_id"
> >>> > > > > > >>     ),
> >>> > > > > > >>     hashed=search(entity,
> >>> > > > > > >>         q=id:"urn:sid:entity:455dfa1aa27eedad21ac2115797c1580bb3b3b4e",
> >>> > > > > > >>         fl="entity_id,entity_label", sort="entity_id asc"),
> >>> > > > > > >>     on="subject_id=entity_id"
> >>> > > > > > >> )
> >>> > > > > > >>
> >>> > > > > > >> Am I doing this hashJoin right?
> >>> > > > > > >>
> >>> > > > > > >> Thanks very much, Ryan
