Ver Fonte

青龙脚本

zhensolid há 2 meses atrás
pai
commit
1a80b71e34
1 ficheiros alterados com 230 adições e 0 exclusões
  1. 230 0
      wiz_monthly_document_report.py

+ 230 - 0
wiz_monthly_document_report.py

@@ -0,0 +1,230 @@
+#!/usr/bin/env python3
+# -*- coding: utf-8 -*-
+
+import pymysql
+import requests
+import json
+import calendar
+from datetime import datetime
+
+# 数据库配置
+DB_CONFIG = {
+    'host': '192.168.5.23',
+    'port': 3316,
+    'user': 'kali',
+    'password': '123456',
+    'database': 'wizksent',
+    'charset': 'utf8mb4'
+}
+
+# 企业微信机器人webhook地址
+# WEBHOOK_URL = "https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=42952463-05dd-4935-a2d1-8304de70f1e8"
+# 子兰生活
+WEBHOOK_URL = "https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=795d0b1e-20e9-40ab-979c-a5752cd2de678"
+def send_wechat_robot_message(content, msg_type="markdown_v2"):
+    """通过企业微信机器人发送消息"""
+    if msg_type == "markdown_v2":
+        payload = {
+            "msgtype": "markdown_v2",
+            "markdown_v2": {
+                "content": content
+            }
+        }
+    elif msg_type == "text":
+        payload = {
+            "msgtype": "text",
+            "text": {
+                "content": content
+            }
+        }
+    
+    try:
+        response = requests.post(WEBHOOK_URL, json=payload, timeout=10)
+        result = response.json()
+        if result.get('errcode') != 0:
+            print(f"发送消息失败: {result}")
+            return False
+        return True
+    except Exception as e:
+        print(f"发送消息异常: {e}")
+        return False
+
+def get_month_date_range():
+    """获取当月1号到月底的时间范围"""
+    today = datetime.now()
+    first_day = today.replace(day=1)
+    last_day = today.replace(day=calendar.monthrange(today.year, today.month)[1])
+    
+    start_date = first_day.strftime('%Y-%m-%d 00:00:00')
+    end_date = last_day.strftime('%Y-%m-%d 23:59:59')
+    
+    return start_date, end_date
+
+def get_document_statistics():
+    """获取文档统计数据"""
+    start_date, end_date = get_month_date_range()
+    
+    try:
+        connection = pymysql.connect(**DB_CONFIG)
+        with connection.cursor() as cursor:
+            sql = """
+            SELECT 
+                wk.KB_NAME as 一级目录,
+                wt.TAG_NAME as 二级目录,
+                COUNT(*) as 文档数量,
+                MIN(d.DT_CREATED) as 最早创建时间,
+                MAX(d.DT_CREATED) as 最晚创建时间
+            FROM wizksent.wiz_document d
+            INNER JOIN wizksent.wiz_document_tag dt 
+                ON d.DOCUMENT_GUID = dt.DOCUMENT_GUID
+            INNER JOIN wizksent.wiz_tag wt 
+                ON dt.TAG_GUID = wt.TAG_GUID
+            INNER JOIN wizasent.wiz_kb wk 
+                ON HEX(wt.KB_GUID) = UPPER(REPLACE(wk.KB_GUID, '-', ''))
+            WHERE wk.KB_NAME IS NOT NULL
+                AND d.DT_CREATED >= %s
+                AND d.DT_CREATED <= %s
+            GROUP BY wk.KB_NAME, wt.TAG_NAME
+            ORDER BY wk.KB_NAME, wt.TAG_NAME
+            """
+            
+            cursor.execute(sql, (start_date, end_date))
+            results = cursor.fetchall()
+            
+            return results, start_date, end_date
+            
+    except pymysql.Error as e:
+        raise Exception(f"数据库查询失败: {e}")
+    finally:
+        if 'connection' in locals() and connection:
+            connection.close()
+
+def format_markdown_v2_message(data, start_date, end_date):
+    """格式化企业微信markdown_v2消息 - 使用正确的表格格式"""
+    current_month = datetime.now().strftime('%Y年%m月')
+    total_docs = sum(row[2] for row in data) if data else 0
+    
+    message = f"# 📊 为知笔记文档统计报告 ({current_month})\n\n"
+    message += f"**统计周期**: {start_date.split(' ')[0]} 至 {end_date.split(' ')[0]}\n\n"
+    message += f"**文档总数**: {total_docs} 篇\n\n"
+    
+    if data:
+        # 按一级目录分组统计
+        category_stats = {}
+        for row in data:
+            kb_name, tag_name, count, min_date, max_date = row
+            if kb_name not in category_stats:
+                category_stats[kb_name] = {
+                    'total': 0,
+                    'sub_categories': []
+                }
+            category_stats[kb_name]['total'] += count
+            category_stats[kb_name]['sub_categories'].append({
+                'name': tag_name,
+                'count': count,
+                'min_date': min_date,
+                'max_date': max_date
+            })
+        
+        # 按文档数量排序
+        sorted_categories = sorted(category_stats.items(), key=lambda x: x[1]['total'], reverse=True)
+        
+        message += "## 📋 分类统计详情\n\n"
+        
+        # 使用正确的表格格式
+        message += "| 集团 | 部门 | 文档数量 | 最早创建 | 最晚创建 |\n"
+        message += "| :--- | :--- | :---: | :---: | :---: |\n"
+        
+        for kb_name, stats in sorted_categories:
+            # 先显示一级目录的汇总行
+            message += f"| **{kb_name}** | **汇总** | **{stats['total']}** | - | - |\n"
+            
+            # 显示该一级目录下的所有二级目录
+            for sub_cat in sorted(stats['sub_categories'], key=lambda x: x['count'], reverse=True):
+                min_date_str = sub_cat['min_date'].strftime('%m-%d') if sub_cat['min_date'] else '-'
+                max_date_str = sub_cat['max_date'].strftime('%m-%d') if sub_cat['max_date'] else '-'
+                message += f"| ↳ | {sub_cat['name']} | {sub_cat['count']} | {min_date_str} | {max_date_str} |\n"
+            
+            # 添加空行分隔不同的一级目录
+            message += "| | | | | |\n"
+        
+        # 添加汇总表格
+        message += "## 📈 数据汇总\n\n"
+        message += "| 集团 | 文档数量 | 占比 |\n"
+        message += "| :--- | :---: | :---: |\n"
+        
+        for kb_name, stats in sorted_categories:
+            percentage = f"{(stats['total']/total_docs*100):.1f}%" if total_docs > 0 else "0%"
+            message += f"| {kb_name} | {stats['total']} | {percentage} |\n"
+    
+    else:
+        message += "> 本月暂无文档数据\n"
+    
+    message += f"\n**统计时间**: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n"
+    message += "### 统计范围🕐:\n"
+    message += "**未分类**📌标签下的笔记不在统计范围内!请将📓笔记移动到相关部门的标签下\n"
+    return message
+
+def format_simple_markdown_v2_message(data, start_date, end_date):
+    """简洁版markdown_v2消息"""
+    current_month = datetime.now().strftime('%Y年%m月')
+    total_docs = sum(row[2] for row in data) if data else 0
+    
+    message = f"# 📊 {current_month}文档统计\n\n"
+    message += f"**统计周期**: {start_date.split(' ')[0]} 至 {end_date.split(' ')[0]}\n"
+    message += f"**文档总数**: {total_docs}篇\n\n"
+    
+    if data:
+        # 按一级目录分组统计
+        category_totals = {}
+        for row in data:
+            kb_name, _, count, _, _ = row
+            category_totals[kb_name] = category_totals.get(kb_name, 0) + count
+        
+        message += "## 📋 分类统计\n\n"
+        
+        for kb_name, count in sorted(category_totals.items(), key=lambda x: x[1], reverse=True):
+            message += f"**{kb_name}**: {count}篇\n"
+        
+        # 简单表格
+        message += "\n## 📊 详细数据\n\n"
+        message += "| 集团 | 部门 | 数量 |\n"
+        message += "| :--- | :--- | :---: |\n"
+        
+        for row in data:
+            kb_name, tag_name, count, _, _ = row
+            message += f"| {kb_name} | {tag_name} | {count} |\n"
+    
+    message += f"\n⏰ {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}"
+    
+    return message
+
+def main():
+    try:
+        print("开始获取文档统计数据...")
+        data, start_date, end_date = get_document_statistics()
+        
+        # 使用markdown_v2格式
+        markdown_content = format_markdown_v2_message(data, start_date, end_date)
+        print("消息内容生成成功!")
+        
+        # 打印消息内容(用于调试)
+        print("\n" + "="*50)
+        print(markdown_content)
+        print("="*50 + "\n")
+        
+        # 发送到企业微信机器人
+        success = send_wechat_robot_message(markdown_content, msg_type="markdown_v2")
+        if success:
+            print("消息发送成功!")
+        else:
+            print("消息发送失败,请检查webhook配置")
+            
+    except Exception as e:
+        error_message = f"❌ 统计任务失败\n\n错误信息: {str(e)}\n\n发生时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}"
+        print(f"执行失败: {e}")
+        # 发送错误通知
+        send_wechat_robot_message(error_message, msg_type="text")
+
+if __name__ == "__main__":
+    main()