配置ftp上传路径

image

读取的model FtpProperties

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
package com.lyc.ftp.utils;

import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;

/**
* 功能:
*
* @author chenQi
* @version 1.0
* @date 2020/11/16 15:24
*/
@Data
@ConfigurationProperties(prefix = "ftp")
@Component
public class FtpProperties {

private String url;

private Integer port;

private String username;

private String password;

private String path;
}

上传状态enums UploadStatus

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
package com.lyc.ftp.utils;

import lombok.AllArgsConstructor;
import lombok.Getter;

/**
* @author chenqi
* @DESCRIPTION:
* @params:
*/
@Getter
@AllArgsConstructor
public enum UploadStatus {
/**
*
*/
CREATE_DIRECTORY_FAIL("远程服务器相应目录创建失败"),
CREATE_DIRECTORY_SUCCESS("远程服务器闯将目录成功"),
UPLOAD_NEW_FILE_SUCCESS("上传新文件成功"),
UPLOAD_NEW_FILE_FAILED("上传新文件失败"),
FILE_EXITS("文件已经存在"),
REMOTE_BIGGER_LOCAL("远程文件大于本地文件"),
UPLOAD_FROM_BREAK_SUCCESS("断点续传成功"),
UPLOAD_FROM_BREAK_FAILED("断点续传失败"),
DELETE_REMOTE_FAILD("删除远程文件失败");

private final String desc;
}

定时执行器,每个小时执行一次,文件每隔分钟上传一次ReadFileHandle

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
package com.lyc.ftp.handle;

import cn.hutool.core.thread.ThreadFactoryBuilder;
import com.lyc.ftp.utils.ContinueFtp;
import com.lyc.ftp.utils.FtpProperties;
import lombok.AllArgsConstructor;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

import java.io.File;
import java.io.IOException;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
* 功能: 定时传送站点xml文件至ftp服务器
*
* @author chenQi
* @version 1.0
* @date 2020/11/16 15:47
*/
@Component
@AllArgsConstructor
public class ReadFileHandle {

private FtpProperties ftpProperties;

@Scheduled(cron = "0 5 * * * ?")
private void readFileAndSend() {
ContinueFtp myFtp = new ContinueFtp();
try {
// 查询本地文件夹中所有文件
File file = new File(ftpProperties.getPath());
File[] fileList = file.listFiles();
if (fileList.length > 0) {
// 使用 ThreadFactoryBuilder 创建自定义线程名称的 ThreadFactory
ThreadFactory namedThreadFactory = new ThreadFactoryBuilder()
.setNamePrefix("send-file-pool-%d").build();

// 创建线程池,其中任务队列需要结合实际情况设置合理的容量
ThreadPoolExecutor executor = new ThreadPoolExecutor(1,
20,
0L,
TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(1024),
namedThreadFactory,
new ThreadPoolExecutor.AbortPolicy());

for (File f : fileList) {
executor.execute(() -> {
try {
myFtp.connect(ftpProperties.getUrl(), ftpProperties.getPort(), ftpProperties.getUsername(), ftpProperties.getPassword());
myFtp.upload(f, f.getName());
myFtp.disconnect();
Thread.sleep(120000L);
} catch (IOException e) {
System.out.println("ftp连接出错" + e.getLocalizedMessage());
} catch (InterruptedException e) {
System.out.println("线程出错" + e.getLocalizedMessage());
}
});
}
// 优雅关闭线程池
executor.shutdown();
executor.awaitTermination(120L, TimeUnit.SECONDS);
}
} catch (InterruptedException e1) {
System.out.println("线程创建错误:" + e1.getMessage());
}
}
}

ftp上传逻辑 ContinueFtp 计算上传百分比存在问题,有时间修改

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
package com.lyc.ftp.utils;

import org.apache.commons.net.PrintCommandListener;
import org.apache.commons.net.ftp.FTP;
import org.apache.commons.net.ftp.FTPClient;
import org.apache.commons.net.ftp.FTPFile;
import org.apache.commons.net.ftp.FTPReply;

import java.io.*;

/**
* 支持断点续传的FTP实用类
*
* @author chenqi
* @version 0.3 实现中文目录创建及中文文件创建,添加对于中文的支持
*/
public class ContinueFtp {
public FTPClient ftpClient = new FTPClient();

public ContinueFtp() {
//设置将过程中使用到的命令输出到控制台
this.ftpClient.addProtocolCommandListener(new PrintCommandListener(new PrintWriter(System.out)));
}

/**
* 连接到FTP服务器
*
* @param hostname 主机名
* @param port 端口
* @param username 用户名
* @param password 密码
* @return 是否连接成功
* @throws IOException
*/
public boolean connect(String hostname, int port, String username, String password) throws IOException {
ftpClient.connect(hostname, port);
ftpClient.setControlEncoding("GBK");
if (FTPReply.isPositiveCompletion(ftpClient.getReplyCode())) {
if (ftpClient.login(username, password)) {
return true;
}
}
disconnect();
return false;
}

/**
* 上传文件到FTP服务器,支持断点续传
*
* @param remote 远程文件路径,使用/home/directory1/subdirectory/file.ext 按照Linux上的路径指定方式,支持多级目录嵌套,支持递归创建不存在的目录结构
* @return 上传结果
* @throws IOException
*/
public UploadStatus upload(File f, String remote) throws IOException {
//设置PassiveMode传输
ftpClient.enterLocalPassiveMode();
//设置以二进制流的方式传输
ftpClient.setFileType(FTP.BINARY_FILE_TYPE);
ftpClient.setControlEncoding("GBK");
UploadStatus result;
//对远程目录的处理
String remoteFileName = remote;
if (remote.contains("/")) {
remoteFileName = remote.substring(remote.lastIndexOf("/") + 1);
//创建服务器远程目录结构,创建失败直接返回
if (CreateDirecroty(remote, ftpClient) == UploadStatus.CREATE_DIRECTORY_FAIL) {
return UploadStatus.CREATE_DIRECTORY_FAIL;
}
}

//检查远程是否存在文件
FTPFile[] files = ftpClient.listFiles(new String(remoteFileName.getBytes("GBK"), "iso-8859-1"));
if (files.length == 1) {
long remoteSize = files[0].getSize();
long localSize = f.length();
if (remoteSize == localSize) {
// 删除本地文件
f.delete();
return UploadStatus.FILE_EXITS;
} else if (remoteSize > localSize) {
return UploadStatus.REMOTE_BIGGER_LOCAL;
}

//尝试移动文件内读取指针,实现断点续传
result = uploadFile(remoteFileName, f, ftpClient, remoteSize);

//如果断点续传没有成功,则删除服务器上文件,重新上传
if (result == UploadStatus.UPLOAD_FROM_BREAK_FAILED) {
if (!ftpClient.deleteFile(remoteFileName)) {
return UploadStatus.DELETE_REMOTE_FAILD;
}
result = uploadFile(remoteFileName, f, ftpClient, 0);
}
} else {
result = uploadFile(remoteFileName, f, ftpClient, 0);
// 删除本地文件
f.delete();
}
return result;
}

/**
* 断开与远程服务器的连接
*
* @throws IOException
*/
public void disconnect() throws IOException {
if (ftpClient.isConnected()) {
ftpClient.disconnect();
}
}

/**
* 递归创建远程服务器目录
*
* @param remote 远程服务器文件绝对路径
* @param ftpClient FTPClient对象
* @return 目录创建是否成功
* @throws IOException
*/
public UploadStatus CreateDirecroty(String remote, FTPClient ftpClient) throws IOException {
UploadStatus status = UploadStatus.CREATE_DIRECTORY_SUCCESS;
String directory = remote.substring(0, remote.lastIndexOf("/") + 1);
if (!directory.equalsIgnoreCase("/") && !ftpClient.changeWorkingDirectory(new String(directory.getBytes("GBK"), "iso-8859-1"))) {
//如果远程目录不存在,则递归创建远程服务器目录
int start = 0;
int end = 0;
if (directory.startsWith("/")) {
start = 1;
} else {
start = 0;
}
end = directory.indexOf("/", start);
while (true) {
String subDirectory = new String(remote.substring(start, end).getBytes("GBK"), "iso-8859-1");
if (!ftpClient.changeWorkingDirectory(subDirectory)) {
if (ftpClient.makeDirectory(subDirectory)) {
ftpClient.changeWorkingDirectory(subDirectory);
} else {
System.out.println("创建目录失败");
return UploadStatus.CREATE_DIRECTORY_FAIL;
}
}

start = end + 1;
end = directory.indexOf("/", start);

//检查所有目录是否创建完毕
if (end <= start) {
break;
}
}
}
return status;
}

/**
* 上传文件到服务器,新上传和断点续传
*
* @param remoteFile 远程文件名,在上传之前已经将服务器工作目录做了改变
* @param localFile 本地文件File句柄,绝对路径
* @param ftpClient FTPClient引用
* @throws IOException
*/
public UploadStatus uploadFile(String remoteFile, File localFile, FTPClient ftpClient, long remoteSize) throws IOException {
UploadStatus status;
//显示进度的上传
long step = localFile.length();
long process = 0;
long localreadbytes = 0L;
RandomAccessFile raf = new RandomAccessFile(localFile, "r");
OutputStream out = ftpClient.appendFileStream(new String(remoteFile.getBytes("GBK"), "iso-8859-1"));
//断点续传
if (remoteSize > 0) {
ftpClient.setRestartOffset(remoteSize);
process = remoteSize / step;
raf.seek(remoteSize);
localreadbytes = remoteSize;
}
byte[] bytes = new byte[1024];
int c;
while ((c = raf.read(bytes)) != -1) {
out.write(bytes, 0, c);
localreadbytes += c;
if (localreadbytes / step != process) {
process = localreadbytes / step;
System.out.println("上传进度:" + process);
}
}
out.flush();
raf.close();
out.close();
boolean result = ftpClient.completePendingCommand();
if (remoteSize > 0) {
status = result ? UploadStatus.UPLOAD_FROM_BREAK_SUCCESS : UploadStatus.UPLOAD_FROM_BREAK_FAILED;
} else {
status = result ? UploadStatus.UPLOAD_NEW_FILE_SUCCESS : UploadStatus.UPLOAD_NEW_FILE_FAILED;
}
return status;
}
}