java ftp文件上传实例
配置ftp上传路径
读取的model FtpProperties
package com.lyc.ftp.utils;
import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;
/**
* 功能:
*
* @author chenQi
* @version 1.0
* @date 2020/11/16 15:24
*/
@Data
@ConfigurationProperties(prefix = "ftp")
@Component
public class FtpProperties {
private String url;
private Integer port;
private String username;
private String password;
private String path;
}
上传状态enums UploadStatus
package com.lyc.ftp.utils;
import lombok.AllArgsConstructor;
import lombok.Getter;
/**
* @author chenqi
* @DESCRIPTION:
* @params:
*/
@Getter
@AllArgsConstructor
public enum UploadStatus {
/**
*
*/
CREATE_DIRECTORY_FAIL("远程服务器相应目录创建失败"),
CREATE_DIRECTORY_SUCCESS("远程服务器闯将目录成功"),
UPLOAD_NEW_FILE_SUCCESS("上传新文件成功"),
UPLOAD_NEW_FILE_FAILED("上传新文件失败"),
FILE_EXITS("文件已经存在"),
REMOTE_BIGGER_LOCAL("远程文件大于本地文件"),
UPLOAD_FROM_BREAK_SUCCESS("断点续传成功"),
UPLOAD_FROM_BREAK_FAILED("断点续传失败"),
DELETE_REMOTE_FAILD("删除远程文件失败");
private final String desc;
}
创建线程池
package com.lyc.ftp.config;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import java.util.concurrent.ThreadPoolExecutor;
/**
* 类 TaskScheduler
* </p>
*
* @author ChenQi
* @since 2020/11/14 15:33
*/
@Configuration
@EnableAsync
public class ExecutorConfig {
private static int CORE_POOL_SIZE = 3;
private static int MAX_POOL_SIZE = 3000;
@Bean(name = "taskExecutor")
public ThreadPoolTaskExecutor taskExecutor() {
ThreadPoolTaskExecutor poolTaskExecutor = new ThreadPoolTaskExecutor();
//线程池维护线程的最少数量
poolTaskExecutor.setCorePoolSize(CORE_POOL_SIZE);
//线程池维护线程的最大数量
poolTaskExecutor.setMaxPoolSize(MAX_POOL_SIZE);
//线程池所使用的缓冲队列
poolTaskExecutor.setQueueCapacity(200);
//线程池维护线程所允许的空闲时间
poolTaskExecutor.setKeepAliveSeconds(30000);
// 线程默认名称
poolTaskExecutor.setThreadNamePrefix("jd-ftp-");
// 拒绝策略
poolTaskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
poolTaskExecutor.setWaitForTasksToCompleteOnShutdown(true);
poolTaskExecutor.initialize();
return poolTaskExecutor;
}
}
定时执行器,每个小时执行一次,文件每隔分钟上传一次ReadFileHandle
package com.lyc.ftp.handle;
import cn.hutool.core.date.DatePattern;
import cn.hutool.core.date.DateUtil;
import cn.hutool.core.thread.NamedThreadFactory;
import cn.hutool.core.thread.ThreadFactoryBuilder;
import com.lyc.ftp.utils.ContinueFtp;
import com.lyc.ftp.utils.FtpProperties;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.TaskScheduler;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Component;
import java.io.File;
import java.io.IOException;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.concurrent.*;
/**
* 功能: 定时传送站点xml文件至ftp服务器
*
* @author chenQi
* @version 1.0
* @date 2020/11/16 15:47
*/
@Component
@AllArgsConstructor
@Slf4j
public class ReadFileHandle {
private FtpProperties ftpProperties;
private final ThreadPoolTaskExecutor taskExecutor;
@Scheduled(cron = "0 0 * * * ?")
private void readFileAndSend() {
ContinueFtp myFtp = new ContinueFtp();
log.info("开始执行" + DateUtil.now());
try {
// 查询本地文件夹中所有文件
File file = new File(ftpProperties.getPath());
File[] fileList = file.listFiles();
if (fileList != null && fileList.length > 0) {
for (File f : fileList) {
// 判断文件创建时间,如果文件时间大于当前时间跳过
String[] fileName = f.getName().split("_");
String time = fileName[2];
// 时间比较
if (LocalDateTime.now().isBefore(LocalDateTime.parse(time, DateTimeFormatter.ofPattern(DatePattern.PURE_DATETIME_PATTERN)))) {
continue;
}
taskExecutor.execute(() -> {
try {
myFtp.connect(ftpProperties.getUrl(), ftpProperties.getPort(), ftpProperties.getUsername(), ftpProperties.getPassword());
myFtp.upload(f, f.getName());
myFtp.disconnect();
Thread.sleep(120000L);
} catch (IOException e) {
System.out.println("ftp连接出错" + e.getLocalizedMessage());
} catch (InterruptedException e) {
System.out.println("线程出错" + e.getLocalizedMessage());
}
});
}
}
} catch (Exception e) {
System.out.println("系统错误:" + e.getMessage());
}
}
}
ftp上传逻辑 ContinueFtp 计算上传百分比存在问题,有时间修改
package com.lyc.ftp.utils;
import org.apache.commons.net.PrintCommandListener;
import org.apache.commons.net.ftp.FTP;
import org.apache.commons.net.ftp.FTPClient;
import org.apache.commons.net.ftp.FTPFile;
import org.apache.commons.net.ftp.FTPReply;
import java.io.*;
/**
* 支持断点续传的FTP实用类
*
* @author chenqi
* @version 0.3 实现中文目录创建及中文文件创建,添加对于中文的支持
*/
public class ContinueFtp {
public FTPClient ftpClient = new FTPClient();
public ContinueFtp() {
//设置将过程中使用到的命令输出到控制台
this.ftpClient.addProtocolCommandListener(new PrintCommandListener(new PrintWriter(System.out)));
}
/**
* 连接到FTP服务器
*
* @param hostname 主机名
* @param port 端口
* @param username 用户名
* @param password 密码
* @return 是否连接成功
* @throws IOException
*/
public boolean connect(String hostname, int port, String username, String password) throws IOException {
ftpClient.connect(hostname, port);
ftpClient.setControlEncoding("GBK");
if (FTPReply.isPositiveCompletion(ftpClient.getReplyCode())) {
if (ftpClient.login(username, password)) {
return true;
}
}
disconnect();
return false;
}
/**
* 上传文件到FTP服务器,支持断点续传
*
* @param remote 远程文件路径,使用/home/directory1/subdirectory/file.ext 按照Linux上的路径指定方式,支持多级目录嵌套,支持递归创建不存在的目录结构
* @return 上传结果
* @throws IOException
*/
public UploadStatus upload(File f, String remote) throws IOException {
//设置PassiveMode传输
ftpClient.enterLocalPassiveMode();
//设置以二进制流的方式传输
ftpClient.setFileType(FTP.BINARY_FILE_TYPE);
ftpClient.setControlEncoding("GBK");
UploadStatus result;
//对远程目录的处理
String remoteFileName = remote;
if (remote.contains("/")) {
remoteFileName = remote.substring(remote.lastIndexOf("/") + 1);
//创建服务器远程目录结构,创建失败直接返回
if (CreateDirecroty(remote, ftpClient) == UploadStatus.CREATE_DIRECTORY_FAIL) {
return UploadStatus.CREATE_DIRECTORY_FAIL;
}
}
//检查远程是否存在文件
FTPFile[] files = ftpClient.listFiles(new String(remoteFileName.getBytes("GBK"), "iso-8859-1"));
if (files.length == 1) {
long remoteSize = files[0].getSize();
long localSize = f.length();
if (remoteSize == localSize) {
// 删除本地文件
f.delete();
return UploadStatus.FILE_EXITS;
} else if (remoteSize > localSize) {
return UploadStatus.REMOTE_BIGGER_LOCAL;
}
//尝试移动文件内读取指针,实现断点续传
result = uploadFile(remoteFileName, f, ftpClient, remoteSize);
//如果断点续传没有成功,则删除服务器上文件,重新上传
if (result == UploadStatus.UPLOAD_FROM_BREAK_FAILED) {
if (!ftpClient.deleteFile(remoteFileName)) {
return UploadStatus.DELETE_REMOTE_FAILD;
}
result = uploadFile(remoteFileName, f, ftpClient, 0);
}
} else {
result = uploadFile(remoteFileName, f, ftpClient, 0);
// 删除本地文件
f.delete();
}
return result;
}
/**
* 断开与远程服务器的连接
*
* @throws IOException
*/
public void disconnect() throws IOException {
if (ftpClient.isConnected()) {
ftpClient.disconnect();
}
}
/**
* 递归创建远程服务器目录
*
* @param remote 远程服务器文件绝对路径
* @param ftpClient FTPClient对象
* @return 目录创建是否成功
* @throws IOException
*/
public UploadStatus CreateDirecroty(String remote, FTPClient ftpClient) throws IOException {
UploadStatus status = UploadStatus.CREATE_DIRECTORY_SUCCESS;
String directory = remote.substring(0, remote.lastIndexOf("/") + 1);
if (!directory.equalsIgnoreCase("/") && !ftpClient.changeWorkingDirectory(new String(directory.getBytes("GBK"), "iso-8859-1"))) {
//如果远程目录不存在,则递归创建远程服务器目录
int start = 0;
int end = 0;
if (directory.startsWith("/")) {
start = 1;
} else {
start = 0;
}
end = directory.indexOf("/", start);
while (true) {
String subDirectory = new String(remote.substring(start, end).getBytes("GBK"), "iso-8859-1");
if (!ftpClient.changeWorkingDirectory(subDirectory)) {
if (ftpClient.makeDirectory(subDirectory)) {
ftpClient.changeWorkingDirectory(subDirectory);
} else {
System.out.println("创建目录失败");
return UploadStatus.CREATE_DIRECTORY_FAIL;
}
}
start = end + 1;
end = directory.indexOf("/", start);
//检查所有目录是否创建完毕
if (end <= start) {
break;
}
}
}
return status;
}
/**
* 上传文件到服务器,新上传和断点续传
*
* @param remoteFile 远程文件名,在上传之前已经将服务器工作目录做了改变
* @param localFile 本地文件File句柄,绝对路径
* @param ftpClient FTPClient引用
* @throws IOException
*/
public UploadStatus uploadFile(String remoteFile, File localFile, FTPClient ftpClient, long remoteSize) throws IOException {
UploadStatus status;
//显示进度的上传
long step = localFile.length();
long process = 0;
long localreadbytes = 0L;
RandomAccessFile raf = new RandomAccessFile(localFile, "r");
OutputStream out = ftpClient.appendFileStream(new String(remoteFile.getBytes("GBK"), "iso-8859-1"));
//断点续传
if (remoteSize > 0) {
ftpClient.setRestartOffset(remoteSize);
process = remoteSize / step;
raf.seek(remoteSize);
localreadbytes = remoteSize;
}
byte[] bytes = new byte[1024];
int c;
while ((c = raf.read(bytes)) != -1) {
out.write(bytes, 0, c);
localreadbytes += c;
if (localreadbytes / step != process) {
process = localreadbytes / step;
System.out.println("上传进度:" + process);
}
}
out.flush();
raf.close();
out.close();
boolean result = ftpClient.completePendingCommand();
if (remoteSize > 0) {
status = result ? UploadStatus.UPLOAD_FROM_BREAK_SUCCESS : UploadStatus.UPLOAD_FROM_BREAK_FAILED;
} else {
status = result ? UploadStatus.UPLOAD_NEW_FILE_SUCCESS : UploadStatus.UPLOAD_NEW_FILE_FAILED;
}
return status;
}
}
🧑 该代码开发用于传输江都水站检测数据传输,省平台要求将每日数据通过xml文件的形式上传至指定服务器
💻 代码地址 github
本博客所有文章除特别声明外,均采用 CC BY-NC-SA 4.0 许可协议。转载请注明来自 ALLBS!
评论