208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
class Features:
    """
    Collection of Feature objects detected in an image, indexed by track_id.

    Attributes:
        _values (dict): Maps each track_id to its Feature object (insertion ordered).
        _last_id (int): Last assigned track_id (-1 when no feature was ever added).
        _iter (int): Positional cursor used when calling next() directly on the object.
        _descriptor_size (int): Size of the descriptors stored in the collection.

    Note:
        Use the methods of this class to access the stored features rather
        than touching the private attributes directly.
    """

    def __init__(self):
        """
        __init__ Initialize an empty Features object
        """
        self._values = {}
        self._last_id = -1
        self._iter = 0
        self._descriptor_size = 256

    def __len__(self) -> int:
        """
        __len__ Get number of features stored in Features object

        Returns:
            int: number of features
        """
        return len(self._values)

    def __getitem__(self, track_id: np.int32) -> Union["Feature", None]:
        """
        __getitem__ Get a Feature object by its track_id (e.g., features[track_id])

        Args:
            track_id (int): track_id of the feature to extract

        Returns:
            Feature: requested Feature object, or None (after logging a warning) if the track_id is not present.
        """
        # Direct dict membership is O(1); no need to materialize the key list.
        if track_id in self._values:
            return self._values[track_id]
        logger.warning(f"Feature with track id {track_id} not available.")
        return None

    def __contains__(self, track_id: np.int32) -> bool:
        """
        __contains__ Check if a feature with given track_id is present in Features object.

        Args:
            track_id (np.int32): track_id of the feature to check

        Returns:
            bool: True if the feature is present.
        """
        return track_id in self._values

    def __delitem__(self, track_id: np.int32) -> bool:
        """
        __delitem__ Delete the feature with the given track_id, if present (log a warning otherwise).

        Args:
            track_id (np.int32): track_id of the feature to delete

        Returns:
            bool: True if the item was present and deleted, False otherwise. Note that the `del obj[key]` statement discards this value; it is only visible when the method is called explicitly.
        """
        if track_id not in self:
            logger.warning(f"Feature with track_id {track_id} not present")
            return False
        del self._values[track_id]
        return True

    def __iter__(self):
        """
        __iter__ Iterate over the stored Feature objects in insertion order.

        Returns:
            iterator: independent iterator over a snapshot of the stored features, so it works for any set of track_ids (not only 0..n-1) and tolerates deletions while looping.
        """
        self._iter = 0
        return iter(tuple(self._values.values()))

    def __next__(self):
        """
        __next__ Return the next feature according to the internal positional cursor.

        Raises:
            StopIteration: when the cursor reaches the end (the cursor is then reset to 0).
        """
        if self._iter < len(self):
            # Positional access: track_ids are not guaranteed to be 0..n-1,
            # so the cursor must not be used as a dictionary key.
            feature = tuple(self._values.values())[self._iter]
            self._iter += 1
            return feature
        self._iter = 0
        raise StopIteration

    def __repr__(self) -> str:
        return f"Features with {len(self)} features"

    @property
    def num_features(self):
        """
        num_features Number of features stored in Features object
        """
        return len(self._values)

    @property
    def last_track_id(self):
        """
        last_track_id Track_id of the last feature stored in Features object
        """
        return self._last_id

    def get_track_ids(self) -> Tuple[np.int32, ...]:
        """
        get_track_ids Get a tuple with the track_id of all the features (insertion order)

        Returns:
            tuple: tuple of size (n,) with track_ids
        """
        return tuple(np.int32(track_id) for track_id in self._values)

    def append_feature(self, new_feature: "Feature") -> None:
        """
        append_feature Append a single Feature object to Features.

        The feature is stored under the next progressive track_id.

        Args:
            new_feature (Feature): Feature object to be appended. It must contain at least the x and y coordinates of the keypoint.
        """
        assert isinstance(
            new_feature, Feature
        ), "Invalid input feature. It must be Feature object"

        # Validate the descriptor BEFORE inserting, so that the first feature
        # defines the descriptor size and a rejected feature is never stored.
        if new_feature.descr is not None:
            descr_size = new_feature.descr.shape[0]
            if len(self) > 0:
                assert (
                    self._descriptor_size == descr_size
                ), "Descriptor size of the new feature does not match with that of the existing feature"
            else:
                self._descriptor_size = descr_size

        self._last_id += 1
        self._values[self._last_id] = new_feature

    def set_last_track_id(self, last_track_id: np.int32) -> None:
        """
        set_last_track_id Set track_id of last feature to a custom value

        Args:
            last_track_id (np.int32): track_id to set.

        Raises:
            ValueError: if the input cannot be converted to np.int32.
        """
        try:
            last_id = np.int32(last_track_id)
        except Exception as err:
            raise ValueError(
                "Invalid input argument last_track_id. It must be an integer number."
            ) from err
        self._last_id = last_id

    def append_features_from_numpy(
        self,
        x: np.ndarray,
        y: np.ndarray,
        descr: np.ndarray = None,
        scores: np.ndarray = None,
        track_ids: List[np.int32] = None,
        epoch: np.int32 = None,
    ) -> None:
        """
        append_features_from_numpy Append new features to Features object, starting from numpy arrays of x and y coordinates, descriptors and scores.

        Args:
            x (np.ndarray): nx1 numpy array containing x coordinates of all keypoints
            y (np.ndarray): nx1 numpy array containing y coordinates of all keypoints
            descr (np.ndarray, optional): mxn numpy array containing the descriptors of all the features (where m is the dimension of the descriptor that can be either 128 or 256). Defaults to None.
            scores (np.ndarray, optional): nx1 numpy array containing scores of all keypoints. Defaults to None.
            track_ids (List[int]): Sorted list containing the track_id of each point to be added to Features object. If any of them is already present, an error is logged and progressive track_ids are assigned instead. Default to None.
            epoch (np.int32, optional): Epoch in which the incoming features are detected (or belongs to). Defaults to None.

        Raises:
            ValueError: if epoch cannot be converted to np.int32.
        """
        assert isinstance(x, np.ndarray), "invalid type of x vector"
        assert isinstance(y, np.ndarray), "invalid type of y vector"

        # Test for emptiness on the size: np.any() would also be False for
        # valid keypoints whose x coordinates are all zero.
        if x.size == 0:
            logger.warning("Empty input feature arrays. Nothing done.")
            return None

        xx = float32_type_check(x, cast_integers=True).flatten()
        yy = float32_type_check(y, cast_integers=True).flatten()
        n_new = len(xx)

        if descr is not None:
            assert descr.shape[0] in [
                128,
                256,
            ], "invalid shape of the descriptor array. It must be of size mxn (m: descriptor size [128, 256], n: number of features"
            if len(self) > 0:
                assert (
                    self._descriptor_size == descr.shape[0]
                ), "Descriptor size of the new feature does not match with that of the existing feature"
            else:
                self._descriptor_size = descr.shape[0]
            # Transpose so that iterating yields one descriptor per feature.
            descr = float32_type_check(descr.T)
        else:
            descr = [None] * n_new

        progressive_ids = range(self._last_id + 1, self._last_id + n_new + 1)
        if track_ids is None:
            ids = progressive_ids
        else:
            assert isinstance(
                track_ids, list
            ), "Invalid track_ids input. It must be a list of integers of the same size of the input arrays."
            assert (
                len(track_ids) == n_new
            ), "invalid size of track_id input. It must be a list of the same size of the input arrays."
            ids = track_ids
            for candidate in track_ids:
                if candidate in self._values:
                    # Never overwrite an existing feature: fall back to
                    # progressive ids for the whole batch.
                    logger.error(
                        f"Feature with track_id {candidate} is already present in Features object. Ignoring input track_id and assigning progressive track_ids."
                    )
                    ids = progressive_ids
                    break

        if scores is not None:
            scores = float32_type_check(scores)
        else:
            scores = [None] * n_new

        if epoch is not None:
            msg = "Invalid input argument epoch. It must be an integer number."
            try:
                epoch = np.int32(epoch)
            except Exception as err:
                raise ValueError(msg) from err
            assert isinstance(epoch, np.int32), msg
            self.epoch = epoch

        for t_id, x_i, y_i, d, s in zip(ids, xx, yy, descr, scores):
            if s is None:
                score_val = None
            else:
                # A row of an nx1 score array has shape (1,): unwrap it.
                score_val = s[0] if s.shape == (1,) else s
            self._values[t_id] = Feature(
                x_i,
                y_i,
                track_id=t_id,
                descr=d,
                score=score_val,
                epoch=epoch,
            )
            # NOTE: the last id is the id of the last inserted feature, which
            # is why track_ids is expected to be sorted.
            self._last_id = t_id

    def to_numpy(
        self,
        get_descr: bool = False,
        get_score: bool = False,
    ) -> dict:
        """
        to_numpy Get all keypoints as a nx2 numpy array of float32 coordinates.
        If 'get_descr' and/or 'get_score' are set to True, descriptors and/or scores are returned as well (default: return only keypoints).

        Args:
            get_descr (bool, optional): get descriptors as mxn array. Defaults to False.
            get_score (bool, optional): get scores as array of n elements. Defaults to False.

        Returns:
            dict: dictionary containing the following keys (depending on the input arguments): ["kpts", "descr", "scores"]
        """
        out = {"kpts": self.kpts_to_numpy()}
        if get_descr:
            out["descr"] = self.descr_to_numpy()
        # Scores are returned whenever requested, also without descriptors.
        if get_score:
            out["scores"] = self.scores_to_numpy()
        return out

    def kpts_to_numpy(self) -> np.ndarray:
        """
        kpts_to_numpy Get all keypoints coordinates stacked as a nx2 numpy array.

        Returns:
            np.ndarray: nx2 float32 numpy array containing xy coordinates of all keypoints
        """
        kpts = np.empty((len(self), 2))
        for row, feature in enumerate(self._values.values()):
            kpts[row, :] = feature.xy
        return np.float32(kpts)

    def descr_to_numpy(self) -> np.ndarray:
        """
        descr_to_numpy Get all descriptors stacked as a mxn (m is the descriptor size [128 or 256]) numpy array.

        Returns:
            np.ndarray: mxn numpy array containing the descriptors of all the features (where m is the dimension of the descriptor that can be either 128 or 256)

        Raises:
            AssertionError: if the collection is empty or any feature has no descriptor.
        """
        # Every feature needs a descriptor, otherwise stacking below fails.
        assert len(self) > 0 and all(
            feature.descr is not None for feature in self._values.values()
        ), "Descriptors non availble"
        descr = np.empty((self._descriptor_size, len(self)), dtype=np.float32)
        for col, feature in enumerate(self._values.values()):
            descr[:, col : col + 1] = feature.descr.reshape(-1, 1)
        return np.float32(descr)

    def scores_to_numpy(self) -> np.ndarray:
        """
        scores_to_numpy Get all scores stacked as a numpy array of n elements.

        Returns:
            np.ndarray: float32 array with one score per feature

        Raises:
            AssertionError: if the collection is empty or any feature has no score.
        """
        # Every feature needs a score, otherwise the assignment below fails.
        assert len(self) > 0 and all(
            feature.score is not None for feature in self._values.values()
        ), "Scores non availble"
        score = np.empty(len(self), dtype=np.float32)
        for pos, feature in enumerate(self._values.values()):
            score[pos] = feature.score
        return np.float32(score)

    def get_features_as_dict(self, get_track_id: bool = False) -> dict:
        """
        get_features_as_dict Get a dictionary with keypoints, descriptors and scores, organized for SuperGlue

        Args:
            get_track_id (bool, optional): get a tuple with the track_id of all the features as an additional dictionary key ["track_id"]. Defaults to False.

        Returns:
            dict: dictionary containing the following keys (depending on the input arguments): ["keypoints0", "descriptors0", "scores0", "track_id"]
        """
        features_dict = {
            "keypoints0": self.kpts_to_numpy(),
            "descriptors0": self.descr_to_numpy(),
            "scores0": self.scores_to_numpy(),
        }
        if get_track_id:
            features_dict["track_id"] = self.get_track_ids()
        return features_dict

    def reset_fetures(self):
        """Reset Features instance (stored features, last track_id and cursor)"""
        self._values = {}
        self._last_id = -1
        self._iter = 0

    def filter_feature_by_mask(self, inlier_mask: List[bool]) -> None:
        """
        filter_feature_by_mask Keep only inlier features, given a mask array as a list of boolean values. Note that this function does NOT take into account the track_id of the features: the mask is applied by position. Inlier mask must have the same length as the number of features stored in the Features instance.

        Args:
            inlier_mask (List[bool]): boolean mask with True value in correspondence of the features to keep. inlier_mask must have the same length as the total number of features.
        """
        inlier_mask = np.array(inlier_mask)
        assert np.array_equal(
            inlier_mask, inlier_mask.astype(bool)
        ), "Invalid type of input argument for inlier_mask. It must be a boolean vector with the same lenght as the number of features stored in the Features object."
        assert len(inlier_mask) == len(
            self
        ), "Invalid shape of input argument for inlier_mask. It must be a boolean vector with the same lenght as the number of features stored in the Features object."
        # Translate mask positions into the track_ids to keep.
        kept_ids = list(compress(self._values, inlier_mask))
        self.filter_feature_by_index(kept_ids)

    def filter_feature_by_index(self, indexes: List[np.int32]) -> None:
        """
        filter_feature_by_index Keep only inlier features, given a list of index (int values) of the features to keep.

        Args:
            indexes (List[int]): List with the index (track_id) of the features to keep.
        """
        for track_id in set(self._values) - set(indexes):
            del self._values[track_id]

    def get_feature_by_index(self, indexes: List[np.int32]) -> dict:
        """
        get_feature_by_index Get inlier features, given a list of index (int values) of the features to keep.

        Args:
            indexes (List[int]): List with the index of the features to keep.

        Returns:
            dict: dictionary containing the selected features with track_id as keys and Feature object as values {track_id: Feature}
        """
        return {k: v for k, v in self._values.items() if v.track_id in indexes}

    def save_as_txt(
        self,
        path: Union[str, Path],
        fmt: str = "%i",
        delimiter: str = ",",
        header: str = "x,y",
    ):
        """
        save_as_txt Save keypoints coordinates in a .txt file

        Args:
            path (Union[str, Path]): output file path.
            fmt (str, optional): number format passed to numpy.savetxt. Defaults to "%i".
            delimiter (str, optional): column delimiter. Defaults to ",".
            header (str, optional): header line of the file. Defaults to "x,y".
        """
        kpts = self.kpts_to_numpy()
        np.savetxt(
            path, kpts, fmt=fmt, delimiter=delimiter, newline="\n", header=header
        )

    def save_as_pickle(self, path: Union[str, Path]) -> bool:
        """
        save_as_pickle Save the whole Features object as a pickle file

        Args:
            path (Union[str, Path]): output file path.

        Returns:
            bool: True once the file has been written.
        """
        path = Path(path)
        with open(path, "wb") as f:
            pickle.dump(self, f, protocol=pickle.HIGHEST_PROTOCOL)
        return True

    # TODO: export to h5 is not implemented yet; draft kept for reference.
    # def save_as_h5(self, path: Union[str, Path]) -> bool:
    #     key1, key2 = images[cams[0]][epoch], images[cams[1]][epoch]
    #     mkpts0 = features[epoch][cams[0]].kpts_to_numpy()
    #     mkpts1 = features[epoch][cams[1]].kpts_to_numpy()
    #     n_matches = len(mkpts0)
    #     output_dir = Path(epochdir)
    #     db_name = output_dir / f"{epoch_dict[epoch]}.h5"
    #     with h5py.File(db_name, mode="w") as f_match:
    #         group = f_match.require_group(key1)
    #         if n_matches >= MIN_MATCHES:
    #             group.create_dataset(
    #                 key2, data=np.concatenate([mkpts0, mkpts1], axis=1)
    #             )
    #     kpts = defaultdict(list)
    #     match_indexes = defaultdict(dict)
    #     total_kpts = defaultdict(int)

    def plot_features(
        self,
        image: np.ndarray,
        **kwargs,
    ) -> None:
        """
        plot_features Plot features on input image.

        Args:
            image (np.ndarray): A numpy array with RGB channels.
            **kwargs: plotting characteristics overriding the defaults. Only `s`, `c`, `marker`, `alpha`, `edgecolors` and `linewidths` are used; any other keyword is ignored. Refer to matplotlib.pyplot.scatter documentation for more information https://matplotlib.org/stable/api/_as_gen/matplotlib.pyplot.scatter.html.

        Note:
            This method is a simplified version of icepy4d.visualization.visualization.plot_points() function. It cannot be called directly inside this method due to a circular import problem.

        Returns:
            None
        """
        points = self.kpts_to_numpy()

        # Default marker style, overridden by the matching kwargs if provided.
        defaults = {
            "s": 6,
            "c": "y",
            "marker": "o",
            "alpha": 0.8,
            "edgecolors": "r",
            "linewidths": 1,
        }
        style = {key: kwargs.get(key, value) for key, value in defaults.items()}

        _, ax = plt.subplots()
        ax.imshow(image)
        ax.scatter(points[:, 0], points[:, 1], **style)
        plt.show()
|