Writing UDFs (User Defined Functions) in Apache Spark
In this tutorial, we will see how to create a User Defined Function (UDF) in Apache Spark.
A UDF lets us write customized column-based functions. One important point to understand is that Spark does not recommend UDFs, because the Catalyst optimizer cannot look inside developer-created functions and optimize them the way it optimizes built-in functions. Therefore, use UDFs with caution.
We can create a UDF in three simple steps:
- Create a Scala function.
- Pass the function to the udf helper in org.apache.spark.sql.functions. There is an overload for each function arity; for a single-argument function, as in the example below, the signature is
udf[RT: TypeTag, A1: TypeTag](f: Function1[A1, RT]): UserDefinedFunction
- Use it like any other Spark function.
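As a quick sketch of the three steps, using a hypothetical uppercase function (steps 2 and 3 are shown as comments since they need a running Spark session):

```scala
// Step 1: a plain Scala function (hypothetical uppercase example)
val toUpper = (s: String) => s.toUpperCase

// Step 2: wrap it with org.apache.spark.sql.functions.udf
// val toUpperUdf = udf(toUpper)

// Step 3: use it like any built-in column function
// df.withColumn("upperName", toUpperUdf(col("name")))
```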
Let us try to understand with a simple example. Consider we want to validate a date column. We can create a UDF to perform this functionality.
Create a Scala function that takes a string date as input, validates it, and returns boolean true or false. Here the function is defined inline and wrapped with udf in a single step:
val isValidDate = udf((date: String) => {
  val set = mutable.HashSet.empty[String]
  for (year <- 1900 to 2050) {
    for (month <- 1 to 12) {
      for (day <- 1 to daysInMonth(year, month)) {
        val dateBuilder = new mutable.StringBuilder()
        dateBuilder.append(f"$year%04d")
        dateBuilder.append(f"$month%02d")
        dateBuilder.append(f"$day%02d")
        set += dateBuilder.toString()
      }
    }
  }
  set.contains(date)
})

def daysInMonth(year: Int, month: Int): Int = month match {
  case 1 | 3 | 5 | 7 | 8 | 10 | 12 => 31
  case 2 => if (((year % 4 == 0) && (year % 100 != 0)) || (year % 400 == 0)) 29 else 28
  case _ => 30
}
Registering the function is only required if we also want to call it from Spark SQL; for DataFrame usage the udf value alone is enough. Register it as below:
spark.udf.register("isValidDate", isValidDate)
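Once registered, the UDF can also be called by name from a SQL query. A sketch, assuming the spark session, the datesDF DataFrame, and the registered isValidDate UDF from above are all in scope:

```scala
// Expose the DataFrame as a temporary view so SQL can reference it
datesDF.createOrReplaceTempView("dates")

// Call the registered UDF by name inside the query
val query = "SELECT date, isValidDate(date) AS isValidDate FROM dates"
spark.sql(query).show()
```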
We can use this UDF like any other Spark column-based function.
val datesTransformed = datesDF.withColumn("isValidDate", isValidDate(col("date")))
Here is the complete working code:
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.functions.udf

import scala.collection.mutable

object DateValidator {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("DateValidator").master("local[*]").getOrCreate()
    val dates = Seq("20171002", "hello", "20171035", "20190101", "20191010")
    import spark.implicits._
    val datesDF = spark.sparkContext.parallelize(dates).toDF("date")

    // Build the set of all valid dates between 1900 and 2050 and
    // check whether the input is a member of that set.
    val isValidDate = udf((date: String) => {
      val set = mutable.HashSet.empty[String]
      for (year <- 1900 to 2050) {
        for (month <- 1 to 12) {
          for (day <- 1 to daysInMonth(year, month)) {
            val dateBuilder = new mutable.StringBuilder()
            dateBuilder.append(f"$year%04d")
            dateBuilder.append(f"$month%02d")
            dateBuilder.append(f"$day%02d")
            set += dateBuilder.toString()
          }
        }
      }
      set.contains(date)
    })

    spark.udf.register("isValidDate", isValidDate)
    val datesTransformed = datesDF.withColumn("isValidDate", isValidDate(col("date")))
    datesTransformed.show()
  }

  def daysInMonth(year: Int, month: Int): Int = month match {
    case 1 | 3 | 5 | 7 | 8 | 10 | 12 => 31
    case 2 => if (((year % 4 == 0) && (year % 100 != 0)) || (year % 400 == 0)) 29 else 28
    case _ => 30
  }
}
The output of this looks like below:
+--------+-----------+
|    date|isValidDate|
+--------+-----------+
|20171002|       true|
|   hello|      false|
|20171035|      false|
|20190101|       true|
|20191010|       true|
+--------+-----------+
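A note on performance: the UDF above rebuilds a set of roughly 55,000 date strings for every single row, which is wasteful. A lighter alternative, a sketch not from the original article, is to parse the string directly with java.time and treat a parse failure as invalid (note this accepts any parseable year, not just 1900 to 2050):

```scala
import java.time.LocalDate
import java.time.format.{DateTimeFormatter, ResolverStyle}
import scala.util.Try

// "uuuu" plus ResolverStyle.STRICT rejects impossible dates such as 20171035
val fmt = DateTimeFormatter.ofPattern("uuuuMMdd").withResolverStyle(ResolverStyle.STRICT)

def isValidDateStr(s: String): Boolean =
  Try(LocalDate.parse(s, fmt)).isSuccess

// Wrap it as a UDF exactly as before:
// val isValidDate = udf(isValidDateStr _)
```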
Additional resources: https://bit.ly/3frXGhR