From user-return-21626-archive-asf-public=cust-asf.ponee.io@ignite.apache.org Tue Sep 4 12:25:13 2018 Return-Path: X-Original-To: archive-asf-public@cust-asf.ponee.io Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx-eu-01.ponee.io (Postfix) with SMTP id AD5F8180629 for ; Tue, 4 Sep 2018 12:25:12 +0200 (CEST) Received: (qmail 91785 invoked by uid 500); 4 Sep 2018 10:25:11 -0000 Mailing-List: contact user-help@ignite.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: user@ignite.apache.org Delivered-To: mailing list user@ignite.apache.org Received: (qmail 91775 invoked by uid 99); 4 Sep 2018 10:25:11 -0000 Received: from pnap-us-west-generic-nat.apache.org (HELO spamd2-us-west.apache.org) (209.188.14.142) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 04 Sep 2018 10:25:11 +0000 Received: from localhost (localhost [127.0.0.1]) by spamd2-us-west.apache.org (ASF Mail Server at spamd2-us-west.apache.org) with ESMTP id 369FC1A1219 for ; Tue, 4 Sep 2018 10:25:11 +0000 (UTC) X-Virus-Scanned: Debian amavisd-new at spamd2-us-west.apache.org X-Spam-Flag: NO X-Spam-Score: -0.001 X-Spam-Level: X-Spam-Status: No, score=-0.001 tagged_above=-999 required=6.31 tests=[RCVD_IN_DNSWL_NONE=-0.0001, SPF_HELO_PASS=-0.001] autolearn=disabled Received: from mx1-lw-us.apache.org ([10.40.0.8]) by localhost (spamd2-us-west.apache.org [10.40.0.9]) (amavisd-new, port 10024) with ESMTP id Wy-J6BGTkPa7 for ; Tue, 4 Sep 2018 10:25:09 +0000 (UTC) Received: from n6.nabble.com (n6.nabble.com [162.255.23.37]) by mx1-lw-us.apache.org (ASF Mail Server at mx1-lw-us.apache.org) with ESMTP id 9864D5F175 for ; Tue, 4 Sep 2018 10:25:09 +0000 (UTC) Received: from n6.nabble.com (localhost [127.0.0.1]) by n6.nabble.com (Postfix) with ESMTP id 21F5794258A0 for ; Tue, 4 Sep 2018 03:25:03 -0700 (MST) Date: Tue, 4 Sep 2018 03:25:03 -0700 (MST) From: shrutika modi To: user@ignite.apache.org Message-ID: <1536056703105-0.post@n6.nabble.com> Subject: unable to create indexing on dataframe MIME-Version: 1.0 Content-Type: text/plain; charset=us-ascii Content-Transfer-Encoding: 7bit package com.test; import java.util.Arrays; import java.util.List; import java.util.Properties; import org.apache.ignite.Ignite; import org.apache.ignite.IgniteCache; import org.apache.ignite.Ignition; import org.apache.ignite.cache.query.SqlFieldsQuery; import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.spark.IgniteContext; import org.apache.ignite.spark.IgniteDataFrameSettings; import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; import org.apache.spark.SparkContext; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; import org.apache.spark.sql.SparkSession; import org.apache.spark.sql.ignite.IgniteSparkSession; import org.apache.spark.sql.types.StructField; import org.apache.spark.streaming.api.java.JavaStreamingContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.inn.sparkrunner.context.JobContext; import com.inn.sparkrunner.context.JobContextImpl; import com.inn.sparkrunner.createsparkcontext.CreateSparkContext; import com.inn.sparkrunner.util.ConfigUtil; public class JoinUsingIgnite { public static void main(String[] args) throws SecurityException, Exception { Logger logger = LoggerFactory.getLogger(JoinUsingIgnite.class); ConfigUtil.setConfig(); System.setProperty("IGNITE_HOME", "/opt/kylo/testignite/apache-ignite-fabric-2.5.0-bin"); JobContext jobContext = null; SparkSession sparkSession = null; sparkSession = CreateSparkContext.create(args); jobContext = new JobContextImpl(sparkSession); TcpDiscoverySpi spi = new TcpDiscoverySpi(); TcpDiscoveryVmIpFinder ipFinder = new TcpDiscoveryVmIpFinder(); ipFinder.setAddresses(java.util.Arrays.asList("sf3.start.com", "sf2.start.com")); spi.setIpFinder(ipFinder); IgniteConfiguration cfg = new IgniteConfiguration(); // cfg.setIgniteInstanceName("grid"); cfg.setDiscoverySpi(spi); Ignite ignite = Ignition.start(cfg); IgniteContext igniteContext = new IgniteContext(sparkSession.sparkContext(), "/opt/kylo/testignite/apache-ignite-fabric-2.5.0-bin/config/default-config.xml"); IgniteSparkSession igniteSparkSession = new IgniteSparkSession(igniteContext, sparkSession); Properties connectionproperties = new Properties(); connectionproperties.put("user", "FORESIGHT_PRODUCT"); connectionproperties.put("password", "FORESIGHT_PRODUCT@#^249"); connectionproperties.setProperty("Driver", "com.mysql.jdbc.Driver"); logger.info("spark start------"); Dataset jdbc = igniteSparkSession.read().option("inferschema", "false").jdbc( "jdbc:mysql://192.168.4.249:3306/FORESIGHT", "(SELECT * FROM NetworkElement) emp", connectionproperties); Dataset jdbc1 = igniteSparkSession.read().option("inferschema", "false").jdbc( "jdbc:mysql://192.168.4.249:3306/FORESIGHT", "(SELECT * FROM MacroSiteDetail) emp1", connectionproperties); CacheConfiguration setSqlSchema = new CacheConfiguration("cachecfg") .setSqlSchema("PUBLIC"); IgniteCache cache = ignite.getOrCreateCache(setSqlSchema); CacheConfiguration setSqlSchema2 = new CacheConfiguration("table2") .setSqlSchema("PUBLIC"); IgniteCache cache2 = ignite.getOrCreateCache(setSqlSchema2); creatingTable(jdbc, cache, "ignitetab1", "networkelementid_pk"); SqlFieldsQuery creatingInsertingCommand = creatingInsertingCommand(jdbc, cache, "ignitetab1"); InsertingData(jdbc, cache, creatingInsertingCommand); creatingTable(jdbc1, cache2, "ignitetab2", "macrositedetailid_pk"); SqlFieldsQuery creatingInsertingCommand1 = creatingInsertingCommand(jdbc1, cache2, "ignitetab2"); InsertingData(jdbc1, cache2, creatingInsertingCommand1); Dataset df1 = igniteSparkSession.read().format(IgniteDataFrameSettings.FORMAT_IGNITE()) .option(IgniteDataFrameSettings.OPTION_TABLE(), "ignitetab1") .option(IgniteDataFrameSettings.OPTION_CONFIG_FILE(), "/opt/kylo/testignite/apache-ignite-fabric-2.5.0-bin/config/default-config.xml") .load().repartition(1); df1.createOrReplaceTempView("df1"); Dataset df2 = igniteSparkSession.read().format(IgniteDataFrameSettings.FORMAT_IGNITE()) .option(IgniteDataFrameSettings.OPTION_TABLE(), "ignitetab2") .option(IgniteDataFrameSettings.OPTION_CONFIG_FILE(), "/opt/kylo/testignite/apache-ignite-fabric-2.5.0-bin/config/default-config.xml") .load().repartition(1); df2.createOrReplaceTempView("df2"); logger.info("join query"); Dataset ignite1 = igniteSparkSession.sql( "select * from df1 join df2 on df1.networkelementid_pk=df2.macrositedetailid_pk"); logger.info("join query end"); logger.info("ignite dataframe count------[{}]",ignite1.count()); igniteSparkSession.close(); Ignition.stop(true); } private static void creatingTable(Dataset employee, IgniteCache cache, String tempTable, String index) { String query = gettingCreateQuery(employee); String str = "CREATE TABLE " + tempTable + " (" + query + ") WITH \"template=partitioned\""; System.out.println("create query--->" + str); cache.query(new SqlFieldsQuery(str).setSchema("PUBLIC")).getAll(); cache.query(new SqlFieldsQuery("CREATE INDEX on " + tempTable + " (" + index + ")")).getAll(); } private static String gettingCreateQuery(Dataset employee) { String str = ""; StructField[] fields = employee.schema().fields(); for (int i = 0; i < fields.length; i++) { String datatype = fields[i].dataType().typeName().equalsIgnoreCase("string") ? "VARCHAR" : fields[i].dataType().typeName(); if (i == 0) { str = str + fields[i].name() + " " + datatype + " PRIMARY KEY, "; } else if (i == fields.length - 1) { str = str + fields[i].name() + " " + datatype; } else { str = str + fields[i].name() + " " + datatype + " ,"; } } return str; } private static void InsertingData(Dataset employee, IgniteCache cache, SqlFieldsQuery employeeinsert) { List collectAsList = employee.collectAsList(); for (int i = 0; i < collectAsList.size(); i++) { Row row = collectAsList.get(i); Object[] objectarray = new Object[row.size()]; for (int j = 0; j < objectarray.length; j++) { objectarray[j] = row.get(j); } cache.query(employeeinsert.setArgs(objectarray).setSchema("PUBLIC")).getAll(); } } private static SqlFieldsQuery creatingInsertingCommand(Dataset employee, IgniteCache cache, String tempTable) { String query = gettingInsertQuery(employee); String fields = Arrays.toString(employee.columns()).replaceAll("\\[", "").replaceAll("\\]", ""); String str = "INSERT INTO " + tempTable + " (" + fields + ") VALUES (" + query + ")"; SqlFieldsQuery city = new SqlFieldsQuery(str); System.out.println("str-------------->" + str); return city; } private static String gettingInsertQuery(Dataset employee) { String str = ""; for (int i = 0; i < employee.columns().length; i++) { if (i != employee.columns().length - 1) { str = str + "?, "; } else { str = str + "?"; } } return str; } } -- Sent from: http://apache-ignite-users.70518.x6.nabble.com/