Find Dense Optical Flow (OpenCV)

Dense Optical Flow in OpenCV

Farneback optical flow algorithm

Based on Gunnar Farnebäck’s algorithm, which is explained in “Two-Frame Motion Estimation Based on Polynomial Expansion” by Gunnar Farnebäck (2003).

Requirements: Python3, OpenCV3, Numpy

参考: Dense Optical Flow in OpenCV

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
import cv2
import numpy as np
import glob


def get_frames_path(data_path, picture_format='JPEG'):
    """Return the sorted paths of the image frames stored in a directory.

    :param data_path: directory that contains the frame images.
    :param picture_format: file extension to match, with or without a
        leading dot. Defaults to 'JPEG', the extension that used to be
        hard-coded, so existing one-argument calls behave as before.
    :return: sorted list of matching paths (empty when nothing matches).
    """
    extension = picture_format.lstrip('.')
    return sorted(glob.glob(data_path + '/' + '*.' + extension))


def show_flow_hsv(flow, show_style=1):
    """Render a dense optical-flow field as a BGR image via HSV colour coding.

    The flow direction is mapped to hue. The flow magnitude, min-max
    normalised to 0..255, is mapped to either value or saturation.

    :param flow: array read as flow[..., 0] (x component) and
        flow[..., 1] (y component), e.g. the (H, W, 2) result of
        cv2.calcOpticalFlowFarneback.
    :param show_style: 1 -> saturation fixed at 255, magnitude drives value;
        2 -> value fixed at 255, magnitude drives saturation.
        Any other value leaves the HSV image all zeros (a black frame).
    :return: uint8 BGR image with the same height and width as ``flow``.
    """
    mag, ang = cv2.cartToPolar(flow[..., 0], flow[..., 1])

    hsv = np.zeros((flow.shape[0], flow.shape[1], 3), np.uint8)

    if show_style == 1:
        # Radians -> degrees, halved so the hue fits the 8-bit channel.
        hsv[..., 0] = ang * 180 / np.pi / 2
        hsv[..., 1] = 255
        hsv[..., 2] = cv2.normalize(mag, None, 0, 255, cv2.NORM_MINMAX)
    elif show_style == 2:
        # The original tested the undefined name `show_type` here, so
        # requesting style 2 raised NameError.
        hsv[..., 0] = ang * 180 / np.pi / 2
        hsv[..., 1] = cv2.normalize(mag, None, 0, 255, cv2.NORM_MINMAX)
        hsv[..., 2] = 255

    bgr = cv2.cvtColor(hsv, cv2.COLOR_HSV2BGR)

    return bgr


# Input source: 'frame' reads an image sequence from a directory,
# 'video' reads frames from a video file.
v_data_type = 'frame'

if v_data_type == 'frame':
    data_path = '/home/ubuntu/PycharmProjects/Flow_extract/data'
    frames_path = get_frames_path(data_path)
    frame_length = len(frames_path)
    if frame_length == 0:
        # Without this guard frames_path[0] fails with an opaque IndexError.
        raise SystemExit('No frames found in ' + data_path)
    ind = 0
    frame1 = cv2.imread(frames_path[ind])
elif v_data_type == 'video':
    cap = cv2.VideoCapture("vtest.avi")
    ret, frame1 = cap.read()
else:
    raise SystemExit('Unknown v_data_type: %r' % v_data_type)

# cv2.imread / cap.read hand back None when nothing could be decoded.
if frame1 is None:
    raise SystemExit('Could not read the first frame')

prvs = cv2.cvtColor(frame1, cv2.COLOR_BGR2GRAY)

while True:
    if v_data_type == 'frame':
        if ind == frame_length - 1:
            break
        ind += 1
        frame2 = cv2.imread(frames_path[ind])
    else:
        ret, frame2 = cap.read()

    # End of the video (or an unreadable image): stop cleanly instead of
    # crashing inside cvtColor.
    if frame2 is None:
        break

    # Named next_gray rather than `next` so the builtin is not shadowed.
    next_gray = cv2.cvtColor(frame2, cv2.COLOR_BGR2GRAY)

    # flow shape: (H, W, 2)
    flow = cv2.calcOpticalFlowFarneback(prvs, next_gray, None, 0.5, 3, 15, 3, 5, 1.2, 0)

    bgr = show_flow_hsv(flow, show_style=2)
    cv2.imshow('frame2', bgr)
    k = cv2.waitKey(30) & 0xff
    if k == 27:  # Esc quits
        break
    elif k == ord('s'):
        # Placeholder for saving the current frame / visualisation:
        # cv2.imwrite('opticalfb.png', frame2)
        # cv2.imwrite('opticalhsv.png', bgr)
        pass

    # The current frame becomes the reference for the next iteration.
    prvs = next_gray

if v_data_type == 'video':
    cap.release()

cv2.destroyAllWindows()

运行后显示结果如前面gif

用到的OpenCV函数:

光流可视化:

  1. 将2通道flow图转换为HSV图片

    HSV(Hue 色调, Saturation 饱和度, Value 亮度)空间:

    flow中的笛卡尔直角坐标转换为极坐标后可以获取极角(运动方向)和极径(运动大小),正好可以与HSV中的两个分量对应(极角对应H),但是还少了一个元素,这时候有两种补偿方式,一是把V全置255(最后转换的RGB图像较亮),一是把S全置255(最后转换的RGB图像较暗)。

  2. 通过cv2.cvtColor将HSV图片转换为便于显示的RGB图片

    颜色表示运动方向,运动速度由亮度或饱和度表示(取决于把V还是S置为255)

注意一下:

1
2
3
4
5
# Keep only the low byte of the key code: the OpenCV tutorials recommend
# this mask because the raw waitKey value is not guaranteed to be a plain
# ASCII code on every platform.
k = cv2.waitKey(30) & 0xff
if k == 27:  # 27 is the ASCII code of Esc
    pass
elif k == ord('s'):  # ord() returns the character's code (ASCII here)
    pass

处理光流函数

参考opencv/samples/python/opt_flow.py

warp_flow

1
2
3
4
5
6
7
def warp_flow(img, flow):
    """Resample ``img`` along a dense flow field with cv2.remap.

    For every pixel (x, y) the sampling position handed to cv2.remap is
    (x - flow_x, y - flow_y), interpolated bilinearly.

    :param img: image to warp.
    :param flow: per-pixel displacement field whose first two dimensions
        are (height, width) and whose last axis holds (x, y).
    :return: the remapped image.
    """
    height, width = flow.shape[:2]
    # Negation allocates a new array, so the caller's flow is not modified.
    sample_map = -flow
    sample_map[:, :, 0] += np.arange(width)
    sample_map[:, :, 1] += np.arange(height)[:, np.newaxis]
    return cv2.remap(img, sample_map, None, cv2.INTER_LINEAR)

cv2.remap, 参数flow是每一个像素点的x,y方向运动信息(映射关系)。将当前图片用光流信息进行remap,推测运动后图片。

draw_flow

1
2
3
4
5
6
7
8
9
10
11
def draw_flow(img, flow, step=16):
    """Draw the flow sampled on a regular grid as green segments over ``img``.

    :param img: image converted with COLOR_GRAY2BGR before drawing, so it
        is expected to be single-channel.
    :param flow: displacement field indexed as flow[y, x] -> (fx, fy).
    :param step: grid spacing in pixels; the first sample sits at step / 2.
    :return: BGR visualisation with one segment per grid point, running from
        the grid point to the grid point plus its flow, and a dot on the
        starting end.
    """
    height, width = img.shape[:2]
    grid = np.mgrid[step / 2:height:step, step / 2:width:step]
    ys, xs = grid.reshape(2, -1).astype(int)
    dx, dy = flow[ys, xs].T

    # One (start, end) pair per grid point, rounded to the nearest pixel.
    segments = np.vstack([xs, ys, xs + dx, ys + dy]).T.reshape(-1, 2, 2)
    segments = np.int32(segments + 0.5)

    canvas = cv2.cvtColor(img, cv2.COLOR_GRAY2BGR)
    green = (0, 255, 0)
    cv2.polylines(canvas, segments, 0, green)
    for start_x, start_y in segments[:, 0]:
        cv2.circle(canvas, (start_x, start_y), 1, green, -1)
    return canvas

效果图:

绿色线段从起点(圆点)指向运动方向,线段长度表示运动速度。

DIS optical flow algorithm

Till Kroeger, Radu Timofte, Dengxin Dai, and Luc Van Gool. Fast optical flow using dense inverse search. In Proceedings of the European Conference on Computer Vision (ECCV), 2016.

code (OpenCV4版本),该算法在OpenCV3版本的 contrib 中。

calc()

createOptFlow_DIS()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
import numpy as np
import cv2
import glob
import sys
import time


def get_frames_path(data_path, picture_format):
    """Return the sorted paths of all frames with a given extension.

    :param data_path: directory that contains the frame images.
    :param picture_format: file extension to match, e.g. 'jpg'. A leading
        dot ('.jpg') is accepted too; it used to build the pattern
        '*..jpg', which matched nothing.
    :return: sorted list of matching paths (empty when nothing matches).
    """
    extension = picture_format.lstrip('.')
    return sorted(glob.glob(data_path + '/' + '*.' + extension))

def show_flow_hsv(flow, show_style=1):
    """Colour-code a dense flow field and return it as a BGR image.

    Hue encodes the flow direction. The min-max normalised magnitude
    (0..255) goes to the value channel (style 1) or to the saturation
    channel (style 2), while the remaining channel is fixed at 255.

    :param flow: array read as flow[..., 0] (x) and flow[..., 1] (y).
    :param show_style: 1 or 2 as described above; any other value yields
        an all-zero HSV image, i.e. a black frame.
    :return: uint8 BGR image with the height and width of ``flow``.
    """
    magnitude, angle = cv2.cartToPolar(flow[..., 0], flow[..., 1])

    rows, cols = flow.shape[0], flow.shape[1]
    hsv = np.zeros((rows, cols, 3), np.uint8)

    if show_style in (1, 2):
        # Channel 1 is saturation, channel 2 is value.
        if show_style == 1:
            fixed_channel, magnitude_channel = 1, 2
        else:
            fixed_channel, magnitude_channel = 2, 1
        hsv[..., 0] = angle * 180 / np.pi / 2
        hsv[..., fixed_channel] = 255
        hsv[..., magnitude_channel] = cv2.normalize(magnitude, None, 0, 255, cv2.NORM_MINMAX)

    return cv2.cvtColor(hsv, cv2.COLOR_HSV2BGR)

if __name__ == '__main__':
    # 'frame': read an image sequence from a directory; 'video': read a video file.
    v_data_type = 'frame'

    if v_data_type == 'frame':
        data_path = '/home/ubuntu/VScode_projects/Python/pictures_for_test/test_op_flow/img1'
        frames_path = get_frames_path(data_path, 'jpg')
        frame_length = len(frames_path)
        if frame_length == 0:
            # Without this guard frames_path[0] fails with an opaque IndexError.
            raise SystemExit('No frames found in ' + data_path)
        ind = 0
        frame1 = cv2.imread(frames_path[ind])
    elif v_data_type == 'video':
        cap = cv2.VideoCapture("vtest.avi")
        ret, frame1 = cap.read()
    else:
        raise SystemExit('Unknown v_data_type: %r' % v_data_type)

    # cv2.imread / cap.read hand back None when nothing could be decoded.
    if frame1 is None:
        raise SystemExit('Could not read the first frame')

    # Convert to grayscale for both sources. The original did this only in
    # 'frame' mode, so 'video' mode hit a NameError on prvs_img_gray.
    prvs_img_gray = cv2.cvtColor(frame1, cv2.COLOR_BGR2GRAY)

    if hasattr(cv2, 'DISOpticalFlow_create'):
        # OpenCV 4 ships DIS in the main module.
        inst = cv2.DISOpticalFlow_create(cv2.DISOPTICAL_FLOW_PRESET_MEDIUM)
    else:
        # OpenCV 3 provides DIS through the opencv-contrib optflow module.
        inst = cv2.optflow.createOptFlow_DIS(cv2.optflow.DISOpticalFlow_PRESET_MEDIUM)
    inst.setUseSpatialPropagation(True)

    while True:
        if v_data_type == 'frame':
            if ind == frame_length - 1:
                break
            ind += 1
            frame2 = cv2.imread(frames_path[ind])
        else:
            ret, frame2 = cap.read()

        # End of the video (or an unreadable image): stop cleanly.
        if frame2 is None:
            break

        next_img_gray = cv2.cvtColor(frame2, cv2.COLOR_BGR2GRAY)

        flow = inst.calc(prvs_img_gray, next_img_gray, None)
        # The current frame becomes the reference for the next iteration.
        prvs_img_gray = next_img_gray

        cv2.imshow('flow HSV', show_flow_hsv(flow, show_style=1))

        k = cv2.waitKey(30) & 0xff
        if k == 27:  # Esc quits
            break

    if v_data_type == 'video':
        cap.release()

    cv2.destroyAllWindows()

根据:

Includes three presets with preselected parameters to provide reasonable trade-off between speed and quality. However, even the slowest preset is still relatively fast, use DeepFlow if you need better quality and don’t care about speed.

cv2.optflow.createOptFlow_DIS()中的参数是来协调速度和质量的,有三种选项:

  • cv2.optflow.DISOpticalFlow_PRESET_MEDIUM
  • cv2.optflow.DISOpticalFlow_PRESET_FAST
  • cv2.optflow.DISOpticalFlow_PRESET_ULTRAFAST

保存和读取光流文件

光流格式文件的后缀名为 .flo

保存

writeOpticalFlow()

1
cv2.optflow.writeOpticalFlow('×××.flo', flow)

读取

readOpticalFlow()

1
flow = cv2.optflow.readOpticalFlow(flow_path)

对ILSVRC VID数据集提取光流信息

将ILSVRC VID数据集中的图片转化为光流数据保存

程序:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
import cv2
import numpy as np
import glob
from tqdm import tqdm
import os

def get_frame_dirs(dataset_path, mode='train'):
    """Return the sorted per-video frame directories of a dataset split.

    For 'train' this is every entry two levels below ``dataset_path/train``.

    :param dataset_path: root directory of the dataset.
    :param mode: dataset split; only 'train' is implemented.
    :return: sorted list of paths.
    :raises NotImplementedError: for 'val' and 'test'. The original fell
        through to ``return dirs`` with ``dirs`` unbound for these modes.
    :raises ValueError: for any other mode.
    """
    if mode == 'train':
        return sorted(glob.glob(dataset_path + '/' + mode + '/*/*'))
    if mode in ('val', 'test'):
        raise NotImplementedError("mode %r is not implemented yet" % mode)
    raise ValueError("unknown mode %r; expected 'train', 'val' or 'test'" % mode)


def save_color_or_flow(frame_path, flow, is_save_color=False):
    """Save a flow field next to the dataset, as a colour image or a .flo file.

    The output path is derived from ``frame_path`` by replacing the first
    occurrence of 'VID' with 'VID_flow_color' (colour image, original file
    name kept) or with 'VID_flow' (raw flow, extension changed to '.flo').
    Missing output directories are created.

    :param frame_path: path of the frame the flow belongs to.
    :param flow: flow field to store.
    :param is_save_color: True saves the colour visualisation produced by
        flow_2_bgr; False (default) saves the raw flow.
    :raises IOError: if OpenCV reports that the file could not be written.
    """
    if is_save_color:
        path = frame_path.replace('VID', 'VID_flow_color', 1)
    else:
        # Swap only the extension. The original replaced every 'JPEG'
        # substring, which also rewrote directory names containing it.
        flow_base = os.path.splitext(frame_path.replace('VID', 'VID_flow', 1))[0]
        path = flow_base + '.flo'

    out_dir = os.path.dirname(path)
    if out_dir:
        # exist_ok avoids the check-then-create race of exists() + makedirs().
        os.makedirs(out_dir, exist_ok=True)

    if is_save_color:
        written = cv2.imwrite(path, flow_2_bgr(flow, style=1))
    else:
        written = cv2.optflow.writeOpticalFlow(path, flow)

    # Surface failed writes (e.g. a full disk) instead of silently losing
    # data. Only an explicit False counts as a failure.
    if written is False:
        raise IOError('Failed to write ' + path)


def flow_2_bgr(flow, style=1):
    """Convert a dense flow field into a BGR visualisation through HSV.

    Hue encodes the flow direction. The min-max normalised magnitude
    (0..255) goes to the value channel (style 1) or to the saturation
    channel (style 2), while the remaining channel is fixed at 255.

    :param flow: array read as flow[..., 0] (x) and flow[..., 1] (y).
    :param style: 1 or 2 as described above; any other value yields an
        all-zero HSV image, i.e. a black frame.
    :return: uint8 BGR image with the height and width of ``flow``.
    """
    magnitude, angle = cv2.cartToPolar(flow[..., 0], flow[..., 1])

    rows, cols = flow.shape[0], flow.shape[1]
    hsv = np.zeros((rows, cols, 3), np.uint8)

    if style in (1, 2):
        # Channel 1 is saturation, channel 2 is value.
        if style == 1:
            fixed_channel, magnitude_channel = 1, 2
        else:
            fixed_channel, magnitude_channel = 2, 1
        hsv[..., 0] = angle * 180 / np.pi / 2
        hsv[..., fixed_channel] = 255
        hsv[..., magnitude_channel] = cv2.normalize(magnitude, None, 0, 255, cv2.NORM_MINMAX)

    return cv2.cvtColor(hsv, cv2.COLOR_HSV2BGR)


def compute_flow(frames_path, is_save=False, is_display=False):
    """Compute Farneback optical flow between consecutive frames of a directory.

    Frames are the ``*.JPEG`` files in ``frames_path``, in sorted order.
    Each flow is associated with the later frame of its pair.

    :param frames_path: directory that holds the frames of one sequence.
    :param is_save: when True, store the colour visualisation of each flow
        via save_color_or_flow(..., is_save_color=True).
    :param is_display: when True, show each flow in a window; pressing Esc
        stops processing this directory.
    :return: None. Directories with no frames produce no output, and frames
        that cannot be decoded are skipped.
    """
    frames = sorted(glob.glob(frames_path + '/' + '*.JPEG'))

    # None until the first readable frame has been seen. This also covers
    # an empty directory, where the original raised IndexError on frames[0].
    prvs_frame = None

    for frame in frames:
        image = cv2.imread(frame)
        if image is None:
            # cv2.imread returns None for unreadable files; skip them
            # instead of crashing inside cvtColor.
            continue
        next_frame = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)

        if prvs_frame is not None:
            flow = cv2.calcOpticalFlowFarneback(prvs_frame, next_frame, None, 0.5, 3, 15, 3, 5, 1.2, 0)

            if is_save:
                save_color_or_flow(frame, flow, is_save_color=True)
            if is_display:
                bgr = flow_2_bgr(flow, style=1)
                cv2.imshow('frame', bgr)

                # Keys are polled only while a window is shown.
                k = cv2.waitKey(30) & 0xff
                if k == 27:  # Esc
                    break

        prvs_frame = next_frame

def terminate_continue(dirs, break_dir):
    """Resume an interrupted run from a given directory.

    Prints the percentage of ``dirs`` that precedes ``break_dir``.

    :param dirs: list of directories; must be sorted.
    :param break_dir: directory to restart from; it is processed again.
    :return: slice of ``dirs`` from ``break_dir`` to the end.
    :raises ValueError: if ``break_dir`` is not an element of ``dirs``.
    """
    try:
        continue_index = dirs.index(break_dir)
    except ValueError:
        # Same exception type as list.index, with a message naming the culprit.
        raise ValueError('break_dir %r is not in dirs' % (break_dir,))

    progress = ('%0.2f' % (continue_index / len(dirs) * 100)) + '%'

    print('Continue: %s has been completed' % progress)
    print('Remains:')

    return dirs[continue_index:]


if __name__ == '__main__':
    # Root of the VID image data; frame directories are looked up below it.
    vid_dataset_path = '/home/ubuntu/Downloads/dataset/ILSVRC/Data/VID'
    dirs = get_frame_dirs(vid_dataset_path, mode='train')

    # To resume an interrupted run, uncomment the two lines below and set
    # break_dir to the directory that was being processed when it stopped.
    # break_dir = '/home/ubuntu/Downloads/dataset/ILSVRC/Data/VID/train/ILSVRC2015_VID_train_0000/ILSVRC2015_train_00047024'
    # dirs = terminate_continue(dirs, break_dir)

    # tqdm wraps the list to show a progress bar, one tick per directory.
    progress = tqdm(dirs)
    for every_dir in progress:
        compute_flow(every_dir, is_save=True, is_display=False)

输出进度,一小时25分,才进行了3%.

1
3%|▎         | 98/3862 [1:25:37<22:12:17, 21.24s/it]

提升速度,可以:

  1. 改为多进程(应该是CPU密集型任务)。
  2. 交叉编译为C++。

运行到15%时,提示硬盘空间不足,这才发现,flo格式的文件很大(大小约6M~8M,原图是1920x1080).

针对一个640x358图片(大小约25k)进行分析:

查看程序中生成的flow (shape : (358, 640, 2)) 占用内存大小

1
2
3
4
import sys

...
print(sys.getsizeof(flow))

1833088 Bytes ≈ 1.748 MB,生成的flo文件也是不到1.8 MB,看样子OpenCV并没有对数据进行压缩。$1833088 \div (358 \times 640 \times 2) \approx 4$ Bytes,即flow中每个元素约占4字节,数据类型是float32(numpy)。

遇到的问题

install tqdm

为了增加进度条,使用tqdm:

1
conda install tqdm

结果因为依赖问题使opencv不能用了(opencv也是用conda安装的),只好用pip重新安装opencv,感觉安装tqdm的时候应该用pip方式安装的。

重新安装opencv后还缺少cv2.optflow.writeOpticalFlow,需要再安装opencv-contrib:

1
pip install opencv-contrib-python

----------over----------


文章标题:Find Dense Optical Flow (OpenCV)

文章作者:Ge垚

发布时间:2018年10月12日 - 09:10

最后更新:2019年04月07日 - 22:04

原始链接:http://geyao1995.com/extract_optical_flow/

许可协议: 署名-非商业性使用-禁止演绎 4.0 国际 转载请保留原文链接及作者。