Author: akarasulu Date: Sat Nov 4 17:25:51 2006 New Revision: 471315 URL: http://svn.apache.org/viewvc?view=rev&rev=471315 Log: reformatting code Modified: directory/trunks/apacheds/mitosis/src/main/java/org/apache/directory/mitosis/service/protocol/handler/ReplicationClientContextHandler.java directory/trunks/apacheds/mitosis/src/main/java/org/apache/directory/mitosis/service/protocol/handler/ReplicationClientProtocolHandler.java directory/trunks/apacheds/mitosis/src/main/java/org/apache/directory/mitosis/service/protocol/handler/ReplicationProtocolHandler.java directory/trunks/apacheds/mitosis/src/main/java/org/apache/directory/mitosis/service/protocol/handler/ReplicationServerProtocolHandler.java Modified: directory/trunks/apacheds/mitosis/src/main/java/org/apache/directory/mitosis/service/protocol/handler/ReplicationClientContextHandler.java URL: http://svn.apache.org/viewvc/directory/trunks/apacheds/mitosis/src/main/java/org/apache/directory/mitosis/service/protocol/handler/ReplicationClientContextHandler.java?view=diff&rev=471315&r1=471314&r2=471315 ============================================================================== --- directory/trunks/apacheds/mitosis/src/main/java/org/apache/directory/mitosis/service/protocol/handler/ReplicationClientContextHandler.java (original) +++ directory/trunks/apacheds/mitosis/src/main/java/org/apache/directory/mitosis/service/protocol/handler/ReplicationClientContextHandler.java Sat Nov 4 17:25:51 2006 @@ -19,6 +19,7 @@ */ package org.apache.directory.mitosis.service.protocol.handler; + import java.net.InetSocketAddress; import java.util.Iterator; import java.util.Map; @@ -56,6 +57,7 @@ import org.apache.directory.mitosis.store.ReplicationLogIterator; import org.apache.directory.mitosis.store.ReplicationStore; + /** * {@link ReplicationContextHandler} that implements client-side replication logic * which sends any changes out-of-date to server. @@ -63,43 +65,42 @@ * @author Trustin Lee * @version $Rev: 116 $, $Date: 2006-09-18 13:47:53Z $ */ -public class ReplicationClientContextHandler implements - ReplicationContextHandler +public class ReplicationClientContextHandler implements ReplicationContextHandler { public void contextBegin( ReplicationContext ctx ) throws Exception { // Send a login message. - LoginMessage m = new LoginMessage( ctx.getNextSequence(), - ctx.getService().getConfiguration().getReplicaId() ); + LoginMessage m = new LoginMessage( ctx.getNextSequence(), ctx.getService().getConfiguration().getReplicaId() ); ctx.getSession().write( m ); - + // Set write timeout ctx.getSession().setWriteTimeout( ctx.getConfiguration().getResponseTimeout() ); - + // Check update vector of the remote peer every 5 seconds. ctx.getSession().setIdleTime( IdleStatus.BOTH_IDLE, 5 ); } + public void contextEnd( ReplicationContext ctx ) throws Exception { } - public void messageReceived( ReplicationContext ctx, Object message ) - throws Exception + + public void messageReceived( ReplicationContext ctx, Object message ) throws Exception { ctx.cancelExpiration( ( ( BaseMessage ) message ).getSequence() ); - - if( ctx.getState() == State.READY ) + + if ( ctx.getState() == State.READY ) { - if( message instanceof LogEntryAckMessage ) + if ( message instanceof LogEntryAckMessage ) { onLogEntryAck( ctx, ( LogEntryAckMessage ) message ); } - else if( message instanceof BeginLogEntriesAckMessage ) + else if ( message instanceof BeginLogEntriesAckMessage ) { onBeginLogEntriesAck( ctx, ( BeginLogEntriesAckMessage ) message ); } - else if( message instanceof EndLogEntriesAckMessage ) + else if ( message instanceof EndLogEntriesAckMessage ) { // Do nothing } @@ -110,7 +111,7 @@ } else { - if( message instanceof LoginAckMessage ) + if ( message instanceof LoginAckMessage ) { onLoginAck( ctx, ( LoginAckMessage ) message ); } @@ -121,103 +122,98 @@ } } - public void messageSent( ReplicationContext ctx, Object message ) - throws Exception + + public void messageSent( ReplicationContext ctx, Object message ) throws Exception { - if( message instanceof LogEntryMessage || - message instanceof LoginMessage ) + if ( message instanceof LogEntryMessage || message instanceof LoginMessage ) { ctx.scheduleExpiration( message ); } } - public void exceptionCaught( ReplicationContext ctx, Throwable cause ) - throws Exception + + public void exceptionCaught( ReplicationContext ctx, Throwable cause ) throws Exception { SessionLog.warn( ctx.getSession(), "Unexpected exception.", cause ); ctx.getSession().close(); } - public void contextIdle( ReplicationContext ctx, IdleStatus status ) - throws Exception + + public void contextIdle( ReplicationContext ctx, IdleStatus status ) throws Exception { // If this cilent is logged in, all responses for sent messages // (LogEntryMessages) is received, and no write request is pending, // it means previous replication process ended or this is the // first replication attempt. - if( ctx.getState() == State.READY && - ctx.getScheduledExpirations() == 0 && - ctx.getSession().getScheduledWriteRequests() == 0 ) + if ( ctx.getState() == State.READY && ctx.getScheduledExpirations() == 0 + && ctx.getSession().getScheduledWriteRequests() == 0 ) { beginReplication( ctx ); } } + private void onLoginAck( ReplicationContext ctx, LoginAckMessage message ) { - if( message.getResponseCode() != Constants.OK ) + if ( message.getResponseCode() != Constants.OK ) { - SessionLog.warn( ctx.getSession(), - "Login attempt failed: " + message.getResponseCode() ); + SessionLog.warn( ctx.getSession(), "Login attempt failed: " + message.getResponseCode() ); ctx.getSession().close(); return; } - + Iterator i = ctx.getConfiguration().getPeerReplicas().iterator(); - while( i.hasNext() ) + while ( i.hasNext() ) { Replica replica = ( Replica ) i.next(); - if( replica.getId().equals( message.getReplicaId() ) ) + if ( replica.getId().equals( message.getReplicaId() ) ) { - if( replica.getAddress().getAddress().equals( - ( ( InetSocketAddress ) ctx.getSession().getRemoteAddress() ).getAddress() ) ) + if ( replica.getAddress().getAddress().equals( + ( ( InetSocketAddress ) ctx.getSession().getRemoteAddress() ).getAddress() ) ) { ctx.setPeer( replica ); ctx.setState( State.READY ); - + beginReplication( ctx ); return; } else { - SessionLog.warn( - ctx.getSession(), - "Peer address mismatches: " + - ctx.getSession().getRemoteAddress() + - " (expected: " + replica.getAddress() ); + SessionLog.warn( ctx.getSession(), "Peer address mismatches: " + + ctx.getSession().getRemoteAddress() + " (expected: " + replica.getAddress() ); ctx.getSession().close(); return; } } } - - SessionLog.warn( - ctx.getSession(), - "Unknown peer replica ID: " + message.getReplicaId() ); + + SessionLog.warn( ctx.getSession(), "Unknown peer replica ID: " + message.getReplicaId() ); ctx.getSession().close(); } + private void beginReplication( ReplicationContext ctx ) { // Initiate replication process asking update vector. ctx.getSession().write( new BeginLogEntriesMessage( ctx.getNextSequence() ) ); } + private void onLogEntryAck( ReplicationContext ctx, LogEntryAckMessage message ) throws Exception { - if( message.getResponseCode() != Constants.OK ) + if ( message.getResponseCode() != Constants.OK ) { - SessionLog.warn( - ctx.getSession(), - "Remote peer failed to execute a log entry." ); + SessionLog.warn( ctx.getSession(), "Remote peer failed to execute a log entry." ); ctx.getSession().close(); } } - private void onBeginLogEntriesAck( ReplicationContext ctx, BeginLogEntriesAckMessage message ) throws NamingException + + private void onBeginLogEntriesAck( ReplicationContext ctx, BeginLogEntriesAckMessage message ) + throws NamingException { // Start transaction only when the server says OK. - if( message.getResponseCode() != Constants.OK ) + if ( message.getResponseCode() != Constants.OK ) { return; } @@ -229,60 +225,55 @@ { myPV = store.getPurgeVector(); } - catch( Exception e ) + catch ( Exception e ) { - SessionLog.warn( - ctx.getSession(), - "Failed to get update vector.", e ); + SessionLog.warn( ctx.getSession(), "Failed to get update vector.", e ); ctx.getSession().close(); return; } - + // Do full-DIT transfer if the peer is new and I'm not new. try { - if( myPV.size() > 0 && yourUV.size() == 0 ) + if ( myPV.size() > 0 && yourUV.size() == 0 ) { - SessionLog.warn( - ctx.getSession(), - "Starting a whole DIT transfer." ); + SessionLog.warn( ctx.getSession(), "Starting a whole DIT transfer." ); sendAllEntries( ctx ); } else { - SessionLog.warn( - ctx.getSession(), - "Starting a partial replication log transfer." ); + SessionLog.warn( ctx.getSession(), "Starting a partial replication log transfer." ); sendReplicationLogs( ctx, myPV, yourUV ); } } finally { // Send EngLogEntries message to release the remote peer resources. - ctx.getSession().write( new EndLogEntriesMessage ( ctx.getNextSequence() ) ); + ctx.getSession().write( new EndLogEntriesMessage( ctx.getNextSequence() ) ); } } - + + private void sendAllEntries( ReplicationContext ctx ) throws NamingException { Attributes rootDSE = ctx.getServiceConfiguration().getPartitionNexus().getRootDSE(); - + Attribute namingContextsAttr = rootDSE.get( "namingContexts" ); - if( namingContextsAttr == null || namingContextsAttr.size() == 0 ) + if ( namingContextsAttr == null || namingContextsAttr.size() == 0 ) { SessionLog.warn( ctx.getSession(), "No namingContexts attributes in rootDSE." ); return; } - + // Iterate all context partitions to send all entries of them. NamingEnumeration e = namingContextsAttr.getAll(); - while( e.hasMore() ) + while ( e.hasMore() ) { Object value = e.next(); - + // Convert attribute value to JNDI name. LdapDN contextName; - if( value instanceof LdapDN ) + if ( value instanceof LdapDN ) { contextName = ( LdapDN ) value; } @@ -290,56 +281,55 @@ { contextName = new LdapDN( String.valueOf( value ) ); } - + SessionLog.info( ctx.getSession(), "Sending entries under '" + contextName + '\'' ); - Map mapping = ctx.getServiceConfiguration().getGlobalRegistries() - .getAttributeTypeRegistry().getNormalizerMapping(); + Map mapping = ctx.getServiceConfiguration().getGlobalRegistries().getAttributeTypeRegistry() + .getNormalizerMapping(); contextName.normalize( mapping ); sendAllEntries( ctx, contextName ); } } + private void sendAllEntries( ReplicationContext ctx, LdapDN contextName ) throws NamingException { // Retrieve all subtree including the base entry SearchControls ctrl = new SearchControls(); - ctrl.setSearchScope( SearchControls.SUBTREE_SCOPE ); - NamingEnumeration e = ctx.getServiceConfiguration().getPartitionNexus().search( - contextName, - ctx.getServiceConfiguration().getEnvironment(), - new PresenceNode( org.apache.directory.mitosis.common.Constants.OBJECT_CLASS_OID ), ctrl ); - + ctrl.setSearchScope( SearchControls.SUBTREE_SCOPE ); + NamingEnumeration e = ctx.getServiceConfiguration().getPartitionNexus().search( contextName, + ctx.getServiceConfiguration().getEnvironment(), + new PresenceNode( org.apache.directory.mitosis.common.Constants.OBJECT_CLASS_OID ), ctrl ); + try { - while( e.hasMore() ) + while ( e.hasMore() ) { SearchResult sr = ( SearchResult ) e.next(); Attributes attrs = sr.getAttributes(); - + // Skip entries without entryCSN attribute. Attribute entryCSNAttr = attrs.get( org.apache.directory.mitosis.common.Constants.ENTRY_CSN ); - if( entryCSNAttr == null ) + if ( entryCSNAttr == null ) { continue; } - + // Get entryCSN of the entry. Skip if entryCSN value is invalid. CSN csn = null; try { csn = new SimpleCSN( String.valueOf( entryCSNAttr.get() ) ); } - catch( IllegalArgumentException ex ) + catch ( IllegalArgumentException ex ) { - SessionLog.warn( ctx.getSession(), - "An entry with improper entryCSN: " + sr.getName() ); + SessionLog.warn( ctx.getSession(), "An entry with improper entryCSN: " + sr.getName() ); continue; } - + // Convert the entry into AddEntryOperation log. Operation op = new AddEntryOperation( csn, new LdapDN( sr.getName() ), attrs ); - + // Send a LogEntry message for the entry. ctx.getSession().write( new LogEntryMessage( ctx.getNextSequence(), op ) ); } @@ -350,28 +340,28 @@ } } + private void sendReplicationLogs( ReplicationContext ctx, CSNVector myPV, CSNVector yourUV ) { Iterator i = myPV.getReplicaIds().iterator(); - while( i.hasNext() ) + while ( i.hasNext() ) { ReplicaId replicaId = ( ReplicaId ) i.next(); CSN myCSN = myPV.getCSN( replicaId ); CSN yourCSN = yourUV.getCSN( replicaId ); - if( yourCSN != null && ( myCSN == null || yourCSN.compareTo( myCSN ) < 0 ) ) + if ( yourCSN != null && ( myCSN == null || yourCSN.compareTo( myCSN ) < 0 ) ) { - SessionLog.warn( - ctx.getSession(), - "Remote update vector (" + yourUV + ") is out-of-date. Full replication is required." ); + SessionLog.warn( ctx.getSession(), "Remote update vector (" + yourUV + + ") is out-of-date. Full replication is required." ); ctx.getSession().close(); return; } } - + ReplicationLogIterator logIt = ctx.getConfiguration().getStore().getLogs( yourUV, false ); try { - while( logIt.next() ) + while ( logIt.next() ) { Operation op = logIt.getOperation(); ctx.getSession().write( new LogEntryMessage( ctx.getNextSequence(), op ) ); @@ -382,12 +372,11 @@ logIt.close(); } } - + + private void onUnexpectedMessage( ReplicationContext ctx, Object message ) { - SessionLog.warn( - ctx.getSession(), - "Unexpected message: " + message ); + SessionLog.warn( ctx.getSession(), "Unexpected message: " + message ); ctx.getSession().close(); } } Modified: directory/trunks/apacheds/mitosis/src/main/java/org/apache/directory/mitosis/service/protocol/handler/ReplicationClientProtocolHandler.java URL: http://svn.apache.org/viewvc/directory/trunks/apacheds/mitosis/src/main/java/org/apache/directory/mitosis/service/protocol/handler/ReplicationClientProtocolHandler.java?view=diff&rev=471315&r1=471314&r2=471315 ============================================================================== --- directory/trunks/apacheds/mitosis/src/main/java/org/apache/directory/mitosis/service/protocol/handler/ReplicationClientProtocolHandler.java (original) +++ directory/trunks/apacheds/mitosis/src/main/java/org/apache/directory/mitosis/service/protocol/handler/ReplicationClientProtocolHandler.java Sat Nov 4 17:25:51 2006 @@ -19,10 +19,11 @@ */ package org.apache.directory.mitosis.service.protocol.handler; + import org.apache.directory.mitosis.service.ReplicationService; -public class ReplicationClientProtocolHandler extends - ReplicationProtocolHandler + +public class ReplicationClientProtocolHandler extends ReplicationProtocolHandler { public ReplicationClientProtocolHandler( ReplicationService service ) { Modified: directory/trunks/apacheds/mitosis/src/main/java/org/apache/directory/mitosis/service/protocol/handler/ReplicationProtocolHandler.java URL: http://svn.apache.org/viewvc/directory/trunks/apacheds/mitosis/src/main/java/org/apache/directory/mitosis/service/protocol/handler/ReplicationProtocolHandler.java?view=diff&rev=471315&r1=471314&r2=471315 ============================================================================== --- directory/trunks/apacheds/mitosis/src/main/java/org/apache/directory/mitosis/service/protocol/handler/ReplicationProtocolHandler.java (original) +++ directory/trunks/apacheds/mitosis/src/main/java/org/apache/directory/mitosis/service/protocol/handler/ReplicationProtocolHandler.java Sat Nov 4 17:25:51 2006 @@ -19,6 +19,7 @@ */ package org.apache.directory.mitosis.service.protocol.handler; + import org.apache.directory.mitosis.configuration.ReplicationConfiguration; import org.apache.directory.mitosis.service.ReplicationContext; import org.apache.directory.mitosis.service.ReplicationService; @@ -28,19 +29,18 @@ import org.apache.mina.common.IoHandler; import org.apache.mina.common.IoSession; + public class ReplicationProtocolHandler implements IoHandler { private static final String CONTEXT = "context"; - + private final ReplicationService service; private final ReplicationConfiguration configuration; private final DirectoryServiceConfiguration serviceCfg; private final ReplicationContextHandler contextHandler; - public ReplicationProtocolHandler( - ReplicationService service, - ReplicationContextHandler contextHandler ) + public ReplicationProtocolHandler( ReplicationService service, ReplicationContextHandler contextHandler ) { assert service != null; assert contextHandler != null; @@ -50,32 +50,38 @@ this.serviceCfg = service.getFactoryConfiguration(); this.contextHandler = contextHandler; } - + + private ReplicationContext getContext( IoSession session ) { return ( ReplicationContext ) session.getAttribute( CONTEXT ); } - + + public void sessionCreated( IoSession session ) throws Exception { session.setAttribute( CONTEXT, new SimpleReplicationContext( service, serviceCfg, configuration, session ) ); } + public void exceptionCaught( IoSession session, Throwable cause ) throws Exception { contextHandler.exceptionCaught( getContext( session ), cause ); } + public void messageReceived( IoSession session, Object message ) throws Exception { contextHandler.messageReceived( getContext( session ), message ); } + public void messageSent( IoSession session, Object message ) throws Exception { contextHandler.messageSent( getContext( session ), message ); } + public void sessionClosed( IoSession session ) throws Exception { ReplicationContext ctx = getContext( session ); @@ -83,10 +89,12 @@ ctx.cancelAllExpirations(); } + public void sessionIdle( IoSession session, IdleStatus status ) throws Exception { contextHandler.contextIdle( getContext( session ), status ); } + public void sessionOpened( IoSession session ) throws Exception { Modified: directory/trunks/apacheds/mitosis/src/main/java/org/apache/directory/mitosis/service/protocol/handler/ReplicationServerProtocolHandler.java URL: http://svn.apache.org/viewvc/directory/trunks/apacheds/mitosis/src/main/java/org/apache/directory/mitosis/service/protocol/handler/ReplicationServerProtocolHandler.java?view=diff&rev=471315&r1=471314&r2=471315 ============================================================================== --- directory/trunks/apacheds/mitosis/src/main/java/org/apache/directory/mitosis/service/protocol/handler/ReplicationServerProtocolHandler.java (original) +++ directory/trunks/apacheds/mitosis/src/main/java/org/apache/directory/mitosis/service/protocol/handler/ReplicationServerProtocolHandler.java Sat Nov 4 17:25:51 2006 @@ -19,10 +19,11 @@ */ package org.apache.directory.mitosis.service.protocol.handler; + import org.apache.directory.mitosis.service.ReplicationService; -public class ReplicationServerProtocolHandler extends - ReplicationProtocolHandler + +public class ReplicationServerProtocolHandler extends ReplicationProtocolHandler { public ReplicationServerProtocolHandler( ReplicationService service ) {