Parsing a text file and loading it into a DataFrame


#1

I am pretty new to Scala and have run into the following situation.
I have a semi-structured text file that I want to convert to a DataFrame in Spark. I have a schema in mind, shown below. However, I am finding it challenging to parse the text file and apply the schema.

Following is my sample text file:

    "good service"
    Tom Martin (USA) 17th October 2015    
    4    
    Long review..    
    Type Of Traveller	Couple Leisure    
    Cabin Flown	Economy    
    Route	Miami to Chicago    
    Date Flown	September 2015    
    Seat Comfort	12345    
    Cabin Staff Service	12345    
    Ground Service	12345    
    Value For Money	12345    
    Recommended	no

    "not bad"
    M Muller (Canada) 22nd September 2015
    6
    Yet another long review..
    Aircraft	TXT-101
    Type Of Customer	Couple Leisure
    Cabin Flown	FirstClass
    Route	IND to CHI
    Date Flown	September 2015
    Seat Comfort	12345
    Cabin Staff Service	12345
    Food & Beverages	12345
    Inflight Entertainment	12345
    Ground Service	12345
    Value For Money	12345
    Recommended	yes

.
.

The schema and the result that I expect are as follows:

    +----------------+------------+--------------+---------------------+---------------+---------------------------+----------+------------------+-------------+--------------+-------------------+----------------+--------------+---------------------+-----------------+------------------------+----------------+---------------------+-----------------+
    | Review_Header  | User_Name  | User_Country |  User_Review_Date   | Overall Score |          Review           | Aircraft | Type of Traveler | Cabin Flown | Route_Source | Route_Destination |   Date Flown   | Seat Comfort | Cabin Staff Service | Food & Beverage | Inflight Entertainment | Ground Service | Wifi & Connectivity | Value for Money |
    +----------------+------------+--------------+---------------------+---------------+---------------------------+----------+------------------+-------------+--------------+-------------------+----------------+--------------+---------------------+-----------------+------------------------+----------------+---------------------+-----------------+
    | "good service" | Tom Martin | USA          | 17th October 2015   |             4 | Long review..             |          | Couple Leisure   | Economy     | Miami        | Chicago           | September 2015 |        12345 |               12345 |                 |                        |          12345 |                     |           12345 |
    | "not bad"      | M Muller   | Canada       | 22nd September 2015 |             6 | Yet another long review.. | TXT-101  | Couple Leisure   | FirstClass  | IND          | CHI               | September 2015 |        12345 |               12345 |           12345 |                  12345 |          12345 |                     |           12345 |
    +----------------+------------+--------------+---------------------+---------------+---------------------------+----------+------------------+-------------+--------------+-------------------+----------------+--------------+---------------------+-----------------+------------------------+----------------+---------------------+-----------------+

As you may notice, for each block of data in the text file, the first four lines are mapped to user-defined columns such as Review_Header, User_Name, User_Country, User_Review_Date, whereas each of the remaining lines names its own column.


#2
  1. Analyze your input data to determine what types of blocks of data you need to deal with.
  2. Associate each block of data with some type of class that maps to that particular block.
  3. Process the class for the block.
  4. Repeat.
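
The steps above could be sketched in plain Scala roughly as follows. This is only a sketch under assumptions taken from the sample in the question: blocks are separated by blank lines, the first four lines of a block are always present, and the remaining lines are tab-separated key/value pairs. The names `Review`, `ReviewParser`, `parseBlock` and `parseReviews` are hypothetical and not part of any library.

```scala
// Hypothetical class that one block of the review file maps to (step 2).
final case class Review(
    header: String,
    userName: String,
    userCountry: String,
    reviewDate: String,
    overallScore: Int,
    text: String,
    attributes: Map[String, String] // e.g. "Cabin Flown" -> "Economy"
)

object ReviewParser {
  // Second line of a block, e.g. "Tom Martin (USA) 17th October 2015".
  private val UserLine = """(.+?)\s*\((.+?)\)\s*(.+)""".r

  // Step 3: turn the lines of one block into a Review, or None if malformed.
  def parseBlock(block: Seq[String]): Option[Review] = block match {
    case header +: UserLine(name, country, date) +: score +: text +: rest
        if score.nonEmpty && score.forall(_.isDigit) =>
      // Assumption: every remaining line is "key<TAB>value".
      val attrs = rest.flatMap { line =>
        line.split("\t", 2) match {
          case Array(k, v) => Some(k.trim -> v.trim)
          case _           => None // lines without a tab are skipped
        }
      }.toMap
      Some(Review(header, name, country, date, score.toInt, text, attrs))
    case _ => None
  }

  // Steps 1 and 4: split the input on blank lines and process every block.
  def parseReviews(input: String): Seq[Review] =
    input.split("""\r?\n\s*\r?\n""").toSeq
      .map(_.split("\r?\n").map(_.trim).filter(_.nonEmpty).toSeq)
      .flatMap(parseBlock)
}
```

Columns that are missing from a block (such as Aircraft in the first sample) are simply absent from `attributes`, so they can be filled with an empty value later. With Spark on the classpath, a collection of such case classes can usually be turned into a DataFrame via `import spark.implicits._` and `.toDF()`.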

#3

Hi jOasis!
Did you end up implementing the solution suggested by “sidhartha11”? If so, could you please show it to me? Thanks.
I need to do something similar to your task.

BR, Agustín.


#4

Just an idea: you could check how to create a DataFrame with Spark. The documentation is here: https://spark.apache.org/docs/latest/sql-programming-guide.html

It may be useful.


#5

Hi aris94! Thanks, but I think I know how to work with DataFrames.
My main problem is knowing how to parse a file with different kinds of lines.

My file, for example, has these lines:

    01/08/18 05:17 TARJETA INTRODUCIDA
    01/08/18 05:17 *** ERROR ATR ***
    01/08/18 05:17 TRANS NUM: 9297
    01/08/18 05:17 ERROR DE FALLBACK
    01/08/18 05:17 TARJETA RETIRADA
    01/08/18 05:17 TARJETA INTRODUCIDA
    01/08/18 05:17 AID: A0000000032010 APLICACION EMV: VISA ELECTRON
    01/08/18 05:17 PAN EMV: 491268******7440
    01/08/18 05:17 ESTABLECER SESION
    01/08/18 05:17 OBTENER CARD INFO
    01/08/18 05:17 AUTORIZADA
    01/08/18 05:17 OBTENER INDICADORES
    01/08/18 05:17 AUTORIZADA
    01/08/18 05:17 AUTORIZADA
    01/08/18 05:17 OBTENER LIMITES
    01/08/18 05:17 AUTORIZADA
    01/08/18 05:17 -> TRANSACTION START
    01/08/18 05:17 RETIRO DE EFECTIVO
    01/08/18 05:17 NUM. CUENTA: **** **** ** ******4833
    01/08/18 05:17 OBTENER ACCOUNT INFO
    01/08/18 05:17 CONSULTA COMISION
    01/08/18 05:17 AUTORIZADA
    01/08/18 05:17 COMISION: $ 0.00
    01/08/18 05:18 IMPORTE INVALIDO: $ 700,000
    01/08/18 05:18 IMPORTE INVALIDO: $ 8,887
    01/08/18 05:18 CANCELADA EN INTRODUCCION IMPORTE
    01/08/18 05:18 <- TRANSACTION END
    01/08/18 05:18 TARJETA RETIRADA

And my regex is:

    (TARJETA INTRODUCIDA(.?)TARJETA RETIRADA)|(-> TRANSACTION START(…)RETIRO SIN TARJETA(.?)<- TRANSACTION END)

I am calling the following function (parseTrx), passing it a string that contains the file lines shown above.

    def parseTrx(log: String): LogBlockTrx = {
      try {
        val res = PATTERN_TRX.findFirstMatchIn(log)
        if (res.isEmpty) {
          println("Rejected Log Line: ")
          LogBlockTrx("Empty")
        }
        else {
          val m = res.get
          // NOTE: HEAD does not have a content size.
          LogBlockTrx(m.group(1))
        }
      } catch {
        case e: Exception =>
          println("Exception on line:" + log + ":" + e.getMessage)
          LogBlockTrx("Empty")
      }
    }

I would like to split the file into blocks using my regex.
In the example above, I would like to obtain 2 blocks:
FIRST BLOCK

    01/08/18 05:17 TARJETA INTRODUCIDA
    01/08/18 05:17 *** ERROR ATR ***
    01/08/18 05:17 TRANS NUM: 9297
    01/08/18 05:17 ERROR DE FALLBACK
    01/08/18 05:17 TARJETA RETIRADA

SECOND BLOCK

    01/08/18 05:17 TARJETA INTRODUCIDA
    01/08/18 05:17 AID: A0000000032010 APLICACION EMV: VISA ELECTRON
    01/08/18 05:17 PAN EMV: 491268******7440
    01/08/18 05:17 ESTABLECER SESION
    01/08/18 05:17 OBTENER CARD INFO
    01/08/18 05:17 AUTORIZADA
    01/08/18 05:17 OBTENER INDICADORES
    01/08/18 05:17 AUTORIZADA
    01/08/18 05:17 AUTORIZADA
    01/08/18 05:17 OBTENER LIMITES
    01/08/18 05:17 AUTORIZADA
    01/08/18 05:17 -> TRANSACTION START
    01/08/18 05:17 RETIRO DE EFECTIVO
    01/08/18 05:17 NUM. CUENTA: **** **** ** ******4833
    01/08/18 05:17 OBTENER ACCOUNT INFO
    01/08/18 05:17 CONSULTA COMISION
    01/08/18 05:17 AUTORIZADA
    01/08/18 05:17 COMISION: $ 0.00
    01/08/18 05:18 IMPORTE INVALIDO: $ 700,000
    01/08/18 05:18 IMPORTE INVALIDO: $ 8,887
    01/08/18 05:18 CANCELADA EN INTRODUCCION IMPORTE
    01/08/18 05:18 <- TRANSACTION END
    01/08/18 05:18 TARJETA RETIRADA

Thanks a lot.


#6

Sorry, I thought you just wanted to create a DataFrame from your text. I can’t help with the parsing part. I hope you solve your problem soon.


#7

Hi. I am new to Scala too, and I have never used Spark before, so I cannot assist you with that. But your problem seems familiar to me. Here is my approach:

1: Our file consists of lines, right?

2: A more semantic view of that file would be: it consists of different “sections”.

3: 1. and 2. => there are two types of lines: those indicating the beginning of a section (and belonging to that section in your case) and those simply belonging to that section without indicating anything from a high-level point of view.

4: A section needs to end in some way. In your case: whenever a new section starts, the previous one ends.

Given that, I would implement something like:

    currentSection = empty string buffer
    while not EOF:
       currentLine = getNextLine()
       if (currentLine indicates beginning of new section):
          handleCurrentSection()
          startNewSection()
       else: // currentSection is still "incomplete"
          currentSection.append(currentLine)

    // when the file ends, the last section ends. We still need to handle it
    handleCurrentSection()

where handleCurrentSection() can basically parse the content of currentSection and startNewSection() clears the currentSection (and - in your case - adds currentLine to the cleared buffer). As your sections seem to be “homogeneous”, handleCurrentSection() is always doing the same thing: generate a new row in your dataframe using the data currently given in currentSection.
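
In Scala, the same loop could look roughly like the sketch below. It is only an illustration: `SectionSplitter` and `splitSections` are hypothetical names, the caller supplies the test for "this line starts a new section", and a fold replaces the mutable buffer. Instead of handling each section immediately, it collects the sections so they can be parsed afterwards.

```scala
object SectionSplitter {
  // Groups lines into sections. A line for which `startsSection` is true
  // closes the previous section and opens a new one that it also belongs to.
  // Lines seen before the first start marker end up in a section of their own.
  def splitSections(lines: Iterator[String])(
      startsSection: String => Boolean): Vector[Vector[String]] = {
    val (done, current) =
      lines.foldLeft((Vector.empty[Vector[String]], Vector.empty[String])) {
        case ((finished, cur), line) if startsSection(line) =>
          // handleCurrentSection() followed by startNewSection():
          // emit the old buffer (if any) and start a new one with this line.
          (if (cur.isEmpty) finished else finished :+ cur, Vector(line))
        case ((finished, cur), line) =>
          // The current section is still incomplete.
          (finished, cur :+ line)
      }
    // When the input ends, the last section ends as well.
    if (current.isEmpty) done else done :+ current
  }
}
```

For a log in which every section begins with a line ending in `TARJETA INTRODUCIDA`, a call such as `SectionSplitter.splitSections(lines)(_.endsWith("TARJETA INTRODUCIDA"))` would return one `Vector[String]` per section, and each of those can then be turned into a row.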

So much for my approach.

Here is another thought on that: your solution could easily be generalized to handle different types of sections differently. In that case, you end up with the very basic algorithm of interpreters: call your currentLine a token and your section an expression, and iterate over all the tokens your file consists of: whenever a token completes an expression, evaluate that expression ;)
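
To make the generalization a bit more concrete, here is a small sketch in which completed sections are classified into different types and then evaluated by pattern matching. Everything in it is an assumption for illustration: the section types `PlainSession` and `TransactionSession`, the `SectionInterpreter` object, and the idea that the line after `-> TRANSACTION START` names the operation are my reading of the log sample in this thread, not something the log format guarantees.

```scala
// Hypothetical section types for the ATM log sample in this thread.
sealed trait Section
final case class PlainSession(lines: Seq[String]) extends Section
final case class TransactionSession(lines: Seq[String], operation: String) extends Section

object SectionInterpreter {
  // Strips the leading "dd/MM/yy HH:mm " timestamp from a log line.
  private def message(line: String): String =
    line.replaceFirst("""^\d{2}/\d{2}/\d{2} \d{2}:\d{2} """, "")

  // Decides which kind of "expression" a completed section is.
  def classify(lines: Seq[String]): Section = {
    val msgs = lines.map(message)
    val start = msgs.indexOf("-> TRANSACTION START")
    // Assumption: the line right after the start marker names the operation.
    if (start >= 0 && start + 1 < msgs.length)
      TransactionSession(lines, msgs(start + 1))
    else
      PlainSession(lines)
  }

  // Evaluates a section; each type gets its own handling.
  def evaluate(section: Section): String = section match {
    case TransactionSession(ls, op) => s"transaction '$op' (${ls.size} lines)"
    case PlainSession(ls)           => s"session without transaction (${ls.size} lines)"
  }
}
```

Because `Section` is a sealed trait, the compiler can warn when `evaluate` forgets a case, which helps once more section types are added.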