Return-Path: X-Original-To: apmail-activemq-commits-archive@www.apache.org Delivered-To: apmail-activemq-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 4021617A75 for ; Tue, 11 Nov 2014 18:41:32 +0000 (UTC) Received: (qmail 30302 invoked by uid 500); 11 Nov 2014 18:41:32 -0000 Delivered-To: apmail-activemq-commits-archive@activemq.apache.org Received: (qmail 30129 invoked by uid 500); 11 Nov 2014 18:41:32 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 29780 invoked by uid 99); 11 Nov 2014 18:41:31 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 11 Nov 2014 18:41:31 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id 7A0B4A0D731; Tue, 11 Nov 2014 18:41:31 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: andytaylor@apache.org To: commits@activemq.apache.org Date: Tue, 11 Nov 2014 18:41:35 -0000 Message-Id: <07ac45e9899244cfb3782266141e3978@git.apache.org> In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [05/51] [abbrv] [partial] activemq-6 git commit: ACTIVEMQ6-4 - Rename packages to ActiveMQ http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/impl/dataformat/JournalAddRecordTX.java ---------------------------------------------------------------------- diff --git a/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/impl/dataformat/JournalAddRecordTX.java b/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/impl/dataformat/JournalAddRecordTX.java new file mode 100644 index 0000000..7c6403f 
--- /dev/null +++ b/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/impl/dataformat/JournalAddRecordTX.java @@ -0,0 +1,96 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package org.apache.activemq6.core.journal.impl.dataformat; + +import org.apache.activemq6.api.core.HornetQBuffer; +import org.apache.activemq6.core.journal.EncodingSupport; +import org.apache.activemq6.core.journal.impl.JournalImpl; + +/** + * A JournalAddRecordTX + * + * @author Clebert Suconic + * + * + */ +public class JournalAddRecordTX extends JournalInternalRecord +{ + + private final long txID; + + private final long id; + + private final EncodingSupport record; + + private final byte recordType; + + private final boolean add; + + /** + * @param id + * @param recordType + * @param record + */ + public JournalAddRecordTX(final boolean add, + final long txID, + final long id, + final byte recordType, + final EncodingSupport record) + { + + this.txID = txID; + + this.id = id; + + this.record = record; + + this.recordType = recordType; + + this.add = add; + } + + @Override + public void encode(final HornetQBuffer buffer) + { + if (add) + { + buffer.writeByte(JournalImpl.ADD_RECORD_TX); + } + else + { + buffer.writeByte(JournalImpl.UPDATE_RECORD_TX); + } + + buffer.writeInt(fileID); + + buffer.writeByte(compactCount); + + buffer.writeLong(txID); + + buffer.writeLong(id); + + 
buffer.writeInt(record.getEncodeSize()); + + buffer.writeByte(recordType); + + record.encode(buffer); + + buffer.writeInt(getEncodeSize()); + } + + @Override + public int getEncodeSize() + { + return JournalImpl.SIZE_ADD_RECORD_TX + record.getEncodeSize() + 1; + } +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/impl/dataformat/JournalCompleteRecordTX.java ---------------------------------------------------------------------- diff --git a/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/impl/dataformat/JournalCompleteRecordTX.java b/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/impl/dataformat/JournalCompleteRecordTX.java new file mode 100644 index 0000000..e7ebb3b --- /dev/null +++ b/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/impl/dataformat/JournalCompleteRecordTX.java @@ -0,0 +1,114 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package org.apache.activemq6.core.journal.impl.dataformat; + +import org.apache.activemq6.api.core.HornetQBuffer; +import org.apache.activemq6.core.journal.EncodingSupport; +import org.apache.activemq6.core.journal.impl.JournalImpl; + +/** + *

+ * A transaction record (Commit or Prepare) holds the number of elements the transaction has in + * the current file. + *

+ * While loading the {@link org.apache.activemq6.core.journal.impl.JournalFile}, the number of operations found is matched against this + * number. If for any reason there are missing operations, the transaction will be ignored. + *

+ * We can't just use a global counter, as reclaiming could delete files after the transaction was + * successfully committed. That also means that a missing file on journal reload does not by itself mean we + * have to invalidate the transaction. + *

+ * The commit operation itself is not included in this total. + * @author Clebert Suconic + */ +public class JournalCompleteRecordTX extends JournalInternalRecord +{ + public enum TX_RECORD_TYPE + { + COMMIT, PREPARE; + } + private final TX_RECORD_TYPE txRecordType; + + private final long txID; + + private final EncodingSupport transactionData; + + private int numberOfRecords; + + public JournalCompleteRecordTX(final TX_RECORD_TYPE isCommit, final long txID, final EncodingSupport transactionData) + { + this.txRecordType = isCommit; + + this.txID = txID; + + this.transactionData = transactionData; + } + + @Override + public void encode(final HornetQBuffer buffer) + { + if (txRecordType == TX_RECORD_TYPE.COMMIT) + { + buffer.writeByte(JournalImpl.COMMIT_RECORD); + } + else + { + buffer.writeByte(JournalImpl.PREPARE_RECORD); + } + + buffer.writeInt(fileID); + + buffer.writeByte(compactCount); + + buffer.writeLong(txID); + + buffer.writeInt(numberOfRecords); + + if (transactionData != null) + { + buffer.writeInt(transactionData.getEncodeSize()); + } + + if (transactionData != null) + { + transactionData.encode(buffer); + } + + buffer.writeInt(getEncodeSize()); + } + + @Override + public void setNumberOfRecords(final int records) + { + numberOfRecords = records; + } + + @Override + public int getNumberOfRecords() + { + return numberOfRecords; + } + + @Override + public int getEncodeSize() + { + if (txRecordType == TX_RECORD_TYPE.COMMIT) + { + return JournalImpl.SIZE_COMPLETE_TRANSACTION_RECORD + 1; + } + else + { + return JournalImpl.SIZE_PREPARE_RECORD + (transactionData != null ? 
transactionData.getEncodeSize() : 0) + 1; + } + } +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/impl/dataformat/JournalDeleteRecord.java ---------------------------------------------------------------------- diff --git a/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/impl/dataformat/JournalDeleteRecord.java b/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/impl/dataformat/JournalDeleteRecord.java new file mode 100644 index 0000000..188418e --- /dev/null +++ b/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/impl/dataformat/JournalDeleteRecord.java @@ -0,0 +1,56 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. 
+ */ +package org.apache.activemq6.core.journal.impl.dataformat; + +import org.apache.activemq6.api.core.HornetQBuffer; +import org.apache.activemq6.core.journal.impl.JournalImpl; + +/** + * A JournalDeleteRecord + * + * @author Clebert Suconic + * + * + */ +public class JournalDeleteRecord extends JournalInternalRecord +{ + + private final long id; + + /** + * @param id + */ + public JournalDeleteRecord(final long id) + { + this.id = id; + } + + public void encode(final HornetQBuffer buffer) + { + buffer.writeByte(JournalImpl.DELETE_RECORD); + + buffer.writeInt(fileID); + + buffer.writeByte(compactCount); + + buffer.writeLong(id); + + buffer.writeInt(getEncodeSize()); + } + + @Override + public int getEncodeSize() + { + return JournalImpl.SIZE_DELETE_RECORD + 1; + } +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/impl/dataformat/JournalDeleteRecordTX.java ---------------------------------------------------------------------- diff --git a/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/impl/dataformat/JournalDeleteRecordTX.java b/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/impl/dataformat/JournalDeleteRecordTX.java new file mode 100644 index 0000000..bdbd7ef --- /dev/null +++ b/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/impl/dataformat/JournalDeleteRecordTX.java @@ -0,0 +1,77 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. 
See the License for the specific language governing + * permissions and limitations under the License. + */ +package org.apache.activemq6.core.journal.impl.dataformat; + +import org.apache.activemq6.api.core.HornetQBuffer; +import org.apache.activemq6.core.journal.EncodingSupport; +import org.apache.activemq6.core.journal.impl.JournalImpl; + +/** + * A JournalDeleteRecordTX + * + * @author Clebert Suconic + * + * + */ +public class JournalDeleteRecordTX extends JournalInternalRecord +{ + + private final long txID; + + private final long id; + + private final EncodingSupport record; + + /** + * @param txID + * @param id + * @param record + */ + public JournalDeleteRecordTX(final long txID, final long id, final EncodingSupport record) + { + this.id = id; + + this.txID = txID; + + this.record = record; + } + + @Override + public void encode(final HornetQBuffer buffer) + { + buffer.writeByte(JournalImpl.DELETE_RECORD_TX); + + buffer.writeInt(fileID); + + buffer.writeByte(compactCount); + + buffer.writeLong(txID); + + buffer.writeLong(id); + + buffer.writeInt(record != null ? record.getEncodeSize() : 0); + + if (record != null) + { + record.encode(buffer); + } + + buffer.writeInt(getEncodeSize()); + } + + @Override + public int getEncodeSize() + { + return JournalImpl.SIZE_DELETE_RECORD_TX + (record != null ? 
record.getEncodeSize() : 0) + 1; + } +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/impl/dataformat/JournalInternalRecord.java ---------------------------------------------------------------------- diff --git a/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/impl/dataformat/JournalInternalRecord.java b/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/impl/dataformat/JournalInternalRecord.java new file mode 100644 index 0000000..5b5f707 --- /dev/null +++ b/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/impl/dataformat/JournalInternalRecord.java @@ -0,0 +1,73 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. 
+ */ +package org.apache.activemq6.core.journal.impl.dataformat; + +import org.apache.activemq6.api.core.HornetQBuffer; +import org.apache.activemq6.core.journal.EncodingSupport; + +/** + * A InternalEncoder + * + * @author Clebert Suconic + * + * + */ +public abstract class JournalInternalRecord implements EncodingSupport +{ + + protected int fileID; + + protected byte compactCount; + + public int getFileID() + { + return fileID; + } + + public void setFileID(final int fileID) + { + this.fileID = fileID; + } + + public void decode(final HornetQBuffer buffer) + { + } + + public void setNumberOfRecords(final int records) + { + } + + public int getNumberOfRecords() + { + return 0; + } + + public short getCompactCount() + { + return compactCount; + } + + public void setCompactCount(final short compactCount) + { + if (compactCount > Byte.MAX_VALUE) + { + this.compactCount = Byte.MAX_VALUE; + } + else + { + this.compactCount = (byte)compactCount; + } + } + + public abstract int getEncodeSize(); +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/impl/dataformat/JournalRollbackRecordTX.java ---------------------------------------------------------------------- diff --git a/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/impl/dataformat/JournalRollbackRecordTX.java b/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/impl/dataformat/JournalRollbackRecordTX.java new file mode 100644 index 0000000..5f07256 --- /dev/null +++ b/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/impl/dataformat/JournalRollbackRecordTX.java @@ -0,0 +1,50 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package org.apache.activemq6.core.journal.impl.dataformat; + +import org.apache.activemq6.api.core.HornetQBuffer; +import org.apache.activemq6.core.journal.impl.JournalImpl; + +/** + * A JournalRollbackRecordTX + * + * @author Clebert Suconic + * + * + */ +public class JournalRollbackRecordTX extends JournalInternalRecord +{ + private final long txID; + + public JournalRollbackRecordTX(final long txID) + { + this.txID = txID; + } + + @Override + public void encode(final HornetQBuffer buffer) + { + buffer.writeByte(JournalImpl.ROLLBACK_RECORD); + buffer.writeInt(fileID); + buffer.writeByte(compactCount); + buffer.writeLong(txID); + buffer.writeInt(JournalImpl.SIZE_ROLLBACK_RECORD + 1); + + } + + @Override + public int getEncodeSize() + { + return JournalImpl.SIZE_ROLLBACK_RECORD + 1; + } +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-journal/src/main/java/org/apache/activemq6/journal/HornetQJournalBundle.java ---------------------------------------------------------------------- diff --git a/activemq6-journal/src/main/java/org/apache/activemq6/journal/HornetQJournalBundle.java b/activemq6-journal/src/main/java/org/apache/activemq6/journal/HornetQJournalBundle.java new file mode 100644 index 0000000..6601263 --- /dev/null +++ b/activemq6-journal/src/main/java/org/apache/activemq6/journal/HornetQJournalBundle.java @@ -0,0 +1,48 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. 
+ * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package org.apache.activemq6.journal; + + +import org.apache.activemq6.api.core.HornetQIOErrorException; +import org.jboss.logging.annotations.Message; +import org.jboss.logging.annotations.MessageBundle; +import org.jboss.logging.Messages; + +/** + * @author Andy Taylor + * 3/12/12 + * + * Logger Code 14 + * + * each message id must be 6 digits long starting with 14, the 3rd digit should be 9 + * + * so 149000 to 149999 + */ +@MessageBundle(projectCode = "HQ") +public interface HornetQJournalBundle +{ + HornetQJournalBundle BUNDLE = Messages.getBundle(HornetQJournalBundle.class); + + @Message(id = 149000, value = "failed to rename file {0} to {1}", format = Message.Format.MESSAGE_FORMAT) + HornetQIOErrorException ioRenameFileError(String name, String newFileName); + + @Message(id = 149001, value = "Journal data belong to a different version", format = Message.Format.MESSAGE_FORMAT) + HornetQIOErrorException journalDifferentVersion(); + + @Message(id = 149002, value = "Journal files version mismatch. 
You should export the data from the previous version and import it as explained on the user''s manual", + format = Message.Format.MESSAGE_FORMAT) + HornetQIOErrorException journalFileMisMatch(); + + @Message(id = 149003, value = "File not opened", format = Message.Format.MESSAGE_FORMAT) + HornetQIOErrorException fileNotOpened(); +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-journal/src/main/java/org/apache/activemq6/journal/HornetQJournalLogger.java ---------------------------------------------------------------------- diff --git a/activemq6-journal/src/main/java/org/apache/activemq6/journal/HornetQJournalLogger.java b/activemq6-journal/src/main/java/org/apache/activemq6/journal/HornetQJournalLogger.java new file mode 100644 index 0000000..db7a9f3 --- /dev/null +++ b/activemq6-journal/src/main/java/org/apache/activemq6/journal/HornetQJournalLogger.java @@ -0,0 +1,271 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. 
+ */ +package org.apache.activemq6.journal; + +import org.apache.activemq6.core.journal.impl.JournalFile; +import org.jboss.logging.BasicLogger; +import org.jboss.logging.Logger; +import org.jboss.logging.annotations.Cause; +import org.jboss.logging.annotations.LogMessage; +import org.jboss.logging.annotations.Message; +import org.jboss.logging.annotations.MessageLogger; + +/** + * @author Andy Taylor + * 3/15/12 + * + * Logger Code 14 + * + * each message id must be 6 digits long starting with 14, the 3rd digit donates the level so + * + * INF0 1 + * WARN 2 + * DEBUG 3 + * ERROR 4 + * TRACE 5 + * FATAL 6 + * + * so an INFO message would be 141000 to 141999 + */ +@MessageLogger(projectCode = "HQ") +public interface HornetQJournalLogger extends BasicLogger +{ + /** + * The journal logger. + */ + HornetQJournalLogger LOGGER = Logger.getMessageLogger(HornetQJournalLogger.class, HornetQJournalLogger.class.getPackage().getName()); + + @LogMessage(level = Logger.Level.INFO) + @Message(id = 141000, value = "*** running direct journal blast: {0}", format = Message.Format.MESSAGE_FORMAT) + void runningJournalBlast(Integer numIts); + + @LogMessage(level = Logger.Level.INFO) + @Message(id = 141002, value = "starting thread for sync speed test", format = Message.Format.MESSAGE_FORMAT) + void startingThread(); + + @LogMessage(level = Logger.Level.INFO) + @Message(id = 141003, value = "Write rate = {0} bytes / sec or {1} MiB / sec", format = Message.Format.MESSAGE_FORMAT) + void writeRate(Double rate, Long l); + + @LogMessage(level = Logger.Level.INFO) + @Message(id = 141004, value = "Flush rate = {0} flushes / sec", format = Message.Format.MESSAGE_FORMAT) + void flushRate(Double rate); + + @LogMessage(level = Logger.Level.INFO) + @Message(id = 141005, value = "Check Data Files:", format = Message.Format.MESSAGE_FORMAT) + void checkFiles(); + + @LogMessage(level = Logger.Level.INFO) + @Message(id = 141006, value = "Sequence out of order on journal", format = 
Message.Format.MESSAGE_FORMAT) + void seqOutOfOrder(); + + @LogMessage(level = Logger.Level.INFO) + @Message(id = 141007, value = "Current File on the journal is <= the sequence file.getFileID={0} on the dataFiles" + + "\nCurrentfile.getFileId={1} while the file.getFileID()={2}" + + "\nIs same = ({3})", + format = Message.Format.MESSAGE_FORMAT) + void currentFile(Long fileID, Long id, Long fileFileID, Boolean b); + + @LogMessage(level = Logger.Level.INFO) + @Message(id = 141008, value = "Free File ID out of order", format = Message.Format.MESSAGE_FORMAT) + void fileIdOutOfOrder(); + + @LogMessage(level = Logger.Level.INFO) + @Message(id = 141009, value = "A Free File is less than the maximum data", format = Message.Format.MESSAGE_FORMAT) + void fileTooSmall(); + + @LogMessage(level = Logger.Level.WARN) + @Message(id = 142000, value = "You have a native library with a different version than expected", format = Message.Format.MESSAGE_FORMAT) + void incompatibleNativeLibrary(); + + @LogMessage(level = Logger.Level.WARN) + @Message(id = 142001, value = "Could not get lock after 60 seconds on closing Asynchronous File: {0}", + format = Message.Format.MESSAGE_FORMAT) + void couldNotGetLock(String fileName); + + @LogMessage(level = Logger.Level.WARN) + @Message(id = 142002, value = "Asynchronous File: {0} being finalized with opened state", format = Message.Format.MESSAGE_FORMAT) + void fileFinalizedWhileOpen(String fileName); + + @LogMessage(level = Logger.Level.WARN) + @Message(id = 142003, value = "AIO Callback Error: {0}", format = Message.Format.MESSAGE_FORMAT) + void callbackError(String error); + + @LogMessage(level = Logger.Level.WARN) + @Message(id = 142004, value = "Inconsistency during compacting: CommitRecord ID = {0} for an already committed transaction during compacting", + format = Message.Format.MESSAGE_FORMAT) + void inconsistencyDuringCompacting(Long transactionID); + + @LogMessage(level = Logger.Level.WARN) + @Message(id = 142005, value = "Inconsistency 
during compacting: Delete record being read on an existent record (id={0})", + format = Message.Format.MESSAGE_FORMAT) + void inconsistencyDuringCompactingDelete(Long recordID); + + @LogMessage(level = Logger.Level.WARN) + @Message(id = 142006, value = "Could not find add Record information for record {0} during compacting", + format = Message.Format.MESSAGE_FORMAT) + void compactingWithNoAddRecord(Long id); + + @LogMessage(level = Logger.Level.WARN) + @Message(id = 142007, value = "Can not find record {0} during compact replay", + format = Message.Format.MESSAGE_FORMAT) + void noRecordDuringCompactReplay(Long id); + + @LogMessage(level = Logger.Level.WARN) + @Message(id = 142008, value = "Could not remove file {0} from the list of data files", + format = Message.Format.MESSAGE_FORMAT) + void couldNotRemoveFile(JournalFile file); + + @LogMessage(level = Logger.Level.WARN) + @Message(id = 142009, value = "Deleting {0} as it does not have the configured size", + format = Message.Format.MESSAGE_FORMAT) + void deletingFile(JournalFile file); + + @LogMessage(level = Logger.Level.WARN) + @Message(id = 142010, value = "Failed to add file to opened files queue: {0}. 
This should NOT happen!", + format = Message.Format.MESSAGE_FORMAT) + void failedToAddFile(JournalFile nextOpenedFile); + + @LogMessage(level = Logger.Level.WARN) + @Message(id = 142011, value = "Error on reading compacting for {0}", + format = Message.Format.MESSAGE_FORMAT) + void compactReadError(JournalFile file); + + @LogMessage(level = Logger.Level.WARN) + @Message(id = 142012, value = "Couldn''t find tx={0} to merge after compacting", + format = Message.Format.MESSAGE_FORMAT) + void compactMergeError(Long id); + + @LogMessage(level = Logger.Level.WARN) + @Message(id = 142013, value = "Prepared transaction {0} was not considered completed, it will be ignored", + format = Message.Format.MESSAGE_FORMAT) + void preparedTXIncomplete(Long id); + + @LogMessage(level = Logger.Level.WARN) + @Message(id = 142014, value = "Transaction {0} is missing elements so the transaction is being ignored", + format = Message.Format.MESSAGE_FORMAT) + void txMissingElements(Long id); + + @LogMessage(level = Logger.Level.WARN) + @Message(id = 142015, value = "Uncommitted transaction with id {0} found and discarded", + format = Message.Format.MESSAGE_FORMAT) + void uncomittedTxFound(Long id); + + @LogMessage(level = Logger.Level.WARN) + @Message(id = 142016, value = "Couldn''t stop compactor executor after 120 seconds", + format = Message.Format.MESSAGE_FORMAT) + void couldNotStopCompactor(); + + @LogMessage(level = Logger.Level.WARN) + @Message(id = 142017, value = "Couldn''t stop journal executor after 60 seconds", + format = Message.Format.MESSAGE_FORMAT) + void couldNotStopJournalExecutor(); + + @LogMessage(level = Logger.Level.WARN) + @Message(id = 142018, value = "Temporary files were left unnatended after a crash on journal directory, deleting invalid files now", + format = Message.Format.MESSAGE_FORMAT) + void tempFilesLeftOpen(); + + @LogMessage(level = Logger.Level.WARN) + @Message(id = 142019, value = "Deleting orphaned file {0}", format = Message.Format.MESSAGE_FORMAT) + 
void deletingOrphanedFile(String fileToDelete); + + @LogMessage(level = Logger.Level.WARN) + @Message(id = 142020, value = "Couldn''t get lock after 60 seconds on closing Asynchronous File: {0}", format = Message.Format.MESSAGE_FORMAT) + void errorClosingFile(String fileToDelete); + + @LogMessage(level = Logger.Level.WARN) + @Message(id = 142021, value = "Error on IO callback, {0}", format = Message.Format.MESSAGE_FORMAT) + void errorOnIOCallback(String errorMessage); + + @LogMessage(level = Logger.Level.WARN) + @Message(id = 142022, value = "Timed out on AIO poller shutdown", format = Message.Format.MESSAGE_FORMAT) + void timeoutOnPollerShutdown(@Cause Exception e); + + @LogMessage(level = Logger.Level.WARN) + @Message(id = 142023, value = "Executor on file {0} couldn''t complete its tasks in 60 seconds.", format = Message.Format.MESSAGE_FORMAT) + void couldNotCompleteTask(@Cause Exception e, String name); + + @LogMessage(level = Logger.Level.WARN) + @Message(id = 142024, value = "Error completing callback", format = Message.Format.MESSAGE_FORMAT) + void errorCompletingCallback(@Cause Throwable e); + + @LogMessage(level = Logger.Level.WARN) + @Message(id = 142025, value = "Error calling onError callback", format = Message.Format.MESSAGE_FORMAT) + void errorCallingErrorCallback(@Cause Throwable e); + + @LogMessage(level = Logger.Level.WARN) + @Message(id = 142026, value = "Timed out on AIO writer shutdown", format = Message.Format.MESSAGE_FORMAT) + void timeoutOnWriterShutdown(@Cause Throwable e); + + @LogMessage(level = Logger.Level.WARN) + @Message(id = 142027, value = "Error on writing data! 
{0} code - {1}", format = Message.Format.MESSAGE_FORMAT) + void errorWritingData(@Cause Throwable e, String errorMessage, Integer errorCode); + + @LogMessage(level = Logger.Level.WARN) + @Message(id = 142028, value = "Error replaying pending commands after compacting", format = Message.Format.MESSAGE_FORMAT) + void errorReplayingCommands(@Cause Throwable e); + + @LogMessage(level = Logger.Level.WARN) + @Message(id = 142029, value = "Error closing file", format = Message.Format.MESSAGE_FORMAT) + void errorClosingFile(@Cause Throwable e); + + @LogMessage(level = Logger.Level.WARN) + @Message(id = 142030, value = "Could not open a file in 60 Seconds", format = Message.Format.MESSAGE_FORMAT) + void errorOpeningFile(@Cause Throwable e); + + @LogMessage(level = Logger.Level.WARN) + @Message(id = 142031, value = "Error retrieving ID part of the file name {0}", format = Message.Format.MESSAGE_FORMAT) + void errorRetrievingID(@Cause Throwable e, String fileName); + + @LogMessage(level = Logger.Level.WARN) + @Message(id = 142032, value = "Error reading journal file", format = Message.Format.MESSAGE_FORMAT) + void errorReadingFile(@Cause Throwable e); + + @LogMessage(level = Logger.Level.WARN) + @Message(id = 142033, value = "Error reinitializing file {0}", format = Message.Format.MESSAGE_FORMAT) + void errorReinitializingFile(@Cause Throwable e, JournalFile file); + + @LogMessage(level = Logger.Level.WARN) + @Message(id = 142034, value = "Exception on submitting write", format = Message.Format.MESSAGE_FORMAT) + void errorSubmittingWrite(@Cause Throwable e); + + @LogMessage(level = Logger.Level.ERROR) + @Message(id = 144000, value = "Failed to delete file {0}", format = Message.Format.MESSAGE_FORMAT) + void errorDeletingFile(Object e); + + @LogMessage(level = Logger.Level.ERROR) + @Message(id = 144001, value = "Error starting poller", format = Message.Format.MESSAGE_FORMAT) + void errorStartingPoller(@Cause Exception e); + + @LogMessage(level = Logger.Level.ERROR) + 
@Message(id = 144002, value = "Error pushing opened file", format = Message.Format.MESSAGE_FORMAT) + void errorPushingFile(@Cause Exception e); + + @LogMessage(level = Logger.Level.ERROR) + @Message(id = 144003, value = "Error compacting", format = Message.Format.MESSAGE_FORMAT) + void errorCompacting(@Cause Throwable e); + + @LogMessage(level = Logger.Level.ERROR) + @Message(id = 144004, value = "Error scheduling compacting", format = Message.Format.MESSAGE_FORMAT) + void errorSchedulingCompacting(@Cause Throwable e); + + @LogMessage(level = Logger.Level.ERROR) + @Message(id = 144005, value = "Failed to performance blast", format = Message.Format.MESSAGE_FORMAT) + void failedToPerfBlast(@Cause Throwable e); + + @LogMessage(level = Logger.Level.ERROR) + @Message(id = 144006, value = "IOError code {0}, {1}", format = Message.Format.MESSAGE_FORMAT) + void ioError(final int errorCode, final String errorMessage); + +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-journal/src/test/java/org/apache/activemq6/tests/asyncio/LibaioDependencyCheckTest.java ---------------------------------------------------------------------- diff --git a/activemq6-journal/src/test/java/org/apache/activemq6/tests/asyncio/LibaioDependencyCheckTest.java b/activemq6-journal/src/test/java/org/apache/activemq6/tests/asyncio/LibaioDependencyCheckTest.java new file mode 100644 index 0000000..742635a --- /dev/null +++ b/activemq6-journal/src/test/java/org/apache/activemq6/tests/asyncio/LibaioDependencyCheckTest.java @@ -0,0 +1,57 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package org.apache.activemq6.tests.asyncio; + +import org.junit.Test; + +import org.junit.Assert; +import org.apache.activemq6.core.asyncio.impl.AsynchronousFileImpl; + +/** + * A LibaioDependencyCheckTest + * + * @author Clebert Suconic + * + * + */ +public class LibaioDependencyCheckTest extends Assert +{ + + // Constants ----------------------------------------------------- + + // Attributes ---------------------------------------------------- + + // Static -------------------------------------------------------- + + // Constructors -------------------------------------------------- + + // Public -------------------------------------------------------- + + @Test + public void testDependency() throws Exception + { + if (System.getProperties().get("os.name").equals("Linux")) + { + assertTrue("Libaio is not available on this platform", AsynchronousFileImpl.isLoaded()); + } + } + + // Package protected --------------------------------------------- + + // Protected ----------------------------------------------------- + + // Private ------------------------------------------------------- + + // Inner classes ------------------------------------------------- + +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-native/README ---------------------------------------------------------------------- diff --git a/activemq6-native/README b/activemq6-native/README new file mode 100644 index 0000000..63404ef --- /dev/null +++ b/activemq6-native/README @@ -0,0 +1,45 @@ + +This is a simple tutorial on compiling libHornetQLibAIO.so + 
+DEPENDENCIES + +Make sure you install these packages: + +- G++ (yum install gcc-c++ or aptitude install g++) +- Gcc (yum install gcc or aptitude install gcc) +- JDK (full JDK) + + +LIBAIO INFORMATION + +libaio is part of the kernel project. The library makes system calls on the kernel layer. + +This is the project information: + +Git Repository: git://git.kernel.org/pub/scm/libs/libaio/libaio.git +Mailing List: linux-aio@kvack.org + + +STEPS TO BUILD + +1. Make sure you have JAVA_HOME defined, and pointing to the root of your JDK: + +Example: + + $> export JAVA_HOME=/usr/share/jdk1.7 + + +2. Call compile-native.sh. Bootstrap will call all the initial scripts you need + $> ./compile-native.sh + +if you are missing any dependencies, autoconf would tell you what you're missing. + + +COMPILED FILE + +The produced file will be under ./src/.libs/libHornetQLibAIO.so + + +DOCUMENTATION + +The User Manual, chapter 38 (Libaio Native Libraries) will provide more details about our native libraries on libaio. 
http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-native/bin/libHornetQAIO32.so ---------------------------------------------------------------------- diff --git a/activemq6-native/bin/libHornetQAIO32.so b/activemq6-native/bin/libHornetQAIO32.so new file mode 100755 index 0000000..cd431fc Binary files /dev/null and b/activemq6-native/bin/libHornetQAIO32.so differ http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-native/bin/libHornetQAIO64.so ---------------------------------------------------------------------- diff --git a/activemq6-native/bin/libHornetQAIO64.so b/activemq6-native/bin/libHornetQAIO64.so new file mode 100755 index 0000000..e7f68bc Binary files /dev/null and b/activemq6-native/bin/libHornetQAIO64.so differ http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-native/compile-native.sh ---------------------------------------------------------------------- diff --git a/activemq6-native/compile-native.sh b/activemq6-native/compile-native.sh new file mode 100755 index 0000000..d37a352 --- /dev/null +++ b/activemq6-native/compile-native.sh @@ -0,0 +1 @@ +mvn install -Pnative-build http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-native/pom.xml ---------------------------------------------------------------------- diff --git a/activemq6-native/pom.xml b/activemq6-native/pom.xml new file mode 100644 index 0000000..d6929e4 --- /dev/null +++ b/activemq6-native/pom.xml @@ -0,0 +1,126 @@ + + ActiveMQ6 Native POM + 4.0.0 + + + org.apache.activemq6 + activemq6-pom + 6.0.0-SNAPSHOT + + + activemq6-native + ${native-package-type} + + + org.apache.activemq6 + activemq6-commons + ${project.version} + + + + + + + ${basedir}/target/output/ + + + + + maven-resources-plugin + + + copy-resources-32 + validate + + copy-resources + + + ${basedir}/target/output/lib/linux-i686/ + + + bin/ + + libHornetQAIO32.so + + + + + + + copy-resources-64 + validate + + copy-resources + + + 
${basedir}/target/output/lib/linux-x86_64/ + + + bin/ + + libHornetQAIO64.so + + + + + + + + + + + + + + native-build + + nar + + + + + + com.github.maven-nar + nar-maven-plugin + 3.0.0 + true + + + true + + + + + + + + + aio + + + + + + jni + org.hornetq.core.libaio + + + + + + + + + + + jar + ${project.basedir}/.. + + + http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-native/src/main/c/AIOController.cpp ---------------------------------------------------------------------- diff --git a/activemq6-native/src/main/c/AIOController.cpp b/activemq6-native/src/main/c/AIOController.cpp new file mode 100644 index 0000000..e43f15a --- /dev/null +++ b/activemq6-native/src/main/c/AIOController.cpp @@ -0,0 +1,59 @@ +/* + * Copyright 2009 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. 
+ */ + + +#include +#include "AIOController.h" +#include "JavaUtilities.h" +#include "JAIODatatypes.h" + +AIOController::AIOController(std::string fileName, int maxIO) : logger(0), fileOutput(fileName, this, maxIO) +{ +} + +void AIOController::log(THREAD_CONTEXT threadContext, short level, const char * message) +{ + jmethodID methodID = 0; + + switch (level) + { + case 0: methodID = loggerError; break; + case 1: methodID = loggerWarn; break; + case 2: methodID = loggerInfo; break; + case 3: methodID = loggerDebug; break; + default: methodID = loggerDebug; break; + } + +#ifdef DEBUG + fprintf (stderr,"Callig log methodID=%ld, message=%s, logger=%ld, threadContext = %ld\n", (long) methodID, message, (long) logger, (long) threadContext); fflush(stderr); +#endif + threadContext->CallVoidMethod(logger,methodID,threadContext->NewStringUTF(message)); +} + + +void AIOController::destroy(THREAD_CONTEXT context) +{ + if (logger != 0) + { + context->DeleteGlobalRef(logger); + } +} + +/* + * level = 0-error, 1-warn, 2-info, 3-debug + */ + + +AIOController::~AIOController() +{ +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-native/src/main/c/AIOController.h ---------------------------------------------------------------------- diff --git a/activemq6-native/src/main/c/AIOController.h b/activemq6-native/src/main/c/AIOController.h new file mode 100644 index 0000000..135a3f6 --- /dev/null +++ b/activemq6-native/src/main/c/AIOController.h @@ -0,0 +1,47 @@ +/* + * Copyright 2009 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + + +#ifndef AIOCONTROLLER_H_ +#define AIOCONTROLLER_H_ +#include +#include +#include "JAIODatatypes.h" +#include "AsyncFile.h" + +class AIOController +{ +public: + jmethodID done; + jmethodID error; + + jobject logger; + + jmethodID loggerError; + jmethodID loggerWarn; + jmethodID loggerDebug; + jmethodID loggerInfo; + + /* + * level = 0-error, 1-warn, 2-info, 3-debug + */ + void log(THREAD_CONTEXT threadContext, short level, const char * message); + + AsyncFile fileOutput; + + void destroy(THREAD_CONTEXT context); + + AIOController(std::string fileName, int maxIO); + virtual ~AIOController(); +}; +#endif /*AIOCONTROLLER_H_*/ http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-native/src/main/c/AIOException.h ---------------------------------------------------------------------- diff --git a/activemq6-native/src/main/c/AIOException.h b/activemq6-native/src/main/c/AIOException.h new file mode 100644 index 0000000..70f7c71 --- /dev/null +++ b/activemq6-native/src/main/c/AIOException.h @@ -0,0 +1,71 @@ +/* + * Copyright 2009 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. 
See the License for the specific language governing + * permissions and limitations under the License. + */ + + + +#ifndef AIOEXCEPTION_H_ +#define AIOEXCEPTION_H_ + +#include +#include + + +#define NATIVE_ERROR_INTERNAL 200 +#define NATIVE_ERROR_INVALID_BUFFER 201 +#define NATIVE_ERROR_NOT_ALIGNED 202 +#define NATIVE_ERROR_CANT_INITIALIZE_AIO 203 +#define NATIVE_ERROR_CANT_RELEASE_AIO 204 +#define NATIVE_ERROR_CANT_OPEN_CLOSE_FILE 205 +#define NATIVE_ERROR_CANT_ALLOCATE_QUEUE 206 +#define NATIVE_ERROR_PREALLOCATE_FILE 208 +#define NATIVE_ERROR_ALLOCATE_MEMORY 209 +#define NATIVE_ERROR_IO 006 +#define NATIVE_ERROR_AIO_FULL 211 + + +class AIOException : public std::exception +{ +private: + int errorCode; + std::string message; +public: + AIOException(int _errorCode, std::string _message) throw() : errorCode(_errorCode), message(_message) + { + errorCode = _errorCode; + message = _message; + } + + AIOException(int _errorCode, const char * _message) throw () + { + message = std::string(_message); + errorCode = _errorCode; + } + + virtual ~AIOException() throw() + { + + } + + int inline getErrorCode() + { + return errorCode; + } + + const char* what() const throw() + { + return message.data(); + } + +}; + +#endif /*AIOEXCEPTION_H_*/ http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-native/src/main/c/AsyncFile.cpp ---------------------------------------------------------------------- diff --git a/activemq6-native/src/main/c/AsyncFile.cpp b/activemq6-native/src/main/c/AsyncFile.cpp new file mode 100644 index 0000000..20316a6 --- /dev/null +++ b/activemq6-native/src/main/c/AsyncFile.cpp @@ -0,0 +1,344 @@ +/* + * Copyright 2009 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +#ifndef _GNU_SOURCE +#define _GNU_SOURCE +#endif + + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include "AsyncFile.h" +#include "AIOController.h" +#include "AIOException.h" +#include "pthread.h" +#include "LockClass.h" +#include "CallbackAdapter.h" +#include "LockClass.h" + +//#define DEBUG + +#define WAIT_FOR_SPOT 10000 +#define TRIES_BEFORE_WARN 0 +#define TRIES_BEFORE_ERROR 500 + + +std::string io_error(int rc) +{ + std::stringstream buffer; + + if (rc == -ENOSYS) + buffer << "AIO not in this kernel"; + else + buffer << "Error:= " << strerror((int)-rc); + + return buffer.str(); +} + + +AsyncFile::AsyncFile(std::string & _fileName, AIOController * _controller, int _maxIO) : aioContext(0), events(0), fileHandle(0), controller(_controller), pollerRunning(0) +{ + ::pthread_mutex_init(&fileMutex,0); + ::pthread_mutex_init(&pollerMutex,0); + + maxIO = _maxIO; + fileName = _fileName; + if (io_queue_init(maxIO, &aioContext)) + { + throw AIOException(NATIVE_ERROR_CANT_INITIALIZE_AIO, "Can't initialize aio, out of AIO Handlers"); + } + + fileHandle = ::open(fileName.data(), O_RDWR | O_CREAT | O_DIRECT, 0666); + if (fileHandle < 0) + { + io_queue_release(aioContext); + throw AIOException(NATIVE_ERROR_CANT_OPEN_CLOSE_FILE, "Can't open file"); + } + +#ifdef DEBUG + fprintf (stderr,"File Handle %d", fileHandle); +#endif + + events = (struct io_event *)malloc (maxIO * sizeof (struct io_event)); + + if (events == 0) + { + throw AIOException (NATIVE_ERROR_CANT_ALLOCATE_QUEUE, "Can't allocate ioEvents"); + } 
+ +} + +AsyncFile::~AsyncFile() +{ + if (io_queue_release(aioContext)) + { + throw AIOException(NATIVE_ERROR_CANT_RELEASE_AIO,"Can't release aio"); + } + if (::close(fileHandle)) + { + throw AIOException(NATIVE_ERROR_CANT_OPEN_CLOSE_FILE,"Can't close file"); + } + free(events); + ::pthread_mutex_destroy(&fileMutex); + ::pthread_mutex_destroy(&pollerMutex); +} + +int isException (THREAD_CONTEXT threadContext) +{ + return JNI_ENV(threadContext)->ExceptionOccurred() != 0; +} + +void AsyncFile::pollEvents(THREAD_CONTEXT threadContext) +{ + + LockClass lock(&pollerMutex); + pollerRunning=1; + + + while (pollerRunning) + { + if (isException(threadContext)) + { + return; + } + int result = io_getevents(this->aioContext, 1, maxIO, events, 0); + + +#ifdef DEBUG + fprintf (stderr, "poll, pollerRunning=%d\n", pollerRunning); fflush(stderr); +#endif + + if (result > 0) + { + +#ifdef DEBUG + fprintf (stdout, "Received %d events\n", result); + fflush(stdout); +#endif + } + + for (int i=0; idata == (void *) -1) + { + pollerRunning = 0; +#ifdef DEBUG + controller->log(threadContext, 2, "Received poller request to stop"); +#endif + } + else + { + CallbackAdapter * adapter = (CallbackAdapter *) iocbp->data; + + long result = events[i].res; + if (result < 0) + { + std::string strerror = io_error((int)result); + adapter->onError(threadContext, result, strerror); + } + else + { + adapter->done(threadContext); + } + } + + delete iocbp; + } + } +#ifdef DEBUG + controller->log(threadContext, 2, "Poller finished execution"); +#endif +} + + +void AsyncFile::preAllocate(THREAD_CONTEXT , off_t position, int blocks, size_t size, int fillChar) +{ + + if (size % ALIGNMENT != 0) + { + throw AIOException (NATIVE_ERROR_PREALLOCATE_FILE, "You can only pre allocate files in multiples of 512"); + } + + void * preAllocBuffer = 0; + if (posix_memalign(&preAllocBuffer, 512, size)) + { + throw AIOException(NATIVE_ERROR_ALLOCATE_MEMORY, "Error on posix_memalign"); + } + + memset(preAllocBuffer, fillChar, 
size); + + + if (::lseek (fileHandle, position, SEEK_SET) < 0) throw AIOException (11, "Error positioning the file"); + + for (int i=0; idata = (void *) adapter; + + int tries = 0; + int result = 0; + + while ((result = ::io_submit(aioContext, 1, &iocb)) == (-EAGAIN)) + { +#ifdef DEBUG + fprintf (stderr, "Retrying block as iocb was full (retry=%d)\n", tries); +#endif + tries ++; + if (tries > TRIES_BEFORE_WARN) + { +#ifdef DEBUG + fprintf (stderr, "Warning level on retries, informing logger (retry=%d)\n", tries); +#endif + controller->log(threadContext, 1, "You should consider expanding AIOLimit if this message appears too many times"); + } + + if (tries > TRIES_BEFORE_ERROR) + { +#ifdef DEBUG + fprintf (stderr, "Error level on retries, throwing exception (retry=%d)\n", tries); +#endif + throw AIOException(NATIVE_ERROR_AIO_FULL, "Too many retries (500) waiting for a valid iocb block, please increase MAX_IO limit"); + } + ::usleep(WAIT_FOR_SPOT); + } + + if (result<0) + { + std::stringstream str; + str<< "Problem on submit block, errorCode=" << result; + throw AIOException (NATIVE_ERROR_IO, str.str()); + } +} + +void AsyncFile::read(THREAD_CONTEXT threadContext, long position, size_t size, void *& buffer, CallbackAdapter *& adapter) +{ + + struct iocb * iocb = new struct iocb(); + ::io_prep_pread(iocb, fileHandle, buffer, size, position); + iocb->data = (void *) adapter; + + int tries = 0; + int result = 0; + + while ((result = ::io_submit(aioContext, 1, &iocb)) == (-EAGAIN)) + { +#ifdef DEBUG + fprintf (stderr, "Retrying block as iocb was full (retry=%d)\n", tries); +#endif + tries ++; + if (tries > TRIES_BEFORE_WARN) + { +#ifdef DEBUG + fprintf (stderr, "Warning level on retries, informing logger (retry=%d)\n", tries); +#endif + controller->log(threadContext, 1, "You should consider expanding AIOLimit if this message appears too many times"); + } + + if (tries > TRIES_BEFORE_ERROR) + { +#ifdef DEBUG + fprintf (stderr, "Error level on retries, throwing exception 
(retry=%d)\n", tries); +#endif + throw AIOException(NATIVE_ERROR_AIO_FULL, "Too many retries (500) waiting for a valid iocb block, please increase MAX_IO limit"); + } + ::usleep(WAIT_FOR_SPOT); + } + + if (result<0) + { + std::stringstream str; + str<< "Problem on submit block, errorCode=" << result; + throw AIOException (NATIVE_ERROR_IO, str.str()); + } +} + +long AsyncFile::getSize() +{ + struct stat statBuffer; + + if (fstat(fileHandle, &statBuffer) < 0) + { + return -1l; + } + return statBuffer.st_size; +} + + +void AsyncFile::stopPoller(THREAD_CONTEXT threadContext) +{ + pollerRunning = 0; + + + struct iocb * iocb = new struct iocb(); + ::io_prep_pwrite(iocb, fileHandle, 0, 0, 0); + iocb->data = (void *) -1; + + int result = 0; + + while ((result = ::io_submit(aioContext, 1, &iocb)) == (-EAGAIN)) + { + fprintf(stderr, "Couldn't send request to stop poller, trying again"); + controller->log(threadContext, 1, "Couldn't send request to stop poller, trying again"); + ::usleep(WAIT_FOR_SPOT); + } + + // Waiting the Poller to finish (by giving up the lock) + LockClass lock(&pollerMutex); +} + http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-native/src/main/c/AsyncFile.h ---------------------------------------------------------------------- diff --git a/activemq6-native/src/main/c/AsyncFile.h b/activemq6-native/src/main/c/AsyncFile.h new file mode 100644 index 0000000..e74b78e --- /dev/null +++ b/activemq6-native/src/main/c/AsyncFile.h @@ -0,0 +1,89 @@ +/* + * Copyright 2009 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +#ifndef FILEOUTPUT_H_ +#define FILEOUTPUT_H_ + +#include +#include +#include +#include +#include "JAIODatatypes.h" +#include "AIOException.h" + +class AIOController; + +class CallbackAdapter; + +/** Author: Clebert Suconic at Redhat dot com*/ +class AsyncFile +{ +private: + io_context_t aioContext; + struct io_event *events; + int fileHandle; + std::string fileName; + + pthread_mutex_t fileMutex; + pthread_mutex_t pollerMutex; + + AIOController * controller; + + bool pollerRunning; + + int maxIO; + +public: + AsyncFile(std::string & _fileName, AIOController * controller, int maxIO); + virtual ~AsyncFile(); + + void write(THREAD_CONTEXT threadContext, long position, size_t size, void *& buffer, CallbackAdapter *& adapter); + + /** Write directly to the file without using libaio queue */ + void writeInternal(THREAD_CONTEXT threadContext, long position, size_t size, void *& buffer); + + void read(THREAD_CONTEXT threadContext, long position, size_t size, void *& buffer, CallbackAdapter *& adapter); + + int getHandle() + { + return fileHandle; + } + + long getSize(); + + inline void * newBuffer(int size) + { + void * buffer = 0; + if (::posix_memalign(&buffer, 512, size)) + { + throw AIOException(NATIVE_ERROR_ALLOCATE_MEMORY, "Error on posix_memalign"); + } + return buffer; + + } + + inline void destroyBuffer(void * buffer) + { + ::free(buffer); + } + + + // Finishes the polling thread (if any) and return + void stopPoller(THREAD_CONTEXT threadContext); + void preAllocate(THREAD_CONTEXT threadContext, off_t position, int blocks, size_t size, int fillChar); 
+ + void pollEvents(THREAD_CONTEXT threadContext); + +}; + +#endif /*FILEOUTPUT_H_*/ http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-native/src/main/c/CallbackAdapter.h ---------------------------------------------------------------------- diff --git a/activemq6-native/src/main/c/CallbackAdapter.h b/activemq6-native/src/main/c/CallbackAdapter.h new file mode 100644 index 0000000..e9f7241 --- /dev/null +++ b/activemq6-native/src/main/c/CallbackAdapter.h @@ -0,0 +1,38 @@ +/* + * Copyright 2009 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +#ifndef BUFFERADAPTER_H_ +#define BUFFERADAPTER_H_ + +#include + +#include "JAIODatatypes.h" + +class CallbackAdapter +{ +private: + +public: + CallbackAdapter() + { + + } + virtual ~CallbackAdapter() + { + + } + + virtual void done(THREAD_CONTEXT ) = 0; + virtual void onError(THREAD_CONTEXT , long , std::string )=0; +}; +#endif /*BUFFERADAPTER_H_*/ http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-native/src/main/c/JAIODatatypes.h ---------------------------------------------------------------------- diff --git a/activemq6-native/src/main/c/JAIODatatypes.h b/activemq6-native/src/main/c/JAIODatatypes.h new file mode 100644 index 0000000..a0840fa --- /dev/null +++ b/activemq6-native/src/main/c/JAIODatatypes.h @@ -0,0 +1,24 @@ +/* + * Copyright 2009 Red Hat, Inc. 
+ * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +#ifndef JAIODATATYPES_H_ +#define JAIODATATYPES_H_ + +#include + +#define THREAD_CONTEXT JNIEnv *& +#define JNI_ENV(pointer) pointer +#define ALIGNMENT 512 + + +#endif /*JAIODATATYPES_H_*/ http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-native/src/main/c/JNICallbackAdapter.cpp ---------------------------------------------------------------------- diff --git a/activemq6-native/src/main/c/JNICallbackAdapter.cpp b/activemq6-native/src/main/c/JNICallbackAdapter.cpp new file mode 100644 index 0000000..2194325 --- /dev/null +++ b/activemq6-native/src/main/c/JNICallbackAdapter.cpp @@ -0,0 +1,58 @@ +/* + * Copyright 2009 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. 
+ */ + +#include +#include "JNICallbackAdapter.h" +#include +#include "JavaUtilities.h" + +jobject nullObj = NULL; + +JNICallbackAdapter::JNICallbackAdapter(AIOController * _controller, jlong _sequence, jobject _callback, jobject _fileController, jobject _bufferReference, short _isRead) : CallbackAdapter() +{ + controller = _controller; + + sequence = _sequence; + + callback = _callback; + + fileController = _fileController; + + bufferReference = _bufferReference; + + isRead = _isRead; + +} + +JNICallbackAdapter::~JNICallbackAdapter() +{ +} + +void JNICallbackAdapter::done(THREAD_CONTEXT threadContext) +{ + JNI_ENV(threadContext)->CallVoidMethod(fileController, controller->done, callback, sequence, isRead ? nullObj : bufferReference); + + release(threadContext); +} + +void JNICallbackAdapter::onError(THREAD_CONTEXT threadContext, long errorCode, std::string error) +{ + controller->log(threadContext, 0, "Libaio event generated errors, callback object was informed about it"); + + jstring strError = JNI_ENV(threadContext)->NewStringUTF(error.data()); + + JNI_ENV(threadContext)->CallVoidMethod(fileController, controller->error, callback, sequence, isRead ? nullObj : bufferReference, (jint)errorCode, strError); + + release(threadContext); +} + http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-native/src/main/c/JNICallbackAdapter.h ---------------------------------------------------------------------- diff --git a/activemq6-native/src/main/c/JNICallbackAdapter.h b/activemq6-native/src/main/c/JNICallbackAdapter.h new file mode 100644 index 0000000..92404f8 --- /dev/null +++ b/activemq6-native/src/main/c/JNICallbackAdapter.h @@ -0,0 +1,62 @@ +/* + * Copyright 2009 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +#ifndef JNIBUFFERADAPTER_H_ +#define JNIBUFFERADAPTER_H_ + +#include + +#include "CallbackAdapter.h" +#include "AIOController.h" +#include "JAIODatatypes.h" + + +class JNICallbackAdapter : public CallbackAdapter +{ +private: + + AIOController * controller; + + jobject callback; + + jobject fileController; + + jobject bufferReference; + + jlong sequence; + + // Is this a read operation + short isRead; + + void release(THREAD_CONTEXT threadContext) + { + JNI_ENV(threadContext)->DeleteGlobalRef(callback); + JNI_ENV(threadContext)->DeleteGlobalRef(fileController); + JNI_ENV(threadContext)->DeleteGlobalRef(bufferReference); + delete this; + return; + } + + +public: + // _ob must be a global Reference (use createGloblReferente before calling the constructor) + JNICallbackAdapter(AIOController * _controller, jlong sequence, jobject _callback, jobject _fileController, jobject _bufferReference, short _isRead); + virtual ~JNICallbackAdapter(); + + void done(THREAD_CONTEXT threadContext); + + void onError(THREAD_CONTEXT , long , std::string ); + + +}; +#endif /*JNIBUFFERADAPTER_H_*/ http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-native/src/main/c/JNI_AsynchronousFileImpl.cpp ---------------------------------------------------------------------- diff --git a/activemq6-native/src/main/c/JNI_AsynchronousFileImpl.cpp b/activemq6-native/src/main/c/JNI_AsynchronousFileImpl.cpp new file mode 100644 index 0000000..018744b --- /dev/null +++ b/activemq6-native/src/main/c/JNI_AsynchronousFileImpl.cpp @@ -0,0 +1,373 @@ +/* + * Copyright 2009 Red 
Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "org_hornetq_core_libaio_Native.h" + + +#include "JavaUtilities.h" +#include "AIOController.h" +#include "JNICallbackAdapter.h" +#include "AIOException.h" +#include "Version.h" + + +// This value is set here globally, to avoid passing stuff on stack between java and the native layer on every sleep call +struct timespec nanoTime; + +inline AIOController * getController(JNIEnv *env, jobject & controllerAddress) +{ + return (AIOController *) env->GetDirectBufferAddress(controllerAddress); +} + +/* Inaccessible static: log */ +/* Inaccessible static: totalMaxIO */ +/* Inaccessible static: loaded */ +/* Inaccessible static: EXPECTED_NATIVE_VERSION */ +/* + * Class: org_hornetq_core_asyncio_impl_AsynchronousFileImpl + * Method: openFile + * Signature: (Ljava/lang/String;)I + */ +JNIEXPORT jint JNICALL Java_org_hornetq_core_libaio_Native_openFile + (JNIEnv * env , jclass , jstring jstrFileName) +{ + std::string fileName = convertJavaString(env, jstrFileName); + + return open(fileName.data(), O_RDWR | O_CREAT, 0666); +} + +/* + * Class: org_hornetq_core_asyncio_impl_AsynchronousFileImpl + * Method: closeFile + * Signature: (I)V + */ +JNIEXPORT void JNICALL Java_org_hornetq_core_libaio_Native_closeFile + (JNIEnv * , jclass , jint handle) +{ + close(handle); +} + +/* + * Class: 
org_hornetq_core_asyncio_impl_AsynchronousFileImpl
+ * Method: flock
+ * Signature: (I)Z
+ *
+ * Tries to take an exclusive lock on the descriptor without blocking.
+ * Returns true only when flock(2) succeeded.
+ */
+JNIEXPORT jboolean JNICALL Java_org_hornetq_core_libaio_Native_flock
+  (JNIEnv * , jclass , jint handle)
+{
+   return flock(handle, LOCK_EX | LOCK_NB) == 0;
+}
+
+
+
+/*
+ * Class: org_hornetq_core_libaio_Native
+ * Method: init
+ * Signature: (Ljava/lang/Class;Ljava/lang/String;ILjava/lang/Object;)Ljava/nio/ByteBuffer;
+ *
+ * Creates the AIOController for fileName, caches the callbackDone/callbackError method
+ * ids of controllerClazz and the debug/warn/info/error method ids of the logger, and
+ * returns the controller wrapped in a zero-capacity direct ByteBuffer (the handle that
+ * the other entry points unwrap with getController).
+ *
+ * Returns 0 with a Java exception pending when anything fails. On every failure path
+ * the controller allocated here is deleted, since no handle is handed back to Java.
+ */
+JNIEXPORT jobject JNICALL Java_org_hornetq_core_libaio_Native_init
+  (JNIEnv * env, jclass, jclass controllerClazz, jstring jstrFileName, jint maxIO, jobject logger)
+{
+   AIOController * controller = 0;
+   try
+   {
+      std::string fileName = convertJavaString(env, jstrFileName);
+
+      controller = new AIOController(fileName, (int) maxIO);
+      controller->done = env->GetMethodID(controllerClazz,"callbackDone","(Lorg/hornetq/core/asyncio/AIOCallback;JLjava/nio/ByteBuffer;)V");
+      if (!controller->done)
+      {
+         delete controller;
+         // GetMethodID left NoSuchMethodError pending; clear it before making further JNI calls
+         env->ExceptionClear();
+         throwException (env, -1, "can't get callbackDone method");
+         return 0;
+      }
+
+      controller->error = env->GetMethodID(controllerClazz, "callbackError", "(Lorg/hornetq/core/asyncio/AIOCallback;JLjava/nio/ByteBuffer;ILjava/lang/String;)V");
+      // check the id that was just looked up (this used to re-test controller->done)
+      if (!controller->error)
+      {
+         delete controller;
+         env->ExceptionClear();
+         throwException (env, -1, "can't get callbackError method");
+         return 0;
+      }
+
+      jclass loggerClass = env->GetObjectClass(logger);
+
+      // A failed lookup leaves the JVM's own exception pending for the Java caller
+      if (!(controller->loggerDebug = env->GetMethodID(loggerClass, "debug", "(Ljava/lang/Object;)V")))
+      {
+         delete controller;
+         return 0;
+      }
+      if (!(controller->loggerWarn = env->GetMethodID(loggerClass, "warn", "(Ljava/lang/Object;)V")))
+      {
+         delete controller;
+         return 0;
+      }
+      if (!(controller->loggerInfo = env->GetMethodID(loggerClass, "info", "(Ljava/lang/Object;)V")))
+      {
+         delete controller;
+         return 0;
+      }
+      if (!(controller->loggerError = env->GetMethodID(loggerClass, "error", "(Ljava/lang/Object;)V")))
+      {
+         delete controller;
+         return 0;
+      }
+
+      controller->logger = env->NewGlobalRef(logger);
+
+      return env->NewDirectByteBuffer(controller, 0);
+   }
+   catch (AIOException& e){
+      if (controller != 0)
+      {
+         delete controller;
+      }
+      throwException(env, e.getErrorCode(), e.what());
+      return 0;
+   }
+}
+
+/** 
+* objThis here is passed as a parameter at the java layer. It used to be a JNI this and now it's a java static method + where the intended reference is now passed as an argument +*/ +JNIEXPORT void JNICALL Java_org_hornetq_core_libaio_Native_read + (JNIEnv *env, jclass, jobject objThis, jobject controllerAddress, jlong position, jlong size, jobject jbuffer, jobject callback) +{ + try + { + AIOController * controller = getController(env, controllerAddress); + void * buffer = env->GetDirectBufferAddress(jbuffer); + + if (buffer == 0) + { + throwException(env, NATIVE_ERROR_INVALID_BUFFER, "Invalid Buffer used, libaio requires NativeBuffer instead of Java ByteBuffer"); + return; + } + + if (((long)buffer) % 512) + { + throwException(env, NATIVE_ERROR_NOT_ALIGNED, "Buffer not aligned for use with DMA"); + return; + } + + CallbackAdapter * adapter = new JNICallbackAdapter(controller, -1, env->NewGlobalRef(callback), env->NewGlobalRef(objThis), env->NewGlobalRef(jbuffer), true); + + controller->fileOutput.read(env, position, (size_t)size, buffer, adapter); + } + catch (AIOException& e) + { + throwException(env, e.getErrorCode(), e.what()); + } +} + + +// Fast memset on buffer +JNIEXPORT void JNICALL Java_org_hornetq_core_libaio_Native_resetBuffer + (JNIEnv *env, jclass, jobject jbuffer, jint size) +{ + void * buffer = env->GetDirectBufferAddress(jbuffer); + + if (buffer == 0) + { + throwException(env, NATIVE_ERROR_INVALID_BUFFER, "Invalid Buffer used, libaio requires NativeBuffer instead of Java ByteBuffer"); + return; + } + + memset(buffer, 0, (size_t)size); + +} + +JNIEXPORT void JNICALL Java_org_hornetq_core_libaio_Native_destroyBuffer + (JNIEnv * env, jclass, jobject jbuffer) +{ + if (jbuffer == 0) + { + throwException(env, NATIVE_ERROR_INVALID_BUFFER, "Null Buffer"); + return; + } + void * buffer = env->GetDirectBufferAddress(jbuffer); + free(buffer); +} + +JNIEXPORT jobject JNICALL Java_org_hornetq_core_libaio_Native_newNativeBuffer + (JNIEnv * env, jclass, jlong 
size) +{ + try + { + + if (size % ALIGNMENT) + { + throwException(env, NATIVE_ERROR_INVALID_BUFFER, "Buffer size needs to be aligned to 512"); + return 0; + } + + + // This will allocate a buffer, aligned by 512. + // Buffers created here need to be manually destroyed by destroyBuffer, or this would leak on the process heap away of Java's GC managed memory + void * buffer = 0; + if (::posix_memalign(&buffer, 512, size)) + { + throwException(env, NATIVE_ERROR_INTERNAL, "Error on posix_memalign"); + return 0; + } + + memset(buffer, 0, (size_t)size); + + jobject jbuffer = env->NewDirectByteBuffer(buffer, size); + return jbuffer; + } + catch (AIOException& e) + { + throwException(env, e.getErrorCode(), e.what()); + return 0; + } +} + +/** +* objThis here is passed as a parameter at the java layer. It used to be a JNI this and now it's a java static method + where the intended reference is now passed as an argument +*/ +JNIEXPORT void JNICALL Java_org_hornetq_core_libaio_Native_write + (JNIEnv *env, jclass, jobject objThis, jobject controllerAddress, jlong sequence, jlong position, jlong size, jobject jbuffer, jobject callback) +{ + try + { + AIOController * controller = getController(env, controllerAddress); + void * buffer = env->GetDirectBufferAddress(jbuffer); + + if (buffer == 0) + { + throwException(env, NATIVE_ERROR_INVALID_BUFFER, "Invalid Buffer used, libaio requires NativeBuffer instead of Java ByteBuffer"); + return; + } + + + CallbackAdapter * adapter = new JNICallbackAdapter(controller, sequence, env->NewGlobalRef(callback), env->NewGlobalRef(objThis), env->NewGlobalRef(jbuffer), false); + + controller->fileOutput.write(env, position, (size_t)size, buffer, adapter); + } + catch (AIOException& e) + { + throwException(env, e.getErrorCode(), e.what()); + } +} + +JNIEXPORT void JNICALL Java_org_hornetq_core_libaio_Native_writeInternal + (JNIEnv * env, jclass, jobject controllerAddress, jlong positionToWrite, jlong size, jobject jbuffer) +{ + try + { + 
AIOController * controller = getController(env, controllerAddress); + void * buffer = env->GetDirectBufferAddress(jbuffer); + + if (buffer == 0) + { + throwException(env, NATIVE_ERROR_INVALID_BUFFER, "Invalid Buffer used, libaio requires NativeBuffer instead of Java ByteBuffer"); + return; + } + + controller->fileOutput.writeInternal(env, positionToWrite, (size_t)size, buffer); + } + catch (AIOException& e) + { + throwException(env, e.getErrorCode(), e.what()); + } +} + + +JNIEXPORT void Java_org_hornetq_core_libaio_Native_internalPollEvents + (JNIEnv *env, jclass, jobject controllerAddress) +{ + try + { + AIOController * controller = getController(env, controllerAddress); + controller->fileOutput.pollEvents(env); + } + catch (AIOException& e) + { + throwException(env, e.getErrorCode(), e.what()); + } +} + +JNIEXPORT void JNICALL Java_org_hornetq_core_libaio_Native_stopPoller + (JNIEnv *env, jclass, jobject controllerAddress) +{ + try + { + AIOController * controller = getController(env, controllerAddress); + controller->fileOutput.stopPoller(env); + } + catch (AIOException& e) + { + throwException(env, e.getErrorCode(), e.what()); + } +} + +JNIEXPORT void JNICALL Java_org_hornetq_core_libaio_Native_closeInternal + (JNIEnv *env, jclass, jobject controllerAddress) +{ + try + { + AIOController * controller = getController(env, controllerAddress); + controller->destroy(env); + delete controller; + } + catch (AIOException& e) + { + throwException(env, e.getErrorCode(), e.what()); + } +} + + +JNIEXPORT void JNICALL Java_org_hornetq_core_libaio_Native_fill + (JNIEnv * env, jclass, jobject controllerAddress, jlong position, jint blocks, jlong size, jbyte fillChar) +{ + try + { + AIOController * controller = getController(env, controllerAddress); + + controller->fileOutput.preAllocate(env, position, blocks, size, fillChar); + + } + catch (AIOException& e) + { + throwException(env, e.getErrorCode(), e.what()); + } +} + + + +/** It does nothing... 
just return true to make sure it has all the binary dependencies */
+// The value actually returned is _VERSION_NATIVE_AIO (the native library version), not a boolean.
+JNIEXPORT jint JNICALL Java_org_hornetq_core_libaio_Native_getNativeVersion
+  (JNIEnv *, jclass)
+
+{
+   return _VERSION_NATIVE_AIO;
+}
+
+
+// Returns fileOutput.getSize() for the controller behind controllerAddress.
+// A negative size, or an AIOException, is reported as a Java exception and -1 is returned.
+JNIEXPORT jlong JNICALL Java_org_hornetq_core_libaio_Native_size0
+  (JNIEnv * env, jclass, jobject controllerAddress)
+{
+   try
+   {
+      AIOController * controller = getController(env, controllerAddress);
+
+      long size = controller->fileOutput.getSize();
+      if (size < 0)
+      {
+         throwException(env, NATIVE_ERROR_INTERNAL, "InternalError on Native Layer: method size failed");
+         return -1l;
+      }
+      return size;
+   }
+   catch (AIOException& e)
+   {
+      throwException(env, e.getErrorCode(), e.what());
+      return -1l;
+   }
+
+}
http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-native/src/main/c/JavaUtilities.cpp
----------------------------------------------------------------------
diff --git a/activemq6-native/src/main/c/JavaUtilities.cpp b/activemq6-native/src/main/c/JavaUtilities.cpp
new file mode 100644
index 0000000..b5ddc57
--- /dev/null
+++ b/activemq6-native/src/main/c/JavaUtilities.cpp
@@ -0,0 +1,58 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License. 
+ */ + +#include +#include +#include +#include "JavaUtilities.h" + + +void throwRuntimeException(JNIEnv * env, const char * message) +{ + jclass exceptionClass = env->FindClass("java/lang/RuntimeException"); + env->ThrowNew(exceptionClass,message); + +} + +void throwException(JNIEnv * env, const int code, const char * message) +{ + jclass exceptionClass = env->FindClass("org/hornetq/api/core/HornetQException"); + if (exceptionClass==NULL) + { + std::cerr << "Couldn't throw exception message:= " << message << "\n"; + throwRuntimeException (env, "Can't find Exception class"); + return; + } + + jmethodID constructor = env->GetMethodID(exceptionClass, "", "(ILjava/lang/String;)V"); + if (constructor == NULL) + { + std::cerr << "Couldn't find the constructor ***"; + throwRuntimeException (env, "Can't find Constructor for Exception"); + return; + } + + jstring strError = env->NewStringUTF(message); + jthrowable ex = (jthrowable)env->NewObject(exceptionClass, constructor, code, strError); + env->Throw(ex); + +} + +std::string convertJavaString(JNIEnv * env, jstring& jstr) +{ + const char * valueStr = env->GetStringUTFChars(jstr, NULL); + std::string data(valueStr); + env->ReleaseStringUTFChars(jstr, valueStr); + return data; +} + http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-native/src/main/c/JavaUtilities.h ---------------------------------------------------------------------- diff --git a/activemq6-native/src/main/c/JavaUtilities.h b/activemq6-native/src/main/c/JavaUtilities.h new file mode 100644 index 0000000..ff5a828 --- /dev/null +++ b/activemq6-native/src/main/c/JavaUtilities.h @@ -0,0 +1,22 @@ +/* + * Copyright 2009 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+#ifndef JAVAUTILITIES_H_
+#define JAVAUTILITIES_H_
+#include
+#include
+
+// Raises the project's Java exception, built from (code, message), in the calling thread.
+// The implementation falls back to java.lang.RuntimeException when the exception class or
+// its constructor cannot be found.
+void throwException(JNIEnv * env, const int code, const char * message);
+// Returns the contents of a Java string as a std::string (bytes obtained through GetStringUTFChars).
+std::string convertJavaString(JNIEnv * env, jstring& jstr);
+
+#endif /*JAVAUTILITIES_H_*/
http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-native/src/main/c/LockClass.h
----------------------------------------------------------------------
diff --git a/activemq6-native/src/main/c/LockClass.h b/activemq6-native/src/main/c/LockClass.h
new file mode 100644
index 0000000..de7bfb6
--- /dev/null
+++ b/activemq6-native/src/main/c/LockClass.h
@@ -0,0 +1,35 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License. 
+ */ + +#ifndef LOCKCLASS_H_ +#define LOCKCLASS_H_ + +#include + +class LockClass +{ +protected: + pthread_mutex_t* _m; +public: + inline LockClass(pthread_mutex_t* m) : _m(m) + { + ::pthread_mutex_lock(_m); + } + inline ~LockClass() + { + ::pthread_mutex_unlock(_m); + } +}; + + +#endif /*LOCKCLASS_H_*/ http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-native/src/main/c/Version.h ---------------------------------------------------------------------- diff --git a/activemq6-native/src/main/c/Version.h b/activemq6-native/src/main/c/Version.h new file mode 100644 index 0000000..7204802 --- /dev/null +++ b/activemq6-native/src/main/c/Version.h @@ -0,0 +1,8 @@ + +#ifndef _VERSION_NATIVE_AIO + +// This definition needs to match org.hornetq.core.asyncio.impl.AsynchronousFileImpl.EXPECTED_NATIVE_VERSION +// Or else the native module won't be loaded because of version mismatches +#define _VERSION_NATIVE_AIO 52 +#endif + http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-native/src/main/java/org/apache/activemq6/core/libaio/Native.java ---------------------------------------------------------------------- diff --git a/activemq6-native/src/main/java/org/apache/activemq6/core/libaio/Native.java b/activemq6-native/src/main/java/org/apache/activemq6/core/libaio/Native.java new file mode 100644 index 0000000..c7705d9 --- /dev/null +++ b/activemq6-native/src/main/java/org/apache/activemq6/core/libaio/Native.java @@ -0,0 +1,74 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.apache.activemq6.core.libaio;
+
+import java.nio.ByteBuffer;
+
+import org.apache.activemq6.api.core.HornetQException;
+
+/**
+ * Static entry points into the native libaio layer. Every method here is implemented in native code;
+ * the {@code ByteBuffer handle} / {@code handler} arguments are the opaque controller handle returned by
+ * {@link #init(Class, String, int, Object)}.
+ * <p>
+ * NOTE(review): the native functions shipped with this commit (JNI_AsynchronousFileImpl.cpp) are still
+ * exported as {@code Java_org_hornetq_core_libaio_Native_*}, which does not correspond to this class's
+ * package; confirm the native symbols are renamed too, otherwise these methods cannot be linked.
+ *
+ * @author Clebert Suconic
+ */
+
+public class Native
+{
+   // Functions used for locking files .....
+   /** Opens the named file and returns its native descriptor. */
+   public static native int openFile(String fileName);
+
+   /** Closes a descriptor obtained from {@link #openFile(String)}. */
+   public static native void closeFile(int handle);
+
+   /** Tries to lock the file behind the descriptor; the result tells whether the lock was obtained. */
+   public static native boolean flock(int handle);
+   // Functions used for locking files ^^^^^^^^
+
+   /** Clears the first {@code size} bytes of a direct buffer. */
+   public static native void resetBuffer(ByteBuffer directByteBuffer, int size);
+
+   /** Releases the native memory of a buffer created by {@link #newNativeBuffer(long)}. */
+   public static native void destroyBuffer(ByteBuffer buffer);
+
+   /**
+    * Allocates a buffer in native memory. It is not managed by the garbage collector and has to be
+    * released explicitly with {@link #destroyBuffer(ByteBuffer)}.
+    */
+   public static native ByteBuffer newNativeBuffer(long size);
+
+   /** NOTE(review): no matching native implementation is visible in the native sources of this commit - confirm it is still needed. */
+   public static native void newInit(Class someClass);
+
+   /**
+    * Creates the native controller for {@code fileName} and returns the handle expected by the other methods.
+    *
+    * @param controllerClass class providing the completion and error callback methods
+    * @param fileName file to operate on
+    * @param maxIO limit handed to the native controller
+    * @param logger object whose debug/warn/info/error methods the native layer looks up
+    */
+   public static native ByteBuffer init(Class controllerClass, String fileName, int maxIO, Object logger) throws HornetQException;
+
+   /** Returns the size reported by the native layer for the file behind {@code handle}. */
+   public static native long size0(ByteBuffer handle);
+
+   /**
+    * Submits a write of {@code size} bytes from {@code buffer} at {@code position};
+    * {@code aioPackageCallback} is the callback object handed to the native layer for this operation.
+    */
+   public static native void write(Object thisObject, ByteBuffer handle,
+                                   long sequence,
+                                   long position,
+                                   long size,
+                                   ByteBuffer buffer,
+                                   Object aioPackageCallback) throws HornetQException;
+
+   /** a direct write to the file without the use of libaio's submit. */
+   public static native void writeInternal(ByteBuffer handle, long positionToWrite, long size, ByteBuffer bytes) throws HornetQException;
+
+   /**
+    * Submits a read of {@code size} bytes at {@code position} into {@code buffer}.
+    * This is using org.apache.activemq6.core.asyncio.AIOCallback
+    */
+   public static native void read(Object thisObject, ByteBuffer handle, long position, long size, ByteBuffer buffer, Object aioPackageCallback) throws HornetQException;
+
+   /** Pre-allocates the file: {@code blocks} blocks of {@code size} bytes starting at {@code position}, using {@code fillChar}. */
+   public static native void fill(ByteBuffer handle, long position, int blocks, long size, byte fillChar) throws HornetQException;
+
+   /** Destroys the native controller; the handle must not be used afterwards. */
+   public static native void closeInternal(ByteBuffer handler);
+
+   /** Asks the native event poller to stop. */
+   public static native void stopPoller(ByteBuffer handler);
+
+   /** A native method that does nothing, and just validate if the ELF dependencies are loaded and on the correct platform as this binary format */
+   public static native int getNativeVersion();
+
+   /** Poll asynchronous events from internal queues */
+   public static native void internalPollEvents(ByteBuffer handler);
+
+   // Inner classes ---------------------------------------------------------------------
+
+}