|
|
@@ -0,0 +1,134 @@
|
|
|
+import asyncio
|
|
|
+import os
|
|
|
+import psutil
|
|
|
+import time
|
|
|
+from playwright.async_api import async_playwright
|
|
|
+
|
|
|
+# 执行自动化浏览器操作的协程函数
|
|
|
+
|
|
|
+def close_chrome():
|
|
|
+ """关闭所有Chrome进程"""
|
|
|
+ for proc in psutil.process_iter(['name']):
|
|
|
+ try:
|
|
|
+ if proc.info['name'] == 'chrome.exe':
|
|
|
+ proc.kill()
|
|
|
+ except (psutil.NoSuchProcess, psutil.AccessDenied):
|
|
|
+ pass
|
|
|
+ # 等待进程完全关闭
|
|
|
+ time.sleep(2)
|
|
|
+
|
|
|
+async def run(playwright):
|
|
|
+ # 确保Chrome已关闭
|
|
|
+ close_chrome()
|
|
|
+
|
|
|
+ # 使用用户实际的Chrome配置,配置查询chrome://version
|
|
|
+ user_data_dir = r'C:\Users\zhens\AppData\Local\Google\Chrome\User Data'
|
|
|
+
|
|
|
+ def text_to_div_html(text):
|
|
|
+ """将多行文本转换为<div>...</div>格式的HTML"""
|
|
|
+ lines = [line.strip() for line in text.strip().splitlines() if line.strip()]
|
|
|
+ return ''.join(f'<div>{line}</div>' for line in lines)
|
|
|
+
|
|
|
+ # 你可以在这里自定义要填写的内容
|
|
|
+ summary_text = """
|
|
|
+ 今日完成日报自动化脚本开发
|
|
|
+ 修复了异常处理bug
|
|
|
+ """
|
|
|
+ plan_text = """
|
|
|
+ 明天继续完善自动化脚本
|
|
|
+ 增加更多异常场景测试
|
|
|
+ """
|
|
|
+ summary_html = text_to_div_html(summary_text)
|
|
|
+ plan_html = text_to_div_html(plan_text)
|
|
|
+
|
|
|
+ try:
|
|
|
+ # 使用已有的Chrome配置文件启动浏览器
|
|
|
+ context = await playwright.chromium.launch_persistent_context(
|
|
|
+ user_data_dir,
|
|
|
+ channel="chrome",
|
|
|
+ headless=False,
|
|
|
+ args=[
|
|
|
+ '--start-maximized',
|
|
|
+ '--disable-blink-features=AutomationControlled',
|
|
|
+ '--profile-directory=Default',
|
|
|
+ '--no-first-run',
|
|
|
+ '--no-default-browser-check'
|
|
|
+ ],
|
|
|
+ ignore_default_args=['--enable-automation']
|
|
|
+ )
|
|
|
+
|
|
|
+ try:
|
|
|
+ # 创建新页面
|
|
|
+ page = await context.new_page()
|
|
|
+
|
|
|
+ # 设置实际的User-Agent
|
|
|
+ await page.set_extra_http_headers({
|
|
|
+ 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/135.0.0.0 Safari/537.36'
|
|
|
+ })
|
|
|
+
|
|
|
+ # 访问页面并等待网络空闲
|
|
|
+ await page.goto('https://doc.weixin.qq.com/forms/j/AFIADwd9AA4AUcA6AbCADgJXbq19fUR0j_base?page=6',
|
|
|
+ wait_until='networkidle')
|
|
|
+
|
|
|
+ # 等待元素出现并点击
|
|
|
+ await page.wait_for_selector('div.TitleBarBtn_title__dKAyV:text("填写")', timeout=60000)
|
|
|
+ await page.click('div.TitleBarBtn_title__dKAyV:text("填写")', delay=100)
|
|
|
+
|
|
|
+ # 等待操作完成
|
|
|
+ await asyncio.sleep(5)
|
|
|
+
|
|
|
+ editors = await page.query_selector_all('.maileditor-editorview[contenteditable="true"]')
|
|
|
+ print("找到输入框数量:", len(editors))
|
|
|
+
|
|
|
+ # 打印每个输入框的父级HTML,帮助确认 data-qid
|
|
|
+ # for i, editor in enumerate(editors):
|
|
|
+ # parent = await editor.evaluate_handle('el => el.closest("div.question")')
|
|
|
+ # parent_html = await parent.evaluate('el => el.outerHTML')
|
|
|
+ # print(f"输入框{i+1}父级HTML:\n{parent_html}\n")
|
|
|
+
|
|
|
+ # if len(editors) >= 2:
|
|
|
+ # await editors[0].evaluate(f'el => el.innerHTML = `{summary_html}`')
|
|
|
+ # await editors[1].evaluate(f'el => el.innerHTML = `{plan_html}`')
|
|
|
+ # 今日工作总结
|
|
|
+ await page.locator('div.question:has-text("今日工作总结") .maileditor-editorview[contenteditable="true"]').evaluate(
|
|
|
+ f'el => el.innerHTML = `{summary_html}`'
|
|
|
+ )
|
|
|
+ # 明日工作计划
|
|
|
+ await page.locator('div.question:has-text("明日工作计划") .maileditor-editorview[contenteditable="true"]').evaluate(
|
|
|
+ f'el => el.innerHTML = `{plan_html}`'
|
|
|
+ )
|
|
|
+
|
|
|
+
|
|
|
+ # 等待操作完成
|
|
|
+ await asyncio.sleep(5)
|
|
|
+
|
|
|
+ finally:
|
|
|
+ # 确保浏览器正常关闭
|
|
|
+ await context.close()
|
|
|
+ except Exception as e:
|
|
|
+ print(f"发生错误: {str(e)}")
|
|
|
+ raise
|
|
|
+
|
|
|
+# 主函数,用于启动 playwright 并调用 run 函数
|
|
|
+
|
|
|
+
|
|
|
+async def main():
|
|
|
+ async with async_playwright() as playwright:
|
|
|
+ await run(playwright)
|
|
|
+
|
|
|
+# 判断当前环境是否已经有事件循环在运行
|
|
|
+if __name__ == "__main__":
|
|
|
+ try:
|
|
|
+ # 尝试获取正在运行的事件循环(某些 IDE/Jupyter 会预先启动)
|
|
|
+ loop = asyncio.get_running_loop()
|
|
|
+ except RuntimeError:
|
|
|
+ loop = None
|
|
|
+
|
|
|
+ # 如果事件循环存在且正在运行(比如在 Jupyter Notebook 中)
|
|
|
+ if loop and loop.is_running():
|
|
|
+ print("检测到事件循环正在运行,使用 create_task 启动协程")
|
|
|
+ asyncio.create_task(main()) # 使用 create_task 异步运行
|
|
|
+ else:
|
|
|
+ # 否则,正常使用 asyncio.run 启动主协程
|
|
|
+ asyncio.run(main())
|
|
|
+
|