Common library of frequently used Python code snippets.
- OpenCV
- Python multiprocessing
- VideoManager
- Camera capture utils
- Motion detector
- Common inference clients
- USB camera mount path finder
pip install --user git+https://gitlab.com/_macherlabs/visonlibs/common.git
video_writer = VideoWriter(filename='pc-rgb'+'.avi',
fourcc = 'XVID',
fps = 1,
frameSize = (640,480),
base_dir = '/LFS/depth_vids',
max_sizeMB = 2,
cleanupDays = 3,
annotations= True,
annotations_header=['frame_number','xmin','ymin','xmax','ymax'])
img=np.ones((640,480)).astype('uint8')
annotation = ["10","20","30","40"]
video_writer.write(img,annotation)
Returns the mount path of a USB camera (e.g. '/dev/video0').
cam_mount_path=find_usb_cam_path(
VENDOR_ID='1234',
MODEL_ID ='12',
SERIAL_ID='123')
Wrapper for reading camera streams.
Supported streams: RTSP, HTTP, USB.
# Camera config
cam_config={
"name":"testcamera",
"url":"rtsp://test.avi",
"_id":"1234",
"params":
{
"readMethod":"video", # if video, will read images from saved video via video manager, else rtsp
"motionEnabled":True # if true, will save videos only of frames with motion
}
}
# Video config
vid_config = {
"base_dir":"/Users/saurabh_veda/workspace_kumar/new_test"
}
cap = cap_rtsp(cam_config,vid_config)
while True:
ret,image=cap.read()
image_time = cap.get_process_time() # Actual time when image was recorded
if ret:
cv2.imshow("vid",image)
cv2.waitKey(1)
Check if motion is observed in camera stream
config ={
"threshold_scale_factor":0.1, # threshold above which event is registered
"start_at":"12:02am", # start time for motion detection
"end_at":"11:59pm", # end time for motion detection
"roi":{"x":10,"y":10,"x1":20,"y1":50}, # check motion only in roi
"timeCheck":False # if true checks motion during specific time limit only
}
detector=motionDetector(config)
cap=cv2.VideoCapture(0)
while True:
ret,img=cap.read()
if ret:
det= detector.detect(img)
Client for detection inferences
config={"endpoint":"http://localhost",
"port":9000,
"route":"detect"}
detector=detectorClient(config)
while True:
img=np.random.rand(300,300,3)
det= detector.detect(img)
print("dets",det)
Client for insight face embedding calculator
config={"endpoint":"http://localhost",
"apiPort":9000,
"mlPort":6500,
"api_version":"api/v1",
"service":"recognition",
"api_key":"00000000-0000-0000-0000-000000000002",
"maxFailedAttempts":5000,
"matchThreshold":0.6
}
recognizer=recognizerClient(config)
while True:
img=np.random.rand(300,300,3)
embedding= recognizer.get_encodings(img)
print("embedding",embedding)
Video manager for writing and reading saved videos
manager = VideoManager(filename='test.avi',
fourcc = 'XVID', #encoding for saved video
fps = 30, # fps of saved video
frameSize = (640,480), # resolution of saved video
base_dir = '/Users/saurabh_veda/workspace_kumar/test_videos', # directory where videos will be saved
max_sizeMB = 500, # max size after which video will be released
max_seconds= 60, # max duration of video after which video will be released
max_period = 60, # max time period after which video will be released
cleanupDays = 1 # no of days video will be retained
)
while True:
ret,img=manager.read()
if ret:
cv2.imshow("vid",img)
cv2.waitKey(1)