mini_toolbox.ftp 源代码
#!/usr/bin/env python3
# -*- coding:utf-8 -*-
""" 用于FTP相关操作 """
__all__ = ['FtpTools']
import os
from ftplib import FTP
from typing import Optional
from .logger import logger
from .path import mkdirs, gen_path, split_path
[文档]class FtpTools():
""" 用于FTP相关操作
支持文件/目录的上传和下载, 由于存在安全风险, 不支持删除, 仅支持重写
Example:
FTP download / upload 语法示例::
copy /haha/hehe . # hehe 拷贝为 ./hehe # 后为. 则后补/
copy /haha/hehe ./ # hehe 拷贝为 ./hehe
copy /haha/hehe ./haha # hehe 拷贝为 ./haha
copy /haha/hehe ./haha/ # hehe 拷贝为 ./haha/hehe
copy /haha/hehe/ ./ # hehe/* 拷贝为 ./*
copy /haha/hehe/ ./haha # hehe/* 拷贝为 ./haha/* # 前有/ 则后补/
copy /haha/hehe/ ./haha/ # hehe/* 拷贝为 ./haha/*
Args:
host (str): 服务器地址
port (int): 服务器端口
user (str): 用户名
passwd (str): 用户密码
level (int): 日志等级, ``0/1/2`` 分别表示 ``隐藏/少量/大量``, 默认为0
"""
def __init__(self, host: str, port: int = 0, user: str = '', passwd: str = '', level: int = 0):
self.ftp = FTP()
self.ftp.encoding = 'utf-8'
self.ftp.set_debuglevel(level)
self.ftp.connect(host, port, timeout=10) # 登陆超时时间10s
self.ftp.login(user, passwd)
self.ftp.getwelcome()
def _is_remote_file(self, path: str) -> Optional[bool]:
""" 判断ftp路径是否为文件, 返回 ``None/True/False`` """
pwd = self.ftp.pwd()
is_file = None # 默认路径不存在
td, tf = split_path(path)
try:
self.ftp.cwd(path) # 能直接进入则是目录
is_file = False
except:
try:
self.ftp.cwd(td) # 能进入上级目录, 且目录中存在, 则是文件
if tf in self.ftp.nlst():
is_file = True
except:
pass
self.ftp.cwd(pwd)
return is_file
def _upload_dirs(self, remote_dir: str, log: bool = True) -> None:
""" 仅内部调用, 处理目录上传, 入参为目录 """
if log:
logger.debug('目录上传中: {}'.format(remote_dir))
pwd = self.ftp.pwd()
for dir in remote_dir.split('/'):
if len(dir.strip('.')) == 0:
continue
if dir not in self.ftp.nlst():
self.ftp.mkd(dir)
self.ftp.cwd(dir)
self.ftp.cwd(pwd)
def _upload_file(self, src_path: str, dst_path: str, overwrite: bool = True) -> None:
""" 仅内部调用, 处理单个文件上传 """
if self._is_remote_file(dst_path) is None or overwrite:
try:
with open(src_path, "rb") as fp:
logger.debug('文件上传中: {}'.format(src_path))
self.ftp.storbinary("STOR {}".format(dst_path), fp)
except Exception as err:
logger.error('文件上传失败: {}'.format(src_path))
raise Exception(err)
else:
logger.debug('文件已存在, 跳过: {}'.format(src_path))
[文档] def upload(self, src_path: str, dst_path: str, overwrite: bool = True) -> None:
""" 从本地路径上传至服务器路径
Args:
src_path (str): 源路径
dst_path (str): 目标路径
overwrite (bool): 如果相对路径存在 ``同名同类型`` 文件, 则重写, 默认为True
"""
# 判断源路径是否存在
if not os.path.exists(src_path):
logger.error('源路径不存在: {}'.format(src_path))
return
# 首次遍历时, 使用src_file, 如果src_file为空, 则遍历src_dir中所有
src_dir, src_file = split_path(src_path)
# 如果src是目录, 则dst也是目录
dst_path = dst_path + '/' if not src_file else dst_path
# 根据dst_file判断是否需要改名
dst_dir, dst_file = split_path(dst_path)
# 切换路径
os.chdir(src_dir)
self._upload_dirs(dst_dir)
self.ftp.cwd(dst_dir)
logger.debug([src_dir, src_file, dst_dir, dst_file])
# 处理 src_file 和 dst_file 都存在的情况, 需要重命名
if src_file and dst_file:
if not os.path.isdir(src_file): # 是文件
logger.debug('文件重命名上传中: {} -> {}'.format(src_file, dst_file))
self._upload_file(src_file, dst_file, overwrite=overwrite)
return
else:
# 创建重命名的远程文件夹, 并切换相对路径
os.chdir(src_file)
logger.debug('目录重命名上传中: {} -> {}'.format(src_file, dst_file))
self._upload_dirs(dst_file, log=False)
self.ftp.cwd(dst_file)
src_file = '.'
# 遍历后上传
for item in gen_path(src_file):
if not os.path.isdir(item):
td, tp = split_path(item)
self._upload_dirs(td)
self._upload_file(item, item, overwrite=overwrite)
else:
self._upload_dirs(item)
[文档] def download(self, src_path: str, dst_path: str, overwrite: bool = True) -> None:
""" 从服务器路径下载至本地路径
Args:
src_path (str): 源路径
dst_path (str): 目标路径
overwrite (bool): 如果相对路径存在 ``同名同类型`` 文件, 则重写, 默认为True
"""
# 判断源路径是否存在
if self._is_remote_file(src_path) is None:
logger.error('源路径不存在: {}'.format(src_path))
return
# 首次遍历时, 使用src_file, 如果src_file为空, 则遍历src_dir中所有
src_dir, src_file = split_path(src_path)
# 如果src是目录, 则dst也是目录
dst_path = dst_path + '/' if not src_file else dst_path
# 根据dst_file判断是否需要改名
dst_dir, dst_file = split_path(dst_path)
# 创建路径
mkdirs(dst_dir, is_file=False)
# 切换路径
self.ftp.cwd(src_dir)
os.chdir(dst_dir)
# 下载文件
for item in self.ftp.nlst(src_file):
# 备份当前所在路径
src_pwd, dst_pwd = self.ftp.pwd(), os.getcwd()
# 正在处理的相对路径
std, stf = split_path(item)
dtd = dst_file if dst_file and std else std
dtf = dst_file if dst_file and not std else stf
# 正在处理的绝对路径
now_src = '/'.join((src_pwd, std, stf))
now_dst = '/'.join((dst_pwd, dtd, dtf))
# 如果item为多级路径, 则先新建目录dirname(item)
if std:
self.ftp.cwd(std)
mkdirs(dtd, is_file=False)
os.chdir(dtd)
# 处理item路径, 如果是文件则下载, 如果是目录, 则递归download()
if self._is_remote_file(stf):
# 路径不存在, 或者需要重写
if not os.path.exists(dtf) or overwrite:
try:
with open(dtf, "wb") as fp:
logger.debug('文件下载中: {}'.format(now_src))
self.ftp.retrbinary("RETR {}".format(stf), fp.write)
except Exception as err:
logger.error('文件下载失败: {}'.format(now_src))
raise Exception(err)
else:
logger.debug('文件已存在, 跳过: {}'.format(now_src))
else:
logger.debug('目录下载中: {}'.format(now_src))
mkdirs(dtf, is_file=False)
self.download(now_src, now_dst, overwrite=overwrite)
# 回到备份路径
self.ftp.cwd(src_pwd)
os.chdir(dst_pwd)
[文档] def check_file(self, path: str) -> Optional[bool]:
""" 判断ftp路径是否为文件, 返回 ``None/True/False``, None表示不存在 """
return self._is_remote_file(path)