配置ftp上传路径

image

读取的model FtpProperties

package com.lyc.ftp.utils;

import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;

/**
 * Purpose: holds the FTP settings bound from configuration properties under the {@code ftp} prefix.
 * Lombok's {@code @Data} generates the getters and setters used for binding and access.
 *
 * @author chenQi
 * @version 1.0
 * @date 2020/11/16 15:24
 */
@Data
@ConfigurationProperties(prefix = "ftp")
@Component
public class FtpProperties {

    // Address of the FTP server (property "ftp.url").
    private String url;

    // Port of the FTP server (property "ftp.port").
    private Integer port;

    // Login user name (property "ftp.username").
    private String username;

    // Login password (property "ftp.password").
    private String password;

    // Path setting (property "ftp.path").
    // NOTE(review): presumably the local directory whose files are uploaded — confirm against the caller.
    private String path;
}

上传状态enums UploadStatus

package com.lyc.ftp.utils;

import lombok.AllArgsConstructor;
import lombok.Getter;

/**
 * Result codes for FTP directory creation and file upload, each carrying a
 * human-readable description that is exposed through the Lombok-generated
 * {@code getDesc()}.
 *
 * <p>The constant names {@code FILE_EXITS} and {@code DELETE_REMOTE_FAILD} are
 * misspelled but are kept as they are because callers reference them by name.
 *
 * @author chenqi
 */
@Getter
@AllArgsConstructor
public enum UploadStatus {
    /** The remote directory structure could not be created. */
    CREATE_DIRECTORY_FAIL("远程服务器相应目录创建失败"),
    /** The remote directory structure exists or was created. */
    CREATE_DIRECTORY_SUCCESS("远程服务器创建目录成功"),
    /** A file was uploaded from its first byte and the transfer completed. */
    UPLOAD_NEW_FILE_SUCCESS("上传新文件成功"),
    /** A file was uploaded from its first byte but the transfer did not complete. */
    UPLOAD_NEW_FILE_FAILED("上传新文件失败"),
    /** The remote file already exists with the same size as the local file. */
    FILE_EXITS("文件已经存在"),
    /** The remote file is larger than the local file, so nothing is uploaded. */
    REMOTE_BIGGER_LOCAL("远程文件大于本地文件"),
    /** A partially uploaded file was resumed and the transfer completed. */
    UPLOAD_FROM_BREAK_SUCCESS("断点续传成功"),
    /** A partially uploaded file was resumed but the transfer did not complete. */
    UPLOAD_FROM_BREAK_FAILED("断点续传失败"),
    /** The stale remote file could not be deleted before re-uploading. */
    DELETE_REMOTE_FAILD("删除远程文件失败");

    /** Human-readable description of the status. */
    private final String desc;
}

创建线程池

package com.lyc.ftp.config;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import java.util.concurrent.ThreadPoolExecutor;

/**
 * Spring configuration that enables {@code @Async} support and exposes the
 * {@code taskExecutor} thread pool.
 *
 * @author ChenQi
 * @since 2020/11/14 15:33
 */
@Configuration
@EnableAsync
public class ExecutorConfig {

    /** Number of threads kept in the pool. */
    private static final int CORE_POOL_SIZE = 3;
    /** Upper bound on the number of threads; only reached once the queue is full. */
    private static final int MAX_POOL_SIZE = 3000;
    /** Number of tasks that may wait in the queue before extra threads are started. */
    private static final int QUEUE_CAPACITY = 200;
    /**
     * Idle time, in seconds, after which threads above the core size are released.
     * NOTE(review): 30000 seconds is more than eight hours — confirm that milliseconds
     * were not intended.
     */
    private static final int KEEP_ALIVE_SECONDS = 30000;
    /** Prefix of the names given to the pool's threads. */
    private static final String THREAD_NAME_PREFIX = "jd-ftp-";

    /**
     * Builds the thread pool registered under the bean name {@code taskExecutor}.
     *
     * @return the initialized executor
     */
    @Bean(name = "taskExecutor")
    public ThreadPoolTaskExecutor taskExecutor() {
        ThreadPoolTaskExecutor poolTaskExecutor = new ThreadPoolTaskExecutor();
        poolTaskExecutor.setCorePoolSize(CORE_POOL_SIZE);
        poolTaskExecutor.setMaxPoolSize(MAX_POOL_SIZE);
        poolTaskExecutor.setQueueCapacity(QUEUE_CAPACITY);
        poolTaskExecutor.setKeepAliveSeconds(KEEP_ALIVE_SECONDS);
        poolTaskExecutor.setThreadNamePrefix(THREAD_NAME_PREFIX);
        // When both the queue and the pool are full, the submitting thread runs the task
        // itself instead of the task being rejected.
        poolTaskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        // Let queued and running tasks finish when the application context shuts down.
        poolTaskExecutor.setWaitForTasksToCompleteOnShutdown(true);
        poolTaskExecutor.initialize();
        return poolTaskExecutor;
    }
}

定时执行器 ReadFileHandle:每小时整点执行一次;每个文件上传完成后,上传线程休眠2分钟

package com.lyc.ftp.handle;

import cn.hutool.core.date.DatePattern;
import cn.hutool.core.date.DateUtil;
import cn.hutool.core.thread.NamedThreadFactory;
import cn.hutool.core.thread.ThreadFactoryBuilder;
import com.lyc.ftp.utils.ContinueFtp;
import com.lyc.ftp.utils.FtpProperties;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.TaskScheduler;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Component;

import java.io.File;
import java.io.IOException;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.concurrent.*;

/**
 * 功能: 定时传送站点xml文件至ftp服务器
 *
 * @author chenQi
 * @version 1.0
 * @date 2020/11/16 15:47
 */
@Component
@AllArgsConstructor
@Slf4j
public class ReadFileHandle {

    private FtpProperties ftpProperties;

    private final ThreadPoolTaskExecutor taskExecutor;

    @Scheduled(cron = "0 0 * * * ?")
    private void readFileAndSend() {
        ContinueFtp myFtp = new ContinueFtp();
        log.info("开始执行" + DateUtil.now());
        try {
            // 查询本地文件夹中所有文件
            File file = new File(ftpProperties.getPath());
            File[] fileList = file.listFiles();
            if (fileList != null && fileList.length > 0) {

                for (File f : fileList) {
                    // 判断文件创建时间,如果文件时间大于当前时间跳过
                    String[] fileName = f.getName().split("_");
                    String time = fileName[2];
                    // 时间比较
                    if (LocalDateTime.now().isBefore(LocalDateTime.parse(time, DateTimeFormatter.ofPattern(DatePattern.PURE_DATETIME_PATTERN)))) {
                        continue;
                    }
                    taskExecutor.execute(() -> {
                        try {
                            myFtp.connect(ftpProperties.getUrl(), ftpProperties.getPort(), ftpProperties.getUsername(), ftpProperties.getPassword());
                            myFtp.upload(f, f.getName());
                            myFtp.disconnect();
                            Thread.sleep(120000L);
                        } catch (IOException e) {
                            System.out.println("ftp连接出错" + e.getLocalizedMessage());
                        } catch (InterruptedException e) {
                            System.out.println("线程出错" + e.getLocalizedMessage());
                        }
                    });
                }
            }
        } catch (Exception e) {
            System.out.println("系统错误:" + e.getMessage());
        }
    }
}

ftp上传逻辑 ContinueFtp 计算上传百分比存在问题,有时间修改

package com.lyc.ftp.utils;

import org.apache.commons.net.PrintCommandListener;
import org.apache.commons.net.ftp.FTP;
import org.apache.commons.net.ftp.FTPClient;
import org.apache.commons.net.ftp.FTPFile;
import org.apache.commons.net.ftp.FTPReply;

import java.io.*;

/**
 * FTP helper that uploads local files and can resume a partially uploaded file.
 *
 * <p>Path names are sent to the server as GBK bytes. The control connection is kept on
 * ISO-8859-1 and every name is transcoded by {@link #toServerName(String)}, so the bytes
 * on the wire are the same on the first connection and on any reconnect.
 *
 * <p>Instances are not safe for use by several threads at once.
 *
 * @author chenqi
 * @version 0.3 added support for Chinese directory and file names
 */
public class ContinueFtp {
    /** Charset in which path names are sent to the server. */
    private static final String FILE_NAME_CHARSET = "GBK";
    /** Charset of the control connection; each of its characters maps to exactly one byte. */
    private static final String CONTROL_CHARSET = "iso-8859-1";
    /** Size of the copy buffer in bytes. */
    private static final int BUFFER_SIZE = 1024;

    public FTPClient ftpClient = new FTPClient();

    /**
     * Creates the helper and echoes the FTP protocol exchange to the console.
     * Login commands are masked so that the password is never printed.
     */
    public ContinueFtp() {
        this.ftpClient.addProtocolCommandListener(
                new PrintCommandListener(new PrintWriter(System.out), true));
    }

    /**
     * Connects to the FTP server and logs in.
     *
     * @param hostname server host name
     * @param port     server port
     * @param username login user
     * @param password login password
     * @return {@code true} when the connection was accepted and the login succeeded;
     *         otherwise the connection is closed and {@code false} is returned
     * @throws IOException if the connection or the login exchange fails
     */
    public boolean connect(String hostname, int port, String username, String password) throws IOException {
        // The control encoding only takes effect when set before connecting.
        ftpClient.setControlEncoding(CONTROL_CHARSET);
        ftpClient.connect(hostname, port);
        if (FTPReply.isPositiveCompletion(ftpClient.getReplyCode()) && ftpClient.login(username, password)) {
            return true;
        }
        disconnect();
        return false;
    }

    /**
     * Uploads a file, resuming when a smaller remote file with the same name exists.
     * The local file is deleted once it is known to be completely on the server: after
     * a successful upload, or when the remote file already has the same size.
     *
     * @param f      the local file
     * @param remote remote path in Unix notation, e.g. {@code /home/dir/sub/file.ext};
     *               missing directories are created
     * @return the outcome of the upload
     * @throws IOException if communication with the server or reading the file fails
     */
    public UploadStatus upload(File f, String remote) throws IOException {
        ftpClient.enterLocalPassiveMode();
        ftpClient.setFileType(FTP.BINARY_FILE_TYPE);

        String remoteFileName = remote;
        if (remote.contains("/")) {
            remoteFileName = remote.substring(remote.lastIndexOf("/") + 1);
            // Creating the directories also changes into the target directory.
            if (CreateDirecroty(remote, ftpClient) == UploadStatus.CREATE_DIRECTORY_FAIL) {
                return UploadStatus.CREATE_DIRECTORY_FAIL;
            }
        }

        UploadStatus result;
        FTPFile[] files = ftpClient.listFiles(toServerName(remoteFileName));
        if (files.length == 1) {
            long remoteSize = files[0].getSize();
            long localSize = f.length();
            if (remoteSize == localSize) {
                deleteLocal(f);
                return UploadStatus.FILE_EXITS;
            }
            if (remoteSize > localSize) {
                return UploadStatus.REMOTE_BIGGER_LOCAL;
            }

            // Continue after the bytes the server already holds.
            result = uploadFile(remoteFileName, f, ftpClient, remoteSize);

            // If resuming failed, remove the partial remote file and start over.
            if (result == UploadStatus.UPLOAD_FROM_BREAK_FAILED) {
                if (!ftpClient.deleteFile(toServerName(remoteFileName))) {
                    return UploadStatus.DELETE_REMOTE_FAILD;
                }
                result = uploadFile(remoteFileName, f, ftpClient, 0);
            }
        } else {
            result = uploadFile(remoteFileName, f, ftpClient, 0);
        }

        // Never delete the local copy after a failed transfer.
        if (result == UploadStatus.UPLOAD_NEW_FILE_SUCCESS || result == UploadStatus.UPLOAD_FROM_BREAK_SUCCESS) {
            deleteLocal(f);
        }
        return result;
    }

    /**
     * Closes the connection to the server if it is open.
     *
     * @throws IOException if closing the connection fails
     */
    public void disconnect() throws IOException {
        if (ftpClient.isConnected()) {
            ftpClient.disconnect();
        }
    }

    /**
     * Changes into the directory part of {@code remote}, creating missing directories
     * one level at a time.
     *
     * <p>Levels are created relative to the current working directory, also when the
     * path starts with {@code /}. A path with no directory part, or with only
     * {@code /}, needs no work and reports success.
     *
     * @param remote    remote file path; everything up to the last {@code /} is the directory
     * @param ftpClient the connected client to operate on
     * @return {@link UploadStatus#CREATE_DIRECTORY_SUCCESS} or {@link UploadStatus#CREATE_DIRECTORY_FAIL}
     * @throws IOException if communication with the server fails
     */
    public UploadStatus CreateDirecroty(String remote, FTPClient ftpClient) throws IOException {
        String directory = remote.substring(0, remote.lastIndexOf("/") + 1);
        if (directory.isEmpty() || directory.equalsIgnoreCase("/")
                || ftpClient.changeWorkingDirectory(toServerName(directory))) {
            return UploadStatus.CREATE_DIRECTORY_SUCCESS;
        }

        // The full path does not exist yet: walk it level by level.
        for (String level : directory.split("/")) {
            if (level.isEmpty()) {
                // Produced by a leading or doubled slash.
                continue;
            }
            String serverName = toServerName(level);
            if (ftpClient.changeWorkingDirectory(serverName)) {
                continue;
            }
            if (!ftpClient.makeDirectory(serverName)) {
                System.out.println("创建目录失败");
                return UploadStatus.CREATE_DIRECTORY_FAIL;
            }
            ftpClient.changeWorkingDirectory(serverName);
        }
        return UploadStatus.CREATE_DIRECTORY_SUCCESS;
    }

    /**
     * Sends a local file to the server, either from the start or from an offset.
     *
     * <p>The data is sent with an append command, so a resumed upload continues at the
     * end of the existing remote file. No restart offset is set: it would have no effect
     * on this transfer and would be applied to the next data command instead.
     *
     * @param remoteFile remote file name, relative to the current working directory
     * @param localFile  the local file to read
     * @param ftpClient  the connected client to operate on
     * @param remoteSize number of bytes the server already holds; {@code 0} for a new upload
     * @return a success or failure status of the "new file" kind when {@code remoteSize}
     *         is {@code 0}, otherwise of the "resume" kind
     * @throws IOException if reading the file or writing to the server fails
     */
    public UploadStatus uploadFile(String remoteFile, File localFile, FTPClient ftpClient, long remoteSize) throws IOException {
        boolean resuming = remoteSize > 0;
        long totalBytes = localFile.length();
        long sentBytes = 0L;
        long shownPercent = 0L;

        try (RandomAccessFile local = new RandomAccessFile(localFile, "r")) {
            if (resuming) {
                local.seek(remoteSize);
                sentBytes = remoteSize;
                shownPercent = percentOf(sentBytes, totalBytes);
            }

            OutputStream remoteStream = ftpClient.appendFileStream(toServerName(remoteFile));
            if (remoteStream == null) {
                // The server refused the transfer; there is no pending command to complete.
                return resuming ? UploadStatus.UPLOAD_FROM_BREAK_FAILED : UploadStatus.UPLOAD_NEW_FILE_FAILED;
            }

            try (OutputStream out = remoteStream) {
                byte[] buffer = new byte[BUFFER_SIZE];
                int read;
                while ((read = local.read(buffer)) != -1) {
                    out.write(buffer, 0, read);
                    sentBytes += read;
                    long percent = percentOf(sentBytes, totalBytes);
                    if (percent != shownPercent) {
                        shownPercent = percent;
                        System.out.println("上传进度:" + percent + "%");
                    }
                }
                out.flush();
            }
        }

        // The data stream must be closed before the server's final reply can be read.
        boolean completed = ftpClient.completePendingCommand();
        if (resuming) {
            return completed ? UploadStatus.UPLOAD_FROM_BREAK_SUCCESS : UploadStatus.UPLOAD_FROM_BREAK_FAILED;
        }
        return completed ? UploadStatus.UPLOAD_NEW_FILE_SUCCESS : UploadStatus.UPLOAD_NEW_FILE_FAILED;
    }

    /** Transcodes a path name so that the control connection emits its GBK bytes. */
    private static String toServerName(String name) throws UnsupportedEncodingException {
        return new String(name.getBytes(FILE_NAME_CHARSET), CONTROL_CHARSET);
    }

    /** Returns the share of {@code total} that {@code done} represents, as a whole percentage. */
    private static long percentOf(long done, long total) {
        return total <= 0 ? 100L : done * 100L / total;
    }

    /** Deletes the local file and reports on the console when that is not possible. */
    private static void deleteLocal(File f) {
        if (!f.delete()) {
            System.out.println("删除本地文件失败:" + f.getName());
        }
    }
}

🧑 该代码用于传输江都水站的检测数据:省平台要求将每日数据以xml文件的形式上传至指定服务器
‍💻 代码地址 github