from minio import Minio from minio.error import S3Error import os # MinIO 配置 MINIO_SERVER = "192.168.0.25:9000" # 服务器地址(无http前缀) ACCESS_KEY = "minioadmin" # 默认Access Key SECRET_KEY = "minioadmin" # 默认Secret Key BUCKET_NAME = "szxc" # 你的Bucket名称 def upload_to_minio(local_file_path, object_name=None): """ 上传文件到MinIO :param local_file_path: 本地文件路径(如 "C:\\data\\test.txt") :param object_name: 存储在MinIO中的名称(可选,默认使用本地文件名) """ # 初始化客户端 client = Minio( MINIO_SERVER, access_key=ACCESS_KEY, secret_key=SECRET_KEY, secure=False # HTTP模式(如果是HTTPS则设为True) ) try: # 自动创建Bucket(如果不存在) if not client.bucket_exists(BUCKET_NAME): client.make_bucket(BUCKET_NAME) print(f"[INFO] Bucket '{BUCKET_NAME}' 创建成功") # 如果未指定object_name,则使用本地文件名 if object_name is None: object_name = os.path.basename(local_file_path) # 上传文件 client.fput_object( BUCKET_NAME, object_name, local_file_path ) print(f"[SUCCESS] 文件 '{local_file_path}' 已上传到 '{BUCKET_NAME}/{object_name}'") return True except S3Error as e: print(f"[ERROR] MinIO操作失败: {e}") except Exception as e: print(f"[ERROR] 其他错误: {e}") return False def download_from_minio(object_name, local_file_path=None): """ 从MinIO下载文件 :param object_name: MinIO中的文件名称 :param local_file_path: 本地保存路径(可选,默认使用当前目录下的同名文件) :return: 下载成功返回True,失败返回False """ # 初始化客户端 client = Minio( MINIO_SERVER, access_key=ACCESS_KEY, secret_key=SECRET_KEY, secure=False ) try: # 检查Bucket是否存在 if not client.bucket_exists(BUCKET_NAME): print(f"[ERROR] Bucket '{BUCKET_NAME}' 不存在") return False # 如果未指定本地保存路径,则使用当前目录下的同名文件 if local_file_path is None: local_file_path = os.path.join(os.getcwd(), object_name) # 确保目标目录存在 os.makedirs(os.path.dirname(local_file_path), exist_ok=True) # 下载文件 client.fget_object( BUCKET_NAME, object_name, local_file_path ) print(f"[SUCCESS] 文件 '{BUCKET_NAME}/{object_name}' 已下载到 '{local_file_path}'") return True except S3Error as e: print(f"[ERROR] MinIO操作失败: {e}") except Exception as e: print(f"[ERROR] 其他错误: {e}") return False # 示例用法 if __name__ == "__main__": # # 上传文件示例 # local_file = r"D:\work\codemyself\dify\output.pdf" # 使用原始字符串避免转义问题 # upload_to_minio(local_file) # 使用默认object_name # 下载文件示例 object_name = "output.pdf" # MinIO中的文件名 download_from_minio(object_name) # 下载到当前目录 # 或指定保存路径:download_from_minio(object_name, "path/to/save/file.pdf")