学成在线-媒资管理2

一、上传图片

1.数据架构

数据库存储方式

image-20260311202828596

前端配置minio服务器地址

image-20260311202901411

media_file

image-20260311213432618

上传原理

image

2.重要逻辑

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public UploadFileResultDto uploadFile(Long companyId, UploadFileParamsDto uploadFileParamsDto, String localFilePath) {
//文件上传minio
//获取扩展名
String filename = uploadFileParamsDto.getFilename();
String extension = FilenameUtils.getExtension(filename);
//通过扩展名获取mimetype
String mimeType = getMimeType(extension);
//获取文件路径以及md5
String defaultFolderPath = getDefaultFolderPath();
String fileMd5 = getFileMd5(new File(localFilePath));
String objectName = defaultFolderPath+fileMd5+extension;
//具体实现上传
boolean result = addMediaFilesToMinIO(localFilePath, mimeType, bucket_mediafiles, objectName);
if(!result){
XueChengPlusException.cast("文件上传失败");
}

//将文件信息保存到数据库
MediaFiles mediaFiles = currentProxy.addMediaFilesToDb(companyId, fileMd5, uploadFileParamsDto, bucket_mediafiles, objectName);
UploadFileResultDto uploadFileResultDto = new UploadFileResultDto();
BeanUtils.copyProperties(mediaFiles, uploadFileResultDto);

return uploadFileResultDto;
}

3.事务优化

将上传文件到第三方(如 MinIO、阿里云 OSS)等网络 I/O 操作放在事务外部是绝对正确的。如果整个 uploadFile 都在事务中,网络一旦波动或上传时间较长,会导致数据库连接一直被占用,极易引发连接池耗尽和数据库死锁。“事务越精细越好” 是后端性能优化的铁律

1
2
@Transactional
public MediaFiles addMediaFilesToDb(Long companyId,String fileMd5,UploadFileParamsDto uploadFileParamsDto,String bucket,String objectName){

Spring 的 @Transactional 是基于动态代理实现的。如果在类内部直接调用 this.addMediaFilesToDb(),是没有经过 Spring 代理对象的,因此切面逻辑(开启、提交/回滚事务)不会被触发。通过 @Autowired 注入自身的代理对象 currentProxy,强制走代理逻辑,成功激活了事务注解

于是将addMediaFilesToDb方法暴露到service接口方法中,在注入service代理类

1
2
3
4
@Autowired
private MediaFileService currentProxy;

MediaFiles mediaFiles = currentProxy.addMediaFilesToDb(companyId, fileMd5, uploadFileParamsDto, bucket_mediafiles, objectName);

二、上传视频

1.断点续传

image-20260312211536417

2.minioSDK

上传

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
//将文件上传道minio
public boolean addMediaFilesToMinIO(String localFilePath,String mimeType,String bucket, String objectName) {
try {
UploadObjectArgs testbucket = UploadObjectArgs.builder()
.bucket(bucket)
.object(objectName)
.filename(localFilePath)
.contentType(mimeType)
.build();
minioClient.uploadObject(testbucket);
log.debug("上传文件到minio成功,bucket:{},objectName:{}",bucket,objectName);
System.out.println("上传成功");
return true;
} catch (Exception e) {
e.printStackTrace();
log.error("上传文件到minio出错,bucket:{},objectName:{},错误原因:{}",bucket,objectName,e.getMessage(),e);
XueChengPlusException.cast("上传文件到文件系统失败");
}
return false;
}

删除分块

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
private void clearChunkFiles(String chunkFileFolderPath,int chunkTotal){

try {
List<DeleteObject> deleteObjects = Stream.iterate(0, i -> ++i)
.limit(chunkTotal)
.map(i -> new DeleteObject(chunkFileFolderPath.concat(Integer.toString(i))))
.collect(Collectors.toList());

RemoveObjectsArgs removeObjectsArgs = RemoveObjectsArgs.builder().bucket("video").objects(deleteObjects).build();
Iterable<Result<DeleteError>> results = minioClient.removeObjects(removeObjectsArgs);
//遍历以正确删除分块文件
results.forEach(r->{
DeleteError deleteError = null;
try {
deleteError = r.get();//这里需要get以删除
} catch (Exception e) {
e.printStackTrace();
log.error("清楚分块文件失败,objectname:{}",deleteError.objectName(),e);
}
});
} catch (Exception e) {
e.printStackTrace();
log.error("清楚分块文件失败,chunkFileFolderPath:{}",chunkFileFolderPath,e);
}
}

下载

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public File downloadFileFromMinIO(String bucket,String objectName){
//临时文件
File minioFile = null;
FileOutputStream outputStream = null;
try{
InputStream stream = minioClient.getObject(GetObjectArgs.builder()
.bucket(bucket)
.object(objectName)
.build());
//创建临时文件
minioFile=File.createTempFile("minio", ".merge");
outputStream = new FileOutputStream(minioFile);
IOUtils.copy(stream,outputStream);
return minioFile;
} catch (Exception e) {
e.printStackTrace();
}finally {
if(outputStream!=null){
try {
outputStream.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
return null;
}

3.视频上传主要逻辑

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
public RestResponse mergechunks(Long companyId, String fileMd5, int chunkTotal, UploadFileParamsDto uploadFileParamsDto) {
//=====获取分块文件路径=====
String chunkFileFolderPath = getChunkFileFolderPath(fileMd5);
//组成将分块文件路径组成 List<ComposeSource>
List<ComposeSource> sourceObjectList = Stream.iterate(0, i -> ++i)
.limit(chunkTotal)
.map(i -> ComposeSource.builder()
.bucket(bucket_video)
.object(chunkFileFolderPath.concat(Integer.toString(i)))
.build())
.collect(Collectors.toList());
//=====合并=====
//文件名称
String fileName = uploadFileParamsDto.getFilename();
//文件扩展名
String extName = fileName.substring(fileName.lastIndexOf("."));
//合并文件路径
String mergeFilePath = getFilePathByMd5(fileMd5, extName);
try {
//合并文件
ObjectWriteResponse response = minioClient.composeObject(
ComposeObjectArgs.builder()
.bucket(bucket_video)
.object(mergeFilePath)
.sources(sourceObjectList)
.build());
log.debug("合并文件成功:{}",mergeFilePath);
} catch (Exception e) {
log.debug("合并文件失败,fileMd5:{},异常:{}",fileMd5,e.getMessage(),e);
return RestResponse.validfail(false, "合并文件失败。");
}

// ====验证md5====
//下载合并后的文件
File minioFile = downloadFileFromMinIO(bucket_video,mergeFilePath);
if(minioFile == null){
log.debug("下载合并后文件失败,mergeFilePath:{}",mergeFilePath);
return RestResponse.validfail(false, "下载合并后文件失败。");
}

try (InputStream newFileInputStream = new FileInputStream(minioFile)) {
//minio上文件的md5值
String md5Hex = DigestUtils.md5Hex(newFileInputStream);
//比较md5值,不一致则说明文件不完整
if(!fileMd5.equals(md5Hex)){
return RestResponse.validfail(false, "文件合并校验失败,最终上传失败。");
}
//文件大小
uploadFileParamsDto.setFileSize(minioFile.length());
}catch (Exception e){
log.debug("校验文件失败,fileMd5:{},异常:{}",fileMd5,e.getMessage(),e);
return RestResponse.validfail(false, "文件合并校验失败,最终上传失败。");
}finally {
if(minioFile!=null){
minioFile.delete();
}
}

//文件入库
currentProxy.addMediaFilesToDb(companyId,fileMd5,uploadFileParamsDto,bucket_video,mergeFilePath);
//=====清除分块文件=====
clearChunkFiles(chunkFileFolderPath,chunkTotal);
return RestResponse.success(true);
}

三、视频转码

1.文件格式与编码格式

image-20260312222046701

编码格式的作用

因为原始的视频和音频数据实在太大了! 一部未经任何压缩的1080P、时长2小时的电影,可能需要几千GB的存储空间。网络根本无法传输,硬盘也装不下。因此,工程师发明了各种“编码格式”(压缩算法),用来剔除人类眼睛和耳朵察觉不到的冗余信息,把几千GB的数据压缩到只有几个GB甚至几百MB

image-20260312222840638

2.工具实现

fffmp

1
2
https://www.ffmpeg.org/download.html#build-windows
https://zhuanlan.zhihu.com/p/15849180981

fffmp

image-20260312222915113

image-20260312223010146
1
2
3
4
5
6
7
8
9
10
11
12
13
//ffmpeg的路径
String ffmpeg_path = "D:\\develop\\ffmpeg\\ffmpeg.exe";//ffmpeg的安装位置
//源avi视频的路径
String video_path = "D:\\develop\\bigfile_test\\nacos01.avi";
//转换后mp4文件的名称
String mp4_name = "nacos01.mp4";
//转换后mp4文件的路径
String mp4_path = "D:\\develop\\bigfile_test\\";
//创建工具类对象
Mp4VideoUtil videoUtil = new Mp4VideoUtil(ffmpeg_path,video_path,mp4_name,mp4_path);
//开始视频转换,成功将返回success
String s = videoUtil.generateMp4();
System.out.println(s);

四、分布式任务调度

1.基础

本质上就是多台机器做定时任务

image-20260312223700873

好处

image-20260312223733487

目前主要实现

image-20260312223808509

任务调度的其他实现方式

1
2
3
4
多线程方式实现
Timer方式实现
ScheduledExecutor方式实现
三方Quartz方式实现

2.XSS-JOB

核心架构

image-20260313172444905

2.1、配置过程

1
2
GitHub:https://github.com/xuxueli/xxl-job
码云:https://gitee.com/xuxueli0323/xxl-job

1.pom.xml中引入依赖

1
2
3
4
<dependency>
<groupId>com.xuxueli</groupId>
<artifactId>xxl-job-core</artifactId>
</dependency>

2.调度中心注册

image-20260313174337785

3.配置

1
2
3
4
5
6
7
8
9
10
11
12
xxl:
job:
admin:
addresses: http://192.168.101.65:8088/xxl-job-admin
executor:
appname: media-process-service
address:
ip:
port: 9999
logpath: /data/applogs/xxl-job/jobhandler
logretentiondays: 30
accessToken: default_token

4.配置xxl-job的执行器

将xxl-job示例工程下配置类拷贝到媒资管理的service工程下

2.2、调用过程

1.定义人物类以及执行内容

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
@Component
public class SampleXxlJob {
private static Logger logger = LoggerFactory.getLogger(SampleXxlJob.class);

/**
* 1、简单任务示例(Bean模式)
*/
@XxlJob("demoJobHandler")
public void demoJobHandler() throws Exception {
XxlJobHelper.log("XXL-JOB, Hello World.");

for (int i = 0; i < 5; i++) {
XxlJobHelper.log("beat at:" + i);
TimeUnit.SECONDS.sleep(2);
}
// default success
}

}

2.注册中心注册

image-20260313175435892

3.启动任务

image-20260313175931640

2.3、高级配置

image-20260313180419806

分片广播

image-20260313183223554

在代码中,不需要关心怎么分发任务,只需要根据这两个参数来“过滤”自己该干的活。最常用的方案是取模运算 (mod)

一个系统启动俩个执行器

image-20260313190308420

通过-D参数启动另外一个执行器

1
2
3
// 分片参数
int shardIndex = XxlJobHelper.getShardIndex();
int shardTotal = XxlJobHelper.getShardTotal();

获取分片参数

执行结果如下

image-20260313192507693

2.4、保证幂等性

幂等性就是针对同一个任务执行之后不在重复执行

image-20260313222121582

五、视频转码任务实现

image

1.分布式锁

实现原因

image-20260315172228169

实现方式

image-20260315172347639

我们这里用到了数据库分布式锁方式下的乐观锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
1. 核心执行流程:版本号机制 (Version)
实现乐观锁最常用的方式,是在数据库表中增加一个额外的字段,通常叫做 version(版本号),默认值为 01

步骤 1:读取数据,记下版本号
当客户端 A 想要扣减某个商品的库存时,它先去查数据,同时把版本号也查出来。

-- 查出当前的库存和版本号
-- 假设查出来 stock = 10, version = 1
SELECT stock, version FROM goods WHERE id = 'order_123';
步骤 2:在内存中执行业务逻辑
客户端 A 发现 stock = 10,库存充足,决定扣减 1 个库存。

步骤 3:带着版本号去更新数据(核心!)
客户端 A 尝试去更新数据库,但在 WHERE 条件里,必须带上刚才查出来的版本号 version = 1。同时,把版本号加 1

UPDATE goods
SET stock = stock - 1, version = version + 1
WHERE id = 'order_123' AND version = 1;
步骤 4:判断更新是否成功

情况 A(没人抢): 如果在这个过程中,没有其他客户端修改过这条数据,那么数据库里这条记录的 version 依然是 1。SQL 语句能成功匹配到这行记录,执行更新,返回影响行数(affected rows)为 1。扣减成功!

情况 B(被别人抢先了): 假设在客户端 A 查出数据到更新数据的这段时间里,客户端 B 抢先一步买了这个商品,并且成功把 version 更新成了 2。此时,客户端 A 执行上面的 UPDATE 语句时,发现数据库里找不到 id = 'order_123' AND version = 1 的记录了。SQL 执行完毕,返回影响行数(affected rows)为 0。这就意味着发生了并发冲突,更新失败!

步骤 5:失败后的处理(重试机制)
如果发现影响行数为 0,程序不能直接报错说系统异常,而是需要根据业务场景决定:

直接返回失败给用户: 比如“当前排队人数过多,请重试”。

自旋重试: 写一个循环(比如 while),让客户端 A 重新回到步骤 1,再去查最新的库存和版本号,再尝试扣减,直到成功或者超过最大重试次数为止。

2.线程阻塞

1
2
3
CountDownLatch countDownLatch = new CountDownLatch(size);
countDownLatch.countDown();
countDownLatch.await(30, TimeUnit.MINUTES);

image-20260315182256560

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
1. CountDownLatch countDownLatch = new CountDownLatch(size);
含义: 初始化一个倒计时器,设定需要等待完成的“任务数量”(就是这个 size)。

场景比喻: 假设你是一个旅行团的导游,大巴车停在景点门口,你需要等 5名 自由活动的游客全部上车后才能发车。这里的 size 就是 5。导游手里拿着一个计数器,初始值是 5。

2. countDownLatch.countDown();
含义: 任务完成一次,计数器就减 1。注意:这个方法通常是由那些“被等待的子线程”在执行完自己的任务后调用的。

场景比喻: 每一名游客回到大巴车上,导游手里的计数器就减 1(5变成4,4变成3...)。

避坑指南(极其重要): 在实际写代码时,countDown() 这个方法必须、一定、绝对要放在 try-finally 代码块的 finally 里面!因为如果子线程在执行任务时发生了异常(报错了),导致代码中断,没有执行到 countDown(),那么主线程的计数器就永远不会归零,主线程就会被永远卡死(死锁)。

3. countDownLatch.await(30, TimeUnit.MINUTES);
含义: 发起等待。调用这个方法的线程(通常是分配任务的主线程)会被阻塞卡住,直到计数器减到 0,才会继续往下执行。

带有超时时间的等待: 你给它加了 30, TimeUnit.MINUTES。这是一种非常稳妥的防御性编程!它的意思是:主线程最多等待 30 分钟。

情况 A(顺利): 比如在第 10 分钟的时候,所有子任务都 countDown() 完毕,计数器归零了。主线程会被立刻唤醒,继续往下走,不需要傻等满 30 分钟。此时 await 方法返回 true。

情况 B(超时): 30 分钟过去了,但是子线程可能卡住了,导致计数器还没归零。主线程说:“不等了,我还有别的事!”,于是强制解除阻塞,继续往下走。此时 await 方法返回 false。

image-20260318145243518