ignite-user mailing list archives

From diopek <deha.pe...@gmail.com>
Subject Re: Missing records Ignite cache size grows
Date Fri, 24 Feb 2017 18:31:45 GMT
package myignite.loading.test.cache.store;

import static myignite.loading.test.common.CommonUtils.CONFIG_DIR;
import static myignite.loading.test.common.CommonUtils.DATA_SRC_PWD;
import static myignite.loading.test.common.CommonUtils.DATA_SRC_URL;
import static myignite.loading.test.common.CommonUtils.DATA_SRC_USR;
import static myignite.loading.test.common.CommonUtils.RWA_SQL_FETCH_SIZE;
import static myignite.loading.test.common.CommonUtils.SQL_FETCH_SIZE_DEFAULT;
import static myignite.loading.test.common.CommonUtils.TREAS_LIQUIDITY_CLASS_UNDEFINED;
import static myignite.loading.test.common.CommonUtils.REPLENISH;
import static myignite.loading.test.common.CommonUtils.EXISTING;
import static myignite.loading.test.common.CommonUtils.stopWatchEnd;
import static myignite.loading.test.common.CommonUtils.stopWatchStart;
import org.jooq.lambda.tuple.Tuple2;

import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Date;
import java.util.Iterator;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicLong;

import javax.cache.integration.CacheLoaderException;

import org.apache.ignite.cache.store.CacheLoadOnlyStoreAdapter;
import org.apache.ignite.internal.util.typedef.T2;
import org.apache.ignite.lang.IgniteBiTuple;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.dao.DataAccessException;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.jdbc.core.ResultSetExtractor;
import org.springframework.jdbc.datasource.SingleConnectionDataSource;
import org.springframework.util.Assert;
import org.springframework.util.StopWatch;

import myignite.loading.test.domain.MyDTO;

public class ExistingOrReplenishCacheLoadOnlyStore3
		extends CacheLoadOnlyStoreAdapter<Long, ArrayList<MyDTO>, Tuple2<Long, ArrayList<MyDTO>>> {

	private static final Logger logger = LoggerFactory.getLogger(ExistingOrReplenishCacheLoadOnlyStore3.class);

	private static int SQL_FETCH_SIZE = SQL_FETCH_SIZE_DEFAULT;
	private static String dataSourceUrl;
	private static String dbUser;
	private static String dbPwd;

	private SingleConnectionDataSource DATA_SRC;

	static {
		String configDir = System.getProperty(CONFIG_DIR);
		Assert.notNull(configDir, "config.dir should be passed as a JVM argument...");
		StringBuffer filePath = new StringBuffer(configDir);
		filePath.append(File.separatorChar).append("rwa-batch.properties");
		Properties props = new Properties();
		try (InputStream inputStream = new FileInputStream(filePath.toString())) {
			props.load(inputStream);
		} catch (IOException e) {
			e.printStackTrace();
		}
		dataSourceUrl = props.getProperty(DATA_SRC_URL);
		System.out.println(">>>dataSourceUrl::" + dataSourceUrl);
		Assert.notNull(dataSourceUrl, "'rwa.jdbc.url' should be provided in rwa-batch.properties...");
		dbUser = props.getProperty(DATA_SRC_USR);
		Assert.notNull(dbUser, "'rwa.jdbc.usr' should be provided in rwa-batch.properties...");
		// Note: the password is read from a JVM system property, not from rwa-batch.properties.
		dbPwd = System.getProperty(DATA_SRC_PWD);
		Assert.notNull(dbPwd, "'rwa.jdbc.pwd' should be passed as a JVM system property...");

		String fetchSize = props.getProperty(RWA_SQL_FETCH_SIZE);
		if (fetchSize != null) {
			SQL_FETCH_SIZE = Integer.valueOf(fetchSize);
		}
	}

	private JdbcTemplate jdbcTemplate;
	
	public ExistingOrReplenishCacheLoadOnlyStore3() {
		super();
		// SingleConnectionDataSource reuses one JDBC connection; it is not designed for concurrent use.
		DATA_SRC = new SingleConnectionDataSource();
		DATA_SRC.setDriverClassName("oracle.jdbc.driver.OracleDriver");
		DATA_SRC.setUrl(dataSourceUrl);
		DATA_SRC.setUsername(dbUser);
		DATA_SRC.setPassword(dbPwd);
		jdbcTemplate = new JdbcTemplate(DATA_SRC);
	}

	@Override
	protected Iterator<Tuple2<Long, ArrayList<MyDTO>>> inputIterator(Object... args) throws CacheLoaderException {
		if (args == null || args.length < 6)
			throw new CacheLoaderException(
					"Expected asOf, datasetId, scenId, sql, replenishFlag and startSize parameters are not fully provided...");
		try {
			final Date asOf = (Date) args[0];
			final String datasetId = (String) args[1];
			final Integer scenId = (Integer) args[2];
			final String sql = (String) args[3];
			final Boolean replenishFlag = (Boolean) args[4];
			final Integer startSize = (Integer) args[5];
			logger.debug("AS_OF::{} DATASET_ID::{} SCEN_ID::{} REP_FLAG::{} START_SIZE::{}", asOf, datasetId, scenId,
					replenishFlag, startSize);

			logger.debug("load{}Cache::SQL::{}", (replenishFlag ? "Replenish" : "Existing"), sql);
			ArrayList<Tuple2<Long, ArrayList<MyDTO>>> extOrRepList = null;
			Iterator<Tuple2<Long, ArrayList<MyDTO>>> iterator = null;

			ResultSetExtractor<ArrayList<Tuple2<Long, ArrayList<MyDTO>>>> extOrRepMapResultSetExtractor =
					new ResultSetExtractor<ArrayList<Tuple2<Long, ArrayList<MyDTO>>>>() {
				@Override
				public ArrayList<Tuple2<Long, ArrayList<MyDTO>>> extractData(ResultSet rs)
						throws SQLException, DataAccessException {

					ArrayList<Tuple2<Long, ArrayList<MyDTO>>> extOrRepList =
							new ArrayList<Tuple2<Long, ArrayList<MyDTO>>>(startSize);
					String prevGoc = null, prevAcct = null, prevSac = null, prevCcy = null;
					Integer prevFpId = null;
					ArrayList<MyDTO> currDTOList = null, prevDTOList = null;
					MyDTO dto = null, prevDto = null;
					// Running counter used as the cache key for each grouped list.
					final AtomicLong entryCnt = new AtomicLong(0);
					while (rs.next()) {
						int i = 1;
						dto = new MyDTO();
						dto.setAsOf(asOf);
						dto.setDatasetId(Integer.valueOf(datasetId));
						dto.setScnId(scenId);

						dto.setGoc(rs.getString(i++));
						dto.setAcct(rs.getString(i++));
						dto.setSumAffilCode(rs.getString(i++));
						dto.setCcyCode(rs.getString(i++));
						dto.setFrcstProdId(rs.getInt(i++));
						dto.setMngSeg(rs.getString(i++));
						dto.setMngGeo(rs.getString(i++));
						dto.setFrsBu(rs.getString(i++));
						if (replenishFlag) {
							dto.setReplenishFlag(REPLENISH);
						} else {
							dto.setRwaExposureType(rs.getString(i++));
							dto.setRiskAssetClass(rs.getString(i++));
							dto.setRiskSubAssetClass(rs.getString(i++));
							String treasLiqClass = rs.getString(i++);
							dto.setTreasLiqClass(treasLiqClass == null ? TREAS_LIQUIDITY_CLASS_UNDEFINED : treasLiqClass);
							dto.setCounterpartyRating(rs.getString(i++));
							dto.setClearedStatus(rs.getString(i++));
							dto.setMaturityBand(rs.getString(i++));
							dto.setDerivativeType(rs.getString(i++));
							dto.setReplenishFlag(EXISTING);
						}
						dto.setStartDate(rs.getDate(i++));
						dto.setMaturityDate(rs.getDate(i++));
						dto.setAmount(rs.getDouble(i++));
						if (!replenishFlag) {
							dto.setEtlsource(rs.getString(i++));
						} else {
							dto.setInvestmentId(rs.getString(i++));
						}
						// Consecutive rows sharing the same (goc, acct, sac, ccy, frcstProdId)
						// key are grouped into one list; a key change flushes the previous group.
						if (dto.getGoc().equals(prevGoc) && dto.getAcct().equals(prevAcct)
								&& dto.getSumAffilCode().equals(prevSac) && dto.getCcyCode().equals(prevCcy)
								&& dto.getFrcstProdId().equals(prevFpId)) {
							prevDTOList.add(prevDto);
						} else {
							if (prevDto != null) {
								prevDTOList.add(prevDto);
								extOrRepList.add(new Tuple2<Long, ArrayList<MyDTO>>(entryCnt.incrementAndGet(), prevDTOList));
							}
							currDTOList = new ArrayList<MyDTO>();
						}
						prevDto = dto;
						prevDTOList = currDTOList;
						prevGoc = dto.getGoc();
						prevAcct = dto.getAcct();
						prevSac = dto.getSumAffilCode();
						prevCcy = dto.getCcyCode();
						prevFpId = dto.getFrcstProdId();
					}
					// Flush the last group after the result set is exhausted.
					if (prevDto != null) {
						prevDTOList.add(prevDto);
						extOrRepList.add(new Tuple2<Long, ArrayList<MyDTO>>(entryCnt.incrementAndGet(), prevDTOList));
					}
					return extOrRepList;
				}

			};

			jdbcTemplate.setFetchSize(SQL_FETCH_SIZE);
			StopWatch sw = new StopWatch();
			stopWatchStart(sw, "populatingDataMap");
			logger.debug("BEFORE populatingDataMap_STARTS!!!!");
//			extOrRepMap = jdbcTemplate.query(sql, extOrRepMapResultSetExtractor);			
			extOrRepList = jdbcTemplate.query(sql, extOrRepMapResultSetExtractor);
			logger.debug("BEFORE populatingDataMap_ENDS!!!!");
			stopWatchEnd(sw);

			if (extOrRepList != null) {
				/*
				 * THE RECORD COUNT IS PRINTED CORRECTLY HERE, BEFORE PASSING TO IGNITE
				 */
				logger.debug("+++++++ GOC_KEY COUNT::{}", extOrRepList.size());
				iterator = extOrRepList.iterator();
			}
			}
			return iterator;
		} finally {
			DATA_SRC.destroy();
		}
	}

	@Override
	protected IgniteBiTuple<Long, ArrayList<MyDTO>> parse(Tuple2<Long, ArrayList<MyDTO>> rec, Object... args) {
		// T2 is Ignite's concrete IgniteBiTuple implementation (an internal typedef).
		return new T2<>(rec.v1(), rec.v2());
	}

}
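
For reference, a store like this is driven through IgniteCache.loadCache(..): CacheLoadOnlyStoreAdapter forwards the loadCache varargs straight to inputIterator(..) and then drains the returned iterator with an internal thread pool (see setThreadsCount(..) / setBatchSize(..) on the adapter). Below is a minimal wiring sketch, not part of the posted code; the cache name "extOrRepCache" and all six argument values are hypothetical placeholders that simply mirror the args[] indices checked in inputIterator(..):

import java.util.ArrayList;
import java.util.Date;

import javax.cache.configuration.FactoryBuilder;

import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.Ignition;
import org.apache.ignite.configuration.CacheConfiguration;

import myignite.loading.test.cache.store.ExistingOrReplenishCacheLoadOnlyStore3;
import myignite.loading.test.domain.MyDTO;

public class LoadCacheSketch {

	public static void main(String[] args) {
		// Cache name and settings are illustrative only.
		CacheConfiguration<Long, ArrayList<MyDTO>> cacheCfg =
				new CacheConfiguration<>("extOrRepCache");
		cacheCfg.setCacheStoreFactory(
				FactoryBuilder.factoryOf(ExistingOrReplenishCacheLoadOnlyStore3.class));

		try (Ignite ignite = Ignition.start()) {
			IgniteCache<Long, ArrayList<MyDTO>> cache = ignite.getOrCreateCache(cacheCfg);
			// The six varargs must line up with the args[] indices read in inputIterator(..).
			cache.loadCache(null,              // no filter predicate
					new Date(),                // args[0] asOf
					"1",                       // args[1] datasetId (numeric string)
					Integer.valueOf(1),        // args[2] scenId
					"SELECT /* ... */",        // args[3] sql -- placeholder query
					Boolean.FALSE,             // args[4] replenishFlag ("existing" mode)
					Integer.valueOf(100000));  // args[5] startSize (initial list capacity)
		}
	}
}

Note that the GOC_KEY COUNT logged in inputIterator(..) is taken before the iterator is handed to Ignite, which then consumes it concurrently in batches.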




--
View this message in context: http://apache-ignite-users.70518.x6.nabble.com/Missing-records-Ignite-cache-size-grows-tp10809p10871.html
Sent from the Apache Ignite Users mailing list archive at Nabble.com.
