Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 56B0E200C48 for ; Thu, 6 Apr 2017 15:31:13 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 55334160B83; Thu, 6 Apr 2017 13:31:13 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id A3F10160B84 for ; Thu, 6 Apr 2017 15:31:11 +0200 (CEST) Received: (qmail 94801 invoked by uid 500); 6 Apr 2017 13:31:10 -0000 Mailing-List: contact commits-help@ignite.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@ignite.apache.org Delivered-To: mailing list commits@ignite.apache.org Received: (qmail 94697 invoked by uid 99); 6 Apr 2017 13:31:09 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 06 Apr 2017 13:31:09 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id DE820E8F01; Thu, 6 Apr 2017 13:31:08 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: sboikov@apache.org To: commits@ignite.apache.org Date: Thu, 06 Apr 2017 13:31:11 -0000 Message-Id: <3fbff17a54e34859b480542fc265a1d5@git.apache.org> In-Reply-To: <358816612602444bb6656810c8375230@git.apache.org> References: <358816612602444bb6656810c8375230@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [04/50] [abbrv] ignite git commit: IGNITE-4564: All setters on public configuration now return "this" instance to allow convenient chaining. This closes #1449. archived-at: Thu, 06 Apr 2017 13:31:13 -0000 http://git-wip-us.apache.org/repos/asf/ignite/blob/cfc88028/modules/core/src/main/java/org/apache/ignite/spi/checkpoint/cache/CacheCheckpointSpi.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/checkpoint/cache/CacheCheckpointSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/checkpoint/cache/CacheCheckpointSpi.java index 619c468..35cb62a 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/checkpoint/cache/CacheCheckpointSpi.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/checkpoint/cache/CacheCheckpointSpi.java @@ -30,6 +30,7 @@ import org.apache.ignite.spi.IgniteSpiAdapter; import org.apache.ignite.spi.IgniteSpiConfiguration; import org.apache.ignite.spi.IgniteSpiContext; import org.apache.ignite.spi.IgniteSpiException; +import org.apache.ignite.spi.IgniteSpiMBeanAdapter; import org.apache.ignite.spi.IgniteSpiMultipleInstancesSupport; import org.apache.ignite.spi.checkpoint.CheckpointListener; import org.apache.ignite.spi.checkpoint.CheckpointSpi; @@ -101,7 +102,7 @@ import static org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_REMOVED; * @see org.apache.ignite.spi.checkpoint.CheckpointSpi */ @IgniteSpiMultipleInstancesSupport(true) -public class CacheCheckpointSpi extends IgniteSpiAdapter implements CheckpointSpi, CacheCheckpointSpiMBean { +public class CacheCheckpointSpi extends IgniteSpiAdapter implements CheckpointSpi { /** Default cache name (value is checkpoints). */ public static final String DFLT_CACHE_NAME = "checkpoints"; @@ -124,14 +125,21 @@ public class CacheCheckpointSpi extends IgniteSpiAdapter implements CheckpointSp * If cache name is not provided {@link #DFLT_CACHE_NAME} is used. * * @param cacheName Cache name. + * @return {@code this} for chaining. */ @IgniteSpiConfiguration(optional = true) - public void setCacheName(String cacheName) { + public CacheCheckpointSpi setCacheName(String cacheName) { this.cacheName = cacheName; + + return this; } - /** {@inheritDoc} */ - @Override public String getCacheName() { + /** + * Gets cache name to be used by this SPI.. + * + * @return Cache name to be used by this SPI. + */ + public String getCacheName() { return cacheName; } @@ -146,7 +154,7 @@ public class CacheCheckpointSpi extends IgniteSpiAdapter implements CheckpointSp if (log.isDebugEnabled()) log.debug(configInfo("cacheName", cacheName)); - registerMBean(igniteInstanceName, this, CacheCheckpointSpiMBean.class); + registerMBean(igniteInstanceName, new CacheCheckpointSpiMBeanImpl(this), CacheCheckpointSpiMBean.class); if (log.isDebugEnabled()) log.debug(startInfo()); @@ -247,7 +255,29 @@ public class CacheCheckpointSpi extends IgniteSpiAdapter implements CheckpointSp } /** {@inheritDoc} */ + @Override public CacheCheckpointSpi setName(String name) { + super.setName(name); + + return this; + } + + /** {@inheritDoc} */ @Override public String toString() { return S.toString(CacheCheckpointSpi.class, this); } + + /** + * MBean implementation for CacheCheckpointSpi. + */ + private class CacheCheckpointSpiMBeanImpl extends IgniteSpiMBeanAdapter implements CacheCheckpointSpiMBean { + /** {@inheritDoc} */ + CacheCheckpointSpiMBeanImpl(IgniteSpiAdapter spiAdapter) { + super(spiAdapter); + } + + /** {@inheritDoc} */ + @Override public String getCacheName() { + return CacheCheckpointSpi.this.getCacheName(); + } + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/cfc88028/modules/core/src/main/java/org/apache/ignite/spi/checkpoint/jdbc/JdbcCheckpointSpi.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/checkpoint/jdbc/JdbcCheckpointSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/checkpoint/jdbc/JdbcCheckpointSpi.java index a052704..744ce59 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/checkpoint/jdbc/JdbcCheckpointSpi.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/checkpoint/jdbc/JdbcCheckpointSpi.java @@ -32,6 +32,7 @@ import org.apache.ignite.resources.LoggerResource; import org.apache.ignite.spi.IgniteSpiAdapter; import org.apache.ignite.spi.IgniteSpiConfiguration; import org.apache.ignite.spi.IgniteSpiException; +import org.apache.ignite.spi.IgniteSpiMBeanAdapter; import org.apache.ignite.spi.IgniteSpiMultipleInstancesSupport; import org.apache.ignite.spi.checkpoint.CheckpointListener; import org.apache.ignite.spi.checkpoint.CheckpointSpi; @@ -111,7 +112,7 @@ import org.apache.ignite.spi.checkpoint.CheckpointSpi; */ @SuppressWarnings({"JDBCResourceOpenedButNotSafelyClosed", "JDBCExecuteWithNonConstantString"}) @IgniteSpiMultipleInstancesSupport(true) -public class JdbcCheckpointSpi extends IgniteSpiAdapter implements CheckpointSpi, JdbcCheckpointSpiMBean { +public class JdbcCheckpointSpi extends IgniteSpiAdapter implements CheckpointSpi { /** Default number of retries in case of errors (value is {@code 2}). */ public static final int DFLT_NUMBER_OF_RETRIES = 2; @@ -242,58 +243,102 @@ public class JdbcCheckpointSpi extends IgniteSpiAdapter implements CheckpointSpi /** Listener. */ private CheckpointListener lsnr; - /** {@inheritDoc} */ - @Override public int getNumberOfRetries() { + /** + * Gets number of retries in case of DB failure. + * + * @return Number of retries. + */ + public int getNumberOfRetries() { return retryNum; } - /** {@inheritDoc} */ - @Override public String getDataSourceInfo() { + /** + * Gets data source description. + * + * @return Description for data source. + */ + public String getDataSourceInfo() { return dataSrc.toString(); } - /** {@inheritDoc} */ - @Override public String getUser() { + /** + * Gets checkpoint jdbc user name. + * + * @return User name for checkpoint jdbc. + */ + public String getUser() { return user; } - /** {@inheritDoc} */ - @Override public String getPwd() { + /** + * Gets checkpoint jdbc password. + * + * @return Password for checkpoint jdbc. + */ + public String getPwd() { return pwd; } - /** {@inheritDoc} */ - @Override public String getCheckpointTableName() { + /** + * Gets checkpoint table name. + * + * @return Checkpoint table name. + */ + public String getCheckpointTableName() { return tblName; } - /** {@inheritDoc} */ - @Override public String getKeyFieldName() { + /** + * Gets key field name for checkpoint table. + * + * @return Key field name for checkpoint table. + */ + public String getKeyFieldName() { return keyName; } - /** {@inheritDoc} */ - @Override public String getKeyFieldType() { + /** + * Gets key field type for checkpoint table. + * + * @return Key field type for checkpoint table. + */ + public String getKeyFieldType() { return keyType; } - /** {@inheritDoc} */ - @Override public String getValueFieldName() { + /** + * Gets value field name for checkpoint table. + * + * @return Value field name for checkpoint table. + */ + public String getValueFieldName() { return valName; } - /** {@inheritDoc} */ - @Override public String getValueFieldType() { + /** + * Gets value field type for checkpoint table. + * + * @return Value field type for checkpoint table. + */ + public String getValueFieldType() { return valType; } - /** {@inheritDoc} */ - @Override public String getExpireDateFieldName() { + /** + * Gets expiration date field name for checkpoint table. + * + * @return Create date field name for checkpoint table. + */ + public String getExpireDateFieldName() { return expDateName; } - /** {@inheritDoc} */ - @Override public String getExpireDateFieldType() { + /** + * Gets expiration date field type for checkpoint table. + * + * @return Expiration date field type for checkpoint table. + */ + public String getExpireDateFieldType() { return expDateType; } @@ -306,10 +351,13 @@ public class JdbcCheckpointSpi extends IgniteSpiAdapter implements CheckpointSpi * this SPI from Spring configuration file. Refer to {@code Apache DBCP} project for more information. * * @param dataSrc DataSource object to set. + * @return {@code this} for chaining. */ @IgniteSpiConfiguration(optional = false) - public void setDataSource(DataSource dataSrc) { + public JdbcCheckpointSpi setDataSource(DataSource dataSrc) { this.dataSrc = dataSrc; + + return this; } /** @@ -317,10 +365,13 @@ public class JdbcCheckpointSpi extends IgniteSpiAdapter implements CheckpointSpi * the value is {@link #DFLT_NUMBER_OF_RETRIES}. * * @param retryNum Number of retries in case of any database errors. + * @return {@code this} for chaining. */ @IgniteSpiConfiguration(optional = true) - public void setNumberOfRetries(int retryNum) { + public JdbcCheckpointSpi setNumberOfRetries(int retryNum) { this.retryNum = retryNum; + + return this; } /** @@ -329,10 +380,13 @@ public class JdbcCheckpointSpi extends IgniteSpiAdapter implements CheckpointSpi * * @param user Checkpoint database user name to set. * @see #setPwd(String) + * @return {@code this} for chaining. */ @IgniteSpiConfiguration(optional = true) - public void setUser(String user) { + public JdbcCheckpointSpi setUser(String user) { this.user = user; + + return this; } /** @@ -341,20 +395,26 @@ public class JdbcCheckpointSpi extends IgniteSpiAdapter implements CheckpointSpi * * @param pwd Checkpoint database password to set. * @see #setUser(String) + ** @return {@code this} for chaining. */ @IgniteSpiConfiguration(optional = true) - public void setPwd(String pwd) { + public JdbcCheckpointSpi setPwd(String pwd) { this.pwd = pwd; + + return this; } /** * Sets checkpoint table name. By default {@link #DFLT_CHECKPOINT_TABLE_NAME} is used. * * @param tblName Checkpoint table name to set. + * @return {@code this} for chaining. */ @IgniteSpiConfiguration(optional = true) - public void setCheckpointTableName(String tblName) { + public JdbcCheckpointSpi setCheckpointTableName(String tblName) { this.tblName = tblName; + + return this; } /** @@ -363,10 +423,13 @@ public class JdbcCheckpointSpi extends IgniteSpiAdapter implements CheckpointSpi * change key field type (see {@link #setKeyFieldType(String)}). * * @param keyName Checkpoint key field name to set. + * @return {@code this} for chaining. */ @IgniteSpiConfiguration(optional = true) - public void setKeyFieldName(String keyName) { + public JdbcCheckpointSpi setKeyFieldName(String keyName) { this.keyName = keyName; + + return this; } /** @@ -375,10 +438,13 @@ public class JdbcCheckpointSpi extends IgniteSpiAdapter implements CheckpointSpi * By default {@link #DFLT_EXPIRE_DATE_FIELD_TYPE} is used. * * @param keyType Checkpoint key field type to set. + * @return {@code this} for chaining. */ @IgniteSpiConfiguration(optional = true) - public void setKeyFieldType(String keyType) { + public JdbcCheckpointSpi setKeyFieldType(String keyType) { this.keyType = keyType; + + return this; } /** @@ -387,10 +453,13 @@ public class JdbcCheckpointSpi extends IgniteSpiAdapter implements CheckpointSpi * (see {@link #setValueFieldType(String)}). * * @param valName Checkpoint value field name to set. + * @return {@code this} for chaining. */ @IgniteSpiConfiguration(optional = true) - public void setValueFieldName(String valName) { + public JdbcCheckpointSpi setValueFieldName(String valName) { this.valName = valName; + + return this; } /** @@ -400,10 +469,13 @@ public class JdbcCheckpointSpi extends IgniteSpiAdapter implements CheckpointSpi * then the type should be {@code longvarbinary}. * * @param valType Checkpoint value field type to set. + * @return {@code this} for chaining. */ @IgniteSpiConfiguration(optional = true) - public void setValueFieldType(String valType) { + public JdbcCheckpointSpi setValueFieldType(String valType) { this.valType = valType; + + return this; } /** @@ -413,10 +485,13 @@ public class JdbcCheckpointSpi extends IgniteSpiAdapter implements CheckpointSpi * (see {@link #setExpireDateFieldType(String)}). * * @param expDateName Checkpoint expiration date field name to set. + * @return {@code this} for chaining. */ @IgniteSpiConfiguration(optional = true) - public void setExpireDateFieldName(String expDateName) { + public JdbcCheckpointSpi setExpireDateFieldName(String expDateName) { this.expDateName = expDateName; + + return this; } /** @@ -425,10 +500,13 @@ public class JdbcCheckpointSpi extends IgniteSpiAdapter implements CheckpointSpi * corresponding SQL {@code DATETIME} type. * * @param expDateType Checkpoint expiration date field type to set. + * @return {@code this} for chaining. */ @IgniteSpiConfiguration(optional = true) - public void setExpireDateFieldType(String expDateType) { + public JdbcCheckpointSpi setExpireDateFieldType(String expDateType) { this.expDateType = expDateType; + + return this; } /** @@ -880,4 +958,77 @@ public class JdbcCheckpointSpi extends IgniteSpiAdapter implements CheckpointSpi @Override public void setCheckpointListener(CheckpointListener lsnr) { this.lsnr = lsnr; } + + /** {@inheritDoc} */ + @Override + public JdbcCheckpointSpi setName(String name) { + super.setName(name); + + return this; + } + + /** + * MBean implementation for JdbcCheckpointSpi. + */ + private class JdbcCheckpointSpiMBeanImpl extends IgniteSpiMBeanAdapter implements JdbcCheckpointSpiMBean { + /** {@inheritDoc} */ + JdbcCheckpointSpiMBeanImpl(IgniteSpiAdapter spiAdapter) { + super(spiAdapter); + } + + /** {@inheritDoc} */ + @Override public int getNumberOfRetries() { + return JdbcCheckpointSpi.this.getNumberOfRetries(); + } + + /** {@inheritDoc} */ + @Override public String getDataSourceInfo() { + return JdbcCheckpointSpi.this.getDataSourceInfo(); + } + + /** {@inheritDoc} */ + @Override public String getUser() { + return JdbcCheckpointSpi.this.getUser(); + } + + /** {@inheritDoc} */ + @Override public String getPwd() { + return JdbcCheckpointSpi.this.getPwd(); + } + + /** {@inheritDoc} */ + @Override public String getCheckpointTableName() { + return JdbcCheckpointSpi.this.getCheckpointTableName(); + } + + /** {@inheritDoc} */ + @Override public String getKeyFieldName() { + return JdbcCheckpointSpi.this.getKeyFieldName(); + } + + /** {@inheritDoc} */ + @Override public String getKeyFieldType() { + return JdbcCheckpointSpi.this.getKeyFieldType(); + } + + /** {@inheritDoc} */ + @Override public String getValueFieldName() { + return JdbcCheckpointSpi.this.getValueFieldName(); + } + + /** {@inheritDoc} */ + @Override public String getValueFieldType() { + return JdbcCheckpointSpi.this.getValueFieldType(); + } + + /** {@inheritDoc} */ + @Override public String getExpireDateFieldName() { + return JdbcCheckpointSpi.this.getExpireDateFieldName(); + } + + /** {@inheritDoc} */ + @Override public String getExpireDateFieldType() { + return JdbcCheckpointSpi.this.getExpireDateFieldType(); + } + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/cfc88028/modules/core/src/main/java/org/apache/ignite/spi/checkpoint/noop/NoopCheckpointSpi.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/checkpoint/noop/NoopCheckpointSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/checkpoint/noop/NoopCheckpointSpi.java index c3ac202..d0bf2d5 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/checkpoint/noop/NoopCheckpointSpi.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/checkpoint/noop/NoopCheckpointSpi.java @@ -71,6 +71,13 @@ public class NoopCheckpointSpi extends IgniteSpiAdapter implements CheckpointSpi } /** {@inheritDoc} */ + @Override public NoopCheckpointSpi setName(String name) { + super.setName(name); + + return this; + } + + /** {@inheritDoc} */ @Override public String toString() { return S.toString(NoopCheckpointSpi.class, this); } http://git-wip-us.apache.org/repos/asf/ignite/blob/cfc88028/modules/core/src/main/java/org/apache/ignite/spi/checkpoint/sharedfs/SharedFsCheckpointSpi.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/checkpoint/sharedfs/SharedFsCheckpointSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/checkpoint/sharedfs/SharedFsCheckpointSpi.java index 1917d38..29a7ec1 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/checkpoint/sharedfs/SharedFsCheckpointSpi.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/checkpoint/sharedfs/SharedFsCheckpointSpi.java @@ -42,6 +42,7 @@ import org.apache.ignite.spi.IgniteSpiAdapter; import org.apache.ignite.spi.IgniteSpiConfiguration; import org.apache.ignite.spi.IgniteSpiConsistencyChecked; import org.apache.ignite.spi.IgniteSpiException; +import org.apache.ignite.spi.IgniteSpiMBeanAdapter; import org.apache.ignite.spi.IgniteSpiMultipleInstancesSupport; import org.apache.ignite.spi.checkpoint.CheckpointListener; import org.apache.ignite.spi.checkpoint.CheckpointSpi; @@ -118,8 +119,7 @@ import org.jetbrains.annotations.Nullable; */ @IgniteSpiMultipleInstancesSupport(true) @IgniteSpiConsistencyChecked(optional = false) -public class SharedFsCheckpointSpi extends IgniteSpiAdapter implements CheckpointSpi, - SharedFsCheckpointSpiMBean { +public class SharedFsCheckpointSpi extends IgniteSpiAdapter implements CheckpointSpi { /** * Default checkpoint directory. Note that this path is relative to {@code IGNITE_HOME/work} folder * if {@code IGNITE_HOME} system or environment variable specified, otherwise it is relative to @@ -177,13 +177,21 @@ public class SharedFsCheckpointSpi extends IgniteSpiAdapter implements Checkpoin dirPaths.offer(DFLT_DIR_PATH); } - /** {@inheritDoc} */ - @Override public Collection getDirectoryPaths() { + /** + * Gets collection of all configured paths where checkpoints can be saved. + * + * @return Collection of all configured paths. + */ + public Collection getDirectoryPaths() { return dirPaths; } - /** {@inheritDoc} */ - @Override public String getCurrentDirectoryPath() { + /** + * Gets path to the directory where all checkpoints are saved. + * + * @return Path to the checkpoints directory. + */ + public String getCurrentDirectoryPath() { return curDirPath; } @@ -196,13 +204,16 @@ public class SharedFsCheckpointSpi extends IgniteSpiAdapter implements Checkpoin * * @param dirPaths Absolute or Ignite installation home folder relative path where checkpoints * will be stored. + * @return {@code this} for chaining. */ @IgniteSpiConfiguration(optional = true) - public void setDirectoryPaths(Collection dirPaths) { + public SharedFsCheckpointSpi setDirectoryPaths(Collection dirPaths) { A.ensure(!F.isEmpty(dirPaths), "!F.isEmpty(dirPaths)"); this.dirPaths.clear(); this.dirPaths.addAll(dirPaths); + + return this; } /** {@inheritDoc} */ @@ -227,7 +238,7 @@ public class SharedFsCheckpointSpi extends IgniteSpiAdapter implements Checkpoin if (!folder.isDirectory()) throw new IgniteSpiException("Checkpoint directory path is not a valid directory: " + curDirPath); - registerMBean(igniteInstanceName, this, SharedFsCheckpointSpiMBean.class); + registerMBean(igniteInstanceName, new SharedFsCheckpointSpiMBeanImpl(this), SharedFsCheckpointSpiMBean.class); // Ack parameters. if (log.isDebugEnabled()) { @@ -506,7 +517,34 @@ public class SharedFsCheckpointSpi extends IgniteSpiAdapter implements Checkpoin } /** {@inheritDoc} */ + @Override public SharedFsCheckpointSpi setName(String name) { + super.setName(name); + + return this; + } + + /** {@inheritDoc} */ @Override public String toString() { return S.toString(SharedFsCheckpointSpi.class, this); } + + /** + * MBean implementation for SharedFsCheckpointSpi. + */ + private class SharedFsCheckpointSpiMBeanImpl extends IgniteSpiMBeanAdapter implements SharedFsCheckpointSpiMBean { + /** {@inheritDoc} */ + SharedFsCheckpointSpiMBeanImpl(IgniteSpiAdapter spiAdapter) { + super(spiAdapter); + } + + /** {@inheritDoc} */ + @Override public Collection getDirectoryPaths() { + return SharedFsCheckpointSpi.this.getDirectoryPaths(); + } + + /** {@inheritDoc} */ + @Override public String getCurrentDirectoryPath() { + return SharedFsCheckpointSpi.this.getCurrentDirectoryPath(); + } + } } http://git-wip-us.apache.org/repos/asf/ignite/blob/cfc88028/modules/core/src/main/java/org/apache/ignite/spi/collision/fifoqueue/FifoQueueCollisionSpi.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/collision/fifoqueue/FifoQueueCollisionSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/collision/fifoqueue/FifoQueueCollisionSpi.java index d94b453..703e90e 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/collision/fifoqueue/FifoQueueCollisionSpi.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/collision/fifoqueue/FifoQueueCollisionSpi.java @@ -26,6 +26,7 @@ import org.apache.ignite.resources.LoggerResource; import org.apache.ignite.spi.IgniteSpiAdapter; import org.apache.ignite.spi.IgniteSpiConfiguration; import org.apache.ignite.spi.IgniteSpiException; +import org.apache.ignite.spi.IgniteSpiMBeanAdapter; import org.apache.ignite.spi.IgniteSpiMultipleInstancesSupport; import org.apache.ignite.spi.collision.CollisionContext; import org.apache.ignite.spi.collision.CollisionExternalListener; @@ -78,8 +79,7 @@ import org.apache.ignite.spi.collision.CollisionSpi; * */ @IgniteSpiMultipleInstancesSupport(true) -public class FifoQueueCollisionSpi extends IgniteSpiAdapter implements CollisionSpi, - FifoQueueCollisionSpiMBean { +public class FifoQueueCollisionSpi extends IgniteSpiAdapter implements CollisionSpi { /** * Default number of parallel jobs allowed (set to number of cores times 2). */ @@ -110,49 +110,88 @@ public class FifoQueueCollisionSpi extends IgniteSpiAdapter implements Collision /** Number of jobs that are held. */ private volatile int heldCnt; - /** {@inheritDoc} */ - @Override public int getParallelJobsNumber() { + /** + * See {@link #setParallelJobsNumber(int)} + * + * @return Number of jobs that can be executed in parallel. + */ + public int getParallelJobsNumber() { return parallelJobsNum; } - /** {@inheritDoc} */ + /** + * Sets number of jobs that can be executed in parallel. + * + * @param parallelJobsNum Parallel jobs number. + * @return {@code this} for chaining. + */ @IgniteSpiConfiguration(optional = true) - @Override public void setParallelJobsNumber(int parallelJobsNum) { + public FifoQueueCollisionSpi setParallelJobsNumber(int parallelJobsNum) { A.ensure(parallelJobsNum > 0, "parallelJobsNum > 0"); this.parallelJobsNum = parallelJobsNum; + + return this; } - /** {@inheritDoc} */ - @Override public int getWaitingJobsNumber() { + /** + * See {@link #setWaitingJobsNumber(int)} + * + * @return Maximum allowed number of waiting jobs. + */ + public int getWaitingJobsNumber() { return waitJobsNum; } - /** {@inheritDoc} */ + /** + * Sets maximum number of jobs that are allowed to wait in waiting queue. If number + * of waiting jobs ever exceeds this number, excessive jobs will be rejected. + * + * @param waitJobsNum Waiting jobs number. + * @return {@code this} for chaining. + */ @IgniteSpiConfiguration(optional = true) - @Override public void setWaitingJobsNumber(int waitJobsNum) { + public FifoQueueCollisionSpi setWaitingJobsNumber(int waitJobsNum) { A.ensure(waitJobsNum >= 0, "waitingJobsNum >= 0"); this.waitJobsNum = waitJobsNum; + + return this; } - /** {@inheritDoc} */ - @Override public int getCurrentWaitJobsNumber() { + /** + * Gets current number of jobs that wait for the execution. + * + * @return Number of jobs that wait for execution. + */ + public int getCurrentWaitJobsNumber() { return waitingCnt; } - /** {@inheritDoc} */ - @Override public int getCurrentActiveJobsNumber() { + /** + * Gets current number of jobs that are active, i.e. {@code 'running + held'} jobs. + * + * @return Number of active jobs. + */ + public int getCurrentActiveJobsNumber() { return runningCnt + heldCnt; } - /** {@inheritDoc} */ - @Override public int getCurrentRunningJobsNumber() { + /** + * Gets number of currently running (not {@code 'held}) jobs. + * + * @return Number of currently running (not {@code 'held}) jobs. + */ + public int getCurrentRunningJobsNumber() { return runningCnt; } - /** {@inheritDoc} */ - @Override public int getCurrentHeldJobsNumber() { + /** + * Gets number of currently {@code 'held'} jobs. + * + * @return Number of currently {@code 'held'} jobs. + */ + public int getCurrentHeldJobsNumber() { return heldCnt; } @@ -168,7 +207,7 @@ public class FifoQueueCollisionSpi extends IgniteSpiAdapter implements Collision if (log.isDebugEnabled()) log.debug(configInfo("parallelJobsNum", parallelJobsNum)); - registerMBean(igniteInstanceName, this, FifoQueueCollisionSpiMBean.class); + registerMBean(igniteInstanceName, new FifoQueueCollisionSpiMBeanImpl(this), FifoQueueCollisionSpiMBean.class); // Ack start. if (log.isDebugEnabled()) @@ -252,7 +291,64 @@ public class FifoQueueCollisionSpi extends IgniteSpiAdapter implements Collision } /** {@inheritDoc} */ + @Override public FifoQueueCollisionSpi setName(String name) { + super.setName(name); + + return this; + } + + /** {@inheritDoc} */ @Override public String toString() { return S.toString(FifoQueueCollisionSpi.class, this); } + + /** + * MBean implementation for FifoQueueCollisionSpi. + */ + private class FifoQueueCollisionSpiMBeanImpl extends IgniteSpiMBeanAdapter implements FifoQueueCollisionSpiMBean { + /** {@inheritDoc} */ + FifoQueueCollisionSpiMBeanImpl(IgniteSpiAdapter spiAdapter) { + super(spiAdapter); + } + + /** {@inheritDoc} */ + @Override public int getParallelJobsNumber() { + return FifoQueueCollisionSpi.this.getParallelJobsNumber(); + } + + /** {@inheritDoc} */ + @Override public int getCurrentWaitJobsNumber() { + return FifoQueueCollisionSpi.this.getCurrentWaitJobsNumber(); + } + + /** {@inheritDoc} */ + @Override public int getCurrentActiveJobsNumber() { + return FifoQueueCollisionSpi.this.getCurrentActiveJobsNumber(); + } + + /** {@inheritDoc} */ + @Override public int getCurrentRunningJobsNumber() { + return FifoQueueCollisionSpi.this.getCurrentRunningJobsNumber(); + } + + /** {@inheritDoc} */ + @Override public int getCurrentHeldJobsNumber() { + return FifoQueueCollisionSpi.this.getCurrentHeldJobsNumber(); + } + + /** {@inheritDoc} */ + @Override public int getWaitingJobsNumber() { + return FifoQueueCollisionSpi.this.getWaitingJobsNumber(); + } + + /** {@inheritDoc} */ + @Override public void setWaitingJobsNumber(int waitJobsNum) { + FifoQueueCollisionSpi.this.setWaitingJobsNumber(waitJobsNum); + } + + /** {@inheritDoc} */ + @Override public void setParallelJobsNumber(int parallelJobsNum) { + FifoQueueCollisionSpi.this.setParallelJobsNumber(parallelJobsNum); + } + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/cfc88028/modules/core/src/main/java/org/apache/ignite/spi/collision/fifoqueue/FifoQueueCollisionSpiMBean.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/collision/fifoqueue/FifoQueueCollisionSpiMBean.java b/modules/core/src/main/java/org/apache/ignite/spi/collision/fifoqueue/FifoQueueCollisionSpiMBean.java index 40d47a2..59283e0 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/collision/fifoqueue/FifoQueueCollisionSpiMBean.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/collision/fifoqueue/FifoQueueCollisionSpiMBean.java @@ -75,7 +75,7 @@ public interface FifoQueueCollisionSpiMBean extends IgniteSpiManagementMBean { @MXBeanDescription("Number of active jobs.") public int getCurrentActiveJobsNumber(); - /* + /** * Gets number of currently running (not {@code 'held}) jobs. * * @return Number of currently running (not {@code 'held}) jobs. http://git-wip-us.apache.org/repos/asf/ignite/blob/cfc88028/modules/core/src/main/java/org/apache/ignite/spi/collision/jobstealing/JobStealingCollisionSpi.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/collision/jobstealing/JobStealingCollisionSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/collision/jobstealing/JobStealingCollisionSpi.java index 37db103..8a02225 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/collision/jobstealing/JobStealingCollisionSpi.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/collision/jobstealing/JobStealingCollisionSpi.java @@ -49,6 +49,7 @@ import org.apache.ignite.spi.IgniteSpiConfiguration; import org.apache.ignite.spi.IgniteSpiConsistencyChecked; import org.apache.ignite.spi.IgniteSpiContext; import org.apache.ignite.spi.IgniteSpiException; +import org.apache.ignite.spi.IgniteSpiMBeanAdapter; import org.apache.ignite.spi.IgniteSpiMultipleInstancesSupport; import org.apache.ignite.spi.collision.CollisionContext; import org.apache.ignite.spi.collision.CollisionExternalListener; @@ -185,8 +186,7 @@ import static org.apache.ignite.events.EventType.EVT_NODE_LEFT; @SuppressWarnings("SynchronizationOnLocalVariableOrMethodParameter") @IgniteSpiMultipleInstancesSupport(true) @IgniteSpiConsistencyChecked(optional = true) -public class JobStealingCollisionSpi extends IgniteSpiAdapter implements CollisionSpi, - JobStealingCollisionSpiMBean { +public class JobStealingCollisionSpi extends IgniteSpiAdapter implements CollisionSpi { /** Maximum number of attempts to steal job by another node (default is {@code 5}). */ public static final int DFLT_MAX_STEALING_ATTEMPTS = 5; @@ -305,66 +305,133 @@ public class JobStealingCollisionSpi extends IgniteSpiAdapter implements Collisi /** */ private Comparator cmp; - /** {@inheritDoc} */ + /** + * Sets number of jobs that can be executed in parallel. + * + * @param activeJobsThreshold Number of jobs that can be executed in parallel. + */ @IgniteSpiConfiguration(optional = true) - @Override public void setActiveJobsThreshold(int activeJobsThreshold) { + public JobStealingCollisionSpi setActiveJobsThreshold(int activeJobsThreshold) { A.ensure(activeJobsThreshold >= 0, "activeJobsThreshold >= 0"); this.activeJobsThreshold = activeJobsThreshold; + + return this; } - /** {@inheritDoc} */ - @Override public int getActiveJobsThreshold() { + /** + * See {@link #setActiveJobsThreshold(int)}. + * + * @return Number of jobs that can be executed in parallel. + */ + public int getActiveJobsThreshold() { return activeJobsThreshold; } - /** {@inheritDoc} */ + /** + * Sets job count threshold at which this node will + * start stealing jobs from other nodes. + * + * @param waitJobsThreshold Job count threshold. + * @return {@code this} for chaining. + */ @IgniteSpiConfiguration(optional = true) - @Override public void setWaitJobsThreshold(int waitJobsThreshold) { + public JobStealingCollisionSpi setWaitJobsThreshold(int waitJobsThreshold) { A.ensure(waitJobsThreshold >= 0, "waitJobsThreshold >= 0"); this.waitJobsThreshold = waitJobsThreshold; + + return this; } - /** {@inheritDoc} */ - @Override public int getWaitJobsThreshold() { + /** + * See {@link #setWaitJobsThreshold(int)}. + * + * @return Job count threshold. + */ + public int getWaitJobsThreshold() { return waitJobsThreshold; } - /** {@inheritDoc} */ + /** + * Message expire time configuration parameter. If no response is received + * from a busy node to a job stealing message, then implementation will + * assume that message never got there, or that remote node does not have + * this node included into topology of any of the jobs it has. + * + * @param msgExpireTime Message expire time. + * @return {@code this} for chaining. + */ @IgniteSpiConfiguration(optional = true) - @Override public void setMessageExpireTime(long msgExpireTime) { + public JobStealingCollisionSpi setMessageExpireTime(long msgExpireTime) { A.ensure(msgExpireTime > 0, "messageExpireTime > 0"); this.msgExpireTime = msgExpireTime; + + return this; } - /** {@inheritDoc} */ - @Override public long getMessageExpireTime() { + /** + * See {@link #setMessageExpireTime(long)}. + * + * @return Message expire time. + */ + public long getMessageExpireTime() { return msgExpireTime; } - /** {@inheritDoc} */ + /** + * Gets flag indicating whether this node should attempt to steal jobs + * from other nodes. If {@code false}, then this node will steal allow + * jobs to be stolen from it, but won't attempt to steal any jobs from + * other nodes. + *

+ * Default value is {@code true}. + * + * @param isStealingEnabled Flag indicating whether this node should attempt to steal jobs + * from other nodes. + * @return {@code this} for chaining. + */ @IgniteSpiConfiguration(optional = true) - @Override public void setStealingEnabled(boolean isStealingEnabled) { + public JobStealingCollisionSpi setStealingEnabled(boolean isStealingEnabled) { this.isStealingEnabled = isStealingEnabled; + + return this; } - /** {@inheritDoc} */ - @Override public boolean isStealingEnabled() { + /** + * See {@link #setStealingEnabled(boolean)}. + * + * @return Flag indicating whether this node should attempt to steal jobs + * from other nodes. + */ + public boolean isStealingEnabled() { return isStealingEnabled; } - /** {@inheritDoc} */ + /** + * Gets maximum number of attempts to steal job by another node. + * If not specified, {@link JobStealingCollisionSpi#DFLT_MAX_STEALING_ATTEMPTS} + * value will be used. + * + * @param maxStealingAttempts Maximum number of attempts to steal job by another node. + * @return {@code this} for chaining. + */ @IgniteSpiConfiguration(optional = true) - @Override public void setMaximumStealingAttempts(int maxStealingAttempts) { + public JobStealingCollisionSpi setMaximumStealingAttempts(int maxStealingAttempts) { A.ensure(maxStealingAttempts > 0, "maxStealingAttempts > 0"); this.maxStealingAttempts = maxStealingAttempts; + + return this; } - /** {@inheritDoc} */ - @Override public int getMaximumStealingAttempts() { + /** + * See {@link #setMaximumStealingAttempts(int)}. + * + * @return Maximum number of attempts to steal job by another node. + */ + public int getMaximumStealingAttempts() { return maxStealingAttempts; } @@ -374,47 +441,80 @@ public class JobStealingCollisionSpi extends IgniteSpiAdapter implements Collisi * {@link org.apache.ignite.configuration.IgniteConfiguration#getUserAttributes()} methods). * * @param stealAttrs Node attributes to enable job stealing for. + * @return {@code this} for chaining. */ @IgniteSpiConfiguration(optional = true) - public void setStealingAttributes(Map stealAttrs) { + public JobStealingCollisionSpi setStealingAttributes(Map stealAttrs) { this.stealAttrs = stealAttrs; + + return this; } - /** {@inheritDoc} */ - @Override public Map getStealingAttributes() { + /** + * {@link #setStealingAttributes(Map)}. + * + * @return Node attributes to enable job stealing for. + */ + public Map getStealingAttributes() { return stealAttrs; } - /** {@inheritDoc} */ - @Override public int getCurrentRunningJobsNumber() { + /** + * Gets number of currently running (not {@code 'held}) jobs. + * + * @return Number of currently running (not {@code 'held}) jobs. + */ + public int getCurrentRunningJobsNumber() { return runningNum; } - /** {@inheritDoc} */ - @Override public int getCurrentHeldJobsNumber() { + /** + * Gets number of currently {@code 'held'} jobs. + * + * @return Number of currently {@code 'held'} jobs. + */ + public int getCurrentHeldJobsNumber() { return heldNum; } - /** {@inheritDoc} */ - @Override public int getCurrentWaitJobsNumber() { + /** + * Gets current number of jobs that wait for the execution. + * + * @return Number of jobs that wait for execution. + */ + public int getCurrentWaitJobsNumber() { return waitingNum; } - /** {@inheritDoc} */ - @Override public int getCurrentActiveJobsNumber() { + /** + * Gets current number of jobs that are being executed. + * + * @return Number of active jobs. + */ + public int getCurrentActiveJobsNumber() { return runningNum + heldNum; } - /** {@inheritDoc} */ - @Override public int getTotalStolenJobsNumber() { + /** + * Gets total number of stolen jobs. + * + * @return Number of stolen jobs. + */ + public int getTotalStolenJobsNumber() { return totalStolenJobsNum.get(); } - /** {@inheritDoc} */ - @Override public int getCurrentJobsToStealNumber() { + /** + * Gets current number of jobs to be stolen. This is outstanding + * requests number. + * + * @return Number of jobs to be stolen. + */ + public int getCurrentJobsToStealNumber() { return stealReqs.get(); } + /** {@inheritDoc} */ @Override public Map getNodeAttributes() throws IgniteSpiException { HashMap res = new HashMap<>(4); @@ -445,7 +545,8 @@ public class JobStealingCollisionSpi extends IgniteSpiAdapter implements Collisi log.debug(configInfo("maxStealingAttempts", maxStealingAttempts)); } - registerMBean(igniteInstanceName, this, JobStealingCollisionSpiMBean.class); + registerMBean(igniteInstanceName, new JobStealingCollisionSpiMBeanImpl(this), + JobStealingCollisionSpiMBean.class); // Ack start. if (log.isDebugEnabled()) @@ -698,7 +799,7 @@ public class JobStealingCollisionSpi extends IgniteSpiAdapter implements Collisi // requested to be stolen. Note, that we use lose total steal request // counter to prevent excessive iteration over nodes under load. for (Iterator> iter = rcvMsgMap.entrySet().iterator(); - iter.hasNext() && stealReqs.get() > 0;) { + iter.hasNext() && stealReqs.get() > 0;) { Entry entry = iter.next(); UUID nodeId = entry.getKey(); @@ -998,6 +1099,13 @@ public class JobStealingCollisionSpi extends IgniteSpiAdapter implements Collisi } /** {@inheritDoc} */ + @Override public JobStealingCollisionSpi setName(String name) { + super.setName(name); + + return this; + } + + /** {@inheritDoc} */ @Override public String toString() { return S.toString(JobStealingCollisionSpi.class, this); } @@ -1047,4 +1155,99 @@ public class JobStealingCollisionSpi extends IgniteSpiAdapter implements Collisi } } + /** + * MBean implementation for JobStealingCollisionSpi. + */ + private class JobStealingCollisionSpiMBeanImpl extends IgniteSpiMBeanAdapter + implements JobStealingCollisionSpiMBean { + /** {@inheritDoc} */ + JobStealingCollisionSpiMBeanImpl(IgniteSpiAdapter spiAdapter) { + super(spiAdapter); + } + + /** {@inheritDoc} */ + @Override public Map getStealingAttributes() { + return JobStealingCollisionSpi.this.getStealingAttributes(); + } + + /** {@inheritDoc} */ + @Override public int getCurrentRunningJobsNumber() { + return JobStealingCollisionSpi.this.getCurrentRunningJobsNumber(); + } + + /** {@inheritDoc} */ + @Override public int getCurrentHeldJobsNumber() { + return JobStealingCollisionSpi.this.getCurrentHeldJobsNumber(); + } + + /** {@inheritDoc} */ + @Override public int getCurrentWaitJobsNumber() { + return JobStealingCollisionSpi.this.getCurrentWaitJobsNumber(); + } + + /** {@inheritDoc} */ + @Override public int getCurrentActiveJobsNumber() { + return JobStealingCollisionSpi.this.getCurrentActiveJobsNumber(); + } + + /** {@inheritDoc} */ + @Override public int getTotalStolenJobsNumber() { + return JobStealingCollisionSpi.this.getTotalStolenJobsNumber(); + } + + /** {@inheritDoc} */ + @Override public int getCurrentJobsToStealNumber() { + return JobStealingCollisionSpi.this.getCurrentJobsToStealNumber(); + } + + /** {@inheritDoc} */ + @Override public void setActiveJobsThreshold(int activeJobsThreshold) { + JobStealingCollisionSpi.this.setActiveJobsThreshold(activeJobsThreshold); + } + + /** {@inheritDoc} */ + @Override public int getActiveJobsThreshold() { + return JobStealingCollisionSpi.this.getActiveJobsThreshold(); + } + + /** {@inheritDoc} */ + @Override public void setWaitJobsThreshold(int waitJobsThreshold) { + JobStealingCollisionSpi.this.setWaitJobsThreshold(waitJobsThreshold); + } + + /** {@inheritDoc} */ + @Override public int getWaitJobsThreshold() { + return JobStealingCollisionSpi.this.getWaitJobsThreshold(); + } + + /** {@inheritDoc} */ + @Override public void setMessageExpireTime(long msgExpireTime) { + JobStealingCollisionSpi.this.setMessageExpireTime(msgExpireTime); + } + + /** {@inheritDoc} */ + @Override public long getMessageExpireTime() { + return JobStealingCollisionSpi.this.getMessageExpireTime(); + } + + /** {@inheritDoc} */ + @Override public void setStealingEnabled(boolean isStealingEnabled) { + JobStealingCollisionSpi.this.setStealingEnabled(isStealingEnabled); + } + + /** {@inheritDoc} */ + @Override public boolean isStealingEnabled() { + return JobStealingCollisionSpi.this.isStealingEnabled(); + } + + /** {@inheritDoc} */ + @Override public void setMaximumStealingAttempts(int maxStealingAttempts) { + JobStealingCollisionSpi.this.setMaximumStealingAttempts(maxStealingAttempts); + } + + /** {@inheritDoc} */ + @Override public int getMaximumStealingAttempts() { + return JobStealingCollisionSpi.this.getMaximumStealingAttempts(); + } + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/cfc88028/modules/core/src/main/java/org/apache/ignite/spi/collision/jobstealing/JobStealingCollisionSpiMBean.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/collision/jobstealing/JobStealingCollisionSpiMBean.java b/modules/core/src/main/java/org/apache/ignite/spi/collision/jobstealing/JobStealingCollisionSpiMBean.java index 9c49f70..8052936 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/collision/jobstealing/JobStealingCollisionSpiMBean.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/collision/jobstealing/JobStealingCollisionSpiMBean.java @@ -52,7 +52,7 @@ public interface JobStealingCollisionSpiMBean extends IgniteSpiManagementMBean { @MXBeanDescription("Number of active jobs.") public int getCurrentActiveJobsNumber(); - /* + /** * Gets number of currently running (not {@code 'held}) jobs. * * @return Number of currently running (not {@code 'held}) jobs. @@ -87,10 +87,10 @@ public interface JobStealingCollisionSpiMBean extends IgniteSpiManagementMBean { /** * Sets number of jobs that can be executed in parallel. * - * @param activeJobsTreshold Number of jobs that can be executed in parallel. + * @param activeJobsThreshold Number of jobs that can be executed in parallel. */ @MXBeanDescription("Number of jobs that can be executed in parallel.") - public void setActiveJobsThreshold(int activeJobsTreshold); + public void setActiveJobsThreshold(int activeJobsThreshold); /** * Gets job count threshold at which this node will http://git-wip-us.apache.org/repos/asf/ignite/blob/cfc88028/modules/core/src/main/java/org/apache/ignite/spi/collision/noop/NoopCollisionSpi.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/collision/noop/NoopCollisionSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/collision/noop/NoopCollisionSpi.java index 8b75220..67a47a4 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/collision/noop/NoopCollisionSpi.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/collision/noop/NoopCollisionSpi.java @@ -59,6 +59,13 @@ public class NoopCollisionSpi extends IgniteSpiAdapter implements CollisionSpi { } /** {@inheritDoc} */ + @Override public NoopCollisionSpi setName(String name) { + super.setName(name); + + return this; + } + + /** {@inheritDoc} */ @Override public String toString() { return S.toString(NoopCollisionSpi.class, this); } http://git-wip-us.apache.org/repos/asf/ignite/blob/cfc88028/modules/core/src/main/java/org/apache/ignite/spi/collision/priorityqueue/PriorityQueueCollisionSpi.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/collision/priorityqueue/PriorityQueueCollisionSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/collision/priorityqueue/PriorityQueueCollisionSpi.java index 9a6eb0e..47e81dc 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/collision/priorityqueue/PriorityQueueCollisionSpi.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/collision/priorityqueue/PriorityQueueCollisionSpi.java @@ -37,6 +37,7 @@ import org.apache.ignite.spi.IgniteSpiAdapter; import org.apache.ignite.spi.IgniteSpiConfiguration; import org.apache.ignite.spi.IgniteSpiConsistencyChecked; import org.apache.ignite.spi.IgniteSpiException; +import org.apache.ignite.spi.IgniteSpiMBeanAdapter; import org.apache.ignite.spi.IgniteSpiMultipleInstancesSupport; import org.apache.ignite.spi.collision.CollisionContext; import org.apache.ignite.spi.collision.CollisionExternalListener; @@ -174,8 +175,7 @@ import org.apache.ignite.spi.collision.CollisionSpi; */ @IgniteSpiMultipleInstancesSupport(true) @IgniteSpiConsistencyChecked(optional = true) -public class PriorityQueueCollisionSpi extends IgniteSpiAdapter implements CollisionSpi, - PriorityQueueCollisionSpiMBean { +public class PriorityQueueCollisionSpi extends IgniteSpiAdapter implements CollisionSpi { /** * Default number of parallel jobs allowed (set to number of cores times 2). */ @@ -245,49 +245,89 @@ public class PriorityQueueCollisionSpi extends IgniteSpiAdapter implements Colli @LoggerResource private IgniteLogger log; - /** {@inheritDoc} */ - @Override public int getParallelJobsNumber() { + /** + * Gets number of jobs that can be executed in parallel. + * + * @return Number of jobs that can be executed in parallel. + */ + public int getParallelJobsNumber() { return parallelJobsNum; } - /** {@inheritDoc} */ + /** + * Sets number of jobs that can be executed in parallel. + * + * @param parallelJobsNum Parallel jobs number. + * @return {@code this} for chaining. + */ @IgniteSpiConfiguration(optional = true) - @Override public void setParallelJobsNumber(int parallelJobsNum) { + public PriorityQueueCollisionSpi setParallelJobsNumber(int parallelJobsNum) { A.ensure(parallelJobsNum > 0, "parallelJobsNum > 0"); this.parallelJobsNum = parallelJobsNum; + + return this; } - /** {@inheritDoc} */ - @Override public int getWaitingJobsNumber() { + /** + * Maximum number of jobs that are allowed to wait in waiting queue. If number + * of waiting jobs ever exceeds this number, excessive jobs will be rejected. + * + * @return Maximum allowed number of waiting jobs. + */ + public int getWaitingJobsNumber() { return waitJobsNum; } - /** {@inheritDoc} */ + /** + * Maximum number of jobs that are allowed to wait in waiting queue. If number + * of waiting jobs ever exceeds this number, excessive jobs will be rejected. + * + * @param waitJobsNum Maximium jobs number. + * @return {@code this} for chaining. + */ @IgniteSpiConfiguration(optional = true) - @Override public void setWaitingJobsNumber(int waitJobsNum) { + public PriorityQueueCollisionSpi setWaitingJobsNumber(int waitJobsNum) { A.ensure(waitJobsNum >= 0, "waitJobsNum >= 0"); this.waitJobsNum = waitJobsNum; + + return this; } - /** {@inheritDoc} */ - @Override public int getCurrentWaitJobsNumber() { + /** + * Gets current number of jobs that wait for the execution. + * + * @return Number of jobs that wait for execution. + */ + public int getCurrentWaitJobsNumber() { return waitingCnt; } - /** {@inheritDoc} */ - @Override public int getCurrentActiveJobsNumber() { + /** + * Gets current number of jobs that are active, i.e. {@code 'running + held'} jobs. + * + * @return Number of active jobs. + */ + public int getCurrentActiveJobsNumber() { return runningCnt + heldCnt; } - /** {@inheritDoc} */ - @Override public int getCurrentRunningJobsNumber() { + /* + * Gets number of currently running (not {@code 'held}) jobs. + * + * @return Number of currently running (not {@code 'held}) jobs. + */ + public int getCurrentRunningJobsNumber() { return runningCnt; } - /** {@inheritDoc} */ - @Override public int getCurrentHeldJobsNumber() { + /** + * Gets number of currently {@code 'held'} jobs. + * + * @return Number of currently {@code 'held'} jobs. + */ + public int getCurrentHeldJobsNumber() { return heldCnt; } @@ -298,10 +338,13 @@ public class PriorityQueueCollisionSpi extends IgniteSpiAdapter implements Colli * If not provided, default value is {@code {@link #DFLT_PRIORITY_ATTRIBUTE_KEY}}. * * @param taskPriAttrKey Priority session attribute key. + * @return {@code this} for chaining. */ @IgniteSpiConfiguration(optional = true) - public void setPriorityAttributeKey(String taskPriAttrKey) { + public PriorityQueueCollisionSpi setPriorityAttributeKey(String taskPriAttrKey) { this.taskPriAttrKey = taskPriAttrKey; + + return this; } /** @@ -311,53 +354,102 @@ public class PriorityQueueCollisionSpi extends IgniteSpiAdapter implements Colli * If not provided, default value is {@code {@link #DFLT_JOB_PRIORITY_ATTRIBUTE_KEY}}. * * @param jobPriAttrKey Job priority attribute key. + * @return {@code this} for chaining. */ @IgniteSpiConfiguration(optional = true) - public void setJobPriorityAttributeKey(String jobPriAttrKey) { + public PriorityQueueCollisionSpi setJobPriorityAttributeKey(String jobPriAttrKey) { this.jobPriAttrKey = jobPriAttrKey; + + return this; } - /** {@inheritDoc} */ - @Override public String getPriorityAttributeKey() { + /** + * Gets key name of task priority attribute. + * + * @return Key name of task priority attribute. + */ + public String getPriorityAttributeKey() { return taskPriAttrKey; } - /** {@inheritDoc} */ - @Override public String getJobPriorityAttributeKey() { + /** + * Gets key name of job priority attribute. + * + * @return Key name of job priority attribute. + */ + public String getJobPriorityAttributeKey() { return jobPriAttrKey; } - /** {@inheritDoc} */ - @Override public int getDefaultPriority() { + /** + * Gets default priority to use if a job does not have priority attribute + * set. + * + * @return Default priority to use if a task does not have priority + * attribute set. + */ + public int getDefaultPriority() { return dfltPri; } - /** {@inheritDoc} */ + /** + * Sets default priority to use if a job does not have priority attribute set. + * + * @param priority default priority. + * @return {@code this} for chaining. + */ @IgniteSpiConfiguration(optional = true) - @Override public void setDefaultPriority(int dfltPri) { - this.dfltPri = dfltPri; + public PriorityQueueCollisionSpi setDefaultPriority(int priority) { + this.dfltPri = priority; + + return this; } - /** {@inheritDoc} */ - @Override public int getStarvationIncrement() { + /** + * Gets value to increment job priority by every time a lower priority job gets + * behind a higher priority job. + * + * @return Value to increment job priority by every time a lower priority job gets + * behind a higher priority job. + */ + public int getStarvationIncrement() { return starvationInc; } - /** {@inheritDoc} */ + /** + * Sets value to increment job priority by every time a lower priority job gets + * behind a higher priority job. + * + * @param starvationInc Increment value. + * @return {@code this} for chaining. + */ @IgniteSpiConfiguration(optional = true) - @Override public void setStarvationIncrement(int starvationInc) { + public PriorityQueueCollisionSpi setStarvationIncrement(int starvationInc) { this.starvationInc = starvationInc; + + return this; } - /** {@inheritDoc} */ - @Override public boolean isStarvationPreventionEnabled() { + /** + * Gets flag indicating whether job starvation prevention is enabled. + * + * @return Flag indicating whether job starvation prevention is enabled. + */ + public boolean isStarvationPreventionEnabled() { return preventStarvation; } - /** {@inheritDoc} */ + /** + * Sets flag indicating whether job starvation prevention is enabled. + * + * @param preventStarvation Flag indicating whether job starvation prevention is enabled. + * @return {@code this} for chaining. + */ @IgniteSpiConfiguration(optional = true) - @Override public void setStarvationPreventionEnabled(boolean preventStarvation) { + public PriorityQueueCollisionSpi setStarvationPreventionEnabled(boolean preventStarvation) { this.preventStarvation = preventStarvation; + + return this; } /** {@inheritDoc} */ @@ -386,7 +478,8 @@ public class PriorityQueueCollisionSpi extends IgniteSpiAdapter implements Colli log.debug(configInfo("preventStarvation", preventStarvation)); } - registerMBean(igniteInstanceName, this, PriorityQueueCollisionSpiMBean.class); + registerMBean(igniteInstanceName, new PriorityQueueCollisionSpiMBeanImpl(this), + PriorityQueueCollisionSpiMBean.class); // Ack start. if (log.isDebugEnabled()) @@ -563,11 +656,6 @@ public class PriorityQueueCollisionSpi extends IgniteSpiAdapter implements Colli return Collections.singletonList(createSpiAttributeName(PRIORITY_ATTRIBUTE_KEY)); } - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(PriorityQueueCollisionSpi.class, this); - } - /** * Returns (possibly shared) comparator fo sorting GridCollisionJobContextWrapper * by priority. @@ -581,6 +669,18 @@ public class PriorityQueueCollisionSpi extends IgniteSpiAdapter implements Colli return priComp; } + /** {@inheritDoc} */ + @Override public PriorityQueueCollisionSpi setName(String name) { + super.setName(name); + + return this; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(PriorityQueueCollisionSpi.class, this); + } + /** * Comparator for by priority comparison of collision contexts. */ @@ -630,4 +730,96 @@ public class PriorityQueueCollisionSpi extends IgniteSpiAdapter implements Colli return originalIdx; } } + + /** + * MBean implementation for PriorityQueueCollisionSpi. + */ + private class PriorityQueueCollisionSpiMBeanImpl extends IgniteSpiMBeanAdapter + implements PriorityQueueCollisionSpiMBean { + /** {@inheritDoc} */ + PriorityQueueCollisionSpiMBeanImpl(IgniteSpiAdapter spiAdapter) { + super(spiAdapter); + } + + /** {@inheritDoc} */ + @Override public int getParallelJobsNumber() { + return PriorityQueueCollisionSpi.this.getParallelJobsNumber(); + } + + /** {@inheritDoc} */ + @IgniteSpiConfiguration(optional = true) + @Override public void setParallelJobsNumber(int parallelJobsNum) { + PriorityQueueCollisionSpi.this.setParallelJobsNumber(parallelJobsNum); + } + + /** {@inheritDoc} */ + @Override public int getWaitingJobsNumber() { + return PriorityQueueCollisionSpi.this.getWaitingJobsNumber(); + } + + /** {@inheritDoc} */ + @Override public void setWaitingJobsNumber(int waitJobsNum) { + PriorityQueueCollisionSpi.this.setWaitingJobsNumber(waitJobsNum); + } + + /** {@inheritDoc} */ + @Override public String getPriorityAttributeKey() { + return PriorityQueueCollisionSpi.this.getPriorityAttributeKey(); + } + + /** {@inheritDoc} */ + @Override public String getJobPriorityAttributeKey() { + return PriorityQueueCollisionSpi.this.getJobPriorityAttributeKey(); + } + + /** {@inheritDoc} */ + @Override public int getDefaultPriority() { + return PriorityQueueCollisionSpi.this.getDefaultPriority(); + } + + /** {@inheritDoc} */ + @Override public void setDefaultPriority(int dfltPri) { + PriorityQueueCollisionSpi.this.setDefaultPriority(dfltPri); + } + + /** {@inheritDoc} */ + @Override public int getStarvationIncrement() { + return PriorityQueueCollisionSpi.this.getStarvationIncrement(); + } + + /** {@inheritDoc} */ + @Override public void setStarvationIncrement(int starvationInc) { + PriorityQueueCollisionSpi.this.setStarvationIncrement(starvationInc); + } + + /** {@inheritDoc} */ + @Override public boolean isStarvationPreventionEnabled() { + return PriorityQueueCollisionSpi.this.isStarvationPreventionEnabled(); + } + + /** {@inheritDoc} */ + @Override public void setStarvationPreventionEnabled(boolean preventStarvation) { + PriorityQueueCollisionSpi.this.setStarvationPreventionEnabled(preventStarvation); + } + + /** {@inheritDoc} */ + @Override public int getCurrentWaitJobsNumber() { + return PriorityQueueCollisionSpi.this.getCurrentWaitJobsNumber(); + } + + /** {@inheritDoc} */ + @Override public int getCurrentActiveJobsNumber() { + return PriorityQueueCollisionSpi.this.getCurrentActiveJobsNumber(); + } + + /** {@inheritDoc} */ + @Override public int getCurrentRunningJobsNumber() { + return PriorityQueueCollisionSpi.this.getCurrentRunningJobsNumber(); + } + + /** {@inheritDoc} */ + @Override public int getCurrentHeldJobsNumber() { + return PriorityQueueCollisionSpi.this.getCurrentHeldJobsNumber(); + } + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/cfc88028/modules/core/src/main/java/org/apache/ignite/spi/collision/priorityqueue/PriorityQueueCollisionSpiMBean.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/collision/priorityqueue/PriorityQueueCollisionSpiMBean.java b/modules/core/src/main/java/org/apache/ignite/spi/collision/priorityqueue/PriorityQueueCollisionSpiMBean.java index b7f8ba1..e6a8412 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/collision/priorityqueue/PriorityQueueCollisionSpiMBean.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/collision/priorityqueue/PriorityQueueCollisionSpiMBean.java @@ -41,7 +41,7 @@ public interface PriorityQueueCollisionSpiMBean extends IgniteSpiManagementMBean @MXBeanDescription("Number of active jobs.") public int getCurrentActiveJobsNumber(); - /* + /** * Gets number of currently running (not {@code 'held}) jobs. * * @return Number of currently running (not {@code 'held}) jobs.