Dynamic generation of Spark SQL based on the input file's fields

Scenario: A process generates monthly data in an S3 file. The number of fields can differ in each monthly run. Based on this data in S3, we load it into a table and run a SQL query for a few metrics by hand, since the number of fields can change in each run as a few columns are added or removed. There are more calculations/transforms on this data, but to start with I'm presenting a simpler version of the use case.

Approach: Given the schema-less nature of the input (the number of fields in the S3 file can differ in each run as fields are added or removed), which currently requires manual changes to the SQL every time, I'm planning to explore Spark/Scala so that we can read directly from S3 and dynamically generate the SQL based on the fields.

Query: How can I achieve this in Scala / Spark SQL / the DataFrame API? The S3 file contains only the required fields from each run, so there is no issue reading the dynamic fields from S3; the DataFrame takes care of that. The issue is how to generate the DataFrame API / Spark SQL code that handles them.

I can read the S3 file into a DataFrame and register it with createOrReplaceTempView to write SQL against, but I don't think that avoids manually changing the Spark SQL when a new field is added to the S3 file in the next run. What is the best way to dynamically generate the SQL, or is there a better way to handle this?
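For reference, the reading part is straightforward. A minimal sketch of what I have so far, assuming the file is a CSV with a header row (the bucket and path below are placeholders):

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("monthly-metrics").getOrCreate()

// Whatever columns this month's file carries, the DataFrame picks them up.
val df = spark.read
  .option("header", "true")
  .option("inferSchema", "true")
  .csv("s3a://my-bucket/monthly-run/data.csv") // placeholder path

df.printSchema() // shows this run's columns
df.createOrReplaceTempView("monthly_data")
```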

Use case 1:

  • First run

dataframe: customer, month_1_count (the dataframe points directly at S3, which contains only the required attributes)

-- Sample SQL: SELECT customer, sum(month_1_count) FROM dataframe GROUP BY customer

-- DataFrame API: dataframe.groupBy("customer").sum("month_1_count").show()

  • Second run: one additional column was added

dataframe: customer, month_1_count, month_2_count (the dataframe points directly at S3, which contains only the required attributes)

-- Sample SQL: SELECT customer, sum(month_1_count), sum(month_2_count) FROM dataframe GROUP BY customer

-- DataFrame API: dataframe.groupBy("customer").sum("month_1_count", "month_2_count").show()
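From my reading so far, I think the direction is to derive the aggregation from df.columns at runtime, so that neither run needs a code change. A sketch with the DataFrame API, assuming every column other than customer is a numeric count to be summed (I'm not sure this is idiomatic):

```scala
import org.apache.spark.sql.functions.sum

// Every column except the grouping key is assumed to be a summable count.
val metricCols = df.columns.filter(_ != "customer")

// One sum(...) expression per column discovered at runtime.
val aggExprs = metricCols.map(c => sum(c).as(s"sum_$c"))

// agg takes a first Column plus varargs, hence the head/tail split;
// this assumes at least one metric column exists.
val result = df.groupBy("customer").agg(aggExprs.head, aggExprs.tail: _*)
result.show()
```

Since these are plain sums, df.groupBy("customer").sum(metricCols: _*) should do the same without building the expressions explicitly.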

I'm new to Spark/Scala; it would be helpful if you could provide some direction so that I can explore further.

Also, there could be multiple complex SQL calculations; I've just posted the simpler use case.

One way is to build the SQL dynamically with a string builder, as in the sketch that follows; the links further down show a few examples of handling schema evolution.
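A minimal sketch of the string-builder approach, run against the monthly_data temp view registered earlier (assuming customer is always present and every other column is a summable count):

```scala
// Build the select list from whatever columns this month's file contains.
val metricCols = df.columns.filter(_ != "customer")
val selectList = metricCols.map(c => s"sum($c) AS sum_$c").mkString(", ")

val sql = s"SELECT customer, $selectList FROM monthly_data GROUP BY customer"
// First run :  SELECT customer, sum(month_1_count) AS sum_month_1_count FROM monthly_data GROUP BY customer
// Second run:  the sum(month_2_count) term is picked up automatically.

spark.sql(sql).show()
```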

https://medium.com/readme-mic/etl-with-standalone-spark-containers-for-ingesting-small-files-8d6ee2ebda63

https://github.com/micnews/load-to-redshift-public/blob/19788036aed746070ea70e2c4a38ca30e2e4a2df/src/main/scala/Main.scala#L93

https://stackoverflow.com/questions/56782404/reorder-source-spark-dataframe-columns-to-match-the-order-of-the-target-datafram

https://books.google.com/books?id=55lUDwAAQBAJ&pg=PA130&lpg=PA130&dq=add+column+to+downstream+table+based+on+avro+schema&source=bl&ots=ltLG-RI777&sig=ACfU3U3mF26sun2jEKpTdVMfC5vlKGphdQ&hl=en&sa=X&ved=2ahUKEwjZltuAvtHmAhXJm-AKHeSzCd4Q6AEwAnoECAoQAQ#v=onepage&q=add%20column%20to%20downstream%20table%20based%20on%20avro%20schema&f=false

Thanks

Sri

You can look at my project here: https://github.com/manju015/wranglar

What is the input file you're passing as the first argument?

Did you find a solution for your issue? I have a similar problem. Please guide me on how to implement it.