来自以下网址
抖音爬虫(主页、喜欢列表全部下载) - 『编程语言区』 - 吾爱破解 - LCG - LSG |安卓破解|病毒分析|www.52pojie.cn

| 本帖最后由 d8349565 于 2022-10-7 23:13 编辑
抖音爬虫(主页、喜欢列表全部下载) 说明
以下代码仅供交流,主要爬取抖音单个用户的主页或喜欢中涉及的所有视频;

文件夹内共以下内容:

  1. 爬虫:douyin_spider.py
  2. 多线程下载器:douyin_download_N_thread.py
  3. data.json:记录爬取到的信息,供下载器使用
    1. image-20221007222739414
  4. 文件夹download_files:储存下载的视频文件
  5. 演示
    2022-10-07 22-21-07
    爬虫代码:(data里面还有很多信息,自己可以打印出来看看)
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    import requests
    import json
    import time
    import os

    # Run relative to this script's own directory so data.json and the
    # download folder land next to the script, not in the caller's CWD.
    os.chdir(os.path.dirname(os.path.realpath(__file__)))
    def get_data(sec_uid,max_cursor,mode):
    headers = {
    'Connection': 'keep-alive',
    'Accept': 'application/json, text/plain, */*',
    'Agw-Js-Conv': 'str',
    'User-Agent': 'Mozilla/5.0 (Linux; Android 6.0; Nexus 5 Build/MRA58N) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/86.0.4240.198 Mobile Safari/537.36',
    'Sec-Fetch-Site': 'same-origin',
    'Sec-Fetch-Mode': 'cors',
    'Sec-Fetch-Dest': 'empty',
    'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8',
    }
    params = {
    'reflow_source': 'reflow_page',
    'sec_uid': sec_uid,
    'count': '100',
    'max_cursor': max_cursor,
    }
    response = requests.get(f'https://m.douyin.com/web/api/v2/aweme/{mode}/', params=params, headers=headers)
    data = response.json()
    for d in data['aweme_list']:
    output = {}
    output['title'] = d['desc']
    output['VideoUrl'] = d['video']['play_addr']['url_list'][-1]
    output['img'] = d['video']['dynamic_cover']['url_list'][-1]
    output['id'] = d['aweme_id']
    result.append(output)
    print(output['title'])
    return data['max_cursor'],data['has_more']

    def run(sec_uid,mode='post'):
    max_cursor = '0'
    for i in range(100):
    x = get_data(sec_uid,max_cursor,mode)
    if x[1]:
    max_cursor = x[0]
    get_data(sec_uid,max_cursor,mode)
    else:
    break

    if __name__ == '__main__':
    result = []
    choice = input('''
    *******************************************************
    请选择需要下载的类别序号(1/2):
    1、该账号主页的所有视频;
    2、改账号喜欢列表所有视频。
    *******************************************************\n
    ''')
    sec_uid = input('''
    *******************************************************
    请选择需账号的sec_uid码:
    例如:主页链接(PC网页打开)
    链接:https://www.douyin.com/user/MS4wLjABAAAAfeHUJALUV_hro9kN7QT5I9pe9DNVDSkiCTiqfK0ziZo?vid=7151405922777107753
    sec_uid码:MS4wLjABAAAAfeHUJALUV_hro9kN7QT5I9pe9DNVDSkiCTiqfK0ziZo
    *******************************************************\n''')
    sel = {'1':'post','2':'like'}
    mode = sel[choice]
    path = f'data.json'
    try:
    run(sec_uid,mode)
    print('完成搜索数量:',len(result))
    with open(path,'w',encoding='utf-8') as f:
    json.dump(result,f,indent=4, ensure_ascii=False)
    except:
    print('完成搜索数量(lost_part):',len(result))
    with open(path,'w',encoding='utf-8') as f:
    json.dump(result,f,indent=4, ensure_ascii=False)
    time.sleep(3)

    多线程下载器代码

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    84
    85
    86
    87
    88
    89
    90
    91
    92
    93
    94
    95
    96
    import queue
    import threading
    import time
    import requests
    import json
    import re
    import os

    # Run relative to this script's own directory so data.json is found and
    # the download folder is created next to the script.
    os.chdir(os.path.dirname(os.path.realpath(__file__)))

    class myThread (threading.Thread):
    def __init__(self, threadID, name, q):
    threading.Thread.__init__(self)
    self.threadID = threadID
    self.name = name
    self.q = q
    def run(self):
    # print ("开启线程:" + self.name)
    process_data(self.name, self.q)
    # print ("退出线程:" + self.name)

def process_data(threadName, q):
    """Worker loop: pull task dicts off the shared queue and download each one.

    Relies on module-level globals published by thread_task():
      exitFlag  — when truthy, workers fall out of the loop and return
      queueLock — guards the empty()/get() pair against a check-then-act race
      workQueue — the shared queue (the same object passed in as `q`)
    """
    while not exitFlag:
        queueLock.acquire()
        if not workQueue.empty():
            task_arg = q.get()
            queueLock.release()
            # Download outside the lock so workers can run in parallel.
            main(task_arg)

            print ("%s processing %s" % (threadName, '*'*20))
        else:
            # Queue momentarily empty: release the lock and back off before
            # polling again.
            queueLock.release()
            time.sleep(1)

    def thread_task(threadList,task_args,n):
    global workQueue,queueLock,exitFlag
    queueLock = threading.Lock()
    workQueue = queue.Queue(n)
    exitFlag = 0
    threads = []
    threadID = 1
    # 创建新线程
    for tName in threadList:
    thread = myThread(threadID, tName, workQueue)
    thread.start()
    threads.append(thread)
    threadID += 1
    # 填充队列
    queueLock.acquire()
    for task_arg in task_args:
    workQueue.put(task_arg)
    queueLock.release()
    # 等待队列清空
    while not workQueue.empty():
    pass
    # 通知线程是时候退出
    exitFlag = 1
    # 等待所有线程完成
    for t in threads:
    t.join()
    print("退出主线程")

    def main(data):
    # print(data)
    url = data['VideoUrl']
    id = data['id']
    title = data['title']
    if title == "":
    name = id
    else:
    name = title
    intab = r'[?*/\|.:><]'
    name = re.sub(intab, "", name).replace(" ","")
    try:
    response = requests.get(url, headers=headers)
    except:
    print('网页请求错误:\n','*'*100+'\n',name+'\n','*'*100)
    try:
    with open (f'{outout_dir}//{name}.mp4','wb') as b:
    b.write(response.content)
    print('已下载:',name)
    except:
    print('下载错误:\n','*'*100+'\n',name+'\n','*'*100)

    if __name__ == '__main__':
    threadList = []
    for i in range(50):
    threadList.append(f'Thread-{i+1}')
    outout_dir = r'./download_files'
    from faker import Factory
    fake = Factory().create('zh_CN')
    headers = {'User-Agent': fake.user_agent()}
    with open('data.json','r',encoding='utf-8') as f:
    data_json = json.load(f)
    task_args = data_json
    thread_task(threadList, task_args,len(task_args))