ignite-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From shrutika modi <shrutika.m...@innoeye.com>
Subject unable to create index on dataframe
Date Tue, 04 Sep 2018 10:25:03 GMT
package com.test;

import java.util.Arrays;
import java.util.List;
import java.util.Properties;

import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.Ignition;
import org.apache.ignite.cache.query.SqlFieldsQuery;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.spark.IgniteContext;
import org.apache.ignite.spark.IgniteDataFrameSettings;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
import
org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
import org.apache.spark.SparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.ignite.IgniteSparkSession;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.inn.sparkrunner.context.JobContext;
import com.inn.sparkrunner.context.JobContextImpl;
import com.inn.sparkrunner.createsparkcontext.CreateSparkContext;
import com.inn.sparkrunner.util.ConfigUtil;

public class JoinUsingIgnite {

	public static void main(String[] args) throws SecurityException, Exception
{
		Logger logger = LoggerFactory.getLogger(JoinUsingIgnite.class);

		ConfigUtil.setConfig();
		System.setProperty("IGNITE_HOME",
"/opt/kylo/testignite/apache-ignite-fabric-2.5.0-bin");
		
		JobContext jobContext = null;
		SparkSession sparkSession = null;
		sparkSession = CreateSparkContext.create(args);
		jobContext = new JobContextImpl(sparkSession);
		TcpDiscoverySpi spi = new TcpDiscoverySpi();
		TcpDiscoveryVmIpFinder ipFinder = new TcpDiscoveryVmIpFinder();
		ipFinder.setAddresses(java.util.Arrays.asList("sf3.start.com",
"sf2.start.com"));
		spi.setIpFinder(ipFinder);

		IgniteConfiguration cfg = new IgniteConfiguration();
		// cfg.setIgniteInstanceName("grid");
		cfg.setDiscoverySpi(spi);
		
		Ignite ignite = Ignition.start(cfg);
		IgniteContext igniteContext = new
IgniteContext(sparkSession.sparkContext(),
			
"/opt/kylo/testignite/apache-ignite-fabric-2.5.0-bin/config/default-config.xml");

		IgniteSparkSession igniteSparkSession = new
IgniteSparkSession(igniteContext, sparkSession);

		Properties connectionproperties = new Properties();

		connectionproperties.put("user", "FORESIGHT_PRODUCT");
		connectionproperties.put("password", "FORESIGHT_PRODUCT@#^249");
		connectionproperties.setProperty("Driver", "com.mysql.jdbc.Driver");

		logger.info("spark start------");

		Dataset<Row> jdbc = igniteSparkSession.read().option("inferschema",
"false").jdbc(
				"jdbc:mysql://192.168.4.249:3306/FORESIGHT", "(SELECT * FROM
NetworkElement) emp",
				connectionproperties);
		

		Dataset<Row> jdbc1 = igniteSparkSession.read().option("inferschema",
"false").jdbc(
				"jdbc:mysql://192.168.4.249:3306/FORESIGHT", "(SELECT * FROM
MacroSiteDetail) emp1",
				connectionproperties);

		
		CacheConfiguration setSqlSchema = new CacheConfiguration("cachecfg")
				.setSqlSchema("PUBLIC");

		IgniteCache cache = ignite.getOrCreateCache(setSqlSchema);
	
	
		CacheConfiguration setSqlSchema2 = new CacheConfiguration("table2")
				.setSqlSchema("PUBLIC");
		IgniteCache cache2 = ignite.getOrCreateCache(setSqlSchema2);
			
		creatingTable(jdbc, cache, "ignitetab1", "networkelementid_pk");
		SqlFieldsQuery creatingInsertingCommand = creatingInsertingCommand(jdbc,
cache, "ignitetab1");
		InsertingData(jdbc, cache, creatingInsertingCommand);
		
		creatingTable(jdbc1, cache2, "ignitetab2", "macrositedetailid_pk");
		SqlFieldsQuery creatingInsertingCommand1 = creatingInsertingCommand(jdbc1,
cache2, "ignitetab2");
		InsertingData(jdbc1, cache2, creatingInsertingCommand1);

		Dataset<Row> df1 =
igniteSparkSession.read().format(IgniteDataFrameSettings.FORMAT_IGNITE())
				.option(IgniteDataFrameSettings.OPTION_TABLE(), "ignitetab1")
				.option(IgniteDataFrameSettings.OPTION_CONFIG_FILE(),
					
"/opt/kylo/testignite/apache-ignite-fabric-2.5.0-bin/config/default-config.xml")

				.load().repartition(1);
		df1.createOrReplaceTempView("df1");
		Dataset<Row> df2 =
igniteSparkSession.read().format(IgniteDataFrameSettings.FORMAT_IGNITE())
				.option(IgniteDataFrameSettings.OPTION_TABLE(), "ignitetab2")
				.option(IgniteDataFrameSettings.OPTION_CONFIG_FILE(),
					
"/opt/kylo/testignite/apache-ignite-fabric-2.5.0-bin/config/default-config.xml")

				.load().repartition(1);
		
		
		
		df2.createOrReplaceTempView("df2");
		
		logger.info("join query");
		Dataset<Row> ignite1 = igniteSparkSession.sql(
				"select * from df1 join df2 on
df1.networkelementid_pk=df2.macrositedetailid_pk");

		
logger.info("join query end");
		logger.info("ignite dataframe count------[{}]",ignite1.count());

		igniteSparkSession.close();
		Ignition.stop(true);
	}

	
	private static void creatingTable(Dataset<Row> employee, IgniteCache cache,
String tempTable, String index) {

		String query = gettingCreateQuery(employee);

		String str = "CREATE TABLE " + tempTable + " (" + query + ") WITH
\"template=partitioned\"";

		System.out.println("create query--->" + str);

		cache.query(new SqlFieldsQuery(str).setSchema("PUBLIC")).getAll();

		cache.query(new SqlFieldsQuery("CREATE INDEX on " + tempTable + " (" +
index + ")")).getAll();

	}

	private static String gettingCreateQuery(Dataset<Row> employee) {

		String str = "";
		StructField[] fields = employee.schema().fields();
		for (int i = 0; i < fields.length; i++) {
			String datatype =
fields[i].dataType().typeName().equalsIgnoreCase("string") ? "VARCHAR"
					: fields[i].dataType().typeName();
			if (i == 0) {
				str = str + fields[i].name() + " " + datatype + " PRIMARY KEY, ";
			} else if (i == fields.length - 1) {
				str = str + fields[i].name() + " " + datatype;
			} else {
				str = str + fields[i].name() + " " + datatype + " ,";
			}
		}

		return str;
	}

	private static void InsertingData(Dataset<Row> employee, IgniteCache cache,
SqlFieldsQuery employeeinsert) {

		List<Row> collectAsList = employee.collectAsList();

		for (int i = 0; i < collectAsList.size(); i++) {
			Row row = collectAsList.get(i);
			Object[] objectarray = new Object[row.size()];

			for (int j = 0; j < objectarray.length; j++) {
				objectarray[j] = row.get(j);
			}

		
cache.query(employeeinsert.setArgs(objectarray).setSchema("PUBLIC")).getAll();

		}

	}

	private static SqlFieldsQuery creatingInsertingCommand(Dataset<Row>
employee, IgniteCache cache, String tempTable) {

		String query = gettingInsertQuery(employee);

		String fields = Arrays.toString(employee.columns()).replaceAll("\\[",
"").replaceAll("\\]", "");

		String str = "INSERT INTO " + tempTable + " (" + fields + ")  VALUES (" +
query + ")";

		SqlFieldsQuery city = new SqlFieldsQuery(str);
		System.out.println("str-------------->" + str);
		return city;
	}

	private static String gettingInsertQuery(Dataset<Row> employee) {

		String str = "";

		for (int i = 0; i < employee.columns().length; i++) {
			if (i != employee.columns().length - 1) {
				str = str + "?, ";
			} else {
				str = str + "?";
			}

		}

		return str;
	}
}




--
Sent from: http://apache-ignite-users.70518.x6.nabble.com/

Mime
View raw message