SPARK: Load Data from Dataframe or RDD to DynamoDB / dealing with null values


#1

I am using Spark 2.1 on EMR and I have a DataFrame like this:

 ClientNum  | Value_1  | Value_2 | Value_3  | Value_4
     14     |    A     |    B    |   C      |   null
     19     |    X     |    Y    |  null    |   null
     21     |    R     |   null  |  null    |   null

I want to load this data into a DynamoDB table with ClientNum as the key, following these two articles:

Analyze Your Data on Amazon DynamoDB with Apache Spark

Using Spark SQL for ETL

Here is the code I tried:

  import org.apache.hadoop.mapred.JobConf

  // Job configuration for the EMR DynamoDB connector
  var jobConf = new JobConf(sc.hadoopConfiguration)
  jobConf.set("dynamodb.servicename", "dynamodb")
  jobConf.set("dynamodb.input.tableName", "table_name")   
  jobConf.set("dynamodb.output.tableName", "table_name")   
  jobConf.set("dynamodb.endpoint", "dynamodb.eu-west-1.amazonaws.com")
  jobConf.set("dynamodb.regionid", "eu-west-1")
  jobConf.set("dynamodb.throughput.read", "1")
  jobConf.set("dynamodb.throughput.read.percent", "1")
  jobConf.set("dynamodb.throughput.write", "1")
  jobConf.set("dynamodb.throughput.write.percent", "1")
  
  jobConf.set("mapred.output.format.class", "org.apache.hadoop.dynamodb.write.DynamoDBOutputFormat")
  jobConf.set("mapred.input.format.class", "org.apache.hadoop.dynamodb.read.DynamoDBInputFormat")

  // Import data
  val df = sqlContext.read.format("com.databricks.spark.csv").option("header", "true").option("inferSchema", "true").load(path)

I performed a transformation to have an RDD that matches the types that the DynamoDB custom output format knows how to write. The custom output format expects a tuple containing the Text and DynamoDBItemWritable types.

Create a new RDD with those types in it, in the following map call:

  // Convert the DataFrame to an RDD
  val df_rdd = df.rdd
  // df_rdd: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] = MapPartitionsRDD[10] at rdd at <console>:41

  // Print the first row of the RDD
  df_rdd.take(1)
  // res12: Array[org.apache.spark.sql.Row] = Array([14,A,B,C,null])

  import java.util.HashMap
  import com.amazonaws.services.dynamodbv2.model.AttributeValue
  import org.apache.hadoop.dynamodb.DynamoDBItemWritable
  import org.apache.hadoop.io.Text

  // Map each Row to the (Text, DynamoDBItemWritable) pair the output format expects
  var ddbInsertFormattedRDD = df_rdd.map(a => {
    val ddbMap = new HashMap[String, AttributeValue]()

    val ClientNum = new AttributeValue()
    ClientNum.setN(a.get(0).toString)
    ddbMap.put("ClientNum", ClientNum)

    val Value_1 = new AttributeValue()
    Value_1.setS(a.get(1).toString)
    ddbMap.put("Value_1", Value_1)

    val Value_2 = new AttributeValue()
    Value_2.setS(a.get(2).toString)
    ddbMap.put("Value_2", Value_2)

    val Value_3 = new AttributeValue()
    Value_3.setS(a.get(3).toString)
    ddbMap.put("Value_3", Value_3)

    val Value_4 = new AttributeValue()
    Value_4.setS(a.get(4).toString)
    ddbMap.put("Value_4", Value_4)

    val item = new DynamoDBItemWritable()
    item.setItem(ddbMap)

    (new Text(""), item)
  })

This last call uses the job configuration that defines the EMR-DDB connector to write out the new RDD you created in the expected format:

  ddbInsertFormattedRDD.saveAsHadoopDataset(jobConf)

It fails with the following error:

Caused by: java.lang.NullPointerException

The null values cause the error: if I write only ClientNum and Value_1, it works and the data is correctly inserted into the DynamoDB table.

Thanks for your help!


#2

Hello,

You will probably have more luck asking at a Spark or DynamoDB forum.

 Best, Oliver

#3

I believe you can simply check a.get(2) for null before calling a.get(2).toString and ddbMap.put("Value_2", Value_2). If it is null, skip that attribute, and do the same for the other Value_* columns.
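
A minimal sketch of that null check, assuming the column order from the question (ClientNum first, then Value_1 to Value_4) and the same connector classes used in post #1. The names `toDdbItem` and `optionalColumns` are made up for illustration; `Row.isNullAt` is the Spark call that tests a cell for null. This is one possible way to write it, not a tested drop-in fix:

```scala
import java.util.HashMap
import com.amazonaws.services.dynamodbv2.model.AttributeValue
import org.apache.hadoop.dynamodb.DynamoDBItemWritable
import org.apache.hadoop.io.Text
import org.apache.spark.sql.Row

// Hypothetical helper list: the optional string columns, in the order
// they follow ClientNum in the DataFrame from the question.
val optionalColumns = Seq("Value_1", "Value_2", "Value_3", "Value_4")

// Hypothetical helper: builds one DynamoDB item from a Row and
// leaves out every optional attribute whose cell is null.
def toDdbItem(row: Row): (Text, DynamoDBItemWritable) = {
  val ddbMap = new HashMap[String, AttributeValue]()

  // The key column is assumed to be present in every row.
  val clientNum = new AttributeValue()
  clientNum.setN(row.get(0).toString)
  ddbMap.put("ClientNum", clientNum)

  // Calling toString on a null cell is what throws the
  // NullPointerException, so test the cell first.
  optionalColumns.zipWithIndex.foreach { case (name, i) =>
    val idx = i + 1 // column 0 is ClientNum
    if (!row.isNullAt(idx)) {
      val attr = new AttributeValue()
      attr.setS(row.get(idx).toString)
      ddbMap.put(name, attr)
    }
  }

  val item = new DynamoDBItemWritable()
  item.setItem(ddbMap)
  (new Text(""), item)
}
```

With `df_rdd` and `jobConf` from post #1, this could then be used as `df_rdd.map(toDdbItem).saveAsHadoopDataset(jobConf)`. Rows such as ClientNum 21 would be written with only ClientNum and Value_1, since DynamoDB items do not need to share the same set of attributes.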