/**
 * Inserts every element of `rdd` into `tableName`, issuing one INSERT per element.
 *
 * Each partition obtains its own connection from `getConnection()` and runs all
 * of its inserts through a single JDBC statement. Two element shapes are handled:
 *   - `((Int, String), Int)`: three values; the string is single-quoted after
 *     being passed through `escapeChar`
 *   - `((Int, Int, Int, Int), Int)`: five numeric values
 * The column list placed between the table name and `values` is
 * `tblToCols(tableName)(1)`.
 *
 * Any exception raised while handling a partition (including the `MatchError`
 * thrown for an element of any other shape) has its stack trace printed and is
 * followed by `sys.exit(1)`.
 *
 * @param tableName name of the target table; also the key looked up in `tblToCols`
 * @param rdd       elements to insert, in one of the two tuple shapes above
 */
def updateTableRows(tableName : String, rdd : RDD[_]): Unit = {
  rdd.foreachPartition(rows => {
    var conn: Connection = null
    var stmt: java.sql.Statement = null
    try {
      conn = getConnection()
      stmt = conn.createStatement()
      // Shared by both row shapes; lazy so that, as before, the column lookup
      // only happens once a row actually has to be written.
      lazy val insertPrefix = "insert into " + tableName + " " + tblToCols(tableName)(1) + " values "
      rows.foreach(row => {
        val sql = row match {
          case ((p1: Int, p2: String), p3: Int) =>
            insertPrefix + "(" + p1 + ",'" + escapeChar(p2) + "'," + p3 + ");"
          case ((p1: Int, p2: Int, p3: Int, p4: Int), p5: Int) =>
            insertPrefix + "(" + p1 + "," + p2 + "," + p3 + "," + p4 + "," + p5 + ");"
        }
        stmt.execute(sql)
      })
    } catch {
      case e: Exception =>
        e.printStackTrace()
        // NOTE(review): this terminates the JVM executing the partition rather
        // than just failing the task — confirm this is intended.
        sys.exit(1)
    } finally {
      // Release the statement as well as the connection; the nested finally
      // makes sure the connection is still closed if closing the statement throws.
      try {
        if (stmt != null) {
          stmt.close()
        }
      } finally {
        if (conn != null) {
          conn.close()
        }
      }
    }
  })
}
/**
 * Opens a JDBC connection using the configured `driver`, `url`, `user` and
 * `password`.
 *
 * The driver class is loaded with `Class.forName` before connecting. If loading
 * the driver or connecting throws an `Exception`, the stack trace is printed
 * and `null` is returned instead of a connection.
 *
 * @return the opened connection, or `null` when it could not be established
 */
def getConnection() : Connection = {
  try {
    Class.forName(driver)
    DriverManager.getConnection(url, user, password)
  } catch {
    case e: Exception =>
      e.printStackTrace()
      // Failure is reported to the caller as a null connection.
      null
  }
}
// Views (978) | Comments (0) | Reposts (0)