标签 python 下的文章 - 空痕博客
首页
小记
php
python
uni
前端
其他
机器人
QQ机器人
APP
KHMD
KHMD-v3
其他页面
友情链接
用户留言
联系空痕
热门文章
PHP搭建QQ机器人(QQ官方)
下载文件到指定文件夹
欢迎回来 Typecho !
UTS引用原生jar包进行原生插件开发
PHP封装功能较全CURL函数
标签搜索
uniapp
python
PHP
APP
KongHen
机器人
QQ
UTS
ID3
pyinstaller
redis
Echarts
模板
邮箱
js
lyear
夸克网盘
发布
登录
找到
2
篇与
相关的结果
2025-07-06
上传文件到夸克网盘python代码
以下为根据个人需求,参考OpenList让DeepSeek编写的文件上传python脚本 简要说明 包含的功能 创建文件夹 分片上传文件 分享文件夹 获取分享链接 cookie存储在quark.cookie.txt文件中即可 基本变量说明: PARENT_DIR_ID:创建文件夹的父文件夹id,如果在根目录下创建文件夹,输入0 FOLDER_NAME:要创建的文件夹名 FILE_PATH:要上传的文件 上传频繁会被拉黑 脚本代码 import base64 import hashlib import io import json import logging import mimetypes import os import time import requests from datetime import datetime, timezone from typing import Dict, List, Tuple, Optional, Callable # 配置日志 log = logging.getLogger(__name__) logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' ) try: from colorama import init, Fore, Style init(autoreset=True) COLOR_SUPPORT = True except ImportError: COLOR_SUPPORT = False print("未安装colorama库,将显示无颜色输出。安装命令: pip install colorama") class QuarkOrUC: def __init__(self, cookie: str, parent_dir_id: str): """ 初始化Quark网盘客户端 :param cookie: 完整的认证Cookie字符串 :param parent_dir_id: 默认父目录ID """ self.cookie = cookie self.parent_dir_id = parent_dir_id # 配置信息 self.config = { "api": "https://drive.quark.cn/1/clouddrive", "referer": "https://pan.quark.cn", "origin": "https://pan.quark.cn", "pr": "ucpro", "ua": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/137.0.0.0 Safari/537.36" } def colored_print(self, message: str, color: str = "white") -> None: """带颜色打印消息""" if not COLOR_SUPPORT: print(message) return colors = { "red": Fore.RED, "green": Fore.GREEN, "yellow": Fore.YELLOW, "blue": Fore.BLUE, "cyan": Fore.CYAN, "white": Fore.WHITE, } print(colors.get(color, Fore.WHITE) + message + Style.RESET_ALL) def request(self, pathname: str, method: str, data: Optional[Dict] = None, query_params: Optional[Dict] = None) -> Tuple[Optional[Dict], Optional[Exception]]: """ 发送API请求 :param pathname: API路径 :param method: HTTP方法 (GET/POST) :param data: POST请求的数据 :param query_params: 额外的查询参数 :return: (响应数据, 错误对象) """ url = self.config['api'] + pathname headers = { "Cookie": self.cookie, "Accept": "application/json, text/plain, */*", "Referer": self.config['referer'], "User-Agent": self.config['ua'], "Origin": self.config['origin'], "Priority": "u=1, i" } # 基础查询参数 params = {"pr": self.config['pr'], "fr": "pc"} # 添加额外的查询参数 if query_params: params.update(query_params) try: if method.upper() == "GET": response = requests.get(url, headers=headers, params=params) elif method.upper() == "POST": headers["Content-Type"] = "application/json" response = requests.post(url, headers=headers, params=params, json=data) else: raise ValueError(f"不支持的HTTP方法: {method}") response.raise_for_status() json_resp = response.json() # 更新cookie if '__puus' in response.cookies: self.cookie = f"{self.cookie}; __puus={response.cookies['__puus']}" # 检查错误响应 if json_resp.get('status', 200) >= 400 or json_resp.get('code', 0) != 0: error_msg = json_resp.get('message', '未知错误') log.error(f"API错误: {error_msg}") return None, Exception(error_msg) return json_resp, None except requests.exceptions.RequestException as e: log.error(f"请求失败: {str(e)}") return None, e except Exception as e: log.error(f"处理响应时出错: {str(e)}") return None, e def create_folder(self, folder_name: str) -> Tuple[Optional[Dict], Optional[Exception]]: """ 创建文件夹 :param folder_name: 文件夹名称 :return: (响应数据, 错误对象) """ data = { "pdir_fid": self.parent_dir_id, "file_name": folder_name, "dir_path": "", "dir_init_lock": False } return self.request("/file", "POST", data) def create_share(self, fid_list: List[str], title: str, expired_type: int = 1, url_type: int = 1) -> Tuple[Optional[Dict], Optional[Exception]]: """ 创建分享任务 :param fid_list: 要分享的文件/文件夹ID列表 :param title: 分享标题 :param expired_type: 过期类型 (1: 永久) :param url_type: URL类型 (1: 标准链接) :return: (响应数据, 错误对象) """ data = { "fid_list": fid_list, "title": title, "url_type": url_type, "expired_type": expired_type } return self.request("/share", "POST", data) def check_share_task(self, task_id: str) -> Tuple[Optional[Dict], Optional[Exception]]: """ 检查分享任务状态 :param task_id: 分享任务ID :return: (响应数据, 错误对象) """ query_params = { "task_id": task_id, "retry_index": 0 } return self.request("/task", "GET", query_params=query_params) def get_share_info(self, share_id: str) -> Tuple[Optional[Dict], Optional[Exception]]: """ 获取分享链接信息 :param share_id: 分享ID :return: (响应数据, 错误对象) """ data = {"share_id": share_id} return self.request("/share/password", "POST", data) def up_pre(self, file_name: str, mimetype: str, size: int, parent_id: str) -> Tuple[Optional[Dict], Optional[Exception]]: """ 预上传请求 :param file_name: 文件名 :param mimetype: MIME类型 :param size: 文件大小 :param parent_id: 父目录ID :return: (预上传响应, 错误对象) """ now = int(time.time() * 1000) data = { "ccp_hash_update": True, "dir_name": '', "file_name": file_name, "format_type": mimetype, "l_created_at": now, "l_updated_at": now, "pdir_fid": parent_id, "size": size } return self.request("/file/upload/pre", "POST", data) def up_hash(self, md5_hash: str, sha1_hash: str, task_id: str) -> Tuple[bool, Optional[Exception]]: """ 提交文件哈希验证 :param md5_hash: MD5哈希值 :param sha1_hash: SHA1哈希值 :param task_id: 上传任务ID :return: (是否已完成上传, 错误对象) """ data = { "md5": md5_hash, "sha1": sha1_hash, "task_id": task_id } resp, err = self.request("/file/update/hash", "POST", data) if err: return False, err return (resp.get('data', {}).get('finish', False), None) def up_part(self, pre: Dict, mime_type: str, part_number: int, chunk_data: bytes) -> Tuple[Optional[str], Optional[Exception]]: """ 上传文件分片 :param pre: 预上传响应数据 :param mime_type: MIME类型 :param part_number: 分片编号 :param chunk_data: 分片数据 :return: (ETag, 错误对象) """ # 使用 timezone-aware datetime now = datetime.now(timezone.utc).strftime('%a, %d %b %Y %H:%M:%S GMT') auth_meta = f"PUT\n\n{mime_type}\n{now}\nx-oss-date:{now}\nx-oss-user-agent:aliyun-sdk-js/6.6.1 Chrome 98.0.4758.80 on Windows 10 64-bit\n/{pre['data']['bucket']}/{pre['data']['obj_key']}?partNumber={part_number}&uploadId={pre['data']['upload_id']}" auth_data = { "auth_info": pre['data']['auth_info'], "auth_meta": auth_meta, "task_id": pre['data']['task_id'] } # 获取上传授权 auth_resp, err = self.request("/file/upload/auth", "POST", auth_data) if err: return None, err # 执行分块上传 upload_url = f"https://{pre['data']['bucket']}.{pre['data']['upload_url'][7:]}/{pre['data']['obj_key']}" headers = { "Authorization": auth_resp['data']['auth_key'], "Content-Type": mime_type, "Referer": "https://pan.quark.cn/", "x-oss-date": now, "x-oss-user-agent": "aliyun-sdk-js/6.6.1 Chrome 98.0.4758.80 on Windows 10 64-bit" } params = { "partNumber": str(part_number), "uploadId": pre['data']['upload_id'] } try: response = requests.put( upload_url, headers=headers, params=params, data=chunk_data, timeout=30 ) response.raise_for_status() return response.headers.get('ETag'), None except requests.exceptions.RequestException as e: log.error(f"分片上传失败: {str(e)}") return None, e def up_commit(self, pre: Dict, etags: List[str]) -> Optional[Exception]: """ 提交分片上传完成 :param pre: 预上传响应数据 :param etags: 所有分片的ETag列表 :return: 错误对象 """ # 使用 timezone-aware datetime now = datetime.now(timezone.utc).strftime('%a, %d %b %Y %H:%M:%S GMT') # 构建XML body xml_parts = [] for i, etag in enumerate(etags, 1): xml_parts.append(f"<Part><PartNumber>{i}</PartNumber><ETag>{etag}</ETag></Part>") xml_body = f"""<?xml version="1.0" encoding="UTF-8"?> <CompleteMultipartUpload> {"".join(xml_parts)} </CompleteMultipartUpload>""" # 计算Content-MD5 md5 = hashlib.md5() md5.update(xml_body.encode('utf-8')) content_md5 = base64.b64encode(md5.digest()).decode('utf-8') # 处理回调数据 callback_json = json.dumps(pre['data']['callback']) callback_b64 = base64.b64encode(callback_json.encode('utf-8')).decode('utf-8') # 构建授权元数据 auth_meta = f"POST\n{content_md5}\napplication/xml\n{now}\nx-oss-callback:{callback_b64}\nx-oss-date:{now}\nx-oss-user-agent:aliyun-sdk-js/6.6.1 Chrome 98.0.4758.80 on Windows 10 64-bit\n/{pre['data']['bucket']}/{pre['data']['obj_key']}?uploadId={pre['data']['upload_id']}" auth_data = { "auth_info": pre['data']['auth_info'], "auth_meta": auth_meta, "task_id": pre['data']['task_id'] } # 获取提交授权 auth_resp, err = self.request("/file/upload/auth", "POST", auth_data) if err: return err # 执行提交请求 upload_url = f"https://{pre['data']['bucket']}.{pre['data']['upload_url'][7:]}/{pre['data']['obj_key']}" headers = { "Authorization": auth_resp['data']['auth_key'], "Content-MD5": content_md5, "Content-Type": "application/xml", "Referer": "https://pan.quark.cn/", "x-oss-callback": callback_b64, "x-oss-date": now, "x-oss-user-agent": "aliyun-sdk-js/6.6.1 Chrome 98.0.4758.80 on Windows 10 64-bit" } params = {"uploadId": pre['data']['upload_id']} try: response = requests.post( upload_url, headers=headers, params=params, data=xml_body, timeout=30 ) response.raise_for_status() return None except requests.exceptions.RequestException as e: log.error(f"提交上传失败: {str(e)}") return e def up_finish(self, pre: Dict) -> Optional[Exception]: """ 完成上传流程 :param pre: 预上传响应数据 :return: 错误对象 """ data = { "obj_key": pre['data']['obj_key'], "task_id": pre['data']['task_id'] } _, err = self.request("/file/upload/finish", "POST", data) time.sleep(1) # 等待服务器处理 return err def upload_file(self, file_name: str, file_data: bytes, parent_dir_id: Optional[str] = None, progress_callback: Optional[Callable] = None) -> Tuple[Optional[str], Optional[Exception]]: """ 完整的文件上传流程 :param file_name: 文件名 :param file_data: 文件二进制数据 :param parent_dir_id: 父目录ID(默认为初始化时设置的目录) :param progress_callback: 进度回调函数 :return: (文件ID, 错误对象) - 成功时返回文件ID,失败时返回错误 """ # 使用默认父目录ID(如果未提供) if parent_dir_id is None: parent_dir_id = self.parent_dir_id # 自动检测MIME类型 mime_type, _ = mimetypes.guess_type(file_name) if not mime_type: mime_type = "application/octet-stream" file_size = len(file_data) file_stream = io.BytesIO(file_data) # 步骤1: 计算文件哈希 md5_hash = hashlib.md5() sha1_hash = hashlib.sha1() file_stream.seek(0) # 计算哈希 while True: chunk = file_stream.read(8192) if not chunk: break md5_hash.update(chunk) sha1_hash.update(chunk) md5_hex = md5_hash.hexdigest() sha1_hex = sha1_hash.hexdigest() # 重置文件流指针 file_stream.seek(0) # 步骤2: 预上传请求 pre_resp, err = self.up_pre(file_name, mime_type, file_size, parent_dir_id) if err: return None, f"预上传失败: {str(err)}" # 步骤3: 提交哈希验证 finished, err = self.up_hash(md5_hex, sha1_hex, pre_resp['data']['task_id']) if err: return None, f"哈希验证失败: {str(err)}" if finished: self.colored_print("服务器已有相同文件,跳过上传", "yellow") return None, "服务器已有相同文件" # 步骤4: 分块上传 part_size = pre_resp['metadata']['part_size'] etags = [] part_number = 1 while True: chunk = file_stream.read(part_size) if not chunk: break etag, err = self.up_part( pre_resp, mime_type, part_number, chunk ) if err: return None, f"分片{part_number}上传失败: {str(err)}" etags.append(etag) part_number += 1 # 更新进度 if progress_callback: progress = min(100, int(file_stream.tell() / file_size * 100)) progress_callback(progress) # 步骤5: 提交上传 err = self.up_commit(pre_resp, etags) if err: return None, f"提交上传失败: {str(err)}" # 步骤6: 完成上传 err = self.up_finish(pre_resp) if err: return None, f"完成上传失败: {str(err)}" # 返回上传文件的ID file_id = pre_resp.get('data', {}).get('fid') if not file_id: return None, "未能获取上传文件ID" return file_id, None def load_cookie_from_file(file_path: str) -> str: """从文件加载Cookie""" try: with open(file_path, "r") as f: return f.read().strip() except Exception as e: raise Exception(f"加载Cookie文件失败: {str(e)}") if __name__ == "__main__": # 配置信息 COOKIE_FILE = "cookie.quark.txt" PARENT_DIR_ID = "0" FOLDER_NAME = "新建文件夹名" FILE_PATH = "ce5.zip" # 1. 加载Cookie try: cookie = load_cookie_from_file(COOKIE_FILE) if not cookie: raise Exception("Cookie文件为空") except Exception as e: print(f"❌ 错误: {str(e)}") exit(1) # 创建客户端实例 client = QuarkOrUC(cookie=cookie, parent_dir_id=PARENT_DIR_ID) # 2. 创建文件夹 client.colored_print("\n🟡 正在创建文件夹...", "yellow") create_resp, create_err = client.create_folder(FOLDER_NAME) if create_err: client.colored_print(f"❌ 未创建文件夹失败: {str(create_err)}", "red") exit(1) # 获取新创建的文件夹ID new_folder_id = create_resp.get('data', {}).get('fid') if not new_folder_id: client.colored_print("❌ 未能获取新文件夹ID", "red") exit(1) client.colored_print(f"✅ 文件夹创建成功! ID: {new_folder_id}", "green") # 3. 上传文件到新创建的文件夹 try: if not os.path.exists(FILE_PATH): raise Exception(f"文件不存在: {FILE_PATH}") # 获取文件大小(以字节为单位) file_size_bytes = os.path.getsize(FILE_PATH) file_size_mb = file_size_bytes / (1024 * 1024) with open(FILE_PATH, "rb") as f: file_data = f.read() # 确保读取的文件大小与实际文件大小一致 if len(file_data) != file_size_bytes: raise Exception(f"文件读取不完整: 预期 {file_size_bytes} 字节, 实际读取 {len(file_data)} 字节") # 进度回调函数 def progress_callback(percent): print(f"\r🟢 上传进度: {percent}%", end="", flush=True) file_name = os.path.basename(FILE_PATH) client.colored_print(f"\n🟡 开始上传文件: {file_name} ({file_size_mb:.2f} MB)", "yellow") file_id, upload_err = client.upload_file( file_name=file_name, file_data=file_data, parent_dir_id=new_folder_id, progress_callback=progress_callback ) if upload_err: client.colored_print(f"\n❌ 文件上传失败: {str(upload_err)}", "red") exit(1) else: client.colored_print(f"\n✅ 文件上传成功! 文件ID: {file_id}", "green") # 4. 创建分享任务 client.colored_print("\n🟡 正在创建分享任务...", "yellow") share_resp, share_err = client.create_share( fid_list=[new_folder_id], title=FOLDER_NAME ) if share_err: client.colored_print(f"❌ 创建分享任务失败: {str(share_err)}", "red") exit(1) # 获取分享任务ID task_id = share_resp.get('data', {}).get('task_id') if not task_id: client.colored_print("❌ 未能获取分享任务ID", "red") exit(1) client.colored_print(f"✅ 分享任务创建成功! 任务ID: {task_id}", "green") # 5. 轮询分享任务状态 interval = 5 # 每次间隔5秒 client.colored_print("\n🟡 正在监测分享状态...", "yellow") while True: print("\r进行中...", end="", flush=True) # 检查分享任务状态 task_resp, task_err = client.check_share_task(task_id) if task_err: client.colored_print(f"\n❌ 检查分享任务状态失败: {str(task_err)}", "red") exit(1) task_data = task_resp.get('data', {}) task_status = task_data.get('status') # 状态2表示分享成功 if task_status == 2: share_id = task_data.get('share_id') if not share_id: client.colored_print("\n❌ 分享成功但未获取到share_id", "red") exit(1) # 获取分享链接信息 share_info_resp, share_info_err = client.get_share_info(share_id) if share_info_err: client.colored_print(f"\n❌ 获取分享链接失败: {str(share_info_err)}", "red") exit(1) share_info_data = share_info_resp.get('data', {}) share_url = share_info_data.get('share_url', '') passcode = share_info_data.get('passcode', '') # 注意:实际键名是pwd_id # 如果获取到分享链接,退出循环 if share_url: print("\n", end="") # 清除"进行中..."提示 break # 检查任务是否失败 elif task_status in [3, 4]: # 3: 失败, 4: 取消 client.colored_print(f"\n❌ 分享任务失败: 状态码 {task_status}", "red") exit(1) # 等待下一次检查 time.sleep(interval) # 6. 显示分享结果 client.colored_print("\n✅ 分享创建成功!", "green") client.colored_print(f"📁 分享标题: {FOLDER_NAME}", "cyan") client.colored_print(f"🔗 分享链接: {share_url}", "cyan") if passcode: client.colored_print(f"🔑 提取密码: {passcode}", "cyan") except Exception as e: client.colored_print(f"❌ 操作失败: {str(e)}", "red") exit(1)运行演示 运行演示图片 参考文献 OpenList
所有文章
python
# python
# 夸克网盘
KongHen02
昨天
0
27
0
2025-02-07
使用pyinstaller打包python程序
准备内容 py程序文件main.py(以单文件为例) 程序图标favicon.ico(如果需要的话),可使用在线生成透明ICO图标生成。 python环境 创建虚拟环境 使用venv库(python3.3以后内置此库)创建虚拟环境 python3 -m venv music参数描述示例music虚拟环境名称,修改为自己的虚拟环境名称即可music# 示例 python3 -m venv music1.jpg图片 激活虚拟环境 # Mac source ./music/bin/activate # Windows .\music\scripts\activate2.jpg图片 安装pyinstaller pip install pyinstaller注意:安装pyinstaller,一定要在虚拟环境中安装,否则打包会丢失库,导致程序无法运行。 3.jpg图片 安装其他必须库 我编写的程序需要用到requests登其他库,所以需要安装,根据自己的程序按需安装即可。 编写程序 示例文件为main.py,并在虚拟环境并安装必须库。 4.jpg图片 测试运行 python3 main.py开始打包 pyinstaller --clean -F -i favicon.ico main.py参数描述示例--clean清理历史构建缓存--clean-F打包为单文件程序-F-i favicon.ico程序图标,如不需要删除即可-i favicon.icomain.pypython程序文件main.py输出以下内容表示打包成功 INFO: Building EXE from EXE-00.toc completed successfully.5.jpg图片 打包结果 打包结果程序位于dist目录下,与程序文件同名。 6.jpg图片 运行测试 7.jpg图片 退出虚拟环境 deactivate总结 pyinstaller需要安装的虚拟环境内,不能安装在全局 使用Mac只能打包为Mac程序,使用Windows只能打包Windows程序 参考 创建虚拟环境的四种方式(venv | pipenv | conda | poetry) Python pyinstaller打包exe最完整教程 pyinstaller 打包 提示 ModuleNotFoundError: No module named ‘xxx‘
小记
python
# python
# pyinstaller
KongHen02
2月7日
0
4
0
易航博客