123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105 |
- from minio import Minio
- from minio.error import S3Error
- import os
- # MinIO 配置
- MINIO_SERVER = "192.168.0.25:9000" # 服务器地址(无http前缀)
- ACCESS_KEY = "minioadmin" # 默认Access Key
- SECRET_KEY = "minioadmin" # 默认Secret Key
- BUCKET_NAME = "szxc" # 你的Bucket名称
- def upload_to_minio(local_file_path, object_name=None):
- """
- 上传文件到MinIO
- :param local_file_path: 本地文件路径(如 "C:\\data\\test.txt")
- :param object_name: 存储在MinIO中的名称(可选,默认使用本地文件名)
- """
- # 初始化客户端
- client = Minio(
- MINIO_SERVER,
- access_key=ACCESS_KEY,
- secret_key=SECRET_KEY,
- secure=False # HTTP模式(如果是HTTPS则设为True)
- )
- try:
- # 自动创建Bucket(如果不存在)
- if not client.bucket_exists(BUCKET_NAME):
- client.make_bucket(BUCKET_NAME)
- print(f"[INFO] Bucket '{BUCKET_NAME}' 创建成功")
- # 如果未指定object_name,则使用本地文件名
- if object_name is None:
- object_name = os.path.basename(local_file_path)
- # 上传文件
- client.fput_object(
- BUCKET_NAME,
- object_name,
- local_file_path
- )
- print(f"[SUCCESS] 文件 '{local_file_path}' 已上传到 '{BUCKET_NAME}/{object_name}'")
- return True
- except S3Error as e:
- print(f"[ERROR] MinIO操作失败: {e}")
- except Exception as e:
- print(f"[ERROR] 其他错误: {e}")
- return False
- def download_from_minio(object_name, local_file_path=None):
- """
- 从MinIO下载文件
- :param object_name: MinIO中的文件名称
- :param local_file_path: 本地保存路径(可选,默认使用当前目录下的同名文件)
- :return: 下载成功返回True,失败返回False
- """
- # 初始化客户端
- client = Minio(
- MINIO_SERVER,
- access_key=ACCESS_KEY,
- secret_key=SECRET_KEY,
- secure=False
- )
- try:
- # 检查Bucket是否存在
- if not client.bucket_exists(BUCKET_NAME):
- print(f"[ERROR] Bucket '{BUCKET_NAME}' 不存在")
- return False
- # 如果未指定本地保存路径,则使用当前目录下的同名文件
- if local_file_path is None:
- local_file_path = os.path.join(os.getcwd(), object_name)
- # 确保目标目录存在
- os.makedirs(os.path.dirname(local_file_path), exist_ok=True)
- # 下载文件
- client.fget_object(
- BUCKET_NAME,
- object_name,
- local_file_path
- )
- print(f"[SUCCESS] 文件 '{BUCKET_NAME}/{object_name}' 已下载到 '{local_file_path}'")
- return True
- except S3Error as e:
- print(f"[ERROR] MinIO操作失败: {e}")
- except Exception as e:
- print(f"[ERROR] 其他错误: {e}")
- return False
- # 示例用法
- if __name__ == "__main__":
- # # 上传文件示例
- # local_file = r"D:\work\codemyself\dify\output.pdf" # 使用原始字符串避免转义问题
- # upload_to_minio(local_file) # 使用默认object_name
- # 下载文件示例
- object_name = "output.pdf" # MinIO中的文件名
- download_from_minio(object_name) # 下载到当前目录
- # 或指定保存路径:download_from_minio(object_name, "path/to/save/file.pdf")
|