数据仓库课程项目经验心得

项目地址&README

项目地址:

https://github.com/Baokker/data-warehouse-project

其中内容包括:

  • 项目报告
  • 代码
    • 爬虫
    • 数据处理
    • 前端
    • 后端

两篇README的链接在此:

对项目的

  • 爬虫细节
  • 数据处理方法
  • 存储方案和优化方案

都做了比较多的介绍。这里我再谈一点我配置环境和数据处理中的心得。

pandas

pandas+jupyter真的是处理数据的一大利器,各位学一些pandas的基本语法加上jupyter的使用方法,数据处理的效率将会快的飞起

这里写一点我当时印象最深刻的一次debug。因为之前数据整理的格式不对,所以我在重新整理数据时,需要逐行处理,但是一开始时我的速度非常地慢,准确地说是一开始很快,然后越来越慢……大致代码如下:

1
2
3
4
5
6
7
for i,r in movies.iterrows():
if not pd.isna(r['Format']):
for f in str(r['Format']).split(','):
Format = Format.append({ 'format_name':f,'movie_id':r['ID'],'movie_title':r['Title']},ignore_index=True)
# 或者这种
# Format=pd.concat([Format,pd.DataFrame(data={'format_name':[f],'movie_id':[r['ID']]})],ignore_index=True)

可以看到,每次获取一行的数据后,我都需要重新concat整个数据,它的行数可能是一条,也可能是上万条,而后者的整理显然是费时的。假如处理的数据量为N,那么最后需要concat的量为1+2+3+..+N=O(N^2)!

正确的处理方法是,先用一个list装下所有单行的dataframe,再直接一次性concat,这样复杂度只有O(N),自然快了不少

1
2
3
4
5
6
7
8
9
10
11
12
13
collections=[]
count=0
for i,r in movies.iterrows():
if not pd.isna(r['Format']):
for f in str(r['Format']).split(','):
collections.append(pd.DataFrame(data={'format_name':[f],'movie_id':[r['ID']],'movie_title':r['Title']}))
count+=1
if count%1000==0:
print(count,end=' ')
gc.collect()

# 最后再concat 否则复杂度是N^2!
Format=pd.concat(collections,ignore_index=True)

另外,也要注意多看文档,多用pandas的内置函数,往往效率会高很多

安全性

配置项目的期间,我们组的两台服务器先后遭遇了黑客攻击,一次是我的MySQL被清空了数据,所幸数据早有备份;还有一次是组员的服务器被植入了xmrig挖矿病毒,最后也不知道怎么解决,直接把对应的端口关了,同时把植入的文件都删除了,算是勉强解决了问题

IMG_6501

在此提醒

  • 数据库密码一定要更改,不要设得太简单,例如123456,妥妥的被入侵
  • 尽量少开放端口,或者对端口开放的ip做限制

MySQL导入csv

我先在本地用pandas处理完数据,并将dataframe转为csv文件,再用sftp上传到服务器上,接下来,我再使用MySQL(使用docker部署)本地写入csv。一定要本地,不然速度会很慢,几万条的数据一条一条太慢了

首先开启本地模式进入MySQL

1
mysql -uroot --password --local-infile=1

再写入数据(已提前使用docker cp将对应csv数据拷贝到容器中)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
load data local infile '/root/sql-data/Movie.csv'
into table Movie
fields terminated by ','
lines terminated by '\n'
ignore 1 rows
(movie_id,title,score,edition);

load data local infile '/root/sql-data/Asin.csv'
into table Asin
fields terminated by ','
lines terminated by '\n'
ignore 1 rows
(asin,movie_id);

load data local infile '/root/sql-data/Actor.csv'
into table Actor
fields terminated by ','
lines terminated by '\n'
ignore 1 rows
(name,actor_id);

load data local infile '/root/sql-data/Director.csv'
into table Director
fields terminated by ','
lines terminated by '\n'
ignore 1 rows
(name,director_id);

load data local infile '/root/sql-data/Format.csv'
into table Format
fields terminated by ','
lines terminated by '\n'
ignore 1 rows
(format_name,movie_id,movie_title,format_id);

load data local infile '/root/sql-data/Genre.csv'
into table Genre
fields terminated by ','
lines terminated by '\n'
ignore 1 rows
(genre_id,genre_name,movie_id,movie_title);

load data local infile '/root/sql-data/ReleaseDate.csv'
into table ReleaseDate
fields terminated by ','
lines terminated by '\n'
ignore 1 rows
(date,movie_id,year,month,day,season,weekday,time_id);

load data local infile '/root/sql-data/Act.csv'
into table Act
fields terminated by ','
lines terminated by '\n'
ignore 1 rows
(actor_id,movie_id,movie_title);

load data local infile '/root/sql-data/Direct.csv'
into table Direct
fields terminated by ','
lines terminated by '\n'
ignore 1 rows
(director_id,movie_id,movie_title);

load data local infile '/root/sql-data/Direct.csv'
into table Direct
fields terminated by ','
lines terminated by '\n'
ignore 1 rows
(director_id,movie_id,movie_title);

load data local infile '/root/sql-data/Cooperation.csv'
into table Cooperation
fields terminated by ','
lines terminated by '\n'
ignore 1 rows
(left_person_id,right_person_id,movie_id,type);

load data local infile '/root/sql-data/Review.csv'
into table Review
fields terminated by ','
lines terminated by '\n'
ignore 1 rows
(@dummy,asin,@dummy,username,helpfulness,review_score,review_time,review_summary,review_text,review_id,movie_id);

load data local infile '/root/sql-data/ReviewNoText.csv'
into table Review
fields terminated by ','
lines terminated by '\n'
ignore 1 rows
(asin,username,helpfulness,review_score,review_time,review_summary,review_id,movie_id);

Spark

项目要求建立起基于Spark的分布式查询系统。作为这个需求的负责人,我十分负责任地说这个需求非常逆天,配置难度个人认为比另外两个数据存储方案(关系型的MySQL和图数据库neo4j)都高出了不少

最后我们组(事实上还有另外两组,互帮互助)都采用了这篇方案

https://blog.csdn.net/sirobot/article/details/121750358

https://github.com/bambrow/docker-hadoop-workbench

这套方案的技术栈也比较复杂,包括

  • docker(使用docker模拟多台机器节点)
    • docker ce
    • docker compose
  • hadoop(分布式文件系统和MapReduce算法)
  • hive(对分布式文件系统进行类似于SQL的处理)
  • spark(在内存中计算,速度相较于MapReduce有大幅提升)

对命令行操作,阅读官方文档能力,debug能力,以及心理承受能力都有很高的要求

仿照这个项目给的参考例子,经过一番魔改和若干次宕机后,我总算让这个项目在我的服务器上跑了起来。接下来整理一下我配置环境中用到的步骤,及其对应的作用和命令

tips

  • 多读官方文档,哪怕觉得英文难读。你永远不知道过时的中文文档会给你带来多大的困难
  • 多用英文,谷歌搜索

配置

跑通这个项目,至少得是腾讯云4核8G的轻量应用服务器的配置(新人价388),内存越大越好。如果没有的话,考虑报销入一台,或者部署在本地电脑上

docker compose 安装

1
sudo curl -SL https://github.com/docker/compose/releases/download/v2.14.0/docker-compose-linux-x86_64 -o /usr/local/bin/docker-compose

clone项目

1
2
3
git clone https://gitee.com/horysk/docker-hadoop-workbench.git
cd docker-hadoop-workbench
chmod a+x *.sh # 添加运行权限

接下来修改配置文件,包括spark的运行内存,hive的数据库密码,docker-compose的配置文件等,具体可参考我在Github上的代码:

https://github.com/Baokker/docker-hadoop-workbench

修改完即可运行

1
2
3
# 运行 停止
sudo ./start_demo.sh
sudo ./stop_demo.sh

复制csv数据到hadoop

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
# 辅助容器
sudo docker run -d --network hadoop --env-file hadoop.env --name hadoop-base bde2020/hadoop-base:2.0.0-hadoop3.2.1-java8 tail -f /dev/null

# hadoop
sudo docker cp sql-data/ hadoop-base:/
sudo docker exec -it hadoop-base hdfs dfs -mkdir -p /dw_movie/
sudo docker exec -it hadoop-base bash

# in hadoop container
# 整理了一番..
hdfs dfs -put sql-data/* /dw_movie/
hdfs dfs -ls /dw_movie

# result
Found 11 items
drwxr-xr-x - root supergroup 0 2022-12-06 17:03 /dw_movie/Act
drwxr-xr-x - root supergroup 0 2022-12-06 17:03 /dw_movie/Actor
drwxr-xr-x - root supergroup 0 2022-12-06 17:03 /dw_movie/Asin
drwxr-xr-x - root supergroup 0 2022-12-06 17:03 /dw_movie/Cooperation
drwxr-xr-x - root supergroup 0 2022-12-06 17:03 /dw_movie/Direct
drwxr-xr-x - root supergroup 0 2022-12-06 17:03 /dw_movie/Director
drwxr-xr-x - root supergroup 0 2022-12-06 17:03 /dw_movie/Format
drwxr-xr-x - root supergroup 0 2022-12-06 17:03 /dw_movie/Genre
drwxr-xr-x - root supergroup 0 2022-12-06 17:03 /dw_movie/Movie
drwxr-xr-x - root supergroup 0 2022-12-06 17:03 /dw_movie/ReleaseDate
drwxr-xr-x - root supergroup 0 2022-12-06 17:03 /dw_movie/Review

hdfs dfs -ls /dw_movie/Act

# result
Found 1 items
-rw-r--r-- 3 root supergroup 10736361 2022-12-06 17:03 /dw_movie/Act/Act.csv

hive建表

1
2
3
4
5
6
7
# hive
sudo docker cp create-table-postgresql.sql hive-server:create-table-postgresql.sql

# in hive container
sudo docker exec -it hive-server bash
cd ..
hive -f create-table-postgresql.sql

create-table-postgresql.sql 见Github项目

Spark配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
# spark
sudo docker exec -it spark-master bash

# WARN TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient resources
cd docker-hadoop-workbench/
sudo docker-compose restart spark-worker

# in spark container
spark/bin/spark-shell
spark/bin/spark-sql

# 修改配置,原项目中有两个spark-defaults.conf,都要改,把16g改成4g
apk add --update vim # 安装vim
vim spark/conf/spark-defaults.conf

# in sql
show databases;
use dw_movie;
select * from movie where movie_id between 12 and 20;

安装pyspark

咱就是说,教程还是国外的劲

https://levelup.gitconnected.com/how-to-delete-and-install-pyspark-on-ubuntu-4e1bbefa11a3

debug:Spark找不到hadoop

这个bug卡了好几天……大概就是说,Spark可以运行,但是却找不到hdfs对应的文件位置。最后在原Github项目的issue里找到了问题:本地并不知道namenode表示什么。需要手动配置host

https://github.com/big-data-europe/docker-hadoop/issues/98

add hostname to /etc/hosts

1
2
127.0.0.1        localhost   namenode datanode1 datanode2
::1 localhost namenode datanode1 datanode2

运行

安装pyspark,然后尝试运行

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
from pyspark import SparkContext, SparkConf
from pyspark.conf import SparkConf
from pyspark.sql import SparkSession, HiveContext

spark_session = (SparkSession
.builder
.master("local")
.appName('spark_hive')
.config("hive.metastore.uris", "thrift://localhost:9083")
.enableHiveSupport()
.getOrCreate()
)

spark_session.sql('show databases').show()
spark_session.sql('use dw_movie')
spark_session.sql('show tables').show()

spark_session.sql('select * from movie where movie_id between 12 and 20').show()

运行出来的时候,有一种回家的美好

总结

数仓这门项目,果真是让我收货良多啊…


数据仓库课程项目经验心得
http://baokker.github.io/2022/12/17/数据仓库课程项目经验心得/
作者
Baokker
发布于
2022年12月17日
更新于
2022年12月30日
许可协议