tang45 space



[Study Notes] Writing a Hive UDF

Posted on 2019-09-15

What is a UDF?
When Hive's built-in functions cannot satisfy your business logic, you can turn to a user-defined function (UDF).

How to write a UDF
1. Create a Java class
2. Extend the UDF class (to handle complex argument types, extend the abstract class GenericUDF instead)
3. Write an evaluate method (Hive locates it by reflection)
4. Build the class into a jar
5. Run the add jar command in Hive

hive> add jar /home/your_name/hive-udf-1.0.0-SNAPSHOT.jar;

6. Create the function in Hive

hive> create function your_func as 'your_udf_class';

7. Use it in Hive (see the sketch below)
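
To make steps 1-3 concrete, here is a minimal sketch. It is written in Scala (the language used elsewhere on this blog; a plain Java class works identically), it assumes hive-exec is on the compile classpath, and the class and function names are hypothetical:

import org.apache.hadoop.hive.ql.exec.UDF
import org.apache.hadoop.io.Text

// Hive finds `evaluate` by reflection, so you choose its signature.
class UpperUdf extends UDF {
  def evaluate(s: Text): Text =
    if (s == null) null else new Text(s.toString.toUpperCase)
}

After add jar and create function your_func as 'UpperUdf', it can be called like any built-in function: select your_func(name) from your_table;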

Study Notes: groupByKey, reduceByKey and combineByKey in Spark

Posted on 2019-01-21

groupByKey groups the values that share a key; reduceByKey merges the values that share a key.

groupByKey produces an Iterable of values per key; reduceByKey does not.

For large datasets reduceByKey is usually recommended, because it combines values on the map side before the shuffle, whereas groupByKey ships every value across the shuffle and only groups them afterwards.

combineByKey takes three functions: the first creates a combiner, the second merges a value into a combiner, and the third merges two combiners. That makes it a good fit for more complex aggregations.

My own practice code:

package com.practice_spark

import org.apache.spark.rdd.RDD
import org.apache.spark.sql._

/*
 Learning the groupByKey / reduceByKey / combineByKey methods.
 Reference: https://github.com/vaquarkhan/Apache-Kafka-poc-and-notes/wiki/reduceByKey--vs-groupBykey-vs-aggregateByKey-vs-combineByKey
*/

object SparkHomework {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder()
      .appName("SparkPractice")
      .getOrCreate()

    // Load the product-info table
    val df = spark.read.parquet("your_parquet_file")

    val rows: RDD[Row] = df.rdd
    // (category, (price, 1)) pairs: column 4 is the category, column 3 the price
    val pairs = rows.map(x => (x(4).toString, (x(3).toString.toDouble, 1)))

    // Compute the average price per product category in three ways
    groupbykeySample(pairs).saveAsTextFile("your_path_1")
    reducebykeySample(pairs).saveAsTextFile("your_path_2")
    combinebykeySample(rows).saveAsTextFile("your_path_3")
  }

  def groupbykeySample(
      rdd: RDD[(String, (Double, Int))]): RDD[(String, Double)] = {
    rdd
      .groupByKey()
      .map(t => {
        val items = t._2
        var totalPrice = 0.0
        var totalNum = 0
        for (a <- items) {
          totalPrice += a._1
          totalNum += a._2
        }
        (t._1, totalPrice / totalNum)
      })
  }

  def reducebykeySample(
      rdd: RDD[(String, (Double, Int))]): RDD[(String, Double)] = {
    rdd
      .reduceByKey {
        case ((sumL, countL), (sumR, countR)) =>
          (sumL + sumR, countL + countR)
      }
      .mapValues {
        case (sum, count) => sum / count
      }
  }

  def combinebykeySample(rdd: RDD[Row]): RDD[(String, Double)] = {
    val pairs = rdd.map(x => (x(4).toString, x(3).toString.toDouble))

    type PriceCollector = (Int, Double)

    // Turn the first price seen for a key into a (count, total) accumulator
    val createCombiner = (price: Double) => (1, price)

    // Fold another price into an existing accumulator (within one partition)
    val mergeValue = (collector: PriceCollector, price: Double) => {
      val (numPrice, totalPrice) = collector
      (numPrice + 1, totalPrice + price)
    }

    // Merge accumulators produced on different partitions
    val mergeCombiners =
      (collector1: PriceCollector, collector2: PriceCollector) => {
        val (numPrice1, totalPrice1) = collector1
        val (numPrice2, totalPrice2) = collector2
        (numPrice1 + numPrice2, totalPrice1 + totalPrice2)
      }

    pairs
      .combineByKey(createCombiner, mergeValue, mergeCombiners)
      .map { case (key, (count, total)) => (key, total / count) }
  }
}

Kaggle Starter Problem: Titanic

Posted on 2018-08-03

The problem

Problem link

Training and test data

Getting to know the data

Import the training and test sets and explore them with Spark SQL to get a rough sense of the data's distribution and patterns
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.appName("example").getOrCreate()

val df = spark.read.option("header", "true").option("inferSchema", "true").csv("/YourPath/.../train.csv")

df.printSchema()

df.describe("Survived","Pclass","Sex","Age","SibSp","Parch","Ticket","Fare","Cabin","Embarked").show(false)

df.createGlobalTempView("people")

(screenshot: printSchema and describe output)

// Effect of passenger class (Pclass) on survival rate
spark.sql("WITH t AS(SELECT count(*) as c,Survived,Pclass FROM global_temp.people group by Survived,Pclass order by Pclass) select Pclass,sum(case Survived when 1 then c else 0 end) survive_num, sum(c) total_num, round((sum(case Survived when 1 then c else 0 end)/sum(c)),4) survive_ratio from t group by Pclass").show()

// Effect of age on survival rate
spark.sql("WITH t AS(SELECT count(*) as c,Survived,Floor(Age/10) age_rank FROM global_temp.people group by Survived,age_rank order by age_rank) select age_rank,sum(case Survived when 1 then c else 0 end) survive_num, sum(c) total_num, round((sum(case Survived when 1 then c else 0 end)/sum(c)),4) survive_ratio from t group by age_rank").show()
// Effect of sex on survival rate
spark.sql("WITH t AS(SELECT count(*) as c,Survived,Sex FROM global_temp.people group by Survived,Sex order by Sex) select Sex,sum(case Survived when 1 then c else 0 end) survive_num, sum(c) total_num, round((sum(case Survived when 1 then c else 0 end)/sum(c)),4) survive_ratio from t group by Sex").show()

(screenshot: query results)

// Effect of number of siblings/spouses (SibSp) on survival rate
spark.sql("WITH t AS(SELECT count(*) as c,Survived,SibSp FROM global_temp.people group by Survived,SibSp order by SibSp) select SibSp,sum(case Survived when 1 then c else 0 end) survive_num, sum(c) total_num, round((sum(case Survived when 1 then c else 0 end)/sum(c)),4) survive_ratio from t group by SibSp").show()
// Effect of number of parents/children (Parch) on survival rate
spark.sql("WITH t AS(SELECT count(*) as c,Survived,Parch FROM global_temp.people group by Survived,Parch order by Parch) select Parch,sum(case Survived when 1 then c else 0 end) survive_num, sum(c) total_num, round((sum(case Survived when 1 then c else 0 end)/sum(c)),4) survive_ratio from t group by Parch").show()
// Effect of port of embarkation on survival rate
spark.sql("WITH t AS(SELECT count(*) as c,Survived,embarked FROM global_temp.people group by Survived,embarked order by embarked) select embarked,sum(case Survived when 1 then c else 0 end) survive_num, sum(c) total_num, round((sum(case Survived when 1 then c else 0 end)/sum(c)),4) survive_ratio from t group by embarked").show()
// Effect of fare on survival rate
spark.sql("WITH t AS(SELECT count(*) as c,Survived,Floor(Fare/10) fare_rank FROM global_temp.people group by Survived,fare_rank order by fare_rank) select fare_rank,sum(case Survived when 1 then c else 0 end) survive_num, sum(c) total_num, round((sum(case Survived when 1 then c else 0 end)/sum(c)),4) survive_ratio from t group by fare_rank").show()
// Effect of the title extracted from Name on survival rate
spark.sql("WITH t AS(SELECT count(*) as c,Survived,regexp_extract(Name, '(,)([^\\.]+)', 2) name_fix FROM global_temp.people group by Survived,name_fix order by name_fix) select name_fix,sum(case Survived when 1 then c else 0 end) survive_num, sum(c) total_num, round((sum(case Survived when 1 then c else 0 end)/sum(c)),4) survive_ratio from t group by name_fix").show()

Feature engineering

From the exploration above we can see the influence of sex, age and family size on survival. The features are handled as follows:
Feature | Description | Treatment
survival | survival | used as the label
pclass | ticket class | one-hot
Name | name | unused in the first attempt; in the second, the extracted title (middle name) is one-hot encoded
sex | sex | one-hot
Age | age | missing values filled with the mean
sibsp | number of siblings / spouses | used as-is
parch | number of parents / children | used as-is
ticket | ticket number | unused
fare | passenger fare | used as-is
cabin | cabin number | unused
embarked | port of embarkation | one-hot

Model training

Since there are only a few features, a random forest is a reasonable choice of model; the code is on github, and a sketch follows below.
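
The training code itself lives in the linked repo, so here is only a minimal sketch of such a pipeline in Spark ML (2.x-style API); the stages and column names follow the table above, but everything here is an assumption rather than the repo's actual code:

import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.classification.RandomForestClassifier
import org.apache.spark.ml.feature.{Imputer, OneHotEncoder, StringIndexer, VectorAssembler}

// Fill missing Age values with the column mean (Age must be a double column)
val ageImputer = new Imputer()
  .setInputCols(Array("Age")).setOutputCols(Array("AgeFilled")).setStrategy("mean")

// Index, then one-hot encode, the categorical columns
val sexIndexer = new StringIndexer()
  .setInputCol("Sex").setOutputCol("SexIdx").setHandleInvalid("keep")
val sexEncoder = new OneHotEncoder().setInputCol("SexIdx").setOutputCol("SexVec")
val embIndexer = new StringIndexer()
  .setInputCol("Embarked").setOutputCol("EmbarkedIdx").setHandleInvalid("keep")
val embEncoder = new OneHotEncoder().setInputCol("EmbarkedIdx").setOutputCol("EmbarkedVec")
val pclassEncoder = new OneHotEncoder().setInputCol("Pclass").setOutputCol("PclassVec")

// Assemble the processed columns into one feature vector
val assembler = new VectorAssembler()
  .setInputCols(Array("PclassVec", "SexVec", "EmbarkedVec", "AgeFilled", "SibSp", "Parch", "Fare"))
  .setOutputCol("features")

// Assumes Survived holds numeric 0/1 labels
val rf = new RandomForestClassifier().setLabelCol("Survived").setFeaturesCol("features")

val pipeline = new Pipeline().setStages(Array(
  ageImputer, sexIndexer, sexEncoder, embIndexer, embEncoder, pclassEncoder, assembler, rf))

val model = pipeline.fit(trainDf) // trainDf: the training DataFrame read earlier
model.transform(testDf)           // adds a `prediction` column for the submission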

Submitting predictions

The first submission did not use the Name feature; after adding it the second time, the ranking improved quite a bit: (screenshot: leaderboard result)

A small difference between urllib and urllib2

Posted on 2015-12-08

While using dict, a little command-line translation tool, I noticed it could translate English but not Chinese. Reading its source, I found it calls urllib2's urlopen to fetch results from the Youdao translation API.

Chinese input came back garbled, yet urllib's urlopen returned a normal result, so I went digging in the Python source for the reason.

In urllib:

def urlopen(url, data=None, proxies=None):
    global _urlopener
    if proxies is not None:
        opener = FancyURLopener(proxies=proxies)
    elif not _urlopener:
        opener = FancyURLopener()
        _urlopener = opener
    else:
        opener = _urlopener
    if data is None:
        return opener.open(url)
    else:
        return opener.open(url, data)

It creates a FancyURLopener object and calls its open method.

def open(self, fullurl, data=None):
    fullurl = unwrap(toBytes(fullurl))
    fullurl = quote(fullurl, safe="%/:=&?~#+!$,;'@()*[]|")
    if self.tempcache and fullurl in self.tempcache:
        filename, headers = self.tempcache[fullurl]
        fp = open(filename, 'rb')
        return addinfourl(fp, headers, fullurl)
    urltype, url = splittype(fullurl)
    if not urltype:
        urltype = 'file'
    if urltype in self.proxies:
        proxy = self.proxies[urltype]
        urltype, proxyhost = splittype(proxy)
        host, selector = splithost(proxyhost)
        url = (host, fullurl) # Signal special case to open_*()
    else:
        proxy = None
    name = 'open_' + urltype
    self.type = urltype
    name = name.replace('-', '_')
    if not hasattr(self, name):
        if proxy:
            return self.open_unknown_proxy(proxy, fullurl, data)
        else:
            return self.open_unknown(fullurl, data)
    try:
        if data is None:
            return getattr(self, name)(url)
        else:
            return getattr(self, name)(url, data)
    except socket.error, msg:
        raise IOError, ('socket error', msg), sys.exc_info()[2]

The key is the call to quote on the third line: it percent-encodes the URL string, so the Youdao API can parse the request, while urllib2 performs no such encoding (I won't paste its code here).
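
To see what that encoding step does, here is the same idea sketched in Scala (the language used elsewhere on this blog) with java.net.URLEncoder; the endpoint URL is a made-up placeholder, not Youdao's real API:

import java.net.URLEncoder

object EncodeDemo {
  def main(args: Array[String]): Unit = {
    // Percent-encode the query before building the URL, the way quote does:
    // "你好" becomes "%E4%BD%A0%E5%A5%BD", which any HTTP API can parse.
    val q = URLEncoder.encode("你好", "UTF-8")
    println(s"http://translate.example.com/api?q=$q")
  }
}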
After a small patch, the tool now supports translation in both directions: dict. Usage:

dict dick
###################################
# dick 迪克 (U: dik E: dik )
# n. 阴茎,鸡巴;侦探;誓言
###################################
dict 鸡巴
###################################
# 鸡巴 dick (拼音: jī bā )
# dick
###################################

[Reflections] The Beauty of Math Is Everywhere

Posted on 2015-08-12

(screenshot: the problem statement)
I came across a problem. It looks simple, but there are two constraints:
1) The first line is given:

for i in range(1,input()):

and you may add only one more line.

2) You may not use strings.

My thought process:
1. Since strings are banned, solutions that build up a string are out, so the problem is most likely about finding a numeric pattern to print.
2. I found a pattern:
1x1
2x11
3x111
4x1111
…
3. To use it I would need i and a running sum of powers of 10, which requires one more accumulator variable; down that road the problem is unsolvable, since there is no way to do it with only one added line.
4. Then I noticed another pattern:
1x10/9=1.11111111
2x100/9=22.2222222
3x1000/9=333.3333333
Integer division in Python truncates the fractional part, so in the end all it takes is:

for i in range(1,input()):
print (pow(10, i)*i)/9

and it prints exactly what the problem asks for.
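
Spelling out why this works (my own addition): an i-digit repdigit is i times the repunit (10^i - 1)/9, so

\[
\frac{i \cdot 10^{i}}{9} = i \cdot \frac{10^{i}-1}{9} + \frac{i}{9} = \underbrace{ii\cdots i}_{i\text{ digits}} + \frac{i}{9},
\]

and integer division by 9 drops the i/9 tail whenever i/9 < 1. That also means the trick only holds for i <= 8: at i = 9 the tail reaches exactly 1 and the formula gives 10^9 instead of 999999999.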

It is only a small puzzle, but the beauty of mathematics shines through it.

Optimizing and Minifying JS with gulp-requirejs-optimize

Posted on 2015-08-07

Before optimization:
(screenshot: network panel before optimization)

(Not only ugly, but also slow to load.)
After optimization:
(screenshot: network panel after optimization)
1. First make sure your site uses requirejs to manage its js dependencies as modules

2. Assuming node.js is already installed, install gulp, gulp-uglify, gulp-requirejs-optimize and gulp-concat

$npm install gulp
$npm install gulp-uglify
$npm install gulp-requirejs-optimize
$npm install gulp-concat

Here is my requirejs config file (main.js):
(screenshot: main.js requirejs configuration)
3. Then add a gulp task to your gulpfile and run it (e.g. gulp js-optimizer):

var gulp = require("gulp");
var uglify = require("gulp-uglify");
var requirejsOptimize = require('gulp-requirejs-optimize');
var concat = require('gulp-concat');

gulp.task("js-optimizer", function () {
    return gulp.src('your requirejs config file')
        .pipe(requirejsOptimize(function () {
            // requirejsOptimize follows your config file to find every dependent js file
            return {
                mainConfigFile: 'your requirejs config file'
            };
        }))
        .pipe(uglify()) // minify the js with uglifyjs
        .pipe(concat("main-built.js")) // concatenate everything into main-built.js
        .pipe(gulp.dest("destination path"));
});

4. In the end, loading the minified main-built.js by itself replaces all the dependency files requirejs used to manage. Pretty neat, right?

[Study Notes] Setting up nginx + laravel + php-fpm on mac

Posted on 2015-07-16

First make sure brew is installed, then:
Install nginx

$brew install nginx

Start nginx

$nginx

Reload | reopen | stop | quit nginx

$nginx -s reload|reopen|stop|quit

Install php

$brew tap homebrew/dupes
$brew tap josegonzalez/homebrew-php
$brew install php56 \
  --with-debug \
  --with-fpm \
  --with-gmp \
  --with-homebrew-openssl \
  --with-imap --with-intl \
  --with-libmysql \
  --without-bz2 \
  --without-mysql \
  --without-pcntl \
  --without-pear

laravel 5.1 requires php >= 5.5.9, so I recommend updating straight to php 5.6.

nginx configuration
Edit nginx.conf under /usr/local/etc/

server {

    listen      your_port;
    server_name your_domain;
    set $root_path 'your laravel project path, remember to append /public';
    root $root_path;

    index index.php index.html index.htm;

    location / {
        try_files $uri $uri/ /index.php?$query_string;
    }

    location ~ \.php {

        fastcgi_pass 127.0.0.1:9000;
        fastcgi_index /index.php;

        include fastcgi_params;

        fastcgi_split_path_info ^(.+\.php)(/.+)$;
        fastcgi_param PATH_INFO $fastcgi_path_info;
        fastcgi_param PATH_TRANSLATED $document_root$fastcgi_path_info;
        fastcgi_param SCRIPT_FILENAME $document_root$fastcgi_script_name;
    }

    location ~* ^/(css|img|js|flv|swf|download)/(.+)$ {
        root $root_path;
    }

    location ~ /\.ht {
        deny all;
    }
}

Start php-fpm

$sudo php-fpm

nginx cannot execute PHP by itself; it is just a web server. When a request comes in, if it is a php request nginx hands it to the php interpreter (php-fpm) and returns the result to the client.

If your laravel project was cloned from a git repo, run composer install first; its job is to install every package the project depends on, otherwise the vendor directory will be missing.

If you see "Mcrypt PHP extension required" (older laravel versions), install the mcrypt extension:

$brew install php56-mcrypt
