| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230 |
- #!/usr/bin/env python3
- # -*- coding: utf-8 -*-
- import pymysql
- import requests
- import json
- import calendar
- from datetime import datetime
- # 数据库配置
- DB_CONFIG = {
- 'host': '192.168.5.23',
- 'port': 3316,
- 'user': 'kali',
- 'password': '123456',
- 'database': 'wizksent',
- 'charset': 'utf8mb4'
- }
- # 企业微信机器人webhook地址
- # WEBHOOK_URL = "https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=42952463-05dd-4935-a2d1-8304de70f1e8"
- # 子兰生活
- WEBHOOK_URL = "https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=795d0b1e-20e9-40ab-979c-a5752cd2de678"
- def send_wechat_robot_message(content, msg_type="markdown_v2"):
- """通过企业微信机器人发送消息"""
- if msg_type == "markdown_v2":
- payload = {
- "msgtype": "markdown_v2",
- "markdown_v2": {
- "content": content
- }
- }
- elif msg_type == "text":
- payload = {
- "msgtype": "text",
- "text": {
- "content": content
- }
- }
-
- try:
- response = requests.post(WEBHOOK_URL, json=payload, timeout=10)
- result = response.json()
- if result.get('errcode') != 0:
- print(f"发送消息失败: {result}")
- return False
- return True
- except Exception as e:
- print(f"发送消息异常: {e}")
- return False
- def get_month_date_range():
- """获取当月1号到月底的时间范围"""
- today = datetime.now()
- first_day = today.replace(day=1)
- last_day = today.replace(day=calendar.monthrange(today.year, today.month)[1])
-
- start_date = first_day.strftime('%Y-%m-%d 00:00:00')
- end_date = last_day.strftime('%Y-%m-%d 23:59:59')
-
- return start_date, end_date
- def get_document_statistics():
- """获取文档统计数据"""
- start_date, end_date = get_month_date_range()
-
- try:
- connection = pymysql.connect(**DB_CONFIG)
- with connection.cursor() as cursor:
- sql = """
- SELECT
- wk.KB_NAME as 一级目录,
- wt.TAG_NAME as 二级目录,
- COUNT(*) as 文档数量,
- MIN(d.DT_CREATED) as 最早创建时间,
- MAX(d.DT_CREATED) as 最晚创建时间
- FROM wizksent.wiz_document d
- INNER JOIN wizksent.wiz_document_tag dt
- ON d.DOCUMENT_GUID = dt.DOCUMENT_GUID
- INNER JOIN wizksent.wiz_tag wt
- ON dt.TAG_GUID = wt.TAG_GUID
- INNER JOIN wizasent.wiz_kb wk
- ON HEX(wt.KB_GUID) = UPPER(REPLACE(wk.KB_GUID, '-', ''))
- WHERE wk.KB_NAME IS NOT NULL
- AND d.DT_CREATED >= %s
- AND d.DT_CREATED <= %s
- GROUP BY wk.KB_NAME, wt.TAG_NAME
- ORDER BY wk.KB_NAME, wt.TAG_NAME
- """
-
- cursor.execute(sql, (start_date, end_date))
- results = cursor.fetchall()
-
- return results, start_date, end_date
-
- except pymysql.Error as e:
- raise Exception(f"数据库查询失败: {e}")
- finally:
- if 'connection' in locals() and connection:
- connection.close()
- def format_markdown_v2_message(data, start_date, end_date):
- """格式化企业微信markdown_v2消息 - 使用正确的表格格式"""
- current_month = datetime.now().strftime('%Y年%m月')
- total_docs = sum(row[2] for row in data) if data else 0
-
- message = f"# 📊 为知笔记文档统计报告 ({current_month})\n\n"
- message += f"**统计周期**: {start_date.split(' ')[0]} 至 {end_date.split(' ')[0]}\n\n"
- message += f"**文档总数**: {total_docs} 篇\n\n"
-
- if data:
- # 按一级目录分组统计
- category_stats = {}
- for row in data:
- kb_name, tag_name, count, min_date, max_date = row
- if kb_name not in category_stats:
- category_stats[kb_name] = {
- 'total': 0,
- 'sub_categories': []
- }
- category_stats[kb_name]['total'] += count
- category_stats[kb_name]['sub_categories'].append({
- 'name': tag_name,
- 'count': count,
- 'min_date': min_date,
- 'max_date': max_date
- })
-
- # 按文档数量排序
- sorted_categories = sorted(category_stats.items(), key=lambda x: x[1]['total'], reverse=True)
-
- message += "## 📋 分类统计详情\n\n"
-
- # 使用正确的表格格式
- message += "| 集团 | 部门 | 文档数量 | 最早创建 | 最晚创建 |\n"
- message += "| :--- | :--- | :---: | :---: | :---: |\n"
-
- for kb_name, stats in sorted_categories:
- # 先显示一级目录的汇总行
- message += f"| **{kb_name}** | **汇总** | **{stats['total']}** | - | - |\n"
-
- # 显示该一级目录下的所有二级目录
- for sub_cat in sorted(stats['sub_categories'], key=lambda x: x['count'], reverse=True):
- min_date_str = sub_cat['min_date'].strftime('%m-%d') if sub_cat['min_date'] else '-'
- max_date_str = sub_cat['max_date'].strftime('%m-%d') if sub_cat['max_date'] else '-'
- message += f"| ↳ | {sub_cat['name']} | {sub_cat['count']} | {min_date_str} | {max_date_str} |\n"
-
- # 添加空行分隔不同的一级目录
- message += "| | | | | |\n"
-
- # 添加汇总表格
- message += "## 📈 数据汇总\n\n"
- message += "| 集团 | 文档数量 | 占比 |\n"
- message += "| :--- | :---: | :---: |\n"
-
- for kb_name, stats in sorted_categories:
- percentage = f"{(stats['total']/total_docs*100):.1f}%" if total_docs > 0 else "0%"
- message += f"| {kb_name} | {stats['total']} | {percentage} |\n"
-
- else:
- message += "> 本月暂无文档数据\n"
-
- message += f"\n**统计时间**: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n"
- message += "### 统计范围🕐:\n"
- message += "**未分类**📌标签下的笔记不在统计范围内!请将📓笔记移动到相关部门的标签下\n"
- return message
- def format_simple_markdown_v2_message(data, start_date, end_date):
- """简洁版markdown_v2消息"""
- current_month = datetime.now().strftime('%Y年%m月')
- total_docs = sum(row[2] for row in data) if data else 0
-
- message = f"# 📊 {current_month}文档统计\n\n"
- message += f"**统计周期**: {start_date.split(' ')[0]} 至 {end_date.split(' ')[0]}\n"
- message += f"**文档总数**: {total_docs}篇\n\n"
-
- if data:
- # 按一级目录分组统计
- category_totals = {}
- for row in data:
- kb_name, _, count, _, _ = row
- category_totals[kb_name] = category_totals.get(kb_name, 0) + count
-
- message += "## 📋 分类统计\n\n"
-
- for kb_name, count in sorted(category_totals.items(), key=lambda x: x[1], reverse=True):
- message += f"**{kb_name}**: {count}篇\n"
-
- # 简单表格
- message += "\n## 📊 详细数据\n\n"
- message += "| 集团 | 部门 | 数量 |\n"
- message += "| :--- | :--- | :---: |\n"
-
- for row in data:
- kb_name, tag_name, count, _, _ = row
- message += f"| {kb_name} | {tag_name} | {count} |\n"
-
- message += f"\n⏰ {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}"
-
- return message
- def main():
- try:
- print("开始获取文档统计数据...")
- data, start_date, end_date = get_document_statistics()
-
- # 使用markdown_v2格式
- markdown_content = format_markdown_v2_message(data, start_date, end_date)
- print("消息内容生成成功!")
-
- # 打印消息内容(用于调试)
- print("\n" + "="*50)
- print(markdown_content)
- print("="*50 + "\n")
-
- # 发送到企业微信机器人
- success = send_wechat_robot_message(markdown_content, msg_type="markdown_v2")
- if success:
- print("消息发送成功!")
- else:
- print("消息发送失败,请检查webhook配置")
-
- except Exception as e:
- error_message = f"❌ 统计任务失败\n\n错误信息: {str(e)}\n\n发生时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}"
- print(f"执行失败: {e}")
- # 发送错误通知
- send_wechat_robot_message(error_message, msg_type="text")
- if __name__ == "__main__":
- main()
|