百度360必应搜狗淘宝本站头条
当前位置:网站首页 > 技术文章 > 正文

ubuntu每五分钟执行企业微信机器人推送

nanshan 2025-05-21 15:22 13 浏览 0 评论

# -*- coding: utf-8 -*-
"""
Created on Tue Apr 22 09:05:46 2025

@author: 1
"""

import requests
import time
import schedule
import sched
from datetime import datetime, timedelta
import threading
import pymysql  # 用于连接MySQL数据库
from mysql.connector import Error

# 信息等级定义
INFO_LEVELS = {
    '紧急': {'retry_interval': 60, 'max_retries': 3},
    '重要': {'retry_interval': 60, 'max_retries': 3},
    '一般': {'retry_interval': 60, 'max_retries': 3}
}

def send_message(group, message, info_level):
    headers = {'Content-Type': 'application/json'}
    payload = {
        "msgtype": "text",
        "text": {
            "content": message
        }
    }
    try:
        print(13)
        print(group)
        response = requests.post(group['robot_webhook'], json=payload, headers=headers)
        print(14)
        if response.status_code == 200 and response.json().get('errcode') == 0:
            print(15)
            print(f"[{datetime.now()}] 成功发送到群 {group['group_name']}")
            return True
        else:
            print(16)
            print(f"[{datetime.now()}] 发送到群 {group['group_name']} 失败,状态码: {response.status_code}")
            return False
    except Exception as e:
        print(17)
        print(f"[{datetime.now()}] 发送到群 {group['group_name']} 异常: {e}")
        return False

def retry_send(group, message, info_level, attempt=1):
    max_retries = INFO_LEVELS[info_level]['max_retries']
    retry_interval = INFO_LEVELS[info_level]['retry_interval']
    print(6)
    if attempt > max_retries:
        print(f"[{datetime.now()}] 最终发送失败到群 {group['group_name']},将在1小时后重试")
        time.sleep(3600)  # 等待1小时后重试
        attempt = 1
    print(7)
    success = send_message(group, message, info_level)
    print(8)
    if not success:
        time.sleep(retry_interval)
        retry_send(group, message, info_level, attempt + 1)

def send_to_superior_groups(group, message, info_level):
    current_group = group
    while current_group['parent_group_id'] is not None:
        superior = find_group_by_id(current_group['parent_group_id'])
        if superior:
            retry_send(superior, message, info_level)
            current_group = superior
        else:
            break

def find_group_by_id(group_id):
    try:
        # 连接数据库
        connection = pymysql.connect(**DB_CONFIG)
        with connection.cursor() as cursor:
            # 查询未发送的紧急消息
            sql = "select id,group_name,level,robot_webhook,parent_group_id from wechat_groups where id = %s"
            cursor.execute(sql, (group_id,))
            results = cursor.fetchall()
            
            for row in results:
                target_group = row['id']  # 根据 upno 找到对应的群组
                if target_group == group_id:
                    return row
            return None
    except Exception as e:
        print(f"[{datetime.now()}] 遍历机器人时出错: {e}")
        return None
    finally:
        if connection:
            connection.close()
            
def find_group_by_name(group_name):
    try:
        # 连接数据库
        connection = pymysql.connect(**DB_CONFIG)
        with connection.cursor() as cursor:
            # 查询未发送的紧急消息
            sql = "select id,group_name,level,robot_webhook,parent_group_id from wechat_groups where group_name = %s"
            cursor.execute(sql, (group_name,))
            results = cursor.fetchall()
            
            for row in results:
                target_group = row['group_name']  # 根据 upno 找到对应的群组
                if target_group == group_name:
                    return row
            return None
    except Exception as e:
        print(f"[{datetime.now()}] 遍历机器人时出错: {e}")
        return None
    finally:
        if connection:
            connection.close()

def schedule_message(group, message, info_level, send_time):
    print(9)
    delay = (send_time - datetime.now()).total_seconds()
    print(10)
    if delay > 0:
        print(11)
        scheduler = sched.scheduler(time.time, time.sleep)
        scheduler.enterabs(send_time.timestamp(), 1, retry_send, argument=(group, message, info_level))
        scheduler.run()
    else:
        print(12)
        retry_send(group, message, info_level)

def send_message_with_scheduling(group, message, info_level, send_time=None):
    if send_time:
        schedule_message(group, message, info_level, send_time)
    else:
        retry_send(group, message, info_level)
    if group['parent_group_id'] is not None:
        send_to_superior_groups(group, message, info_level)

def run_scheduler():
    while True:
        schedule.run_pending()
        time.sleep(1)

# 数据库连接配置
DB_CONFIG = {
    'host': '*********',
    'user': 'root',
    'password': '************',
    'database': '*************',
    'charset': 'utf8mb4',
    'cursorclass': pymysql.cursors.DictCursor
}

def check_and_send_urgent_messages():
    try:
        # 连接数据库
        connection = pymysql.connect(**DB_CONFIG)
        with connection.cursor() as cursor:
            # 查询未发送的紧急消息
            sql = "SELECT id, sqlcmd, result, createdate, role, details, levelname, upno FROM ai_dayairesult WHERE levelname = '紧急' AND isend = 0"
            cursor.execute(sql)
            results = cursor.fetchall()
            print(1)
            
            for row in results:
                message = f"紧急通知: {row['result']}"  # 只发送result字段内容
                target_group = find_group_by_name(row['role'])  # 根据role字段找到对应的群组
                print(2)
                if target_group:
                    print(3)
                    print(target_group)
                    print(31)
                    send_message_with_scheduling(target_group, message, "紧急")
                    print(4)
                    # 更新数据库状态为已发送
                    update_sql = "UPDATE ai_dayairesult SET isend = 1 WHERE id = %s"
                    cursor.execute(update_sql, (row['id'],))
                    print(5)
            # 提交事务
            connection.commit()
    except Exception as e:
        print(f"[{datetime.now()}] 检查并发送紧急消息时出错: {e}")
    finally:
        if connection:
            connection.close()

def schedule_next_check():
    # 设置每5分钟检查一次紧急消息
    schedule.every(5).minutes.do(check_and_send_urgent_messages)

# 示例用法
if __name__ == "__main__":
    # 启动调度器线程
    scheduler_thread = threading.Thread(target=run_scheduler)
    scheduler_thread.daemon = True
    scheduler_thread.start()
    
    # 设置定时检查紧急消息
    schedule_next_check()
    
    # 立即执行一次检查
    check_and_send_urgent_messages()
    
    # 保持主线程运行
    while True:
        schedule.run_pending()
        time.sleep(1)

相关推荐

Linux/Unix 系统中非常常用的命令

Linux/Unix系统中非常常用的命令,它们是进行文件操作、文本处理、权限管理等任务的基础。下面是对这些命令的简要说明:**文件操作类:*****`ls`(list):**列出目录内容,显...

教你如何在Linux中删除分区(CLI篇)

文接上篇,继续以Ubuntu系统为例。删除分区前,急得重要数据备份!备份!备份用命令操作分区,用的最多的莫过于fdisk了,几乎所有的Linux发行版都默认带有fdisk。首先要知道的是,你想删除的分...

敲完就让你提桶跑路的Linux命令(敲完就让你提桶跑路的linux命令是什么)

不谨慎可能就会让你提桶的Linux命令!!!删除文件rm-rf该命令是删除文件或文件夹等最快的方式之一。删除后的内容很难恢复,如果删除系统文件可能会导致系统崩坏。>rm-rf/#强制...

Log文件可以删除吗(taxukeylog文件可以删除吗)

Log文件(日志文件)是否可以删除取决于具体场景和文件类型。以下是详细分析和建议:一、哪些Log文件可以删除?非关键应用日志用户级应用日志:如浏览器缓存日志、游戏临时日志等,通常不影响系统运行,可定期...

Linux 删除空目录(linux直接删除目录)

rmdir命令用来删除空目录。当目录不再被使用时,或者磁盘空间已到达使用限定值,就需要删除失去使用价值的目录。利用rmdir命令可以从一个目录中删除一个或多个空的子目录。该命令从一个目录中删除一个或...

在 Windows 11 或 10 上删除、创建和格式化分区

在Windows11或10上删除、创建和格式化分区假设您的现有电脑使用的是传统硬盘,但现在您想再添加一个硬盘或SSD。当然,后者将用于启动操作系统,而前者将作为纯数据存储。在成功将操作系统...

如何使用 Apt Clean 命令清除 APT 缓存?

APT(AdvancedPackageTool)是Debian系Linux发行版的包管理工具,用于处理软件包的安装、升级和依赖管理。在使用apt命令(如aptinstall、apt...

Linux 磁盘空间不够用?5 招快速清理文件,释放 10GB 空间不是梦!

刚收到服务器警告:磁盘空间不足90%!装软件提示Nospaceleftondevice!连日志都写不进去,系统卡到崩溃?别慌!今天教你5个超实用的磁盘清理大招,从临时文件到无用软件一键搞定...

Linux清空日志方法(linux怎么清理日志)

方法1:使用>重定向>/path/to/logfile或(需要权限时):sudosh-c'>/var/log/logfile'方法2:使用trun...

如何在Eclipse中搭建Zabbix源码的调试和开发环境

Zabbix是一款非常优秀的企业级软件,被设计用于对数万台服务器、虚拟机和网络设备的数百万个监控项进行实时监控。Zabbix是开放源码和免费的,这就意味着当出现bug时,我们可以很方便地通过调试源码来...

Linux操作系统之常用命令(linux操作系统之常用命令有哪些)

Linux操作系统一、常用命令1.系统(1)系统信息arch显示机器的处理器架构uname-m显示机器的处理器架构uname-r显示正在使用的内核版本dmidecode-q显示硬件系...

理解linux内核的vmlinuz和initrd(linux内核原理及分析)

Originaladdress:http://www.chenjunlu.com/2010/11/understanding-of-vmlinuz-initrd-and-system-map/1....

Linux纯干货知识总结|面试专用(linux面试宝典)

学习Linux的重要性相信不用我多说大家也明白,以下是小编总结的常用Linux基础知识以及面试常问的Linux命令,希望能帮助大家更规范地理解和使用~绝对路径和相对路径绝对路径以正斜杠开始完整的文件的...

Linux基础知识之启动流程分析(简述linux启动流程)

Linux系统启动原理:1.poweron开机。2.开机自检:电脑开机后首先加载BIOS(BasicInput/OutputSystem基本输入输出系统)。BIOS程序首先检查计算机能否满足运...

Java程序员必备——Linux的面试常见问题及面试题!你知道多少?

一.常用命令1.编辑相关①.awkNF:字段总数NR:第几行数据FS:分隔字符②.sed-n-i直接修改4a:在第四行后添加4i:在第四行前插入1,5csting:用sting替换1到5行...

取消回复欢迎 发表评论: