如何将bag文件批量转成mp4

作者:点PY 时间:2021-06-10 11:04:54 

简介

这篇博客涉及的脚本用来将bag文件批量转化为mp4文件

dockerfile

FROM osrf/ros:kinetic-desktop-full
RUN sudo apt update && \
   sudo apt install -y ffmpeg

Build docker image

docker build -t ros/kinetic:latest .

Build docker container

这里链接的数据卷,要换成你自己的本地路径。容器路径也可以自定义。

docker run -it -v /C/yp/project/realsense2video:/test ros/kinetic:latest

Run script

cd ${your project root}
python main.py --data_root /test/data --out_root /test/save --fps 25
param data_root:包含bag的文件夹
param out_root:输出目录
param fps:每秒的帧数

Source code

bag2video.py

# -*- coding: utf-8 -*-

#!/usr/bin/env python2

import roslib
#roslib.load_manifest('rosbag')
import rospy
import rosbag
import sys, getopt
import os
from sensor_msgs.msg import CompressedImage  #压缩图片
from sensor_msgs.msg import Image
import cv2

import numpy as np

import shlex, subprocess  #读取命令行参数
#subprocess 是一个 python 标准类库,用于创建进程运行系统命令,并且可以连接进程的输入输出和
#错误管道,获取它们的返回,使用起来要优于 os.system,在这里我们使用这个库运行 hive 语句并获取返回结果。

#shlex 是一个 python 标准类库,使用这个类我们可以轻松的做出对 linux shell 的词法分析,在
#这里我们将格式化好的 hive 连接语句用 shlex 切分,配合 subprocess.run 使用。
MJPEG_VIDEO = 1
RAWIMAGE_VIDEO = 2
VIDEO_CONVERTER_TO_USE = "ffmpeg" # or you may want to use "avconv" #视频转换器

def print_help():
   print('rosbag2video.py [--fps 25] [--rate 1] [-o outputfile] [-v] [-s] [-t topic] bagfile1 [bagfile2] ...')
   print()
   print('Converts image sequence(s) in ros bag file(s) to video file(s) with fixed frame rate using',VIDEO_CONVERTER_TO_USE)
   print(VIDEO_CONVERTER_TO_USE,'needs to be installed!')
   print()
   print('--fps   Sets FPS value that is passed to',VIDEO_CONVERTER_TO_USE)
   print('        Default is 25.')
   print('-h      Displays this help.')
   print('--ofile (-o) sets output file name.')
   print('        If no output file name (-o) is given the filename \'<prefix><topic>.mp4\' is used and default output codec is h264.')
   print('        Multiple image topics are supported only when -o option is _not_ used.')
   print('        ',VIDEO_CONVERTER_TO_USE,' will guess the format according to given extension.')
   print('        Compressed and raw image messages are supported with mono8 and bgr8/rgb8/bggr8/rggb8 formats.')
   print('--rate  (-r) You may slow down or speed up the video.')
   print('        Default is 1.0, that keeps the original speed.')
   print('-s      Shows each and every image extracted from the rosbag file (cv_bride is needed).')
   print('--topic (-t) Only the images from topic "topic" are used for the video output.')
   print('-v      Verbose messages are displayed.')
   print('--prefix (-p) set a output file name prefix othervise \'bagfile1\' is used (if -o is not set).')
   print('--start Optional start time in seconds.')
   print('--end   Optional end time in seconds.')
class RosVideoWriter():
   def __init__(self, fps=25.0, rate=1.0, topic="", output_filename ="", display= False, verbose = False, start = rospy.Time(0), end = rospy.Time(sys.maxsize)):
       self.opt_topic = topic
       self.opt_out_file = output_filename
       self.opt_verbose = verbose
       self.opt_display_images = display
       self.opt_start = start
       self.opt_end = end
       self.rate = rate
       self.fps = fps
       self.opt_prefix= None
       self.t_first={}
       self.t_file={}
       self.t_video={}
       self.p_avconv = {}

#语法分析Args
   def parseArgs(self, args):
       opts, opt_files = getopt.getopt(args,"hsvr:o:t:p:",["fps=","rate=","ofile=","topic=","start=","end=","prefix="])
       #getopt()
       for opt, arg in opts:
           if opt == '-h':
               print_help()
               sys.exit(0)
           elif opt == '-s':
               self.opt_display_images = True
           elif opt == '-v':
               self.opt_verbose = True
           elif opt in ("--fps"):
               self.fps = float(arg)
           elif opt in ("-r", "--rate"):
               self.rate = float(arg)
           elif opt in ("-o", "--ofile"):
               self.opt_out_file = arg
           elif opt in ("-t", "--topic"):
               self.opt_topic = arg
           elif opt in ("-p", "--prefix"):
               self.opt_prefix = arg
           elif opt in ("--start"):
               self.opt_start = rospy.Time(int(arg))
               if(self.opt_verbose):
                   print("starting at",self.opt_start.to_sec())
           elif opt in ("--end"):
               self.opt_end = rospy.Time(int(arg))
               if(self.opt_verbose):
                   print("ending at",self.opt_end.to_sec())
           else:
               print("opt:", opt,'arg:', arg)

if (self.fps<=0):
           print("invalid fps", self.fps)
           self.fps = 1

if (self.rate<=0):
           print("invalid rate", self.rate)
           self.rate = 1

if(self.opt_verbose):
           print("using ",self.fps," FPS")
       return opt_files
   # filter messages using type or only the opic we whant from the 'topic' argument
   def filter_image_msgs(self, topic, datatype, md5sum, msg_def, header):
       if(datatype=="sensor_msgs/CompressedImage"):
           if (self.opt_topic != "" and self.opt_topic == topic) or self.opt_topic == "":
               print("############# COMPRESSED IMAGE  ######################")
               print(topic,' with datatype:', str(datatype))
               print()
               return True;

if(datatype=="theora_image_transport/Packet"):
           if (self.opt_topic != "" and self.opt_topic == topic) or self.opt_topic == "":
               print(topic,' with datatype:', str(datatype))
               print('!!! theora is not supported, sorry !!!')
               return False;

if(datatype=="sensor_msgs/Image"):
           if (self.opt_topic != "" and self.opt_topic == topic) or self.opt_topic == "":
               print("############# UNCOMPRESSED IMAGE ######################")
               print(topic,' with datatype:', str(datatype))
               print()
               return True;

return False;
   def write_output_video(self, msg, topic, t, video_fmt, pix_fmt = ""):
       # no data in this topic
       if len(msg.data) == 0 :
           return
       # initiate data for this topic
       if not topic in self.t_first :
           self.t_first[topic] = t # timestamp of first image for this topic
           self.t_video[topic] = 0
           self.t_file[topic] = 0
       # if multiple streams of images will start at different times the resulting video files will not be in sync
       # current offset time we are in the bag file
       self.t_file[topic] = (t-self.t_first[topic]).to_sec()
       # fill video file up with images until we reache the current offset from the beginning of the bag file
       while self.t_video[topic] < self.t_file[topic]/self.rate :
           if not topic in self.p_avconv:
               # we have to start a new process for this topic
               if self.opt_verbose :
                   print("Initializing pipe for topic", topic, "at time", t.to_sec())
               if self.opt_out_file=="":
                   out_file = self.opt_prefix + str(topic).replace("/", "_")+".mp4"
               else:
                   out_file = self.opt_out_file

if self.opt_verbose :
                   print("Using output file ", out_file, " for topic ", topic, ".")

if video_fmt == MJPEG_VIDEO :
                   cmd = [VIDEO_CONVERTER_TO_USE, '-v', '1', '-stats', '-r',str(self.fps),'-c','mjpeg','-f','mjpeg','-i','-','-an',out_file]
                   self.p_avconv[topic] = subprocess.Popen(cmd, stdin=subprocess.PIPE)
                   if self.opt_verbose :
                       print("Using command line:")
                       print(cmd)
               elif video_fmt == RAWIMAGE_VIDEO :
                   size = str(msg.width)+"x"+str(msg.height)
                   cmd = [VIDEO_CONVERTER_TO_USE, '-v', '1', '-stats','-r',str(self.fps),'-f','rawvideo','-s',size,'-pix_fmt', pix_fmt,'-i','-','-an',out_file]
                   self.p_avconv[topic] = subprocess.Popen(cmd, stdin=subprocess.PIPE)
                   if self.opt_verbose :
                       print("Using command line:")
                       print(cmd)

else :
                   print("Script error, unknown value for argument video_fmt in function write_output_video.")
                   exit(1)
           # send data to ffmpeg process pipe
           self.p_avconv[topic].stdin.write(msg.data)
           # next frame time
           self.t_video[topic] += 1.0/self.fps

def addBag(self, filename):
       if self.opt_display_images:
           from cv_bridge import CvBridge, CvBridgeError
           bridge = CvBridge()
           cv_image = []

if self.opt_verbose :
           print("Bagfile: {}".format(filename))

if not self.opt_prefix:
           # create the output in the same folder and name as the bag file minu '.bag'
           self.opt_prefix = bagfile[:-4]

#Go through the bag file
       bag = rosbag.Bag(filename)
       if self.opt_verbose :
           print("Bag opened.")
       # loop over all topics
       for topic, msg, t in bag.read_messages(connection_filter=self.filter_image_msgs, start_time=self.opt_start, end_time=self.opt_end):
           try:
               if msg.format.find("jpeg")!=-1 :
                   if msg.format.find("8")!=-1 and (msg.format.find("rgb")!=-1 or msg.format.find("bgr")!=-1 or msg.format.find("bgra")!=-1 ):
                       if self.opt_display_images:
                           np_arr = np.fromstring(msg.data, np.uint8)
                           cv_image = cv2.imdecode(np_arr, cv2.CV_LOAD_IMAGE_COLOR)
                       self.write_output_video( msg, topic, t, MJPEG_VIDEO )
                   elif msg.format.find("mono8")!=-1 :
                       if self.opt_display_images:
                           np_arr = np.fromstring(msg.data, np.uint8)
                           cv_image = cv2.imdecode(np_arr, cv2.CV_LOAD_IMAGE_COLOR)
                       self.write_output_video( msg, topic, t, MJPEG_VIDEO )
                   elif msg.format.find("16UC1")!=-1 :
                       if self.opt_display_images:
                           np_arr = np.fromstring(msg.data, np.uint16)
                           cv_image = cv2.imdecode(np_arr, cv2.CV_LOAD_IMAGE_COLOR)
                       self.write_output_video( msg, topic, t, MJPEG_VIDEO )
                   else:
                       print('unsupported jpeg format:', msg.format, '.', topic)

# has no attribute 'format'
           except AttributeError:
               try:
                       pix_fmt=None
                       if msg.encoding.find("mono8")!=-1 or msg.encoding.find("8UC1")!=-1:
                           pix_fmt = "gray"
                           if self.opt_display_images:
                               cv_image = bridge.imgmsg_to_cv2(msg, "bgr8")

elif msg.encoding.find("bgra")!=-1 :
                           pix_fmt = "bgra"
                           if self.opt_display_images:
                               cv_image = bridge.imgmsg_to_cv2(msg, "bgr8")

elif msg.encoding.find("bgr8")!=-1 :
                           pix_fmt = "bgr24"
                           if self.opt_display_images:
                               cv_image = bridge.imgmsg_to_cv2(msg, "bgr8")
                       elif msg.encoding.find("bggr8")!=-1 :
                           pix_fmt = "bayer_bggr8"
                           if self.opt_display_images:
                               cv_image = bridge.imgmsg_to_cv2(msg, "bayer_bggr8")
                       elif msg.encoding.find("rggb8")!=-1 :
                           pix_fmt = "bayer_rggb8"
                           if self.opt_display_images:
                               cv_image = bridge.imgmsg_to_cv2(msg, "bayer_rggb8")
                       elif msg.encoding.find("rgb8")!=-1 :
                           pix_fmt = "rgb24"
                           if self.opt_display_images:
                               cv_image = bridge.imgmsg_to_cv2(msg, "bgr8")
                       elif msg.encoding.find("16UC1")!=-1 :
                           pix_fmt = "gray16le"
                       else:
                           print('unsupported encoding:', msg.encoding, topic)
                           #exit(1)
                       if pix_fmt:
                           self.write_output_video( msg, topic, t, RAWIMAGE_VIDEO, pix_fmt )

except AttributeError:
                   # maybe theora packet
                   # theora not supported
                   if self.opt_verbose :
                       print("Could not handle this format. Maybe thoera packet? theora is not supported.")
                   pass
           if self.opt_display_images:
               cv2.imshow(topic, cv_image)
               key=cv2.waitKey(1)
               if key==1048603:
                   exit(1)
       if self.p_avconv == {}:
           print("No image topics found in bag:", filename)
       bag.close()
if __name__ == '__main__':
   #print()
   #print('rosbag2video, by Maximilian Laiacker 2020 and Abel Gabor 2019')
   #print()

if len(sys.argv) < 2:
       print('Please specify ros bag file(s)!')
       print_help()
       sys.exit(1)
   else :
       videowriter = RosVideoWriter()
       try:
           opt_files = videowriter.parseArgs(sys.argv[1:])
       except getopt.GetoptError:
           print_help()
           sys.exit(2)
   # loop over all files
   for files in range(0,len(opt_files)):
       #First arg is the bag to look at
       bagfile = opt_files[files]
       videowriter.addBag(bagfile)
   print("finished")

main.py

import glob
import sys
import glob
import os
import argparse

def mkdir(path):
   if not os.path.exists(path):
       os.mkdir(path)
if __name__ == "__main__":
   parser = argparse.ArgumentParser(
       description='''This is a code for bag2video.''')
   parser.add_argument('--data_root', type=str, default=r'/test/data', help='path to the root of data')
   parser.add_argument('--out_root', type=str, default=r'/test/save', help='path to the root of data')
   parser.add_argument('--fps', type=int, default=25, help='path to the root of data')
   args = parser.parse_args()

mkdir(args.out_root)
   bagList = glob.glob(os.path.join(args.data_root, "*.bag"))
   for bagPath in bagList:
       baseName = os.path.basename(bagPath).split(".")[0]
       outPath = os.path.join(args.out_root, baseName + ".mp4")
       os.system("python bag2video.py -o {outName} --fps {fps} {bagPath}".format(outName=outPath, fps=args.fps, bagPath=bagPath))

来源:https://blog.csdn.net/weixin_42990464/article/details/127257801

标签:bag,mp4
0
投稿

猜你喜欢

  • PHP入门教程之会话控制技巧(cookie与session)

    2023-11-16 00:13:39
  • Vue3 ref构建响应式变量失效问题及解决

    2023-07-02 16:33:49
  • python中sort和sorted排序的实例方法

    2022-12-22 09:10:12
  • 教你轻松了解MySQL数据库中的结果字符串

    2009-02-23 17:29:00
  • mssql中获取指定日期所在月份的第一天的代码

    2011-09-30 11:23:57
  • 基于python3 类的属性、方法、封装、继承实例讲解

    2022-12-09 17:54:02
  • Django模板标签中url使用详解(url跳转到指定页面)

    2023-12-18 00:09:54
  • python爬虫神器Pyppeteer入门及使用

    2022-07-05 08:09:16
  • keras获得model中某一层的某一个Tensor的输出维度教程

    2023-12-15 11:28:54
  • 浅谈python中的面向对象和类的基本语法

    2023-06-27 11:36:45
  • 运行asp.net时出现 http错误404-文件或目录未找到

    2023-07-24 01:53:36
  • python实现动态数组的示例代码

    2023-10-22 16:02:07
  • python try 异常处理(史上最全)

    2021-11-19 21:03:04
  • 浅谈pandas中Dataframe的查询方法([], loc, iloc, at, iat, ix)

    2023-03-09 19:28:59
  • python argparse的使用步骤(全网最全)

    2023-06-14 01:13:45
  • python代码实现学生信息管理系统

    2023-02-06 18:31:34
  • Python中POST调用Restful接口示例

    2021-03-14 19:42:19
  • python 还原梯度下降算法实现一维线性回归

    2023-10-09 21:53:42
  • JavaScript之解构赋值的理解

    2024-04-10 10:44:07
  • uniqueidentifier转换成varchar数据类型的sql语句

    2011-09-30 11:17:48
  • asp之家 网络编程 m.aspxhome.com