Java之Rsync并发迁移数据并校验详解

作者:王者丶丿风范 时间:2023-07-17 23:01:22 

java调用Rsync并发迁移数据并执行校验

java代码如下

RsyncFile.java


import lombok.NoArgsConstructor;
import lombok.SneakyThrows;

import java.io.*;
import java.util.ArrayList;
import java.util.Date;
import java.util.concurrent.*;

/**
* @ClassName RsyncFile
* @Descriptiom TODO rsync多线程同步迁移数据
* @Author KING
* @Date 2019/11/25 09:17
* @Version 1.2.2
* rsync -vzrtopg --progress --delete   //镜像同步
**/
@NoArgsConstructor
public class RsyncFile implements  Runnable{
   private static final int availProcessors = Runtime.getRuntime().availableProcessors();
   //构造以cpu核心数为核心池,cpu线程数为最大池,超时时间为1s,线程队列为大小为 * 的安全阻塞线程队列,拒绝策略为DiscardOldestPolicy()的线程池。(同步数据当然不能丢下拒绝任务)
   private ExecutorService ThreadPool = new ThreadPoolExecutor(availProcessors >> 1,
           availProcessors,
           1L,
           TimeUnit.SECONDS,
           new LinkedBlockingQueue<>(),Executors.defaultThreadFactory(),
           new ThreadPoolExecutor.DiscardOldestPolicy());

//保存扫描得到的文件列表
   private static ArrayList<String> fileNameList = new ArrayList<String>();
   private String shellname;
   private String filename;
   private String userip;
   private CountDownLatch countDownLatch;
   private static int deep = 0;

public RsyncFile(String ShellName, String filename, String UserIP, CountDownLatch countDownLatch) {
       this.shellname = ShellName;
       this.filename = filename;
       this.userip = UserIP;
       this.countDownLatch = countDownLatch;
   }

public static void main(String[] args) {
       try {
           new RsyncFile().Do(args[0],args[1],Integer.parseInt(args[2]));
       }catch (ArrayIndexOutOfBoundsException e){
           System.out.println(e);
           System.out.println("Error , args send fault");
           System.out.println("please send localAddress remote username @ remote IP or hostname and catalogue");
           System.out.println("like this [  /home/test/  root@node1:/test/   1 ]");
       }catch(NumberFormatException e1){
           System.out.println(e1);
           System.out.println("please input Right Directory depth, this number must be int");
           System.out.println("like this [  /home/test/  root@node1:/test/   1 ]");
       }
   }

@SneakyThrows
   private void Do(String content,String UserIP,int setdeep){
       System.out.println("开始执行");
       System.out.println("开始时间:" + new Date());
       Long a = System.nanoTime();
       File file = new File(content);
       System.out.println("开始扫描本地指定目录");
       GetAllFile(file,setdeep);//按深度扫描非空文件夹和文件
       System.out.println("扫描本地目录完成");

//给脚本赋予权限
       String [] cmd={"/bin/sh","-c","chmod 755 ./do*"};
       Runtime.getRuntime().exec(cmd);
       //创建远端目录操作
       System.out.println("开始创建远端目录结构");
       //一次计数锁用于保证目录创建完成
       CountDownLatch doDirLock = new CountDownLatch(1);
       ThreadPool.execute(new RsyncFile("./doDirc.sh",content,UserIP,doDirLock));
       doDirLock.await();
       System.out.println("创建远端目录结构完成");

//开始同步工作
       System.out.println("开始执行同步工作");
       System.out.println("同步的文件夹或文件总数: " + fileNameList.size());
       System.out.println("正在同步。。。。。");
       //fileNameList.size()次计数锁用于保证数据同步完成(保证计时准确)
       CountDownLatch rsyncLock = new CountDownLatch(fileNameList.size());
       System.out.println(fileNameList.size());
       for (String fileName:fileNameList) {
           //除去文件名中与UserIP重复的文件路径
           String RemoteDir = UserIP.concat(fileName.replace(content, ""));
           System.out.println("要同步的本地目录或文件: " + fileName);
           System.out.println("要同步的远端目录或文件: " + RemoteDir);
           ThreadPool.execute(new RsyncFile("./doRync.sh",fileName, RemoteDir,rsyncLock));
       }
       rsyncLock.await();
       System.out.println("执行同步工作完成");

//开始文件校验工作
       System.out.println("执行文件校验及重传");
       //一次计数锁用于保证校验完成
       CountDownLatch chechSumLock = new CountDownLatch(1);
       ThreadPool.execute(new RsyncFile("./doChecksum.sh",content,UserIP,chechSumLock));
       chechSumLock.await();
       System.out.println("文件校验及重传完成");
       ThreadPool.shutdown();

Long b = System.nanoTime();
       Long aLong = (b - a)/1000000L;
       System.out.println("处理时间" + aLong + "ms");
       System.out.println("结束时间:" + new Date());

}

/**
    * 执行rsync脚本的线程方法,使用PrintWriter来与linux Terminal交互
    */
   @Override
   public void run() {
       try {
           String command=shellname.concat(" ").concat(filename).concat(" ").concat(userip);
           File wd = new File("/bin");
           Process process = null;
           process = Runtime.getRuntime().exec("/bin/bash", null, wd);
           if (process != null) {
               InputStream is = process.getInputStream();
               BufferedReader reader = new BufferedReader(new InputStreamReader(is));
               PrintWriter out = new PrintWriter(new BufferedWriter(new OutputStreamWriter(process.getOutputStream())),
                       true);
               //切换到当前class文件所在目录
               out.println("cd " + System.getProperty("user.dir"));
               out.println(command);
               out.println("exit");
               StringBuilder sb = new StringBuilder();
               String line;
               while ((line = reader.readLine()) != null) {
                   sb.append(line + System.lineSeparator());
               }
               process.waitFor();
               reader.close();
               out.close();
               process.destroy();
               System.out.println("result:" + sb.toString());
           }else {
               System.out.println("找不到系统bash工具,请检查系统是否异常,并为系统创建/bin/sh的bash工具软连接");
           }
       } catch (Exception e) {
           System.err.println(e.getMessage());
       }finally {
           //倒记数锁释放一次
           countDownLatch.countDown();
       }
   }

/**遍历指定的目录并能指定深度
    * @param file 指定要遍历的目录
    * @param setDeep 设定遍历深度
    */
   @SneakyThrows
   private static void  GetAllFile(File file, int setDeep) {
       if(file != null){
           if(file.isDirectory() && deep<setDeep){
               deep++;
               File f[] = file.listFiles();
               if(f != null) {
                   int length = f.length;
                   for(int i = 0; i < length; i++)
                       GetAllFile(f[i],setDeep);
                   deep--;
               }
           } else {
               if(file.isDirectory()){
                   //如果为目录末尾添加 / 保证rsync正常处理
                   fileNameList.add(file.getAbsolutePath().concat("/"));
               }else {
                   fileNameList.add(file.getAbsolutePath());
               }
           }
       }
   }

}

doDir.sh


rsync -av --include='*/' --exclude='*' $1 $2 |tee -a /tmp/rsync.log 2>&1
echo "创建目录结构操作"

doRsync.sh


rsync -avzi --stats --progress $1 $2 |tee -a /tmp/rsync.log 2>&1

doChecksum.sh


rsync -acvzi --stats --progress $1 $2 |tee -a /tmp/checksum.log 2>&1

附录

rsync输出日志说明如下


YXcstpoguax  path/to/file
|||||||||||
||||||||||╰- x: The extended attribute information changed
|||||||||╰-- a: The ACL information changed
||||||||╰--- u: The u slot is reserved for future use
|||||||╰---- g: Group is different
||||||╰----- o: Owner is different
|||||╰------ p: Permission are different
||||╰------- t: Modification time is different
|||╰-------- s: Size is different
||╰--------- c: Different checksum (for regular files), or
||              changed value (for symlinks, devices, and special files)
|╰---------- the file type:
|            f: for a file,
|            d: for a directory,
|            L: for a symlink,
|            D: for a device,
|            S: for a special file (e.g. named sockets and fifos)
╰----------- the type of update being done::
            <: file is being transferred to the remote host (sent)
            >: file is being transferred to the local host (received)
            c: local change/creation for the item, such as:
               - the creation of a directory
               - the changing of a symlink,
               - etc.
            h: the item is a hard link to another item (requires
               --hard-links).
            .: the item is not being updated (though it might have
               attributes that are being modified)
            *: means that the rest of the itemized-output area contains
               a message (e.g. "deleting")

来源:https://blog.csdn.net/w4187402/article/details/103274618

标签:Java,Rsync
0
投稿

猜你喜欢

  • java数字转汉字工具类详解

    2023-04-28 02:00:26
  • 基于C#生成随机数示例

    2023-06-27 08:40:17
  • java异常(Exception)处理机制详解

    2023-06-06 08:21:48
  • Springboot 跨域配置无效及接口访问报错的解决方法

    2021-10-02 01:04:11
  • 获取Android系统唯一识别码的方法

    2022-08-09 22:20:45
  • C#byte数组与Image的相互转换实例代码

    2023-08-15 16:15:51
  • 利用Jetpack Compose实现绘制五角星效果

    2023-04-10 06:20:48
  • Spring Security自定义登录原理及实现详解

    2022-11-20 21:57:39
  • java中addMouseListener()方法的使用

    2021-07-07 19:29:35
  • 详解Spring 中 Bean 对象的存储和取出

    2023-08-06 02:42:46
  • C#中静态的深入理解

    2023-10-18 01:28:27
  • 给c#添加SetTimeout和SetInterval函数

    2021-07-02 00:58:08
  • Java语言实现简单FTP软件 FTP上传下载队列窗口实现(7)

    2021-07-25 04:01:58
  • 功能强大的Android滚动控件RecyclerView

    2022-11-30 13:45:27
  • C# ODP.NET 调用Oracle函数返回值时报错的一个解决方案

    2021-10-03 01:54:19
  • Groovy动态语言使用教程简介

    2022-04-28 15:05:54
  • Jackson的用法实例分析

    2021-11-13 09:13:40
  • JAVA JDK8 List分组的实现和用法

    2023-11-26 09:56:11
  • SpringBoot MongoDB 索引冲突分析及解决方法

    2023-01-23 06:28:05
  • Spring Boot示例分析讲解自动化装配机制核心注解

    2022-07-26 15:56:14
  • asp之家 软件编程 m.aspxhome.com