Skip to content

feat: update ground segmentation evaluator to be compatible with lidarseg dataset #156

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 13 commits into
base: develop
Choose a base branch
from
Open
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@


class Condition(BaseModel):
Method: Literal["annotated_pcd", "annotated_rosbag"]
ground_label: number
obstacle_label: number
accuracy_min: number
Expand Down
143 changes: 51 additions & 92 deletions driving_log_replayer_v2/scripts/ground_segmentation_evaluator_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
import json
from pathlib import Path

import message_filters
import numpy as np
from rclpy.qos import qos_profile_sensor_data
import ros2_numpy
Expand All @@ -34,7 +33,7 @@


class GroundSegmentationEvaluator(DLREvaluatorV2):
CLOUD_DIM = 6
CLOUD_DIM = 5
TS_DIFF_THRESH = 75000

def __init__(self, name: str) -> None:
Expand All @@ -51,51 +50,45 @@
self.get_parameter("evaluation_target_topic").get_parameter_value().string_value
)

if eval_condition.Method == "annotated_pcd":
# pcd eval mode
sample_data_path = Path(self._t4_dataset_paths[0], "annotation", "sample_data.json")
sample_data = json.load(sample_data_path.open())
sample_data = list(filter(lambda d: d["filename"].split(".")[-2] == "pcd", sample_data))

self.ground_truth: dict[int, np.ndarray] = {}
for i in range(len(sample_data)):
pcd_file_path = Path(
self._t4_dataset_paths[0],
sample_data[i]["filename"],
).as_posix()
raw_points = np.fromfile(pcd_file_path, dtype=np.float32)
points: np.ndarray = raw_points.reshape((-1, self.CLOUD_DIM))
self.ground_truth[int(sample_data[i]["timestamp"])] = points

self.__sub_pointcloud = self.create_subscription(
PointCloud2,
self.eval_target_topic,
self.annotated_pcd_eval_cb,
qos_profile_sensor_data,
)
elif eval_condition.Method == "annotated_rosbag":
# rosbag (AWSIM) eval mode

self.__sub_gt_cloud = message_filters.Subscriber(
self,
PointCloud2,
"/sensing/lidar/concatenated/pointcloud",
qos_profile=qos_profile_sensor_data,
)
self.__sub_eval_target_cloud = message_filters.Subscriber(
self,
PointCloud2,
self.eval_target_topic,
qos_profile=qos_profile_sensor_data,
)
self.__sync_sub = message_filters.TimeSynchronizer(
[self.__sub_gt_cloud, self.__sub_eval_target_cloud],
1000,
)
self.__sync_sub.registerCallback(self.annotated_rosbag_eval_cb)
else:
err = 'The "Method" field must be set to either "annotated_rosbag" or "annotated_pcd"'
raise ValueError(err)
sample_data_path = Path(self._t4_dataset_paths[0], "annotation", "sample_data.json")
sample_data = json.load(sample_data_path.open())
sample_data = list(filter(lambda d: d["filename"].split(".")[-2] == "pcd", sample_data))

# load gt annotation data
lidarseg_json_path = Path(self._t4_dataset_paths[0], "annotation", "lidarseg.json")

Check warning on line 58 in driving_log_replayer_v2/scripts/ground_segmentation_evaluator_node.py

View workflow job for this annotation

GitHub Actions / spell-check

Unknown word (lidarseg)

Check warning on line 58 in driving_log_replayer_v2/scripts/ground_segmentation_evaluator_node.py

View workflow job for this annotation

GitHub Actions / spell-check

Unknown word (lidarseg)
lidarseg_data = json.load(lidarseg_json_path.open())

Check warning on line 59 in driving_log_replayer_v2/scripts/ground_segmentation_evaluator_node.py

View workflow job for this annotation

GitHub Actions / spell-check

Unknown word (lidarseg)

Check warning on line 59 in driving_log_replayer_v2/scripts/ground_segmentation_evaluator_node.py

View workflow job for this annotation

GitHub Actions / spell-check

Unknown word (lidarseg)
token_to_seg_data = {}
for annotation_data in lidarseg_data:

Check warning on line 61 in driving_log_replayer_v2/scripts/ground_segmentation_evaluator_node.py

View workflow job for this annotation

GitHub Actions / spell-check

Unknown word (lidarseg)
token_to_seg_data[annotation_data["sample_data_token"]] = annotation_data

self.ground_truth: dict[
int, tuple[np.ndarray, np.ndarray]
] = {} # timestamp: (points, labels)
for i in range(len(sample_data)):
raw_points_file_path = Path(
self._t4_dataset_paths[0],
sample_data[i]["filename"],
).as_posix()
raw_points = np.fromfile(raw_points_file_path, dtype=np.float32)
token = sample_data[i]["token"]

if token not in token_to_seg_data:
continue
annotation_file_path = Path(
self._t4_dataset_paths[0], token_to_seg_data[token]["filename"]
).as_posix()
labels = np.fromfile(annotation_file_path, dtype=np.uint8)

points: np.ndarray = raw_points.reshape((-1, self.CLOUD_DIM))

self.ground_truth[int(sample_data[i]["timestamp"])] = (points, labels)

self.__sub_pointcloud = self.create_subscription(
PointCloud2,
self.eval_target_topic,
self.annotated_pcd_eval_cb,
qos_profile_sensor_data,
)

def annotated_pcd_eval_cb(self, msg: PointCloud2) -> None:
unix_time: int = eval_conversions.unix_time_from_ros_msg(msg.header)
Expand All @@ -106,7 +99,8 @@

# get ground truth pointcloud in this frame
# construct kd-tree from gt cloud
gt_frame_cloud: np.ndarray = self.ground_truth[gt_frame_ts]
gt_frame_cloud: np.ndarray = self.ground_truth[gt_frame_ts][0]
gt_frame_label: np.ndarray = self.ground_truth[gt_frame_ts][1]
kdtree = cKDTree(gt_frame_cloud[:, 0:3])

# convert ros2 pointcloud to numpy
Expand All @@ -116,16 +110,21 @@
pointcloud[:, 1] = numpy_pcd["y"]
pointcloud[:, 2] = numpy_pcd["z"]

assert gt_frame_cloud.shape[0] == gt_frame_label.shape[0], (
"ground truth cloud and label size mismatch"
)

# count TP+FN, TN+FP
tp_fn = np.count_nonzero(gt_frame_cloud[:, 5] == self.ground_label)
fp_tn = np.count_nonzero(gt_frame_cloud[:, 5] == self.obstacle_label)
tp_fn = np.count_nonzero(gt_frame_label[:] == self.ground_label)
fp_tn = np.count_nonzero(gt_frame_label[:] == self.obstacle_label)

tn: int = 0
fn: int = 0
for p in pointcloud:
_, idx = kdtree.query(p, k=1)
if gt_frame_cloud[idx][5] == self.ground_label:
if gt_frame_label[idx] == self.ground_label:
fn += 1
elif gt_frame_cloud[idx][5] == self.obstacle_label:
elif gt_frame_label[idx] == self.obstacle_label:
tn += 1
tp = tp_fn - fn
fp = fp_tn - tn
Expand All @@ -148,46 +147,6 @@
self._result.set_frame(frame_result)
self._result_writer.write_result(self._result)

def annotated_rosbag_eval_cb(
self,
gt_cloud_msg: PointCloud2,
eval_target_cloud_msg: PointCloud2,
) -> None:
np_gt_cloud: np.array = ros2_numpy.numpify(gt_cloud_msg)
np_target_cloud: np.array = ros2_numpy.numpify(eval_target_cloud_msg)

# guard
if (
"entity_id" not in np_gt_cloud.dtype.fields
or "entity_id" not in np_target_cloud.dtype.fields
):
self.get_logger().warn('The input PointCloud doesn\'t have a field named "entity_id"')
return

tp_fn = np.count_nonzero(np_gt_cloud["entity_id"] == self.ground_label)
tn_fp = np_gt_cloud.size - tp_fn
fn = np.count_nonzero(np_target_cloud["entity_id"] == self.ground_label)
tn = np_target_cloud.size - fn

tp = tp_fn - fn
fp = tn_fp - tn
self.get_logger().info(f"TP+FN = {tp_fn}, TN+FP = {tn_fp}")
self.get_logger().info(f"TP {tp}, FP {fp}, TN {tn}, FN {fn}")
metrics_list = self.__compute_metrics(tp, fp, tn, fn)
frame_result = GroundSegmentationEvalResult()
frame_result.tp = tp
frame_result.fp = fp
frame_result.tn = tn
frame_result.fn = fn
frame_result.accuracy = metrics_list[0]
frame_result.precision = metrics_list[1]
frame_result.recall = metrics_list[2]
frame_result.specificity = metrics_list[3]
frame_result.f1_score = metrics_list[4]

self._result.set_frame(frame_result)
self._result_writer.write_result(self._result)

def __get_gt_frame_ts(self, unix_time: int) -> int:
ts_itr = iter(self.ground_truth.keys())
ret_ts: int = int(next(ts_itr))
Expand Down
Loading