flink-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From fhueske <...@git.apache.org>
Subject [GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...
Date Thu, 24 Aug 2017 21:54:43 GMT
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4355#discussion_r135117112
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala ---
    @@ -307,6 +312,139 @@ object UserDefinedFunctionUtils {
       // ----------------------------------------------------------------------------------------------
     
       /**
    +    * Analyze the constructor to get the type information of the MapView or ListView type variables
    +    * inside the accumulate.
    +    *
    +    * @param aggFun aggregate function
    +    * @return the data view specification
    +    */
    +  def getDataViewTypeInfoFromConstructor(
    +    aggFun: AggregateFunction[_, _])
    +  : mutable.HashMap[String, TypeInformation[_]] = {
    +
    +    val resultMap = new mutable.HashMap[String, TypeInformation[_]]
    +    val acc = aggFun.createAccumulator()
    +    val fields: util.List[Field] = TypeExtractor.getAllDeclaredFields(acc.getClass, true)
    +    for (i <- 0 until fields.size()) {
    +      val field = fields.get(i)
    +      field.setAccessible(true)
    +      if (classOf[DataView].isAssignableFrom(field.getType)) {
    +        if (field.getType == classOf[MapView[_, _]]) {
    +          val mapView = field.get(acc).asInstanceOf[MapView[_, _]]
    +          if (mapView != null) {
    +            val keyTypeInfo = mapView.keyTypeInfo
    +            val valueTypeInfo = mapView.valueTypeInfo
    +
    +            if (keyTypeInfo != null && valueTypeInfo != null) {
    +              resultMap.put(field.getName, new MapViewTypeInfo(keyTypeInfo, valueTypeInfo))
    +            }
    +          } else {
    +            resultMap.put(field.getName, null)
    +          }
    +        } else if (field.getType == classOf[ListView[_]]) {
    +          val listView = field.get(acc).asInstanceOf[ListView[_]]
    +          val elementTypeInfo = listView.elementTypeInfo
    +
    +          if (elementTypeInfo != null) {
    +            resultMap.put(field.getName, new ListViewTypeInfo(elementTypeInfo))
    +          }
    +        }
    +      }
    +    }
    +
    +    resultMap
    +  }
    +
    +  /**
    +    * Remove StateView fields from accumulator type information.
    +    *
    +    * @param index index of aggregate function
    +    * @param aggFun aggregate function
    +    * @param accType accumulator type information, only support pojo type
    +    * @param isStateBackedDataViews is data views use state backend
    +    * @return mapping of accumulator type information and data view config which contains id,
    +    *         field name and state descriptor
    +    */
    +  def removeStateViewFieldsFromAccTypeInfo(
    +    index: Int,
    +    aggFun: AggregateFunction[_, _],
    +    accType: TypeInformation[_],
    +    isStateBackedDataViews: Boolean)
    +  : (TypeInformation[_], Option[Seq[DataViewSpec[_]]]) = {
    +
    +    var hasDataView = false
    +    val acc = aggFun.createAccumulator()
    +    accType match {
    +      case pojoType: PojoTypeInfo[_] if pojoType.getArity > 0 =>
    +        val arity = pojoType.getArity
    +        val newPojoFields = new util.ArrayList[PojoField]()
    +        val accumulatorSpecs = new mutable.ArrayBuffer[DataViewSpec[_]]
    +        for (i <- 0 until arity) {
    +          val pojoField = pojoType.getPojoFieldAt(i)
    +          val field = pojoField.getField
    +          val fieldName = field.getName
    +          field.setAccessible(true)
    +
    +          pojoField.getTypeInformation match {
    +            case map: MapViewTypeInfo[Any, Any] =>
    +              val mapView = field.get(acc).asInstanceOf[MapView[_, _]]
    +              if (mapView != null) {
    +                val keyTypeInfo = mapView.keyTypeInfo
    +                val valueTypeInfo = mapView.valueTypeInfo
    +                val newTypeInfo = if (keyTypeInfo != null && valueTypeInfo != null) {
    +                  hasDataView = true
    +                  new MapViewTypeInfo(keyTypeInfo, valueTypeInfo)
    +                } else {
    +                  map
    +                }
    +
    +                var spec = MapViewSpec(
    +                  "agg" + index + "$" + fieldName, // generate unique name to be used as state name
    +                  field,
    +                  newTypeInfo)
    +
    +                accumulatorSpecs += spec
    +                if (!isStateBackedDataViews) { // add data view field which not use state backend
    +                  newPojoFields.add(new PojoField(field, newTypeInfo))
    +                }
    +              }
    +
    +            case list: ListViewTypeInfo[Any] =>
    +              val listView = field.get(acc).asInstanceOf[ListView[_]]
    +              if (listView != null) {
    +                val elementTypeInfo = listView.elementTypeInfo
    +                val newTypeInfo = if (elementTypeInfo != null) {
    +                  hasDataView = true
    +                  new ListViewTypeInfo(elementTypeInfo)
    +                } else {
    +                  list
    +                }
    +
    +                var spec = ListViewSpec(
    +                  "agg" + index + "$" + fieldName, // generate unique name to be used as state name
    +                  field,
    +                  newTypeInfo)
    +
    +                accumulatorSpecs += spec
    +                if (!isStateBackedDataViews) { // add data view field which not use state backend
    +                  newPojoFields.add(new PojoField(field, newTypeInfo))
    +                }
    +              }
    +
    +            case _ => newPojoFields.add(pojoField)
    +          }
    +        }
    +        (new PojoTypeInfo(accType.getTypeClass, newPojoFields), Some(accumulatorSpecs))
    +
    +      case _ => if (!hasDataView) {
    --- End diff --
    
    change to
    
    ```
    case _ if (!hasDataView) => (accType, None)
    case _ => throw new TableException(...)
    ```


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

Mime
View raw message