|
|
import os |
|
|
import numpy as np |
|
|
from PIL import Image |
|
|
from os.path import * |
|
|
import re |
|
|
import json |
|
|
import imageio |
|
|
import cv2 |
|
|
# OpenCV runtime configuration, applied once when this module is imported:
# set OpenCV's thread count to 0 and switch off its OpenCL usage.
# NOTE(review): presumably so OpenCV does not compete with data-loader
# worker processes -- confirm against the training setup.
cv2.setNumThreads(0)
cv2.ocl.setUseOpenCL(False)
|
|
|
|
|
# Magic tag placed at the start of a .flo file: writeFlow writes it and
# readFlow compares the first float32 of the file against the same value.
TAG_CHAR = np.array([202021.25], dtype=np.float32)
|
|
|
|
|
def readFlow(fn):
    """Read a .flo file in Middlebury format.

    The file is read as: one float32 magic tag (202021.25), an int32 width,
    an int32 height, then ``2 * width * height`` float32 values.

    Args:
        fn: Path of the .flo file.

    Returns:
        A float32 array of shape ``(height, width, 2)``, or ``None`` (after
        printing a message) when the file is empty, its magic tag is wrong,
        or the width/height header is incomplete.
    """
    with open(fn, 'rb') as f:
        magic = np.fromfile(f, np.float32, count=1)
        # An empty file yields a zero-length array; check the size first so
        # the comparison below is always made on a scalar.
        if magic.size != 1 or magic[0] != 202021.25:
            print('Magic number incorrect. Invalid .flo file')
            return None

        dims = np.fromfile(f, np.int32, count=2)
        if dims.size != 2:
            print('Incomplete header. Invalid .flo file')
            return None
        # Index the elements instead of calling int() on 1-element arrays.
        w, h = int(dims[0]), int(dims[1])

        data = np.fromfile(f, np.float32, count=2 * w * h)

    # np.resize is kept from the original implementation, so the result
    # always has the shape announced by the header.
    return np.resize(data, (h, w, 2))
|
|
|
|
|
def readPFM(file):
    """Read a PFM image from disk.

    The header is three text lines: ``PF`` (3 channels) or ``Pf`` (1 channel),
    ``"<width> <height>"``, and a scale whose sign selects the byte order
    (negative -> little-endian, otherwise big-endian). The rest of the file
    is read as 32-bit floats.

    Args:
        file: Path of the .pfm file.

    Returns:
        A float array of shape ``(height, width, 3)`` for ``PF`` files or
        ``(height, width)`` for ``Pf`` files, flipped vertically with
        ``np.flipud`` relative to the order stored in the file.

    Raises:
        Exception: If the first line is neither ``PF`` nor ``Pf``, or the
            dimension line does not match ``"<int> <int>"``.
    """
    # The context manager closes the handle on every path, including the
    # header-validation errors below.
    with open(file, 'rb') as f:
        header = f.readline().rstrip()
        if header == b'PF':
            color = True
        elif header == b'Pf':
            color = False
        else:
            raise Exception('Not a PFM file.')

        dim_match = re.match(rb'^(\d+)\s(\d+)\s$', f.readline())
        if not dim_match:
            raise Exception('Malformed PFM header.')
        width, height = map(int, dim_match.groups())

        # Only the sign of the scale is used: it encodes the byte order.
        scale = float(f.readline().rstrip())
        endian = '<' if scale < 0 else '>'

        data = np.fromfile(f, endian + 'f')

    shape = (height, width, 3) if color else (height, width)
    return np.flipud(np.reshape(data, shape))
|
|
|
|
|
def writePFM(file, array):
    """Write a 2-D array to a single-channel PFM file.

    The file gets the header ``Pf``, ``"<W> <H>"``, ``-1`` followed by the
    array flipped along axis 0 and stored as little-endian float32.

    Args:
        file: Destination path (a ``str`` ending in ``.pfm``).
        array: 2-D ``np.ndarray`` of shape ``(H, W)``.

    Raises:
        AssertionError: If ``file`` is not a ``str`` ending in ``.pfm`` or
            ``array`` is not an ``np.ndarray``.
        ValueError: If ``array`` is not 2-D (raised before the file is
            created, so no empty file is left behind).
    """
    assert isinstance(file, str) and isinstance(array, np.ndarray) and \
        os.path.splitext(file)[1] == ".pfm"

    # Unpack the shape before touching the filesystem.
    H, W = array.shape
    header = f"Pf\n{W} {H}\n-1\n".encode()

    # The "-1" scale line announces little-endian data, so force '<f4'
    # rather than relying on the machine's native byte order.
    payload = np.flip(array, axis=0).astype('<f4').tobytes()

    with open(file, 'wb') as f:
        f.write(header)
        f.write(payload)
|
|
|
|
|
|
|
|
|
|
|
def writeFlow(filename, uv, v=None):
    """Write optical flow to a .flo file.

    If v is None, uv is assumed to contain both u and v channels,
    stacked in depth. Otherwise uv is taken as the u channel.
    Original code by Deqing Sun, adapted from Daniel Scharstein.

    The file receives TAG_CHAR, the width and height as int32, and then the
    float32 values interleaved per row as u0, v0, u1, v1, ...

    Args:
        filename: Destination path.
        uv: ``(H, W, 2)`` array when ``v`` is None, else the ``(H, W)`` u channel.
        v: Optional ``(H, W)`` v channel.

    Raises:
        AssertionError: If the channel shapes do not match the above.
    """
    if v is None:
        assert uv.ndim == 3
        assert uv.shape[2] == 2
        u = uv[:, :, 0]
        v = uv[:, :, 1]
    else:
        u = uv

    assert u.shape == v.shape
    height, width = u.shape

    # Stacking on a new last axis yields (H, W, 2); written in C order this
    # is exactly the u/v interleaving the format expects.
    interleaved = np.stack([u, v], axis=-1).astype(np.float32)

    # The context manager closes the file even if a write fails.
    with open(filename, 'wb') as f:
        f.write(TAG_CHAR)
        np.array(width).astype(np.int32).tofile(f)
        np.array(height).astype(np.int32).tofile(f)
        interleaved.tofile(f)
|
|
|
|
|
|
|
|
def readFlowKITTI(filename):
    """Read a flow image and decode it into flow vectors plus a validity channel.

    The image is loaded with ``IMREAD_ANYDEPTH | IMREAD_COLOR``, its channel
    axis is reversed, and it is cast to float32. The first two (reversed)
    channels are decoded as ``(value - 2**15) / 64.0``; the third is returned
    unchanged as the validity channel.

    Returns:
        ``(flow, valid)`` with ``flow`` of shape ``(H, W, 2)`` and ``valid``
        of shape ``(H, W)``.
    """
    raw = cv2.imread(filename, cv2.IMREAD_ANYDEPTH | cv2.IMREAD_COLOR)
    channels = raw[:, :, ::-1].astype(np.float32)
    valid = channels[:, :, 2]
    flow = (channels[:, :, :2] - 2**15) / 64.0
    return flow, valid
|
|
|
|
|
def readDispKITTI(filename):
    """Read a disparity image stored as integer values scaled by 256.

    Returns:
        ``(disp, valid)`` where ``disp`` is the loaded image divided by 256.0
        and ``valid`` is the boolean mask ``disp > 0.0``.
    """
    raw = cv2.imread(filename, cv2.IMREAD_ANYDEPTH)
    disp = raw / 256.0
    return disp, disp > 0.0
|
|
|
|
|
def writeDispKITTI(filename, disp):
    """Write ``disp`` as an image of ``round(disp * 256)`` cast to uint16."""
    scaled = np.round(disp * 256)
    encoded = scaled.astype(np.uint16)
    cv2.imwrite(filename, encoded)
|
|
|
|
|
def readDispCRES(filename):
    """Read a disparity image stored as integer values scaled by 32.

    Returns:
        ``(disp, valid)`` where ``disp`` is the image as float32 divided by
        32.0 and ``valid`` is the boolean mask ``disp > 0.0``.

    Raises:
        Exception: Any failure while loading or decoding is re-raised as an
            ``Exception`` carrying the original error, the offending filename
            and the current working directory.
    """
    try:
        raw = cv2.imread(filename, cv2.IMREAD_ANYDEPTH)
        disp = raw.astype(np.float32) / 32.0
        return disp, disp > 0.0
    except Exception as err:
        raise Exception(err, "Something wrong with {}".format(filename), os.getcwd())
|
|
|
|
|
def writeDispCRES(filename, disp):
    """Write ``disp`` as an image of ``round(disp * 32)`` cast to uint16."""
    scaled = np.round(disp * 32)
    encoded = scaled.astype(np.uint16)
    cv2.imwrite(filename, encoded)
|
|
|
|
|
def readDispNerfS(filename):
    """Read a disparity image (scaled by 64) and its companion ``AO`` image.

    The companion path is built from the shortest prefix of ``filename``
    ending in ``/Q/``, followed by ``AO/`` and the file's base name.

    Returns:
        ``(disp, valid)`` where ``disp`` is the image as float32 divided by
        64.0 and ``valid`` is the companion image as float32 divided by 65535.

    Raises:
        Exception: If ``filename`` contains no ``/Q/`` component.
    """
    raw = cv2.imread(filename, cv2.IMREAD_ANYDEPTH)
    disp = raw.astype(np.float32) / 64.0

    match = re.search(r"(.*?/Q/)", filename)
    if not match:
        raise Exception("corrupted path for NerfStereo: {}".format(filename))

    prefix = match.group(1)
    suffix = os.path.basename(filename)
    ao_path = f"{prefix}AO/{suffix}"

    ao = cv2.imread(ao_path, cv2.IMREAD_ANYDEPTH)
    valid = ao.astype(np.float32) / 65535
    return disp, valid
|
|
|
|
|
def writeDispNerfS(filename, disp):
    """Write ``disp`` as an image of ``round(disp * 64)`` cast to uint16."""
    scaled = np.round(disp * 64)
    encoded = scaled.astype(np.uint16)
    cv2.imwrite(filename, encoded)
|
|
|
|
|
def readDispBooster(file_name):
    """Load a disparity array with ``np.load`` plus the sibling ``mask_cat.png``.

    Returns:
        ``(disp, valid)`` where ``disp`` is whatever ``np.load`` returns for
        ``file_name`` and ``valid`` is ``mask_cat.png`` from the same
        directory, cast to float32.
    """
    # NOTE(review): allow_pickle=True lets np.load unpickle objects stored in
    # the file -- only use this on trusted data.
    disp = np.load(file_name, encoding='bytes', allow_pickle=True)

    folder = os.path.split(file_name)[0]
    mask_cat = cv2.imread(os.path.join(folder, 'mask_cat.png'), cv2.IMREAD_ANYDEPTH)
    valid = mask_cat.astype(np.float32)
    return disp, valid
|
|
|
|
|
def writeDispBooster(filename, disp):
    """Store ``disp`` at ``filename`` using ``np.save``."""
    np.save(file=filename, arr=disp)
|
|
|
|
|
|
|
|
def readDispSintelStereo(file_name):
    """Read a disparity map packed into the three channels of an image.

    The disparity is decoded as ``R * 4 + G / 2**6 + B / 2**14``. The
    occlusion mask is loaded from the same path with ``'disparities'``
    replaced by ``'occlusions'``.

    Args:
        file_name: Path of the encoded disparity image.

    Returns:
        ``(disp, valid)`` where ``disp`` is a float64 ``(H, W)`` array and
        ``valid`` is True where the occlusion mask is 0 and ``disp > 0``.
    """
    # Cast to float before decoding: on the integer pixel array, ``R * 4``
    # would be computed in the image's own dtype (uint8 for 8-bit images)
    # and wrap around modulo 256, corrupting every disparity of 64 or more.
    rgb = np.array(Image.open(file_name)).astype(np.float64)
    d_r, d_g, d_b = np.split(rgb, axis=2, indices_or_sections=3)
    disp = (d_r * 4 + d_g / (2**6) + d_b / (2**14))[..., 0]

    mask = np.array(Image.open(file_name.replace('disparities', 'occlusions')))
    valid = ((mask == 0) & (disp > 0))
    return disp, valid
|
|
|
|
|
|
|
|
def readDispFallingThings(file_name):
    """Convert an image file into disparity using the camera's ``fx``.

    ``fx`` is read from ``_camera_settings.json`` located next to
    ``file_name`` (path built by splitting on ``'/'``), under
    ``camera_settings[0].intrinsic_settings.fx``.

    Returns:
        ``(disp, valid)`` where ``disp = (fx * 6.0 * 100) / image`` (image
        cast to float32) and ``valid`` is ``disp > 0``.
    """
    # NOTE(review): 6.0 and 100 look like a baseline and a unit conversion
    # -- confirm against the dataset documentation.
    raw = np.array(Image.open(file_name))

    parent_parts = file_name.split('/')[:-1]
    settings_path = '/'.join(parent_parts + ['_camera_settings.json'])
    with open(settings_path, 'r') as f:
        intrinsics = json.load(f)

    fx = intrinsics['camera_settings'][0]['intrinsic_settings']['fx']
    disp = (fx * 6.0 * 100) / raw.astype(np.float32)
    return disp, disp > 0
|
|
|
|
|
|
|
|
def readDispTartanAir(file_name):
    """Load an array with ``np.load`` and convert it to disparity.

    Returns:
        ``(disp, valid)`` where ``disp = 80.0 / loaded_array`` and ``valid``
        is the boolean mask ``disp > 0``.
    """
    disp = 80.0 / np.load(file_name)
    return disp, disp > 0
|
|
|
|
|
|
|
|
def readDispMiddlebury(file_name):
    """Read a PFM disparity and a validity mask chosen by the file's base name.

    * ``disp0GT.pfm``: the mask is ``mask0nocc.png`` from the same location,
      True where that image equals 255.
    * ``disp0.pfm``: the mask is ``disp < 1e3``.

    Returns:
        ``(disp, valid)`` with ``disp`` as float32, or ``None`` when the base
        name is neither of the two above.

    Raises:
        AssertionError: For ``disp0GT.pfm``, if the disparity is not 2-D, the
            mask file does not exist, or the mask has no pixel equal to 255.
    """
    name = basename(file_name)

    if name == 'disp0GT.pfm':
        disp = readPFM(file_name).astype(np.float32)
        assert len(disp.shape) == 2
        mask_path = file_name.replace('disp0GT.pfm', 'mask0nocc.png')
        assert exists(mask_path)
        nocc_pix = imageio.imread(mask_path) == 255
        assert np.any(nocc_pix)
        return disp, nocc_pix

    if name == 'disp0.pfm':
        disp = readPFM(file_name).astype(np.float32)
        return disp, disp < 1e3

    # Any other file name is not handled.
    return None
|
|
|
|
|
def writeDispMiddlebury(file_name, disp):
    """Write ``disp`` to ``file_name`` by delegating to ``writePFM``."""
    writePFM(file=file_name, array=disp)
|
|
|
|
|
def writeFlowKITTI(filename, uv):
    """Encode a flow field as a 3-channel uint16 image and write it.

    The flow is encoded as ``64.0 * uv + 2**15``, a channel of ones is
    appended, the result is cast to uint16, and the channel axis is reversed
    before it is passed to ``cv2.imwrite``.
    """
    encoded = 64.0 * uv + 2**15
    rows, cols = uv.shape[0], uv.shape[1]
    ones = np.ones([rows, cols, 1])
    packed = np.concatenate([encoded, ones], axis=-1).astype(np.uint16)
    cv2.imwrite(filename, packed[..., ::-1])
|
|
|
|
|
|
|
|
def read_gen(file_name, pil=False):
    """Load a file with the reader that matches its extension.

    * ``.png`` / ``.jpeg`` / ``.ppm`` / ``.jpg``: returns ``Image.open(file_name)``.
    * ``.bin`` / ``.raw``: returns ``np.load(file_name)``.
    * ``.flo``: returns ``readFlow`` output as float32.
    * ``.pfm``: returns ``readPFM`` output as float32; for arrays that are
      not 2-D the last channel is dropped.
    * anything else: returns an empty list.

    ``pil`` is accepted but not used.
    """
    ext = splitext(file_name)[-1]

    if ext in ('.png', '.jpeg', '.ppm', '.jpg'):
        return Image.open(file_name)
    if ext in ('.bin', '.raw'):
        return np.load(file_name)
    if ext == '.flo':
        return readFlow(file_name).astype(np.float32)
    if ext == '.pfm':
        flow = readPFM(file_name).astype(np.float32)
        return flow if len(flow.shape) == 2 else flow[:, :, :-1]
    return []
|
|
|
|
|
def write_gen(file_name, disp, pil=False):
    """Write ``disp`` with the writer that matches the file extension.

    * ``.bin`` / ``.raw``: saved in NumPy's ``np.save`` format at exactly
      ``file_name`` (loadable with ``np.load(file_name)``).
    * ``.flo``: delegated to ``writeFlow``.
    * ``.pfm``: delegated to ``writePFM``.

    Args:
        file_name: Destination path; its extension selects the writer.
        disp: Array to write.
        pil: Accepted but not used.

    Raises:
        Exception: For any other extension (including image extensions such
            as ``.png``), with the message ``"no support for <ext> file"``.
    """
    ext = splitext(file_name)[-1]

    if ext in ('.bin', '.raw'):
        # np.save takes (file, arr); the original call had them swapped.
        # Passing an open handle also stops NumPy from appending ".npy" to
        # the name, so the data lands at exactly file_name.
        with open(file_name, 'wb') as f:
            np.save(f, disp)
    elif ext == '.flo':
        writeFlow(file_name, disp)
    elif ext == '.pfm':
        writePFM(file_name, disp)
    else:
        raise Exception("no support for {} file".format(ext))