I am getting a runtime exception “Exception in User Class: org.apache.spark.SparkException : Job aborted due to stage failure: Task 0 in stage 34.0 failed 4 times, most recent failure: Lost task 0.3 in stage 34.0 (TID 46) (10.67.233.121 executor 1): java.lang.RuntimeException: Duplicate map key was found, please check the input data. If you want to remove the duplicated keys, you can set spark.sql.mapKeyDedupPolicy to LAST_WIN so that the key inserted at last takes precedence.”
I have verified there are no duplicate records for the key. I have set spark.sql.mapKeyDedupPolicy to LAST_WIN as suggested, but I am still getting the same error.
Please see the code snippet below
sourceDF.rdd.foreachPartition(partition => {
  partition.grouped(UPDATE_BATCH_SIZE).foreach(batch => {
    batch.foreach(row => {
      logger.info(s"row : $row")
    })
  })
})
I tried updating the policy as shown below:
val sparkSession: SparkSession = {
  val spark: SparkContext = SparkContext.getOrCreate()
  val conf: SparkConf = spark.getConf
  conf.set("spark.sql.mapKeyDedupPolicy", "LAST_WIN")
  SparkSession.builder().config(conf).getOrCreate()
}
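For completeness, I believe the same setting is also a runtime SQL conf, so it should be possible to set it directly on the active session. A rough sketch of what I mean (I have not confirmed it changes anything here):

import org.apache.spark.sql.SparkSession

// spark.sql.mapKeyDedupPolicy is a runtime SQL conf, so it can also be set
// on the session that is already running instead of via SparkConf.
val spark = SparkSession.builder().getOrCreate()
spark.conf.set("spark.sql.mapKeyDedupPolicy", "LAST_WIN")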
Does anyone have any idea about this issue?
Note that this really isn’t a very good place to ask about Spark – this forum is about the Scala programming language in general: while there are undoubtedly some Spark programmers here, they’re probably a small fraction of the users. Spark is only one of many uses for Scala, and it’s a very unusual one in many ways.
I suspect there are more-specialized Spark forums that might give you better help.
Whatever the cause of the error is, it's probably in sourceDF and not in the code you posted. The only Spark-specific function in your code is foreachPartition, which is pretty straightforward.
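For what it's worth, as far as I know that dedup policy only applies when a map value is being built (expressions like map_concat, map_from_arrays, or str_to_map) somewhere in sourceDF's lineage, so duplicate rows by themselves wouldn't trigger it. A minimal, hypothetical reproduction in a plain spark-shell, just to illustrate what the setting changes:

// Default policy: building a map with a duplicated key fails at runtime.
spark.conf.set("spark.sql.mapKeyDedupPolicy", "EXCEPTION")
spark.sql("SELECT map_concat(map(1, 'a'), map(1, 'b'))").show()  // "Duplicate map key was found"

// With LAST_WIN, the value inserted last wins instead of raising the error.
spark.conf.set("spark.sql.mapKeyDedupPolicy", "LAST_WIN")
spark.sql("SELECT map_concat(map(1, 'a'), map(1, 'b'))").show()  // keeps the value from the second map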
The only slightly suspicious thing is that you're capturing the logger in your lambda function, which could potentially cause some unwanted (de)serialization of objects towards the executors. But I'm not sure how that relates to this error.
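If you want to rule that out, one option is to create the logger inside the partition function so nothing from the driver has to be serialized into the closure. A rough sketch, assuming an SLF4J-style logger (adjust to whatever logging setup you actually use):

import org.slf4j.LoggerFactory

sourceDF.rdd.foreachPartition { partition =>
  // Obtained on the executor, so the driver-side logger is never shipped.
  val partitionLogger = LoggerFactory.getLogger("partition-logger")
  partition.grouped(UPDATE_BATCH_SIZE).foreach { batch =>
    batch.foreach(row => partitionLogger.info(s"row : $row"))
  }
}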