数据仓库ETL爬虫&数据处理小记

Github Repository

这个链接将在本学期末本人将此repository设为public后可以访问:

https://github.com/Baokker/data-warehouse/tree/main/homework-1-etl

要求

1)获取用户评价数据中的7,911,684个用户评价

2)从Amazon网站中利用网页中所说的方法利用爬虫获取253,059个Product信息页面

3)挑选其中的电影页面,通过ETL从数据中获取

  • 电影ID,评论用户ID,评论用户ProfileName,评论用户评价Helpfulness,评论用户Score,评论时间Time,评论结论Summary,评论结论Text,电影上映时间,电影风格,电影导演,电影主演,电影演员,电影版本等信息

4)在网页中不同网页可能是相同的电影(如同一部电影的蓝光、DVD版本,同一部电影的不同语言的版本等),通过ETL对相同的电影(需要给出你所认为的相同的定义)进行合并

5)在网页中电影演员、电影导演、电影主演等会出现同一个人但有不同名字的情况(如middle name,名字缩写等),通过ETL对相同的人名进行合并

4)在网页中部分电影没有上映时间,可以通过第三方数据源(如IMDB、豆瓣等)或者从评论时间来获取

5)通过ETL工具存储Amazon页面和最终合并后的电影之间的数据血缘关系,即可以知道某个电影的某个信息是从哪些网站或者数据源获取的,在合并的过程中最终我们采用的信息是从哪里来的。

  • 可以参考的工具:
  1. Pentaho Data Integration: https://sourceforge.net/projects/pentaho/ (Links to an external site.)
  1. Web爬虫:https://scrapy.org

Tree(文件大纲)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
.
├── Data
│ └── Raw # 原本的数据
├── FilmInfoSpider # 爬取电影信息,并存储电影
│ ├── FilmInfoSpider
│ ├── WebPages # 存储的网页
│ └── productId.csv # 产品ASIN值表
├── JupyterScripts
│ ├── extract_movies_txt.ipynb # 提取movies.txt中的信息(comments,product id)
│ ├── extract_no_title_prime_video_id.ipynb # 提取没有标题的数据的ASIN值
│ ├── get_date_from_comments.ipynb # 从评论获取日期
│ ├── merge_movies_info.ipynb # 合并电影信息(重新补爬的信息)
│ ├── merge_same_title_and_record_source.ipynb # 合并同一部电影,并记录数据血缘
│ └── merge_similar_names.ipynb # 合并相似人名
├── PrimeVideoInfoSpider # 补爬Prime Video标题的爬虫脚本
├── README.assets # 文档配图
└── README.md # 文档

获取用户评价

首先打开要求中给出的snap网页,观察可发现,它提供了一个下载链接,用于下载十多年来由约九十万用户发出的与二十五万产品相关的近八百万条评论。

image-20221020205139744

下载压缩包后解压,会得到一个将近9G的movies.txt文件,里面的文本格式基本如下所示,其中productId即为每个产品对应的亚马逊产品号(ASIN),并且都是唯一的。

image-20221020205159989

由于文本格式非常整齐,因此可以直接用Python逐行读取,提取内容。

具体代码见extract_movies_txt.ipynb

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
import pandas as pd

i = 0

productId = []
userId = []
profileName = []
helpfulness = []
score = []
time = []
summary = []
text = []

for line in open("../Data/Raw/movies.txt", 'r', encoding='UTF-8', errors='ignore'):
split = line.split(' ', 1)
# print(split)
if split == []:
continue

if split[0] == "product/productId:":
productId.append(split[1])
elif split[0] == "review/userId:":
userId.append(split[1])
elif split[0] == "review/profileName:":
profileName.append(split[1])
elif split[0] == "review/helpfulness:":
helpfulness.append(split[1])
elif split[0] == "review/score:":
score.append(split[1])
elif split[0] == "review/time:":
time.append(split[1])
elif split[0] == "review/summary:":
summary.append(split[1])
elif split[0] == "review/text:":
text.append(split[1])
i += 1

if i % 1000000 == 0:
print(i)

print(i)
# 字典中的key值即为csv中列名
dataframe = pd.DataFrame({'productId':productId,'userId':userId,'profileName':profileName,'helpfulness':helpfulness,'score':score,'time':time,'summary':summary,'text':text})

# 将DataFrame存储为csv,index表示是否显示行名,default=True
dataframe.to_csv("comments.csv",index=False,sep=',')

逐行读取并用split函数提取内容,再用pandas导出为csv保存,最后得到一个约8G的csv文件,里面包含评论的各种信息

image-20221017193813962

可以看到,提取过程中并没有对换行做出很好的处理,因此用pandas导入后再进行处理,去除换行(get_date_from_comments.ipynb

1
2
3
4
5
comments['productId'] = comments['productId'].str.strip()
comments['userId'] = comments['userId'].str.strip()
comments['profileName'] = comments['profileName'].str.strip()
comments['helpfulness'] = comments['helpfulness'].str.strip()
comments['summary'] = comments['summary'].str.strip()

可以看到,经过处理后,基本达到了一个较好的效果

image-20221020205034420

爬取网页

获取ASIN

由官网介绍可得,每件产品对应的网址,其实就是amazon.com/dp/$(ASIN),其中ASIN为对应的产品编号

image-20221020205208740

image-20221020205225156

为此,首先在movies.txt中提取ASIN(extract_movies_txt.ipynb),并保存为productId.csv

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
import pandas as pd

productId = []
for line in open("../Data/Raw/movies.txt", 'r', encoding='UTF-8', errors='ignore'):
split = line.split()
# print(split)
if split == []:
continue
if split[0] == "product/productId:":
if split[1] not in productId:
productId.append(split[1])
# print(productId)

# 字典中的key值即为csv中列名
dataframe = pd.DataFrame({'productId':productId})

# 将DataFrame存储为csv,index表示是否显示行名,default=True
dataframe.to_csv("productId.csv",index=False,sep=',')

定位网页内容

接下来对网页内容进行爬取。在此处使用Scrapy+Selenium进行爬取网页内容,具体内容如下:

首先安装Scrapy,再使用scrapy自带的命令行查看和分析单个网页

https://www.amazon.com/dp/B00006HAXW/

1
scrapy shell 'https://www.amazon.com/dp/B00006HAXW/'

之后可以在命令行中通过view(response)方法查看页面,也可以通过response.xpath('...').get()的方法获取数据。(xpath是一种XML路径语言,相比css更容易确定HTML页面中的位置

xpath的语法不难,但是浏览器自带的开发者工具提供了更为方便的方式。在网页内容中右键选中对应的标签块(例如,<span><h1),右键即可复制其对应的XPath,并查看相关内容

Snipaste_2022-10-12_15-36-44

经观察,亚马逊的电影网页分成正常和Prime Video两种

正常页面如下

image-20221020205501754

经观察,爬取了这部分以及标题的内容

image-20221020205514216

Prime Video的页面如下

image-20221020205807719

主要选取了以下部分内容

image-20221020205821572

开爬

接下来使用scrapy创建项目

1
scrapy startproject FilmInfoSpider

FilmInfoSpider/FilmInfoSpider下建立py文件,并写入爬虫脚本,爬取的链接为上文中提取的25w个产品对应的网址

核心部分代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
def parse(self, response):
# 初始化yield返回数据
attributes = {'ASIN':'','Title':'','Language':'','Release date':'','Date First Available':'','Run time':'','Producers':'','Directors':'','Writers':'','Actors':'','Media Format':'','Subtitles':'', 'Genres':''}

# 确定ASIN值
asin_begin_position = 26
asin_length = 10
attributes['ASIN'] = response.url[asin_begin_position:asin_begin_position + asin_length]

# 判断类别
product_type = response.xpath('//*[@id="nav-search-label-id"]/text()').get()

# 如果都不是 直接return
if product_type != 'Movies & TV' and product_type != 'Prime Video':
return

# 写入文件 以备不时之需
path = 'WebPages/'
# path = '/Volumes/PortableSSD/WebPages'
filename = f'{path}/{attributes["ASIN"]}.html'
with open(filename, 'wb') as f:
f.write(response.body)

# Movies & TV
if product_type == 'Movies & TV':
# 获取标题
Title = response.xpath('//*[@id="productTitle"]/text()').extract_first().strip()
attributes['Title'] = Title

# 获取Product details
result = response.xpath('//*[@id="detailBullets_feature_div"]/ul/li/span/span/text()').getall()
result = [r.replace(':','').replace('\u200f','').replace('\u200e','').replace('\t', '').replace('\n', '').replace('\r', '').strip() for r in result]

columns = result[0::2]
value = result[1::2]

for i in range(len(columns)):
for key in attributes.keys():
if columns[i] in key:
attributes[key] = value[i]

# Prime Video
elif product_type == 'Prime Video':
# 是否为电影
if '"titleType":"movie"' not in response.xpath('//*[@id="a-page"]/div[2]/script[15]/text()').get():
return

# Title = response.xpath('//*[@id="a-page"]/div[2]/div[4]/div/div/div[2]/div[3]/div/h1/text()').get() # 原先代码有误
Title = response.xpath('//*[@id="a-page"]/div[2]/div[4]/div/div/div[2]/div[2]/div/h1/text()').get()
if Title == None:
Title = response.xpath('//*[@id="a-page"]/div[2]/div[4]/div/div/div[2]/div[1]/div/h1/text()').get()

attributes['Title'] = Title

columns_1 = response.xpath('//*[@id="btf-product-details"]/div/dl/dt/span/text()').getall()
value_1 = response.xpath('//*[@id="btf-product-details"]/div/dl/dd/*/text()').getall()

for i in range(len(columns_1)):
for key in attributes.keys():
if columns_1[i] in key:
attributes[key] = value_1[i]

columns_2 = response.xpath('//*[@id="meta-info"]/div/dl/dt/span/text()').getall()
for i in range(len(columns_2)):
# 根据columns内容判断
if columns_2[i] == 'Directors' or columns_2[i] == 'Genres':
attributes[columns_2[i]] = response.xpath('//*[@id="meta-info"]/div/dl[' + str(i + 1) + ']/dd/a/text()').get()
elif columns_2[i] == 'Starring':
attributes['Actors'] = str(response.xpath('//*[@id="meta-info"]/div/dl[' + str(i + 1) + ']/dd/a/text()').getall())[1:-2].replace("'",'')

yield attributes

具体来说,首先根据搜索框里类别来判断网页的类型,如果是Movies & TV,那么就按普通页面的的内容获取标题和内容;如果是Prime Video,则再判断该类型是否为movie,是则提取网页内容。

此外,为以防万一,将每次访问的网页内容也写入到文件,方便后续调用

反反爬

然而亚马逊具有很强的反爬机制,没过多久就要求我输入验证码验证,大致图片如下:

img

另外,对于请求过度频繁的ip,亚马逊也会禁止其访问内容

对此,需要进行反反爬措施。有ip池代理+伪造请求头的方法,也有使用Selenium模拟手动打开的方法。Selenium本身是为Web浏览器提供的一个测试工具,为测试的自动化提供了一系列方法,由于其在某种程度上说类似于模拟人进行浏览器的操作,相比单纯的发送ip请求,可以绕过更多限制,减少反爬的可能,因此也有越来越多采用Selenium进行爬虫的方案。此处采用了Selenium作为scrapy的middleware,先用Selenium打开一个浏览器,再打开制定的网页,并作为response返回。

核心代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
def process_request(self, request, spider):
# Called for each request that goes through the downloader
# middleware.

# Must either:
# - return None: continue processing this request
# - or return a Response object
# - or return a Request object
# - or raise IgnoreRequest: process_exception() methods of
# installed downloader middleware will be called
self.driver.get(request.url+"/?language=en_US")
self.driver.refresh()

# 检测机器人
robot_sentence = "Sorry, we just need to make sure you're not a robot. For best results, please make sure your browser is accepting cookies."
if robot_sentence in self.driver.page_source:
from lxml import etree
html = etree.HTML(self.driver.page_source)

from amazoncaptcha import AmazonCaptcha
link = html.xpath("/html/body/div/div[1]/div[3]/div/div/form/div[1]/div/div/div[1]/img/@src")[0]
captcha = AmazonCaptcha.fromlink(link)
solution = captcha.solve()

from selenium.webdriver.common.by import By
input_element = self.driver.find_element(By.ID,"captchacharacters")
input_element.send_keys(solution)

button = self.driver.find_element(By.XPATH,"//button")
button.click()
time.sleep(3)

source = self.driver.page_source
response = HtmlResponse(url = self.driver.current_url, body = source, request = request, encoding = 'utf-8')
return response

这里主要有两个要点:

  • 经过一定的爬取后,发现数据并不完整,经过debug发现,网页在加载时不一定能完全加载完,下面的Product details可能不会加载成功,而是变成一段sorry的话,再刷新后才会正常显示。因此在每次Selenium获取网页后,调用refresh()方法再次刷新
  • 爬取时有较小概率出现验证码的问题。经过查阅,发现github上有一库为amazoncaptcha,可以根据传入的图片链接获得对应的验证码数字。于是先用xpath获取验证码图片的链接,再用amazoncaptcha得出结果,并用Selenium模拟输入验证码并按下按钮,即可解决验证码的问题

解决这些问题后,连开五个进程进行了爬取:

Snipaste_2022-10-12_11-00-55

(忘记截屏15w-2ow的了..)

1-5

5-10

10-15

20-25

补爬

爬取完数据后,发现部分数据没有Title,检查后发现是Prime video检索标题的Xpath有误

image-20221017174931183

所幸在之前保留了网页的文件,因此修改代码后,使用scrapy对本地文件爬取即可

1
scrapy startproject PrimeVideoSpider

由于仅是本地文件,也不存在反爬的可能,直接用默认的middleware即可

合并相同电影

观察电影标题,可以发现很多电影其实是一部,无非是版本(VHS,DVD,Blu-ray),或者是放映年份的区别。为保证数据的质量,对相似的电影进行合并

image-20221017181359390

在此处,主要采用pandas+fuzzywuzzy的方法合并数据。pandas是一个著名的python的数据分析库,在处理数据方面工具齐全,性能较快;fuzzywuzzy是一个匹配字符串的库,采用Levenshtein Distance计算字符串直接的相似度

image-20221021161242643

image-20221022131351128

针对合并相同电影的需求,给出如下方法:

  1. 首先去除Title中关于版本的信息(VHS,DVD等),并删除最外边多余的引号
  2. 利用Levenshtein Distance计算相似度,选取那些得分高于95的,视为同一电影
  3. 在合并电影时
    • Title选名字最短的
    • Release Date,Date First Available选最早的
    • Run time,Producers,Directors,Writers,Actors,Genres一般出现的都不会有不同,选取第一个出现的
    • Producers,Directors,Writers,Actors使用列表分割
    • Media Format,Subtitles将所有可能的结果纳入到一个集合中

核心代码见merge_same_title_and_record_source.ipynb

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
import datetime

i = 0

merged_movies_collection = pd.DataFrame(columns=['Title','Language','Release date','Date First Available','Run time','Producers','Directors','Writers','Actors','Media Format','Subtitles','Genres'])
merged_movies_source_asin_collection = pd.DataFrame(columns=['Title','Language','Release date','Date First Available','Run time','Producers','Directors','Writers','Actors','Media Format','Subtitles','Genres'])

whole_length = movies_info.shape[0]

for index, row in movies_info.iterrows():
i += 1
if whole_length - index <= 10: # 不能直接用shape[0],因为会不断变化
end = whole_length
else:
end = index + 10

similar_title_info = process.extract(row['Title'], movies_info.loc[index:end, 'Title'] , limit = 10)
similar_title_order = [s[2] for s in similar_title_info if s[1] >= 95]

# WARNING:root:Applied processor reduces input query to empty string, all comparisons will have score 0. [Query: '-']
# '$','-'
if similar_title_order == [] and index in movies_info.index:
similar_title_order = [index]
# has been added
else if similar_title_order == []:
continue

for_merge_info = movies_info.loc[similar_title_order,:]

merged_movie_info = pd.DataFrame(data=[['',[],'','','','','',[],[],[],[],'']], columns=['Title','Language','Release date','Date First Available','Run time','Producers','Directors','Writers','Actors','Media Format','Subtitles','Genres'])
merged_movie_source_asin = pd.DataFrame(data=[[[],[],[],[],'','','','',[],[],[],'']], columns=['Title','Language','Release date','Date First Available','Run time','Producers','Directors','Writers','Actors','Media Format','Subtitles','Genres'])

# 初始
for index, row in for_merge_info.iterrows():
# Title 默认加上所有ASIN
merged_movie_source_asin.loc[0,'Title'].append(row['ASIN'])
# Title 选最短名
if merged_movie_info.loc[0,'Title'] == '' or len(merged_movie_info.loc[0,'Title']) > len(row['Title']):
merged_movie_info.loc[0,'Title'] = row['Title']
# Language 有且不重复就加进去
if not pd.isna(row['Language']):
merged_movie_source_asin.loc[0,'Language'].append(row['ASIN'])
if row['Language'] not in merged_movie_info.loc[0,'Language']:
merged_movie_info.loc[0,'Language'].append(row['Language'])
# ...省略

merged_movies_collection = pd.concat([merged_movies_collection,merged_movie_info], ignore_index=True)
merged_movies_source_asin_collection = pd.concat([merged_movies_source_asin_collection, merged_movie_source_asin], ignore_index=True)

movies_info.drop(similar_title_order, inplace=True)

if i % 1000 == 0:
print(i)

另外,考虑到fuzzywuzzy在计算大量距离时耗时较长,因此先将电影名排序,再将每部电影与之后10部电影的相似度进行比较,从而减少了运行的时间

合并人名

人名的格式不尽相同,有时相同的人会有不同的名字,情况如下:

  • 人名颠倒,或者缺少空格

image-20221020211208243

  • 大小写不同

image-20221020211345194

  • middle name缩写

    image-20221020211639154

image-20221020211706280

经过测试,仍采用fuzzywuzzy库进行相似度比较

  1. 首先读取电影信息,抽离出所有导演,编剧和演员的名字,合并到names数组中
  2. 对其进行去重,降序排序
    • 这样是为了保证更加规范的小写字母的名字在前,以确保替换时以先遍历到的规范名字在前
  3. 遍历数组,计算首字母相同的名字之间的相似度,选取得分高于95的进行替换

(具体代码见merge_similar_names.ipynb

上映日期

部分电影缺少上映日期。对此,从之前提取的评论数据出发,选取对应ASIN值的评论中最早的时间,将其定义为上映日期

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
import math
i = 0
for index, row in movies_info.iterrows():
if pd.isnull(row['Release date']):
ID = row['ASIN']
print(ID, end = ' ')
earliest_time = comments.loc[comments['productId']==ID,'time'].min()
print(earliest_time)
if not math.isnan(earliest_time):
earliest_time_str = time.strftime("%B %d, %Y", time.localtime(earliest_time))
# movies_info.loc[movies_info['ASIN'] == ID, 'Release date'] = earliest_time_str
row['Release date'] = earliest_time_str
i += 1
if i % 1000 == 0:
print(i)
# print(row)

结果如下

image-20221018101059482

此外,经调研发现,imdb提供了若干API接口,只需要在imdb developer上注册一个账号,申请API_KEY,就可以根据文档描述进行查询。简单的示例如下:

image-20221031190941823

数据血缘

在合并相同电影的同时,建立了一个新的名为source_asin的DataFrame,列的信息与电影信息相同,行数也保持一致,而每个单元格的信息则是对应电影信息表中的数据的来源ASIN值。通过这种方式,可以找到每个信息的来源

image-20221023102747788

image-20221023102706332


数据仓库ETL爬虫&数据处理小记
http://baokker.github.io/2022/10/11/数据仓库ETL爬虫-数据处理小记/
作者
Baokker
发布于
2022年10月11日
更新于
2022年10月31日
许可协议