起因#
最近、実習で自分でウェブサイトを見つけてデータをexcel
表に保存するように求められました。
映画を見るときも豆瓣のTOP250
でフィルタリングしているので、手動でページをめくるのは面倒なので、クローリングすることにしました。
[2019-09-02 更新] その後、課題の発表をすることになり、mysql
データベースに保存することに変更しました。
コード実装#
ネット上のほとんどのクローリング記事とは異なり、私が欲しいのは各映画のあらすじ情報です。
そのため、まず各映画のリンクを取得し、次に各映画を個別にクローリングする必要があります。
全コードは以下の通りです:
# -*- coding: utf-8 -*-
'''
@author: soapffz
@function: 豆瓣TOP250映画情報の取得とmysqlデータベースへの保存(マルチスレッド)
@time: 2019-09-01
'''
import requests
from fake_useragent import UserAgent
from lxml import etree
from tqdm import tqdm
import threading
import pymysql
from re import split
"""
ライブラリが見つからない場合は以下のコマンドをコピーして解決してください
pip install -i https://pypi.tuna.tsinghua.edu.cn/simple fake_useragent
pip install -i https://pypi.tuna.tsinghua.edu.cn/simple tqdm
pip install -i https://pypi.tuna.tsinghua.edu.cn/simple threading
pip install -i https://pypi.tuna.tsinghua.edu.cn/simple pymysql
"""
class Top250(object):
def __init__(self):
ua = UserAgent() # User-Agentを生成するため
self.headers = {"User-Agent": ua.random} # ランダムなUser-Agentを取得
self.bangdang_l = [] # ランキングのページを保存
self.subject_url_l = [] # 各映画のリンクを保存
self.connect_mysql()
def connect_mysql(self):
# データベースに接続、パスワードの後にデータベース名を追加できます
try:
self.mysql_conn = pymysql.connect(
'localhost', 'root', 'root', charset='utf8')
# SQL文を実行できるカーソルオブジェクトを取得し、実行後の結果はデフォルトでタプル形式で表示されます
self.cursor = self.mysql_conn.cursor()
print("データベース接続成功")
except Exception as e:
print("データベース接続エラー:{}\nクローリングを完了させてどこに保存するの?クローリングを中止します、プログラムを終了します!".format(e))
exit(0)
else:
self.create_db()
def create_db(self):
# データベースとテーブルを作成
# データベースが存在するか確認し、存在する場合は削除してから作成
sql_db_dection = "DROP DATABASE IF EXISTS `douban_top250`"
sql_create_db = "CREATE DATABASE `douban_top250` default charset utf8 COLLATE utf8_general_ci;"
sql_create_table = """
CREATE TABLE `movies_info` (
`映画名` varchar(255) NOT NULL,
`監督` varchar(511) NOT NULL,
`主演情報` varchar(511) NOT NULL,
`ジャンル` varchar(255) NOT NULL,
`公開日` varchar(255) NOT NULL,
`あらすじ` varchar(511) NOT NULL,
`ランキング` varchar(255) NOT NULL,
`上映時間` varchar(255) NOT NULL,
PRIMARY KEY (`ランキング`)
)DEFAULT CHARSET=utf8;
"""
try:
self.cursor.execute(sql_db_dection)
self.cursor.execute(sql_create_db)
self.cursor.execute('use douban_top250;') # 現在のデータベースを新しく作成したデータベースに設定
self.cursor.execute(sql_create_table)
except Exception as e: # すべての例外をキャッチして表示、python2ではException,e
print("データベース作成エラー:{}\nプログラムを終了します!".format(e))
self.mysql_conn.rollback() # エラーが発生した場合はロールバック
self.mysql_conn.close() # データベース接続を閉じる
exit(0)
else:
print("データベースとテーブルの作成成功、各映画のリンクを取得開始..")
self.get_subject_url()
def get_subject_url(self):
# 250のランキングを巡回して各映画の個別リンクを取得
self.bangdang_l = [
"https://movie.douban.com/top250?start={}&filter=".format(i) for i in range(0, 250, 25)]
self.multi_thread(self.bangdang_l, self.crawl_bangdang)
if len(self.subject_url_l) == 0:
print("IPがブロックされました、プログラムを終了します")
exit(0)
else:
print("{}本のTOP映画のリンクが取得完了しました、各映画のクローリングを開始しますので、しばらくお待ちください...".format(
len(self.subject_url_l)))
self.multi_thread(self.subject_url_l, self.get_one_movie_info)
def crawl_bangdang(self, url):
# 各ランキングページの映画リンクをクローリング
try:
req = requests.get(url, headers=self.headers)
req.encoding = "utf-8"
html = etree.HTML(req.text)
# すべてのaタグのリストを取得
url_l = html.xpath('//div[@class="hd"]//a/@href')
self.subject_url_l.extend(url_l)
except Exception as e:
print("ランキング情報のクローリング中にエラーが発生しました:{}".format(e))
else:
print("第{}ページのランキングデータを取得成功...\n".format(
int(int(split(r"=|&", url)[1])/25 + 1)))
def multi_thread(self, url_l, target_func):
# マルチスレッドクローリング関数、クローリングするURLリストとクローリングに使用する関数を渡す
threads = []
for i in range(len(url_l)):
t = threading.Thread(target=target_func, args=(url_l[i],))
threads.append(t)
for i in range(len(threads)):
threads[i].start()
for i in range(len(threads)):
threads[i].join()
print("クローリング完了")
def get_one_movie_info(self, subject):
# 単一のURLをクローリングする関数
try:
req = requests.get(subject, headers=self.headers)
html = etree.HTML(req.content)
except Exception as e:
print("クローリングエラー:".format(e))
else:
# 単一映画の情報を保存するため
info = []
# 映画名情報
movie_name = html.xpath(
"//span[@property='v:itemreviewed']/text()")
info.append(" ".join(movie_name))
# 監督情報
director = html.xpath("//a[@rel='v:directedBy']//text()")
info.append(" ".join(director))
# 主演情報
actor = html.xpath("//a[@rel='v:starring']//text()")
info.append(" ".join(actor))
# ジャンル
genre = html.xpath("//span[@property='v:genre']/text()")
info.append(" ".join(genre))
# 公開日
initialReleaseDate = html.xpath(
"//span[@property='v:initialReleaseDate']/text()")
info.append(" ".join(initialReleaseDate))
# あらすじ
reated_info = html.xpath("//span[@class='all hidden']/text()")
# あらすじが隠れている場合があるので、デフォルトで隠れたタグを取得し、隠れていないタグがあればそれを取得
if len(reated_info) == 0:
reated_info = html.xpath(
"//span[@property='v:summary']/text()")
reated_info = "".join([s.strip() for s in reated_info]).strip("\\")
reated_info = self.transferContent(reated_info)
info.append(reated_info)
# ランキング
no = html.xpath("//span[@class='top250-no']/text()")
if len(no) == 1:
info.append(no[0].split(".")[-1])
else:
info.append("取得失敗")
runtime = html.xpath("//span[@property='v:runtime']/text()")
if len(runtime) == 1:
info.append(runtime[0].split("分")[0])
else:
info.append("取得失敗")
self.db_insert(info)
def db_insert(self, info_l):
sql_insert_detection = """
insert ignore into `douban_top250`.`movies_info` (`映画名`,`監督`,`主演情報`,`ジャンル`,`公開日`,`あらすじ`,`ランキング`,`上映時間`)
values ("{l[0]}","{l[1]}","{l[2]}","{l[3]}","{l[4]}","{l[5]}","{l[6]}","{l[7]}");
""".format(l=info_l)
try:
self.cursor.execute(sql_insert_detection)
self.mysql_conn.commit()
except Exception as e:
self.mysql_conn.rollback()
print("データベースへのデータインポート中にエラーが発生しました:{}".format(e))
exit(0)
else:
print("{}の情報取得成功...\n".format(info_l[0]))
def transferContent(self, content):
# 同時に'と"を含む文字列をエスケープ
if content is None:
return None
else:
string = ""
for c in content:
if c == '"':
string += '\\\"'
elif c == "'":
string += "\\\'"
elif c == "\\":
string += "\\\\"
else:
string += c
return string
if __name__ == "__main__":
Top250()
デモ GIF は以下の通りです:
参考記事: