TensorRT-Demo/utils/camera.py

274 lines
10 KiB
Python
Raw Permalink Normal View History

2023-03-06 20:44:29 +08:00
"""camera.py
This code implements the Camera class, which encapsulates code to
handle IP CAM, USB webcam or the Jetson onboard camera. In
addition, this Camera class is further extended to take a video
file or an image file as input.
"""
import logging
import threading
import subprocess
import numpy as np
import cv2
# The following flag ise used to control whether to use a GStreamer
# pipeline to open USB webcam source. If set to False, we just open
# the webcam using cv2.VideoCapture(index) machinery. i.e. relying
# on cv2's built-in function to capture images from the webcam.
USB_GSTREAMER = True
def add_camera_args(parser):
"""Add parser augument for camera options."""
parser.add_argument('--image', type=str, default=None,
help='image file name, e.g. dog.jpg')
parser.add_argument('--video', type=str, default=None,
help='video file name, e.g. traffic.mp4')
parser.add_argument('--video_looping', action='store_true',
help='loop around the video file [False]')
parser.add_argument('--rtsp', type=str, default=None,
help=('RTSP H.264 stream, e.g. '
'rtsp://admin:123456@192.168.1.64:554'))
parser.add_argument('--rtsp_latency', type=int, default=200,
help='RTSP latency in ms [200]')
parser.add_argument('--usb', type=int, default=None,
help='USB webcam device id (/dev/video?) [None]')
parser.add_argument('--gstr', type=str, default=None,
help='GStreamer string [None]')
parser.add_argument('--onboard', type=int, default=None,
help='Jetson onboard camera [None]')
parser.add_argument('--copy_frame', action='store_true',
help=('copy video frame internally [False]'))
parser.add_argument('--do_resize', action='store_true',
help=('resize image/video [False]'))
parser.add_argument('--width', type=int, default=640,
help='image width [640]')
parser.add_argument('--height', type=int, default=480,
help='image height [480]')
return parser
def open_cam_rtsp(uri, width, height, latency):
"""Open an RTSP URI (IP CAM)."""
gst_elements = str(subprocess.check_output('gst-inspect-1.0'))
if 'omxh264dec' in gst_elements:
# Use hardware H.264 decoder on Jetson platforms
gst_str = ('rtspsrc location={} latency={} ! '
'rtph264depay ! h264parse ! omxh264dec ! '
'nvvidconv ! '
'video/x-raw, width=(int){}, height=(int){}, '
'format=(string)BGRx ! videoconvert ! '
'appsink').format(uri, latency, width, height)
elif 'avdec_h264' in gst_elements:
# Otherwise try to use the software decoder 'avdec_h264'
# NOTE: in case resizing images is necessary, try adding
# a 'videoscale' into the pipeline
gst_str = ('rtspsrc location={} latency={} ! '
'rtph264depay ! h264parse ! avdec_h264 ! '
'videoconvert ! appsink').format(uri, latency)
else:
raise RuntimeError('H.264 decoder not found!')
return cv2.VideoCapture(gst_str, cv2.CAP_GSTREAMER)
def open_cam_usb(dev, width, height):
"""Open a USB webcam."""
if USB_GSTREAMER:
gst_str = ('v4l2src device=/dev/video{} ! '
'video/x-raw, width=(int){}, height=(int){} ! '
'videoconvert ! appsink').format(dev, width, height)
return cv2.VideoCapture(gst_str, cv2.CAP_GSTREAMER)
else:
return cv2.VideoCapture(dev)
def open_cam_gstr(gstr, width, height):
"""Open camera using a GStreamer string.
Example:
gstr = 'v4l2src device=/dev/video0 ! video/x-raw, width=(int){width}, height=(int){height} ! videoconvert ! appsink'
"""
gst_str = gstr.format(width=width, height=height)
return cv2.VideoCapture(gst_str, cv2.CAP_GSTREAMER)
def open_cam_onboard(width, height):
"""Open the Jetson onboard camera."""
gst_elements = str(subprocess.check_output('gst-inspect-1.0'))
if 'nvcamerasrc' in gst_elements:
# On versions of L4T prior to 28.1, you might need to add
# 'flip-method=2' into gst_str below.
gst_str = ('nvcamerasrc ! '
'video/x-raw(memory:NVMM), '
'width=(int)2592, height=(int)1458, '
'format=(string)I420, framerate=(fraction)30/1 ! '
'nvvidconv ! '
'video/x-raw, width=(int){}, height=(int){}, '
'format=(string)BGRx ! '
'videoconvert ! appsink').format(width, height)
elif 'nvarguscamerasrc' in gst_elements:
gst_str = ('nvarguscamerasrc ! '
'video/x-raw(memory:NVMM), '
'width=(int)1920, height=(int)1080, '
'format=(string)NV12, framerate=(fraction)30/1 ! '
'nvvidconv flip-method=2 ! '
'video/x-raw, width=(int){}, height=(int){}, '
'format=(string)BGRx ! '
'videoconvert ! appsink').format(width, height)
else:
raise RuntimeError('onboard camera source not found!')
return cv2.VideoCapture(gst_str, cv2.CAP_GSTREAMER)
def grab_img(cam):
"""This 'grab_img' function is designed to be run in the sub-thread.
Once started, this thread continues to grab a new image and put it
into the global 'img_handle', until 'thread_running' is set to False.
"""
while cam.thread_running:
_, cam.img_handle = cam.cap.read()
if cam.img_handle is None:
#logging.warning('Camera: cap.read() returns None...')
break
cam.thread_running = False
class Camera():
"""Camera class which supports reading images from theses video sources:
1. Image (jpg, png, etc.) file, repeating indefinitely
2. Video file
3. RTSP (IP CAM)
4. USB webcam
5. Jetson onboard camera
"""
def __init__(self, args):
self.args = args
self.is_opened = False
self.video_file = ''
self.video_looping = args.video_looping
self.thread_running = False
self.img_handle = None
self.copy_frame = args.copy_frame
self.do_resize = args.do_resize
self.img_width = args.width
self.img_height = args.height
self.cap = None
self.thread = None
self._open() # try to open the camera
def _open(self):
"""Open camera based on command line arguments."""
if self.cap is not None:
raise RuntimeError('camera is already opened!')
a = self.args
if a.image:
logging.info('Camera: using a image file %s' % a.image)
self.cap = 'image'
self.img_handle = cv2.imread(a.image)
if self.img_handle is not None:
if self.do_resize:
self.img_handle = cv2.resize(
self.img_handle, (a.width, a.height))
self.is_opened = True
self.img_height, self.img_width, _ = self.img_handle.shape
elif a.video:
logging.info('Camera: using a video file %s' % a.video)
self.video_file = a.video
self.cap = cv2.VideoCapture(a.video)
self._start()
elif a.rtsp:
logging.info('Camera: using RTSP stream %s' % a.rtsp)
self.cap = open_cam_rtsp(a.rtsp, a.width, a.height, a.rtsp_latency)
self._start()
elif a.usb is not None:
logging.info('Camera: using USB webcam /dev/video%d' % a.usb)
self.cap = open_cam_usb(a.usb, a.width, a.height)
self._start()
elif a.gstr is not None:
logging.info('Camera: using GStreamer string "%s"' % a.gstr)
self.cap = open_cam_gstr(a.gstr, a.width, a.height)
self._start()
elif a.onboard is not None:
logging.info('Camera: using Jetson onboard camera')
self.cap = open_cam_onboard(a.width, a.height)
self._start()
else:
raise RuntimeError('no camera type specified!')
def isOpened(self):
return self.is_opened
def _start(self):
if not self.cap.isOpened():
logging.warning('Camera: starting while cap is not opened!')
return
# Try to grab the 1st image and determine width and height
_, self.img_handle = self.cap.read()
if self.img_handle is None:
logging.warning('Camera: cap.read() returns no image!')
self.is_opened = False
return
self.is_opened = True
if self.video_file:
if not self.do_resize:
self.img_height, self.img_width, _ = self.img_handle.shape
else:
self.img_height, self.img_width, _ = self.img_handle.shape
# start the child thread if not using a video file source
# i.e. rtsp, usb or onboard
assert not self.thread_running
self.thread_running = True
self.thread = threading.Thread(target=grab_img, args=(self,))
self.thread.start()
def _stop(self):
if self.thread_running:
self.thread_running = False
#self.thread.join()
def read(self):
"""Read a frame from the camera object.
Returns None if the camera runs out of image or error.
"""
if not self.is_opened:
return None
if self.video_file:
_, img = self.cap.read()
if img is None:
logging.info('Camera: reaching end of video file')
if self.video_looping:
self.cap.release()
self.cap = cv2.VideoCapture(self.video_file)
_, img = self.cap.read()
if img is not None and self.do_resize:
img = cv2.resize(img, (self.img_width, self.img_height))
return img
elif self.cap == 'image':
return np.copy(self.img_handle)
else:
if self.copy_frame:
return self.img_handle.copy()
else:
return self.img_handle
def release(self):
self._stop()
try:
self.cap.release()
except:
pass
self.is_opened = False
def __del__(self):
self.release()