RxJava加Retrofit文件分段上传实现详解

作者:Chavin 时间:2022-03-02 05:36:10 

前言

RxJava加Retrofit文件分段上传实现详解

本文基于 RxJava 和 Retrofit 库,设计并实现了一种用于大文件分块上传的工具,并对其进行了全面的拆解分析。抛砖引玉,对同样有处理文件分块上传诉求的读者,可能会起到一定的启发作用。

文章主体由四部分构成:

  • 首先分析问题,问题拆解为:多线程分段读取文件、构建和发出文件片段上传请求

  • 基于 JDK 随机读取文件的类库,设计本地多线程分段读取文件的单元

  • 基于 Retrofit 设计由文件片段构建上传的网络请求

  • 从上述设计演变而来的完整代码实现

  另外,在文章提供的完整代码中,还附了一段由 PHP 编写,用来接收多线程分段数据的服务端接口实现,其中处理了因客户端都线程上传片段,导致服务端接收的文件片段无序,故需在适当时机合并分块构成目标文件。

受限于笔者的开发经验与理论理解,文章的思路和代码难免可能有偏颇,对于有改进和优化的部分,欢迎大家讨论区提出。

问题拆解

要完成文件分段上传到服务端,第一步是分段读取本地文件。通常分段是为了多线程同时执行上传,提高设备计算和网络资源利用率,减少上传时间优化体验,这样即需要一个支持多线程的文件分段读取工具。由于文件可能超过设备内存大小,在读取这类超大文件时需要控制最大读取量防止内存溢出。此时文件已从磁盘数据转换为内存中的字节数据,只需要将这些内存数据传给服务端即可。这样问题被分成 3 个子问题:

  • 分段读取文件到内存中

  • 控制多线程数量

  • 将文件片段传给服务端

问题 1 很好解决,利用 Java 的 RandomAccessFile 可对文件的随机读取的特性,即可按需读取文件片段到内存中。

问题 2 相对复杂一点,但如果有阅读过 JDK 中线程池源码的读者,就会发现这个问题的和控制线程池中线程数量其实是类似的。

问题 3 就不复杂了,Retrofit 基于 OKhttp ,OkHttp是很容易基于字节数组构建 multipart/form-data 请求的。

分块并发读取文件

根据上述对问题 1、2 的拆解,可将读取抽象为一个文件读取器,构建时传入文件对象和分段大小以及最大并发数,以及分段数据的回调。当外部启动读取时将根据文件大小和配置的分段大小构建若干个 Task 用于读取对应片段的数据。

public BlockReader(@NotNull File file, @NotNull BlockCallback callback, int poolSize, int blockSize) {
   mFile = file;
   mCallback = callback;
   mPoolSize = poolSize;
   mBlockSize = blockSize;
}
public void start(@Nullable BlockFilter filter) {
   Observable.empty().observeOn(Schedulers.computation()).doOnComplete(() -> {
       long length = mFile.length();
       for (long offset = 0; offset < length; offset += mBlockSize) {
           if (null != filter && filter.ignore(offset)) {
               continue;
           }
           mQueue.offer(new ReadTask(offset));
       }
       for (int i = 0; i < Math.min(mPoolSize, mQueue.size()); i++) {
           Observable.empty().observeOn(Schedulers.io()).doOnComplete(this::schedule).subscribe();
       }
   }).subscribe();
}

多线程调度部分,可通过加锁和记录状态变量统计当前正运行的线程数,则可控制字节数组数,这样就相当于控制住了最大内存占用。

private void schedule() {
   if (mRunning.get() >= mPoolSize) {
       return;
   }
   ReadTask task;
   synchronized (mQueue) {
       if (mRunning.get() >= mPoolSize) {
           return;
       }
       task = mQueue.poll();
       if (null != task) {
           mRunning.incrementAndGet();
       }
   }
   if (null != task) {
       task.run();
   }
}

最后是文件随机读取,直接调用 RandomAccessFile 的 API 即可:

private class ReadTask implements Action {
   @Override
   public void run() {
       try (RandomAccessFile raf = new RandomAccessFile(mFile, RAF_MODE);
               ByteArrayOutputStream out = new ByteArrayOutputStream(mBlockSize)) {
           raf.seek(mOffset);
           byte[] buf = new byte[DEF_BLOCK_SIZE];
           long cnt = 0;
           for (int bytes = raf.read(buf); bytes != -1 && cnt < mBlockSize; bytes = raf.read(buf)) {
               out.write(buf, 0, bytes);
               cnt += bytes;
           }
           out.flush();
           mCallback.onFinished(mOffset, out.toByteArray());
       } catch (IOException e) {
           mCallback.onFinished(mOffset, null);
       } finally {
           mRunning.decrementAndGet();
           schedule();
       }
   }
}

文件片段上传

上传部分则使用 Retrofit 提供的注解和 OKHttp 的类库构建请求。但值得一提的是需要在磁盘IO线程同步完成网络IO,这样可以避免网络IO速度落后磁盘IO太多而导致任务堆积造成内存溢出。

public interface BlockUploader {
   @POST("test/upload.php")
   @Multipart
   Single<Response<ResponseBody>> upload(@Header("filename") String filename,
                                         @Header("total") long total,
                                         @Header("offset") long offset,
                                         @Part List<MultipartBody.Part> body);
}
private static void syncUpload(String fileName, long fileLength, long offset, byte[] bytes) {
   RequestBody data = RequestBody.create(MediaType.parse("application/octet-stream"), bytes);
   MultipartBody body = new MultipartBody.Builder()
           .addFormDataPart("file", fileName, data)
           .setType(MultipartBody.FORM)
           .build();
   retrofit.create(BlockUploader.class).upload(fileName, fileLength, offset, body.parts()).subscribe(resp -> {
       if (resp.isSuccessful()) {
           System.out.println("✓ offset: " + offset + " upload succeed " + resp.code());
       } else {
           System.out.println("✗ offset: " + offset + " upload failed " + resp.code());
       }
   }, throwable -> {
       System.out.println("! offset: " + offset + " upload failed");
   });
}

完整代码

为控制篇幅,完整代码请移步 Github,服务端部分处理形如:

RxJava加Retrofit文件分段上传实现详解

来源:https://juejin.cn/post/7183887127992598585

标签:RxJava,Retrofit,文件上传
0
投稿

猜你喜欢

  • 实现分布式WebSocket集群的方法

    2021-06-12 22:55:10
  • C#集合本质之队列的用法详解

    2023-03-17 06:42:38
  • 详解升级Android Studio3.0时遇到的几个问题

    2021-11-19 08:13:52
  • Java 中 synchronized的用法详解(四种用法)

    2022-03-11 08:55:05
  • C语言使用strcmp()函数比较两个字符串的实现

    2023-10-15 13:06:55
  • java compare compareTo方法区别详解

    2022-06-26 08:13:55
  • C语言根据协议分割获取字符串单元的实现代码

    2023-06-21 08:20:27
  • java安全编码指南之:Number操作详解

    2021-09-27 07:14:50
  • 对Java中JSON解析器的一些见解

    2023-02-05 20:53:15
  • C#图像边缘检测(Roberts)的方法

    2022-12-26 15:11:18
  • 基于Android10渲染Surface的创建过程

    2022-02-02 12:25:13
  • 聊聊Java的switch为什么不支持long

    2023-08-24 17:35:14
  • Android实现手指触控图片缩放功能

    2021-06-07 17:08:00
  • Android调用手机摄像头拍照和录音功能

    2022-10-22 15:37:16
  • Spring JPA 错题集解决案例

    2022-03-26 22:44:03
  • Android App中使用ViewPager+Fragment实现滑动切换效果

    2023-01-12 19:51:50
  • Android ToolBar整合实例使用方法详解

    2023-04-05 07:27:53
  • Java十分钟精通异常处理机制

    2022-08-04 19:03:07
  • Android Activity 不能被截屏的解决方法

    2021-06-28 08:37:35
  • Unity Shader实现描边OutLine效果

    2022-01-13 03:11:13
  • asp之家 软件编程 m.aspxhome.com