package com.x.rs.service
import java.text.SimpleDateFormat
import java.util.{Date, Properties}

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.mllib.recommendation.{ALS, Rating}
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._
/**
 * Batch job: trains an ALS collaborative-filtering model on coupon ratings
 * read from a JDBC table and writes the top-K coupon recommendations per
 * user back to another JDBC table.
 *
 * Usage: RsCouponCalc <inputTable> <jdbcUrl> <outputTable>
 *   inputTable  — JDBC table whose columns 1/4/6 hold uid / couponId / rating
 *                 (positional access; TODO confirm against the table schema)
 *   jdbcUrl     — JDBC connection URL used for both read and write
 *   outputTable — JDBC table receiving (uid, coupon_id, score, calc_date)
 */
object RsCouponCalc {
  def main(args: Array[String]): Unit = {
    println("start...")

    // Three arguments are required (the original usage string listed only one).
    if (args.length < 3) {
      System.err.println("Usage: RsCouponCalc <inputTable> <jdbcUrl> <outputTable>")
      System.exit(1)
    }
    val inputTable = args(0)
    val inputUrl = args(1)
    val outputTable = args(2)

    val conf = new SparkConf().setAppName("SparkRsOne")
    val sc = new SparkContext(conf)
    val sqlContext = new org.apache.spark.sql.SQLContext(sc)

    val prop = new Properties()
    val dfForRawData = sqlContext.read.jdbc(inputUrl, inputTable, prop)

    // Build Ratings directly; the original converted to a tuple of Ints and
    // then redundantly called .toInt/.toDouble on them a second time.
    // NOTE(review): columns are addressed by position (1, 4, 6) — confirm the
    // input table's column order before relying on this.
    val ratings = dfForRawData.map { row =>
      Rating(row(1).toString.toInt, row(4).toString.toInt, row(6).toString.toDouble)
    }

    // ALS hyper-parameters: rank = 50, iterations = 10, lambda = 0.01.
    val model = ALS.train(ratings, 50, 10, 0.01)

    // Top-K recommendations per user. The original computed this twice and
    // discarded the first result (plus an unused userFeatures.count).
    val K = 10
    val recommendations = model.recommendProductsForUsers(K)

    // Stamp every output row with the calculation date (yyyy-MM-dd).
    val createDateString = new SimpleDateFormat("yyyy-MM-dd").format(new Date())

    // Flatten (uid -> Array[Rating]) into one Row per (uid, coupon) pair.
    // coupon_id is emitted as Long to match the LongType column below —
    // the original wrote a String into the LongType field, a type mismatch.
    val resultRows = recommendations.flatMap { case (uid, recs) =>
      recs.map { case Rating(_, couponId, score) =>
        Row(uid.toLong, couponId.toLong, score, createDateString)
      }
    }

    val schema = StructType(
      StructField("uid", LongType) ::
        StructField("coupon_id", LongType) ::
        StructField("score", DoubleType) ::
        StructField("calc_date", StringType) :: Nil)

    val df = sqlContext.createDataFrame(resultRows, schema)
    // Third argument: when true, drop the table, recreate it, then insert.
    df.insertIntoJDBC(inputUrl, outputTable, false)

    println("end !")
  }
}