学成在线-媒资管理2 一、上传图片 1.数据架构 数据库存储方式
前端配置minio服务器地址
media_file
上传原理
2.重要逻辑 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 public UploadFileResultDto uploadFile (Long companyId, UploadFileParamsDto uploadFileParamsDto, String localFilePath) { String filename = uploadFileParamsDto.getFilename(); String extension = FilenameUtils.getExtension(filename); String mimeType = getMimeType(extension); String defaultFolderPath = getDefaultFolderPath(); String fileMd5 = getFileMd5(new File (localFilePath)); String objectName = defaultFolderPath+fileMd5+extension; boolean result = addMediaFilesToMinIO(localFilePath, mimeType, bucket_mediafiles, objectName); if (!result){ XueChengPlusException.cast("文件上传失败" ); } MediaFiles mediaFiles = currentProxy.addMediaFilesToDb(companyId, fileMd5, uploadFileParamsDto, bucket_mediafiles, objectName); UploadFileResultDto uploadFileResultDto = new UploadFileResultDto (); BeanUtils.copyProperties(mediaFiles, uploadFileResultDto); return uploadFileResultDto; }
3.事务优化 将上传文件到第三方(如 MinIO、阿里云 OSS)等网络 I/O 操作放在事务外部是绝对正确的。如果整个 uploadFile 都在事务中,网络一旦波动或上传时间较长,会导致数据库连接一直被占用,极易引发连接池耗尽和数据库死锁。“事务越精细越好” 是后端性能优化的铁律
1 2 @Transactional public MediaFiles addMediaFilesToDb (Long companyId,String fileMd5,UploadFileParamsDto uploadFileParamsDto,String bucket,String objectName) {
Spring 的 @Transactional 是基于动态代理实现的。如果在类内部直接调用 this.addMediaFilesToDb(),是没有经过 Spring 代理对象的,因此切面逻辑(开启、提交/回滚事务)不会被触发。通过 @Autowired 注入自身的代理对象 currentProxy,强制走代理逻辑,成功激活了事务注解
于是将addMediaFilesToDb方法暴露到service接口方法中,在注入service代理类
1 2 3 4 @Autowired private MediaFileService currentProxy;MediaFiles mediaFiles = currentProxy.addMediaFilesToDb(companyId, fileMd5, uploadFileParamsDto, bucket_mediafiles, objectName);
二、上传视频 1.断点续传
2.minioSDK 上传
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 public boolean addMediaFilesToMinIO (String localFilePath,String mimeType,String bucket, String objectName) { try { UploadObjectArgs testbucket = UploadObjectArgs.builder() .bucket(bucket) .object(objectName) .filename(localFilePath) .contentType(mimeType) .build(); minioClient.uploadObject(testbucket); log.debug("上传文件到minio成功,bucket:{},objectName:{}" ,bucket,objectName); System.out.println("上传成功" ); return true ; } catch (Exception e) { e.printStackTrace(); log.error("上传文件到minio出错,bucket:{},objectName:{},错误原因:{}" ,bucket,objectName,e.getMessage(),e); XueChengPlusException.cast("上传文件到文件系统失败" ); } return false ; }
删除分块
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 private void clearChunkFiles (String chunkFileFolderPath,int chunkTotal) { try { List<DeleteObject> deleteObjects = Stream.iterate(0 , i -> ++i) .limit(chunkTotal) .map(i -> new DeleteObject (chunkFileFolderPath.concat(Integer.toString(i)))) .collect(Collectors.toList()); RemoveObjectsArgs removeObjectsArgs = RemoveObjectsArgs.builder().bucket("video" ).objects(deleteObjects).build(); Iterable<Result<DeleteError>> results = minioClient.removeObjects(removeObjectsArgs); results.forEach(r->{ DeleteError deleteError = null ; try { deleteError = r.get(); } catch (Exception e) { e.printStackTrace(); log.error("清楚分块文件失败,objectname:{}" ,deleteError.objectName(),e); } }); } catch (Exception e) { e.printStackTrace(); log.error("清楚分块文件失败,chunkFileFolderPath:{}" ,chunkFileFolderPath,e); } }
下载
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 public File downloadFileFromMinIO (String bucket,String objectName) { File minioFile = null ; FileOutputStream outputStream = null ; try { InputStream stream = minioClient.getObject(GetObjectArgs.builder() .bucket(bucket) .object(objectName) .build()); minioFile=File.createTempFile("minio" , ".merge" ); outputStream = new FileOutputStream (minioFile); IOUtils.copy(stream,outputStream); return minioFile; } catch (Exception e) { e.printStackTrace(); }finally { if (outputStream!=null ){ try { outputStream.close(); } catch (IOException e) { e.printStackTrace(); } } } return null ; }
3.视频上传主要逻辑 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 public RestResponse mergechunks (Long companyId, String fileMd5, int chunkTotal, UploadFileParamsDto uploadFileParamsDto) { String chunkFileFolderPath = getChunkFileFolderPath(fileMd5); List<ComposeSource> sourceObjectList = Stream.iterate(0 , i -> ++i) .limit(chunkTotal) .map(i -> ComposeSource.builder() .bucket(bucket_video) .object(chunkFileFolderPath.concat(Integer.toString(i))) .build()) .collect(Collectors.toList()); String fileName = uploadFileParamsDto.getFilename(); String extName = fileName.substring(fileName.lastIndexOf("." )); String mergeFilePath = getFilePathByMd5(fileMd5, extName); try { ObjectWriteResponse response = minioClient.composeObject( ComposeObjectArgs.builder() .bucket(bucket_video) .object(mergeFilePath) .sources(sourceObjectList) .build()); log.debug("合并文件成功:{}" ,mergeFilePath); } catch (Exception e) { log.debug("合并文件失败,fileMd5:{},异常:{}" ,fileMd5,e.getMessage(),e); return RestResponse.validfail(false , "合并文件失败。" ); } File minioFile = downloadFileFromMinIO(bucket_video,mergeFilePath); if (minioFile == null ){ log.debug("下载合并后文件失败,mergeFilePath:{}" ,mergeFilePath); return RestResponse.validfail(false , "下载合并后文件失败。" ); } try (InputStream newFileInputStream = new FileInputStream (minioFile)) { String md5Hex = DigestUtils.md5Hex(newFileInputStream); if (!fileMd5.equals(md5Hex)){ return RestResponse.validfail(false , "文件合并校验失败,最终上传失败。" ); } uploadFileParamsDto.setFileSize(minioFile.length()); }catch (Exception e){ log.debug("校验文件失败,fileMd5:{},异常:{}" ,fileMd5,e.getMessage(),e); return RestResponse.validfail(false , "文件合并校验失败,最终上传失败。" ); }finally { if (minioFile!=null ){ minioFile.delete(); } } currentProxy.addMediaFilesToDb(companyId,fileMd5,uploadFileParamsDto,bucket_video,mergeFilePath); clearChunkFiles(chunkFileFolderPath,chunkTotal); return RestResponse.success(true ); }
三、视频转码 1.文件格式与编码格式
编码格式的作用
因为原始的视频和音频数据实在太大了! 一部未经任何压缩的1080P、时长2小时的电影,可能需要几千GB的存储空间。网络根本无法传输,硬盘也装不下。因此,工程师发明了各种“编码格式”(压缩算法),用来剔除人类眼睛和耳朵察觉不到的冗余信息,把几千GB的数据压缩到只有几个GB甚至几百MB
2.工具实现 fffmp
1 2 https://www.ffmpeg.org/download.html#build-windows https://zhuanlan.zhihu.com/p/15849180981
fffmp
1 2 3 4 5 6 7 8 9 10 11 12 13 String ffmpeg_path = "D:\\develop\\ffmpeg\\ffmpeg.exe" ;String video_path = "D:\\develop\\bigfile_test\\nacos01.avi" ;String mp4_name = "nacos01.mp4" ;String mp4_path = "D:\\develop\\bigfile_test\\" ;Mp4VideoUtil videoUtil = new Mp4VideoUtil (ffmpeg_path,video_path,mp4_name,mp4_path);String s = videoUtil.generateMp4();System.out.println(s);
四、分布式任务调度 1.基础 本质上就是多台机器做定时任务
好处
目前主要实现
任务调度的其他实现方式
1 2 3 4 多线程方式实现 Timer方式实现 ScheduledExecutor方式实现 三方Quartz方式实现
2.XSS-JOB 核心架构
2.1、配置过程 1 2 GitHub:https://github.com/xuxueli/xxl-job 码云:https://gitee.com/xuxueli0323/xxl-job
1. pom.xml中引入依赖
1 2 3 4 <dependency> <groupId>com.xuxueli</groupId> <artifactId>xxl-job-core</artifactId> </dependency>
2.调度中心注册
3.配置
1 2 3 4 5 6 7 8 9 10 11 12 xxl: job: admin: addresses: http://192.168.101.65:8088/xxl-job-admin executor: appname: media-process-service address: ip: port: 9999 logpath: /data/applogs/xxl-job/jobhandler logretentiondays: 30 accessToken: default_token
4.配置xxl-job的执行器
将xxl-job示例工程下配置类拷贝到媒资管理的service工程下
2.2、调用过程 1.定义人物类以及执行内容
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 @Component public class SampleXxlJob { private static Logger logger = LoggerFactory.getLogger(SampleXxlJob.class); @XxlJob("demoJobHandler") public void demoJobHandler () throws Exception { XxlJobHelper.log("XXL-JOB, Hello World." ); for (int i = 0 ; i < 5 ; i++) { XxlJobHelper.log("beat at:" + i); TimeUnit.SECONDS.sleep(2 ); } } }
2.注册中心注册
3.启动任务
2.3、高级配置
分片广播
在代码中,不需要关心怎么分发任务,只需要根据这两个参数来“过滤”自己该干的活。最常用的方案是取模运算 (mod)
一个系统启动俩个执行器
通过-D参数启动另外一个执行器
1 2 3 int shardIndex = XxlJobHelper.getShardIndex(); int shardTotal = XxlJobHelper.getShardTotal();
获取分片参数
执行结果如下
2.4、保证幂等性 幂等性就是针对同一个任务执行之后不在重复执行
五、视频转码任务实现
1.分布式锁 实现原因
实现方式
我们这里用到了数据库分布式锁方式下的乐观锁
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 1. 核心执行流程:版本号机制 (Version)实现乐观锁最常用的方式,是在数据库表中增加一个额外的字段,通常叫做 version(版本号),默认值为 0 或 1 。 步骤 1 :读取数据,记下版本号 当客户端 A 想要扣减某个商品的库存时,它先去查数据,同时把版本号也查出来。 -- 查出当前的库存和版本号 -- 假设查出来 stock = 10 , version = 1 SELECT stock, version FROM goods WHERE id = 'order_123' ; 步骤 2 :在内存中执行业务逻辑 客户端 A 发现 stock = 10 ,库存充足,决定扣减 1 个库存。 步骤 3 :带着版本号去更新数据(核心!) 客户端 A 尝试去更新数据库,但在 WHERE 条件里,必须带上刚才查出来的版本号 version = 1 。同时,把版本号加 1 。 UPDATE goods SET stock = stock - 1 , version = version + 1 WHERE id = 'order_123' AND version = 1 ;步骤 4 :判断更新是否成功 情况 A(没人抢): 如果在这个过程中,没有其他客户端修改过这条数据,那么数据库里这条记录的 version 依然是 1 。SQL 语句能成功匹配到这行记录,执行更新,返回影响行数(affected rows)为 1 。扣减成功! 情况 B(被别人抢先了): 假设在客户端 A 查出数据到更新数据的这段时间里,客户端 B 抢先一步买了这个商品,并且成功把 version 更新成了 2 。此时,客户端 A 执行上面的 UPDATE 语句时,发现数据库里找不到 id = 'order_123' AND version = 1 的记录了。SQL 执行完毕,返回影响行数(affected rows)为 0 。这就意味着发生了并发冲突,更新失败! 步骤 5 :失败后的处理(重试机制) 如果发现影响行数为 0 ,程序不能直接报错说系统异常,而是需要根据业务场景决定: 直接返回失败给用户: 比如“当前排队人数过多,请重试”。 自旋重试: 写一个循环(比如 while ),让客户端 A 重新回到步骤 1 ,再去查最新的库存和版本号,再尝试扣减,直到成功或者超过最大重试次数为止。
2.线程阻塞 1 2 3 CountDownLatch countDownLatch = new CountDownLatch (size);countDownLatch.countDown(); countDownLatch.await(30 , TimeUnit.MINUTES);
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 1. CountDownLatch countDownLatch = new CountDownLatch(size); 含义: 初始化一个倒计时器,设定需要等待完成的“任务数量”(就是这个 size)。 场景比喻: 假设你是一个旅行团的导游,大巴车停在景点门口,你需要等 5名 自由活动的游客全部上车后才能发车。这里的 size 就是 5。导游手里拿着一个计数器,初始值是 5。 2. countDownLatch.countDown(); 含义: 任务完成一次,计数器就减 1。注意:这个方法通常是由那些“被等待的子线程”在执行完自己的任务后调用的。 场景比喻: 每一名游客回到大巴车上,导游手里的计数器就减 1(5变成4,4变成3...)。 避坑指南(极其重要): 在实际写代码时,countDown() 这个方法必须、一定、绝对要放在 try-finally 代码块的 finally 里面!因为如果子线程在执行任务时发生了异常(报错了),导致代码中断,没有执行到 countDown(),那么主线程的计数器就永远不会归零,主线程就会被永远卡死(死锁)。 3. countDownLatch.await(30, TimeUnit.MINUTES); 含义: 发起等待。调用这个方法的线程(通常是分配任务的主线程)会被阻塞卡住,直到计数器减到 0,才会继续往下执行。 带有超时时间的等待: 你给它加了 30, TimeUnit.MINUTES。这是一种非常稳妥的防御性编程!它的意思是:主线程最多等待 30 分钟。 情况 A(顺利): 比如在第 10 分钟的时候,所有子任务都 countDown() 完毕,计数器归零了。主线程会被立刻唤醒,继续往下走,不需要傻等满 30 分钟。此时 await 方法返回 true。 情况 B(超时): 30 分钟过去了,但是子线程可能卡住了,导致计数器还没归零。主线程说:“不等了,我还有别的事!”,于是强制解除阻塞,继续往下走。此时 await 方法返回 false。