ROS1 rosbag的详细使用并且使用python合并bag包的方法

作者:锡城筱凯 时间:2021-04-30 11:43:54 

在使用ros的时候经常会用到rosbag来录制或者回放算法,是个非常有用的工具。

rosbag 命令列表

命令作用
record录制一个包,并且指定topic
info总结一个包的详细信息
play回放一个或者多个包,并且可以指定topic
check确定一个包是否可以在当前系统中播放,或者是否可以迁移
fix修复一个包使得它能在当前系统中播放
filter通过python脚本转换包内容
compress压缩包
decompress解压缩包
reindex重新索引一个或者多个损坏的包

record 命令使用

record <topic-names> # 指定一个或者多个topic来录制

$ rosbag record body_status gnss_imu

record -a, --all # 录制所有topic

$ rosbag record -a

record -e, --regex # 录制使用正则表达式匹配的topic

$ rosbag record -e "/(.*)_log/point"

rocord -p, -publish # (Melodic 新特性)当开始录制包时,发布一个topic=&rdquo;begin_write&ldquo;

$ rosbag record -p

record -x, --exclude # 录制时排除正则表达式匹配的topic

$ rosbag record -e "/(.*)_log/point" -x "(.*)/point"

record -q, --quiet # 录制时抑制指定topic的log输出

$ rosbag record -q /body_status

record -d, --duration # 指定topic录制最大时常

$ rosbag record -d 30 /body_status
$ rosbag record -d 3m /body_status
$ rosbag record -d 3h /body_status

record -o, --output-prefix # 输出包名前加上前缀

$ rosbag record -o output /topic_name /topic_name2

record -O, --output-name # 指定输出包名

$ rosbag record -O output.bag /topic_name /topic_name2

record --split # 每隔设定的时常或者大小分割包录制

$ rosbag record --split --size=1024 /topic_name
$ rosbag record --split --duration=30 /topic_name
$ rosbag record --split --duration=5m /topic_name
$ rosbag record --split --duration=2h /topic_name

record --max-splits # (Kinetic 新特性)分割包的最大个数

$ rosbag record --split --max-splits 3 --duration=30 /topic_name

record -b, --buffsize # 使用内部缓冲区(默认值:256MB,无限为0)

$ rosbag record -b 1024 /topic_name

record --chunksize # 开辟缓冲区,比buffsize更为先进(默认值: 768KB,无限为0)

$ rosbag record --chunksize=1024 /topic_name

record -l, --limit # 在topic中只录制指定数量的消息

$ rosbag record -l 1000 /topic_name

record --node # 记录指定节点订阅的所有消息

$ rosbag record --node=/node

record -j, --bz2 # 设置录制以bz2格式压缩

$ rosbag record -j /topic_name

record --lz4 # 设置录制以lz4格式压缩

$ rosbag record --lz4 /topic_name
  • record --tcpnodelay # 订阅主题时使用TCP_NODELAY传输 参考资料

  • record --udp # 订阅主题时使用UDP传输

info 指令使用

info <bag-files> # 查看一个或者多个包的详细信息

$ rosbag info test.bag

info -y, --yaml # 以yaml格式输出一个或者多个包的详细信息

$ rosbag info -y test.bag

info -k, --key # 查看以yaml格式输出,包内key值对应的数据

$ rosbag info -y -k topics test.bag

play 指令使用

play <bag_name> # 播放一个或者多个数据包

$ rosbag play test.bag

play -p, --prefix # 为所有输出主题添加前缀

$ rosbag play -p test test.bag

play -q, --quiet # 抑制控制台输出

$ rosbag play -q test.bag

play -i, --immediate # 播放包不等待

$ rosbag play -i test.bag

play --pause # 开始播放包时暂停

$ rosbag play --pause test.bag

play --queue # 播放包时以设置的队列大小播放 默认值为100

$ rosbag play --queue=1000 test.bag

play --clock # 自动发布一个clocktopic

$ rosbag play --clock test.bag

play --hz # 以指定赫兹发布clock topic

$ rosbag play --hz=1 --clock test.bag

play -d, --delay # 指定延迟播放的时间

$ rosbag -d 5 test.bag

play -r, --rate # 指定播放速度

$ rosbag -r 10 test.bag

play -s, --start # 指定开始播放时间

$ rosbag play -s 5 test.bag

play -u,--duration # 设定播放时间

$ rosbag play -u 240 test.bag

play --skip-empty # 设置跳过没有消息的数据的时间

$ rosbag play --skip-empty=1 test.bag

play -l, --loop # 循环播放

$ rosbag play -l test.bag

play -k, --keep-alive # 播放时保持活跃状态

$ rosbag play -k test.bag

play --try-future-version # 即使不知道版本号也可以打开bag

$ rosbag play --try-future-version test.bag

play --topics # 指定对应topics进行播放

$ rosbag play test.bag --topics /topic1 /topic2

play --pause-topics # 当回放数据时,暂停指定topics

$ rosbag play test.bag --topics /topic1 /topic2 --pause-topics

play --bags # 指定多个bags进行回放

$ rosbag play --bags=test.bag

play --wait-for-subscribers # 再发布之前,等待每个主题至少有一个订阅者play --rate-control-topic= # 观看给定的主题,如果上一次发布时间超过了<速率控制最大延迟>,请等待该主题再次发布后再继续播放play --rate-control-max-delay= # 暂停前与<速率控制主题>的最大时间差 check 指令使用 check <file> # 确定一个包在当前系统中是否可以播放。

$ rosbag check test.bag

check -g, --genrules # 生成一个名为RULEFILE的迁移规则文件

$ rosbag check -g diagnostics.bmr test.bag

check -a, --append # 加载后附加到现有数据迁移规则文件的末尾。

$ rosbag check -a -g diagnostics.bmr test.bag

check -n, --noplugins # 不加载规则文件

$ rosbag check -n test.bag

fix 指令使用

fix <in-bag> <out-bag> [rules.bmr] # 使用规则文件修复数据包

$ rosbag fix old.bag repaired.bag myrules.bmr

fix -n, --noplugins # 修复数据包不带规则文件

$ rosbag fix -n old.bag repaired.bag

filter 指令使用

filter <in-bag> <out-bag> <expression> # 使用给定的Python表达式转换数据文件。

$ rosbag filter my.bag only-tf.bag "topic == '/tf'"

compress 指令使用

compress <bag-files> # 使用BZ2格式压缩包

$ rosbag compress *.bag

compress --output-dir=DIR # 指定路径输出文件

$ rosbag compress *.bag --output-dir=compressed

compress -f, --force # 如果已经存在包,则强制覆盖

$ rosbag compress -f *.bag

compress -q, --quiet # 抑制非关键信息

$ rosbag compress -q *.bag

compress -j, --bz2 # 使用bz2格式压缩包

$ rosbag compress -j *.bag

compress --lz4 # 使用lz4格式压缩包

$ rosbag compress --lz4 *.bag

decompress 指令使用 decompress <bag-files> # 解压缩包

$ rosbag decompress *.bag

decompress --output-dir=DIR # 指定解压缩的路径

$ rosbag decompress --output-dir=uncompressed *.bag

decompress -f, --force # 如果已经存在包,则强制覆盖

$ rosbag decompress -f *.bag

decompress -q, --quiet # 抑制非关键信息

$ rosbag decompress -q *.bag

reindex 指令使用 reindex <bag-files> # 修复坏包

$ rosbag reindex *.bag

reindex --output-dir=DIR # 指定路径

$ rosbag reindex --output-dir=reindexed *.bag

reindex -f, --force # 如果已经存在包,则强制覆盖

$ rosbag reindex -f *.bag

reindex -q, --quiet # 抑制非关键信息

$ rosbag reindex -q *.bag

2. 使用Python合并包代码

#! /usr/bin/env python3
import os
import time
import argparse
from rosbag import Bag, Compression
def parse_compression(compression):
   if compression == "none" or compression == "NONE":
       compression = Compression.NONE
   elif compression == "bz2":
       compression = Compression.BZ2
   elif compression == "lz4":
       compression = Compression.LZ4
   return compression
def get_files_list(arg_input, arg_select):
   files = None
   if " " in arg_input:
       files = arg_input.split(" ")
   elif "," in arg_input:
       files = arg_input.split(",")
   elif os.path.exists(args.input) or "*" in args.input:
       rfind_index = arg_input.rfind("/")
       dir_path = arg_input[:rfind_index]
       files = os.listdir(dir_path)
       files.sort() # 名称排序
       files = [dir_path + "/" + file for file in files]
   if arg_select:
       files = select_bags(files)
   print("bags list:")
   for i in range(len(files)):
       print(str(i) + "." + files[i])
   return files
def select_bags(files):
   for i in range(len(files)):
       print(str(i) + "." + files[i])
   print("Please input bag numbers to merge. split by , or space", end=":")
   s_b = input()
   s_b_list = []
   if "," in s_b:
       s_b_list = s_b.split(",")
   elif " " in s_b:
       s_b_list = s_b.split(" ")
   files = [files[int(x)] for x in s_b_list]
   return files
def show_process_bar(total, i, start):
   a = "*" * i
   b = "." * (total - i)
   c = (i / total) * 100
   dur = time.perf_counter() - start
   print("\r{:^3.0f}%[{}->{}]{:.2f}s".format(c,a,b,dur), end="")
def merge_bags(args):
   print("start merge bags.")
   compression = parse_compression(args.compression)
   print(f"bag's compression mode is {compression}.")
   files = get_files_list(args.input, args.select)
   with Bag(args.output, "w",  compression=compression) as o:
       start = time.perf_counter()
       for i in range(len(files)):
           show_process_bar(len(files), i+1, start)
           with Bag(files[i], "r") as ib:
               for topic, msg, t in ib:
                   o.write(topic, msg, t)
           show_process_bar(len(files), i+1, start)
if __name__ == "__main__":
   parser = argparse.ArgumentParser(description="Merge one or more bag files to one file.")
   parser.add_argument("-o", "--output", help="Output bag's file path.",
                       default="output.bag", required=True)
   parser.add_argument("-i", "--input", help="Input files bags name or path, split by , .",
                       required=True)
   parser.add_argument("-c", "--compression", help="Compress the bag by bz2 or lz4",
                       default="lz4", choices=["none", "lz4", "bz2"])
   parser.add_argument("-s", "--select", help="Select bags to merge.",
                       default=False)
   parser.add_argument("-v", "--verbose", help="Show the verbose msg.")
   args = parser.parse_args()
   merge_bags(args)

Reference:

http://wiki.ros.org/rosbag/Commandline

来源:https://blog.csdn.net/xiaokai1999/article/details/130504577

标签:ROS1,rosbag,python,bag包
0
投稿

猜你喜欢

  • JAVA正则表达式 Pattern和Matcher

    2023-07-01 19:13:53
  • python 限制函数调用次数的实例讲解

    2023-11-11 00:34:23
  • 文字的减法

    2007-11-06 12:58:00
  • python开发飞机大战游戏

    2022-10-08 05:15:37
  • 数据库查询的分页优化技巧

    2009-05-17 10:31:00
  • 微信跳一跳自动运行python脚本

    2023-11-22 01:42:29
  • python tkinter实现弹窗的输入输出

    2021-10-03 14:58:42
  • python3 pillow生成简单验证码图片的示例

    2022-08-16 23:17:05
  • 分析Python中解析构建数据知识

    2022-01-12 10:23:48
  • Python注释、分支结构、循环结构、伪“选择结构”用法实例分析

    2021-01-15 14:45:25
  • JsonServer安装及启动过程图解

    2023-08-12 20:06:02
  • LangChain简化ChatGPT工程复杂度使用详解

    2022-10-21 22:25:34
  • 利用Tkinter(python3.6)实现一个简单计算器

    2022-07-16 10:42:30
  • Python绘制K线图之可视化神器pyecharts的使用

    2023-06-28 12:30:58
  • tensorflow 实现从checkpoint中获取graph信息

    2023-01-05 09:38:41
  • Python如何将函数值赋给变量

    2022-01-01 22:58:22
  • Python中循环后使用list.append()数据被覆盖问题的解决

    2023-02-08 20:45:49
  • 如何基于Django实现上下文章跳转

    2022-02-11 02:38:05
  • Python3使用requests登录人人影视网站的方法

    2021-10-04 15:42:19
  • 手把手教你将Flask应用封装成Docker服务的实现

    2023-05-27 06:57:15
  • asp之家 网络编程 m.aspxhome.com