B站评论/弹幕批量删除脚本


等等,先别急……

开始动手前先停下想想——这些历史信息就像旧照片,也是宝贵的财产,为什么非要删了呢?

为了消除令人尴尬的黑历史,还是为了保护个人隐私?

如果你单纯是不想要号了,可以直接去注销账号,那样更快。

也许你只是想进行一次数字断舍离,和过去的自己做个决裂……好吧,这也不是坏事。

不过删除就无法恢复了,一定请想好之后再动手,本人曾经就把QQ相册清空导致至今后悔不已。三思!


前面的废话

Web技术原理我就不说了,我不是专家也说不清楚,既然你能看到这你肯定是有基础的。

按我自己的理解,这种自动化脚本有两种思路:

  1. 调接口:这种方式可能需要你去逆向研究网站的接口。这个方式开发困难、容易被封,但运行高效,适合大规模爬虫程序。
  2. 模拟人工:就是脚本用一个真浏览器替你操作。技术原理简单,不容易被发现,但运行效率低。

或者更普通的思路,1和2的结合,也是本文的思路。

本脚本的运行过程是这样:

  1. 用playwright有头模式,打开站点,手动登录,用户登录状态信息缓存在一个本地目录下;
  2. 随便打开一个视频,获得vd_source。这或许和用户标识有关的一个值,由前端js生成并追加,但为了不去麻烦研究js计算代码,我们直接随便打开一个视频,等它加载完直接从地址栏拿到这个值;
  3. 进入“消息中心”页,获取所有“回复我的”通知,针对通知里的评论作批量删除,并顺带把通知也删除;
  4. 仍然在“消息中心”页,获取所有“收到的赞”的通知,针对通知里的评论和弹幕作批量删除,并顺带把通知删除;

存在的问题:

  1. B站没有提供一个能全局查询某用户所有评论/弹幕的接口,所以只能从通知中心入手,间接删除和别人有交互的评论/弹幕;
  2. 脚本没有单独处理在别人动态下的评论,我的这种情况不多,所以没有专门写处理逻辑,跑完脚本剩下的手动删除即可。

脚本

删除脚本:

import asyncio
from playwright.async_api import async_playwright
import random

from auto_scripts.bvid_aid_conversion import av2bv


async def get_replied_comments(context, max_num=20):
    async def get_single(id=None, reply_time=None):
        params = [
            "platform=web",
            "build=0",
            "mobi_app=web",
            "web_location=333.40164"
        ]
        if id and reply_time:
            params.append(f"id={id}")
            params.append(f"reply_time={reply_time}")
        request_url = f"https://api.bilibili.com/x/msgfeed/reply?{'&'.join(params)}"
        try:
            res_json = await context.pages[0].evaluate(f"""
                async () => {{
                    const res = await fetch('{request_url}', {{
                        method: 'GET',
                        headers: {{
                            'Referer': 'https://message.bilibili.com/',
                            'Origin': 'https://message.bilibili.com'
                        }},
                        credentials: 'include'
                    }});
                    return await res.json();
                }}
            """)
            res_json_data = res_json.get('data')
            items = res_json_data.get("items")
            is_end = res_json_data.get("cursor").get("is_end")
            next_id = res_json_data.get("cursor").get("id")
            next_reply_time = res_json_data.get("cursor").get("time")
            return items, is_end, next_id, next_reply_time
        except Exception as e:
            print(f"获取“回复我的”失败: {e}")
            return [], True, None, None


    items, is_end, next_id, next_reply_time = await get_single()
    while not is_end:
        if len(items) >= max_num:
            break
        item, is_end, next_id, next_reply_time = await get_single(next_id, next_reply_time)
        if len(item) > 0:
            items.extend(item)
        print(f"已获取回复评论条目: {len(items)}")
        await asyncio.sleep(1)

    return items


async def get_liked_comments(context, max_num=20):
    async def get_single(id=None, like_time=None):
        params = [
            "platform=web",
            "build=0",
            "mobi_app=web",
            "web_location=333.40164"
        ]
        if id and like_time:
            params.append(f"id={id}")
            params.append(f"like_time={like_time}")
        request_url = f"https://api.bilibili.com/x/msgfeed/like?{'&'.join(params)}"
        try:
            res_json = await context.pages[0].evaluate(f"""
                async () => {{
                    const res = await fetch('{request_url}', {{
                        method: 'GET',
                        headers: {{
                            'Referer': 'https://message.bilibili.com/',
                            'Origin': 'https://message.bilibili.com'
                        }},
                        credentials: 'include'
                    }});
                    return await res.json();
                }}
            """)
            res_json_data = res_json.get('data').get("total")
            items = res_json_data.get("items")
            is_end = res_json_data.get("cursor").get("is_end")
            next_id = res_json_data.get("cursor").get("id")
            next_like_time = res_json_data.get("cursor").get("time")
            return items, is_end, next_id, next_like_time
        except Exception as e:
            print(f"获取“回复我的”失败: {e}")
            return [], True, None, None


    items, is_end, next_id, next_like_time = await get_single()
    while not is_end:
        if len(items) >= max_num:
            break
        item, is_end, next_id, next_like_time = await get_single(next_id, next_like_time)
        if len(item) > 0:
            items.extend(item)
        print(f"已获取被赞评论条目: {len(items)}")
        await asyncio.sleep(1)

    return items

async def get_csrf(context):
    cookies = await context.cookies("https://www.bilibili.com")
    bili_jct_cookie = next((c for c in cookies if c['name'] == 'bili_jct'), None)
    return bili_jct_cookie['value'] if bili_jct_cookie else None

async def delete_notification(context, notification_id, tp):
    # tp=0 被赞通知, tp=1 被回复通知
    csrf_value = await get_csrf(context)
    try:
        response = await context.pages[0].evaluate(f"""
                fetch('https://api.bilibili.com/x/msgfeed/del', {{
                    method: 'POST',
                    headers: {{
                        'Content-Type': 'application/x-www-form-urlencoded',
                        'Referer': 'https://message.bilibili.com/',
                        'Origin': 'https://message.bilibili.com',
                        'Sec-Fetch-Mode': 'cors',
                        'Sec-Fetch-Site': 'same-origin'
                    }},
                    body: new URLSearchParams({{
                        'tp': '{tp}',
                        'id': '{notification_id}',
                        'build': '0',
                        'mobi_app': 'web',
                        'csrf': '{csrf_value}'
                    }}),
                    credentials: 'include'
                }}).then(res => res.json())
            """)
        return response.get('code') == 0
    except Exception as e:
        print(f"删除“回复我的”通知失败: {e}")
        return False

async def delete_comment(context, aid, comment_id, vd_source):
    referer = f"https://www.bilibili.com/video/{av2bv(aid)}/?vd_source={vd_source}"
    csrf_value = await get_csrf(context)
    try:
        response = await context.pages[0].evaluate(f"""
                fetch('https://api.bilibili.com/x/v2/reply/del', {{
                    method: 'POST',
                    headers: {{
                        'Content-Type': 'application/x-www-form-urlencoded',
                        'Referer': '{referer}',
                        'Origin': 'https://www.bilibili.com',
                        'Sec-Fetch-Mode': 'cors',
                        'Sec-Fetch-Site': 'same-origin'
                    }},
                    body: new URLSearchParams({{
                        'oid': '{aid}',
                        'type': '1',
                        'rpid': '{comment_id}',
                        'csrf': '{csrf_value}'
                    }}),
                    credentials: 'include'
                }}).then(res => res.json())
            """)
        return response.get('code') == 0
    except Exception as e:
        print(f"删除“回复我的”评论失败: {e}")
        return False

async def delete_danmu(context, aid, dmid, cid, vd_source, dm_progress):
    referer = f"https://www.bilibili.com/video/{av2bv(aid)}/?dm_progress={dm_progress}&dmid={dmid}&vd_source={vd_source}"
    csrf_value = await get_csrf(context)
    try:
        response = await context.pages[0].evaluate(f"""
                    fetch('https://api.bilibili.com/x/v2/dm/recall', {{
                        method: 'POST',
                        headers: {{
                            'Content-Type': 'application/x-www-form-urlencoded',
                            'Referer': '{referer}',
                            'Origin': 'https://www.bilibili.com',
                            'Sec-Fetch-Mode': 'cors',
                            'Sec-Fetch-Site': 'same-origin'
                        }},
                        body: new URLSearchParams({{
                            'dmid': '{dmid}',
                            'cid': '{cid}',
                            'type': '1',
                            'csrf': '{csrf_value}'
                        }}),
                        credentials: 'include'
                    }}).then(res => res.json())
                """)
        return response.get('code') == 0
    except Exception as e:
        print(f"删除弹幕失败: {e}")
        return False

async def delete_replied_comments(context, items, vd_source):
    processed_comment = set()
    for item in items:
        if item.get("item").get("business") == "评论":
            comment_id = item.get("item").get("target_id")
            my_comment = item.get("item").get("target_reply_content")[:20]
            if my_comment == "":
                my_comment = item.get("item").get("root_reply_content")[:20]
            print(f"\n---\n评论内容:{my_comment}... || id: '{comment_id}'")
            # input("请按回车键确认删除...")

            if comment_id not in processed_comment:
                comment_deleted = await delete_comment(context, item.get("item").get("subject_id"), comment_id, vd_source)
                print(f"评论: '{comment_id}' 删除状态: {comment_deleted}")
                if not comment_deleted:
                    print(f"删除失败:{item}")
                processed_comment.add(comment_id)
            else:
                print("评论已经处理过,跳过")
        else:
            print(f"预期外的类型,跳过: {item.get("item").get("business")}")

        # 删除对应的通知
        ok = await delete_notification(context, item.get("id"), 1)
        print(f"通知: '{item.get("id")}' 删除状态: {ok}")

        await asyncio.sleep(random.randint(0, 1))

async def delete_liked_commets(context, items, vd_source):
    processed_comment = set()
    for item in items:
        if item.get("item").get("business") == "评论":
            comment_id = item.get("item").get("item_id")
            my_comment = item.get("item").get("title")[:20]
            print(f"\n---\n评论内容:{my_comment}... || id: '{comment_id}'")
            # input("请按回车键确认删除...")
            # 从native_uri中提取视频aid,uri形如:bilibili://video/{xxxx}?...
            native_uri = item.get("item").get("native_uri")
            import re
            pattern = r"bilibili://video/(\d+)"  # 仅处理了视频评论,如果是动态评论可以改下pattern,此时native_uri例如'bilibili://comment/detail/11/381146508/288325939232/?subType=0&anchor=288326706768&showEnter=1&extraIntentId=0&scene=1&enterName=查看动态详情&enterUri=bilibili://opus/detail/1156902561345699849'
            match = re.search(pattern, native_uri)
            if not match:
                print(f"删除失败:未能提取oid,native_uri={native_uri}")
                continue
            aid = int(match.group(1))
            # 开始删除评论
            if comment_id not in processed_comment:
                comment_deleted = await delete_comment(context, aid, comment_id, vd_source)
                print(f"评论: '{comment_id}' 删除状态: {comment_deleted}")
                if not comment_deleted:
                    print(f"删除失败:{item}")
                processed_comment.add(comment_id)
            else:
                print("评论已经处理过,跳过")
        elif item.get("item").get("business") == "弹幕":
            # 从native_uri中提取 aid, dm_progress, cid, dmid
            native_uri = item.get("item").get("native_uri")
            import re
            pattern = r"bilibili://video/(\d+)\?dm_progress=(\d+)&cid=(\d+)&dmid=(\d+)"
            match = re.search(pattern, native_uri)
            if not match:
                print(f"删除失败:未能提取oid,native_uri={native_uri}")
                continue
            aid = int(match.group(1))
            dm_progress = int(match.group(2))
            cid = int(match.group(3))
            dmid = int(match.group(4))
            print(f"\n---\n弹幕内容:{item.get("item").get("title")}... || id: '{dmid}'")
            # 开始删除弹幕
            if dmid not in processed_comment:
                danmu_deleted = await delete_danmu(context, aid, dmid, cid, vd_source, dm_progress)
                print(f"弹幕: '{dmid}' 删除状态: {danmu_deleted}")
                if not danmu_deleted:
                    print(f"删除失败:{item}")
                processed_comment.add(dmid)
            else:
                print("弹幕已经处理过,跳过")
        # 删除对应的通知
        ok = await delete_notification(context, item.get("id"), 0)
        print(f"通知: '{item.get("id")}' 删除状态: {ok}")

        await asyncio.sleep(random.randint(0, 1))


async def run_cleaner():
    async with async_playwright() as p:
        user_data_dir = "./bilibili_user_data"
        context = await p.chromium.launch_persistent_context(
            user_data_dir,
            headless=False,  # 必须为 False,我们要手动登录
            args=["--start-maximized"],
            no_viewport=True
        )
        page = context.pages[0]

        # 获取vd_source
        await page.goto("https://www.bilibili.com/video/BV1NQtDeEErj")  # 随便打开一个视频
        await page.wait_for_function(
            "window.location.href.includes('vd_source')",
            timeout=10000
        )
        vd_source = page.url.split("vd_source=")[1]
        await asyncio.sleep(1)

        await page.goto("https://message.bilibili.com")
        await page.wait_for_load_state(state="domcontentloaded")

        # 被回复的评论
        replied_items = await get_replied_comments(context, 10000)
        await delete_replied_comments(context, replied_items, vd_source)

        # 被点赞的评论和弹幕
        for i in range(10):
            liked_items = await get_liked_comments(context, 200)
            await delete_liked_commets(context, liked_items, vd_source)

        # 任务完成后保持一会儿
        await asyncio.sleep(1)
        await context.close()


if __name__ == "__main__":
    asyncio.run(run_cleaner())

BV号 <=> aid 转换,可以调接口,但有人已经把计算代码公开了:

XOR_CODE = 23442827791579
MASK_CODE = 2251799813685247
MAX_AID = 1 << 51
ALPHABET = "FcwAPNKTMug3GV5Lj7EJnHpWsx4tb8haYeviqBz6rkCy12mUSDQX9RdoZf"
ENCODE_MAP = 8, 7, 0, 5, 1, 3, 2, 4, 6
DECODE_MAP = tuple(reversed(ENCODE_MAP))

BASE = len(ALPHABET)
PREFIX = "BV1"
PREFIX_LEN = len(PREFIX)
CODE_LEN = len(ENCODE_MAP)

def av2bv(aid: int) -> str:
    bvid = [""] * 9
    tmp = (MAX_AID | aid) ^ XOR_CODE
    for i in range(CODE_LEN):
        bvid[ENCODE_MAP[i]] = ALPHABET[tmp % BASE]
        tmp //= BASE
    return PREFIX + "".join(bvid)

def bv2av(bvid: str) -> int:
    assert bvid[:3] == PREFIX

    bvid = bvid[3:]
    tmp = 0
    for i in range(CODE_LEN):
        idx = ALPHABET.index(bvid[DECODE_MAP[i]])
        tmp = tmp * BASE + idx
    return (tmp & MASK_CODE) ^ XOR_CODE

assert av2bv(113179954776959) == "BV1NQtDeEErj"
assert bv2av("BV1NQtDeEErj") == 113179954776959

一个好/坏消息

好,你应该已经删完了吧?我现在有一个好(坏?)消息要告诉你。

好消息是,如果你以后后悔了,想找回被自己手贱删除的历史,可以在这个 aicu.cc 网站查到。

这是一个非官方站点,爬取了整个B站所有的公开评论/弹幕(B站官方你们的技术团队在干什么……不过想想上次整个站点源代码被上传github这种事,站点被爬又算得了什么)。

虽然不能恢复,但它也提供了一种历史查看手段。

但这同时也是一个坏消息,如果你删历史是为了隐藏自己的黑历史,那你失算了,任何人都可以轻易在这个站点上查到你的黑历史。享受裸奔的快乐吧ヾ(o◕∀◕)ノヾ。