Writing UDFs (User Defined Functions) in Apache Spark

Phani Kumar Yadavilli
3 min read · Sep 28, 2019

In this tutorial, we will see how to create a User Defined Function (UDF) in Apache Spark.

UDFs in Spark allow us to create customized column-based functions. One important point to understand is that Spark does not recommend using UDFs, because the framework cannot optimize developer-created functions: Catalyst treats a UDF as a black box. Therefore, use UDFs with caution.
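As a rule of thumb, prefer built-in functions wherever they can express your logic, since Catalyst can optimize them. For instance, the date check in this tutorial could plausibly be written with the built-in to_date function instead of a UDF. Here is a sketch, assuming a DataFrame datesDF with a string date column in yyyyMMdd format like the one used below; note that whether malformed strings yield null or an error varies by Spark version and the spark.sql.legacy.timeParserPolicy setting, so verify the behavior on your version:

import org.apache.spark.sql.functions.{col, to_date}

// to_date returns null when the string cannot be parsed with the given
// yyyyMMdd pattern, so a non-null check doubles as date validation
val validated = datesDF.withColumn("isValidDate", to_date(col("date"), "yyyyMMdd").isNotNull)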

We can create a UDF in three simple steps:

  • Create a Scala function.
  • Pass the function to the udf() helper in org.apache.spark.sql.functions. For a one-argument function, the relevant overload is udf[RT: TypeTag, A1: TypeTag](f: Function1[A1, RT]): UserDefinedFunction; overloads exist for functions of up to ten arguments.
  • Use it like any other Spark function.
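To make the steps concrete, here is a minimal sketch (the DataFrame df and its name column are assumptions for illustration, not part of this tutorial's example):

import org.apache.spark.sql.functions.{col, udf}

val toUpper = (s: String) => s.toUpperCase                   // 1. a plain Scala function
val toUpperUdf = udf(toUpper)                                // 2. wrap it with udf()
val result = df.withColumn("upper", toUpperUdf(col("name"))) // 3. use it like a built-in function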

Let us try to understand this with a simple example. Suppose we want to validate a date column. We can create a UDF to perform this check.

Create a Scala function that takes a date string as input, validates it, and returns true or false. Here we define the function and wrap it in udf() in one step:

val isValidDate = udf((date: String) => {
  // Build a lookup set of every valid yyyyMMdd string between 1900 and 2050
  val set = mutable.HashSet.empty[String]
  for (year <- 1900 to 2050) {
    for (month <- 1 to 12) {
      for (day <- 1 to daysInMonth(year, month)) {
        set += f"$year%04d$month%02d$day%02d"
      }
    }
  }
  set.contains(date)
})

def daysInMonth(year: Int, month: Int): Int = month match {
  case 1 | 3 | 5 | 7 | 8 | 10 | 12 => 31
  // February has 29 days in a leap year, 28 otherwise
  case 2 => if (((year % 4 == 0) && (year % 100 != 0)) || (year % 400 == 0)) 29 else 28
  case _ => 30
}
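One caveat: as written, the lookup set is rebuilt for every row the UDF processes. A common refinement, shown here as a sketch rather than part of the original code (validDates and isValidDateFast are names introduced for illustration), is to precompute the set once on the driver and let the closure capture it:

// Precompute every valid yyyyMMdd string once; Spark serializes the
// captured set and ships it to the executors with the task
val validDates: Set[String] = (for {
  year  <- 1900 to 2050
  month <- 1 to 12
  day   <- 1 to daysInMonth(year, month)
} yield f"$year%04d$month%02d$day%02d").toSet

val isValidDateFast = udf((date: String) => validDates.contains(date))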

Register this Scala function as below. (Registration is only needed if you want to call the UDF from Spark SQL; the DataFrame API can use the isValidDate value directly.)

spark.udf.register("isValidDate", isValidDate)
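With the UDF registered, it can also be called from a SQL query. A quick illustration, assuming we first expose datesDF as a temporary view named dates:

datesDF.createOrReplaceTempView("dates")
spark.sql("SELECT date, isValidDate(date) AS isValidDate FROM dates").show()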

We can now use this UDF like any other Spark column-based function:

val datesTransformed = datesDF.withColumn("isValidDate", isValidDate(col("date")))

Here is the complete working code:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.functions.udf

import scala.collection.mutable

object DateValidator {

  def main(args: Array[String]): Unit = {

    val spark = SparkSession.builder().appName("DateValidator").master("local[*]").getOrCreate()

    val dates = Seq("20171002", "hello", "20171035", "20190101", "20191010")

    import spark.implicits._

    val datesDF = spark.sparkContext.parallelize(dates).toDF("date")

    // Quick sanity check of the helper function
    println(daysInMonth(2011, 12))

    val isValidDate = udf((date: String) => {
      // Build a lookup set of every valid yyyyMMdd string between 1900 and 2050
      val set = mutable.HashSet.empty[String]
      for (year <- 1900 to 2050) {
        for (month <- 1 to 12) {
          for (day <- 1 to daysInMonth(year, month)) {
            set += f"$year%04d$month%02d$day%02d"
          }
        }
      }
      set.contains(date)
    })

    // Only needed to call the UDF from Spark SQL
    spark.udf.register("isValidDate", isValidDate)

    val datesTransformed = datesDF.withColumn("isValidDate", isValidDate(col("date")))
    datesTransformed.show()
  }

  def daysInMonth(year: Int, month: Int): Int = month match {
    case 1 | 3 | 5 | 7 | 8 | 10 | 12 => 31
    // February has 29 days in a leap year, 28 otherwise
    case 2 => if (((year % 4 == 0) && (year % 100 != 0)) || (year % 400 == 0)) 29 else 28
    case _ => 30
  }
}

The output looks like this:

+--------+-----------+
|    date|isValidDate|
+--------+-----------+
|20171002|       true|
|   hello|      false|
|20171035|      false|
|20190101|       true|
|20191010|       true|
+--------+-----------+

Additional resources: https://bit.ly/3frXGhR

