如何将bag文件批量转成mp4

这篇文章主要介绍如何将bag文件批量转成mp4:文中给出的脚本可以把一个文件夹下的bag文件批量转化为mp4文件。内容介绍得比较详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下。

简介

这篇博客涉及的脚本用来将bag文件批量转化为mp4文件

dockerfile

# Base image: ROS Kinetic desktop-full, which provides rosbag/rospy for bag2video.py.
FROM osrf/ros:kinetic-desktop-full

# Install ffmpeg, the video converter that bag2video.py invokes.
RUN sudo apt update && \
    sudo apt install -y ffmpeg

Build docker image

docker build -t ros/kinetic:latest .

Run docker container

这里链接的数据卷,要换成你自己的本地路径。容器路径也可以自定义。

docker run -it -v /C/yp/project/realsense2video:/test ros/kinetic:latest

Run script

cd ${your project root}
python main.py --data_root /test/data --out_root /test/save --fps 25

param data_root:包含bag的文件夹
param out_root:输出目录
param fps:每秒的帧数

Source code

bag2video.py

# -*- coding: utf-8 -*- #!/usr/bin/env python2 import roslib #roslib.load_manifest('rosbag') import rospy import rosbag import sys, getopt import os from sensor_msgs.msg import CompressedImage  #压缩图片 from sensor_msgs.msg import Image import cv2 import numpy as np import shlex, subprocess  #读取命令行参数 #subprocess 是一个 python 标准类库,用于创建进程运行系统命令,并且可以连接进程的输入输出和 #错误管道,获取它们的返回,使用起来要优于 os.system,在这里我们使用这个库运行 hive 语句并获取返回结果。 #shlex 是一个 python 标准类库,使用这个类我们可以轻松的做出对 linux shell 的词法分析,在 #这里我们将格式化好的 hive 连接语句用 shlex 切分,配合 subprocess.run 使用。 MJPEG_VIDEO = 1 RAWIMAGE_VIDEO = 2 VIDEO_CONVERTER_TO_USE = "ffmpeg" # or you may want to use "avconv" #视频转换器 def print_help(): print('rosbag2video.py [--fps 25] [--rate 1] [-o outputfile] [-v] [-s] [-t topic] bagfile1 [bagfile2] ...') print() print('Converts image sequence(s) in ros bag file(s) to video file(s) with fixed frame rate using',VIDEO_CONVERTER_TO_USE) print(VIDEO_CONVERTER_TO_USE,'needs to be installed!') print() print('--fps   Sets FPS value that is passed to',VIDEO_CONVERTER_TO_USE) print('        Default is 25.') print('-h      Displays this help.') print('--ofile (-o) sets output file name.') print('        If no output file name (-o) is given the filename \'.mp4\' is used and default output codec is h264.') print('        Multiple image topics are supported only when -o option is _not_ used.') print('        ',VIDEO_CONVERTER_TO_USE,' will guess the format according to given extension.') print('        Compressed and raw image messages are supported with mono8 and bgr8/rgb8/bggr8/rggb8 formats.') print('--rate  (-r) You may slow down or speed up the video.') print('        Default is 1.0, that keeps the original speed.') print('-s      Shows each and every image extracted from the rosbag file (cv_bride is needed).') print('--topic (-t) Only the images from topic "topic" are used for the video output.') print('-v      Verbose messages are displayed.') print('--prefix (-p) set a output file name prefix othervise \'bagfile1\' is 
used (if -o is not set).') print('--start Optional start time in seconds.') print('--end   Optional end time in seconds.') class RosVideoWriter(): def __init__(self, fps=25.0, rate=1.0, topic="", output_filename ="", display= False, verbose = False, start = rospy.Time(0), end = rospy.Time(sys.maxsize)): self.opt_topic = topic self.opt_out_file = output_filename self.opt_verbose = verbose self.opt_display_images = display self.opt_start = start self.opt_end = end self.rate = rate self.fps = fps self.opt_prefix= None self.t_first={} self.t_file={} self.t_video={} self.p_avconv = {} #语法分析Args def parseArgs(self, args): opts, opt_files = getopt.getopt(args,"hsvr:o:t:p:",["fps=","rate=","ofile=","topic=","start=","end=","prefix="]) #getopt() for opt, arg in opts: if opt == '-h': print_help() sys.exit(0) elif opt == '-s': self.opt_display_images = True elif opt == '-v': self.opt_verbose = True elif opt in ("--fps"): self.fps = float(arg) elif opt in ("-r", "--rate"): self.rate = float(arg) elif opt in ("-o", "--ofile"): self.opt_out_file = arg elif opt in ("-t", "--topic"): self.opt_topic = arg elif opt in ("-p", "--prefix"): self.opt_prefix = arg elif opt in ("--start"): self.opt_start = rospy.Time(int(arg)) if(self.opt_verbose): print("starting at",self.opt_start.to_sec()) elif opt in ("--end"): self.opt_end = rospy.Time(int(arg)) if(self.opt_verbose): print("ending at",self.opt_end.to_sec()) else: print("opt:", opt,'arg:', arg) if (self.fps<=0): print("invalid fps", self.fps) self.fps = 1 if (self.rate<=0): print("invalid rate", self.rate) self.rate = 1 if(self.opt_verbose): print("using ",self.fps," FPS") return opt_files # filter messages using type or only the opic we whant from the 'topic' argument def filter_image_msgs(self, topic, datatype, md5sum, msg_def, header): if(datatype=="sensor_msgs/CompressedImage"): if (self.opt_topic != "" and self.opt_topic == topic) or self.opt_topic == "": print("############# COMPRESSED IMAGE  ######################") 
print(topic,' with datatype:', str(datatype)) print() return True; if(datatype=="theora_image_transport/Packet"): if (self.opt_topic != "" and self.opt_topic == topic) or self.opt_topic == "": print(topic,' with datatype:', str(datatype)) print('!!! theora is not supported, sorry !!!') return False; if(datatype=="sensor_msgs/Image"): if (self.opt_topic != "" and self.opt_topic == topic) or self.opt_topic == "": print("############# UNCOMPRESSED IMAGE ######################") print(topic,' with datatype:', str(datatype)) print() return True; return False; def write_output_video(self, msg, topic, t, video_fmt, pix_fmt = ""): # no data in this topic if len(msg.data) == 0 : return # initiate data for this topic if not topic in self.t_first : self.t_first[topic] = t # timestamp of first image for this topic self.t_video[topic] = 0 self.t_file[topic] = 0 # if multiple streams of images will start at different times the resulting video files will not be in sync # current offset time we are in the bag file self.t_file[topic] = (t-self.t_first[topic]).to_sec() # fill video file up with images until we reache the current offset from the beginning of the bag file while self.t_video[topic] 

main.py

"""Batch-convert every ``*.bag`` file in a folder to ``.mp4`` via bag2video.py.

Usage (run from the directory that contains bag2video.py):

    python main.py --data_root /test/data --out_root /test/save --fps 25

The script is kept compatible with both Python 2 and Python 3.
"""
import argparse
import glob
import os
import subprocess
import sys


def mkdir(path):
    """Create directory ``path`` if it does not exist yet.

    Missing parent directories are created as well, and an already existing
    directory is left untouched.

    Raises:
        OSError: if ``path`` cannot be created and is not already a directory.
    """
    try:
        os.makedirs(path)
    except OSError:
        # An existing directory is the expected "nothing to do" case;
        # anything else (permissions, a file in the way) is a real error.
        if not os.path.isdir(path):
            raise


def _output_path(bag_path, out_root):
    """Return ``<out_root>/<bag name without its final extension>.mp4``."""
    # splitext strips only the trailing ".bag", so "run.1.bag" maps to
    # "run.1.mp4" instead of being truncated at the first dot.
    stem = os.path.splitext(os.path.basename(bag_path))[0]
    return os.path.join(out_root, stem + ".mp4")


def _convert(bag_path, out_path, fps):
    """Run bag2video.py on one bag file and return its exit status."""
    # An argument list (no shell) keeps paths with spaces or shell
    # metacharacters intact and prevents command injection via file names.
    command = [
        "python", "bag2video.py",
        "-o", out_path,
        "--fps", str(fps),
        bag_path,
    ]
    return subprocess.call(command)


def main(argv=None):
    """Convert all bag files found in ``--data_root``.

    Args:
        argv: command-line arguments without the program name; ``None``
            means "use ``sys.argv[1:]``".

    Returns:
        The number of bag files whose conversion exited with a non-zero
        status.
    """
    parser = argparse.ArgumentParser(
        description='''This is a code for bag2video.''')
    parser.add_argument('--data_root', type=str, default=r'/test/data',
                        help='path to the root of data')
    parser.add_argument('--out_root', type=str, default=r'/test/save',
                        help='path to the output directory for the mp4 files')
    parser.add_argument('--fps', type=int, default=25,
                        help='frames per second of the output videos')
    args = parser.parse_args(argv)

    mkdir(args.out_root)

    failures = 0
    # Sorted so the processing order is deterministic across runs.
    for bag_path in sorted(glob.glob(os.path.join(args.data_root, "*.bag"))):
        out_path = _output_path(bag_path, args.out_root)
        status = _convert(bag_path, out_path, args.fps)
        if status != 0:
            # Report the failure but keep going: one broken bag should not
            # stop the rest of the batch.
            sys.stderr.write(
                "bag2video.py failed (exit status {0}) for {1}\n".format(
                    status, bag_path))
            failures += 1
    return failures


if __name__ == "__main__":
    main()

到此这篇关于如何将bag文件批量转成mp4的文章就介绍到这了。更多相关bag文件批量转成mp4内容,请搜索0133技术站以前的文章或继续浏览下面的相关文章,希望大家以后多多支持0133技术站!

以上就是如何将bag文件批量转成mp4的详细内容,更多请关注0133技术站其它相关文章!

赞(0) 打赏
未经允许不得转载:0133技术站首页 » python