Fostering modularity

Modular programming with Spark Column expressions is simply not possible. For instance, let’s try to solve a simple problem in a modular way. We start from the following DataFrame, whose column names all end with the same suffix “_user”:

userDF.show()
// +---------+---------+--------+
// |name_user|city_user|age_user|
// +---------+---------+--------+
// |      Foo|   Madrid|      35|
// |      Bar| New York|      40|
// |     John|    Paris|      30|
// +---------+---------+--------+
// 
userDF.printSchema()
// root
//  |-- name_user: string (nullable = true)
//  |-- city_user: string (nullable = true)
//  |-- age_user: integer (nullable = false)
//

We may refer to these columns using their full names:

f.col("name_user")
// res2: Column = name_user
f.col("age_user")
// res3: Column = age_user
f.col("city_user")
// res4: Column = city_user
// and many more

But we may also want to create a reusable function that abstracts away the common suffix and lets us focus on the unique part of each column name (thus obtaining a more modular solution):

def userCol(colName: String): Column =
  f.col(colName + "_user")

If the prefix we pass matches an existing column, everything is fine. But if we refer to a non-existent column, in a select statement for instance, we would like to know exactly where that reference was made. The problem is that Spark points us to the line of the select statement instead:

val userc = userCol("name1") // actual location of error :S
userDF.select(userc)        // error location reported by Spark
// org.apache.spark.sql.AnalysisException: [UNRESOLVED_COLUMN.WITH_SUGGESTION] A column or function parameter with name `name1_user` cannot be resolved. Did you mean one of the following? [`name_user`, `age_user`, `city_user`].;
// 'Project ['name1_user]
// +- Project [_1#316 AS name_user#323, _2#317 AS city_user#324, _3#318 AS age_user#325]
//    +- LocalRelation [_1#316, _2#317, _3#318]
// 
// 	at org.apache.spark.sql.errors.QueryCompilationErrors$.unresolvedAttributeError(QueryCompilationErrors.scala:306)
// 	at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.org$apache$spark$sql$catalyst$analysis$CheckAnalysis$$failUnresolvedAttribute(CheckAnalysis.scala:141)
// 	at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.$anonfun$checkAnalysis0$6(CheckAnalysis.scala:299)
// 	at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.$anonfun$checkAnalysis0$6$adapted(CheckAnalysis.scala:297)
// 	at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:244)
// 	at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.$anonfun$checkAnalysis0$5(CheckAnalysis.scala:297)
// 	at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.$anonfun$checkAnalysis0$5$adapted(CheckAnalysis.scala:297)
// 	at scala.collection.immutable.Stream.foreach(Stream.scala:533)
// 	at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.$anonfun$checkAnalysis0$2(CheckAnalysis.scala:297)
// 	at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.$anonfun$checkAnalysis0$2$adapted(CheckAnalysis.scala:215)
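
The reason for the misplaced report can be made explicit with a small sketch. It assumes a local, non-Connect Spark session created only for illustration; the object name DeferredResolution and the value names are hypothetical. Building the Column never looks at any DataFrame, so the wrong name can only be detected later, when select resolves it against userDF.

```scala
import org.apache.spark.sql.{AnalysisException, Column, DataFrame, SparkSession, functions => f}
import scala.util.{Failure, Try}

object DeferredResolution {
  // Local session for illustration only (an assumption, not part of the original example).
  val spark: SparkSession =
    SparkSession.builder().master("local[1]").appName("modularity-sketch").getOrCreate()
  import spark.implicits._

  // Same data as the DataFrame shown at the top of this page.
  val userDF: DataFrame =
    List(("Foo", "Madrid", 35), ("Bar", "New York", 40), ("John", "Paris", 30))
      .toDF("name_user", "city_user", "age_user")

  def userCol(colName: String): Column =
    f.col(colName + "_user")

  // Building the reference succeeds: it is just an unresolved name at this point.
  val wrong: Column = userCol("name1")

  // The failure only surfaces here, when the name is resolved against userDF.
  val result: Try[DataFrame] = Try(userDF.select(wrong))

  // True when select failed with an AnalysisException.
  val failedAtSelect: Boolean = result match {
    case Failure(_: AnalysisException) => true
    case _                             => false
  }
}
```

Since the Column carries no information about where it was created, Spark has nothing better to report than the place where the analysis ran.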

Towards the source of error

Doric includes the exact line of the malformed column reference in the exception (cf. error reporting in doric), which is of great help in solving our problem. However, the following doric function does not work as intended:

import doric._
import doric.types.SparkType

def user[T: SparkType](colName: String): DoricColumn[T] = {
  col[T](colName + "_user")
}

Indeed, the errors raised by the following code point to the implementation of the user function rather than to the lines where it is called:

val age = user[Int]("name")
val team = user[String]("team")
userDF.select(age, team)
// doric.sem.DoricMultiError: Found 2 errors in select
//   The column with name 'name_user' was expected to be IntegerType but is of type StringType
//   	located at . (modularity.md:79)
//   [UNRESOLVED_COLUMN.WITH_SUGGESTION] A column or function parameter with name `team_user` cannot be resolved. Did you mean one of the following? [`name_user`, `city_user`, `age_user`].
//   	located at . (modularity.md:79)
// 
// 	at doric.sem.package$ErrorThrower.$anonfun$returnOrThrow$1(package.scala:9)
// 	at cats.data.Validated.fold(Validated.scala:50)
// 	at doric.sem.package$ErrorThrower.returnOrThrow(package.scala:9)
// 	at doric.sem.TransformOps$DataframeTransformationSyntax.select(TransformOps.scala:140)
// 	at repl.MdocSession$MdocApp$$anonfun$2.apply(modularity.md:89)
// 	at repl.MdocSession$MdocApp$$anonfun$2.apply(modularity.md:86)

To get the right line of code in the reported error, we can just add the following implicit parameter to the user function:

import doric._
import doric.sem.Location
import doric.types.SparkType

def user[T: SparkType](colName: String)(implicit location: Location): DoricColumn[T] = {
  col[T](colName + "_user")
}

Now, if we repeat the same mistakes, each error points to the line where the faulty reference was actually made:

val age = user[Int]("name")
val team = user[String]("team")
userDF.select(age, team)
// doric.sem.DoricMultiError: Found 2 errors in select
//   The column with name 'name_user' was expected to be IntegerType but is of type StringType
//   	located at . (modularity.md:135)
//   [UNRESOLVED_COLUMN.WITH_SUGGESTION] A column or function parameter with name `team_user` cannot be resolved. Did you mean one of the following? [`name_user`, `city_user`, `age_user`].
//   	located at . (modularity.md:136)
// 
// 	at doric.sem.package$ErrorThrower.$anonfun$returnOrThrow$1(package.scala:9)
// 	at cats.data.Validated.fold(Validated.scala:50)
// 	at doric.sem.package$ErrorThrower.returnOrThrow(package.scala:9)
// 	at doric.sem.TransformOps$DataframeTransformationSyntax.select(TransformOps.scala:140)
// 	at repl.MdocSession$MdocApp5$$anonfun$3.apply(modularity.md:137)
// 	at repl.MdocSession$MdocApp5$$anonfun$3.apply(modularity.md:134)
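
Why the extra parameter helps can be illustrated without Spark or doric. The sketch below is a simplified model, not doric's implementation: Site, column, userFixed and userForwarded are hypothetical names, and a hand-written label stands in for the file and line that a Location carries. It assumes, as the error messages above suggest, that the location is fixed at the point where the implicit value is resolved.

```scala
object LocationSketch {
  // Hypothetical stand-in for a source location: just a label for a call site.
  final case class Site(label: String)

  // Stand-in for a column constructor that records the Site it receives.
  def column(name: String)(implicit site: Site): String =
    s"$name (located at ${site.label})"

  object Library {
    // No implicit parameter: the Site is resolved inside the helper,
    // so every caller ends up with the same location.
    def userFixed(name: String): String = {
      implicit val here: Site = Site("Library.userFixed")
      column(name + "_user")
    }

    // Implicit parameter: the Site is resolved at the caller and forwarded.
    def userForwarded(name: String)(implicit site: Site): String =
      column(name + "_user")
  }

  object Client {
    implicit val here: Site = Site("Client")

    val a: String = Library.userFixed("name")     // location of the helper
    val b: String = Library.userForwarded("team") // location of the caller
  }
}
```

In this model, Client.a carries the helper's label while Client.b carries the caller's, which mirrors the difference between the two error reports above.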