autowrite_day.py 4.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134
  1. import asyncio
  2. import os
  3. import psutil
  4. import time
  5. from playwright.async_api import async_playwright
  6. # 执行自动化浏览器操作的协程函数
  7. def close_chrome():
  8. """关闭所有Chrome进程"""
  9. for proc in psutil.process_iter(['name']):
  10. try:
  11. if proc.info['name'] == 'chrome.exe':
  12. proc.kill()
  13. except (psutil.NoSuchProcess, psutil.AccessDenied):
  14. pass
  15. # 等待进程完全关闭
  16. time.sleep(2)
  17. async def run(playwright):
  18. # 确保Chrome已关闭
  19. close_chrome()
  20. # 使用用户实际的Chrome配置,配置查询chrome://version
  21. user_data_dir = r'C:\Users\zhens\AppData\Local\Google\Chrome\User Data'
  22. def text_to_div_html(text):
  23. """将多行文本转换为<div>...</div>格式的HTML"""
  24. lines = [line.strip() for line in text.strip().splitlines() if line.strip()]
  25. return ''.join(f'<div>{line}</div>' for line in lines)
  26. # 你可以在这里自定义要填写的内容
  27. summary_text = """
  28. 今日完成日报自动化脚本开发
  29. 修复了异常处理bug
  30. """
  31. plan_text = """
  32. 明天继续完善自动化脚本
  33. 增加更多异常场景测试
  34. """
  35. summary_html = text_to_div_html(summary_text)
  36. plan_html = text_to_div_html(plan_text)
  37. try:
  38. # 使用已有的Chrome配置文件启动浏览器
  39. context = await playwright.chromium.launch_persistent_context(
  40. user_data_dir,
  41. channel="chrome",
  42. headless=False,
  43. args=[
  44. '--start-maximized',
  45. '--disable-blink-features=AutomationControlled',
  46. '--profile-directory=Default',
  47. '--no-first-run',
  48. '--no-default-browser-check'
  49. ],
  50. ignore_default_args=['--enable-automation']
  51. )
  52. try:
  53. # 创建新页面
  54. page = await context.new_page()
  55. # 设置实际的User-Agent
  56. await page.set_extra_http_headers({
  57. 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/135.0.0.0 Safari/537.36'
  58. })
  59. # 访问页面并等待网络空闲
  60. await page.goto('https://doc.weixin.qq.com/forms/j/AFIADwd9AA4AUcA6AbCADgJXbq19fUR0j_base?page=6',
  61. wait_until='networkidle')
  62. # 等待元素出现并点击
  63. await page.wait_for_selector('div.TitleBarBtn_title__dKAyV:text("填写")', timeout=60000)
  64. await page.click('div.TitleBarBtn_title__dKAyV:text("填写")', delay=100)
  65. # 等待操作完成
  66. await asyncio.sleep(5)
  67. editors = await page.query_selector_all('.maileditor-editorview[contenteditable="true"]')
  68. print("找到输入框数量:", len(editors))
  69. # 打印每个输入框的父级HTML,帮助确认 data-qid
  70. # for i, editor in enumerate(editors):
  71. # parent = await editor.evaluate_handle('el => el.closest("div.question")')
  72. # parent_html = await parent.evaluate('el => el.outerHTML')
  73. # print(f"输入框{i+1}父级HTML:\n{parent_html}\n")
  74. # if len(editors) >= 2:
  75. # await editors[0].evaluate(f'el => el.innerHTML = `{summary_html}`')
  76. # await editors[1].evaluate(f'el => el.innerHTML = `{plan_html}`')
  77. # 今日工作总结
  78. await page.locator('div.question:has-text("今日工作总结") .maileditor-editorview[contenteditable="true"]').evaluate(
  79. f'el => el.innerHTML = `{summary_html}`'
  80. )
  81. # 明日工作计划
  82. await page.locator('div.question:has-text("明日工作计划") .maileditor-editorview[contenteditable="true"]').evaluate(
  83. f'el => el.innerHTML = `{plan_html}`'
  84. )
  85. # 等待操作完成
  86. await asyncio.sleep(5)
  87. finally:
  88. # 确保浏览器正常关闭
  89. await context.close()
  90. except Exception as e:
  91. print(f"发生错误: {str(e)}")
  92. raise
  93. # 主函数,用于启动 playwright 并调用 run 函数
  94. async def main():
  95. async with async_playwright() as playwright:
  96. await run(playwright)
  97. # 判断当前环境是否已经有事件循环在运行
  98. if __name__ == "__main__":
  99. try:
  100. # 尝试获取正在运行的事件循环(某些 IDE/Jupyter 会预先启动)
  101. loop = asyncio.get_running_loop()
  102. except RuntimeError:
  103. loop = None
  104. # 如果事件循环存在且正在运行(比如在 Jupyter Notebook 中)
  105. if loop and loop.is_running():
  106. print("检测到事件循环正在运行,使用 create_task 启动协程")
  107. asyncio.create_task(main()) # 使用 create_task 异步运行
  108. else:
  109. # 否则,正常使用 asyncio.run 启动主协程
  110. asyncio.run(main())