flink-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Andrea Spina <andrea.sp...@radicalbit.io>
Subject Async Functions and Scala async-client for mySql/MariaDB database connection
Date Thu, 30 Mar 2017 16:50:10 GMT
Dear Flink community,

I started to use Async Functions in Scala, Flink 1.2.0, in order to retrieve
enriching information from MariaDB database. In order to do that, I firstly
employed classical jdbc library (org.mariadb.jdbc) and it worked has
expected.

Due to the blocking behavior of jdbc, I'm trying to use this library
https://github.com/mauricio/postgresql-async/tree/master/mysql-async
which promises to offer a subset of features in a non-blocking fashion.

Sadly I'm not able to use it.

Following the async function code.

*
object AsyncEnricher {
  case class OutputType(field1: FieldType, field2: FieldType)
}

class AsyncEnricher(configuration: MariaDBConfig)
    extends AsyncFunction[InputType, OutputType]
    with Serializable
    with AutoCloseable
    with LazyLogging {

  private val queryString = s"SELECT <column> FROM [table] WHERE
<column_name> = <value>;"

  implicit lazy val executor =
ExecutionContext.fromExecutor(Executors.directExecutor())

  private lazy val mariaDBClient: Connection = {
    val config = createConfiguration(configuration)
    val connection = new MySQLConnection(config)
    Await.result(connection.connect, 5 seconds)
  }

  override def asyncInvoke(input: InputType, collector:
AsyncCollector[OutputType]): Unit = {

    val queryResult = mariaDBClient.sendPreparedStatement(queryString,
Seq(input.fieldToSearch))

    queryResult.map(_.rows) onSuccess {
      case Some(resultSet) =>
        Try {
          resultSet.head(0).asInstanceOf[FieldType]
        } match {
          case Success(value) =>
            collector.collect(Iterable(OutputType(value, value)))
          case Failure(e) =>
            logger.error(s"retrieving value from MariaDB raised $e:
$queryString executed")
        }
      case _ => logger.error(s"value not found: $queryString executed")
    }

    queryResult onFailure {
      case e: Throwable =>
        logger.error(s"retrieving location volume from MariaDB raised $e:
$queryString executed")
    }

  }

  override def close(): Unit = {
    Try(mariaDBClient.disconnect).recover {
      case t: Throwable => logger.info(s"MariaDB cannot be closed -
${t.getMessage}")
    }
  }

}
*

Follows the stack

/
TimerException{java.lang.IllegalStateException: Timer service is shut down}
	at
org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:220)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
	at java.util.concurrent.FutureTask.run(FutureTask.java:262)
	at
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:178)
	at
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:292)
	at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
	at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
	at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.IllegalStateException: Timer service is shut down
	at
org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService.registerTimer(SystemProcessingTimeService.java:118)
	at
org.apache.flink.streaming.runtime.operators.TimestampsAndPeriodicWatermarksOperator.onProcessingTime(TimestampsAndPeriodicWatermarksOperator.java:82)
	at
org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:218)
	... 7 more

java.lang.NullPointerException
	at
org.apache.flink.streaming.api.operators.async.AsyncWaitOperator.stopResources(AsyncWaitOperator.java:343)
	at
org.apache.flink.streaming.api.operators.async.AsyncWaitOperator.dispose(AsyncWaitOperator.java:320)
	at
org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:442)
	at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:343)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:655)
	at java.lang.Thread.run(Thread.java:745)
/

I think it's involving connection.connect returning object which is a Future
and so the Await. This is different than jdbc driver, which worked like a
charm. I tried to move away the await from the lazy val.

Can't wait for your opinion. Thank you so much in advance.

Andrea 



--
View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Async-Functions-and-Scala-async-client-for-mySql-MariaDB-database-connection-tp12469.html
Sent from the Apache Flink User Mailing List archive. mailing list archive at Nabble.com.

Mime
View raw message