#!/usr/bin/env python3 # -*- coding: utf-8 -*- import pymysql import requests import json import calendar from datetime import datetime # 数据库配置 DB_CONFIG = { 'host': '192.168.5.23', 'port': 3316, 'user': 'kali', 'password': '123456', 'database': 'wizksent', 'charset': 'utf8mb4' } # 企业微信机器人webhook地址 # WEBHOOK_URL = "https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=42952463-05dd-4935-a2d1-8304de70f1e8" # 子兰生活 WEBHOOK_URL = "https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=795d0b1e-20e9-40ab-979c-a5752cd2de678" def send_wechat_robot_message(content, msg_type="markdown_v2"): """通过企业微信机器人发送消息""" if msg_type == "markdown_v2": payload = { "msgtype": "markdown_v2", "markdown_v2": { "content": content } } elif msg_type == "text": payload = { "msgtype": "text", "text": { "content": content } } try: response = requests.post(WEBHOOK_URL, json=payload, timeout=10) result = response.json() if result.get('errcode') != 0: print(f"发送消息失败: {result}") return False return True except Exception as e: print(f"发送消息异常: {e}") return False def get_month_date_range(): """获取当月1号到月底的时间范围""" today = datetime.now() first_day = today.replace(day=1) last_day = today.replace(day=calendar.monthrange(today.year, today.month)[1]) start_date = first_day.strftime('%Y-%m-%d 00:00:00') end_date = last_day.strftime('%Y-%m-%d 23:59:59') return start_date, end_date def get_document_statistics(): """获取文档统计数据""" start_date, end_date = get_month_date_range() try: connection = pymysql.connect(**DB_CONFIG) with connection.cursor() as cursor: sql = """ SELECT wk.KB_NAME as 一级目录, wt.TAG_NAME as 二级目录, COUNT(*) as 文档数量, MIN(d.DT_CREATED) as 最早创建时间, MAX(d.DT_CREATED) as 最晚创建时间 FROM wizksent.wiz_document d INNER JOIN wizksent.wiz_document_tag dt ON d.DOCUMENT_GUID = dt.DOCUMENT_GUID INNER JOIN wizksent.wiz_tag wt ON dt.TAG_GUID = wt.TAG_GUID INNER JOIN wizasent.wiz_kb wk ON HEX(wt.KB_GUID) = UPPER(REPLACE(wk.KB_GUID, '-', '')) WHERE wk.KB_NAME IS NOT NULL AND d.DT_CREATED >= %s AND d.DT_CREATED <= %s GROUP BY wk.KB_NAME, wt.TAG_NAME ORDER BY wk.KB_NAME, wt.TAG_NAME """ cursor.execute(sql, (start_date, end_date)) results = cursor.fetchall() return results, start_date, end_date except pymysql.Error as e: raise Exception(f"数据库查询失败: {e}") finally: if 'connection' in locals() and connection: connection.close() def format_markdown_v2_message(data, start_date, end_date): """格式化企业微信markdown_v2消息 - 使用正确的表格格式""" current_month = datetime.now().strftime('%Y年%m月') total_docs = sum(row[2] for row in data) if data else 0 message = f"# 📊 为知笔记文档统计报告 ({current_month})\n\n" message += f"**统计周期**: {start_date.split(' ')[0]} 至 {end_date.split(' ')[0]}\n\n" message += f"**文档总数**: {total_docs} 篇\n\n" if data: # 按一级目录分组统计 category_stats = {} for row in data: kb_name, tag_name, count, min_date, max_date = row if kb_name not in category_stats: category_stats[kb_name] = { 'total': 0, 'sub_categories': [] } category_stats[kb_name]['total'] += count category_stats[kb_name]['sub_categories'].append({ 'name': tag_name, 'count': count, 'min_date': min_date, 'max_date': max_date }) # 按文档数量排序 sorted_categories = sorted(category_stats.items(), key=lambda x: x[1]['total'], reverse=True) message += "## 📋 分类统计详情\n\n" # 使用正确的表格格式 message += "| 集团 | 部门 | 文档数量 | 最早创建 | 最晚创建 |\n" message += "| :--- | :--- | :---: | :---: | :---: |\n" for kb_name, stats in sorted_categories: # 先显示一级目录的汇总行 message += f"| **{kb_name}** | **汇总** | **{stats['total']}** | - | - |\n" # 显示该一级目录下的所有二级目录 for sub_cat in sorted(stats['sub_categories'], key=lambda x: x['count'], reverse=True): min_date_str = sub_cat['min_date'].strftime('%m-%d') if sub_cat['min_date'] else '-' max_date_str = sub_cat['max_date'].strftime('%m-%d') if sub_cat['max_date'] else '-' message += f"| ↳ | {sub_cat['name']} | {sub_cat['count']} | {min_date_str} | {max_date_str} |\n" # 添加空行分隔不同的一级目录 message += "| | | | | |\n" # 添加汇总表格 message += "## 📈 数据汇总\n\n" message += "| 集团 | 文档数量 | 占比 |\n" message += "| :--- | :---: | :---: |\n" for kb_name, stats in sorted_categories: percentage = f"{(stats['total']/total_docs*100):.1f}%" if total_docs > 0 else "0%" message += f"| {kb_name} | {stats['total']} | {percentage} |\n" else: message += "> 本月暂无文档数据\n" message += f"\n**统计时间**: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n" message += "### 统计范围🕐:\n" message += "**未分类**📌标签下的笔记不在统计范围内!请将📓笔记移动到相关部门的标签下\n" return message def format_simple_markdown_v2_message(data, start_date, end_date): """简洁版markdown_v2消息""" current_month = datetime.now().strftime('%Y年%m月') total_docs = sum(row[2] for row in data) if data else 0 message = f"# 📊 {current_month}文档统计\n\n" message += f"**统计周期**: {start_date.split(' ')[0]} 至 {end_date.split(' ')[0]}\n" message += f"**文档总数**: {total_docs}篇\n\n" if data: # 按一级目录分组统计 category_totals = {} for row in data: kb_name, _, count, _, _ = row category_totals[kb_name] = category_totals.get(kb_name, 0) + count message += "## 📋 分类统计\n\n" for kb_name, count in sorted(category_totals.items(), key=lambda x: x[1], reverse=True): message += f"**{kb_name}**: {count}篇\n" # 简单表格 message += "\n## 📊 详细数据\n\n" message += "| 集团 | 部门 | 数量 |\n" message += "| :--- | :--- | :---: |\n" for row in data: kb_name, tag_name, count, _, _ = row message += f"| {kb_name} | {tag_name} | {count} |\n" message += f"\n⏰ {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}" return message def main(): try: print("开始获取文档统计数据...") data, start_date, end_date = get_document_statistics() # 使用markdown_v2格式 markdown_content = format_markdown_v2_message(data, start_date, end_date) print("消息内容生成成功!") # 打印消息内容(用于调试) print("\n" + "="*50) print(markdown_content) print("="*50 + "\n") # 发送到企业微信机器人 success = send_wechat_robot_message(markdown_content, msg_type="markdown_v2") if success: print("消息发送成功!") else: print("消息发送失败,请检查webhook配置") except Exception as e: error_message = f"❌ 统计任务失败\n\n错误信息: {str(e)}\n\n发生时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}" print(f"执行失败: {e}") # 发送错误通知 send_wechat_robot_message(error_message, msg_type="text") if __name__ == "__main__": main()