Return-Path: Delivered-To: apmail-directory-commits-archive@www.apache.org Received: (qmail 9302 invoked from network); 12 Oct 2006 18:33:50 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (209.237.227.199) by minotaur.apache.org with SMTP; 12 Oct 2006 18:33:50 -0000 Received: (qmail 92343 invoked by uid 500); 12 Oct 2006 18:33:50 -0000 Delivered-To: apmail-directory-commits-archive@directory.apache.org Received: (qmail 92210 invoked by uid 500); 12 Oct 2006 18:33:49 -0000 Mailing-List: contact commits-help@directory.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@directory.apache.org Delivered-To: mailing list commits@directory.apache.org Received: (qmail 92144 invoked by uid 99); 12 Oct 2006 18:33:49 -0000 Received: from asf.osuosl.org (HELO asf.osuosl.org) (140.211.166.49) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 12 Oct 2006 11:33:49 -0700 X-ASF-Spam-Status: No, hits=-8.6 required=10.0 tests=ALL_TRUSTED,INFO_TLD,NO_REAL_NAME X-Spam-Check-By: apache.org Received-SPF: pass (asf.osuosl.org: local policy) Received: from [140.211.166.113] (HELO eris.apache.org) (140.211.166.113) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 12 Oct 2006 11:33:42 -0700 Received: by eris.apache.org (Postfix, from userid 65534) id D721F1A981D; Thu, 12 Oct 2006 11:33:21 -0700 (PDT) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r463366 [2/5] - in /directory/trunks/apacheds/mitosis/src: ./ main/ main/java/ main/java/org/ main/java/org/apache/ main/java/org/apache/directory/ main/java/org/apache/directory/mitosis/ main/java/org/apache/directory/mitosis/common/ main/... 
Date: Thu, 12 Oct 2006 18:33:19 -0000 To: commits@directory.apache.org From: elecharny@apache.org X-Mailer: svnmailer-1.1.0 Message-Id: <20061012183321.D721F1A981D@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org X-Spam-Rating: minotaur.apache.org 1.6.2 0/1000/N Added: directory/trunks/apacheds/mitosis/src/main/java/org/apache/directory/mitosis/operation/AttributeOperation.java URL: http://svn.apache.org/viewvc/directory/trunks/apacheds/mitosis/src/main/java/org/apache/directory/mitosis/operation/AttributeOperation.java?view=auto&rev=463366 ============================================================================== --- directory/trunks/apacheds/mitosis/src/main/java/org/apache/directory/mitosis/operation/AttributeOperation.java (added) +++ directory/trunks/apacheds/mitosis/src/main/java/org/apache/directory/mitosis/operation/AttributeOperation.java Thu Oct 12 11:33:14 2006 @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ * + */ +package org.apache.directory.mitosis.operation; + +import javax.naming.Name; +import javax.naming.NamingException; +import javax.naming.directory.Attribute; + +import org.apache.directory.server.core.partition.PartitionNexus; +import org.apache.directory.mitosis.common.CSN; +import org.apache.directory.mitosis.operation.support.EntryUtil; +import org.apache.directory.mitosis.store.ReplicationStore; + +/** + * Base class for {@link Operation}s that apply a single attribute change to one entry. + * Subclasses supply the actual modification in {@link #execute1(PartitionNexus)}. + * + * @author Apache Directory Project + */ +public abstract class AttributeOperation extends Operation +{ + private final Name name; + private final Attribute attribute; + + /** + * Create a new operation that affects an entry with the specified name. + * The attribute is cloned, so later changes made by the caller do not affect this operation. + * + * @param csn the CSN of this operation + * @param name the normalized name of an entry + * @param attribute an attribute to modify + */ + public AttributeOperation( CSN csn, Name name, Attribute attribute ) + { + super( csn ); + + assert name != null; + assert attribute != null; + + this.name = name; + this.attribute = ( Attribute ) attribute.clone(); + } + + /** + * Returns a clone of the attribute to modify. + */ + public Attribute getAttribute() + { + return ( Attribute ) attribute.clone(); + } + + /** + * Returns a clone of the name of the entry this operation will affect. + */ + public Name getName() + { + return ( Name ) name.clone(); + } + + /** + * Does nothing when {@link EntryUtil#isEntryUpdatable} rejects this operation's CSN for the entry; + * otherwise creates any missing glue entries (the entry itself included) and then + * delegates to {@link #execute1(PartitionNexus)}. The store argument is not used here. + */ + protected final void execute0( PartitionNexus nexus, ReplicationStore store ) throws NamingException + { + if( !EntryUtil.isEntryUpdatable( nexus, name, getCSN() ) ) + { + return; + } + EntryUtil.createGlueEntries( nexus, name, true ); + + execute1( nexus ); + } + + /** + * Applies the attribute change to the nexus; called by execute0 once its checks have passed. + */ + protected abstract void execute1( PartitionNexus nexus ) throws NamingException; + + + /** + * Returns string representation of this operation. 
+ */ + public String toString() + { + return super.toString() + ": [" + name.toString() + ']'; + } +} Added: directory/trunks/apacheds/mitosis/src/main/java/org/apache/directory/mitosis/operation/CompositeOperation.java URL: http://svn.apache.org/viewvc/directory/trunks/apacheds/mitosis/src/main/java/org/apache/directory/mitosis/operation/CompositeOperation.java?view=auto&rev=463366 ============================================================================== --- directory/trunks/apacheds/mitosis/src/main/java/org/apache/directory/mitosis/operation/CompositeOperation.java (added) +++ directory/trunks/apacheds/mitosis/src/main/java/org/apache/directory/mitosis/operation/CompositeOperation.java Thu Oct 12 11:33:14 2006 @@ -0,0 +1,142 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ * + */ +package org.apache.directory.mitosis.operation; + +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.Set; + +import javax.naming.Name; +import javax.naming.NamingException; + +import org.apache.directory.server.core.DirectoryServiceConfiguration; +import org.apache.directory.server.core.partition.PartitionNexus; +import org.apache.directory.mitosis.common.CSN; +import org.apache.directory.mitosis.common.CSNVector; +import org.apache.directory.mitosis.common.ReplicaId; +import org.apache.directory.mitosis.common.UUID; +import org.apache.directory.mitosis.configuration.ReplicationConfiguration; +import org.apache.directory.mitosis.store.ReplicationLogIterator; +import org.apache.directory.mitosis.store.ReplicationStore; + +/** + * An {@link Operation} that contains other {@link Operation}s and executes them + * in the order in which they were added. + * + * @author Apache Directory Project + */ +public class CompositeOperation extends Operation +{ + private static final long serialVersionUID = 6252675003841951356L; + + /** + * A do-nothing store handed to child operations, so that executing a child does not log it; + * every method is empty or returns null, false or 0. + */ + private static final ReplicationStore DUMMY_STORE = new ReplicationStore() { + + public void open( DirectoryServiceConfiguration serviceCfg, ReplicationConfiguration cfg) { + } + + public void close() { + } + + public ReplicaId getReplicaId() { + return null; + } + + public Set getKnownReplicaIds() { + return null; + } + + public Name getDN(UUID uuid) { + return null; + } + + public boolean putUUID(UUID uuid, Name dn) { + return false; + } + + public boolean removeUUID(UUID uuid) { + return false; + } + + public void putLog(Operation operation) { + } + + public ReplicationLogIterator getLogs(CSN fromCSN, boolean inclusive) { + return null; + } + + public ReplicationLogIterator getLogs(CSNVector updateVector, boolean inclusive) { + return null; + } + + public int removeLogs(CSN toCSN, boolean inclusive) { + return 0; + } + + public int getLogSize() { + return 0; + } + + public int getLogSize(ReplicaId replicaId) { + return 0; + } + + public CSNVector 
getUpdateVector() { + return null; + } + + public CSNVector getPurgeVector() { + return null; + } + }; + + /** The child operations, in insertion order. */ + private final List children = new ArrayList(); + + /** + * Creates an empty composite operation. + * + * @param csn the CSN of this operation + */ + public CompositeOperation( CSN csn ) + { + super( csn ); + } + + /** + * Appends a child operation. With assertions enabled, the child must be non-null + * and must carry a CSN equal to this composite's CSN. + * + * @param op the child operation to append + */ + public void add( Operation op ) + { + assert op != null; + assert op.getCSN().equals( this.getCSN() ); + children.add( op ); + } + + /** + * Removes all child operations. + */ + public void clear() + { + children.clear(); + } + + /** + * Executes every child in insertion order. Each child receives DUMMY_STORE + * instead of the given store, so the store argument itself is not used here. + */ + protected void execute0( PartitionNexus nexus, ReplicationStore store ) throws NamingException + { + Iterator i = children.iterator(); + while( i.hasNext() ) + { + Operation op = ( Operation ) i.next(); + op.execute( nexus, DUMMY_STORE ); + } + } + + /** + * Returns the string form of the list of child operations. + */ + public String toString() + { + return children.toString(); + } +} Added: directory/trunks/apacheds/mitosis/src/main/java/org/apache/directory/mitosis/operation/DeleteAttributeOperation.java URL: http://svn.apache.org/viewvc/directory/trunks/apacheds/mitosis/src/main/java/org/apache/directory/mitosis/operation/DeleteAttributeOperation.java?view=auto&rev=463366 ============================================================================== --- directory/trunks/apacheds/mitosis/src/main/java/org/apache/directory/mitosis/operation/DeleteAttributeOperation.java (added) +++ directory/trunks/apacheds/mitosis/src/main/java/org/apache/directory/mitosis/operation/DeleteAttributeOperation.java Thu Oct 12 11:33:14 2006 @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.directory.mitosis.operation; + +import javax.naming.Name; +import javax.naming.NamingException; +import javax.naming.directory.Attribute; +import javax.naming.directory.Attributes; +import javax.naming.directory.BasicAttributes; +import javax.naming.directory.DirContext; + +import org.apache.directory.server.core.partition.PartitionNexus; +import org.apache.directory.shared.ldap.name.LdapDN; +import org.apache.directory.mitosis.common.CSN; + +/** + * An {@link Operation} that deletes an attribute from an entry. + * + * @author Apache Directory Project + */ +public class DeleteAttributeOperation extends AttributeOperation +{ + private static final long serialVersionUID = -131557844165710365L; + + /** + * Creates a new operation that deletes the specified attribute. 
+ * + * @param csn the CSN of this operation + * @param name the name of the entry to modify + * @param attribute an attribute to delete + */ + public DeleteAttributeOperation( CSN csn, Name name, Attribute attribute ) + { + super( csn, name, attribute ); + } + + /** + * Returns the superclass string followed by ".delete( attribute )". + */ + public String toString() + { + return super.toString() + ".delete( " + getAttribute() + " )"; + } + + /** + * Issues a REMOVE_ATTRIBUTE modification for this operation's attribute on the target entry. + * The entry name is cast to LdapDN, so it must be an LdapDN instance. + */ + protected void execute1( PartitionNexus nexus ) throws NamingException + { + Attributes attrs = new BasicAttributes(); + attrs.put( getAttribute() ); + nexus.modify( (LdapDN)getName(), DirContext.REMOVE_ATTRIBUTE, attrs ); + } +} Added: directory/trunks/apacheds/mitosis/src/main/java/org/apache/directory/mitosis/operation/Operation.java URL: http://svn.apache.org/viewvc/directory/trunks/apacheds/mitosis/src/main/java/org/apache/directory/mitosis/operation/Operation.java?view=auto&rev=463366 ============================================================================== --- directory/trunks/apacheds/mitosis/src/main/java/org/apache/directory/mitosis/operation/Operation.java (added) +++ directory/trunks/apacheds/mitosis/src/main/java/org/apache/directory/mitosis/operation/Operation.java Thu Oct 12 11:33:14 2006 @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ * + */ +package org.apache.directory.mitosis.operation; + +import java.io.Serializable; + +import javax.naming.NamingException; +import javax.naming.OperationNotSupportedException; + +import org.apache.directory.server.core.partition.PartitionNexus; +import org.apache.directory.mitosis.common.CSN; +import org.apache.directory.mitosis.store.ReplicationStore; + +/** + * Represents a small operation on an entry in a replicated {@link DirectoryPartition}. + * + * @author Apache Directory Project + */ +public class Operation implements Serializable +{ + /** + * Declares the Serial Version Uid. + * + * @see Always + * Declare Serial Version Uid + */ + private static final long serialVersionUID = 1L; + + /** The CSN of this operation */ + private CSN csn; + + /** + * Creates a new instance of Operation with the CSN + * given as a parameter. + * + * @param csn The CSN of this operation; checked for null by assertion only. + */ + public Operation( CSN csn ) + { + assert csn != null; + this.csn = csn; + } + + /** + * @return the {@link CSN} of this operation. + */ + public CSN getCSN() + { + return csn; + } + + /** + * @return the string form of the CSN of this operation + */ + public String toString() + { + return csn.toString(); + } + + /** + * Executes this operation on the specified nexus. 
+ * Runs execute0 while synchronized on the nexus and then logs this operation + * to the given store; nothing is logged when execute0 throws. + */ + public final void execute( PartitionNexus nexus, ReplicationStore store ) throws NamingException + { + synchronized( nexus ) + { + execute0( nexus, store ); + store.putLog( this ); + } + } + + /** + * Performs the actual work of this operation. This default implementation always throws + * OperationNotSupportedException carrying the nexus suffix; subclasses override it. + */ + protected void execute0( PartitionNexus nexus, ReplicationStore store ) throws NamingException + { + throw new OperationNotSupportedException( nexus.getSuffix().toString() ); + } +} Added: directory/trunks/apacheds/mitosis/src/main/java/org/apache/directory/mitosis/operation/OperationCodec.java URL: http://svn.apache.org/viewvc/directory/trunks/apacheds/mitosis/src/main/java/org/apache/directory/mitosis/operation/OperationCodec.java?view=auto&rev=463366 ============================================================================== --- directory/trunks/apacheds/mitosis/src/main/java/org/apache/directory/mitosis/operation/OperationCodec.java (added) +++ directory/trunks/apacheds/mitosis/src/main/java/org/apache/directory/mitosis/operation/OperationCodec.java Thu Oct 12 11:33:14 2006 @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ * + */ +package org.apache.directory.mitosis.operation; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.io.ObjectStreamConstants; + +/** + * Encodes {@link Operation}s to byte[] and vice versa, using Java object serialization. + * + * @author Apache Directory Project + */ +public class OperationCodec +{ + public OperationCodec() + { + } + + /** + * Serializes the operation with stream protocol version 2. + * + * @param op the operation to serialize + * @return the serialized bytes + * @throws InternalError wrapping any IOException raised while writing + */ + public byte[] encode( Operation op ) + { + ByteArrayOutputStream bout = new ByteArrayOutputStream(); + try + { + ObjectOutputStream out = new ObjectOutputStream( bout ); + out.useProtocolVersion( ObjectStreamConstants.PROTOCOL_VERSION_2 ); + out.writeObject( op ); + out.flush(); + out.close(); + } + catch( IOException e ) + { + throw ( InternalError ) new InternalError().initCause( e ); + } + return bout.toByteArray(); + } + + /** + * Deserializes one object from the data and casts it to Operation. + * NOTE(review): this is native Java deserialization; confirm the data can only come from trusted peers. + * + * @param data the serialized bytes + * @return the decoded operation + * @throws InternalError wrapping any IOException or ClassNotFoundException raised while reading + */ + public Operation decode( byte[] data ) + { + ObjectInputStream in; + try + { + in = new ObjectInputStream( + new ByteArrayInputStream( data ) ); + return ( Operation ) in.readObject(); + } + catch( IOException e ) + { + throw ( InternalError ) new InternalError().initCause( e ); + } + catch( ClassNotFoundException e ) + { + throw ( InternalError ) new InternalError().initCause( e ); + } + } +} Added: directory/trunks/apacheds/mitosis/src/main/java/org/apache/directory/mitosis/operation/OperationFactory.java URL: http://svn.apache.org/viewvc/directory/trunks/apacheds/mitosis/src/main/java/org/apache/directory/mitosis/operation/OperationFactory.java?view=auto&rev=463366 ============================================================================== --- directory/trunks/apacheds/mitosis/src/main/java/org/apache/directory/mitosis/operation/OperationFactory.java (added) +++ directory/trunks/apacheds/mitosis/src/main/java/org/apache/directory/mitosis/operation/OperationFactory.java Thu Oct 12 11:33:14 2006 @@ -0,0 +1,286 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more 
contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.directory.mitosis.operation; + +import java.util.Map; + +import javax.naming.Name; +import javax.naming.NameAlreadyBoundException; +import javax.naming.NamingEnumeration; +import javax.naming.NamingException; +import javax.naming.OperationNotSupportedException; +import javax.naming.directory.Attribute; +import javax.naming.directory.Attributes; +import javax.naming.directory.BasicAttribute; +import javax.naming.directory.DirContext; +import javax.naming.directory.ModificationItem; +import javax.naming.directory.SearchControls; +import javax.naming.directory.SearchResult; + +import org.apache.directory.server.core.DirectoryServiceConfiguration; +import org.apache.directory.server.core.partition.PartitionNexus; +import org.apache.directory.shared.ldap.filter.PresenceNode; +import org.apache.directory.shared.ldap.name.LdapDN; +import org.apache.directory.shared.ldap.util.NamespaceTools; +import org.apache.directory.mitosis.common.CSN; +import org.apache.directory.mitosis.common.CSNFactory; +import org.apache.directory.mitosis.common.Constants; +import org.apache.directory.mitosis.common.ReplicaId; +import org.apache.directory.mitosis.common.UUIDFactory; +import 
org.apache.directory.mitosis.configuration.ReplicationConfiguration; + +/** + * Converts complex JNDI operations into multiple simple operations. + * Every public factory method returns a {@link CompositeOperation} whose children + * all carry one newly issued {@link CSN}. + * + * @author Apache Directory Project + */ +public class OperationFactory +{ + private final ReplicaId replicaId; + private final Map environment; + private final PartitionNexus nexus; + private final UUIDFactory uuidFactory; + private final CSNFactory csnFactory; + + /** + * Creates a factory that takes its replica id, UUID factory and CSN factory from the + * replication configuration, and its environment and nexus from the service configuration. + */ + public OperationFactory( DirectoryServiceConfiguration serviceCfg, ReplicationConfiguration cfg ) + { + this.replicaId = cfg.getReplicaId(); + this.environment = serviceCfg.getEnvironment(); + this.nexus = serviceCfg.getPartitionNexus(); + this.uuidFactory = cfg.getUuidFactory(); + this.csnFactory = cfg.getCsnFactory(); + } + + /** + * Creates an operation that adds a new entry, stamped with a new CSN. + * + * @throws NamingException if an entry with the normalized name already exists + */ + public Operation newAdd( String userProvidedName, Name normalizedName, Attributes entry ) throws NamingException + { + return newAdd( newCSN(), userProvidedName, normalizedName, entry ); + } + + /** + * Creates the add operation for the given CSN. The caller's attributes are cloned + * before the replication attributes are written into them. + */ + private Operation newAdd( CSN csn, String userProvidedName, Name normalizedName, Attributes entry ) throws NamingException + { + // Fail if an entry already exists. + checkBeforeAdd( normalizedName ); + + CompositeOperation result = new CompositeOperation( csn ); + + // Insert 'entryUUID' and 'entryDeleted'. 
+ entry = ( Attributes ) entry.clone(); + entry.remove( Constants.ENTRY_UUID ); + entry.remove( Constants.ENTRY_DELETED ); + entry.put( Constants.ENTRY_UUID, uuidFactory.newInstance().toOctetString() ); + entry.put( Constants.ENTRY_DELETED, "false" ); + + // NOTE: We inlined addDefaultOperations() because ApacheDS currently + // creates an index entry only for ADD operation (and not for + // MODIFY operation) + entry.put( Constants.ENTRY_CSN, csn.toOctetString() ); + + result.add( new AddEntryOperation( csn, normalizedName, userProvidedName, entry ) ); + return result; + } + + /** + * Creates an operation that marks the entry as deleted by replacing its + * 'entryDeleted' attribute with "true" and updating its CSN attribute. + */ + public Operation newDelete( Name normalizedName ) + { + CSN csn = newCSN(); + CompositeOperation result = new CompositeOperation( csn ); + + // Transform into replace operation. + result.add( + new ReplaceAttributeOperation( + csn, + normalizedName, + new BasicAttribute( Constants.ENTRY_DELETED, "true" ) ) ); + + return addDefaultOperations( result, csn, normalizedName ); + } + + /** + * Creates an operation that applies the same modification op to every given attribute. + * + * @param modOp one of the DirContext ADD_ATTRIBUTE, REPLACE_ATTRIBUTE or REMOVE_ATTRIBUTE constants + * @throws IllegalArgumentException if modOp is none of these + */ + public Operation newModify( Name normalizedName, int modOp, Attributes attributes ) + { + CSN csn = newCSN(); + CompositeOperation result = new CompositeOperation( csn ); + NamingEnumeration e = attributes.getAll(); + // Transform into multiple {@link AttributeOperation}s. + while( e.hasMoreElements() ) + { + Attribute attr = ( Attribute ) e.nextElement(); + result.add( newModify( csn, normalizedName, modOp, attr ) ); + } + + // Resurrect the entry in case it is deleted. + result.add( + new ReplaceAttributeOperation( + csn, + normalizedName, + new BasicAttribute( Constants.ENTRY_DELETED, "false" ) ) ); + + // Pass the CSN issued above: addDefaultOperations() dereferences it, so null always failed here. + return addDefaultOperations( result, csn, normalizedName ); + } + + /** + * Creates an operation that applies each modification item in array order. + * + * @throws IllegalArgumentException if an item carries an unknown modification op + */ + public Operation newModify( Name normalizedName, ModificationItem[] items ) + { + CSN csn = newCSN(); + CompositeOperation result = new CompositeOperation( csn ); + final int length = items.length; + // Transform into multiple {@link AttributeOperation}s. 
+ for( int i = 0; i < length; i ++ ) + { + ModificationItem item = items[ i ]; + result.add( + newModify( + csn, + normalizedName, + item.getModificationOp(), + item.getAttribute() ) ); + } + + // Resurrect the entry in case it is deleted. + result.add( + new ReplaceAttributeOperation( + csn, + normalizedName, + new BasicAttribute( Constants.ENTRY_DELETED, "false" ) ) ); + + return addDefaultOperations( result, csn, normalizedName ); + } + + /** + * Maps one JNDI modification op onto the matching single-attribute operation. + */ + private Operation newModify( CSN csn, Name normalizedName, int modOp, Attribute attribute ) + { + switch( modOp ) + { + case DirContext.ADD_ATTRIBUTE: + return new AddAttributeOperation( + csn, + normalizedName, + attribute ); + case DirContext.REPLACE_ATTRIBUTE: + return new ReplaceAttributeOperation( + csn, + normalizedName, + attribute ); + case DirContext.REMOVE_ATTRIBUTE: + return new DeleteAttributeOperation( + csn, + normalizedName, + attribute ); + default: + throw new IllegalArgumentException( "Unknown modOp: " + modOp ); + } + } + + /** + * Creates a rename as a move that uses oldName.getSuffix( 1 ) as the new parent name. + * NOTE(review): confirm that getSuffix( 1 ) yields the parent for the Name implementation in use. + */ + public Operation newModifyRn( Name oldName, String newRdn, boolean deleteOldRn ) throws NamingException + { + return newMove( oldName, oldName.getSuffix( 1 ), newRdn, deleteOldRn ); + } + + /** + * Creates a move that keeps the current RDN, taken from the last component of oldName. + */ + public Operation newMove( Name oldName, Name newParentName ) throws NamingException + { + return newMove( oldName, newParentName, oldName.get( oldName.size() - 1 ), true ); + } + + /** + * Creates a move of the whole subtree under oldName: every entry found is marked deleted + * and re-added under newParentName with newRdn as the RDN of the base entry. + * + * @throws OperationNotSupportedException if deleteOldRn is false + */ + public Operation newMove( Name oldName, Name newParentName, String newRdn, boolean deleteOldRn ) throws NamingException + { + if( !deleteOldRn ) + { + throw new OperationNotSupportedException( "deleteOldRn must be true." 
); + } + + // Prepare to create composite operations + CSN csn = newCSN(); + CompositeOperation result = new CompositeOperation( csn ); + + // Retrieve all subtree including the base entry + SearchControls ctrl = new SearchControls(); + ctrl.setSearchScope( SearchControls.SUBTREE_SCOPE ); + NamingEnumeration e = nexus.search( + (LdapDN)oldName, environment, new PresenceNode( "objectClass" ), ctrl ); + + while( e.hasMore() ) + { + SearchResult sr = ( SearchResult ) e.next(); + + // Get the name of the old entry + Name oldEntryName = new LdapDN( sr.getName() ); + + // Delete the old entry + result.add( + new ReplaceAttributeOperation( + csn, + oldEntryName, + new BasicAttribute( Constants.ENTRY_DELETED, "true" ) ) ); + + // Get the old entry attributes and replace RDN if required + Attributes entry = sr.getAttributes(); + if( oldEntryName.size() == oldName.size() ) + { + entry.remove( + NamespaceTools.getRdnAttribute( + oldName.get( oldName.size() - 1 ) ) ); + entry.put( + NamespaceTools.getRdnAttribute( newRdn ), + NamespaceTools.getRdnValue( newRdn ) ); + } + + // Calculate new name from newParentName, oldEntryName, and newRdn. + // The components to re-append are those that oldEntryName has below oldName, + // so the count is measured against oldName and not against the new name, + // whose depth may differ from the old one. + Name newEntryName = ( Name ) newParentName.clone(); + newEntryName.add( newRdn ); + for( int i = oldEntryName.size() - oldName.size(); i > 0; i-- ) + { + newEntryName.add( oldEntryName.get( oldEntryName.size() - i ) ); + } + + // Add the new entry + //// FIXME Get UPDN somehow + result.add( newAdd( csn, newEntryName.toString(), newEntryName, entry ) ); + + // Add default operations to the old entry. + // Please note that newAdd() already added default operations + // to the new entry. + addDefaultOperations( result, csn, oldEntryName ); + } + + return result; + } + + /** + * Throws NameAlreadyBoundException when the nexus already has an entry with the given name. + */ + private void checkBeforeAdd( Name newEntryName ) throws NamingException + { + if( nexus.hasEntry( (LdapDN)newEntryName ) ) + { + throw new NameAlreadyBoundException( newEntryName.toString() + " already exists." 
); + } + } + + /** + * Appends the operation that replaces the entry's CSN attribute with the given CSN + * and returns the same composite. The CSN must not be null. + */ + private CompositeOperation addDefaultOperations( CompositeOperation result, CSN csn, Name normalizedName ) + { + result.add( + new ReplaceAttributeOperation( + csn, + normalizedName, + new BasicAttribute( + Constants.ENTRY_CSN, + csn.toOctetString()) ) ); + return result; + } + + /** + * Issues a new CSN for this replica. + */ + private CSN newCSN() + { + return csnFactory.newInstance( replicaId ); + } + +} Added: directory/trunks/apacheds/mitosis/src/main/java/org/apache/directory/mitosis/operation/ReplaceAttributeOperation.java URL: http://svn.apache.org/viewvc/directory/trunks/apacheds/mitosis/src/main/java/org/apache/directory/mitosis/operation/ReplaceAttributeOperation.java?view=auto&rev=463366 ============================================================================== --- directory/trunks/apacheds/mitosis/src/main/java/org/apache/directory/mitosis/operation/ReplaceAttributeOperation.java (added) +++ directory/trunks/apacheds/mitosis/src/main/java/org/apache/directory/mitosis/operation/ReplaceAttributeOperation.java Thu Oct 12 11:33:14 2006 @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ * + */ +package org.apache.directory.mitosis.operation; + +import javax.naming.Name; +import javax.naming.NamingException; +import javax.naming.directory.Attribute; +import javax.naming.directory.Attributes; +import javax.naming.directory.BasicAttributes; +import javax.naming.directory.DirContext; + +import org.apache.directory.server.core.partition.PartitionNexus; +import org.apache.directory.shared.ldap.name.LdapDN; +import org.apache.directory.mitosis.common.CSN; + +/** + * An {@link Operation} that replaces an attribute in an entry. + * + * @author Apache Directory Project + */ +public class ReplaceAttributeOperation extends AttributeOperation +{ + private static final long serialVersionUID = -6573196586521610472L; + + /** + * Creates a new operation that replaces the specified attribute. + * + * @param csn the CSN of this operation + * @param name the name of the entry to modify + * @param attribute an attribute to replace + */ + public ReplaceAttributeOperation( CSN csn, Name name, Attribute attribute ) + { + super( csn, name, attribute ); + } + + /** + * Returns the superclass string followed by ".replace( attribute )". + */ + public String toString() + { + return super.toString() + ".replace( " + getAttribute() + " )"; + } + + /** + * Issues a REPLACE_ATTRIBUTE modification for this operation's attribute on the target entry. + * The entry name is cast to LdapDN, so it must be an LdapDN instance. + */ + protected void execute1( PartitionNexus nexus ) throws NamingException + { + Attributes attrs = new BasicAttributes(); + attrs.put( getAttribute() ); + nexus.modify( (LdapDN)getName(), DirContext.REPLACE_ATTRIBUTE, attrs ); + } +} Added: directory/trunks/apacheds/mitosis/src/main/java/org/apache/directory/mitosis/operation/support/EntryUtil.java URL: http://svn.apache.org/viewvc/directory/trunks/apacheds/mitosis/src/main/java/org/apache/directory/mitosis/operation/support/EntryUtil.java?view=auto&rev=463366 ============================================================================== --- directory/trunks/apacheds/mitosis/src/main/java/org/apache/directory/mitosis/operation/support/EntryUtil.java (added) +++ directory/trunks/apacheds/mitosis/src/main/java/org/apache/directory/mitosis/operation/support/EntryUtil.java Thu Oct 12 11:33:14 2006 @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software 
Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.directory.mitosis.operation.support; + +import javax.naming.Name; +import javax.naming.NameNotFoundException; +import javax.naming.NamingException; +import javax.naming.directory.Attribute; +import javax.naming.directory.Attributes; +import javax.naming.directory.BasicAttribute; +import javax.naming.directory.BasicAttributes; + +import org.apache.directory.server.core.partition.PartitionNexus; +import org.apache.directory.shared.ldap.name.LdapDN; +import org.apache.directory.shared.ldap.util.NamespaceTools; +import org.apache.directory.mitosis.common.CSN; +import org.apache.directory.mitosis.common.Constants; +import org.apache.directory.mitosis.common.SimpleCSN; + +/** + * Static helpers that check whether an entry may be updated and that create missing glue entries. + */ +public class EntryUtil +{ + /** + * Tells whether an operation with the given CSN may be applied to the named entry. + * Returns true when the lookup yields no entry, when the entry has no CSN attribute, + * when the stored CSN cannot be parsed, or when the stored CSN compares lower than newCSN. + * The name is cast to LdapDN, so it must be an LdapDN instance. + */ + public static boolean isEntryUpdatable( PartitionNexus nexus, Name name, CSN newCSN ) throws NamingException + { + Attributes entry = nexus.lookup( (LdapDN)name ); + + if( entry == null ) + { + return true; + } + + Attribute entryCSNAttr = entry.get( Constants.ENTRY_CSN ); + + if( entryCSNAttr == null ) + { + return true; + } + else + { + CSN oldCSN = null; + + try + { + oldCSN = new SimpleCSN( String.valueOf( entryCSNAttr.get() ) ); + } + catch( IllegalArgumentException e ) + { + // An unparsable stored CSN does not block the update. + return 
true; + } + + return oldCSN.compareTo( newCSN ) < 0; + } + } + + /** + * Calls createGlueEntry for name.getSuffix( i ) with i running from name.size() - 1 down to 1, + * and finally for the name itself when includeLeaf is true. + * NOTE(review): which components getSuffix( i ) keeps depends on the Name implementation; + * confirm that these are the ancestors of the entry. + */ + public static void createGlueEntries( PartitionNexus nexus, Name name, boolean includeLeaf ) throws NamingException + { + assert name.size() > 0; + + for( int i = name.size() - 1; i > 0; i -- ) + { + createGlueEntry( nexus, name.getSuffix( i ) ); + } + + if( includeLeaf ) + { + createGlueEntry( nexus, name ); + } + } + + /** + * Adds a minimal entry under the given name unless the nexus already has one. The new entry + * holds the RDN attribute and the objectClass values "top" and "extensibleObject". + */ + private static void createGlueEntry( PartitionNexus nexus, Name name ) throws NamingException + { + try + { + if( nexus.hasEntry( (LdapDN)name ) ) + { + return; + } + } + catch( NameNotFoundException e ) + { + // Skip if there's no backend associated with the name. + return; + } + + // Create a glue entry. + Attributes entry = new BasicAttributes(); + //// Add RDN attribute. + String rdn = name.get( name.size() - 1 ); + String rdnAttribute = NamespaceTools.getRdnAttribute( rdn ); + String rdnValue = NamespaceTools.getRdnValue( rdn ); + entry.put( rdnAttribute, rdnValue ); + //// Add objectClass attribute. + Attribute objectClassAttr = new BasicAttribute( "objectClass" ); + objectClassAttr.add( "top" ); + objectClassAttr.add( "extensibleObject" ); + entry.put( objectClassAttr ); + + // And add it to the nexus. 
+ nexus.add( (LdapDN)name, entry ); + } + + /** Not instantiable: this class only has static methods. */ + private EntryUtil() + { + } +} Added: directory/trunks/apacheds/mitosis/src/main/java/org/apache/directory/mitosis/service/ClientConnectionManager.java URL: http://svn.apache.org/viewvc/directory/trunks/apacheds/mitosis/src/main/java/org/apache/directory/mitosis/service/ClientConnectionManager.java?view=auto&rev=463366 ============================================================================== --- directory/trunks/apacheds/mitosis/src/main/java/org/apache/directory/mitosis/service/ClientConnectionManager.java (added) +++ directory/trunks/apacheds/mitosis/src/main/java/org/apache/directory/mitosis/service/ClientConnectionManager.java Thu Oct 12 11:33:14 2006 @@ -0,0 +1,308 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ * + */ +package org.apache.directory.mitosis.service; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; + +import org.apache.mina.common.ConnectFuture; +import org.apache.mina.common.IoConnector; +import org.apache.mina.common.IoSession; +import org.apache.mina.filter.LoggingFilter; +import org.apache.mina.filter.thread.ThreadPoolFilter; +import org.apache.mina.transport.socket.nio.SocketConnector; +import org.apache.directory.mitosis.common.Replica; +import org.apache.directory.mitosis.configuration.ReplicationConfiguration; +import org.apache.directory.mitosis.service.protocol.handler.ReplicationClientProtocolHandler; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Manages all outgoing connections to remote replicas. + * + * @author Trustin Lee + * @version $Rev: 116 $, $Date: 2006-09-18 13:47:53Z $ + */ +class ClientConnectionManager +{ + private static final Logger log = LoggerFactory.getLogger( ClientConnectionManager.class ); + + private final ReplicationService service; + private final IoConnector connector = new SocketConnector(); + private final Map sessions = new HashMap(); + private ReplicationConfiguration configuration; + private ConnectionMonitor monitor; + + ClientConnectionManager( ReplicationService service ) + { + this.service = service; + } + + public void start( ReplicationConfiguration cfg ) throws Exception + { + // initialze client connection + //// initialize thread pool + ThreadPoolFilter threadPoolFilter = new ThreadPoolFilter(); + connector.getFilterChain().addLast( "threadPool", threadPoolFilter ); + //// add logger + connector.getFilterChain().addLast( "logger", new LoggingFilter() ); + + this.configuration = cfg; + + monitor = new ConnectionMonitor(); + monitor.start(); + } + + public void stop() throws Exception + { + // close all connections + monitor.shutdown(); + + // remove all filters + connector.getFilterChain().remove( "threadPool" ); + 
connector.getFilterChain().remove( "logger" ); + } + + private class ConnectionMonitor extends Thread + { + private boolean timeToShutdown = false; + + public ConnectionMonitor() + { + super( "ClientConnectionManager" ); + } + + public void shutdown() + { + timeToShutdown = true; + while( isAlive() ) + { + try + { + join(); + } + catch( InterruptedException e ) + { + log.warn( "Unexpected exception.", e ); + } + } + } + + public void run() + { + while( !timeToShutdown ) + { + connectUnconnected(); + try + { + Thread.sleep( 1000 ); + } + catch( InterruptedException e ) + { + log.warn( "Unexpected exception.", e ); + } + } + + disconnectConnected(); + } + + private void connectUnconnected() + { + Iterator i = configuration.getPeerReplicas().iterator(); + while( i.hasNext() ) + { + Replica replica = ( Replica ) i.next(); + Connection con = ( Connection ) sessions.get( replica.getId() ); + if( con == null ) + { + con = new Connection(); + sessions.put( replica.getId(), con ); + } + + synchronized( con ) + { + if( con.inProgress ) + { + // connection is in progress + continue; + } + + if( con.session != null ) + { + if( con.session.isConnected() ) + { + continue; + } + con.session = null; + } + + // put to connectingSession with dummy value to mark + // that connection is in progress + con.inProgress = true; + + if( con.delay < 0 ) + { + con.delay = 0; + } + else if( con.delay == 0 ) + { + con.delay = 2; + } + else + { + con.delay *= 2; + if( con.delay > 60 ) + { + con.delay = 60; + } + } + } + + Connector connector = new Connector( replica, con ); + synchronized( con ) + { + con.connector = connector; + } + connector.start(); + } + } + + private void disconnectConnected() + { + log.info( "Closing all connections..." 
); + for( ;; ) + { + Iterator i = sessions.values().iterator(); + while( i.hasNext() ) + { + Connection con = ( Connection ) i.next(); + synchronized( con ) + { + if( con.inProgress ) + { + if( con.connector != null ) + { + con.connector.interrupt(); + } + continue; + } + + i.remove(); + + if( con.session != null ) + { + con.session.close(); + } + } + } + + if( sessions.isEmpty() ) + { + break; + } + + // Sleep 1 second and try again waiting for Connector threads. + try + { + Thread.sleep( 1000 ); + } + catch( InterruptedException e ) + { + log.warn( "Unexpected exception.", e ); + } + } + } + } + + private class Connector extends Thread + { + private final Replica replica; + private final Connection con; + + public Connector( Replica replica, Connection con ) + { + super( "ClientConnectionManager-" + replica ); + this.replica = replica; + this.con = con; + } + + public void run() + { + if( con.delay > 0 ) + { + log.info( "[" + replica + "] Waiting for " + con.delay + " seconds to reconnect." ); + try + { + Thread.sleep( con.delay * 1000L ); + } + catch( InterruptedException e ) + { + } + } + + log.info( "[" + replica + "] Connecting..." 
); + + IoSession session; + try + { + connector.setConnectTimeout( configuration.getResponseTimeout() ); + ConnectFuture future = connector.connect( + replica.getAddress(), + new ReplicationClientProtocolHandler( service ) ); + + future.join(); + session = future.getSession(); + + synchronized( con ) + { + con.session = session; + con.delay = -1; // reset delay + con.inProgress = false; + } + } + catch( IOException e ) + { + log.warn("[" + replica + "] Failed to connect.", e ); + } + finally + { + synchronized( con ) + { + con.inProgress = false; + con.connector = null; + } + } + } + } + + private static class Connection + { + private IoSession session; + private int delay = -1; + private boolean inProgress; + private Connector connector; + + public Connection() + { + } + } +} Added: directory/trunks/apacheds/mitosis/src/main/java/org/apache/directory/mitosis/service/ReplicationContext.java URL: http://svn.apache.org/viewvc/directory/trunks/apacheds/mitosis/src/main/java/org/apache/directory/mitosis/service/ReplicationContext.java?view=auto&rev=463366 ============================================================================== --- directory/trunks/apacheds/mitosis/src/main/java/org/apache/directory/mitosis/service/ReplicationContext.java (added) +++ directory/trunks/apacheds/mitosis/src/main/java/org/apache/directory/mitosis/service/ReplicationContext.java Thu Oct 12 11:33:14 2006 @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.directory.mitosis.service; + +import org.apache.directory.server.core.DirectoryServiceConfiguration; +import org.apache.mina.common.IoSession; +import org.apache.directory.mitosis.common.Replica; +import org.apache.directory.mitosis.configuration.ReplicationConfiguration; + +public interface ReplicationContext { + IoSession getSession(); + ReplicationConfiguration getConfiguration(); + ReplicationService getService(); + DirectoryServiceConfiguration getServiceConfiguration(); + int getNextSequence(); + + Replica getPeer(); + void setPeer( Replica peer ); + + State getState(); + void setState( State state ); + + void scheduleExpiration( Object message ); + Object cancelExpiration( int sequence ); + void cancelAllExpirations(); + int getScheduledExpirations(); + + public static class State + { + /** + * Connection is established. + */ + public static final State INIT = new State( "INIT" ); + + /** + * Client has logged in and is ready to exchange information. 
+ */ + public static final State READY = new State( "READY" ); + + private final String value; + + private State( String value ) + { + this.value = value; + } + + public String toString() + { + return value; + } + } +} Added: directory/trunks/apacheds/mitosis/src/main/java/org/apache/directory/mitosis/service/ReplicationLogCleanJob.java URL: http://svn.apache.org/viewvc/directory/trunks/apacheds/mitosis/src/main/java/org/apache/directory/mitosis/service/ReplicationLogCleanJob.java?view=auto&rev=463366 ============================================================================== --- directory/trunks/apacheds/mitosis/src/main/java/org/apache/directory/mitosis/service/ReplicationLogCleanJob.java (added) +++ directory/trunks/apacheds/mitosis/src/main/java/org/apache/directory/mitosis/service/ReplicationLogCleanJob.java Thu Oct 12 11:33:14 2006 @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ * + */ +package org.apache.directory.mitosis.service; + +import java.util.Iterator; + +import javax.naming.NamingException; + +import org.apache.directory.server.core.DirectoryService; +import org.apache.directory.server.core.interceptor.Interceptor; +import org.quartz.Job; +import org.quartz.JobExecutionContext; +import org.quartz.JobExecutionException; + +public class ReplicationLogCleanJob implements Job +{ + public static final String INSTANCE_ID = "instanceId"; + + public ReplicationLogCleanJob() + { + } + + public void execute( JobExecutionContext ctx ) throws JobExecutionException + { + String instanceId = ctx.getJobDetail().getJobDataMap().getString( INSTANCE_ID ); + if( instanceId == null ) + { + // Execute for all instances in the VM if instanceId is not specified. + Iterator it = DirectoryService.getAllInstances().iterator(); + while( it.hasNext() ) + { + DirectoryService service = ( DirectoryService ) it.next(); + execute0( service.getConfiguration().getInstanceId() ); + } + } + else + { + // Execute for the instance with the specified instanceId if + // it is specified. 
+ execute0( instanceId ); + } + } + + private void execute0( String instanceId ) throws JobExecutionException + { + DirectoryService service = DirectoryService.getInstance( instanceId ); + Iterator it = service.getConfiguration().getInterceptorChain().getAll().iterator(); + while( it.hasNext() ) + { + Interceptor interceptor = (Interceptor) it.next(); + if( interceptor instanceof ReplicationService ) + { + try + { + ( ( ReplicationService ) interceptor ).purgeAgedData(); + } + catch( NamingException e ) + { + throw new JobExecutionException( e ); + } + } + } + } +} Added: directory/trunks/apacheds/mitosis/src/main/java/org/apache/directory/mitosis/service/ReplicationService.java URL: http://svn.apache.org/viewvc/directory/trunks/apacheds/mitosis/src/main/java/org/apache/directory/mitosis/service/ReplicationService.java?view=auto&rev=463366 ============================================================================== --- directory/trunks/apacheds/mitosis/src/main/java/org/apache/directory/mitosis/service/ReplicationService.java (added) +++ directory/trunks/apacheds/mitosis/src/main/java/org/apache/directory/mitosis/service/ReplicationService.java Thu Oct 12 11:33:14 2006 @@ -0,0 +1,416 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.directory.mitosis.service; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.net.ServerSocket; +import java.net.SocketAddress; +import java.nio.channels.ServerSocketChannel; +import java.text.ParseException; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.Map; + +import javax.naming.Name; +import javax.naming.NameNotFoundException; +import javax.naming.NamingEnumeration; +import javax.naming.NamingException; +import javax.naming.directory.Attribute; +import javax.naming.directory.Attributes; +import javax.naming.directory.ModificationItem; +import javax.naming.directory.SearchControls; +import javax.naming.directory.SearchResult; + +import org.apache.directory.server.core.DirectoryServiceConfiguration; +import org.apache.directory.server.core.configuration.InterceptorConfiguration; +import org.apache.directory.server.core.enumeration.SearchResultFilteringEnumeration; +import org.apache.directory.server.core.interceptor.BaseInterceptor; +import org.apache.directory.server.core.interceptor.NextInterceptor; +import org.apache.directory.server.core.invocation.InvocationStack; +import org.apache.directory.server.core.partition.PartitionNexus; +import org.apache.directory.shared.ldap.exception.LdapNameNotFoundException; +import org.apache.directory.shared.ldap.filter.ExprNode; +import org.apache.directory.shared.ldap.filter.FilterParser; +import org.apache.directory.shared.ldap.filter.FilterParserImpl; +import org.apache.directory.shared.ldap.name.LdapDN; +import org.apache.mina.common.IoAcceptor; +import org.apache.mina.common.IoServiceConfig; +import org.apache.mina.common.TransportType; +import org.apache.mina.filter.codec.ProtocolCodecFilter; +import org.apache.mina.transport.socket.nio.SocketAcceptor; +import 
org.apache.mina.transport.socket.nio.SocketAcceptorConfig; +import org.apache.directory.mitosis.common.CSN; +import org.apache.directory.mitosis.common.Constants; +import org.apache.directory.mitosis.common.ReplicaId; +import org.apache.directory.mitosis.common.SimpleCSN; +import org.apache.directory.mitosis.configuration.ReplicationConfiguration; +import org.apache.directory.mitosis.operation.Operation; +import org.apache.directory.mitosis.operation.OperationFactory; +import org.apache.directory.mitosis.service.protocol.handler.ReplicationServerProtocolHandler; +import org.apache.directory.mitosis.store.ReplicationStore; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class ReplicationService extends BaseInterceptor +{ + private static final Logger log = LoggerFactory.getLogger( ReplicationService.class ); + private DirectoryServiceConfiguration directoryServiceConfiguration; + private ReplicationConfiguration configuration; + private PartitionNexus nexus; + private OperationFactory operationFactory; + private ReplicationStore store; + private IoAcceptor registry; + private final ClientConnectionManager clientConnectionManager = new ClientConnectionManager( this ); + + public ReplicationService() + { + } + + public ReplicationConfiguration getConfiguration() + { + return configuration; + } + + public void setConfiguration( ReplicationConfiguration cfg ) + { + cfg.validate(); + this.configuration = cfg; + } + + public DirectoryServiceConfiguration getFactoryConfiguration() + { + return directoryServiceConfiguration; + } + + public void init( DirectoryServiceConfiguration serviceCfg, InterceptorConfiguration cfg ) throws NamingException + { + configuration.validate(); + // and then preserve frequently used ones + directoryServiceConfiguration = serviceCfg; + nexus = serviceCfg.getPartitionNexus(); + store = configuration.getStore(); + operationFactory = new OperationFactory( serviceCfg, configuration ); + + // Initialize store and service + 
store.open( serviceCfg, configuration ); + boolean serviceStarted = false; + try + { + startNetworking(); + serviceStarted = true; + } + catch( Exception e ) + { + throw new ReplicationServiceException( "Failed to initialize MINA ServiceRegistry.", e ); + } + finally + { + if( !serviceStarted ) + { + // roll back + store.close(); + } + } + + purgeAgedData(); + } + + private void startNetworking() throws Exception + { + registry = new SocketAcceptor(); + IoServiceConfig config = new SocketAcceptorConfig(); + + config.getFilterChain().addLast( "protocol", new ProtocolCodecFilter( new ReplicationServerProtocolHandler( this ) ) ); + + // bind server protocol provider + registry.bind( new InetSocketAddress( 10101 ), + new ServerHandler( true, new RunnableFactory() ), + config ); + + clientConnectionManager.start( configuration ); + } + + public void destroy() + { + stopNetworking(); + store.close(); + } + + private void stopNetworking() + { + // close all open connections, deactivate all filters and service registry + try + { + clientConnectionManager.stop(); + } + catch( Exception e ) + { + log.warn( "Failed to stop the client connection manager.", e ); + } + registry.unbindAll(); + } + + public void purgeAgedData() throws NamingException + { + Attributes rootDSE = nexus.getRootDSE(); + Attribute namingContextsAttr = rootDSE.get( "namingContexts" ); + if( namingContextsAttr == null || namingContextsAttr.size() == 0 ) + { + throw new NamingException( "No namingContexts attributes in rootDSE." 
); + } + + CSN purgeCSN = new SimpleCSN( + System.currentTimeMillis() - + configuration.getLogMaxAge() * 1000L * 60L * 60L * 24L, // convert days to millis + new ReplicaId( "ZZZZZZZZZZZZZZZZ" ), + Integer.MAX_VALUE ); + FilterParser parser = new FilterParserImpl(); + ExprNode filter; + + try + { + filter = parser.parse( + "(& (entryCSN=<" + + purgeCSN.toOctetString() + + ") (entryDeleted=true))" ); + } + catch( IOException e ) + { + throw ( NamingException ) new NamingException().initCause( e ); + } + catch( ParseException e ) + { + throw ( NamingException ) new NamingException().initCause( e ); + } + + // Iterate all context partitions to send all entries of them. + NamingEnumeration e = namingContextsAttr.getAll(); + while( e.hasMore() ) + { + Object value = e.next(); + // Convert attribute value to JNDI name. + Name contextName; + if( value instanceof Name ) + { + contextName = ( Name ) value; + } + else + { + contextName = new LdapDN( String.valueOf( value ) ); + } + + log.info( "Purging aged data under '" + contextName + '"'); + purgeAgedData( contextName, filter ); + } + + store.removeLogs( purgeCSN, false ); + } + + private void purgeAgedData( Name contextName, ExprNode filter ) throws NamingException + { + SearchControls ctrl = new SearchControls(); + ctrl.setSearchScope( SearchControls.SUBTREE_SCOPE ); + ctrl.setReturningAttributes( new String[] + { "entryCSN", "entryDeleted" } ); + + NamingEnumeration e = nexus.search( + (LdapDN)contextName, + directoryServiceConfiguration.getEnvironment(), + filter, ctrl ); + + List names = new ArrayList(); + try + { + while( e.hasMore() ) + { + SearchResult sr = ( SearchResult ) e.next(); + Name name = new LdapDN( sr.getName() ); + if( name.size() > contextName.size() ) + { + names.add( new LdapDN( sr.getName() ) ); + } + } + } + finally + { + e.close(); + } + + Iterator it = names.iterator(); + while( it.hasNext() ) + { + Name name = (Name) it.next(); + try + { + Attributes entry = nexus.lookup( (LdapDN)name ); + 
log.info( "Purge: " + name + " (" + entry + ')' ); + nexus.delete( (LdapDN)name ); + } + catch( NamingException ex ) + { + log.warn( "Failed to fetch/delete: " + name, ex ); + } + } + } + + public void add( NextInterceptor nextInterceptor, String userProvidedName, Name normalizedName, Attributes entry ) throws NamingException + { + Operation op = operationFactory.newAdd( userProvidedName, normalizedName, entry ); + op.execute( nexus, store ); + } + + public void delete( NextInterceptor nextInterceptor, Name name ) throws NamingException + { + Operation op = operationFactory.newDelete( name ); + op.execute( nexus, store ); + } + + public void modify( NextInterceptor next, Name name, int modOp, Attributes attrs ) throws NamingException + { + Operation op = operationFactory.newModify( name, modOp, attrs ); + op.execute( nexus, store ); + } + + public void modify( NextInterceptor next, Name name, ModificationItem[] items ) throws NamingException + { + Operation op = operationFactory.newModify( name, items ); + op.execute( nexus, store ); + } + + public void modifyRn( NextInterceptor next, Name oldName, String newRDN, boolean deleteOldRDN ) throws NamingException + { + Operation op = operationFactory.newModifyRn( oldName, newRDN, deleteOldRDN ); + op.execute( nexus, store ); + } + + public void move( NextInterceptor next, Name oldName, Name newParentName, String newRDN, boolean deleteOldRDN ) throws NamingException + { + Operation op = operationFactory.newMove( oldName, newParentName, newRDN, deleteOldRDN ); + op.execute( nexus, store ); + } + + public void move( NextInterceptor next, Name oldName, Name newParentName ) throws NamingException + { + Operation op = operationFactory.newMove( oldName, newParentName ); + op.execute( nexus, store ); + } + + public boolean hasEntry( NextInterceptor nextInterceptor, Name name ) throws NamingException + { + // Ask others first. 
+ boolean hasEntry = nextInterceptor.hasEntry( (LdapDN)name ); + + // If the entry exists, + if( hasEntry ) + { + // Check DELETED attribute. + try + { + Attributes entry = nextInterceptor.lookup( (LdapDN)name ); + hasEntry = !isDeleted( entry ); + } + catch( NameNotFoundException e ) + { + System.out.println( e.toString( true ) ); + hasEntry = false; + } + } + + return hasEntry; + } + + public Attributes lookup( NextInterceptor nextInterceptor, Name name ) throws NamingException + { + Attributes result = nextInterceptor.lookup( (LdapDN)name ); + ensureNotDeleted( name, result ); + return result; + } + + + public Attributes lookup( NextInterceptor nextInterceptor, Name name, String[] attrIds ) throws NamingException + { + boolean found = false; + // Look for 'entryDeleted' attribute is in attrIds. + for( int i = 0; i < attrIds.length; i ++ ) + { + if( Constants.ENTRY_DELETED.equals( attrIds[i] ) ) + { + found = true; + break; + } + } + + // If not exists, add one. + if( !found ) + { + String[] newAttrIds = new String[ attrIds.length + 1 ]; + System.arraycopy( attrIds, 0, newAttrIds, 0, attrIds.length ); + newAttrIds[ attrIds.length ] = Constants.ENTRY_DELETED; + attrIds = newAttrIds; + } + + Attributes result = nextInterceptor.lookup( (LdapDN)name, attrIds ); + ensureNotDeleted( name, result ); + return result; + } + + + public NamingEnumeration list( NextInterceptor nextInterceptor, Name baseName ) throws NamingException + { + NamingEnumeration e = nextInterceptor.list( (LdapDN)baseName ); + return new SearchResultFilteringEnumeration( e, new SearchControls(), InvocationStack.getInstance().peek(), Constants.DELETED_ENTRIES_FILTER ); + } + + + public NamingEnumeration search( NextInterceptor nextInterceptor, Name baseName, Map environment, ExprNode filter, SearchControls searchControls ) throws NamingException + { + NamingEnumeration e = nextInterceptor.search( (LdapDN)baseName, environment, filter, searchControls ); + if ( searchControls.getReturningAttributes() 
!= null ) + { + return e; + } + + return new SearchResultFilteringEnumeration( e, searchControls, InvocationStack.getInstance().peek(), Constants.DELETED_ENTRIES_FILTER ); + } + + private void ensureNotDeleted( Name name, Attributes entry ) throws NamingException, LdapNameNotFoundException + { + if( isDeleted( entry ) ) + { + LdapNameNotFoundException e = + new LdapNameNotFoundException( "Deleted entry: " + name ); + e.setResolvedName( nexus.getMatchedName( (LdapDN)name ) ); + throw e; + } + } + + private boolean isDeleted( Attributes entry ) throws NamingException + { + if( entry == null ) + { + return true; + } + + Attribute deleted = entry.get( Constants.ENTRY_DELETED ); + return ( deleted != null && "true".equals( deleted.get().toString() ) ); + } +} Added: directory/trunks/apacheds/mitosis/src/main/java/org/apache/directory/mitosis/service/ReplicationServiceException.java URL: http://svn.apache.org/viewvc/directory/trunks/apacheds/mitosis/src/main/java/org/apache/directory/mitosis/service/ReplicationServiceException.java?view=auto&rev=463366 ============================================================================== --- directory/trunks/apacheds/mitosis/src/main/java/org/apache/directory/mitosis/service/ReplicationServiceException.java (added) +++ directory/trunks/apacheds/mitosis/src/main/java/org/apache/directory/mitosis/service/ReplicationServiceException.java Thu Oct 12 11:33:14 2006 @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.directory.mitosis.service; + +import org.apache.directory.mitosis.common.ReplicationException; + +public class ReplicationServiceException extends ReplicationException +{ + private static final long serialVersionUID = 3906090070204430386L; + + public ReplicationServiceException() + { + super(); + } + + public ReplicationServiceException( String message, Throwable cause ) + { + super( message, cause ); + } + + public ReplicationServiceException( String message ) + { + super( message ); + } + + public ReplicationServiceException( Throwable cause ) + { + super( cause ); + } +} Added: directory/trunks/apacheds/mitosis/src/main/java/org/apache/directory/mitosis/service/SimpleReplicationContext.java URL: http://svn.apache.org/viewvc/directory/trunks/apacheds/mitosis/src/main/java/org/apache/directory/mitosis/service/SimpleReplicationContext.java?view=auto&rev=463366 ============================================================================== --- directory/trunks/apacheds/mitosis/src/main/java/org/apache/directory/mitosis/service/SimpleReplicationContext.java (added) +++ directory/trunks/apacheds/mitosis/src/main/java/org/apache/directory/mitosis/service/SimpleReplicationContext.java Thu Oct 12 11:33:14 2006 @@ -0,0 +1,178 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.directory.mitosis.service; + +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; +import java.util.Timer; +import java.util.TimerTask; + +import org.apache.directory.server.core.DirectoryServiceConfiguration; +import org.apache.mina.common.IoSession; +import org.apache.mina.util.SessionLog; +import org.apache.directory.mitosis.common.Replica; +import org.apache.directory.mitosis.configuration.ReplicationConfiguration; +import org.apache.directory.mitosis.service.protocol.message.BaseMessage; + +public class SimpleReplicationContext implements ReplicationContext +{ + private static final Timer expirationTimer = new Timer( "ReplicationMessageExpirer" ); + + private final ReplicationService service; + private final ReplicationConfiguration configuration; + private final DirectoryServiceConfiguration serviceConfiguration; + private final IoSession session; + private final Map expirableMessages = new HashMap(); + private int nextSequence; + private Replica peer; + private State state = State.INIT; + + public SimpleReplicationContext( ReplicationService service, DirectoryServiceConfiguration serviceCfg, ReplicationConfiguration configuration, IoSession session ) + { + this.service = service; + this.configuration = configuration; + this.serviceConfiguration = serviceCfg; + this.session = session; + } + + public ReplicationService 
getService() + { + return service; + } + + public ReplicationConfiguration getConfiguration() + { + return configuration; + } + + public DirectoryServiceConfiguration getServiceConfiguration() + { + return serviceConfiguration; + } + + public IoSession getSession() + { + return session; + } + + public int getNextSequence() + { + return nextSequence ++; + } + + public Replica getPeer() + { + return peer; + } + + public void setPeer( Replica peer ) + { + assert peer != null; + this.peer = peer; + } + + public State getState() + { + return state; + } + + public void setState( State state ) + { + this.state = state; + } + + public void scheduleExpiration( Object message ) + { + BaseMessage bm = ( BaseMessage ) message; + ExpirationTask task = new ExpirationTask( bm ); + synchronized( expirableMessages ) + { + expirableMessages.put( + new Integer( bm.getSequence() ), + task ); + } + + expirationTimer.schedule( task, configuration.getResponseTimeout() * 1000L ); + } + + public Object cancelExpiration( int sequence ) + { + ExpirationTask task = removeTask( sequence ); + if( task == null ) + { + return null; + } + + task.cancel(); + return task.message; + } + + public void cancelAllExpirations() + { + synchronized( expirableMessages ) + { + Iterator i = expirableMessages.values().iterator(); + while( i.hasNext() ) + { + ( ( ExpirationTask ) i.next() ).cancel(); + } + } + } + + public int getScheduledExpirations() + { + synchronized( expirableMessages ) + { + return expirableMessages.size(); + } + } + + private ExpirationTask removeTask( int sequence ) + { + ExpirationTask task; + synchronized( expirableMessages ) + { + task = ( ExpirationTask ) expirableMessages.remove( new Integer( sequence ) ); + } + return task; + } + + private class ExpirationTask extends TimerTask + { + private final BaseMessage message; + + private ExpirationTask( Object message ) + { + this.message = ( BaseMessage ) message; + } + + public void run() + { + if( removeTask( message.getSequence() ) == 
this ) + { + SessionLog.warn( + getSession(), + "No response within " + configuration.getResponseTimeout() + " second(s) for message #" + message.getSequence() ); + getSession().close(); + } + } + } +} Added: directory/trunks/apacheds/mitosis/src/main/java/org/apache/directory/mitosis/service/protocol/Constants.java URL: http://svn.apache.org/viewvc/directory/trunks/apacheds/mitosis/src/main/java/org/apache/directory/mitosis/service/protocol/Constants.java?view=auto&rev=463366 ============================================================================== --- directory/trunks/apacheds/mitosis/src/main/java/org/apache/directory/mitosis/service/protocol/Constants.java (added) +++ directory/trunks/apacheds/mitosis/src/main/java/org/apache/directory/mitosis/service/protocol/Constants.java Thu Oct 12 11:33:14 2006 @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
/**
 * Holder of the integer constants used by the replication protocol package.
 * The class only carries static fields and cannot be instantiated.
 */
public class Constants
{
    /** Not instantiable: this class is a pure constant holder. */
    private Constants()
    {
    }

    // Codes 0x00 - 0x09: each request-style name is followed by its *_ACK
    // counterpart (presumably the message type codes of the protocol).

    public static final int LOGIN = 0x00;

    public static final int LOGIN_ACK = 0x01;

    public static final int GET_UPDATE_VECTOR = 0x02;

    public static final int GET_UPDATE_VECTOR_ACK = 0x03;

    public static final int LOG_ENTRY = 0x04;

    public static final int LOG_ENTRY_ACK = 0x05;

    public static final int BEGIN_LOG_ENTRIES = 0x06;

    public static final int BEGIN_LOG_ENTRIES_ACK = 0x07;

    public static final int END_LOG_ENTRIES = 0x08;

    public static final int END_LOG_ENTRIES_ACK = 0x09;

    // Result codes.

    public static final int OK = 0;

    public static final int NOT_OK = -1;
}
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.directory.mitosis.service.protocol.codec; + +import org.apache.directory.mitosis.service.protocol.message.BaseMessage; +import org.apache.mina.common.ByteBuffer; +import org.apache.mina.common.IoSession; +import org.apache.mina.filter.codec.ProtocolDecoderException; +import org.apache.mina.filter.codec.ProtocolDecoderOutput; +import org.apache.mina.filter.codec.demux.MessageDecoder; +import org.apache.mina.filter.codec.demux.MessageDecoderResult; + +public abstract class BaseMessageDecoder implements MessageDecoder +{ + private final int type; + private final int minBodyLength; + private final int maxBodyLength; + private boolean readHeader; + private int sequence; + private int bodyLength; + + protected BaseMessageDecoder( int type, int minBodyLength, int maxBodyLength ) + { + this.type = type; + this.minBodyLength = minBodyLength; + this.maxBodyLength = maxBodyLength; + } + + public final MessageDecoderResult decodable( IoSession session, ByteBuffer buf ) + { + return type == buf.get()? 
OK : NOT_OK; + } + + public final MessageDecoderResult decode( IoSession session, ByteBuffer in, + ProtocolDecoderOutput out ) throws Exception + { + if( !readHeader ) + { + if( in.remaining() < 9 ) + { + return NEED_DATA; + } + + in.get(); // skip type field + sequence = in.getInt(); + bodyLength = in.getInt(); + + if( bodyLength < minBodyLength || bodyLength > maxBodyLength ) + { + throw new ProtocolDecoderException( "Wrong bodyLength: " + bodyLength ); + } + + readHeader = true; + } + + if( readHeader ) + { + if( in.remaining() < bodyLength ) + { + return NEED_DATA; + } + + int oldLimit = in.limit(); + + try + { + in.limit( in.position() + bodyLength ); + out.write( decodeBody( sequence, bodyLength, in ) ); + return OK; + } + finally + { + readHeader = false; + in.limit( oldLimit ); + } + } + + throw new InternalError(); + } + + protected abstract BaseMessage decodeBody( int sequence, int bodyLength, ByteBuffer in ) throws Exception; +} Added: directory/trunks/apacheds/mitosis/src/main/java/org/apache/directory/mitosis/service/protocol/codec/BaseMessageEncoder.java URL: http://svn.apache.org/viewvc/directory/trunks/apacheds/mitosis/src/main/java/org/apache/directory/mitosis/service/protocol/codec/BaseMessageEncoder.java?view=auto&rev=463366 ============================================================================== --- directory/trunks/apacheds/mitosis/src/main/java/org/apache/directory/mitosis/service/protocol/codec/BaseMessageEncoder.java (added) +++ directory/trunks/apacheds/mitosis/src/main/java/org/apache/directory/mitosis/service/protocol/codec/BaseMessageEncoder.java Thu Oct 12 11:33:14 2006 @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.directory.mitosis.service.protocol.codec; + +import org.apache.directory.mitosis.service.protocol.message.BaseMessage; +import org.apache.mina.common.ByteBuffer; +import org.apache.mina.common.IoSession; +import org.apache.mina.filter.codec.ProtocolEncoderOutput; +import org.apache.mina.filter.codec.demux.MessageEncoder; + +public abstract class BaseMessageEncoder implements MessageEncoder +{ + public BaseMessageEncoder() + { + } + + public final void encode( IoSession session, Object in, ProtocolEncoderOutput out) throws Exception + { + BaseMessage m = ( BaseMessage ) in; + ByteBuffer buf = ByteBuffer.allocate( 16 ); + buf.setAutoExpand( true ); + buf.put( ( byte ) m.getType() ); + buf.putInt( m.getSequence() ); + buf.putInt( 0 ); // placeholder for body length field + + final int bodyStartPos = buf.position(); + encodeBody( m, buf ); + final int bodyEndPos = buf.position(); + final int bodyLength = bodyEndPos - bodyStartPos; + + // fill bodyLength + buf.position( bodyStartPos - 4 ); + buf.putInt( bodyLength ); + buf.position( bodyEndPos ); + + buf.flip(); + out.write( buf ); + } + + protected abstract void encodeBody( BaseMessage in, ByteBuffer out ) throws Exception; +}