flatMap RDD[(String,String)] to a RDD[YourCaseClass]

I have a case class as below.

case class MailRecord(MessageID: String, MailDate: String, MailFrom: String, MailTo: String, MailMessage: String)

And I have a RDD[(String,String)] output from wholeTextFiles. How do I flatMap RDD[(String,String)] to a RDD[MailRecord]? The second string in RDD[(String, String)] has the data for case class delimited by ‘\n’.

An Example output from RDD[(String, String)]

res8: Array[(String, String)] =
Array((abfss://xxxx@bigdata.dfs.core.windows.net/maildir/mattj/inbox/mail.txt,Message-ID: 8794327971.98213073.JavaMail.minds@thyme
Date: Sun, 28 Dec 2018 23:53:42 -0800 (PST)
From: helpdesk@platoon.hotmail.com
To: tanuk@gmail.com
Subject: test email

Hi There
regards
kk
))

Thanks.

Is every tuple in your RDD a single message? Then you can just use map instead of flatMap.

Yes, ever tuple in RDD is a single message. The first string in the tuple is the path of the message file in a folder. and the second string in the tuple is the entire message itself. how do I use map function to get the data from the message?

Thanks.

Something like:

val rddIn: RDD[(String, String)] = ...

val rddOut = rddIn.map({ case ((messageFileLoc, message)) =>
//...here you use messageFileLoc and message to retrieve any data you need to create a YourCaseClass instance...

YourCaseClass(/* data gathered immediately above as input parameters */)

})

Best regards,

Brian Maso

Thanks Brian.
When I retrieve any data from ‘message’ string, can I assign it to a variable within the block before I assign it to a case class instance. can you please provide an example?
Example: I want to retrieve the line that contains string "Message-ID: " in the string message within the tuple and assign that to MessageID in my case class.

Thanks and much appreciated.

Hi Satish,

Try it out before asking. Indeed you are allowed to create local variables within a function. I suggest you try things out, and if you get surprising results then post your full examploe and the errors that are confounding you to the list.

1 Like

Hi Brian
I was able to create local variables within a function. Thank you so much for your guidance.

val rddIn: RDD[(String, String)] = …

val rddOut = rddIn.map({ case ((messageFileLoc, message)) =>

mId = message.substring(message.indexOf(“Message-ID”) + 11,message.indexOf(“Date:”)).trim()
mDate = message.substring(message.indexOf(“Date:”) + 5,message.indexOf(“From:”)).trim()
mFrom = message.substring(message.indexOf(“From:”) + 5,message.indexOf(“To:”)).trim()
mSubject message.substring(message.indexOf(“To:”) + 3,message.indexOf(“Subject:”)).trim()

YourCaseClass(mId, mDate, mFrom, mSubject)

})