banner
肥皂的小屋

肥皂的小屋

github
steam
bilibili
douban
tg_channel

Python--豆瓣TOP250映画情報を取得する

起因#

最近、実習で自分でウェブサイトを見つけてデータをexcel表に保存するように求められました。

映画を見るときも豆瓣のTOP250でフィルタリングしているので、手動でページをめくるのは面倒なので、クローリングすることにしました。

[2019-09-02 更新] その後、課題の発表をすることになり、mysqlデータベースに保存することに変更しました。

コード実装#

ネット上のほとんどのクローリング記事とは異なり、私が欲しいのは各映画のあらすじ情報です。

そのため、まず各映画のリンクを取得し、次に各映画を個別にクローリングする必要があります。

全コードは以下の通りです:

# -*- coding: utf-8 -*-
'''
@author: soapffz
@function: 豆瓣TOP250映画情報の取得とmysqlデータベースへの保存(マルチスレッド)
@time: 2019-09-01
'''

import requests
from fake_useragent import UserAgent
from lxml import etree
from tqdm import tqdm
import threading
import pymysql
from re import split

"""
ライブラリが見つからない場合は以下のコマンドをコピーして解決してください
pip install -i https://pypi.tuna.tsinghua.edu.cn/simple fake_useragent
pip install -i https://pypi.tuna.tsinghua.edu.cn/simple tqdm
pip install -i https://pypi.tuna.tsinghua.edu.cn/simple threading
pip install -i https://pypi.tuna.tsinghua.edu.cn/simple pymysql
"""


class Top250(object):
    def __init__(self):
        ua = UserAgent()  # User-Agentを生成するため
        self.headers = {"User-Agent": ua.random}  # ランダムなUser-Agentを取得
        self.bangdang_l = []  # ランキングのページを保存
        self.subject_url_l = []  # 各映画のリンクを保存
        self.connect_mysql()

    def connect_mysql(self):
        # データベースに接続、パスワードの後にデータベース名を追加できます
        try:
            self.mysql_conn = pymysql.connect(
                'localhost', 'root', 'root', charset='utf8')
            # SQL文を実行できるカーソルオブジェクトを取得し、実行後の結果はデフォルトでタプル形式で表示されます
            self.cursor = self.mysql_conn.cursor()
            print("データベース接続成功")
        except Exception as e:
            print("データベース接続エラー:{}\nクローリングを完了させてどこに保存するの?クローリングを中止します、プログラムを終了します!".format(e))
            exit(0)
        else:
            self.create_db()

    def create_db(self):
        # データベースとテーブルを作成
        # データベースが存在するか確認し、存在する場合は削除してから作成
        sql_db_dection = "DROP DATABASE IF EXISTS `douban_top250`"
        sql_create_db = "CREATE DATABASE `douban_top250` default charset utf8 COLLATE utf8_general_ci;"
        sql_create_table = """
            CREATE TABLE `movies_info` (
                `映画名` varchar(255) NOT NULL,
                `監督` varchar(511) NOT NULL,
                `主演情報` varchar(511) NOT NULL,
                `ジャンル` varchar(255) NOT NULL,
                `公開日` varchar(255) NOT NULL,
                `あらすじ` varchar(511) NOT NULL,
                `ランキング` varchar(255) NOT NULL,
                `上映時間` varchar(255) NOT NULL,
                PRIMARY KEY (`ランキング`)
            )DEFAULT CHARSET=utf8;
            """
        try:
            self.cursor.execute(sql_db_dection)
            self.cursor.execute(sql_create_db)
            self.cursor.execute('use douban_top250;')  # 現在のデータベースを新しく作成したデータベースに設定
            self.cursor.execute(sql_create_table)
        except Exception as e:  # すべての例外をキャッチして表示、python2ではException,e
            print("データベース作成エラー:{}\nプログラムを終了します!".format(e))
            self.mysql_conn.rollback()  # エラーが発生した場合はロールバック
            self.mysql_conn.close()  # データベース接続を閉じる
            exit(0)
        else:
            print("データベースとテーブルの作成成功、各映画のリンクを取得開始..")
            self.get_subject_url()

    def get_subject_url(self):
        # 250のランキングを巡回して各映画の個別リンクを取得
        self.bangdang_l = [
            "https://movie.douban.com/top250?start={}&filter=".format(i) for i in range(0, 250, 25)]
        self.multi_thread(self.bangdang_l, self.crawl_bangdang)
        if len(self.subject_url_l) == 0:
            print("IPがブロックされました、プログラムを終了します")
            exit(0)
        else:
            print("{}本のTOP映画のリンクが取得完了しました、各映画のクローリングを開始しますので、しばらくお待ちください...".format(
                len(self.subject_url_l)))
            self.multi_thread(self.subject_url_l, self.get_one_movie_info)

    def crawl_bangdang(self, url):
        # 各ランキングページの映画リンクをクローリング
        try:
            req = requests.get(url, headers=self.headers)
            req.encoding = "utf-8"
            html = etree.HTML(req.text)
            # すべてのaタグのリストを取得
            url_l = html.xpath('//div[@class="hd"]//a/@href')
            self.subject_url_l.extend(url_l)
        except Exception as e:
            print("ランキング情報のクローリング中にエラーが発生しました:{}".format(e))
        else:
            print("第{}ページのランキングデータを取得成功...\n".format(
                int(int(split(r"=|&", url)[1])/25 + 1)))

    def multi_thread(self, url_l, target_func):
        # マルチスレッドクローリング関数、クローリングするURLリストとクローリングに使用する関数を渡す
        threads = []
        for i in range(len(url_l)):
            t = threading.Thread(target=target_func, args=(url_l[i],))
            threads.append(t)
        for i in range(len(threads)):
            threads[i].start()
        for i in range(len(threads)):
            threads[i].join()
        print("クローリング完了")

    def get_one_movie_info(self, subject):
        # 単一のURLをクローリングする関数
        try:
            req = requests.get(subject, headers=self.headers)
            html = etree.HTML(req.content)
        except Exception as e:
            print("クローリングエラー:".format(e))
        else:
            # 単一映画の情報を保存するため
            info = []
            # 映画名情報
            movie_name = html.xpath(
                "//span[@property='v:itemreviewed']/text()")
            info.append(" ".join(movie_name))
            # 監督情報
            director = html.xpath("//a[@rel='v:directedBy']//text()")
            info.append(" ".join(director))
            # 主演情報
            actor = html.xpath("//a[@rel='v:starring']//text()")
            info.append(" ".join(actor))
            # ジャンル
            genre = html.xpath("//span[@property='v:genre']/text()")
            info.append(" ".join(genre))
            # 公開日
            initialReleaseDate = html.xpath(
                "//span[@property='v:initialReleaseDate']/text()")
            info.append(" ".join(initialReleaseDate))
            # あらすじ
            reated_info = html.xpath("//span[@class='all hidden']/text()")
            # あらすじが隠れている場合があるので、デフォルトで隠れたタグを取得し、隠れていないタグがあればそれを取得
            if len(reated_info) == 0:
                reated_info = html.xpath(
                    "//span[@property='v:summary']/text()")
            reated_info = "".join([s.strip() for s in reated_info]).strip("\\")
            reated_info = self.transferContent(reated_info)
            info.append(reated_info)
            # ランキング
            no = html.xpath("//span[@class='top250-no']/text()")
            if len(no) == 1:
                info.append(no[0].split(".")[-1])
            else:
                info.append("取得失敗")
            runtime = html.xpath("//span[@property='v:runtime']/text()")
            if len(runtime) == 1:
                info.append(runtime[0].split("分")[0])
            else:
                info.append("取得失敗")
            self.db_insert(info)

    def db_insert(self, info_l):
        sql_insert_detection = """
            insert ignore into `douban_top250`.`movies_info` (`映画名`,`監督`,`主演情報`,`ジャンル`,`公開日`,`あらすじ`,`ランキング`,`上映時間`)
            values ("{l[0]}","{l[1]}","{l[2]}","{l[3]}","{l[4]}","{l[5]}","{l[6]}","{l[7]}");
        """.format(l=info_l)
        try:
            self.cursor.execute(sql_insert_detection)
            self.mysql_conn.commit()
        except Exception as e:
            self.mysql_conn.rollback()
            print("データベースへのデータインポート中にエラーが発生しました:{}".format(e))
            exit(0)
        else:
            print("{}の情報取得成功...\n".format(info_l[0]))

    def transferContent(self, content):
        # 同時に'と"を含む文字列をエスケープ
        if content is None:
            return None
        else:
            string = ""
            for c in content:
                if c == '"':
                    string += '\\\"'
                elif c == "'":
                    string += "\\\'"
                elif c == "\\":
                    string += "\\\\"
                else:
                    string += c
            return string


if __name__ == "__main__":
    Top250()

デモ GIF は以下の通りです:

image

参考記事:

読み込み中...
文章は、創作者によって署名され、ブロックチェーンに安全に保存されています。