Skip to content
知识星球

远程数据库

连接远程数据库进行操作,由于需要将数据库开放至公网,请做好安全策略!如限制账号访问权限、使用高位端口等.

使用场景:实时获取公司内部数据

支持数据库:Mysql、Postgre

由于未知原因,本插件无法上传至扣子商店,所以只能公开代码,请自己创建插件使用.

输入/输出参数

输入参数:

  • host --- 数据库地址
  • port --- 数据库端口
  • user --- 用户名
  • pwd --- 密码
  • database --- 数据库
  • sql --- 原生SQL语句

输出参数

  • data --- 执行结果
  • msg --- 消息

进行测试

查询数据

更新数据

代码

Mysql

from runtime import Args
from typings.postgres.postgres import Input, Output
import pymysql  # 替换psycopg2为pymysql
from typing import List, Dict, Any

def handler(args: Args[Input])->Output:
    # 获取输入参数
    host = args.input.host
    port = int(args.input.port)  # 确保port是整数类型
    user = args.input.user
    pwd = args.input.pwd
    database = args.input.database if hasattr(args.input, 'database') else "mysql"  # 默认数据库改为mysql
    sql = args.input.sql  # 添加缺失的sql参数获取
    
    try:
        # 连接MySQL数据库
        conn = pymysql.connect(
            host=host,
            port=port,  # 现在port是整数类型
            user=user,
            password=pwd,
            database=database,
            charset='utf8mb4',  # MySQL需要指定字符集
            cursorclass=pymysql.cursors.DictCursor  # 直接返回字典格式结果
        )
        cursor = conn.cursor()
        
        
        # 执行SQL查询
        cursor.execute(sql)
        
        # 获取查询结果
        if sql.strip().lower().startswith('select'):
            columns = [desc[0] for desc in cursor.description]
            rows = cursor.fetchall()
            data = [dict(zip(columns, row)) for row in rows]
        else:
            data = []
            conn.commit()
        
        # 关闭连接
        cursor.close()
        conn.close()
        
        return {
            "msg": "执行成功",
            "data": data
        }
        
    except Exception as e:
        return {
            "msg": f"执行失败: {str(e)}",
            "data": []
        }

Postgre

from runtime import Args
from typings.postgres.postgres import Input, Output
import psycopg2
from typing import List, Dict, Any

def handler(args: Args[Input])->Output:
    # 获取输入参数
    host = args.input.host
    port = args.input.port
    user = args.input.user
    pwd = args.input.pwd
    database = args.input.database if hasattr(args.input, 'database') else "postgres"
    sql = args.input.sql
    
    try:
        # 连接PostgreSQL数据库
        conn = psycopg2.connect(
            host=host,
            port=port,
            user=user,
            password=pwd,
            database=database  # 使用传入的数据库名
        )
        cursor = conn.cursor()
        
        
        # 执行SQL查询
        cursor.execute(sql)
        
        # 获取查询结果
        if sql.strip().lower().startswith('select'):
            columns = [desc[0] for desc in cursor.description]
            rows = cursor.fetchall()
            data = [dict(zip(columns, row)) for row in rows]
        else:
            data = []
            conn.commit()
        
        # 关闭连接
        cursor.close()
        conn.close()
        
        return {
            "msg": "执行成功",
            "data": data
        }
        
    except Exception as e:
        return {
            "msg": f"执行失败: {str(e)}",
            "data": []
        }
知识星球