
The following walks through the manual calculation workflow:

4.2 Launching spark-shell

SPARK_CLASSPATH=/opt/cloudera/parcels/CDH/lib/sqoop/mysql-connector-java-5.1.40.jar spark-shell
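
SPARK_CLASSPATH has been deprecated since Spark 1.0. A sketch of an equivalent launch using the documented spark-shell flags, assuming the same connector jar path:

spark-shell --driver-class-path /opt/cloudera/parcels/CDH/lib/sqoop/mysql-connector-java-5.1.40.jar \
  --jars /opt/cloudera/parcels/CDH/lib/sqoop/mysql-connector-java-5.1.40.jar

Here --driver-class-path puts the JDBC driver on the driver's classpath, while --jars ships it to the executors.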

4.3 Input and output variables

val inputTable = "mds_user_coupon_bhv"
val inputUrl = "jdbc:mysql://192.168.xxx.xx:3306/com_profile?user=your_name&password=your_password"
val outputTable = "mds_rs_shop_coupon_tmp"
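
The credentials can also be kept out of the URL and passed through the java.util.Properties object that sqlContext.read.jdbc accepts; a minimal sketch, assuming the same host and database:

val prop = new java.util.Properties()
prop.setProperty("user", "your_name")         // credentials passed separately,
prop.setProperty("password", "your_password") // not embedded in the JDBC URL
val inputUrl = "jdbc:mysql://192.168.xxx.xx:3306/com_profile"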

4.4 Core program code

package com.x.rs.service

import java.text.SimpleDateFormat
import java.util.{Date, Properties}

import org.apache.spark.sql.Row
import org.apache.spark.sql.types._
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.mllib.recommendation.{ALS, Rating}

/**
 * Date   : 2017-04-20
 * Author : Blair Chan
 */
object RsCouponCalc {
  def main(args: Array[String]) {

    println("start...")

    if (args.length < 3) {
      System.err.println("Usage: RsCouponCalc <inputTable> <inputUrl> <outputTable>")
      System.exit(1)
    }

    val inputTable = args(0)
    val inputUrl = args(1)
    val outputTable = args(2)

    val conf = new SparkConf().setAppName("SparkRsOne")
    val sc = new SparkContext(conf)
    val sqlContext = new org.apache.spark.sql.SQLContext(sc)

    val url = inputUrl
    val prop = new Properties()

    // Load the behavior table from MySQL as a DataFrame.
    val dfForRawData = sqlContext.read.jdbc(url, inputTable, prop)

    // Columns 1, 4 and 6 hold the user id, the coupon id and the rating.
    val ratings = dfForRawData.map { row =>
      Rating(row(1).toString.toInt, row(4).toString.toInt, row(6).toString.toDouble)
    }

    // Train the ALS model: rank = 50, 10 iterations, regularization lambda = 0.01.
    val model = ALS.train(ratings, 50, 10, 0.01)

    // Force materialization so that training failures surface here.
    model.userFeatures.count

    // Top-K recommendations per user.
    val K = 10
    val originResultRdd1 = model.recommendProductsForUsers(K)

    val curDate = new Date()
    val createDateString = new SimpleDateFormat("yyyy-MM-dd").format(curDate)

    // Flatten each (uid, Array[Rating]) pair into one Row per recommendation.
    // The coupon id is written as Long to match the LongType column below.
    val originResultRdd2 = originResultRdd1.flatMap {
      case (uid, recommendations) =>
        recommendations.map { case Rating(user, product, score) =>
          Row(uid.toLong, product.toLong, score, createDateString)
        }
    }

    val schema = StructType(
      StructField("uid", LongType) ::
        StructField("coupon_id", LongType) ::
        StructField("score", DoubleType) ::
        StructField("calc_date", StringType) :: Nil)

    val df = sqlContext.createDataFrame(originResultRdd2, schema)

    // With the third argument set to true, the table is dropped,
    // recreated automatically, and the rows are then inserted.
    df.insertIntoJDBC(url, outputTable, false)

    println("end !")
  }
}
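
Note that insertIntoJDBC was deprecated in Spark 1.4. A minimal sketch of the equivalent write path on Spark 1.4+, reusing the url, outputTable and prop values from the program above:

import org.apache.spark.sql.SaveMode

// Append the recommendation rows to the existing table; SaveMode.Overwrite
// instead reproduces the drop-recreate-insert behavior of insertIntoJDBC(..., true).
df.write.mode(SaveMode.Append).jdbc(url, outputTable, prop)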

The DataFrame is then inserted into the RDBMS. The schema could instead be derived via reflection, which would make the program more extensible; a sketch follows below.
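
A minimal sketch of that reflection-based approach, assuming a hypothetical case class CouponScore mirroring the output columns (in compiled code the case class must be defined at the top level, outside main):

// Hypothetical case class mirroring the output table's columns.
case class CouponScore(uid: Long, couponId: Long, score: Double, calcDate: String)

import sqlContext.implicits._

// Spark infers the schema from CouponScore's fields via reflection,
// so the hand-written StructType above is no longer needed.
val dfByReflection = originResultRdd1.flatMap {
  case (uid, recommendations) =>
    recommendations.map(r => CouponScore(uid.toLong, r.product.toLong, r.rating, createDateString))
}.toDF()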
