Return-Path: X-Original-To: apmail-gora-commits-archive@www.apache.org Delivered-To: apmail-gora-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 6039110F79 for ; Sun, 21 Jul 2013 00:45:52 +0000 (UTC) Received: (qmail 74847 invoked by uid 500); 21 Jul 2013 00:45:52 -0000 Delivered-To: apmail-gora-commits-archive@gora.apache.org Received: (qmail 74824 invoked by uid 500); 21 Jul 2013 00:45:52 -0000 Mailing-List: contact commits-help@gora.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@gora.apache.org Delivered-To: mailing list commits@gora.apache.org Received: (qmail 74817 invoked by uid 99); 21 Jul 2013 00:45:52 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Sun, 21 Jul 2013 00:45:52 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Sun, 21 Jul 2013 00:45:49 +0000 Received: from eris.apache.org (localhost [127.0.0.1]) by eris.apache.org (Postfix) with ESMTP id 868EE2388980; Sun, 21 Jul 2013 00:45:29 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1505247 - in /gora/trunk: gora-solr/src/main/java/org/apache/gora/solr/store/SolrStore.java gora-tutorial/conf/gora.properties Date: Sun, 21 Jul 2013 00:45:29 -0000 To: commits@gora.apache.org From: lewismc@apache.org X-Mailer: svnmailer-1.0.9 Message-Id: <20130721004529.868EE2388980@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: lewismc Date: Sun Jul 21 00:45:29 2013 New Revision: 1505247 URL: http://svn.apache.org/r1505247 Log: reformat SolrStore for correct 2 line indents Modified: gora/trunk/gora-solr/src/main/java/org/apache/gora/solr/store/SolrStore.java gora/trunk/gora-tutorial/conf/gora.properties Modified: gora/trunk/gora-solr/src/main/java/org/apache/gora/solr/store/SolrStore.java URL: http://svn.apache.org/viewvc/gora/trunk/gora-solr/src/main/java/org/apache/gora/solr/store/SolrStore.java?rev=1505247&r1=1505246&r2=1505247&view=diff ============================================================================== --- gora/trunk/gora-solr/src/main/java/org/apache/gora/solr/store/SolrStore.java (original) +++ gora/trunk/gora-solr/src/main/java/org/apache/gora/solr/store/SolrStore.java Sun Jul 21 00:45:29 2013 @@ -52,475 +52,455 @@ import org.jdom.input.SAXBuilder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class SolrStore - extends DataStoreBase { +public class SolrStore extends DataStoreBase { - private static final Logger LOG = LoggerFactory.getLogger( SolrStore.class ); + private static final Logger LOG = LoggerFactory.getLogger( SolrStore.class ); - protected static final String DEFAULT_MAPPING_FILE = "gora-solr-mapping.xml"; + protected static final String DEFAULT_MAPPING_FILE = "gora-solr-mapping.xml"; - protected static final String SOLR_URL_PROPERTY = "solr.url"; + protected static final String SOLR_URL_PROPERTY = "solr.url"; - protected static final String SOLR_CONFIG_PROPERTY = "solr.config"; + protected static final String SOLR_CONFIG_PROPERTY = "solr.config"; - protected static final String SOLR_SCHEMA_PROPERTY = "solr.schema"; + protected static final String SOLR_SCHEMA_PROPERTY = "solr.schema"; - protected static final String SOLR_BATCH_SIZE_PROPERTY = "solr.batchSize"; - - protected static final String SOLR_COMMIT_WITHIN_PROPERTY = "solr.commitWithin"; - - protected static final String SOLR_RESULTS_SIZE_PROPERTY = "solr.resultsSize"; + protected static final String SOLR_BATCH_SIZE_PROPERTY = "solr.batchSize"; - protected static final int DEFAULT_BATCH_SIZE = 100; - - protected static final int DEFAULT_COMMIT_WITHIN = 1000; - - protected static final int DEFAULT_RESULTS_SIZE = 100; - - private SolrMapping mapping; - - private String solrServerUrl, solrConfig, solrSchema; - - private SolrServer server, adminServer; + //protected static final String SOLR_SOLRJSERVER_IMPL = "solr.solrjserver"; - private ArrayList batch; - - private int batchSize = DEFAULT_BATCH_SIZE; - - private int commitWithin = DEFAULT_COMMIT_WITHIN; - - private int resultsSize = DEFAULT_RESULTS_SIZE; - - @Override - public void initialize( Class keyClass, Class persistentClass, Properties properties ) { - super.initialize( keyClass, persistentClass, properties ); - String mappingFile = DataStoreFactory.getMappingFile( properties, this, DEFAULT_MAPPING_FILE ); - try { - mapping = readMapping( mappingFile ); - } - catch ( IOException e ) { - LOG.error( e.getMessage() ); - LOG.error( e.getStackTrace().toString() ); - } + protected static final String SOLR_COMMIT_WITHIN_PROPERTY = "solr.commitWithin"; - solrServerUrl = DataStoreFactory.findProperty( properties, this, SOLR_URL_PROPERTY, null ); - solrConfig = DataStoreFactory.findProperty( properties, this, SOLR_CONFIG_PROPERTY, null ); - solrSchema = DataStoreFactory.findProperty( properties, this, SOLR_SCHEMA_PROPERTY, null ); - LOG.info( "Using Solr server at " + solrServerUrl ); - adminServer = new HttpSolrServer( solrServerUrl ); - server = new HttpSolrServer( solrServerUrl + "/" + mapping.getCoreName() ); - if ( autoCreateSchema ) { - createSchema(); - } - String batchSizeString = DataStoreFactory.findProperty( properties, this, SOLR_BATCH_SIZE_PROPERTY, null ); - if ( batchSizeString != null ) { - try { - batchSize = Integer.parseInt( batchSizeString ); - } - catch ( NumberFormatException nfe ) { - LOG.warn( "Invalid batch size '" + batchSizeString + "', using default " + DEFAULT_BATCH_SIZE ); - } - } - batch = new ArrayList( batchSize ); - String commitWithinString = DataStoreFactory.findProperty( properties, this, SOLR_COMMIT_WITHIN_PROPERTY, null ); - if ( commitWithinString != null ) { - try { - commitWithin = Integer.parseInt( commitWithinString ); - } - catch ( NumberFormatException nfe ) { - LOG.warn( "Invalid commit within '" + commitWithinString + "', using default " + DEFAULT_COMMIT_WITHIN ); - } - } - String resultsSizeString = DataStoreFactory.findProperty( properties, this, SOLR_RESULTS_SIZE_PROPERTY, null ); - if ( resultsSizeString != null ) { - try { - resultsSize = Integer.parseInt( resultsSizeString ); - } - catch ( NumberFormatException nfe ) { - LOG.warn( "Invalid results size '" + resultsSizeString + "', using default " + DEFAULT_RESULTS_SIZE ); - } - } - } - - @SuppressWarnings("unchecked") - private SolrMapping readMapping( String filename ) - throws IOException { - SolrMapping map = new SolrMapping(); - try { - SAXBuilder builder = new SAXBuilder(); - Document doc = builder.build( getClass().getClassLoader().getResourceAsStream( filename ) ); - - List classes = doc.getRootElement().getChildren( "class" ); - - for ( Element classElement : classes ) { - if ( classElement.getAttributeValue( "keyClass" ).equals( keyClass.getCanonicalName() ) - && classElement.getAttributeValue( "name" ).equals( persistentClass.getCanonicalName() ) ) { - - String tableName = getSchemaName( classElement.getAttributeValue( "table" ), persistentClass ); - map.setCoreName( tableName ); - - Element primaryKeyEl = classElement.getChild( "primarykey" ); - map.setPrimaryKey( primaryKeyEl.getAttributeValue( "column" ) ); - - List fields = classElement.getChildren( "field" ); - - for ( Element field : fields ) { - String fieldName = field.getAttributeValue( "name" ); - String columnName = field.getAttributeValue( "column" ); - map.addField( fieldName, columnName ); - } - break; - } - } - } - catch ( Exception ex ) { - throw new IOException( ex ); - } - - return map; - } - - public SolrMapping getMapping() { - return mapping; - } - - @Override - public String getSchemaName() { - return mapping.getCoreName(); - } - - @Override - public void createSchema() { - try { - if ( !schemaExists() ) - CoreAdminRequest.createCore( mapping.getCoreName(), mapping.getCoreName(), adminServer, solrConfig, - solrSchema ); - } - catch ( Exception e ) { - LOG.error( e.getMessage(), e.getStackTrace().toString() ); - } - } + protected static final String SOLR_RESULTS_SIZE_PROPERTY = "solr.resultsSize"; + + protected static final int DEFAULT_BATCH_SIZE = 100; - @Override - /** Default implementation deletes and recreates the schema*/ - public void truncateSchema() { - try { - server.deleteByQuery( "*:*" ); - server.commit(); - } - catch ( Exception e ) { - // ignore? - LOG.error( e.getMessage(), e.getStackTrace().toString() ); - } - } + protected static final int DEFAULT_COMMIT_WITHIN = 1000; - @Override - public void deleteSchema() { - // XXX should this be only in truncateSchema ??? - try { - server.deleteByQuery( "*:*" ); - server.commit(); - } - catch ( Exception e ) { - // ignore? - // LOG.error(e.getMessage()); - // LOG.error(e.getStackTrace().toString()); - } - try { - CoreAdminRequest.unloadCore( mapping.getCoreName(), adminServer ); - } - catch ( Exception e ) { - if ( e.getMessage().contains( "No such core" ) ) { - return; // it's ok, the core is not there - } - else { - LOG.error( e.getMessage(), e.getStackTrace().toString() ); - } - } - } + protected static final int DEFAULT_RESULTS_SIZE = 100; - @Override - public boolean schemaExists() { - boolean exists = false; - try { - CoreAdminResponse rsp = CoreAdminRequest.getStatus( mapping.getCoreName(), adminServer ); - exists = rsp.getUptime( mapping.getCoreName() ) != null; - } - catch ( Exception e ) { - LOG.error( e.getMessage(), e.getStackTrace().toString() ); - } - return exists; - } + private SolrMapping mapping; - private static final String toDelimitedString( String[] arr, String sep ) { - if ( arr == null || arr.length == 0 ) { - return ""; - } - StringBuilder sb = new StringBuilder(); - for ( int i = 0; i < arr.length; i++ ) { - if ( i > 0 ) - sb.append( sep ); - sb.append( arr[i] ); - } - return sb.toString(); - } - - public static String escapeQueryKey( String key ) { - if ( key == null ) { - return null; - } - StringBuilder sb = new StringBuilder(); - for ( int i = 0; i < key.length(); i++ ) { - char c = key.charAt( i ); - switch ( c ) { - case ':': - case '*': - sb.append( "\\" + c ); - break; - default: - sb.append( c ); - } - } - return sb.toString(); - } - - @Override - public T get( K key, String[] fields ) { - ModifiableSolrParams params = new ModifiableSolrParams(); - params.set( CommonParams.QT, "/get" ); - params.set( CommonParams.FL, toDelimitedString( fields, "," ) ); - params.set( "id", key.toString() ); - try { - QueryResponse rsp = server.query( params ); - Object o = rsp.getResponse().get( "doc" ); - if ( o == null ) { - return null; - } - return newInstance( (SolrDocument)o, fields ); - } - catch ( Exception e ) { - LOG.error( e.getMessage(), e.getStackTrace().toString() ); - } + private String solrServerUrl, solrConfig, solrSchema; + + private SolrServer server, adminServer; + + private ArrayList batch; + + private int batchSize = DEFAULT_BATCH_SIZE; + + private int commitWithin = DEFAULT_COMMIT_WITHIN; + + private int resultsSize = DEFAULT_RESULTS_SIZE; + + @Override + public void initialize( Class keyClass, Class persistentClass, Properties properties ) { + super.initialize( keyClass, persistentClass, properties ); + String mappingFile = DataStoreFactory.getMappingFile( properties, this, DEFAULT_MAPPING_FILE ); + try { + mapping = readMapping( mappingFile ); + } + catch ( IOException e ) { + LOG.error( e.getMessage() ); + LOG.error( e.getStackTrace().toString() ); + } + + solrServerUrl = DataStoreFactory.findProperty( properties, this, SOLR_URL_PROPERTY, null ); + solrConfig = DataStoreFactory.findProperty( properties, this, SOLR_CONFIG_PROPERTY, null ); + solrSchema = DataStoreFactory.findProperty( properties, this, SOLR_SCHEMA_PROPERTY, null ); + LOG.info( "Using Solr server at " + solrServerUrl ); + adminServer = new HttpSolrServer( solrServerUrl ); + server = new HttpSolrServer( solrServerUrl + "/" + mapping.getCoreName() ); + if ( autoCreateSchema ) { + createSchema(); + } + String batchSizeString = DataStoreFactory.findProperty( properties, this, SOLR_BATCH_SIZE_PROPERTY, null ); + if ( batchSizeString != null ) { + try { + batchSize = Integer.parseInt( batchSizeString ); + } catch ( NumberFormatException nfe ) { + LOG.warn( "Invalid batch size '" + batchSizeString + "', using default " + DEFAULT_BATCH_SIZE ); + } + } + batch = new ArrayList( batchSize ); + String commitWithinString = DataStoreFactory.findProperty( properties, this, SOLR_COMMIT_WITHIN_PROPERTY, null ); + if ( commitWithinString != null ) { + try { + commitWithin = Integer.parseInt( commitWithinString ); + } catch ( NumberFormatException nfe ) { + LOG.warn( "Invalid commit within '" + commitWithinString + "', using default " + DEFAULT_COMMIT_WITHIN ); + } + } + String resultsSizeString = DataStoreFactory.findProperty( properties, this, SOLR_RESULTS_SIZE_PROPERTY, null ); + if ( resultsSizeString != null ) { + try { + resultsSize = Integer.parseInt( resultsSizeString ); + } catch ( NumberFormatException nfe ) { + LOG.warn( "Invalid results size '" + resultsSizeString + "', using default " + DEFAULT_RESULTS_SIZE ); + } + } + } + + @SuppressWarnings("unchecked") + private SolrMapping readMapping( String filename ) throws IOException { + SolrMapping map = new SolrMapping(); + try { + SAXBuilder builder = new SAXBuilder(); + Document doc = builder.build( getClass().getClassLoader().getResourceAsStream( filename ) ); + + List classes = doc.getRootElement().getChildren( "class" ); + + for ( Element classElement : classes ) { + if ( classElement.getAttributeValue( "keyClass" ).equals( keyClass.getCanonicalName() ) + && classElement.getAttributeValue( "name" ).equals( persistentClass.getCanonicalName() ) ) { + + String tableName = getSchemaName( classElement.getAttributeValue( "table" ), persistentClass ); + map.setCoreName( tableName ); + + Element primaryKeyEl = classElement.getChild( "primarykey" ); + map.setPrimaryKey( primaryKeyEl.getAttributeValue( "column" ) ); + + List fields = classElement.getChildren( "field" ); + + for ( Element field : fields ) { + String fieldName = field.getAttributeValue( "name" ); + String columnName = field.getAttributeValue( "column" ); + map.addField( fieldName, columnName ); + } + break; + } + } + } catch ( Exception ex ) { + throw new IOException( ex ); + } + + return map; + } + + public SolrMapping getMapping() { + return mapping; + } + + @Override + public String getSchemaName() { + return mapping.getCoreName(); + } + + @Override + public void createSchema() { + try { + if ( !schemaExists() ) + CoreAdminRequest.createCore( mapping.getCoreName(), mapping.getCoreName(), adminServer, solrConfig, + solrSchema ); + } catch ( Exception e ) { + LOG.error( e.getMessage(), e.getStackTrace().toString() ); + } + } + + @Override + /** Default implementation deletes and recreates the schema*/ + public void truncateSchema() { + try { + server.deleteByQuery( "*:*" ); + server.commit(); + } catch ( Exception e ) { + // ignore? + LOG.error( e.getMessage(), e.getStackTrace().toString() ); + } + } + + @Override + public void deleteSchema() { + // XXX should this be only in truncateSchema ??? + try { + server.deleteByQuery( "*:*" ); + server.commit(); + } catch ( Exception e ) { + // ignore? + // LOG.error(e.getMessage()); + // LOG.error(e.getStackTrace().toString()); + } + try { + CoreAdminRequest.unloadCore( mapping.getCoreName(), adminServer ); + } catch ( Exception e ) { + if ( e.getMessage().contains( "No such core" ) ) { + return; // it's ok, the core is not there + } else { + LOG.error( e.getMessage(), e.getStackTrace().toString() ); + } + } + } + + @Override + public boolean schemaExists() { + boolean exists = false; + try { + CoreAdminResponse rsp = CoreAdminRequest.getStatus( mapping.getCoreName(), adminServer ); + exists = rsp.getUptime( mapping.getCoreName() ) != null; + } catch ( Exception e ) { + LOG.error( e.getMessage(), e.getStackTrace().toString() ); + } + return exists; + } + + private static final String toDelimitedString( String[] arr, String sep ) { + if ( arr == null || arr.length == 0 ) { + return ""; + } + StringBuilder sb = new StringBuilder(); + for ( int i = 0; i < arr.length; i++ ) { + if ( i > 0 ) + sb.append( sep ); + sb.append( arr[i] ); + } + return sb.toString(); + } + + public static String escapeQueryKey( String key ) { + if ( key == null ) { + return null; + } + StringBuilder sb = new StringBuilder(); + for ( int i = 0; i < key.length(); i++ ) { + char c = key.charAt( i ); + switch ( c ) { + case ':': + case '*': + sb.append( "\\" + c ); + break; + default: + sb.append( c ); + } + } + return sb.toString(); + } + + @Override + public T get( K key, String[] fields ) { + ModifiableSolrParams params = new ModifiableSolrParams(); + params.set( CommonParams.QT, "/get" ); + params.set( CommonParams.FL, toDelimitedString( fields, "," ) ); + params.set( "id", key.toString() ); + try { + QueryResponse rsp = server.query( params ); + Object o = rsp.getResponse().get( "doc" ); + if ( o == null ) { return null; - } - - public T newInstance( SolrDocument doc, String[] fields ) - throws IOException { - T persistent = newPersistent(); - if ( fields == null ) { - fields = fieldMap.keySet().toArray( new String[fieldMap.size()] ); - } - String pk = mapping.getPrimaryKey(); - for ( String f : fields ) { - Field field = fieldMap.get( f ); - Schema fieldSchema = field.schema(); - String sf = null; - if ( pk.equals( f ) ) { - sf = f; - } - else { - sf = mapping.getSolrField( f ); - } - Object sv = doc.get( sf ); - Object v; - if ( sv == null ) { - continue; - } - switch ( fieldSchema.getType() ) { - case MAP: - case ARRAY: - case RECORD: - v = IOUtils.deserialize( (byte[]) sv, datumReader, fieldSchema, persistent.get( field.pos() ) ); - persistent.put( field.pos(), v ); - break; - case ENUM: - v = AvroUtils.getEnumValue( fieldSchema, (String) sv ); - persistent.put( field.pos(), v ); - break; - case FIXED: - throw new IOException( "???" ); - // break; - case BYTES: - persistent.put( field.pos(), ByteBuffer.wrap( (byte[]) sv ) ); - break; - case BOOLEAN: - case DOUBLE: - case FLOAT: - case INT: - case LONG: - persistent.put( field.pos(), sv ); - break; - case STRING: - persistent.put( field.pos(), new Utf8( sv.toString() ) ); - break; - case UNION: - LOG.error( "Union is not supported yet" ); - break; - default: - LOG.error( "Unknown field type: " + fieldSchema.getType() ); - } - persistent.setDirty( field.pos() ); - } - persistent.clearDirty(); - return persistent; - } - - @Override - public void put( K key, T persistent ) { - Schema schema = persistent.getSchema(); - StateManager stateManager = persistent.getStateManager(); - if ( !stateManager.isDirty( persistent ) ) { - // nothing to do - return; - } - SolrInputDocument doc = new SolrInputDocument(); - // add primary key - doc.addField( mapping.getPrimaryKey(), key ); - // populate the doc - List fields = schema.getFields(); - for ( Field field : fields ) { - String sf = mapping.getSolrField( field.name() ); - // Solr will append values to fields in a SolrInputDocument, even the key - // mapping won't find the primary - if ( sf == null ) { - continue; - } - Schema fieldSchema = field.schema(); - Object v = persistent.get( field.pos() ); - if ( v == null ) { - continue; - } - switch ( fieldSchema.getType() ) { - case MAP: - case ARRAY: - case RECORD: - byte[] data = null; - try { - data = IOUtils.serialize( datumWriter, fieldSchema, v ); - } - catch ( IOException e ) { - LOG.error( e.getMessage(), e.getStackTrace().toString() ); - } - doc.addField( sf, data ); - break; - case BYTES: - doc.addField( sf, ( (ByteBuffer) v ).array() ); - break; - case ENUM: - case STRING: - doc.addField( sf, v.toString() ); - break; - case BOOLEAN: - case DOUBLE: - case FLOAT: - case INT: - case LONG: - doc.addField( sf, v ); - break; - case UNION: - LOG.error( "Union is not supported yet" ); - break; - default: - LOG.error( "Unknown field type: " + fieldSchema.getType() ); - } - } - System.out.println( "DOCUMENT: " + doc ); - batch.add( doc ); - if ( batch.size() >= batchSize ) { - try { - add( batch, commitWithin ); - batch.clear(); - } - catch ( Exception e ) { - LOG.error( e.getMessage(), e.getStackTrace().toString() ); - } - } - } - - @Override - public boolean delete( K key ) { - String keyField = mapping.getPrimaryKey(); - try { - UpdateResponse rsp = server.deleteByQuery( keyField + ":" + escapeQueryKey( key.toString() ) ); - server.commit(); - LOG.info( rsp.toString() ); - return true; - } - catch ( Exception e ) { - LOG.error( e.getMessage(), e.getStackTrace().toString() ); - } - return false; - } - - @Override - public long deleteByQuery( Query query ) { - String q = ( (SolrQuery) query ).toSolrQuery(); - try { - UpdateResponse rsp = server.deleteByQuery( q ); - server.commit(); - LOG.info( rsp.toString() ); - } - catch ( Exception e ) { - LOG.error( e.getMessage(), e.getStackTrace().toString() ); - } - return 0; - } - - @Override - public Result execute( Query query ) { - try { - return new SolrResult( this, query, server, resultsSize ); - } - catch ( IOException e ) { + } + return newInstance( (SolrDocument)o, fields ); + } catch ( Exception e ) { + LOG.error( e.getMessage(), e.getStackTrace().toString() ); + } + return null; + } + + public T newInstance( SolrDocument doc, String[] fields ) + throws IOException { + T persistent = newPersistent(); + if ( fields == null ) { + fields = fieldMap.keySet().toArray( new String[fieldMap.size()] ); + } + String pk = mapping.getPrimaryKey(); + for ( String f : fields ) { + Field field = fieldMap.get( f ); + Schema fieldSchema = field.schema(); + String sf = null; + if ( pk.equals( f ) ) { + sf = f; + } else { + sf = mapping.getSolrField( f ); + } + Object sv = doc.get( sf ); + Object v; + if ( sv == null ) { + continue; + } + switch ( fieldSchema.getType() ) { + case MAP: + case ARRAY: + case RECORD: + v = IOUtils.deserialize( (byte[]) sv, datumReader, fieldSchema, persistent.get( field.pos() ) ); + persistent.put( field.pos(), v ); + break; + case ENUM: + v = AvroUtils.getEnumValue( fieldSchema, (String) sv ); + persistent.put( field.pos(), v ); + break; + case FIXED: + throw new IOException( "???" ); + // break; + case BYTES: + persistent.put( field.pos(), ByteBuffer.wrap( (byte[]) sv ) ); + break; + case BOOLEAN: + case DOUBLE: + case FLOAT: + case INT: + case LONG: + persistent.put( field.pos(), sv ); + break; + case STRING: + persistent.put( field.pos(), new Utf8( sv.toString() ) ); + break; + case UNION: + LOG.error( "Union is not supported yet" ); + break; + default: + LOG.error( "Unknown field type: " + fieldSchema.getType() ); + } + persistent.setDirty( field.pos() ); + } + persistent.clearDirty(); + return persistent; + } + + @Override + public void put( K key, T persistent ) { + Schema schema = persistent.getSchema(); + StateManager stateManager = persistent.getStateManager(); + if ( !stateManager.isDirty( persistent ) ) { + // nothing to do + return; + } + SolrInputDocument doc = new SolrInputDocument(); + // add primary key + doc.addField( mapping.getPrimaryKey(), key ); + // populate the doc + List fields = schema.getFields(); + for ( Field field : fields ) { + String sf = mapping.getSolrField( field.name() ); + // Solr will append values to fields in a SolrInputDocument, even the key + // mapping won't find the primary + if ( sf == null ) { + continue; + } + Schema fieldSchema = field.schema(); + Object v = persistent.get( field.pos() ); + if ( v == null ) { + continue; + } + switch ( fieldSchema.getType() ) { + case MAP: + case ARRAY: + case RECORD: + byte[] data = null; + try { + data = IOUtils.serialize( datumWriter, fieldSchema, v ); + } catch ( IOException e ) { LOG.error( e.getMessage(), e.getStackTrace().toString() ); - } - return null; - } - - @Override - public Query newQuery() { - return new SolrQuery( this ); - } - - @Override - public List> getPartitions( Query query ) - throws IOException { - // TODO: implement this using Hadoop DB support - - ArrayList> partitions = new ArrayList>(); - partitions.add( new PartitionQueryImpl( query ) ); - - return partitions; - } - - @Override - public void flush() { - try { - if ( batch.size() > 0 ) { - add( batch, commitWithin ); - batch.clear(); - } - } - catch ( Exception e ) { - LOG.error(e.getMessage(), e.getStackTrace()); - } - } - - @Override - public void close() { - // In testing, the index gets closed before the commit in flush() can happen - // so an exception gets thrown - //flush(); - } + } + doc.addField( sf, data ); + break; + case BYTES: + doc.addField( sf, ( (ByteBuffer) v ).array() ); + break; + case ENUM: + case STRING: + doc.addField( sf, v.toString() ); + break; + case BOOLEAN: + case DOUBLE: + case FLOAT: + case INT: + case LONG: + doc.addField( sf, v ); + break; + case UNION: + LOG.error( "Union is not supported yet" ); + break; + default: + LOG.error( "Unknown field type: " + fieldSchema.getType() ); + } + } + LOG.info( "DOCUMENT: " + doc ); + batch.add( doc ); + if ( batch.size() >= batchSize ) { + try { + add( batch, commitWithin ); + batch.clear(); + } catch ( Exception e ) { + LOG.error( e.getMessage(), e.getStackTrace().toString() ); + } + } + } + + @Override + public boolean delete( K key ) { + String keyField = mapping.getPrimaryKey(); + try { + UpdateResponse rsp = server.deleteByQuery( keyField + ":" + escapeQueryKey( key.toString() ) ); + server.commit(); + LOG.info( rsp.toString() ); + return true; + } catch ( Exception e ) { + LOG.error( e.getMessage(), e.getStackTrace().toString() ); + } + return false; + } + + @Override + public long deleteByQuery( Query query ) { + String q = ( (SolrQuery) query ).toSolrQuery(); + try { + UpdateResponse rsp = server.deleteByQuery( q ); + server.commit(); + LOG.info( rsp.toString() ); + } catch ( Exception e ) { + LOG.error( e.getMessage(), e.getStackTrace().toString() ); + } + return 0; + } + + @Override + public Result execute( Query query ) { + try { + return new SolrResult( this, query, server, resultsSize ); + } catch ( IOException e ) { + LOG.error( e.getMessage(), e.getStackTrace().toString() ); + } + return null; + } + + @Override + public Query newQuery() { + return new SolrQuery( this ); + } + + @Override + public List> getPartitions( Query query ) + throws IOException { + // TODO: implement this using Hadoop DB support + + ArrayList> partitions = new ArrayList>(); + partitions.add( new PartitionQueryImpl( query ) ); + + return partitions; + } + + @Override + public void flush() { + try { + if ( batch.size() > 0 ) { + add( batch, commitWithin ); + batch.clear(); + } + } catch ( Exception e ) { + LOG.error(e.getMessage(), e.getStackTrace()); + } + } + + @Override + public void close() { + // In testing, the index gets closed before the commit in flush() can happen + // so an exception gets thrown + //flush(); + } - private void add(ArrayList batch, int commitWithin) throws SolrServerException, IOException { - if (commitWithin == 0) { - server.add( batch ); - server.commit( false, true, true ); - } - else { - server.add( batch, commitWithin ); - } + private void add(ArrayList batch, int commitWithin) throws SolrServerException, IOException { + if (commitWithin == 0) { + server.add( batch ); + server.commit( false, true, true ); + } else { + server.add( batch, commitWithin ); } - + } } Modified: gora/trunk/gora-tutorial/conf/gora.properties URL: http://svn.apache.org/viewvc/gora/trunk/gora-tutorial/conf/gora.properties?rev=1505247&r1=1505246&r2=1505247&view=diff ============================================================================== --- gora/trunk/gora-tutorial/conf/gora.properties (original) +++ gora/trunk/gora-tutorial/conf/gora.properties Sun Jul 21 00:45:29 2013 @@ -48,4 +48,7 @@ gora.sqlstore.jdbc.url=jdbc:hsqldb:file: gora.solrstore.solr.url=http://localhost:8983/solr gora.solrstore.solr.commitwithin=0 gora.solrstore.solr.batchsize=100 +# set which Solrj server impl you wish to use +# cloud, concurrent, http, loadbalance +#gora.solrstore.solr.solrjserver=http