ignite-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From kvipin <vipinkuma...@yahoo.com>
Subject Re: Couchbase as persistent store
Date Fri, 30 Sep 2016 11:43:54 GMT
Guys,

read-through is working fine for me but write-through is not working. I'm
not getting error/exception either. Following is my configuration file and
relevant code blocks,

*Sever node configuration file:*

$ cat test-tool-server.xml
<?xml version="1.0" encoding="UTF-8"?>

<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:util="http://www.springframework.org/schema/util"
       xsi:schemaLocation="
        http://www.springframework.org/schema/beans
        http://www.springframework.org/schema/beans/spring-beans.xsd
        http://www.springframework.org/schema/util
        http://www.springframework.org/schema/util/spring-util.xsd">

    <bean id="grid.cfg"
class="org.apache.ignite.configuration.IgniteConfiguration">
        
        <property name="peerClassLoadingEnabled" value="true"/>

<property name="binaryConfiguration">
<bean class="org.apache.ignite.configuration.BinaryConfiguration">
<property name="compactFooter" value="false" />
<property name="idMapper">
<bean class="org.apache.ignite.binary.BinaryBasicIdMapper">
<constructor-arg name="isLowerCase" value="true" />
</bean>
</property>
<property name="nameMapper">
<bean class="org.apache.ignite.binary.BinaryBasicNameMapper">
<constructor-arg name="isSimpleName" value="true" />
</bean>
</property>
</bean>
</property>

        <property name="cacheConfiguration">
            <list>
                <bean
class="org.apache.ignite.configuration.CacheConfiguration">
                
                <property name="readThrough" value="true"></property>
                <property name="writeThrough" value="true"></property>
                <property name="writeBehindEnabled" value="true"></property>
                <property name="storeKeepBinary" value="true"></property>
                
                    <property name="cacheStoreFactory">
                      <bean
class="javax.cache.configuration.FactoryBuilder.ClassFactory">
			<constructor-arg value="TestTableStore"/>
                      </bean>
                    </property>

                    <property name="name" value="TestTable"/>
                    <property name="cacheMode" value="PARTITIONED"/>
                    <property name="atomicityMode" value="TRANSACTIONAL"/>
                    <property name="writeSynchronizationMode"
value="FULL_SYNC"/>

                    
                    <property name="queryEntities">
                        <list>
                            <bean
class="org.apache.ignite.cache.QueryEntity">
                                <property name="keyType"
value="java.lang.Long"/>
                                <property name="valueType"
value="TestTable"/>

                                <property name="fields">
                                    <map>
                                        <entry key="tid"
value="java.lang.Short"/>
                                        <entry key="idint"
value="java.lang.Int"/>
                                        <entry key="idbigint"
value="java.lang.Long"/>
                                        <entry key="idchar"
value="java.lang.String"/>
                                        <entry key="idbinary"
value="java.lang.byte[]"/>
                                        <entry key="idvarbinary"
value="java.lang.byte[]"/>
                                        <entry key="idvarchar"
value="java.lang.String"/>
                                        <entry key="idts"
value="java.lang.Timestamp"/>
                                    </map>
                                </property>
                                <property name="indexes">
				    <list>
					<bean class="org.apache.ignite.cache.QueryIndex">
					    <constructor-arg value="tid"/>
					    <constructor-arg value="SORTED"/>
					</bean>
					
					<bean class="org.apache.ignite.cache.QueryIndex">
					    <constructor-arg>
						<list>
						    <value>tid</value>
						    <value>idint</value>
						</list>
					    </constructor-arg>
					    <constructor-arg value="SORTED"/>
					</bean>
				    </list>
                                </property>
                            </bean>
                        </list>
                    </property>
                </bean>

                <bean
class="org.apache.ignite.configuration.CacheConfiguration">
                <property name="readThrough" value="true"></property>
                <property name="writeThrough" value="true"></property>
                <property name="writeBehindEnabled" value="true"></property>
                <property name="storeKeepBinary" value="true"></property>
                    <property name="cacheStoreFactory">
                      <bean
class="javax.cache.configuration.FactoryBuilder.ClassFactory">
			<constructor-arg value="TestTableStore"/>
                      </bean>
                    </property>

                    <property name="name" value="TestTable1"/>
                    <property name="cacheMode" value="PARTITIONED"/>
                    <property name="atomicityMode" value="TRANSACTIONAL"/>
                    <property name="writeSynchronizationMode"
value="FULL_SYNC"/>

                    
                    <property name="queryEntities">
                        <list>
                            <bean
class="org.apache.ignite.cache.QueryEntity">
                                <property name="keyType"
value="java.lang.Long"/>
                                <property name="valueType"
value="TestTable"/>

                                <property name="fields">
                                    <map>
                                        <entry key="tid"
value="java.lang.Short"/>
                                        <entry key="idint"
value="java.lang.Int"/>
                                        <entry key="idbigint"
value="java.lang.Long"/>
                                        <entry key="idchar"
value="java.lang.String"/>
                                        <entry key="idbinary"
value="java.lang.byte[]"/>
                                        <entry key="idvarbinary"
value="java.lang.byte[]"/>
                                        <entry key="idvarchar"
value="java.lang.String"/>
                                        <entry key="idts"
value="java.lang.Timestamp"/>
                                    </map>
                                </property>
                                <property name="indexes">
                                    <list>
                                        <bean
class="org.apache.ignite.cache.QueryIndex">
                                            <constructor-arg value="tid"/>
					    <constructor-arg value="SORTED"/>
                                        </bean>
                                        
                                        <bean
class="org.apache.ignite.cache.QueryIndex">
                                            <constructor-arg>
                                                <list>
                                                    <value>tid</value>
                                                    <value>idint</value>
                                                </list>
                                            </constructor-arg>
                                            <constructor-arg
value="SORTED"/>
                                        </bean>
                                    </list>
                                </property>
                            </bean>
                        </list>
                    </property>
                </bean>
            </list>
        </property>

        
        <property name="discoverySpi">
            <bean
class="org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi">
                <property name="ipFinder">
                    
                    
                    
                    <bean
class="org.apache.ignite.spi.discovery.tcp.ipfinder.multicast.TcpDiscoveryMulticastIpFinder">
                        <property name="addresses">
                            <list>
                                
                                <value>127.0.0.1:47550..47551</value>
                            </list>
                        </property>
                    </bean>
                </property>
            </bean>
        </property>
    </bean>
</beans>

*C++ test program which talks to Apache Ignite*

$ cat main.cc
...
void* Insert_i(void* arg, const std::string& cname) {
	std::clog << __func__ << " called." << std::endl;
	Cache<int64_t, TestTable> ttcache = Ignition::Get().GetCache<
				int64_t, TestTable>(cname.c_str());
	// Clear cache before running the example.
	//ttcache.Clear();

	GenTestTableRec recgen;
	std::map<int64_t, TestTable> ttrecords;
	//TestTable rec = recgen();
	std::string tmpstr("Getting Started contains a list of tasks you might want
to perform when you set up your computer. Tasks in Getting Started
include:Transferring files from another computer. Adding new users to your
computer. Adding new users to your computer");
	std::vector<int>* targ = static_cast<std::vector&lt;int>* >(arg);
	int16_t tid = targ->operator[](1);
	TestTable rec(tid, tid, idbigint, std::string("Name_") +
std::to_string(tid), tmpstr + "_" + std::to_string(tid), time(0));
	sprintf((char*)rec.idbinary, "binary data for testing %d", tid);
	sprintf((char*)rec.idvarbinary, "%s_%d", tmpstr.c_str(), tid);
	rec.idbinlen = strlen((char*)rec.idbinary);
	rec.idvarbinlen = strlen((char*)rec.idvarbinary);
	int nentries = targ->operator[](0);
	for(int i = 0; i < nentries; i++) {
		rec.idts = ignite::Timestamp(time(0)*1000);
		rec.idint = idint++;
		rec.idbigint = idbigint++;
		ttrecords[key++] = rec;
	}
	std::clog << "Inserting " << ttrecords.size() << " records to " <<
				cname << ":" << std::endl;
	//for(auto itr = ttrecords.begin(); itr != ttrecords.end(); ++itr)
	//	std::clog << itr->first << ": " << itr->second.ToString()
	//			<< std::endl;
 	struct timeval t0;
	gettimeofday(&t0, 0);
	ttcache.PutAll(ttrecords);
 	struct timeval t1;
	gettimeofday(&t1, 0);
	ofs << time(0) << "," << ttrecords.size() << "," << t1.tv_sec
* 1000000 +
t1.tv_usec
				- t0.tv_sec * 1000000 - t0.tv_usec << ",+,";
	std::clog << "last inserted record key: " << key - 1 << "." << std::endl;

	return (void*)0;
}


*CacheStoreAdapter implementation*

$ cat TestTableStore.java 
...
public class TestTableStore extends CacheStoreAdapter<Long, TestTable>
implements Serializable {

	/** Auto-injected store session. */
	@CacheStoreSessionResource
	private CacheStoreSession ses;
	private Cluster cluster;

	public TestTableStore create() {
		return new TestTableStore();
	}
	// Complete transaction or simply close connection if there is no
transaction.
	@Override public void sessionEnd(boolean commit) {
		Bucket conn = ses.attachment();
		if (conn != null && ses.isWithinTransaction()) {
			//if (commit)
			//	conn.commit();
			//else
			//	conn.rollback();
		}
	}

	// This mehtod is called whenever "get(...)" methods are called on
IgniteCache.
	@Override public TestTable load(Long key) {
		Bucket conn = connection();
		JsonDocument jd = conn.get(key.toString());
		if(jd != null) {
			JsonObject jo = jd.content();
			TestTable tt = new TestTable(((Integer)jo.get("tid")).shortValue(),
(Integer)jo.get("idint"), (Long)jo.get("idbigint"),
(String)jo.get("idchar"), (String)jo.get("idvarchar"), new
Timestamp((Integer)jo.get("idts")));
			tt.setIdbinary(SerializationUtils.serialize((String)jo.get("idbinary")));
		
tt.setIdvarbinary(SerializationUtils.serialize((String)jo.get("idvarbinary")));
			return tt;
		}
		return null;
	}

	// This mehtod is called whenever "put(...)" methods are called on
IgniteCache.
	@Override public void write(Cache.Entry<? extends Long, ? extends
TestTable> entry) throws CacheWriterException {
		Bucket conn = connection();
		TestTable val = entry.getValue();
		conn.insert(JsonDocument.create(entry.getKey().toString(),
JsonObject.create().put("tid", val.getTid()).put("idint",
val.getIdint()).put("idbigint", val.getIdbigint()).put("idchar",
val.getIdchar()).put("idbinary", val.getIdbinary()).put("idvarbinary",
val.getIdvarbinary()).put("idvarchar", val.getIdvarchar()).put("idts",
val.getIdts())));
	}

	// This mehtod is called whenever "remove(...)" methods are called on
IgniteCache.
	@Override public void delete(Object key) {
		Bucket conn = connection();
		conn.remove((String)key);
	}

	// This mehtod is called whenever "loadCache()" and "localLoadCache()"
	// methods are called on IgniteCache. It is used for bulk-loading the
cache.
	// If you don't need to bulk-load the cache, skip this method.
	@Override public void loadCache(IgniteBiInClosure<Long, TestTable> clo,
Object... args) {
		if (args == null || args.length == 0 || args[0] == null)
			throw new CacheLoaderException("Expected entry count parameter is not
provided.");

		final int entryCnt = (Integer)args[0];

		Bucket conn = connection();
		/*try (PreparedStatement st = conn.prepareStatement(
			"select tid, idint, idbigint, idchar, idvarchar"
                        + ", idbinary, idvarbinary, idts from TestTable"
                        + " where tid = ?")) {
			try (ResultSet rs = st.executeQuery()) {
				int cnt = 0;

				while (cnt < entryCnt && rs.next()) {
					TestTable tt = new TestTable(rs.getShort(1), rs.getInt(2),
rs.getLong(3), rs.getString(4), rs.getString(5), rs.getTimestamp(8));
					tt.setIdbinary(rs.getObject(6));
					tt.setIdvarbinary(rs.getObject(7));

					//clo.apply(tt.getTid(), tt);
					clo.apply(tt.getIdbigint(), tt);

					cnt++;
				}
			}
		}*/
	}

	// Opens JDBC connection and attaches it to the ongoing
	// session if within a transaction.
	//private Bucket connection() throws SQLException {
	private Bucket connection() {
		if (ses.isWithinTransaction()) {
			Bucket conn = ses.attachment();

			if (conn == null) {
				conn = openConnection(false);

				// Store connection in the session, so it can be accessed
				// for other operations within the same transaction.
				ses.attach(conn);
			}

			return conn;
		}
		// Transaction can be null in case of simple load or put operation.
		else
			return openConnection(true);
	}

	// Opens JDBC connection.
	//private Bucket openConnection(boolean autocommit) throws SQLException {
	private Bucket openConnection(boolean autocommit) {
		// Open connection to your RDBMS systems (Oracle, MySQL, Postgres, DB2,
Microsoft SQL, etc.)
		// In this example we use H2 Database for simplification.
		//Bucket conn =
DriverManager.getConnection("jdbc:h2:mem:example;DB_CLOSE_DELAY=-1");
		if (this.cluster == null)
			this.cluster = CouchbaseCluster.create();
		Bucket conn = cluster.openBucket("TestTable");

		//conn.setAutoCommit(autocommit);

		return conn;
	}

	// This mehtod is called whenever "getAll(...)" methods are called on
IgniteCache.
	@Override public Map<Long, TestTable> loadAll(Iterable<? extends Long>
keys) throws CacheLoaderException {
		Bucket conn = connection();
		Map<Long, TestTable> loaded = new HashMap<Long, TestTable>();

		for (Long key : keys) {
			JsonDocument jd = conn.get(key.toString());
			if(jd != null) {
				JsonObject jo = jd.content();
				TestTable tt = new TestTable(((Integer)jo.get("tid")).shortValue(),
(Integer)jo.get("idint"), (Long)jo.get("idbigint"),
(String)jo.get("idchar"), (String)jo.get("idvarchar"), new
Timestamp((Integer)jo.get("idts")));
			
tt.setIdbinary(SerializationUtils.serialize((String)jo.get("idbinary")));
			
tt.setIdvarbinary(SerializationUtils.serialize((String)jo.get("idvarbinary")));
				loaded.put(key, tt);
			}
		}
		return loaded;
	}

	// This mehtod is called whenever "putAll(...)" methods are called on
IgniteCache.
	@Override public void writeAll(Collection<Cache.Entry&lt;? extends Long, ?
extends TestTable>> entries) throws CacheWriterException {
		Bucket conn = connection();
		// Syntax of MERGE statement is database specific and should be adopted
for your database.
		// If your database does not support MERGE statement then use sequentially
update, insert statements.
		for (Cache.Entry<? extends Long, ? extends TestTable> entry : entries) {
			TestTable val = entry.getValue();
			conn.upsert(JsonDocument.create(entry.getKey().toString(),
JsonObject.create().put("tid", val.getTid()).put("idint",
val.getIdint()).put("idbigint", val.getIdbigint()).put("idchar",
val.getIdchar()).put("idbinary", val.getIdbinary()).put("idvarbinary",
val.getIdvarbinary()).put("idvarchar", val.getIdvarchar()).put("idts",
val.getIdts())));
			System.out.println(conn.get(entry.getKey().toString()));
		}
	}
 
	// This mehtod is called whenever "removeAll(...)" methods are called on
IgniteCache.
	@Override public void deleteAll(Collection<?> keys) throws
CacheWriterException {
		Bucket conn = connection();
		for (Object key : keys)
			conn.remove((String)key);
	}

	@Override public String toString() {
		return "TestTableStore [ses=" + ses + ", cluster=" + cluster + ']';
	}
}


What can be the possible issue?

thanks & regards,



--
View this message in context: http://apache-ignite-users.70518.x6.nabble.com/Couchbase-as-persistent-store-tp7476p8033.html
Sent from the Apache Ignite Users mailing list archive at Nabble.com.

Mime
View raw message