lucene-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "Ethan Tao (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (SOLR-2694) LogUpdateProcessor not thread safe
Date Mon, 21 May 2012 18:57:41 GMT

    [ https://issues.apache.org/jira/browse/SOLR-2694?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13280359#comment-13280359
] 

Ethan Tao commented on SOLR-2694:
---------------------------------

Hi, we are having the same issue with latest snapshot. 
When updating the same set of docs repeatedly, we can easily reproduce it. We also have 10
secs commit in our env.
I checked the code with LogUpdateProcessor, the code is not thread safe with "toLog" object
as concurrent threads may be calling other methods when finish() is called. The problem is
when "sb.append(toLog)" is called, StringBuilder starts to use namedlist to build the string
object. If other methods are called to modify the objects reside within "toLog", the exception
will be triggered. These objects include deletes, adds, etc. Also, the keys to these object
have 7 different types (commit/rollback, ..etc.), any size change to the SimpleOrderedMap
(i.e., toLog) object may also trigger the issue during the call to "sb.append(toLog)" in finish().

We did quick experiment by modifying the LogUpdateProcessorFactory file to synchronize on
toLog in all methods. Then change our chain definition in solrconfig.xml to call our own LogUpdateProcessor
instead. The issue went away. I am attaching the custom code below for your reference.

Can someone please re-open and fix the issue? 
Thanks.

-Ethan 

class LogUpdateProcessor extends UpdateRequestProcessor {

...

    @Override
    public void processAdd(AddUpdateCommand cmd) throws IOException {
        if (logDebug) { log.debug("PRE_UPDATE " + cmd.toString()); }

        // call delegate first so we can log things like the version that get set later
        if (next != null) next.processAdd(cmd);

        synchronized (toLog) {

            // Add a list of added id's to the response
            if (adds == null) {
                adds = new ArrayList<String>();
                toLog.add("add",adds);
            }

            if (adds.size() < maxNumToLog) {
                long version = cmd.getVersion();
                String msg = cmd.getPrintableId();
                if (version != 0) msg = msg + " (" + version + ')';
                adds.add(msg);
            }


            numAdds++;

        }
    }

    @Override
    public void processDelete( DeleteUpdateCommand cmd ) throws IOException {
        if (logDebug) { log.debug("PRE_UPDATE " + cmd.toString()); }
        if (next != null) next.processDelete(cmd);

        synchronized (toLog) {
            if (cmd.isDeleteById()) {
                if (deletes == null) {
                    deletes = new ArrayList<String>();
                    toLog.add("delete",deletes);
                }
                if (deletes.size() < maxNumToLog) {
                    long version = cmd.getVersion();
                    String msg = cmd.getId();
                    if (version != 0) msg = msg + " (" + version + ')';
                    deletes.add(msg);
                }
            } else {
                if (toLog.size() < maxNumToLog) {
                    long version = cmd.getVersion();
                    String msg = cmd.query;
                    if (version != 0) msg = msg + " (" + version + ')';
                    toLog.add("deleteByQuery", msg);
                }
            }
            numDeletes++;
        }

    }

    @Override
    public void processMergeIndexes(MergeIndexesCommand cmd) throws IOException {
        if (logDebug) { log.debug("PRE_UPDATE " + cmd.toString()); }
        if (next != null) next.processMergeIndexes(cmd);

        synchronized (toLog) {
            toLog.add("mergeIndexes", cmd.toString());
        }
    }

    @Override
    public void processCommit( CommitUpdateCommand cmd ) throws IOException {
        if (logDebug) { log.debug("PRE_UPDATE " + cmd.toString()); }
        if (next != null) next.processCommit(cmd);


        synchronized (toLog) {
            final String msg = cmd.optimize ? "optimize" : "commit";
            toLog.add(msg, "");
        }
    }

    /**
     * @since Solr 1.4
     */
    @Override
    public void processRollback( RollbackUpdateCommand cmd ) throws IOException {
        if (logDebug) { log.debug("PRE_UPDATE " + cmd.toString()); }
        if (next != null) next.processRollback(cmd);

        synchronized (toLog) {
            toLog.add("rollback", "");
        }
    }


    @Override
    public void finish() throws IOException {
        if (logDebug) { log.debug("PRE_UPDATE finish()"); }
        if (next != null) next.finish();

        // LOG A SUMMARY WHEN ALL DONE (INFO LEVEL)

        NamedList<Object> stdLog = rsp.getToLog();

        StringBuilder sb = new StringBuilder(req.getCore().getLogId());

        for (int i=0; i<stdLog.size(); i++) {
            String name = stdLog.getName(i);
            Object val = stdLog.getVal(i);
            if (name != null) {
                sb.append(name).append('=');
            }
            sb.append(val).append(' ');
        }

        stdLog.clear();   // make it so SolrCore.exec won't log this again

        synchronized (toLog) {

            // if id lists were truncated, show how many more there were
            if (adds != null && numAdds > maxNumToLog) {
                adds.add("... (" + numAdds + " adds)");
            }
            if (deletes != null && numDeletes > maxNumToLog) {
                deletes.add("... (" + numDeletes + " deletes)");
            }
            long elapsed = rsp.getEndTime() - req.getStartTime();

            sb.append(toLog).append(" 0 ").append(elapsed);

        }

        log.info(sb.toString());
    }
}
                
> LogUpdateProcessor not thread safe
> ----------------------------------
>
>                 Key: SOLR-2694
>                 URL: https://issues.apache.org/jira/browse/SOLR-2694
>             Project: Solr
>          Issue Type: Bug
>          Components: update
>    Affects Versions: 1.4.1, 3.1, 3.2, 3.3, 4.0
>            Reporter: Jan H√łydahl
>
> Using the LogUpdateProcessor while feeding in multiple parallell threads does not work,
as LogUpdateProcessor is not threadsafe.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators: https://issues.apache.org/jira/secure/ContactAdministrators!default.jspa
For more information on JIRA, see: http://www.atlassian.com/software/jira

       

---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@lucene.apache.org
For additional commands, e-mail: dev-help@lucene.apache.org


Mime
View raw message