phoenix-dev mailing list archives

From "Rahul (Jira)" <j...@apache.org>
Subject [jira] [Updated] (PHOENIX-5727) Intermittent Upserts with Kafka and Spark Streaming
Date Sat, 15 Feb 2020 06:18:00 GMT

     [ https://issues.apache.org/jira/browse/PHOENIX-5727?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Rahul updated PHOENIX-5727:
---------------------------
    Description: 
Hi,

I have a Spark job that reads from a Kafka stream and writes to a Phoenix table using the
Phoenix JDBC thick client with a commit size of 500. What I have observed is that the job
intermittently fails to perform the upserts, silently and without throwing any errors. The
incoming data rate is around 1000 rows/sec.

My input data set is such that we get more updates to existing row keys than inserts of new ones.

Is this a known issue with Phoenix?
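
(The sample below shows only the per-record upsert loop. The rec.getAs(...) calls suggest
Spark SQL Rows, so the surrounding scaffolding presumably looks something like the sketch
here; this is an assumption for context, not the actual job code.)

    // Assumed outer scaffolding (not from the actual job): each micro-batch
    // DataFrame is written out per partition over one Phoenix JDBC connection.
    df.foreachPartition { records: Iterator[org.apache.spark.sql.Row] =>
      // open the Phoenix connection, run the upsert loop from "Sample Code"
      // below over 'records', commit the remainder, close the connection
    }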

 

Sample Code

A and B form the composite primary key; the commit size is 500.
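
For reference, a table shape consistent with that would be the following (hypothetical DDL,
since the real table name and column types are not given in this report):

    // Hypothetical table definition matching the sample; names/types assumed.
    // With (A, B) as the composite primary key, a second upsert for the same
    // (A, B) pair overwrites the existing row instead of adding a new one.
    con.createStatement().executeUpdate(
      "CREATE TABLE IF NOT EXISTS " + phoenix_tablename + " (" +
      " A VARCHAR NOT NULL, B VARCHAR NOT NULL," +
      " C VARCHAR, D VARCHAR, E VARCHAR, F VARCHAR, G VARCHAR, H VARCHAR" +
      " CONSTRAINT pk PRIMARY KEY (A, B))")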

 

Class.forName("org.apache.phoenix.jdbc.PhoenixDriver")Class.forName("org.apache.phoenix.jdbc.PhoenixDriver") 
                      val con_startTimeMillis = System.currentTimeMillis()   
                   

val con = DriverManager.getConnection("jdbc:phoenix:localhost")                 
     

println(">>>> time taken for connection::" + (System.currentTimeMillis() - con_startTimeMillis).toDouble
/ 1000 + " secs")                     

  con.setAutoCommit(false);                                         
      

for loop

{                                var a = rec.getAs("A").toString       
                     

  var b = rec.getAs("B").toString                               

var c = rec.getAs("C").toString                                v

ar d = if (rec.getAs("D") == null) "" else rec.getAs("D").toString               
     

          var e = if (rec.getAs("E") == null) "" else rec.getAs("E").toString     
                         

var f = if (rec.getAs("F") == null) "" else rec.getAs("F").toString               
               

var g = if (rec.getAs("G") == null) "" else rec.getAs("G").toString               
               

var h = if (rec.getAs("H") == null) "0" else rec.getAs("H").toString               
                \                             var upsert_stmt = "upsert
into " + phoenix_tablename + " values ('" + a + "','" + b  + "','" + c + "','" + d + "','"
+ e  + "','" + f + "','" + g + "','" + h + "')"                           
   

println(">>>>upsert statement formed::" + upsert_stmt)                
               

var stmt = con.prepareStatement(upsert_stmt)                               

stmt.executeUpdate()                           

    bs=bs+1;                               

if (bs % commitSize == 0)

\\{                                    con.commit()             
                  }

                        }

 

                      con.commit()

                        con.close()
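
As a side note, the loop above builds each upsert by string concatenation; the same loop
with bind parameters would look roughly like this (a sketch under the same assumptions,
offered for comparison rather than as a known fix for the missing upserts):

    // Same loop using one reused PreparedStatement with bind parameters, so
    // quote characters in the data cannot alter the statement text.
    val upsert = con.prepareStatement(
      "upsert into " + phoenix_tablename + " values (?,?,?,?,?,?,?,?)")
    for (rec <- records) {
      upsert.setString(1, rec.getAs("A").toString)
      upsert.setString(2, rec.getAs("B").toString)
      upsert.setString(3, rec.getAs("C").toString)
      upsert.setString(4, if (rec.getAs("D") == null) "" else rec.getAs("D").toString)
      upsert.setString(5, if (rec.getAs("E") == null) "" else rec.getAs("E").toString)
      upsert.setString(6, if (rec.getAs("F") == null) "" else rec.getAs("F").toString)
      upsert.setString(7, if (rec.getAs("G") == null) "" else rec.getAs("G").toString)
      upsert.setString(8, if (rec.getAs("H") == null) "0" else rec.getAs("H").toString)
      upsert.executeUpdate()
      bs = bs + 1
      if (bs % commitSize == 0) con.commit()
    }
    con.commit()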

 

  was:
Hi,

I have a Spark job that reads from a Kafka stream and writes to a Phoenix table using the
Phoenix JDBC thick client with a commit size of 500. What I have observed is that the job
intermittently fails to perform the upserts, silently and without throwing any errors. The
incoming data rate is around 1000 rows/sec.

My input data set is such that we get more updates to existing row keys than inserts of new ones.

Is this a known issue with Phoenix?

 

Sample Code

Class.forName("org.apache.phoenix.jdbc.PhoenixDriver")Class.forName("org.apache.phoenix.jdbc.PhoenixDriver") 
                      val con_startTimeMillis = System.currentTimeMillis()   
                    val con = DriverManager.getConnection("jdbc:phoenix:localhost") 
                      println(">>>> time taken for connection::" +
(System.currentTimeMillis() - con_startTimeMillis).toDouble / 1000 + " secs")         
              con.setAutoCommit(false);                             
                  for loop {                                var
a = rec.getAs("A").toString                                var b = rec.getAs("B").toString 
                              var c = rec.getAs("C").toString         
                      var d = if (rec.getAs("D") == null) "" else rec.getAs("D").toString 
                              var e = if (rec.getAs("E") == null) "" else rec.getAs("E").toString 
                              var f = if (rec.getAs("F") == null) "" else rec.getAs("F").toString 
                              var g = if (rec.getAs("G") == null) "" else rec.getAs("G").toString 
                              var h = if (rec.getAs("H") == null) "0" else
rec.getAs("H").toString                                var i = if (rec.getAs("I")
== null) "" else rec.getAs("I").toString 
                                 var upsert_stmt = "upsert into " + phoenix_tablename
+ " values ('" + a + "','" + b  + "','" + c + "','" + d + "','" + e  + "','" + f + "','"
+ g + "','" + h + "')"                                println(">>>>upsert
statement formed::" + upsert_stmt)                                 var stmt
= con.prepareStatement(upsert_stmt)                                stmt.executeUpdate() 
                              bs=bs+1;                         
      if (bs % commitSize == 0) \{                                   
con.commit()                                }                    
    }

 

                      con.commit()

                        con.close()

 


> Intermittent Upserts with Kafka and Spark Streaming
> ---------------------------------------------------
>
>                 Key: PHOENIX-5727
>                 URL: https://issues.apache.org/jira/browse/PHOENIX-5727
>             Project: Phoenix
>          Issue Type: Bug
>    Affects Versions: 4.14.0
>            Reporter: Rahul
>            Priority: Major
>


--
This message was sent by Atlassian Jira
(v8.3.4#803005)
