flink-user mailing list archives

From Diego Fustes Villadóniga <dfus...@oesia.com>
Subject Calling external services/databases from DataStream API
Date Mon, 30 Jan 2017 11:56:58 GMT
Hi all,

I'm working on an application that enriches network connections with geolocations, using the
GeoIP database, which is stored as a file in HDFS. To do so, I map every connection in my
stream with this function:

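def enrichIp(ip: String): Location = {
   // Look up the IP in the GeoIP database
   val location = service.getLocation(ip)
   // Assumed: latlon is the (latitude, longitude) pair taken from the lookup result
   val latlon = (location.latitude, location.longitude)
   Location(location.countryName, Some(location.city), Some(latlon))
}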

Here, service is a LookupService, declared this way:

private lazy val service = new LookupService(Paths.get(path).toFile, LookupService.GEOIP_MEMORY_CACHE)
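Where service is declared affects how many copies of the database end up in memory. One common pattern is to hold the table in a Scala object instead of in each function instance, so it is built once per JVM on first access and then shared. A minimal sketch, assuming a hypothetical GeoTable class standing in for LookupService and assuming the real lookup object is safe to share across threads (worth checking for LookupService). In Flink, such sharing would presumably be per TaskManager JVM and per user-code class loader; that is an assumption to verify, not a documented guarantee.

```scala
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical stand-in for LookupService: an expensive, read-only table.
final class GeoTable(val path: String) {
  GeoTable.loads.incrementAndGet() // count how many times a table is built
  def lookup(ip: String): String = s"location-of-$ip" // placeholder result
}

object GeoTable {
  val loads = new AtomicInteger(0)
}

// A Scala object is initialised once per class loader, and its lazy val is
// built (thread-safely) on first access, then reused by every caller.
object SharedGeo {
  lazy val table: GeoTable = new GeoTable("/tmp/GeoLiteCity.dat")
}

object SharedGeoDemo {
  // Simulates n parallel tasks in one JVM using the shared table.
  def runTasks(n: Int): Seq[String] = {
    val results = new Array[String](n)
    val threads = (0 until n).map { i =>
      new Thread(new Runnable {
        def run(): Unit = results(i) = SharedGeo.table.lookup(s"10.0.0.$i")
      })
    }
    threads.foreach(_.start())
    threads.foreach(_.join()) // join makes each thread's write visible here
    results.toSeq
  }
}
```

Calling SharedGeoDemo.runTasks(4) performs four lookups while GeoTable.loads stays at 1.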

I can see several problems with this architecture. First, I have to manually copy the file from
HDFS to the local filesystem before starting the stream. This would be solved by something
like a DistributedCache, but it does not seem to be available in the DataStream API, or at
least I can't find it.
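Stripped down, the manual step is "fetch the file once, then reuse the local copy". A minimal sketch of that step, assuming a hypothetical open function that returns an InputStream over the HDFS file (with the Hadoop client this could wrap FileSystem.open) and assuming the tasks that need it run in the same JVM; it does not coordinate between separate processes.

```scala
import java.io.InputStream
import java.nio.file.{Files, Path, StandardCopyOption}

object LocalCopy {
  // Copies the bytes produced by `open` to `target` the first time only;
  // later calls return the existing local file. `open` is a hypothetical
  // stand-in for whatever reads the remote (e.g. HDFS) copy.
  def ensureLocal(target: Path, open: () => InputStream): Path = synchronized {
    if (!Files.exists(target)) {
      Files.createDirectories(target.getParent)
      // Write to a temporary file first so readers never see a partial copy.
      val tmp = Files.createTempFile(target.getParent, "download", ".part")
      val in = open()
      try Files.copy(in, tmp, StandardCopyOption.REPLACE_EXISTING)
      finally in.close()
      Files.move(tmp, target, StandardCopyOption.ATOMIC_MOVE)
    }
    target
  }
}
```

The returned path could then be passed to the LookupService constructor in place of Paths.get(path).toFile.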

Furthermore, I need to load the full GeoIP database into memory for every Flink task. This
might be fine in terms of performance, but the memory consumption is quite high. The other alternative
I see is to load the GeoIP file into an external database (Redis, Postgres, etc.) and
query it in real time, which might also be a good solution. Is there any built-in mechanism
to do this?
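If the lookups move to an external database, a small cache in front of it keeps repeated IPs from costing a round trip each. A minimal sketch of an LRU cache built on java.util.LinkedHashMap in access order, assuming a hypothetical fetch function that wraps the Redis or Postgres client; in Flink one would presumably create it per task instance, for example in a rich function's open() method.

```scala
import java.util.{LinkedHashMap => JLinkedHashMap}
import java.util.Map.Entry

// Caches results of an expensive per-IP lookup. `fetch` is a hypothetical
// stand-in for the real database query.
final class CachedLookup[V](maxEntries: Int, fetch: String => V) {
  // accessOrder = true keeps entries ordered from least to most recently
  // used; removeEldestEntry evicts the least recently used one when full.
  private val cache: JLinkedHashMap[String, V] =
    new JLinkedHashMap[String, V](16, 0.75f, true) {
      override protected def removeEldestEntry(eldest: Entry[String, V]): Boolean =
        size() > maxEntries
    }

  def get(ip: String): V = cache.synchronized {
    if (cache.containsKey(ip)) cache.get(ip) // hit: also refreshes recency
    else {
      val v = fetch(ip) // miss: query the external store once
      cache.put(ip, v)
      v
    }
  }
}
```

The cache size trades memory against database load: a few thousand entries per task is far smaller than the full GeoIP file, while still absorbing traffic from frequently seen addresses.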

Kind regards,

