百度360必应搜狗淘宝本站头条
当前位置:网站首页 > 技术文章 > 正文

ubuntu每五分钟执行企业微信机器人推送

nanshan 2025-05-21 15:22 12 浏览 0 评论

# -*- coding: utf-8 -*-
"""
Created on Tue Apr 22 09:05:46 2025

@author: 1
"""

import requests
import time
import schedule
import sched
from datetime import datetime, timedelta
import threading
import pymysql  # 用于连接MySQL数据库
from mysql.connector import Error

# 信息等级定义
INFO_LEVELS = {
    '紧急': {'retry_interval': 60, 'max_retries': 3},
    '重要': {'retry_interval': 60, 'max_retries': 3},
    '一般': {'retry_interval': 60, 'max_retries': 3}
}

def send_message(group, message, info_level):
    headers = {'Content-Type': 'application/json'}
    payload = {
        "msgtype": "text",
        "text": {
            "content": message
        }
    }
    try:
        print(13)
        print(group)
        response = requests.post(group['robot_webhook'], json=payload, headers=headers)
        print(14)
        if response.status_code == 200 and response.json().get('errcode') == 0:
            print(15)
            print(f"[{datetime.now()}] 成功发送到群 {group['group_name']}")
            return True
        else:
            print(16)
            print(f"[{datetime.now()}] 发送到群 {group['group_name']} 失败,状态码: {response.status_code}")
            return False
    except Exception as e:
        print(17)
        print(f"[{datetime.now()}] 发送到群 {group['group_name']} 异常: {e}")
        return False

def retry_send(group, message, info_level, attempt=1):
    max_retries = INFO_LEVELS[info_level]['max_retries']
    retry_interval = INFO_LEVELS[info_level]['retry_interval']
    print(6)
    if attempt > max_retries:
        print(f"[{datetime.now()}] 最终发送失败到群 {group['group_name']},将在1小时后重试")
        time.sleep(3600)  # 等待1小时后重试
        attempt = 1
    print(7)
    success = send_message(group, message, info_level)
    print(8)
    if not success:
        time.sleep(retry_interval)
        retry_send(group, message, info_level, attempt + 1)

def send_to_superior_groups(group, message, info_level):
    current_group = group
    while current_group['parent_group_id'] is not None:
        superior = find_group_by_id(current_group['parent_group_id'])
        if superior:
            retry_send(superior, message, info_level)
            current_group = superior
        else:
            break

def find_group_by_id(group_id):
    try:
        # 连接数据库
        connection = pymysql.connect(**DB_CONFIG)
        with connection.cursor() as cursor:
            # 查询未发送的紧急消息
            sql = "select id,group_name,level,robot_webhook,parent_group_id from wechat_groups where id = %s"
            cursor.execute(sql, (group_id,))
            results = cursor.fetchall()
            
            for row in results:
                target_group = row['id']  # 根据 upno 找到对应的群组
                if target_group == group_id:
                    return row
            return None
    except Exception as e:
        print(f"[{datetime.now()}] 遍历机器人时出错: {e}")
        return None
    finally:
        if connection:
            connection.close()
            
def find_group_by_name(group_name):
    try:
        # 连接数据库
        connection = pymysql.connect(**DB_CONFIG)
        with connection.cursor() as cursor:
            # 查询未发送的紧急消息
            sql = "select id,group_name,level,robot_webhook,parent_group_id from wechat_groups where group_name = %s"
            cursor.execute(sql, (group_name,))
            results = cursor.fetchall()
            
            for row in results:
                target_group = row['group_name']  # 根据 upno 找到对应的群组
                if target_group == group_name:
                    return row
            return None
    except Exception as e:
        print(f"[{datetime.now()}] 遍历机器人时出错: {e}")
        return None
    finally:
        if connection:
            connection.close()

def schedule_message(group, message, info_level, send_time):
    print(9)
    delay = (send_time - datetime.now()).total_seconds()
    print(10)
    if delay > 0:
        print(11)
        scheduler = sched.scheduler(time.time, time.sleep)
        scheduler.enterabs(send_time.timestamp(), 1, retry_send, argument=(group, message, info_level))
        scheduler.run()
    else:
        print(12)
        retry_send(group, message, info_level)

def send_message_with_scheduling(group, message, info_level, send_time=None):
    if send_time:
        schedule_message(group, message, info_level, send_time)
    else:
        retry_send(group, message, info_level)
    if group['parent_group_id'] is not None:
        send_to_superior_groups(group, message, info_level)

def run_scheduler():
    while True:
        schedule.run_pending()
        time.sleep(1)

# 数据库连接配置
DB_CONFIG = {
    'host': '*********',
    'user': 'root',
    'password': '************',
    'database': '*************',
    'charset': 'utf8mb4',
    'cursorclass': pymysql.cursors.DictCursor
}

def check_and_send_urgent_messages():
    try:
        # 连接数据库
        connection = pymysql.connect(**DB_CONFIG)
        with connection.cursor() as cursor:
            # 查询未发送的紧急消息
            sql = "SELECT id, sqlcmd, result, createdate, role, details, levelname, upno FROM ai_dayairesult WHERE levelname = '紧急' AND isend = 0"
            cursor.execute(sql)
            results = cursor.fetchall()
            print(1)
            
            for row in results:
                message = f"紧急通知: {row['result']}"  # 只发送result字段内容
                target_group = find_group_by_name(row['role'])  # 根据role字段找到对应的群组
                print(2)
                if target_group:
                    print(3)
                    print(target_group)
                    print(31)
                    send_message_with_scheduling(target_group, message, "紧急")
                    print(4)
                    # 更新数据库状态为已发送
                    update_sql = "UPDATE ai_dayairesult SET isend = 1 WHERE id = %s"
                    cursor.execute(update_sql, (row['id'],))
                    print(5)
            # 提交事务
            connection.commit()
    except Exception as e:
        print(f"[{datetime.now()}] 检查并发送紧急消息时出错: {e}")
    finally:
        if connection:
            connection.close()

def schedule_next_check():
    # 设置每5分钟检查一次紧急消息
    schedule.every(5).minutes.do(check_and_send_urgent_messages)

# 示例用法
if __name__ == "__main__":
    # 启动调度器线程
    scheduler_thread = threading.Thread(target=run_scheduler)
    scheduler_thread.daemon = True
    scheduler_thread.start()
    
    # 设置定时检查紧急消息
    schedule_next_check()
    
    # 立即执行一次检查
    check_and_send_urgent_messages()
    
    # 保持主线程运行
    while True:
        schedule.run_pending()
        time.sleep(1)

相关推荐

如何为MySQL服务器和客户机启用SSL?

用户想要与MySQL服务器建立一条安全连接时,常常依赖VPN隧道或SSH隧道。不过,获得MySQL连接的另一个办法是,启用MySQL服务器上的SSL封装器(SSLwrapper)。这每一种方法各有其...

Mysql5.7 出现大量 unauthenticated user

线上环境mysql5.7突然出现大量unauthenticateduser,进mysql,showprocesslist;解决办法有:在/etc/hosts中添加客户端ip,如192.16...

MySQL 在 Windows 系统下的安装(mysql安装教程windows)

更多技术文章MySQL在Windows系统下的安装1.下载mysql和Framework链接链接:百度网盘请输入提取码提取码:6w3p双击mysql-installer-communit...

MySql5.7.21.zip绿色版安装(mysql数据库绿色版安装)

1、去网上下载满足系统要求的版本(mysql-5.7.21-winx64.zip)2、直接解压3、mysql的初始化(1)以管理员身份运行cmd,在mysql中的bin目录下shift+右键-在...

MySQL(8.0)中文全文检索 (亲测有效)

在一堆文字中找到含有关键字的应用。当然也可以用以下语句实现:SELECT*FROM<表名>WHERE<字段名>like‘%ABC%’但是它的效率太低,是全盘扫描。...

新手教程,Linux系统下MySQL的安装

看了两三个教程。终于在哔哩哔哩找到一个简单高效的教程,成功安装,up主名叫bili逍遥bili,感兴趣可以去看看。下面这个是我总结的安装方法环境:CentOS764位1.下载安装包,个人觉得在...

麒麟服务器操作系统安装 MySQL 8 实战指南

原文连接:「链接」Hello,大家好啊,今天给大家带来一篇麒麟服务器操作系统上安装MySQL8的文章,欢迎大家分享点赞,点个在看和关注吧!MySQL作为主流开源数据库之一,被广泛应用于各种业务...

用Python玩转MySQL的全攻略,从环境搭建到项目实战全解析

这是一篇关于“MySQL数据库入门实战-Python版”的教程,结合了案例实战分析,帮助初学者快速掌握如何使用Python操作MySQL数据库。一、环境准备1.安装Python访问Pytho...

安装MySQL(中标麒麟 安装mysql)

安装MySQL注意:一定要用root用户操作如下步骤;先卸载MySQL再安装1.安装包准备(1)查看MySQL是否安装rpm-qa|grepmysql(2)如果安装了MySQL,就先卸载rpm-...

Mysql最全笔记,快速入门,干货满满,爆肝

目录一、MySQL的重要性二、MySQL介绍三、软件的服务架构四、MySQL的安装五、SQL语句六、数据库相关(DDL)七、表相关八、DML相关(表中数据)九、DQL(重点)十、数据完...

MAC电脑安装MySQL操作步骤(mac安装mysqldb)

1、在官网下载MySQL:https://dev.mysql.com/downloads/mysql/根据自己的macOS版本,选择适配的MySQL版本根据自己需求选择相应的安装包,我这里选择macO...

mysql主从(mysql主从切换)

1、本章面试题什么是mysql主从,主从有什么好处什么是读写分离,有什么好处,使用mycat如何实现2、知识点2.1、课程回顾dubboORM->MVC->RPC->SOApro...

【linux学习】以MySQL为例,带你了解数据库

做运维的小伙伴在日常工作中难免需要接触到数据库,不管是MySQL,mariadb,达梦还是瀚高等其实命令都差不多,下面我就以MySQL为例带大家一起来了解下数据库。有兴趣的小伙伴不妨评论区一起交流下...

玩玩WordPress - 环境简介(0)(玩玩网络科技有限公司)

简介提到开源博客系统,一般都会直接想到WordPress!WordPress是使用PHP开发的,数据库使用的是MySQL,一般会在Linux上运行,Nginx作为前端。这时候就需要有一套LNMP(Li...

服务器常用端口都有哪些?(服务器端使用的端口号范围)

下面为大家介绍一下,服务器常用的一些默认端口,以及他们的作用:  21:FTP服务所开放的端口,用于上传、下载文件。  22:SSH端口,用于通过命令行模式远程连接Linux服务器或vps。  23:...

取消回复欢迎 发表评论: