lucene-dev mailing list archives

From Robert Muir <rcm...@gmail.com>
Subject Re: svn commit: r1303792 [1/2] - in /lucene/dev/branches/branch_3x/solr: contrib/dataimporthandler-extras/src/test/org/apache/solr/handler/dataimport/ contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/ contrib/dataimporthandler/sr
Date Thu, 22 Mar 2012 14:17:02 GMT
Can we add Apache license headers to these files? I think I already
added them to trunk.

On Thu, Mar 22, 2012 at 10:11 AM,  <jdyer@apache.org> wrote:
> Author: jdyer
> Date: Thu Mar 22 14:11:16 2012
> New Revision: 1303792
>
> URL: http://svn.apache.org/viewvc?rev=1303792&view=rev
> Log:
> SOLR-2382: Framework for Pluggable caches
>
> Added:
>    lucene/dev/branches/branch_3x/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/CachePropertyUtil.java   (with props)
>    lucene/dev/branches/branch_3x/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/DIHCache.java   (with props)
>    lucene/dev/branches/branch_3x/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/DIHCacheSupport.java   (with props)
>    lucene/dev/branches/branch_3x/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/DIHLogLevels.java   (with props)
>    lucene/dev/branches/branch_3x/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/DIHPropertiesWriter.java   (with props)
>    lucene/dev/branches/branch_3x/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/DIHWriter.java   (with props)
>    lucene/dev/branches/branch_3x/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/DIHWriterBase.java   (with props)
>    lucene/dev/branches/branch_3x/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/SimplePropertiesWriter.java   (with props)
>    lucene/dev/branches/branch_3x/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/SortedMapBackedCache.java   (with props)
>    lucene/dev/branches/branch_3x/solr/contrib/dataimporthandler/src/test/org/apache/solr/handler/dataimport/AbstractDIHCacheTestCase.java   (with props)
>    lucene/dev/branches/branch_3x/solr/contrib/dataimporthandler/src/test/org/apache/solr/handler/dataimport/DestroyCountCache.java   (with props)
>    lucene/dev/branches/branch_3x/solr/contrib/dataimporthandler/src/test/org/apache/solr/handler/dataimport/TestEphemeralCache.java   (with props)
>    lucene/dev/branches/branch_3x/solr/contrib/dataimporthandler/src/test/org/apache/solr/handler/dataimport/TestSortedMapBackedCache.java   (with props)
>    lucene/dev/branches/branch_3x/solr/webapp/web/WEB-INF/jboss-web.xml   (with props)
> Modified:
>    lucene/dev/branches/branch_3x/solr/contrib/dataimporthandler-extras/src/test/org/apache/solr/handler/dataimport/TestMailEntityProcessor.java
>    lucene/dev/branches/branch_3x/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/CachedSqlEntityProcessor.java
>    lucene/dev/branches/branch_3x/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/ContextImpl.java
>    lucene/dev/branches/branch_3x/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/DataConfig.java
>    lucene/dev/branches/branch_3x/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/DataImportHandler.java
>    lucene/dev/branches/branch_3x/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/DataImporter.java
>    lucene/dev/branches/branch_3x/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/DebugLogger.java
>    lucene/dev/branches/branch_3x/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/DocBuilder.java
>    lucene/dev/branches/branch_3x/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/EntityProcessorBase.java
>    lucene/dev/branches/branch_3x/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/EntityProcessorWrapper.java
>    lucene/dev/branches/branch_3x/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/SolrWriter.java
>    lucene/dev/branches/branch_3x/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/ThreadedContext.java
>    lucene/dev/branches/branch_3x/solr/contrib/dataimporthandler/src/test-files/dih/solr/conf/dataimport-schema.xml
>    lucene/dev/branches/branch_3x/solr/contrib/dataimporthandler/src/test/org/apache/solr/handler/dataimport/TestCachedSqlEntityProcessor.java
>    lucene/dev/branches/branch_3x/solr/contrib/dataimporthandler/src/test/org/apache/solr/handler/dataimport/TestContextImpl.java
>    lucene/dev/branches/branch_3x/solr/contrib/dataimporthandler/src/test/org/apache/solr/handler/dataimport/TestDocBuilder.java
>
> Modified: lucene/dev/branches/branch_3x/solr/contrib/dataimporthandler-extras/src/test/org/apache/solr/handler/dataimport/TestMailEntityProcessor.java
> URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_3x/solr/contrib/dataimporthandler-extras/src/test/org/apache/solr/handler/dataimport/TestMailEntityProcessor.java?rev=1303792&r1=1303791&r2=1303792&view=diff
> ==============================================================================
> --- lucene/dev/branches/branch_3x/solr/contrib/dataimporthandler-extras/src/test/org/apache/solr/handler/dataimport/TestMailEntityProcessor.java (original)
> +++ lucene/dev/branches/branch_3x/solr/contrib/dataimporthandler-extras/src/test/org/apache/solr/handler/dataimport/TestMailEntityProcessor.java Thu Mar 22 14:11:16 2012
> @@ -188,7 +188,7 @@ public class TestMailEntityProcessor ext
>     Boolean commitCalled;
>
>     public SolrWriterImpl() {
> -      super(null, ".");
> +      super(null);
>     }
>
>     @Override
> @@ -197,11 +197,6 @@ public class TestMailEntityProcessor ext
>     }
>
>     @Override
> -    public void log(int event, String name, Object row) {
> -      // Do nothing
> -    }
> -
> -    @Override
>     public void doDeleteAll() {
>       deleteAllCalled = Boolean.TRUE;
>     }
>
> Added: lucene/dev/branches/branch_3x/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/CachePropertyUtil.java
> URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_3x/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/CachePropertyUtil.java?rev=1303792&view=auto
> ==============================================================================
> --- lucene/dev/branches/branch_3x/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/CachePropertyUtil.java (added)
> +++ lucene/dev/branches/branch_3x/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/CachePropertyUtil.java Thu Mar 22 14:11:16 2012
> @@ -0,0 +1,32 @@
> +package org.apache.solr.handler.dataimport;
> +
> +public class CachePropertyUtil {
> +  public static String getAttributeValueAsString(Context context, String attr) {
> +    Object o = context.getSessionAttribute(attr, Context.SCOPE_ENTITY);
> +    if (o == null) {
> +      o = context.getResolvedEntityAttribute(attr);
> +    }
> +    if (o == null && context.getRequestParameters() != null) {
> +      o = context.getRequestParameters().get(attr);
> +    }
> +    if (o == null) {
> +      return null;
> +    }
> +    return o.toString();
> +  }
> +
> +  public static Object getAttributeValue(Context context, String attr) {
> +    Object o = context.getSessionAttribute(attr, Context.SCOPE_ENTITY);
> +    if (o == null) {
> +      o = context.getResolvedEntityAttribute(attr);
> +    }
> +    if (o == null && context.getRequestParameters() != null) {
> +      o = context.getRequestParameters().get(attr);
> +    }
> +    if (o == null) {
> +      return null;
> +    }
> +    return o;
> +  }
> +
> +}
>
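
A quick usage note on the helper above: the lookup order is session attribute
(entity scope), then resolved entity attribute, then request parameter. A rough,
untested sketch of a cache impl's open() reading its settings via the constants
DIHCacheSupport defines further down:

    // assumes the org.apache.solr.handler.dataimport imports
    String pk = CachePropertyUtil.getAttributeValueAsString(
        context, DIHCacheSupport.CACHE_PRIMARY_KEY);       // "cachePk"
    boolean readOnly = "true".equalsIgnoreCase(CachePropertyUtil
        .getAttributeValueAsString(context, DIHCacheSupport.CACHE_READ_ONLY));
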
> Modified: lucene/dev/branches/branch_3x/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/CachedSqlEntityProcessor.java
> URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_3x/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/CachedSqlEntityProcessor.java?rev=1303792&r1=1303791&r2=1303792&view=diff
> ==============================================================================
> --- lucene/dev/branches/branch_3x/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/CachedSqlEntityProcessor.java (original)
> +++ lucene/dev/branches/branch_3x/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/CachedSqlEntityProcessor.java Thu Mar 22 14:11:16 2012
> @@ -16,67 +16,27 @@
>  */
>  package org.apache.solr.handler.dataimport;
>
> -import java.util.ArrayList;
> -import java.util.List;
> -import java.util.Map;
> -
>  /**
>  * This class enables caching of data obtained from the DB to avoid too many sql
>  * queries
>  * <p/>
>  * <p>
>  * Refer to <a
> - * href="http://wiki.apache.org/solr/DataImportHandler">http://wiki.apache.org/solr/DataImportHandler</a>
> - * for more details.
> + * href="http://wiki.apache.org/solr/DataImportHandler">http://wiki.apache
> + * .org/solr/DataImportHandler</a> for more details.
>  * </p>
>  * <p/>
>  * <b>This API is experimental and subject to change</b>
> - *
> + *
>  * @version $Id$
>  * @since solr 1.3
> + * @deprecated - Use SqlEntityProcessor with cacheImpl parameter.
>  */
> +@Deprecated
>  public class CachedSqlEntityProcessor extends SqlEntityProcessor {
> -  private boolean isFirst;
> -
> -  @Override
> -  @SuppressWarnings("unchecked")
> -  public void init(Context context) {
> -    super.init(context);
> -    super.cacheInit();
> -    isFirst = true;
> -  }
> -
> -  @Override
> -  public Map<String, Object> nextRow() {
> -    if (dataSourceRowCache != null)
> -      return getFromRowCacheTransformed();
> -    if (!isFirst)
> -      return null;
> -    String query = context.replaceTokens(context.getEntityAttribute("query"));
> -    isFirst = false;
> -    if (simpleCache != null) {
> -      return getSimpleCacheData(query);
> -    } else {
> -      return getIdCacheData(query);
> +    @Override
> +    protected void initCache(Context context) {
> +      cacheSupport = new DIHCacheSupport(context, "SortedMapBackedCache");
>     }
>
> -  }
> -
> -  @Override
> -  protected List<Map<String, Object>> getAllNonCachedRows() {
> -    List<Map<String, Object>> rows = new ArrayList<Map<String, Object>>();
> -    String q = getQuery();
> -    initQuery(context.replaceTokens(q));
> -    if (rowIterator == null)
> -      return rows;
> -    while (rowIterator.hasNext()) {
> -      Map<String, Object> arow = rowIterator.next();
> -      if (arow == null) {
> -        break;
> -      } else {
> -        rows.add(arow);
> -      }
> -    }
> -    return rows;
> -  }
>  }
>
> Modified: lucene/dev/branches/branch_3x/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/ContextImpl.java
> URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_3x/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/ContextImpl.java?rev=1303792&r1=1303791&r2=1303792&view=diff
> ==============================================================================
> --- lucene/dev/branches/branch_3x/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/ContextImpl.java (original)
> +++ lucene/dev/branches/branch_3x/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/ContextImpl.java Thu Mar 22 14:11:16 2012
> @@ -101,7 +101,7 @@ public class ContextImpl extends Context
>     if (entity.dataSrc != null && docBuilder != null && docBuilder.verboseDebug &&
>              Context.FULL_DUMP.equals(currentProcess())) {
>       //debug is not yet implemented properly for deltas
> -      entity.dataSrc = docBuilder.writer.getDebugLogger().wrapDs(entity.dataSrc);
> +      entity.dataSrc = docBuilder.getDebugLogger().wrapDs(entity.dataSrc);
>     }
>     return entity.dataSrc;
>   }
>
> Added: lucene/dev/branches/branch_3x/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/DIHCache.java
> URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_3x/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/DIHCache.java?rev=1303792&view=auto
> ==============================================================================
> --- lucene/dev/branches/branch_3x/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/DIHCache.java (added)
> +++ lucene/dev/branches/branch_3x/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/DIHCache.java Thu Mar 22 14:11:16 2012
> @@ -0,0 +1,108 @@
> +package org.apache.solr.handler.dataimport;
> +
> +/*
> + * Licensed to the Apache Software Foundation (ASF) under one or more
> + * contributor license agreements.  See the NOTICE file distributed with
> + * this work for additional information regarding copyright ownership.
> + * The ASF licenses this file to You under the Apache License, Version 2.0
> + * (the "License"); you may not use this file except in compliance with
> + * the License.  You may obtain a copy of the License at
> + *
> + *     http://www.apache.org/licenses/LICENSE-2.0
> + *
> + * Unless required by applicable law or agreed to in writing, software
> + * distributed under the License is distributed on an "AS IS" BASIS,
> + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> + * See the License for the specific language governing permissions and
> + * limitations under the License.
> + */
> +
> +import java.util.Iterator;
> +import java.util.Map;
> +
> +/**
> + * <p>
> + * A cache that allows a DIH entity's data to persist locally prior to being joined
> + * to other data and/or indexed.
> + * </p>
> + *
> + * @lucene.experimental
> + */
> +public interface DIHCache extends Iterable<Map<String,Object>> {
> +
> +  /**
> +   * <p>
> +   * Opens the cache using the specified properties. The {@link Context}
> +   * includes any parameters needed by the cache impl. This must be called
> +   * before any read/write operations are permitted.
> +   * </p>
> +   */
> +  public void open(Context context);
> +
> +  /**
> +   * <p>
> +   * Releases resources used by this cache, if possible. The cache is flushed
> +   * but not destroyed.
> +   * </p>
> +   */
> +  public void close();
> +
> +  /**
> +   * <p>
> +   * Persists any pending data to the cache
> +   * </p>
> +   */
> +  public void flush();
> +
> +  /**
> +   * <p>
> +   * Closes the cache, if open. Then removes all data, possibly removing the
> +   * cache entirely from persistent storage.
> +   * </p>
> +   */
> +  public void destroy();
> +
> +  /**
> +   * <p>
> +   * Adds a document. If a document already exists with the same key, both
> +   * documents will exist in the cache, as the cache allows duplicate keys. To
> +   * update a key's documents, first call delete(Object key).
> +   * </p>
> +   *
> +   * @param rec the record, as a map of field names to values, to add
> +   */
> +  public void add(Map<String,Object> rec);
> +
> +  /**
> +   * <p>
> +   * Returns an iterator, allowing callers to iterate through the entire cache
> +   * in key order, then insertion order within each key.
> +   * </p>
> +   */
> +  public Iterator<Map<String,Object>> iterator();
> +
> +  /**
> +   * <p>
> +   * Returns an iterator, allowing callers to iterate through all documents that
> +   * match the given key in insertion order.
> +   * </p>
> +   */
> +  public Iterator<Map<String,Object>> iterator(Object key);
> +
> +  /**
> +   * <p>
> +   * Delete all documents associated with the given key
> +   * </p>
> +   *
> +   * @param key the key whose documents should be deleted
> +   */
> +  public void delete(Object key);
> +
> +  /**
> +   * <p>
> +   * Delete all data from the cache, leaving the empty cache intact.
> +   * </p>
> +   */
> +  public void deleteAll();
> +
> +}
>
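
For reference, a rough, untested sketch of exercising the DIHCache contract with
the SortedMapBackedCache added in this commit:

    // assumes java.util.* and the dataimport package imports
    DIHCache cache = new SortedMapBackedCache();
    cache.open(context);                      // context carries any impl parameters

    Map<String,Object> row = new HashMap<String,Object>();
    row.put("id", 1);
    row.put("name", "a");
    cache.add(row);                           // duplicate keys are allowed
    cache.flush();                            // persist pending data, if applicable

    Iterator<Map<String,Object>> it = cache.iterator(1);
    while (it.hasNext()) {
      Map<String,Object> hit = it.next();     // docs matching key 1, insertion order
    }
    cache.close();

Since add() never overwrites, call delete(key) first if a key's documents need
replacing.
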
> Added: lucene/dev/branches/branch_3x/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/DIHCacheSupport.java
> URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_3x/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/DIHCacheSupport.java?rev=1303792&view=auto
> ==============================================================================
> --- lucene/dev/branches/branch_3x/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/DIHCacheSupport.java (added)
> +++ lucene/dev/branches/branch_3x/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/DIHCacheSupport.java Thu Mar 22 14:11:16 2012
> @@ -0,0 +1,250 @@
> +package org.apache.solr.handler.dataimport;
> +
> +import static org.apache.solr.handler.dataimport.DataImportHandlerException.wrapAndThrow;
> +
> +import java.lang.reflect.Constructor;
> +import java.util.ArrayList;
> +import java.util.HashMap;
> +import java.util.Iterator;
> +import java.util.List;
> +import java.util.Map;
> +
> +import org.apache.solr.common.SolrException;
> +import org.slf4j.Logger;
> +import org.slf4j.LoggerFactory;
> +
> +public class DIHCacheSupport {
> +  private static final Logger log = LoggerFactory
> +      .getLogger(DIHCacheSupport.class);
> +  private String cacheForeignKey;
> +  private String cacheImplName;
> +  private Map<String,DIHCache> queryVsCache = new HashMap<String,DIHCache>();
> +  private Map<String,Iterator<Map<String,Object>>> queryVsCacheIterator;
> +  private Iterator<Map<String,Object>> dataSourceRowCache;
> +  private boolean cacheDoKeyLookup;
> +
> +  public DIHCacheSupport(Context context, String cacheImplName) {
> +    this.cacheImplName = cacheImplName;
> +
> +    String where = context.getEntityAttribute("where");
> +    String cacheKey = context.getEntityAttribute(DIHCacheSupport.CACHE_PRIMARY_KEY);
> +    String lookupKey = context.getEntityAttribute(DIHCacheSupport.CACHE_FOREIGN_KEY);
> +    if (cacheKey != null && lookupKey == null) {
> +      throw new DataImportHandlerException(DataImportHandlerException.SEVERE,
> +          "'cacheKey' is specified for the entity "
> +              + context.getEntityAttribute("name")
> +              + " but 'cacheLookup' is missing");
> +
> +    }
> +    if (where == null && cacheKey == null) {
> +      cacheDoKeyLookup = false;
> +    } else {
> +      if (where != null) {
> +        String[] splits = where.split("=");
> +        cacheKey = splits[0];
> +        cacheForeignKey = splits[1].trim();
> +      } else {
> +        cacheForeignKey = lookupKey;
> +      }
> +      cacheDoKeyLookup = true;
> +    }
> +    context.setSessionAttribute(DIHCacheSupport.CACHE_PRIMARY_KEY, cacheKey,
> +        Context.SCOPE_ENTITY);
> +    context.setSessionAttribute(DIHCacheSupport.CACHE_FOREIGN_KEY, cacheForeignKey,
> +        Context.SCOPE_ENTITY);
> +    context.setSessionAttribute(DIHCacheSupport.CACHE_DELETE_PRIOR_DATA,
> +        "true", Context.SCOPE_ENTITY);
> +    context.setSessionAttribute(DIHCacheSupport.CACHE_READ_ONLY, "false",
> +        Context.SCOPE_ENTITY);
> +  }
> +
> +  private DIHCache instantiateCache(Context context) {
> +    DIHCache cache = null;
> +    try {
> +      @SuppressWarnings("unchecked")
> +      Class<DIHCache> cacheClass = DocBuilder.loadClass(cacheImplName, context
> +          .getSolrCore());
> +      Constructor<DIHCache> constr = cacheClass.getConstructor();
> +      cache = constr.newInstance();
> +      cache.open(context);
> +    } catch (Exception e) {
> +      throw new DataImportHandlerException(DataImportHandlerException.SEVERE,
> +          "Unable to load Cache implementation:" + cacheImplName, e);
> +    }
> +    return cache;
> +  }
> +
> +  public void initNewParent(Context context) {
> +    queryVsCacheIterator = new HashMap<String,Iterator<Map<String,Object>>>();
> +    for (Map.Entry<String,DIHCache> entry : queryVsCache.entrySet()) {
> +      queryVsCacheIterator.put(entry.getKey(), entry.getValue().iterator());
> +    }
> +  }
> +
> +  public void destroyAll() {
> +    if (queryVsCache != null) {
> +      for (DIHCache cache : queryVsCache.values()) {
> +        cache.destroy();
> +      }
> +    }
> +    queryVsCache = null;
> +    dataSourceRowCache = null;
> +    cacheForeignKey = null;
> +  }
> +
> +  /**
> +   * <p>
> +   * Get all the rows from the datasource for the given query and cache them
> +   * </p>
> +   */
> +  public void populateCache(String query,
> +      Iterator<Map<String,Object>> rowIterator) {
> +    Map<String,Object> aRow = null;
> +    DIHCache cache = queryVsCache.get(query);
> +    while ((aRow = getNextFromCache(query, rowIterator)) != null) {
> +      cache.add(aRow);
> +    }
> +  }
> +
> +  private Map<String,Object> getNextFromCache(String query,
> +      Iterator<Map<String,Object>> rowIterator) {
> +    try {
> +      if (rowIterator == null) return null;
> +      if (rowIterator.hasNext()) return rowIterator.next();
> +      return null;
> +    } catch (Exception e) {
> +      SolrException.log(log, "getNextFromCache() failed for query '" + query
> +          + "'", e);
> +      wrapAndThrow(DataImportHandlerException.WARN, e);
> +      return null;
> +    }
> +  }
> +
> +  public Map<String,Object> getCacheData(Context context, String query,
> +      Iterator<Map<String,Object>> rowIterator) {
> +    if (cacheDoKeyLookup) {
> +      return getIdCacheData(context, query, rowIterator);
> +    } else {
> +      return getSimpleCacheData(context, query, rowIterator);
> +    }
> +  }
> +
> +  /**
> +   * If a where clause is present, the cache maps each SQL query to a map of
> +   * key vs. list of rows.
> +   *
> +   * @param query
> +   *          the query string for which cached data is to be returned
> +   *
> +   * @return the cached row corresponding to the given query after all variables
> +   *         have been resolved
> +   */
> +  protected Map<String,Object> getIdCacheData(Context context, String query,
> +      Iterator<Map<String,Object>> rowIterator) {
> +    Object key = context.resolve(cacheForeignKey);
> +    if (key == null) {
> +      throw new DataImportHandlerException(DataImportHandlerException.WARN,
> +          "The cache lookup value : " + cacheForeignKey
> +              + " is resolved to be null in the entity :"
> +              + context.getEntityAttribute("name"));
> +
> +    }
> +    DIHCache cache = queryVsCache.get(query);
> +    if (cache == null) {
> +      cache = instantiateCache(context);
> +      queryVsCache.put(query, cache);
> +      populateCache(query, rowIterator);
> +    }
> +    if (dataSourceRowCache == null) {
> +      dataSourceRowCache = cache.iterator(key);
> +    }
> +    if (dataSourceRowCache == null) {
> +      return null;
> +    }
> +    return getFromRowCacheTransformed();
> +  }
> +
> +  /**
> +   * If no where clause is present, the cache is a map of query vs. list of rows.
> +   *
> +   * @param query
> +   *          string for which cached row is to be returned
> +   *
> +   * @return the cached row corresponding to the given query
> +   */
> +  protected Map<String,Object> getSimpleCacheData(Context context,
> +      String query, Iterator<Map<String,Object>> rowIterator) {
> +    DIHCache cache = queryVsCache.get(query);
> +    if (cache == null) {
> +      cache = instantiateCache(context);
> +      queryVsCache.put(query, cache);
> +      populateCache(query, rowIterator);
> +      queryVsCacheIterator.put(query, cache.iterator());
> +    }
> +    if (dataSourceRowCache == null || !dataSourceRowCache.hasNext()) {
> +      dataSourceRowCache = null;
> +      Iterator<Map<String,Object>> cacheIter = queryVsCacheIterator.get(query);
> +      if (cacheIter.hasNext()) {
> +        List<Map<String,Object>> dsrcl = new ArrayList<Map<String,Object>>(1);
> +        dsrcl.add(cacheIter.next());
> +        dataSourceRowCache = dsrcl.iterator();
> +      }
> +    }
> +    if (dataSourceRowCache == null) {
> +      return null;
> +    }
> +    return getFromRowCacheTransformed();
> +  }
> +
> +  protected Map<String,Object> getFromRowCacheTransformed() {
> +    if (dataSourceRowCache == null || !dataSourceRowCache.hasNext()) {
> +      dataSourceRowCache = null;
> +      return null;
> +    }
> +    Map<String,Object> r = dataSourceRowCache.next();
> +    return r;
> +  }
> +
> +  /**
> +   * <p>
> +   * Specify the class for the cache implementation
> +   * </p>
> +   */
> +  public static final String CACHE_IMPL = "cacheImpl";
> +
> +  /**
> +   * <p>
> +   * If the cache supports persistent data, set to "true" to delete any prior
> +   * persisted data before running the entity.
> +   * </p>
> +   */
> +  public static final String CACHE_DELETE_PRIOR_DATA = "cacheDeletePriorData";
> +
> +  /**
> +   * <p>
> +   * Specify the Foreign Key from the parent entity to join on. Use if the cache
> +   * is on a child entity.
> +   * </p>
> +   */
> +  public static final String CACHE_FOREIGN_KEY = "cacheLookup";
> +
> +  /**
> +   * <p>
> +   * Specify the Primary Key field from this Entity with which to map the
> +   * input records.
> +   * </p>
> +   */
> +  public static final String CACHE_PRIMARY_KEY = "cachePk";
> +  /**
> +   * <p>
> +   * If true, a pre-existing cache is re-opened for read-only access.
> +   * </p>
> +   */
> +  public static final String CACHE_READ_ONLY = "cacheReadOnly";
> +}
>
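
To make the wiring concrete: a processor opts in by overriding initCache(),
exactly as CachedSqlEntityProcessor does above. A hypothetical processor that
defaults to a custom cache class (names invented) might look like:

    public class MyCachedProcessor extends SqlEntityProcessor {
      @Override
      protected void initCache(Context context) {
        // DIHCacheSupport.instantiateCache() loads this reflectively via
        // DocBuilder.loadClass(), so a public no-arg constructor is required.
        cacheSupport = new DIHCacheSupport(context, "com.example.MyDIHCache");
      }
    }

Also note the legacy syntax still works: where="XID=x.id" is split on '=' into
cachePk "XID" and cacheLookup "x.id".
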
> Added: lucene/dev/branches/branch_3x/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/DIHLogLevels.java
> URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_3x/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/DIHLogLevels.java?rev=1303792&view=auto
> ==============================================================================
> --- lucene/dev/branches/branch_3x/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/DIHLogLevels.java (added)
> +++ lucene/dev/branches/branch_3x/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/DIHLogLevels.java Thu Mar 22 14:11:16 2012
> @@ -0,0 +1,5 @@
> +package org.apache.solr.handler.dataimport;
> +
> +public enum DIHLogLevels {
> +  START_ENTITY, END_ENTITY, TRANSFORMED_ROW, ENTITY_META, PRE_TRANSFORMER_ROW,
> +  START_DOC, END_DOC, ENTITY_OUT, ROW_END, TRANSFORMER_EXCEPTION,
> +  ENTITY_EXCEPTION, DISABLE_LOGGING, ENABLE_LOGGING, NONE
> +}
>
> Added: lucene/dev/branches/branch_3x/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/DIHPropertiesWriter.java
> URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_3x/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/DIHPropertiesWriter.java?rev=1303792&view=auto
> ==============================================================================
> --- lucene/dev/branches/branch_3x/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/DIHPropertiesWriter.java (added)
> +++ lucene/dev/branches/branch_3x/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/DIHPropertiesWriter.java Thu Mar 22 14:11:16 2012
> @@ -0,0 +1,34 @@
> +package org.apache.solr.handler.dataimport;
> +/**
> + * Licensed to the Apache Software Foundation (ASF) under one or more
> + * contributor license agreements.  See the NOTICE file distributed with
> + * this work for additional information regarding copyright ownership.
> + * The ASF licenses this file to You under the Apache License, Version 2.0
> + * (the "License"); you may not use this file except in compliance with
> + * the License.  You may obtain a copy of the License at
> + *
> + *     http://www.apache.org/licenses/LICENSE-2.0
> + *
> + * Unless required by applicable law or agreed to in writing, software
> + * distributed under the License is distributed on an "AS IS" BASIS,
> + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> + * See the License for the specific language governing permissions and
> + * limitations under the License.
> + */
> +import java.io.File;
> +import java.util.Properties;
> +
> +/**
> + * Manages persistence of the DIH "last index time" properties between imports.
> + */
> +public interface DIHPropertiesWriter {
> +
> +    public void init(DataImporter dataImporter);
> +
> +    public boolean isWritable();
> +
> +    public void persist(Properties props);
> +
> +    public Properties readIndexerProperties();
> +
> +}
>
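
Since this interface is the seam that replaces direct dataimport.properties file
access, here is a minimal in-memory sketch (untested) of an implementation:

    public class InMemoryPropertiesWriter implements DIHPropertiesWriter {
      private final Properties props = new Properties();

      public void init(DataImporter dataImporter) { /* nothing to configure */ }

      public boolean isWritable() {
        return true;  // keeps checkWritablePersistFile() happy for delta imports
      }

      public void persist(Properties p) {
        props.putAll(p);
      }

      public Properties readIndexerProperties() {
        return props;
      }
    }

DataImporter below still hard-codes SimplePropertiesWriter, so plugging this in
would take a code change; the interface just makes the seam visible.
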
> Added: lucene/dev/branches/branch_3x/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/DIHWriter.java
> URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_3x/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/DIHWriter.java?rev=1303792&view=auto
> ==============================================================================
> --- lucene/dev/branches/branch_3x/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/DIHWriter.java (added)
> +++ lucene/dev/branches/branch_3x/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/DIHWriter.java Thu Mar 22 14:11:16 2012
> @@ -0,0 +1,105 @@
> +package org.apache.solr.handler.dataimport;
> +/**
> + * Licensed to the Apache Software Foundation (ASF) under one or more
> + * contributor license agreements.  See the NOTICE file distributed with
> + * this work for additional information regarding copyright ownership.
> + * The ASF licenses this file to You under the Apache License, Version 2.0
> + * (the "License"); you may not use this file except in compliance with
> + * the License.  You may obtain a copy of the License at
> + *
> + *     http://www.apache.org/licenses/LICENSE-2.0
> + *
> + * Unless required by applicable law or agreed to in writing, software
> + * distributed under the License is distributed on an "AS IS" BASIS,
> + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> + * See the License for the specific language governing permissions and
> + * limitations under the License.
> + */
> +import java.util.Map;
> +import java.util.Set;
> +
> +import org.apache.solr.common.SolrInputDocument;
> +
> +/**
> + * @solr.experimental
> + *
> + */
> +public interface DIHWriter {
> +
> +       /**
> +        * <p>
> +        *  If this writer supports transactions or commit points, then commit any changes,
> +        *  optionally optimizing the data for read/write performance
> +        * </p>
> +        * @param optimize whether to optimize the data after committing
> +        */
> +       public void commit(boolean optimize);
> +
> +       /**
> +        * <p>
> +        *  Release resources used by this writer.  After calling close, reads & updates will throw exceptions.
> +        * </p>
> +        */
> +       public void close();
> +
> +       /**
> +        * <p>
> +        *  If this writer supports transactions or commit points, then roll back any uncommitted changes.
> +        * </p>
> +        */
> +       public void rollback();
> +
> +       /**
> +        * <p>
> +        *  Delete from the writer's underlying data store based on the passed-in writer-specific query. (Optional Operation)
> +        * </p>
> +        * @param q the writer-specific delete query
> +        */
> +       public void deleteByQuery(String q);
> +
> +       /**
> +        * <p>
> +        *  Delete everything from the writer's underlying data store
> +        * </p>
> +        */
> +       public void doDeleteAll();
> +
> +       /**
> +        * <p>
> +        *  Delete from the writer's underlying data store based on the passed-in Primary Key
> +        * </p>
> +        * @param key the primary key of the document to delete
> +        */
> +       public void deleteDoc(Object key);
> +
> +       /**
> +        * <p>
> +        *  Add a document to this writer's underlying data store.
> +        * </p>
> +        * @param doc the document to add
> +        * @return true if the document was added successfully
> +        */
> +       public boolean upload(SolrInputDocument doc);
> +
> +       /**
> +        * <p>
> +        *  Provide context information for this writer.  init() should be called before using the writer.
> +        * </p>
> +        * @param context the writer's configuration context
> +        */
> +       public void init(Context context);
> +
> +       /**
> +        * <p>
> +        *  Specify the keys to be modified by a delta update (required by writers that can store duplicate keys)
> +        * </p>
> +        * @param deltaKeys the set of key maps identifying the changed documents
> +        */
> +       public void setDeltaKeys(Set<Map<String, Object>> deltaKeys);
> +
> +}
>
> Added: lucene/dev/branches/branch_3x/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/DIHWriterBase.java
> URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_3x/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/DIHWriterBase.java?rev=1303792&view=auto
> ==============================================================================
> --- lucene/dev/branches/branch_3x/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/DIHWriterBase.java (added)
> +++ lucene/dev/branches/branch_3x/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/DIHWriterBase.java Thu Mar 22 14:11:16 2012
> @@ -0,0 +1,27 @@
> +package org.apache.solr.handler.dataimport;
> +
> +import java.util.HashSet;
> +import java.util.Map;
> +import java.util.Set;
> +
> +public abstract class DIHWriterBase implements DIHWriter {
> +  protected String keyFieldName;
> +  protected Set<Object> deltaKeys = null;
> +
> +  public void setDeltaKeys(Set<Map<String,Object>> passedInDeltaKeys) {
> +    deltaKeys = new HashSet<Object>();
> +    for (Map<String,Object> aMap : passedInDeltaKeys) {
> +      if (aMap.size() > 0) {
> +        Object key = null;
> +        if (keyFieldName != null) {
> +          key = aMap.get(keyFieldName);
> +        } else {
> +          // no key field configured; use the first field's value as the key
> +          key = aMap.entrySet().iterator().next().getValue();
> +        }
> +        if (key != null) {
> +          deltaKeys.add(key);
> +        }
> +      }
> +    }
> +  }
> +}
>
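
Tying DIHWriter and DIHWriterBase together: a skeletal writer (hypothetical
names) that logs documents instead of indexing them. DocBuilder below creates it
via newInstance(), so a public no-arg constructor is required:

    public class LoggingDIHWriter extends DIHWriterBase {
      public void init(Context context) {
        keyFieldName = "id";  // assumption: the schema's unique key field
      }
      public boolean upload(SolrInputDocument doc) {
        System.out.println("DIH doc: " + doc);
        return true;
      }
      public void deleteDoc(Object key)    { /* no-op */ }
      public void deleteByQuery(String q)  { /* no-op */ }
      public void doDeleteAll()            { /* no-op */ }
      public void commit(boolean optimize) { /* no-op */ }
      public void rollback()               { /* no-op */ }
      public void close()                  { /* no-op */ }
    }

setDeltaKeys() is inherited from DIHWriterBase.
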
> Modified: lucene/dev/branches/branch_3x/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/DataConfig.java
> URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_3x/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/DataConfig.java?rev=1303792&r1=1303791&r2=1303792&view=diff
> ==============================================================================
> --- lucene/dev/branches/branch_3x/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/DataConfig.java (original)
> +++ lucene/dev/branches/branch_3x/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/DataConfig.java Thu Mar 22 14:11:16 2012
> @@ -110,6 +110,8 @@ public class DataConfig {
>     public DataSource dataSrc;
>
>     public Map<String, List<Field>> colNameVsField = new HashMap<String, List<Field>>();
> +
> +    public boolean initalized = false;
>
>     public Entity() {
>     }
>
> Modified: lucene/dev/branches/branch_3x/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/DataImportHandler.java
> URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_3x/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/DataImportHandler.java?rev=1303792&r1=1303791&r2=1303792&view=diff
> ==============================================================================
> --- lucene/dev/branches/branch_3x/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/DataImportHandler.java (original)
> +++ lucene/dev/branches/branch_3x/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/DataImportHandler.java Thu Mar 22 14:11:16 2012
> @@ -22,7 +22,6 @@ import org.apache.solr.common.SolrInputD
>  import org.apache.solr.common.params.CommonParams;
>  import org.apache.solr.common.params.ModifiableSolrParams;
>  import org.apache.solr.common.params.SolrParams;
> -import org.apache.solr.common.params.UpdateParams;
>  import org.apache.solr.common.util.ContentStreamBase;
>  import org.apache.solr.common.util.NamedList;
>  import org.apache.solr.common.util.ContentStream;
> @@ -115,7 +114,7 @@ public class DataImportHandler extends R
>           final InputSource is = new InputSource(core.getResourceLoader().openConfig(configLoc));
>           is.setSystemId(SystemIdResolver.createSystemIdFromResourceName(configLoc));
>           importer = new DataImporter(is, core,
> -                  dataSources, coreScopeSession);
> +                  dataSources, coreScopeSession, myName);
>         }
>       }
>     } catch (Throwable e) {
> @@ -167,7 +166,7 @@ public class DataImportHandler extends R
>         try {
>           processConfiguration((NamedList) initArgs.get("defaults"));
>           importer = new DataImporter(new InputSource(new StringReader(requestParams.dataConfig)), req.getCore()
> -                  , dataSources, coreScopeSession);
> +                  , dataSources, coreScopeSession, myName);
>         } catch (RuntimeException e) {
>           rsp.add("exception", DebugLogger.getStacktraceString(e));
>           importer = null;
> @@ -199,16 +198,18 @@ public class DataImportHandler extends R
>         UpdateRequestProcessor processor = processorChain.createProcessor(req, rsp);
>         SolrResourceLoader loader = req.getCore().getResourceLoader();
>         SolrWriter sw = getSolrWriter(processor, loader, requestParams);
> -
> +
>         if (requestParams.debug) {
>           if (debugEnabled) {
>             // Synchronous request for the debug mode
>             importer.runCmd(requestParams, sw);
>             rsp.add("mode", "debug");
>             rsp.add("documents", debugDocuments);
> -            if (sw.debugLogger != null)
> -              rsp.add("verbose-output", sw.debugLogger.output);
> +            if (requestParams.debugVerboseOutput != null) {
> +              rsp.add("verbose-output", requestParams.debugVerboseOutput);
> +            }
>             debugDocuments.clear();
> +            requestParams.debugVerboseOutput = null;
>           } else {
>             message = DataImporter.MSG.DEBUG_NOT_ENABLED;
>           }
> @@ -217,7 +218,7 @@ public class DataImportHandler extends R
>           if(requestParams.contentStream == null && !requestParams.syncMode){
>             importer.runAsync(requestParams, sw);
>           } else {
> -              importer.runCmd(requestParams, sw);
> +            importer.runCmd(requestParams, sw);
>           }
>         }
>       } else if (DataImporter.RELOAD_CONF_CMD.equals(command)) {
> @@ -282,9 +283,8 @@ public class DataImportHandler extends R
>   private SolrWriter getSolrWriter(final UpdateRequestProcessor processor,
>                                    final SolrResourceLoader loader, final DataImporter.RequestParams requestParams) {
>
> -    return new SolrWriter(processor, loader.getConfigDir(), myName) {
> +    return new SolrWriter(processor) {
>
> -      @Override
>       public boolean upload(SolrInputDocument document) {
>         try {
>           if (requestParams.debug) {
>
> Modified: lucene/dev/branches/branch_3x/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/DataImporter.java
> URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_3x/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/DataImporter.java?rev=1303792&r1=1303791&r2=1303792&view=diff
> ==============================================================================
> --- lucene/dev/branches/branch_3x/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/DataImporter.java (original)
> +++ lucene/dev/branches/branch_3x/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/DataImporter.java Thu Mar 22 14:11:16 2012
> @@ -18,11 +18,13 @@
>  package org.apache.solr.handler.dataimport;
>
>  import org.apache.solr.common.SolrException;
> +import org.apache.solr.common.SolrInputDocument;
>  import org.apache.solr.core.SolrConfig;
>  import org.apache.solr.core.SolrCore;
>  import org.apache.solr.schema.IndexSchema;
>  import org.apache.solr.schema.SchemaField;
>  import org.apache.solr.common.util.ContentStream;
> +import org.apache.solr.common.util.NamedList;
>  import org.apache.solr.common.util.StrUtils;
>  import org.apache.solr.common.util.SystemIdResolver;
>  import org.apache.solr.common.util.XMLErrorLogger;
> @@ -39,7 +41,6 @@ import org.apache.commons.io.IOUtils;
>
>  import javax.xml.parsers.DocumentBuilder;
>  import javax.xml.parsers.DocumentBuilderFactory;
> -import java.io.File;
>  import java.io.StringReader;
>  import java.text.SimpleDateFormat;
>  import java.util.*;
> @@ -81,26 +82,35 @@ public class DataImporter {
>   public DocBuilder.Statistics cumulativeStatistics = new DocBuilder.Statistics();
>
>   private SolrCore core;
> +
> +  private DIHPropertiesWriter propWriter;
>
>   private ReentrantLock importLock = new ReentrantLock();
>
>   private final Map<String , Object> coreScopeSession;
>
>   private boolean isDeltaImportSupported = false;
> +  private final String handlerName;
>
>   /**
>    * Only for testing purposes
>    */
>   DataImporter() {
>     coreScopeSession = new ConcurrentHashMap<String, Object>();
> +    this.propWriter = new SimplePropertiesWriter();
> +    propWriter.init(this);
> +    this.handlerName = "dataimport";
>   }
>
> -  DataImporter(InputSource dataConfig, SolrCore core, Map<String, Properties> ds, Map<String, Object> session) {
> +  DataImporter(InputSource dataConfig, SolrCore core, Map<String, Properties> ds, Map<String, Object> session, String handlerName) {
> +    this.handlerName = handlerName;
>     if (dataConfig == null)
>       throw new DataImportHandlerException(SEVERE,
>               "Configuration not found");
>     this.core = core;
>     this.schema = core.getSchema();
> +    this.propWriter = new SimplePropertiesWriter();
> +    propWriter.init(this);
>     dataSourceProps = ds;
>     if (session == null)
>       session = new HashMap<String, Object>();
> @@ -121,7 +131,11 @@ public class DataImporter {
>     }
>   }
>
> -  private void verifyWithSchema(Map<String, DataConfig.Field> fields) {
> +  public String getHandlerName() {
> +    return handlerName;
> +  }
> +
> +  private void verifyWithSchema(Map<String, DataConfig.Field> fields) {
>     Map<String, SchemaField> schemaFields = schema.getFields();
>     for (Map.Entry<String, SchemaField> entry : schemaFields.entrySet()) {
>       SchemaField sf = entry.getValue();
> @@ -354,7 +368,7 @@ public class DataImporter {
>     setIndexStartTime(new Date());
>
>     try {
> -      docBuilder = new DocBuilder(this, writer, requestParams);
> +      docBuilder = new DocBuilder(this, writer, propWriter, requestParams);
>       checkWritablePersistFile(writer);
>       docBuilder.execute();
>       if (!requestParams.debug)
> @@ -371,11 +385,11 @@ public class DataImporter {
>   }
>
>   private void checkWritablePersistFile(SolrWriter writer) {
> -    File persistFile = writer.getPersistFile();
> -    boolean isWritable = persistFile.exists() ? persistFile.canWrite() : persistFile.getParentFile().canWrite();
> -    if (isDeltaImportSupported && !isWritable) {
> -      throw new DataImportHandlerException(SEVERE, persistFile.getAbsolutePath() +
> -          " is not writable. Delta imports are supported by data config but will not work.");
> +//     File persistFile = propWriter.getPersistFile();
> +//    boolean isWritable = persistFile.exists() ? persistFile.canWrite() : persistFile.getParentFile().canWrite();
> +    if (isDeltaImportSupported && !propWriter.isWritable()) {
> +      throw new DataImportHandlerException(SEVERE,
> +          "Properties is not writable. Delta imports are supported by data config but will not work.");
>     }
>   }
>
> @@ -385,7 +399,7 @@ public class DataImporter {
>
>     try {
>       setIndexStartTime(new Date());
> -      docBuilder = new DocBuilder(this, writer, requestParams);
> +      docBuilder = new DocBuilder(this, writer, propWriter, requestParams);
>       checkWritablePersistFile(writer);
>       docBuilder.execute();
>       if (!requestParams.debug)
> @@ -504,7 +518,7 @@ public class DataImporter {
>     public String command = null;
>
>     public boolean debug = false;
> -
> +
>     public boolean verbose = false;
>
>     public boolean syncMode = false;
> @@ -526,6 +540,10 @@ public class DataImporter {
>     public String dataConfig;
>
>     public ContentStream contentStream;
> +
> +    public List<SolrInputDocument> debugDocuments = new ArrayList<SolrInputDocument>(0);
> +
> +    public NamedList debugVerboseOutput = null;
>
>     public RequestParams() {
>     }
>
> Modified: lucene/dev/branches/branch_3x/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/DebugLogger.java
> URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_3x/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/DebugLogger.java?rev=1303792&r1=1303791&r2=1303792&view=diff
> ==============================================================================
> --- lucene/dev/branches/branch_3x/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/DebugLogger.java (original)
> +++ lucene/dev/branches/branch_3x/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/DebugLogger.java Thu Mar 22 14:11:16 2012
> @@ -46,7 +46,7 @@ class DebugLogger {
>   private Stack<DebugInfo> debugStack;
>
>   NamedList output;
> -  private final SolrWriter writer;
> +//  private final SolrWriter writer1;
>
>   private static final String LINE = "---------------------------------------------";
>
> @@ -55,8 +55,8 @@ class DebugLogger {
>
>   boolean enabled = true;
>
> -  public DebugLogger(SolrWriter solrWriter) {
> -    writer = solrWriter;
> +  public DebugLogger() {
> +//    writer = solrWriter;
>     output = new NamedList();
>     debugStack = new Stack<DebugInfo>() {
>
> @@ -68,7 +68,7 @@ class DebugLogger {
>         return super.pop();
>       }
>     };
> -    debugStack.push(new DebugInfo(null, -1, null));
> +    debugStack.push(new DebugInfo(null, DIHLogLevels.NONE, null));
>     output = debugStack.peek().lst;
>   }
>
> @@ -76,47 +76,47 @@ class DebugLogger {
>     return debugStack.isEmpty() ? null : debugStack.peek();
>   }
>
> -  public void log(int event, String name, Object row) {
> -    if (event == SolrWriter.DISABLE_LOGGING) {
> +  public void log(DIHLogLevels event, String name, Object row) {
> +    if (event == DIHLogLevels.DISABLE_LOGGING) {
>       enabled = false;
>       return;
> -    } else if (event == SolrWriter.ENABLE_LOGGING) {
> +    } else if (event == DIHLogLevels.ENABLE_LOGGING) {
>       enabled = true;
>       return;
>     }
>
> -    if (!enabled && event != SolrWriter.START_ENTITY
> -            && event != SolrWriter.END_ENTITY) {
> +    if (!enabled && event != DIHLogLevels.START_ENTITY
> +            && event != DIHLogLevels.END_ENTITY) {
>       return;
>     }
>
> -    if (event == SolrWriter.START_DOC) {
> -      debugStack.push(new DebugInfo(null, SolrWriter.START_DOC, peekStack()));
> -    } else if (SolrWriter.START_ENTITY == event) {
> +    if (event == DIHLogLevels.START_DOC) {
> +      debugStack.push(new DebugInfo(null, DIHLogLevels.START_DOC, peekStack()));
> +    } else if (DIHLogLevels.START_ENTITY == event) {
>       debugStack
> -              .push(new DebugInfo(name, SolrWriter.START_ENTITY, peekStack()));
> -    } else if (SolrWriter.ENTITY_OUT == event
> -            || SolrWriter.PRE_TRANSFORMER_ROW == event) {
> -      if (debugStack.peek().type == SolrWriter.START_ENTITY
> -              || debugStack.peek().type == SolrWriter.START_DOC) {
> +              .push(new DebugInfo(name, DIHLogLevels.START_ENTITY, peekStack()));
> +    } else if (DIHLogLevels.ENTITY_OUT == event
> +            || DIHLogLevels.PRE_TRANSFORMER_ROW == event) {
> +      if (debugStack.peek().type == DIHLogLevels.START_ENTITY
> +              || debugStack.peek().type == DIHLogLevels.START_DOC) {
>         debugStack.peek().lst.add(null, fmt.format(new Object[]{++debugStack
>                 .peek().rowCount}));
>         addToNamedList(debugStack.peek().lst, row);
>         debugStack.peek().lst.add(null, LINE);
>       }
> -    } else if (event == SolrWriter.ROW_END) {
> +    } else if (event == DIHLogLevels.ROW_END) {
>       popAllTransformers();
> -    } else if (SolrWriter.END_ENTITY == event) {
> -      while (debugStack.pop().type != SolrWriter.START_ENTITY)
> +    } else if (DIHLogLevels.END_ENTITY == event) {
> +      while (debugStack.pop().type != DIHLogLevels.START_ENTITY)
>         ;
> -    } else if (SolrWriter.END_DOC == event) {
> -      while (debugStack.pop().type != SolrWriter.START_DOC)
> +    } else if (DIHLogLevels.END_DOC == event) {
> +      while (debugStack.pop().type != DIHLogLevels.START_DOC)
>         ;
> -    } else if (event == SolrWriter.TRANSFORMER_EXCEPTION) {
> +    } else if (event == DIHLogLevels.TRANSFORMER_EXCEPTION) {
>       debugStack.push(new DebugInfo(name, event, peekStack()));
>       debugStack.peek().lst.add("EXCEPTION",
>               getStacktraceString((Exception) row));
> -    } else if (SolrWriter.TRANSFORMED_ROW == event) {
> +    } else if (DIHLogLevels.TRANSFORMED_ROW == event) {
>       debugStack.push(new DebugInfo(name, event, peekStack()));
>       debugStack.peek().lst.add(null, LINE);
>       addToNamedList(debugStack.peek().lst, row);
> @@ -125,10 +125,10 @@ class DebugLogger {
>         DataImportHandlerException dataImportHandlerException = (DataImportHandlerException) row;
>         dataImportHandlerException.debugged = true;
>       }
> -    } else if (SolrWriter.ENTITY_META == event) {
> +    } else if (DIHLogLevels.ENTITY_META == event) {
>       popAllTransformers();
>       debugStack.peek().lst.add(name, row);
> -    } else if (SolrWriter.ENTITY_EXCEPTION == event) {
> +    } else if (DIHLogLevels.ENTITY_EXCEPTION == event) {
>       if (row instanceof DataImportHandlerException) {
>         DataImportHandlerException dihe = (DataImportHandlerException) row;
>         if (dihe.debugged)
> @@ -144,8 +144,8 @@ class DebugLogger {
>
>   private void popAllTransformers() {
>     while (true) {
> -      int type = debugStack.peek().type;
> -      if (type == SolrWriter.START_DOC || type == SolrWriter.START_ENTITY)
> +      DIHLogLevels type = debugStack.peek().type;
> +      if (type == DIHLogLevels.START_DOC || type == DIHLogLevels.START_ENTITY)
>         break;
>       debugStack.pop();
>     }
> @@ -182,23 +182,23 @@ class DebugLogger {
>
>       @Override
>       public Object getData(String query) {
> -        writer.log(SolrWriter.ENTITY_META, "query", query);
> +        log(DIHLogLevels.ENTITY_META, "query", query);
>         long start = System.currentTimeMillis();
>         try {
>           return ds.getData(query);
>         } catch (DataImportHandlerException de) {
> -          writer.log(SolrWriter.ENTITY_EXCEPTION,
> +          log(DIHLogLevels.ENTITY_EXCEPTION,
>                   null, de);
>           throw de;
>         } catch (Exception e) {
> -          writer.log(SolrWriter.ENTITY_EXCEPTION,
> +          log(DIHLogLevels.ENTITY_EXCEPTION,
>                   null, e);
>           DataImportHandlerException de = new DataImportHandlerException(
>                   DataImportHandlerException.SEVERE, "", e);
>           de.debugged = true;
>           throw de;
>         } finally {
> -          writer.log(SolrWriter.ENTITY_META, "time-taken", DocBuilder
> +          log(DIHLogLevels.ENTITY_META, "time-taken", DocBuilder
>                   .getTimeElapsedSince(start));
>         }
>       }
> @@ -209,18 +209,18 @@ class DebugLogger {
>     return new Transformer() {
>       @Override
>       public Object transformRow(Map<String, Object> row, Context context) {
> -        writer.log(SolrWriter.PRE_TRANSFORMER_ROW, null, row);
> +        log(DIHLogLevels.PRE_TRANSFORMER_ROW, null, row);
>         String tName = getTransformerName(t);
>         Object result = null;
>         try {
>           result = t.transformRow(row, context);
> -          writer.log(SolrWriter.TRANSFORMED_ROW, tName, result);
> +          log(DIHLogLevels.TRANSFORMED_ROW, tName, result);
>         } catch (DataImportHandlerException de) {
> -          writer.log(SolrWriter.TRANSFORMER_EXCEPTION, tName, de);
> +          log(DIHLogLevels.TRANSFORMER_EXCEPTION, tName, de);
>           de.debugged = true;
>           throw de;
>         } catch (Exception e) {
> -          writer.log(SolrWriter.TRANSFORMER_EXCEPTION, tName, e);
> +          log(DIHLogLevels.TRANSFORMER_EXCEPTION, tName, e);
>           DataImportHandlerException de = new DataImportHandlerException(DataImportHandlerException.SEVERE, "", e);
>           de.debugged = true;
>           throw de;
> @@ -259,23 +259,23 @@ class DebugLogger {
>
>     NamedList lst;
>
> -    int type;
> +    DIHLogLevels type;
>
>     DebugInfo parent;
>
> -    public DebugInfo(String name, int type, DebugInfo parent) {
> +    public DebugInfo(String name, DIHLogLevels type, DebugInfo parent) {
>       this.name = name;
>       this.type = type;
>       this.parent = parent;
>       lst = new NamedList();
>       if (parent != null) {
>         String displayName = null;
> -        if (type == SolrWriter.START_ENTITY) {
> +        if (type == DIHLogLevels.START_ENTITY) {
>           displayName = "entity:" + name;
> -        } else if (type == SolrWriter.TRANSFORMED_ROW
> -                || type == SolrWriter.TRANSFORMER_EXCEPTION) {
> +        } else if (type == DIHLogLevels.TRANSFORMED_ROW
> +                || type == DIHLogLevels.TRANSFORMER_EXCEPTION) {
>           displayName = "transformer:" + name;
> -        } else if (type == SolrWriter.START_DOC) {
> +        } else if (type == DIHLogLevels.START_DOC) {
>           this.name = displayName = "document#" + SolrWriter.getDocCount();
>         }
>         parent.lst.add(displayName, lst);
>
> Modified: lucene/dev/branches/branch_3x/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/DocBuilder.java
> URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_3x/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/DocBuilder.java?rev=1303792&r1=1303791&r2=1303792&view=diff
> ==============================================================================
> --- lucene/dev/branches/branch_3x/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/DocBuilder.java (original)
> +++ lucene/dev/branches/branch_3x/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/DocBuilder.java Thu Mar 22 14:11:16 2012
> @@ -57,27 +57,62 @@ public class DocBuilder {
>
>   public Statistics importStatistics = new Statistics();
>
> -  SolrWriter writer;
> +  DIHWriter writer;
>
>   DataImporter.RequestParams requestParameters;
>
>   boolean verboseDebug = false;
>
> -   Map<String, Object> session = new ConcurrentHashMap<String, Object>();
> +  Map<String, Object> session = new ConcurrentHashMap<String, Object>();
>
>   static final ThreadLocal<DocBuilder> INSTANCE = new ThreadLocal<DocBuilder>();
>   Map<String, Object> functionsNamespace;
>   private Properties persistedProperties;
> -
> -  public DocBuilder(DataImporter dataImporter, SolrWriter writer, DataImporter.RequestParams reqParams) {
> +
> +  private DIHPropertiesWriter propWriter;
> +  private static final String PARAM_WRITER_IMPL = "writerImpl";
> +  private static final String DEFAULT_WRITER_NAME = "SolrWriter";
> +  private DebugLogger debugLogger;
> +  private DataImporter.RequestParams reqParams;
> +
> +  @SuppressWarnings("unchecked")
> +  public DocBuilder(DataImporter dataImporter, SolrWriter solrWriter, DIHPropertiesWriter propWriter, DataImporter.RequestParams reqParams) {
>     INSTANCE.set(this);
>     this.dataImporter = dataImporter;
> -    this.writer = writer;
> +    this.reqParams = reqParams;
> +    this.propWriter = propWriter;
>     DataImporter.QUERY_COUNT.set(importStatistics.queryCount);
>     requestParameters = reqParams;
>     verboseDebug = requestParameters.debug && requestParameters.verbose;
>     functionsNamespace = EvaluatorBag.getFunctionsNamespace(this.dataImporter.getConfig().functions, this);
> -    persistedProperties = writer.readIndexerProperties();
> +    persistedProperties = propWriter.readIndexerProperties();
> +
> +    String writerClassStr = null;
> +    if (reqParams != null && reqParams.requestParams != null) {
> +      writerClassStr = (String) reqParams.requestParams.get(PARAM_WRITER_IMPL);
> +    }
> +    if (writerClassStr != null && !writerClassStr.equals(DEFAULT_WRITER_NAME)
> +        && !writerClassStr.equals(DocBuilder.class.getPackage().getName() + "." + DEFAULT_WRITER_NAME)) {
> +      try {
> +        Class<DIHWriter> writerClass = loadClass(writerClassStr, dataImporter.getCore());
> +        this.writer = writerClass.newInstance();
> +      } catch (Exception e) {
> +        throw new DataImportHandlerException(DataImportHandlerException.SEVERE, "Unable to load Writer implementation:" + writerClassStr, e);
> +      }
> +    } else {
> +      writer = solrWriter;
> +    }
> +    ContextImpl ctx = new ContextImpl(null, null, null, null, reqParams.requestParams, null, this);
> +    writer.init(ctx);
> +  }
> +
> +  DebugLogger getDebugLogger(){
> +    if (debugLogger == null) {
> +      debugLogger = new DebugLogger();
> +    }
> +    return debugLogger;
>   }
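
Nice that the writer is pluggable per request now. For anyone trying it:
pass the fully-qualified class name via the new request parameter, e.g.

  http://localhost:8983/solr/dataimport?command=full-import&writerImpl=com.example.LoggingWriter

(class name is hypothetical). A minimal sketch of such a writer, stubbing
only the calls visible in this diff -- the class needs a public no-arg
constructor since it is created via newInstance(), and it must also
implement whatever else DIHWriter declares:

  import java.util.Map;
  import java.util.Set;
  import org.apache.solr.common.SolrInputDocument;

  public class LoggingWriter implements DIHWriter {
    public void init(Context context) {
      // called once from the DocBuilder constructor with a bare ContextImpl
    }
    public boolean upload(SolrInputDocument doc) {
      System.out.println("would index: " + doc);
      return true;  // true => counted in importStatistics.docCount
    }
    public void setDeltaKeys(Set<Map<String, Object>> deltaKeys) {
      // keys of rows changed since the last delta import
    }
    public void close() {
      // invoked from execute()'s finally block
    }
    // ...remaining DIHWriter methods stubbed as no-ops
  }
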
>
>   public VariableResolverImpl getVariableResolver() {
> @@ -137,94 +172,103 @@ public class DocBuilder {
>
>   @SuppressWarnings("unchecked")
>   public void execute() {
> -    dataImporter.store(DataImporter.STATUS_MSGS, statusMessages);
> -    document = dataImporter.getConfig().document;
> -    final AtomicLong startTime = new AtomicLong(System.currentTimeMillis());
> -    statusMessages.put(TIME_ELAPSED, new Object() {
> -      @Override
> -      public String toString() {
> -        return getTimeElapsedSince(startTime.get());
> -      }
> -    });
> -
> -    statusMessages.put(DataImporter.MSG.TOTAL_QUERIES_EXECUTED,
> -            importStatistics.queryCount);
> -    statusMessages.put(DataImporter.MSG.TOTAL_ROWS_EXECUTED,
> -            importStatistics.rowsCount);
> -    statusMessages.put(DataImporter.MSG.TOTAL_DOC_PROCESSED,
> -            importStatistics.docCount);
> -    statusMessages.put(DataImporter.MSG.TOTAL_DOCS_SKIPPED,
> -            importStatistics.skipDocCount);
> -
> -    List<String> entities = requestParameters.entities;
> -
> -    // Trigger onImportStart
> -    if (document.onImportStart != null) {
> -      invokeEventListener(document.onImportStart);
> -    }
> -    AtomicBoolean fullCleanDone = new AtomicBoolean(false);
> -    //we must not do a delete of *:* multiple times if there are multiple root entities to be run
> -    Properties lastIndexTimeProps = new Properties();
> -    lastIndexTimeProps.setProperty(LAST_INDEX_KEY,
> -            DataImporter.DATE_TIME_FORMAT.get().format(dataImporter.getIndexStartTime()));
> -    for (DataConfig.Entity e : document.entities) {
> -      if (entities != null && !entities.contains(e.name))
> -        continue;
> -      lastIndexTimeProps.setProperty(e.name + "." + LAST_INDEX_KEY,
> -              DataImporter.DATE_TIME_FORMAT.get().format(new Date()));
> -      root = e;
> -      String delQuery = e.allAttributes.get("preImportDeleteQuery");
> -      if (dataImporter.getStatus() == DataImporter.Status.RUNNING_DELTA_DUMP) {
> -        cleanByQuery(delQuery, fullCleanDone);
> -        doDelta();
> -        delQuery = e.allAttributes.get("postImportDeleteQuery");
> -        if (delQuery != null) {
> -          fullCleanDone.set(false);
> -          cleanByQuery(delQuery, fullCleanDone);
> -        }
> -      } else {
> -        cleanByQuery(delQuery, fullCleanDone);
> -        doFullDump();
> -        delQuery = e.allAttributes.get("postImportDeleteQuery");
> -        if (delQuery != null) {
> -          fullCleanDone.set(false);
> -          cleanByQuery(delQuery, fullCleanDone);
> -        }
> -      }
> -      statusMessages.remove(DataImporter.MSG.TOTAL_DOC_PROCESSED);
> -    }
> -
> -    if (stop.get()) {
> -      // Dont commit if aborted using command=abort
> -      statusMessages.put("Aborted", DataImporter.DATE_TIME_FORMAT.get().format(new Date()));
> -      rollback();
> -    } else {
> -      // Do not commit unnecessarily if this is a delta-import and no documents were created or deleted
> -      if (!requestParameters.clean) {
> -        if (importStatistics.docCount.get() > 0 || importStatistics.deletedDocCount.get() > 0) {
> -          finish(lastIndexTimeProps);
> -        }
> -      } else {
> -        // Finished operation normally, commit now
> -        finish(lastIndexTimeProps);
> -      }
> -
> -      if (writer != null) {
> -        writer.finish();
> -      }
> -
> -      if (document.onImportEnd != null) {
> -        invokeEventListener(document.onImportEnd);
> -      }
> -    }
> -
> -    statusMessages.remove(TIME_ELAPSED);
> -    statusMessages.put(DataImporter.MSG.TOTAL_DOC_PROCESSED, ""+ importStatistics.docCount.get());
> -    if(importStatistics.failedDocCount.get() > 0)
> -      statusMessages.put(DataImporter.MSG.TOTAL_FAILED_DOCS, ""+ importStatistics.failedDocCount.get());
> -
> -    statusMessages.put("Time taken ", getTimeElapsedSince(startTime.get()));
> -    LOG.info("Time taken = " + getTimeElapsedSince(startTime.get()));
> +       try {
> +           dataImporter.store(DataImporter.STATUS_MSGS, statusMessages);
> +           document = dataImporter.getConfig().document;
> +           final AtomicLong startTime = new AtomicLong(System.currentTimeMillis());
> +           statusMessages.put(TIME_ELAPSED, new Object() {
> +             @Override
> +             public String toString() {
> +               return getTimeElapsedSince(startTime.get());
> +             }
> +           });
> +
> +           statusMessages.put(DataImporter.MSG.TOTAL_QUERIES_EXECUTED,
> +                   importStatistics.queryCount);
> +           statusMessages.put(DataImporter.MSG.TOTAL_ROWS_EXECUTED,
> +                   importStatistics.rowsCount);
> +           statusMessages.put(DataImporter.MSG.TOTAL_DOC_PROCESSED,
> +                   importStatistics.docCount);
> +           statusMessages.put(DataImporter.MSG.TOTAL_DOCS_SKIPPED,
> +                   importStatistics.skipDocCount);
> +
> +           List<String> entities = requestParameters.entities;
> +
> +           // Trigger onImportStart
> +           if (document.onImportStart != null) {
> +             invokeEventListener(document.onImportStart);
> +           }
> +           AtomicBoolean fullCleanDone = new AtomicBoolean(false);
> +           //we must not do a delete of *:* multiple times if there are multiple root entities to be run
> +           Properties lastIndexTimeProps = new Properties();
> +           lastIndexTimeProps.setProperty(LAST_INDEX_KEY,
> +                   DataImporter.DATE_TIME_FORMAT.get().format(dataImporter.getIndexStartTime()));
> +           for (DataConfig.Entity e : document.entities) {
> +             if (entities != null && !entities.contains(e.name))
> +               continue;
> +             lastIndexTimeProps.setProperty(e.name + "." + LAST_INDEX_KEY,
> +                     DataImporter.DATE_TIME_FORMAT.get().format(new Date()));
> +             root = e;
> +             String delQuery = e.allAttributes.get("preImportDeleteQuery");
> +             if (dataImporter.getStatus() == DataImporter.Status.RUNNING_DELTA_DUMP) {
> +               cleanByQuery(delQuery, fullCleanDone);
> +               doDelta();
> +               delQuery = e.allAttributes.get("postImportDeleteQuery");
> +               if (delQuery != null) {
> +                 fullCleanDone.set(false);
> +                 cleanByQuery(delQuery, fullCleanDone);
> +               }
> +             } else {
> +               cleanByQuery(delQuery, fullCleanDone);
> +               doFullDump();
> +               delQuery = e.allAttributes.get("postImportDeleteQuery");
> +               if (delQuery != null) {
> +                 fullCleanDone.set(false);
> +                 cleanByQuery(delQuery, fullCleanDone);
> +               }
> +             }
> +             statusMessages.remove(DataImporter.MSG.TOTAL_DOC_PROCESSED);
> +           }
> +
> +           if (stop.get()) {
> +             // Dont commit if aborted using command=abort
> +             statusMessages.put("Aborted", DataImporter.DATE_TIME_FORMAT.get().format(new Date()));
> +             rollback();
> +           } else {
> +             // Do not commit unnecessarily if this is a delta-import and no documents were created or deleted
> +             if (!requestParameters.clean) {
> +               if (importStatistics.docCount.get() > 0 || importStatistics.deletedDocCount.get() > 0) {
> +                 finish(lastIndexTimeProps);
> +               }
> +             } else {
> +               // Finished operation normally, commit now
> +               finish(lastIndexTimeProps);
> +             }
> +
> +             if (document.onImportEnd != null) {
> +               invokeEventListener(document.onImportEnd);
> +             }
> +           }
> +
> +           statusMessages.remove(TIME_ELAPSED);
> +           statusMessages.put(DataImporter.MSG.TOTAL_DOC_PROCESSED, ""+ importStatistics.docCount.get());
> +           if(importStatistics.failedDocCount.get() > 0)
> +             statusMessages.put(DataImporter.MSG.TOTAL_FAILED_DOCS, ""+ importStatistics.failedDocCount.get());
> +
> +           statusMessages.put("Time taken ", getTimeElapsedSince(startTime.get()));
> +           LOG.info("Time taken = " + getTimeElapsedSince(startTime.get()));
> +       } catch (Exception e) {
> +         throw new RuntimeException(e);
> +       } finally {
> +         if (writer != null) {
> +           writer.close();
> +         }
> +         if (requestParameters.debug) {
> +           requestParameters.debugVerboseOutput = getDebugLogger().output;
> +         }
> +       }
>   }
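
One nit on the new catch-all: it re-wraps every exception, so a
DataImportHandlerException (unchecked, judging by the undeclared throws
everywhere) comes out as a bare RuntimeException and callers lose the
original type. Maybe something like:

  } catch (RuntimeException e) {
    throw e;  // already unchecked - don't double-wrap
  } catch (Exception e) {
    throw new RuntimeException(e);
  }
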
>
>   @SuppressWarnings("unchecked")
> @@ -240,7 +284,7 @@ public class DocBuilder {
>         addStatusMessage("Optimized");
>     }
>     try {
> -      writer.persist(lastIndexTimeProps);
> +      propWriter.persist(lastIndexTimeProps);
>     } catch (Exception e) {
>       LOG.error("Could not write property file", e);
>       statusMessages.put("error", "Could not write property file. Delta imports will not work. " +
> @@ -254,20 +298,32 @@ public class DocBuilder {
>     addStatusMessage("Rolledback");
>   }
>
> -  @SuppressWarnings("unchecked")
>   private void doFullDump() {
>     addStatusMessage("Full Dump Started");
> -    if(dataImporter.getConfig().isMultiThreaded && !verboseDebug){
> +    if (dataImporter.getConfig().isMultiThreaded && !verboseDebug) {
> +      EntityRunner entityRunner = null;
>       try {
>         LOG.info("running multithreaded full-import");
> -        new EntityRunner(root,null).run(null,Context.FULL_DUMP,null);
> +        entityRunner = new EntityRunner(root, null);
> +        entityRunner.run(null, Context.FULL_DUMP, null);
>       } catch (Exception e) {
>         throw new RuntimeException("Error in multi-threaded import", e);
> +      } finally {
> +        if (entityRunner != null) {
> +          List<EntityRunner> closure = new ArrayList<EntityRunner>();
> +          closure.add(entityRunner);
> +          for (int i = 0; i < closure.size(); i++) {
> +            assert(!closure.get(i).entityProcessorWrapper.isEmpty());
> +            closure.addAll(closure.get(i).entityProcessorWrapper.iterator().next().children.values());
> +          }
> +          for (EntityRunner er : closure) {
> +            er.entityProcessor.destroy();
> +          }
> +        }
>       }
>     } else {
>       buildDocument(getVariableResolver(), null, null, root, true, null);
> -    }
> -
> +    }
>   }
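
The closure loop above is a little subtle on first read: it does a
breadth-first walk by appending children to the very list it is
indexing. The pattern in isolation (Node is a stand-in type):

  List<Node> closure = new ArrayList<Node>();
  closure.add(root);
  for (int i = 0; i < closure.size(); i++) {  // size() grows as children are appended
    closure.addAll(closure.get(i).getChildren());
  }
  // closure now holds root plus every descendant, so each runner's
  // entityProcessor can be destroyed exactly once
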
>
>   @SuppressWarnings("unchecked")
> @@ -293,6 +349,7 @@ public class DocBuilder {
>       // Make sure that documents are not re-created
>     }
>     deletedKeys = null;
> +    writer.setDeltaKeys(allPks);
>
>     statusMessages.put("Total Changed Documents", allPks.size());
>     VariableResolverImpl vri = getVariableResolver();
> @@ -385,7 +442,7 @@ public class DocBuilder {
>       for (int i = 0; i < threads; i++) {
>         entityProcessorWrapper.add(new ThreadedEntityProcessorWrapper(entityProcessor, DocBuilder.this, this, getVariableResolver()));
>       }
> -      context = new ThreadedContext(this, DocBuilder.this);
> +      context = new ThreadedContext(this, DocBuilder.this, getVariableResolver());
>     }
>
>
> @@ -426,7 +483,6 @@ public class DocBuilder {
>           }
>         }
>       } finally {
> -        entityProcessor.destroy();
>       }
>
>
> @@ -476,6 +532,9 @@ public class DocBuilder {
>                   LOG.debug("adding a doc "+docWrapper);
>                 }
>                 boolean result = writer.upload(docWrapper);
> +                if (reqParams.debug) {
> +                  reqParams.debugDocuments.add(docWrapper);
> +                }
>                 docWrapper = null;
>                 if (result){
>                   importStatistics.docCount.incrementAndGet();
> @@ -511,7 +570,6 @@ public class DocBuilder {
>           }
>         }
>       } finally {
> -        epw.destroy();
>         currentEntityProcWrapper.remove();
>         Context.CURRENT_CONTEXT.remove();
>       }
> @@ -526,7 +584,7 @@ public class DocBuilder {
>           }
>         }
>       }
> -    }
> +    }
>   }
>
>   /**A reverse linked list .
> @@ -544,10 +602,35 @@ public class DocBuilder {
>     }
>   }
>
> +  private void resetEntity(DataConfig.Entity entity) {
> +    entity.initalized = false;
> +    if (entity.entities != null) {
> +      for (DataConfig.Entity child : entity.entities) {
> +        resetEntity(child);
> +      }
> +    }
> +  }
> +
> +  private void buildDocument(VariableResolverImpl vr, DocWrapper doc,
> +      Map<String,Object> pk, DataConfig.Entity entity, boolean isRoot,
> +      ContextImpl parentCtx) {
> +    List<EntityProcessorWrapper> entitiesToDestroy = new ArrayList<EntityProcessorWrapper>();
> +    try {
> +      buildDocument(vr, doc, pk, entity, isRoot, parentCtx, entitiesToDestroy);
> +    } catch (Exception e) {
> +      throw new RuntimeException(e);
> +    } finally {
> +      for (EntityProcessorWrapper entityWrapper : entitiesToDestroy) {
> +        entityWrapper.destroy();
> +      }
> +      resetEntity(entity);
> +    }
> +  }
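
Moving destroy() out of the per-row finally blocks (removed below) and
into this wrapper looks right for caching: a processor has to survive
across parent rows so its cache isn't rebuilt per row, and everything is
torn down exactly once when the root call unwinds. Generically the shape
is (Resource is a stand-in type):

  List<Resource> opened = new ArrayList<Resource>();
  try {
    recurse(root, opened);  // each entity registers itself on first init only
  } finally {
    for (Resource r : opened) {
      r.destroy();  // single cleanup pass for the whole tree
    }
  }
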
> +
>   @SuppressWarnings("unchecked")
>   private void buildDocument(VariableResolverImpl vr, DocWrapper doc,
>                              Map<String, Object> pk, DataConfig.Entity entity, boolean isRoot,
> -                             ContextImpl parentCtx) {
> +                             ContextImpl parentCtx, List<EntityProcessorWrapper> entitiesToDestroy) {
>
>     EntityProcessorWrapper entityProcessor = getEntityProcessor(entity);
>
> @@ -556,13 +639,17 @@ public class DocBuilder {
>             session, parentCtx, this);
>     entityProcessor.init(ctx);
>     Context.CURRENT_CONTEXT.set(ctx);
> +    if (!entity.initalized) {
> +      entitiesToDestroy.add(entityProcessor);
> +      entity.initalized = true;
> +    }
>
>     if (requestParameters.start > 0) {
> -      writer.log(SolrWriter.DISABLE_LOGGING, null, null);
> +      getDebugLogger().log(DIHLogLevels.DISABLE_LOGGING, null, null);
>     }
>
>     if (verboseDebug) {
> -      writer.log(SolrWriter.START_ENTITY, entity.name, null);
> +      getDebugLogger().log(DIHLogLevels.START_ENTITY, entity.name, null);
>     }
>
>     int seenDocCount = 0;
> @@ -576,11 +663,11 @@ public class DocBuilder {
>           seenDocCount++;
>
>           if (seenDocCount > requestParameters.start) {
> -            writer.log(SolrWriter.ENABLE_LOGGING, null, null);
> +            getDebugLogger().log(DIHLogLevels.ENABLE_LOGGING, null, null);
>           }
>
>           if (verboseDebug && entity.isDocRoot) {
> -            writer.log(SolrWriter.START_DOC, entity.name, null);
> +            getDebugLogger().log(DIHLogLevels.START_DOC, entity.name, null);
>           }
>           if (doc == null && entity.isDocRoot) {
>             doc = new DocWrapper();
> @@ -609,7 +696,7 @@ public class DocBuilder {
>           }
>
>           if (verboseDebug) {
> -            writer.log(SolrWriter.ENTITY_OUT, entity.name, arow);
> +            getDebugLogger().log(DIHLogLevels.ENTITY_OUT, entity.name, arow);
>           }
>           importStatistics.rowsCount.incrementAndGet();
>           if (doc != null) {
> @@ -620,7 +707,7 @@ public class DocBuilder {
>             vr.addNamespace(entity.name, arow);
>             for (DataConfig.Entity child : entity.entities) {
>               buildDocument(vr, doc,
> -                  child.isDocRoot ? pk : null, child, false, ctx);
> +                  child.isDocRoot ? pk : null, child, false, ctx, entitiesToDestroy);
>             }
>             vr.removeNamespace(entity.name);
>           }
> @@ -634,6 +721,9 @@ public class DocBuilder {
>               return;
>             if (!doc.isEmpty()) {
>               boolean result = writer.upload(doc);
> +              if (reqParams.debug) {
> +                reqParams.debugDocuments.add(doc);
> +              }
>               doc = null;
>               if (result){
>                 importStatistics.docCount.incrementAndGet();
> @@ -645,7 +735,7 @@ public class DocBuilder {
>
>         } catch (DataImportHandlerException e) {
>           if (verboseDebug) {
> -            writer.log(SolrWriter.ENTITY_EXCEPTION, entity.name, e);
> +            getDebugLogger().log(DIHLogLevels.ENTITY_EXCEPTION, entity.name, e);
>           }
>           if(e.getErrCode() == DataImportHandlerException.SKIP_ROW){
>             continue;
> @@ -664,23 +754,22 @@ public class DocBuilder {
>             throw e;
>         } catch (Throwable t) {
>           if (verboseDebug) {
> -            writer.log(SolrWriter.ENTITY_EXCEPTION, entity.name, t);
> +            getDebugLogger().log(DIHLogLevels.ENTITY_EXCEPTION, entity.name, t);
>           }
>           throw new DataImportHandlerException(DataImportHandlerException.SEVERE, t);
>         } finally {
>           if (verboseDebug) {
> -            writer.log(SolrWriter.ROW_END, entity.name, null);
> +            getDebugLogger().log(DIHLogLevels.ROW_END, entity.name, null);
>             if (entity.isDocRoot)
> -              writer.log(SolrWriter.END_DOC, null, null);
> +              getDebugLogger().log(DIHLogLevels.END_DOC, null, null);
>             Context.CURRENT_CONTEXT.remove();
>           }
>         }
>       }
>     } finally {
>       if (verboseDebug) {
> -        writer.log(SolrWriter.END_ENTITY, null, null);
> +        getDebugLogger().log(DIHLogLevels.END_ENTITY, null, null);
>       }
> -      entityProcessor.destroy();
>     }
>   }
>
>
> Modified: lucene/dev/branches/branch_3x/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/EntityProcessorBase.java
> URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_3x/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/EntityProcessorBase.java?rev=1303792&r1=1303791&r2=1303792&view=diff
> ==============================================================================
> --- lucene/dev/branches/branch_3x/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/EntityProcessorBase.java (original)
> +++ lucene/dev/branches/branch_3x/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/EntityProcessorBase.java Thu Mar 22 14:11:16 2012
> @@ -17,6 +17,7 @@
>  package org.apache.solr.handler.dataimport;
>
>  import org.apache.solr.common.SolrException;
> +
>  import static org.apache.solr.handler.dataimport.DataImportHandlerException.*;
>  import org.slf4j.Logger;
>  import org.slf4j.LoggerFactory;
> @@ -43,21 +44,25 @@ public class EntityProcessorBase extends
>
>   protected Iterator<Map<String, Object>> rowIterator;
>
> -  protected List<Transformer> transformers;
> -
> -  protected String query;
> -
> -  protected String onError = ABORT;
> +  protected String query;
> +
> +  protected String onError = ABORT;
> +
> +  protected DIHCacheSupport cacheSupport = null;
>
>
>   @Override
>   public void init(Context context) {
> -    rowIterator = null;
>     this.context = context;
>     if (isFirstInit) {
>       firstInit(context);
>     }
> -    query = null;
> +    if (cacheSupport != null) {
> +      rowIterator = null;
> +      query = null;
> +      cacheSupport.initNewParent(context);
> +    }
> +
>   }
>
>   /**first time init call. do one-time operations here
> @@ -66,29 +71,20 @@ public class EntityProcessorBase extends
>     entityName = context.getEntityAttribute("name");
>     String s = context.getEntityAttribute(ON_ERROR);
>     if (s != null) onError = s;
> +    initCache(context);
>     isFirstInit = false;
>   }
>
> +  protected void initCache(Context context) {
> +    String cacheImplName = context
> +        .getResolvedEntityAttribute(DIHCacheSupport.CACHE_IMPL);
>
> -  protected Map<String, Object> getNext() {
> -    try {
> -      if (rowIterator == null)
> -        return null;
> -      if (rowIterator.hasNext())
> -        return rowIterator.next();
> -      query = null;
> -      rowIterator = null;
> -      return null;
> -    } catch (Exception e) {
> -      SolrException.log(log, "getNext() failed for query '" + query + "'", e);
> -      query = null;
> -      rowIterator = null;
> -      wrapAndThrow(DataImportHandlerException.WARN, e);
> -      return null;
> +    if (cacheImplName != null) {
> +      cacheSupport = new DIHCacheSupport(context, cacheImplName);
> +    }
> +  }
> -  }
>
> -  @Override
> +  @Override
>   public Map<String, Object> nextModifiedRowKey() {
>     return null;
>   }
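
For anyone wiring this up: cacheKey/cacheLookup move to DIHCacheSupport,
and the cache class is chosen per-entity with the new cacheImpl
attribute. A data-config.xml sketch using the SortedMapBackedCache added
in this commit (table/column names are made up):

  <entity name="item" query="select id, name from item">
    <entity name="feature"
            query="select description, item_id from feature"
            cacheImpl="SortedMapBackedCache"
            cacheKey="item_id"
            cacheLookup="item.id"/>
  </entity>
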
> @@ -114,165 +110,40 @@ public class EntityProcessorBase extends
>   public Map<String, Object> nextRow() {
>     return null;// do not do anything
>   }
> -
> -
> -  @Override
> -  public void destroy() {
> -    /*no op*/
> -  }
> -
> -  /**
> -   * Only used by cache implementations
> -   */
> -  protected String cachePk;
> -
> -  /**
> -   * Only used by cache implementations
> -   */
> -  protected String cacheVariableName;
> -
> -  /**
> -   * Only used by cache implementations
> -   */
> -  protected Map<String, List<Map<String, Object>>> simpleCache;
> -
> -  /**
> -   * Only used by cache implementations
> -   */
> -  protected Map<String, Map<Object, List<Map<String, Object>>>> cacheWithWhereClause;
> -
> -  protected List<Map<String, Object>> dataSourceRowCache;
> -
> -  /**
> -   * Only used by cache implementations
> -   */
> -  protected void cacheInit() {
> -    if (simpleCache != null || cacheWithWhereClause != null)
> -      return;
> -    String where = context.getEntityAttribute("where");
> -
> -    String cacheKey = context.getEntityAttribute(CACHE_KEY);
> -    String lookupKey = context.getEntityAttribute(CACHE_LOOKUP);
> -    if(cacheKey != null && lookupKey == null){
> -      throw new DataImportHandlerException(DataImportHandlerException.SEVERE,
> -              "'cacheKey' is specified for the entity "+ entityName+" but 'cacheLookup' is missing" );
> -
> -    }
> -    if (where == null && cacheKey == null) {
> -      simpleCache = new HashMap<String, List<Map<String, Object>>>();
> -    } else {
> -      if (where != null) {
> -        String[] splits = where.split("=");
> -        cachePk = splits[0];
> -        cacheVariableName = splits[1].trim();
> -      } else {
> -        cachePk = cacheKey;
> -        cacheVariableName = lookupKey;
> -      }
> -      cacheWithWhereClause = new HashMap<String, Map<Object, List<Map<String, Object>>>>();
> -    }
> -  }
> -
> -  /**
> -   * If the where clause is present the cache is sql Vs Map of key Vs List of Rows. Only used by cache implementations.
> -   *
> -   * @param query the query string for which cached data is to be returned
> -   *
> -   * @return the cached row corresponding to the given query after all variables have been resolved
> -   */
> -  protected Map<String, Object> getIdCacheData(String query) {
> -    Map<Object, List<Map<String, Object>>> rowIdVsRows = cacheWithWhereClause
> -            .get(query);
> -    List<Map<String, Object>> rows = null;
> -    Object key = context.resolve(cacheVariableName);
> -    if (key == null) {
> -      throw new DataImportHandlerException(DataImportHandlerException.WARN,
> -              "The cache lookup value : " + cacheVariableName + " is resolved to be null in the entity :" +
> -                      context.getEntityAttribute("name"));
> -
> -    }
> -    if (rowIdVsRows != null) {
> -      rows = rowIdVsRows.get(key);
> -      if (rows == null)
> +
> +  protected Map<String, Object> getNext() {
> +    if (cacheSupport == null) {
> +      try {
> +        if (rowIterator == null)
> +          return null;
> +        if (rowIterator.hasNext())
> +          return rowIterator.next();
> +        query = null;
> +        rowIterator = null;
>         return null;
> -      dataSourceRowCache = new ArrayList<Map<String, Object>>(rows);
> -      return getFromRowCacheTransformed();
> -    } else {
> -      rows = getAllNonCachedRows();
> -      if (rows.isEmpty()) {
> +      } catch (Exception e) {
> +        SolrException.log(log, "getNext() failed for query '" + query + "'", e);
> +        query = null;
> +        rowIterator = null;
> +        wrapAndThrow(DataImportHandlerException.WARN, e);
>         return null;
> -      } else {
> -        rowIdVsRows = new HashMap<Object, List<Map<String, Object>>>();
> -        for (Map<String, Object> row : rows) {
> -          Object k = row.get(cachePk);
> -          if (k == null) {
> -            throw new DataImportHandlerException(DataImportHandlerException.WARN,
> -                    "No value available for the cache key : " + cachePk + " in the entity : " +
> -                            context.getEntityAttribute("name"));
> -          }
> -          if (!k.getClass().equals(key.getClass())) {
> -            throw new DataImportHandlerException(DataImportHandlerException.WARN,
> -                    "The key in the cache type : " + k.getClass().getName() +
> -                            "is not same as the lookup value type " + key.getClass().getName() + " in the entity " +
> -                            context.getEntityAttribute("name"));
> -          }
> -          if (rowIdVsRows.get(k) == null)
> -            rowIdVsRows.put(k, new ArrayList<Map<String, Object>>());
> -          rowIdVsRows.get(k).add(row);
> -        }
> -        cacheWithWhereClause.put(query, rowIdVsRows);
> -        if (!rowIdVsRows.containsKey(key))
> -          return null;
> -        dataSourceRowCache = new ArrayList<Map<String, Object>>(rowIdVsRows.get(key));
> -        if (dataSourceRowCache.isEmpty()) {
> -          dataSourceRowCache = null;
> -          return null;
> -        }
> -        return getFromRowCacheTransformed();
>       }
> -    }
> +    } else {
> +      return cacheSupport.getCacheData(context, query, rowIterator);
> +    }
>   }
>
> -  /**
> -   * <p> Get all the rows from the the datasource for the given query. Only used by cache implementations. </p> This
> -   * <b>must</b> be implemented by sub-classes which intend to provide a cached implementation
> -   *
> -   * @return the list of all rows fetched from the datasource.
> -   */
> -  protected List<Map<String, Object>> getAllNonCachedRows() {
> -    return Collections.EMPTY_LIST;
> -  }
>
> -  /**
> -   * If where clause is not present the cache is a Map of query vs List of Rows. Only used by cache implementations.
> -   *
> -   * @param query string for which cached row is to be returned
> -   *
> -   * @return the cached row corresponding to the given query
> -   */
> -  protected Map<String, Object> getSimpleCacheData(String query) {
> -    List<Map<String, Object>> rows = simpleCache.get(query);
> -    if (rows != null) {
> -      dataSourceRowCache = new ArrayList<Map<String, Object>>(rows);
> -      return getFromRowCacheTransformed();
> -    } else {
> -      rows = getAllNonCachedRows();
> -      if (rows.isEmpty()) {
> -        return null;
> -      } else {
> -        dataSourceRowCache = new ArrayList<Map<String, Object>>(rows);
> -        simpleCache.put(query, rows);
> -        return getFromRowCacheTransformed();
> -      }
> -    }
> +  @Override
> +  public void destroy() {
> +    query = null;
> +    if (cacheSupport != null) {
> +      cacheSupport.destroyAll();
> +    }
> +    cacheSupport = null;
>   }
>
> -  protected Map<String, Object> getFromRowCacheTransformed() {
> -    Map<String, Object> r = dataSourceRowCache.remove(0);
> -    if (dataSourceRowCache.isEmpty())
> -      dataSourceRowCache = null;
> -    return r;
> -  }
> +
>
>   public static final String TRANSFORMER = "transformer";
>
> @@ -288,8 +159,4 @@ public class EntityProcessorBase extends
>
>   public static final String SKIP_DOC = "$skipDoc";
>
> -  public static final String CACHE_KEY = "cacheKey";
> -
> -  public static final String CACHE_LOOKUP = "cacheLookup";
> -
>  }
>
> Modified: lucene/dev/branches/branch_3x/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/EntityProcessorWrapper.java
> URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_3x/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/EntityProcessorWrapper.java?rev=1303792&r1=1303791&r2=1303792&view=diff
> ==============================================================================
> --- lucene/dev/branches/branch_3x/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/EntityProcessorWrapper.java (original)
> +++ lucene/dev/branches/branch_3x/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/EntityProcessorWrapper.java Thu Mar 22 14:11:16 2012
> @@ -84,7 +84,7 @@ public class EntityProcessorWrapper exte
>       @Override
>       public boolean add(Transformer transformer) {
>         if (docBuilder != null && docBuilder.verboseDebug) {
> -          transformer = docBuilder.writer.getDebugLogger().wrapTransformer(transformer);
> +          transformer = docBuilder.getDebugLogger().wrapTransformer(transformer);
>         }
>         return super.add(transformer);
>       }
>
> Added: lucene/dev/branches/branch_3x/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/SimplePropertiesWriter.java
> URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_3x/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/SimplePropertiesWriter.java?rev=1303792&view=auto
> ==============================================================================
> --- lucene/dev/branches/branch_3x/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/SimplePropertiesWriter.java (added)
> +++ lucene/dev/branches/branch_3x/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/SimplePropertiesWriter.java Thu Mar 22 14:11:16 2012
> @@ -0,0 +1,117 @@
> +package org.apache.solr.handler.dataimport;
> +
> +/**
> + * Licensed to the Apache Software Foundation (ASF) under one or more
> + * contributor license agreements.  See the NOTICE file distributed with
> + * this work for additional information regarding copyright ownership.
> + * The ASF licenses this file to You under the Apache License, Version 2.0
> + * (the "License"); you may not use this file except in compliance with
> + * the License.  You may obtain a copy of the License at
> + *
> + *     http://www.apache.org/licenses/LICENSE-2.0
> + *
> + * Unless required by applicable law or agreed to in writing, software
> + * distributed under the License is distributed on an "AS IS" BASIS,
> + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> + * See the License for the specific language governing permissions and
> + * limitations under the License.
> + */
> +
> +import java.io.File;
> +import java.io.FileInputStream;
> +import java.io.FileNotFoundException;
> +import java.io.FileOutputStream;
> +import java.io.IOException;
> +import java.io.InputStream;
> +import java.io.OutputStream;
> +import java.util.Properties;
> +
> +import org.apache.solr.core.SolrCore;
> +import org.slf4j.Logger;
> +import org.slf4j.LoggerFactory;
> +
> +public class SimplePropertiesWriter implements DIHPropertiesWriter {
> +  private static final Logger log = LoggerFactory
> +      .getLogger(SimplePropertiesWriter.class);
> +
> +  static final String IMPORTER_PROPERTIES = "dataimport.properties";
> +
> +  static final String LAST_INDEX_KEY = "last_index_time";
> +
> +  private String persistFilename = IMPORTER_PROPERTIES;
> +
> +  private String configDir = null;
> +
> +  public void init(DataImporter dataImporter) {
> +    SolrCore core = dataImporter.getCore();
> +    String configDir = core == null ? "." : core.getResourceLoader()
> +        .getConfigDir();
> +    String persistFileName = dataImporter.getHandlerName();
> +
> +    this.configDir = configDir;
> +    if (persistFileName != null) {
> +      persistFilename = persistFileName + ".properties";
> +    }
> +  }
> +
> +  private File getPersistFile() {
> +    String filePath = configDir;
> +    if (configDir != null && !configDir.endsWith(File.separator)) filePath += File.separator;
> +    filePath += persistFilename;
> +    return new File(filePath);
> +  }
> +
> +  public boolean isWritable() {
> +    File persistFile = getPersistFile();
> +    return persistFile.exists() ? persistFile.canWrite() : persistFile
> +        .getParentFile().canWrite();
> +
> +  }
> +
> +  public void persist(Properties p) {
> +    OutputStream propOutput = null;
> +
> +    Properties props = readIndexerProperties();
> +
> +    try {
> +      props.putAll(p);
> +      String filePath = configDir;
> +      if (configDir != null && !configDir.endsWith(File.separator)) filePath += File.separator;
> +      filePath += persistFilename;
> +      propOutput = new FileOutputStream(filePath);
> +      props.store(propOutput, null);
> +      log.info("Wrote last indexed time to " + persistFilename);
> +    } catch (Exception e) {
> +      throw new DataImportHandlerException(DataImportHandlerException.SEVERE,
> +          "Unable to persist Index Start Time", e);
> +    } finally {
> +      try {
> +        if (propOutput != null) propOutput.close();
> +      } catch (IOException e) {
> +        propOutput = null;
> +      }
> +    }
> +  }
> +
> +  public Properties readIndexerProperties() {
> +    Properties props = new Properties();
> +    InputStream propInput = null;
> +
> +    try {
> +      propInput = new FileInputStream(configDir + persistFilename);
> +      props.load(propInput);
> +      log.info("Read " + persistFilename);
> +    } catch (Exception e) {
> +      log.warn("Unable to read: " + persistFilename);
> +    } finally {
> +      try {
> +        if (propInput != null) propInput.close();
> +      } catch (IOException e) {
> +        propInput = null;
> +      }
> +    }
> +
> +    return props;
> +  }
> +
> +}
>
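
On SimplePropertiesWriter: the file lands in the core's conf dir as
dataimport.properties (or <handler name>.properties), with entries along
these lines (illustrative values):

  #Thu Mar 22 14:11:16 CDT 2012
  last_index_time=2012-03-22 14\:11\:16
  item.last_index_time=2012-03-22 14\:11\:16

One inconsistency worth a follow-up: persist() and getPersistFile() take
care to append File.separator when configDir lacks one, but
readIndexerProperties() concatenates configDir + persistFilename
directly, so a read could miss the file persist() just wrote if
getConfigDir() ever returned a path without a trailing separator.
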
>



-- 
lucidimagination.com

---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@lucene.apache.org
For additional commands, e-mail: dev-help@lucene.apache.org

